In [3]:
from sqlalchemy import create_engine 
import pandas as pd

In [4]:
DATABASE_TYPE="postgresql"
USERNAME="mutakabbir"
PASSWORD="lightning"
HOST="localhost"
PORT=5432
DATABASE_NAME="postgres"
DIVISION_ICS_DATASET_TABLE="ics_division_dataset"

In [5]:
engine = create_engine(f"{DATABASE_TYPE}://{USERNAME}:{PASSWORD}@{HOST}:{PORT}/{DATABASE_NAME}")

In [6]:
query_drop_dataset_table = f"""DROP TABLE IF EXISTS {DIVISION_ICS_DATASET_TABLE};"""

query_add_pk_to_dataset_table = f"""ALTER TABLE {DIVISION_ICS_DATASET_TABLE} ADD PRIMARY KEY ("division_id", "record_date");"""

query_subdivision_list = f"""select distinct(ds.cid) from division_station ds"""

query_stations_in_subdivision = lambda division_id: f"""select distinct(ds."climate_ID") from division_station ds where ds.cid = {division_id}"""

In [7]:
query_extract_subdivision_dataset = lambda division_id, station_list: f"""
select 
	{division_id} as division_id,
	date_trunc('hour', date_trunc('day', dd."Year Month Day Hour (YYYYMMDDHH)")) as "record_date",
	avg(dd."Extraterrestrial irradiance / kJ/m2") as "extraterrestrial_irradiance",
	avg(
		case 
			when dd."Flag" = '9' then 0
			else dd."Global horizontal irradiance / kJ/m2"
		end
	) as "global_horizontal_irradiance",
	avg(
		case 
			when dd."Flag.1" = '9' then 0
			else dd."Direct normal irradiance / kJ/m2"
		end
	) as "direct_normal_irradience",
	avg(
		case 
			when "Flag.2" = '9' then 0
			else dd."Diffuse horizontal irradiance / kJ/m2"
		end
	) as "diffuse_horizontal_irradiance",
	avg(
		case 
			when dd."Flag.3" = '9' then 0
			else dd."Global horizontal illuminance / 100 lux" 
		end
	) / 10 "global_horizontal_illumination_klux", -- calculate /kluxas 
	avg(
		case 
			when dd."Flag.4" = '9' then 0 
			else dd."Direct normal illuminance / 100 lux" 
		end
	) / 10 as "direct_normal_illumination_klux", -- calculate /klux
	avg(
		case 
			when dd."Flag.5" = '9' then 0 
			else dd."Diffuse horizontal illuminance / 100 lux"
		end
	) / 10 as "diffuse_horizontal_illumination_klux", -- calculate /klux
	avg(
		case 
			when dd."Flag.6" = '9' then 0 
			else dd."Zenith luminance / 100 Cd/m2"
		end
	) as "zenith_illumination",
	avg(
		case 
			when dd."Flag.7" = '9' then 0 
			else dd."Minutes of sunshine / 0-60 minutes"
		end
	) as "sunlight_min",
	avg(
		case 
			when dd."Flag.8" = '9' then 0 
			when dd."Ceiling height / 10 m" = 7777 then 3000 -- 7777 represents no limit. hence put max value of 3,000 
			else dd."Ceiling height / 10 m" 
		end
	) * 10 as "ceiling_height_meters", -- multiply wwith 10 since it is represented per 10 m
	avg(
		case 
			when dd."Flag.9" = '9' then 0
			else substring(lpad(dd."Sky condition"::text, 4, '0'), 1, 1)::integer  -- extract layer 1
		end
	) as "sky_layer_1",
	avg(
		case 
			when dd."Flag.9" = '9' then 0
			else substring(lpad(dd."Sky condition"::text, 4, '0'), 2, 1)::integer -- extract layer 2
		end
	) as "sky_layer_2",
	avg(
		case 
			when dd."Flag.9" = '9' then 0
			else substring(lpad(dd."Sky condition"::text, 4, '0'), 3, 1)::integer -- extract layer 3
		end
	) as "sky_layer_3",
	avg(
		case 
			when dd."Flag.9" = '9' then 0
			else substring(lpad(dd."Sky condition"::text, 4, '0'), 4, 1)::integer -- extract layer 4
		end
	) as "sky_layer_4",
	avg(
		case 
			when dd."Flag.10" = '9' then 0
			else dd."Visibility / 100 m" 
		end
	) / 10 as "visibility_km",  -- convert to visibility / 1 km
	avg(
		case 
			when dd."Flag.11" = '9' then 0
			else substring(lpad(dd."Present Weather"::text, 8, '0'), 1, 1)::integer
		end
	) as "weather_thunderstorm",
	avg(
		case 
			when dd."Flag.11" = '9' then 0
			else substring(lpad(dd."Present Weather"::text, 8, '0'), 2, 1)::integer
		end
	) as "weather_rain",
	avg(
		case 
			when dd."Flag.11" = '9' then 0
			else substring(lpad(dd."Present Weather"::text, 8, '0'), 3, 1)::integer
		end
	) as "weather_drizzle",
	avg(
		case 
			when dd."Flag.11" = '9' then 0
			else substring(lpad(dd."Present Weather"::text, 8, '0'), 4, 1)::integer
		end
	) as "weather_snow_1",
	avg(
		case 
			when dd."Flag.11" = '9' then 0
			else substring(lpad(dd."Present Weather"::text, 8, '0'), 5, 1)::integer
		end
	) as "weather_snow_2",
	avg(
		case 
			when dd."Flag.11" = '9' then 0
			else substring(lpad(dd."Present Weather"::text, 8, '0'), 6, 1)::integer
		end
	) as "weather_ice",
	avg(
		case 
			when dd."Flag.11" = '9' then 0
			else substring(lpad(dd."Present Weather"::text, 8, '0'), 7, 1)::integer
		end
	) as "weather_visibility_1",
	avg(
		case 
			when dd."Flag.11" = '9' then 0
			else substring(lpad(dd."Present Weather"::text, 8, '0'), 8, 1)::integer
		end
	) as "weather_visibility_2",
	avg(
		case 
			when dd."Flag.12" = '9' then 0
			else dd."Station pressure / 10 Pa"  
		end
	) / 100 as "pressure_kpa", -- convert to presure / kPa
	avg(
		case 
			when dd."Flag.13" = '9' then null
			else dd."Dry bulb temperature / 0.1 C"
		end
	) / 10 as "dry_bulb_temp_c", -- convert to / 1 C
	avg(
		case 
			when dd."Flag.14" = '9' then null
			else dd."Dew point temperature / 0.1 C"
		end
	) / 10 as "dew_point_temp_c", -- convert to / 1 C
	avg(
		case 
			when dd."Flag.15" = '9' then null
			else dd."Wind direction / 0-359 degrees"
		end
	) as "wind_direction_deg",
	avg(
		case 
			when dd."Flag.16" = '9' then null
			else dd."Wind speed / 0.1 m/s"
		end
	)  / 10 as "wind_speed_mps",
	avg(
		case 
			when dd."Flag.17" = '9' then null
			else dd."Total sky cover / 0-10 in tenths"
		end
	) as "sky_cover",
	avg(
		case 
			when dd."Flag.18" = '9' then null
			else dd."Opaque sky cover / 0-10 in tenths"
		end
	) as "sky_cover_opaque",
	avg(
		case 
			when dd."Flag.19" = '9' then 0
			else dd."Snow cover (0 = no snow cover 1 = snow cover)"
		end
	) as "snow"
from (
	select * 
	from 
		ics 
	where 
		ics."ECCC station identifier" in ('{"','".join(station_list)}') 
) as dd
group by record_date
"""

In [8]:
# add all the data to db

# drop table data if it exists
with engine.connect() as con:
    con.execute(query_drop_dataset_table)
# get division list
division_list = pd.read_sql(query_subdivision_list, con=engine)["cid"].values.tolist()

# add data for each division
for index, division_id in enumerate(division_list):
    print(f"Started subdivision ({index}): {division_id}")
    station_list = pd.read_sql(query_stations_in_subdivision(division_id), con=engine)["climate_ID"].values.tolist()
    print(f"\tstation list: {station_list}")
    division_dataset = pd.read_sql(
        query_extract_subdivision_dataset(
            division_id=division_id, 
            station_list=station_list
        ), 
        con=engine,
    )
    division_dataset.to_sql(name=DIVISION_ICS_DATASET_TABLE, con=engine, if_exists='append', index=False)
    del division_dataset
    print(f"\tfinished adding subdivision: {division_id}")
# add primary key for faster query
with engine.connect() as con:
    con.execute(query_add_pk_to_dataset_table)

Started subdivision (0): 51
	station list: ['3070600', '3072921', '3073140', '3074190', '3074935', '3076069']
	finished adding subdivision: 51
Started subdivision (1): 52
	station list: ['5010481', '5010547', '5012469', '5012654', '5013117', '501A7AR', '5022125', '5040689', '5043158', '504K80K']
	finished adding subdivision: 52
Started subdivision (2): 60
	station list: ['8202000', '8202592', '8206495']
	finished adding subdivision: 60
Started subdivision (3): 22
	station list: ['704607C', '706I155', '7061288', '7041166', '7060826', '7061541', '706I001', '7065639', '7063370', '7066686', '7064181']
	finished adding subdivision: 22
Started subdivision (4): 59
	station list: ['4028038', '4032322', '4028060', '4020286', '4024714', '4036844', '402DAF0', '4043901', '4038412', '4024919', '4031999']
	finished adding subdivision: 59
Started subdivision (5): 65
	station list: ['6114295', '6115529', '6117700', '6166415']
	finished adding subdivision: 65
Started subdivision (6): 44
	station list: 