In [1]:
from dotenv import load_dotenv
import pandas as pd
# import multiextractor
from datetime import datetime
import numpy as np
import json
import io
import requests
from urllib.parse import urlparse
import os
import spacy
import polars as pl
from test_extract_rss import create_entry_from_rss
from google.cloud import bigquery

# Populate the process environment (presumably from a local .env file — python-dotenv's
# default) BEFORE the os.environ lookups below; those lookups raise KeyError if a
# variable is missing.
load_dotenv()

# Load the spaCy 'en_core_web_md' pipeline.
# NOTE(review): `nlp` is not referenced by any visible cell — confirm it is still needed.
nlp = spacy.load('en_core_web_md')
# API key that the request cells interpolate into the AirVisual URLs.
key = os.environ['IQ_AIR_KEY']
# NOTE(review): `project` is read here but no visible cell uses it (bigquery.Client()
# is created without arguments further down) — confirm whether it should be passed.
project = os.environ['GCLOUD_PROJECT']
# Re-export the service-account key path under GOOGLE_APPLICATION_CREDENTIALS,
# presumably so bigquery.Client() can discover credentials — TODO confirm.
os.environ['GOOGLE_APPLICATION_CREDENTIALS'] = os.environ['GCLOUD_SERVICE_KEY_PATH']

In [2]:
# Lookup table from abbreviated metric codes to descriptive upper-case labels
# (the unit is embedded in each label). Several keys (aqius, mainus, aqicn, maincn,
# tp, pr, hu, ws, wd, ic) match field names in the city response printed later in
# this notebook.
# NOTE(review): no visible cell reads this dict — confirm it is still used.
metrics_dict = {
    'aqius': 'AQI_US_STANDARD',
    'aqicn': 'AQI_CHINA_STANDARD',
    'mainus': 'MAIN_POLLUTANT_US',
    'maincn': 'MAIN_POLLUTANT_CHINA',
    'tp': 'TEMPERATURE_CELSIUS',
    'tp_min': 'MIN_TEMPERATURE_CELSIUS',
    'pr': 'ATMOSPHERIC_PRESSURE_HPA',
    'hu': 'HUMIDITY_PERC',
    'ws': 'WIND_SPEED_METRES_PER_SEC',
    'wd': 'WIND_DIRECTION_ANGLE',
    'ic': 'WEATHER_ICON_CODE',
    'p2': 'PM2.5_UGM3',
    'p1': 'PM10_UGM3',
    'o3': 'OZONE_PPB',
    'n2': 'NITROGEN_DIOXIDE_PPB',
    's2': 'SULFUR_DIOXIDE_PPB',
    'co': 'CARBON_MONOXIDE_PPM'
}

# Labels for the three fields 'conc', 'aqius' and 'aqicn'.
# NOTE(review): presumably these describe a per-pollutant detail object in the API
# response; no such object appears in the outputs recorded here, and no visible cell
# reads this dict — confirm against the API documentation.
main_pollutant_dict = {
    'conc': 'CONCENTRATION',
    'aqius': 'AQI_US_STANDARD',
    'aqicn': 'AQI_CHINA_STANDARD'
}

# Lookup table from weather icon codes (the 'ic' field of the weather payload,
# e.g. '02n' in the city response below) to condition names.
# NOTE(review): only codes 01, 02 and 10 have an 'n' variant here; a direct
# icon_code_dict[...] lookup for e.g. '03n' or '04n' would raise KeyError —
# confirm the full code list against the API documentation.
icon_code_dict = {
    '01d': 'CLEAR_SKY_DAY',
    '01n': 'CLEAR_SKY_NIGHT',
    '02d': 'FEW_CLOUDS_DAY',
    '02n': 'FEW_CLOUDS_NIGHT',
    '03d': 'SCATTERED_CLOUDS',
    '04d': 'BROKEN_CLOUDS',
    '09d': 'SHOWER_RAIN',
    '10d': 'RAIN_DAY',
    '10n': 'RAIN_NIGHT',
    '11d': 'THUNDERSTORM',
    '13d': 'SNOW',
    '50d': 'MIST'
}

In [3]:
# List every country the AirVisual API has data for.
# - HTTPS: the API key travels as a query parameter, so plain HTTP exposed it in clear text.
# - params=: lets requests build and URL-encode the query string instead of str.format.
# - timeout=: without it a stalled connection would hang the kernel indefinitely.
# The empty payload/files/headers arguments were no-ops on a GET and are dropped.
url = 'https://api.airvisual.com/v2/countries'

response = requests.get(url, params={'key': key}, timeout=30)

# The raw body is printed as-is; on failure it carries the API's own error message.
print(response.text)


{"status":"success","data":[{"country":"Afghanistan"},{"country":"Algeria"},{"country":"Andorra"},{"country":"Angola"},{"country":"Argentina"},{"country":"Armenia"},{"country":"Australia"},{"country":"Austria"},{"country":"Azerbaijan"},{"country":"Bahamas"},{"country":"Bahrain"},{"country":"Bangladesh"},{"country":"Barbados"},{"country":"Belgium"},{"country":"Belize"},{"country":"Bermuda"},{"country":"Bolivia"},{"country":"Bosnia Herzegovina"},{"country":"Brazil"},{"country":"Bulgaria"},{"country":"Burkina Faso"},{"country":"Burundi"},{"country":"Cambodia"},{"country":"Cameroon"},{"country":"Canada"},{"country":"Cape Verde"},{"country":"Cayman Islands"},{"country":"Central African Republic"},{"country":"Chad"},{"country":"Chile"},{"country":"China"},{"country":"Colombia"},{"country":"Costa Rica"},{"country":"Croatia"},{"country":"Cyprus"},{"country":"Czech Republic"},{"country":"Democratic Republic of the Congo"},{"country":"Denmark"},{"country":"Djibouti"},{"country":"Ecuador"},{"coun

In [4]:
# Decode the raw countries response text into Python objects and display them.
countries_payload = json.loads(response.text)
countries_payload

{'status': 'success',
 'data': [{'country': 'Afghanistan'},
  {'country': 'Algeria'},
  {'country': 'Andorra'},
  {'country': 'Angola'},
  {'country': 'Argentina'},
  {'country': 'Armenia'},
  {'country': 'Australia'},
  {'country': 'Austria'},
  {'country': 'Azerbaijan'},
  {'country': 'Bahamas'},
  {'country': 'Bahrain'},
  {'country': 'Bangladesh'},
  {'country': 'Barbados'},
  {'country': 'Belgium'},
  {'country': 'Belize'},
  {'country': 'Bermuda'},
  {'country': 'Bolivia'},
  {'country': 'Bosnia Herzegovina'},
  {'country': 'Brazil'},
  {'country': 'Bulgaria'},
  {'country': 'Burkina Faso'},
  {'country': 'Burundi'},
  {'country': 'Cambodia'},
  {'country': 'Cameroon'},
  {'country': 'Canada'},
  {'country': 'Cape Verde'},
  {'country': 'Cayman Islands'},
  {'country': 'Central African Republic'},
  {'country': 'Chad'},
  {'country': 'Chile'},
  {'country': 'China'},
  {'country': 'Colombia'},
  {'country': 'Costa Rica'},
  {'country': 'Croatia'},
  {'country': 'Cyprus'},
  {'cou

In [5]:
# List the states/provinces the AirVisual API knows for one country.
# HTTPS protects the API key in the query string, params= URL-encodes values such as
# names containing spaces, and timeout= prevents an indefinite hang.
url2 = 'https://api.airvisual.com/v2/states'

# Alternative target used during exploration: country = 'Hong Kong SAR'
country = 'Canada'

response2 = requests.get(url2, params={'country': country, 'key': key}, timeout=30)

# The raw body is printed as-is; on failure it carries the API's own error message.
print(response2.text)


{"status":"success","data":[{"state":"Alberta"},{"state":"British Columbia"},{"state":"Manitoba"},{"state":"New Brunswick"},{"state":"Newfoundland and Labrador"},{"state":"Northwest Territories"},{"state":"Nova Scotia"},{"state":"Nunavut"},{"state":"Ontario"},{"state":"Prince Edward Island"},{"state":"Quebec"},{"state":"Saskatchewan"},{"state":"Yukon"}]}


In [6]:
# Decode the raw states response text into Python objects and display them.
states_payload = json.loads(response2.text)
states_payload

{'status': 'success',
 'data': [{'state': 'Alberta'},
  {'state': 'British Columbia'},
  {'state': 'Manitoba'},
  {'state': 'New Brunswick'},
  {'state': 'Newfoundland and Labrador'},
  {'state': 'Northwest Territories'},
  {'state': 'Nova Scotia'},
  {'state': 'Nunavut'},
  {'state': 'Ontario'},
  {'state': 'Prince Edward Island'},
  {'state': 'Quebec'},
  {'state': 'Saskatchewan'},
  {'state': 'Yukon'}]}

In [7]:
# List the cities the AirVisual API knows within one state/province.
# HTTPS protects the API key in the query string, params= URL-encodes values such as
# 'British Columbia', and timeout= prevents an indefinite hang.
url3 = 'https://api.airvisual.com/v2/cities'

# Alternative target used during exploration: country = 'Hong Kong SAR', state = 'Hong Kong'
country = 'Canada'
state = 'British Columbia'

response3 = requests.get(
    url3,
    params={'state': state, 'country': country, 'key': key},
    timeout=30,
)

# The raw body is printed as-is; on failure it carries the API's own error message.
print(response3.text)


{"status":"success","data":[{"city":"Abbotsford"},{"city":"Anmore"},{"city":"Ashcroft"},{"city":"Bowen Island"},{"city":"Burnaby"},{"city":"Burns Lake"},{"city":"Cache Creek"},{"city":"Campbell River"},{"city":"Capital Regional District"},{"city":"Castlegar"},{"city":"Chase"},{"city":"Chetwynd"},{"city":"Chilliwack"},{"city":"Clinton"},{"city":"Coldstream"},{"city":"Comox"},{"city":"Coquitlam"},{"city":"Courtenay"},{"city":"Cranbrook"},{"city":"Crawford Bay"},{"city":"Creston"},{"city":"Crofton"},{"city":"Duncan"},{"city":"Elk Falls Dogwood"},{"city":"Elkford"},{"city":"Enderby"},{"city":"Fernie"},{"city":"Fort Nelson"},{"city":"Fort St John"},{"city":"Fruitvale"},{"city":"Gibsons"},{"city":"Golden"},{"city":"Grand Forks"},{"city":"Hope"},{"city":"Houston"},{"city":"Invermere"},{"city":"Kamloops"},{"city":"Kelowna"},{"city":"Kimberley"},{"city":"Kitimat"},{"city":"Ladysmith"},{"city":"Lake Cowichan"},{"city":"Langley"},{"city":"Lions Bay"},{"city":"Logan Lake"},{"city":"Lytton"},{"city

In [8]:
# Decode the raw cities response text into Python objects and display them.
cities_payload = json.loads(response3.text)
cities_payload

{'status': 'success',
 'data': [{'city': 'Abbotsford'},
  {'city': 'Anmore'},
  {'city': 'Ashcroft'},
  {'city': 'Bowen Island'},
  {'city': 'Burnaby'},
  {'city': 'Burns Lake'},
  {'city': 'Cache Creek'},
  {'city': 'Campbell River'},
  {'city': 'Capital Regional District'},
  {'city': 'Castlegar'},
  {'city': 'Chase'},
  {'city': 'Chetwynd'},
  {'city': 'Chilliwack'},
  {'city': 'Clinton'},
  {'city': 'Coldstream'},
  {'city': 'Comox'},
  {'city': 'Coquitlam'},
  {'city': 'Courtenay'},
  {'city': 'Cranbrook'},
  {'city': 'Crawford Bay'},
  {'city': 'Creston'},
  {'city': 'Crofton'},
  {'city': 'Duncan'},
  {'city': 'Elk Falls Dogwood'},
  {'city': 'Elkford'},
  {'city': 'Enderby'},
  {'city': 'Fernie'},
  {'city': 'Fort Nelson'},
  {'city': 'Fort St John'},
  {'city': 'Fruitvale'},
  {'city': 'Gibsons'},
  {'city': 'Golden'},
  {'city': 'Grand Forks'},
  {'city': 'Hope'},
  {'city': 'Houston'},
  {'city': 'Invermere'},
  {'city': 'Kamloops'},
  {'city': 'Kelowna'},
  {'city': 'Kimber

In [9]:
# Fetch the current pollution and weather readings for a single city.
# HTTPS protects the API key in the query string, params= URL-encodes values such as
# 'Vancouver BC', and timeout= prevents an indefinite hang.
url4 = 'https://api.airvisual.com/v2/city'

# Alternative target used during exploration:
#   country = 'Hong Kong SAR', state = 'Hong Kong', city = 'Hong Kong'
country = 'Canada'
state = 'British Columbia'
city = 'Vancouver BC'

response4 = requests.get(
    url4,
    params={'city': city, 'state': state, 'country': country, 'key': key},
    timeout=30,
)

# The raw body is printed as-is; on failure it carries the API's own error message.
print(response4.text)


{"status":"success","data":{"city":"Vancouver BC","state":"British Columbia","country":"Canada","location":{"type":"Point","coordinates":[-123.15222,49.18639]},"current":{"pollution":{"ts":"2023-11-29T05:00:00.000Z","aqius":57,"mainus":"p2","aqicn":22,"maincn":"p2"},"weather":{"ts":"2023-11-29T04:00:00.000Z","tp":1,"pr":1021,"hu":95,"ws":1.03,"wd":0,"ic":"02n"}}}}


In [10]:
# Parse the city-level response body into a dict, keep it as `res`, and display it.
city_body = response4.text
res = json.loads(city_body)
res

{'status': 'success',
 'data': {'city': 'Vancouver BC',
  'state': 'British Columbia',
  'country': 'Canada',
  'location': {'type': 'Point', 'coordinates': [-123.15222, 49.18639]},
  'current': {'pollution': {'ts': '2023-11-29T05:00:00.000Z',
    'aqius': 57,
    'mainus': 'p2',
    'aqicn': 22,
    'maincn': 'p2'},
   'weather': {'ts': '2023-11-29T04:00:00.000Z',
    'tp': 1,
    'pr': 1021,
    'hu': 95,
    'ws': 1.03,
    'wd': 0,
    'ic': '02n'}}}}

In [11]:
class WeatherBuilder:
    _icon_url_template = 'https://airvisual.com/images/{icon}.png'    
    
    def __init__(self, resp_data):
        self._s_data = self.__build_sys(resp_data)
        self.proc_data = self.__process_records(resp_data)
    
    def __build_record(self, record):
        return {
            'weather': self.__build_weather(record.get('current', None)),
            'pollution': self.__build_pollution(record.get('current', None)),
            'sys': self._s_data
        }

    @staticmethod
    def __build_coords(vals):
        coords = vals.get('coordinates', None)
        if coords is not None:
            return {'lat': coords[0], 'lon': coords[-1]}
        else:
            return {'lat': None, 'lon': None}
        
    @classmethod
    def __build_pollution(cls, vals):
        pollution = vals.get('pollution', None)
        if pollution is not None:
            return {
                'pollution_ts': cls.__build_date(pollution, 'ts'),
                'aqi_us_epa': pollution.get('aqius', None), 
                'aqi_china_mep': pollution.get('aqicn', None), 
                'main_pollutant_us': pollution.get('mainus', None),
                'main_pollutant_cn': pollution.get('maincn', None)
            }
        else:
            return {'lat': None, 'lon': None}

    @classmethod
    def __build_weather(cls, vals):
        weather = vals.get('weather', None)
        if weather is not None:
            return {
                'weather_ts': cls.__build_date(weather, 'ts'),
                'temp_c': weather.get('tp'),
                'pressure_hpa': weather.get('pr', None),
                'humidity_perc': weather.get('hu', None),
                'wind_speed_m_s': weather.get('ws', None),
                'wind_direction_deg': weather.get('wd', None),
                'icon': cls._icon_url_template.format(icon=weather['ic'])
            }
        
    @classmethod
    def __build_sys(cls, vals):
        if vals is not None:
            return {
                'country': vals.get('country', None),
                'state': vals.get('state', None),
                'city': vals.get('city', None),
                'coord': cls.__build_coords(vals.get('location', None))
            }
        
    @staticmethod
    def __build_date(vals, key):
        date_val = vals.get(key, None)
        match date_val:
            case int():
                return datetime.fromtimestamp(date_val)
            case str():
                date_val_2 = date_val.split('.')[0].replace('T', ' ')
                return datetime.strptime(date_val_2, '%Y-%m-%d %H:%M:%S')
            case _:
                return date_val
                    
    def __process_records(self, resp):
        tmp = resp.get('list', None)
        if tmp is not None:
            return [self.__build_record(data) for data in tmp]
        else:
            return self.__build_record(resp)

In [12]:
# Re-parse the city response, then normalise its 'data' object with WeatherBuilder
# and display the resulting record.
res = json.loads(response4.text)
city_data = res['data']
c = WeatherBuilder(city_data)
c.proc_data

{'weather': {'weather_ts': datetime.datetime(2023, 11, 29, 4, 0),
  'temp_c': 1,
  'pressure_hpa': 1021,
  'humidity_perc': 95,
  'wind_speed_m_s': 1.03,
  'wind_direction_deg': 0,
  'icon': 'https://airvisual.com/images/02n.png'},
 'pollution': {'pollution_ts': datetime.datetime(2023, 11, 29, 5, 0),
  'aqi_us_epa': 57,
  'aqi_china_mep': 22,
  'main_pollutant_us': 'p2',
  'main_pollutant_cn': 'p2'},
 'sys': {'country': 'Canada',
  'state': 'British Columbia',
  'city': 'Vancouver BC',
  'coord': {'lat': -123.15222, 'lon': 49.18639}}}

In [13]:
# Step 1: turn the single normalised record into a one-row frame and expand the
# three nested sections into top-level columns.
flat_record = (
    pl.from_records([c.proc_data])
    .unnest('weather')
    .unnest('pollution')
    .unnest('sys')
)

# Step 2: split each timestamp into separate date and time columns, then drop the
# original timestamp columns.
pollution_ts = pl.col('pollution_ts')
weather_ts = pl.col('weather_ts')
split_columns = [
    pollution_ts.dt.date().alias('pollution_date'),
    pollution_ts.dt.time().alias('pollution_time'),
    weather_ts.dt.date().alias('weather_date'),
    weather_ts.dt.time().alias('weather_time'),
]

pdf = flat_record.with_columns(split_columns).drop(['pollution_ts', 'weather_ts'])

In [14]:
def get_bq_type(col_name, data_type):
    """Translate a polars dtype into a ``(bq_type, bq_mode)`` pair for BigQuery.

    Args:
        col_name: column name; only consulted when the dtype is not recognised.
        data_type: polars dtype of the column.

    Returns:
        Tuple ``(bq_type, bq_mode)``. Numeric, string and boolean columns are
        'NULLABLE'; Datetime, Date and Time columns are 'REQUIRED'.

    Raises:
        Exception: the dtype is not recognised and the column is not 'lat'/'lon'.
    """
    # Ordered (dtype, result) pairs, checked with == in the same order as before.
    # Datetime maps to DATETIME (not TIMESTAMP).
    known_types = (
        (pl.Float64, ('FLOAT64', 'NULLABLE')),
        (pl.Int32, ('INT64', 'NULLABLE')),
        (pl.Int64, ('INT64', 'NULLABLE')),
        (pl.Utf8, ('STRING', 'NULLABLE')),
        (pl.Boolean, ('BOOL', 'NULLABLE')),
        (pl.Datetime, ('DATETIME', 'REQUIRED')),
        (pl.Date, ('DATE', 'REQUIRED')),
        (pl.Time, ('TIME', 'REQUIRED')),
    )
    for candidate, result in known_types:
        if data_type == candidate:
            return result

    # Unrecognised dtype: coordinate columns are still treated as floats.
    if col_name == 'lat' or col_name == 'lon':
        return 'FLOAT64', 'NULLABLE'
    raise Exception('Error processing BQ data type')
            

def create_bq_schema(schema):
    """Build a list of ``bigquery.SchemaField`` objects from a polars schema.

    Args:
        schema: mapping of column name -> polars dtype (e.g. ``DataFrame.schema``).

    Returns:
        A list of ``bigquery.SchemaField`` in the schema's column order:

        * Struct column        -> RECORD, mode NULLABLE, with nested fields.
        * List[Struct] column  -> RECORD, mode REPEATED, with nested fields.
        * List[scalar] column  -> scalar type, mode REPEATED.
        * scalar column        -> type and mode from ``get_bq_type``.

    Raises:
        Exception: propagated from ``get_bq_type`` for unsupported dtypes.
    """
    bq_schema = []
    for col_name, data_type in schema.items():
        is_list = isinstance(data_type, pl.List)
        # For a list column the element dtype decides between RECORD and scalar.
        # (The previous code tested the outer dtype, so List[Struct] always fell
        # through to get_bq_type and raised.)
        element_type = data_type.inner if is_list else data_type

        if isinstance(element_type, pl.Struct):
            nested_fields = []
            for sub_name, sub_type in element_type.to_schema().items():
                # The nested mode is discarded; SchemaField's default mode applies.
                sub_bq_type, _ = get_bq_type(sub_name, sub_type)
                # '1h_mm' / '3h_mm' are emitted as 'mm_1h' / 'mm_3h'.
                if sub_name in ['1h_mm', '3h_mm']:
                    head, tail = sub_name.split('_')[0], sub_name.split('_')[-1]
                    sub_name = '_'.join([tail, head])
                nested_fields.append(bigquery.SchemaField(sub_name, sub_bq_type))
            field = bigquery.SchemaField(
                col_name,
                'RECORD',
                mode='REPEATED' if is_list else 'NULLABLE',
                fields=tuple(nested_fields),
            )
        elif is_list:
            # List of scalars: element type with REPEATED mode.
            bq_type, _ = get_bq_type(col_name, element_type)
            field = bigquery.SchemaField(col_name, bq_type, mode='REPEATED')
        else:
            bq_type, bq_mode = get_bq_type(col_name, data_type)
            field = bigquery.SchemaField(col_name, bq_type, mode=bq_mode)

        bq_schema.append(field)
    return bq_schema

def create_load_bq_dataset(client, dataset_name):
    """Create the dataset ``<client.project>.<dataset_name>`` if needed and return it.

    ``exists_ok=True`` is passed to ``create_dataset``, so an already existing
    dataset is not an error; the "Created dataset" message is printed either way.

    Args:
        client: BigQuery client; its ``project`` attribute is used for the dataset id.
        dataset_name: dataset name without the project prefix.

    Returns:
        The dataset object returned by ``client.create_dataset``.
    """
    requested = bigquery.Dataset(f'{client.project}.{dataset_name}')
    created = client.create_dataset(requested, timeout=30, exists_ok=True)
    print("Created dataset {}.{}".format(client.project, created.dataset_id))
    return created
    
def create_load_bq_table(client, dataset, table_name, table_schema):
    """Create ``<project>.<dataset>.<table_name>`` with the given schema if needed.

    ``exists_ok=True`` is passed to ``create_table``, so an already existing table
    is not an error; the "Created table" message is printed either way.

    Args:
        client: BigQuery client; its ``project`` attribute is used for the table id.
        dataset: dataset object whose ``dataset_id`` is used for the table id.
        table_name: table name without project/dataset prefix.
        table_schema: schema passed to ``bigquery.Table``.

    Returns:
        The table object returned by ``client.create_table``.
    """
    requested = bigquery.Table(
        f'{client.project}.{dataset.dataset_id}.{table_name}',
        schema=table_schema,
    )
    created = client.create_table(requested, timeout=30, exists_ok=True)
    print('Created table {}.{}.{}'.format(created.project, created.dataset_id, created.table_id))
    return created

def load_table_to_gcp(client, data, dataset_name, table_name, table_schema, src_format='polars'):
    """Ensure the dataset and table exist, then load ``data`` into the table.

    Args:
        client: BigQuery client.
        data: frame to load; must provide ``write_parquet`` (for 'polars') or
            ``to_pandas`` (for 'pandas').
        dataset_name: target dataset name (created if missing).
        table_name: target table name (created if missing).
        table_schema: BigQuery schema used for table creation and the load job.
        src_format: 'polars' uploads an in-memory Parquet file; 'pandas' converts
            the frame with ``to_pandas()`` and loads the DataFrame with
            ``write_disposition="WRITE_APPEND"``.

    Raises:
        ValueError: ``src_format`` is neither 'polars' nor 'pandas'. Raised before
            any BigQuery call, so a bad option cannot leave a new dataset/table behind.
    """
    # Validate first: previously the dataset and table were created before the
    # option was checked.
    if src_format not in ('polars', 'pandas'):
        raise ValueError('Improper option')

    dataset = create_load_bq_dataset(client, dataset_name)
    table = create_load_bq_table(client, dataset, table_name, table_schema)

    if src_format == 'polars':
        # Serialise to Parquet in memory and upload the buffer.
        with io.BytesIO() as stream:
            data.write_parquet(stream)
            stream.seek(0)
            job = client.load_table_from_file(
                stream,
                destination=table,
                job_config=bigquery.LoadJobConfig(
                    source_format=bigquery.SourceFormat.PARQUET,
                    ignore_unknown_values=True,
                    schema=table_schema
                ),
            )
    else:
        job_config = bigquery.LoadJobConfig(schema=table_schema, write_disposition="WRITE_APPEND")
        job = client.load_table_from_dataframe(data.to_pandas(), table, job_config=job_config)

    # Block until the load job has finished (raises if the job failed).
    job.result()

    print(f'Data loaded to {table.project}.{table.dataset_id}.{table.table_id}!')

In [15]:
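# NOTE(review): earlier draft of the flattening step, kept for reference only. It
# appears to be superseded by the WeatherBuilder-based construction of `pdf` in
# cell In [13] — confirm before deleting.
# pdf = (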
#     pl
#     .from_records([res])
#     .unnest('data')
#     .unnest('location')
#     .unnest('current')
#     .unnest('pollution')
#     .rename({'ts': 'current_ts'})
#     .unnest('weather')
#     .rename({'ts': 'weather_ts'})    
#     .with_columns([
#         pl.col('current_ts').apply(lambda x: datetime.strptime(x.split('.')[0].replace('T', ' '), '%Y-%m-%d %H:%M:%S')), 
#         pl.col('weather_ts').apply(lambda x: datetime.strptime(x.split('.')[0].replace('T', ' '), '%Y-%m-%d %H:%M:%S')),
#     ])
#     .with_columns([
#         pl.col('current_ts').dt.date().alias('current_date'), 
#         pl.col('current_ts').dt.time().alias('current_time'), 
#         pl.col('weather_ts').dt.date().alias('weather_date'),
#         pl.col('weather_ts').dt.time().alias('weather_time')
#     ])
#     .drop(['current_ts', 'weather_ts'])
# )

In [16]:
# Derive the BigQuery schema from the dtypes of the flattened polars frame.
schema_projected_pdf = create_bq_schema(schema=pdf.schema)

In [17]:
# Create a BigQuery client with default settings, then append the frame to
# IQAIR.IQA_CURRENT_WEATHER via the pandas load path.
client = bigquery.Client()
load_table_to_gcp(
    client=client,
    data=pdf,
    dataset_name='IQAIR',
    table_name='IQA_CURRENT_WEATHER',
    table_schema=schema_projected_pdf,
    src_format='pandas',
)

Created dataset ordinal-stone-402505.IQAIR
Created table ordinal-stone-402505.IQAIR.IQA_CURRENT_WEATHER
Data loaded to ordinal-stone-402505.IQAIR.IQA_CURRENT_WEATHER!
