In [1]:
# %pip install pandas sqlalchemy psycopg2-binary python-dotenv tqdm ipywidgets

# Import Block

In [1]:
import pandas as pd
import numpy as np
from sqlalchemy import TIMESTAMP

import os
from threading import Thread
from dotenv import load_dotenv

import warnings
from tqdm.notebook import tqdm

import sys
sys.path.append("../src")

from utils.Database import Database

In [2]:
# NOTE(review): with no category/module filter this silences every warning for the
# whole kernel session, including any raised by pandas while the station files are read.
warnings.filterwarnings("ignore")

# Load Environment

In [3]:
# Read the local ".env" file so its entries become visible through os.environ.
load_dotenv(".env")

# Table names and the CWEEDS station data root are configuration, not code.
# Each constant is None when the corresponding variable is not defined.
TABLE_WEATHER_DATA = os.environ.get("TABLE_WEATHER_DATA")
TABLE_WEATHER_METADATA = os.environ.get("TABLE_WEATHER_METADATA")
CWEEDS_STATION_DATA_DIR = os.environ.get("CWEEDS_STATION_DATA_DIR")

# Constants

## Data Patterns

In [4]:
# Regex that splits a "YYYYMMDDHH" stamp into the named groups year / month / day / hour.
# It must be a raw string: in a normal literal "\d" is an invalid escape sequence, which
# Python reports as a warning today and has announced will become an error.
WEATHER_DATA_PATTERN = r'(?P<year>\d{4})(?P<month>\d{2})(?P<day>\d{2})(?P<hour>\d{2})'

## DTypes

In [5]:
# dtypes handed to pd.read_csv, keyed by column position (the files are read with header=None).
# Layout of the 44 positions:
#   0-2      -> 'object'
#   3        -> 'int64'
#   4..43    -> alternating: even positions are 'int64' readings, odd positions are 'object'
#   22, 26   -> 'str' (exceptions to the even/int64 rule; presumably the two digit-code
#               columns that expand_digit_code slices character by character - confirm)
WEATHER_DATA_DTYPES = {
    position: (
        'str' if position in (22, 26)
        else 'int64' if position == 3 or (position > 3 and position % 2 == 0)
        else 'object'
    )
    for position in range(44)
}

# Column -> SQL type overrides used when a frame is written to the database.
# The hourly table keeps its reading time in "TIMESTAMP".
WEATHER_DATA_DB_DTYPES = dict(TIMESTAMP=TIMESTAMP)

# Same idea for the processed table, keyed on its "DATE" column.
WEATHER_DATA_PROCESSED_DB_DTYPES = dict(DATE=TIMESTAMP)

## Mapping

In [6]:
# Flag column -> the measurement column that flag qualifies.
# handel_null_values walks this mapping and overwrites the measurement with -9999
# wherever the flag equals "9".
# 'Flag.9' and 'Flag.11' are intentionally absent: they belong to the two digit-code
# columns listed in WEATHER_DATA_DIGIT_CODE_FLAG_MAPPING.
# Both keys and values must match the column names of the loaded frame exactly.
WEATHER_DATA_NON_DIGIT_CODE_FLAG_MAPPING = {
    'Flag': 'Global horizontal irradiance / kJ/m2',
    'Flag.1': 'Direct normal irradiance / kJ/m2',
    'Flag.2': 'Diffuse horizontal irradiance / kJ/m2',
    'Flag.3': 'Global horizontal illuminance / 100 lux',
    'Flag.4': 'Direct normal illuminance / 100 lux',
    'Flag.5': 'Diffuse horizontal illuminance / 100 lux',
    'Flag.6': 'Zenith luminance / 100 Cd/m2',
    'Flag.7': 'Minutes of sunshine / 0-60 minutes',
    'Flag.8': 'Ceiling height / 10 m',
    'Flag.10': 'Visibility / 100 m',
    'Flag.12': 'Station pressure / 10 Pa',
    'Flag.13': 'Dry bulb temperature / 0.1 C',
    'Flag.14': 'Dew point temperature / 0.1 C',
    'Flag.15': 'Wind direction / 0-359 degrees',
    'Flag.16': 'Wind speed / 0.1 m/s',
    'Flag.17': 'Total sky cover / 0-10 in tenths',
    'Flag.18': 'Opaque sky cover / 0-10 in tenths',
    'Flag.19': 'Snow cover (0 = no snow cover 1 = snow cover)'
}

In [7]:
# Digit-code column -> (number of single-character digits it holds, its flag column).
# expand_digit_code turns each of these columns into "<name>_0" .. "<name>_{n-1}".
WEATHER_DATA_DIGIT_CODE_FLAG_MAPPING = {
    "Sky condition": (4, "Flag.9"),
    "Present Weather": (8, "Flag.11"),
}

In [8]:
# Source column name -> column name used after preprocess_file renames the frame
# (and therefore the name written to the database).
#
# Insertion order is load-bearing: the cells below slice list(.values()) by position,
#   [:2]          -> index columns      (CLIMATE_ID, TIMESTAMP)
#   [:1] + [2:3]  -> aggregation index  (CLIMATE_ID, DATE)
#   [3:]          -> the 31 feature columns
# so new entries must not be inserted among the first three items.
#
# The "Sky condition_N" / "Present Weather_N" keys are the columns created by
# expand_digit_code.
#
# NOTE(review): several target names are misspelled (DIRECT_NORAML_*,
# GLOBAL_HORIZONTAL_LLUMINANCE, ZEENATH_LUMINANCE). They are left untouched here because
# they are the column names sent to the database; fixing them needs a coordinated
# rename of the existing table and every consumer.
WEATHER_COLUMN_RENAME_MAPPING = {
    'ECCC station identifier': 'CLIMATE_ID',
    'Year Month Day Hour (YYYYMMDDHH)': 'TIMESTAMP',
    'Date': 'DATE',
    'Extraterrestrial irradiance / kJ/m2': 'EXTRATERRESTRIAL_IRRADIANCE',
    'Global horizontal irradiance / kJ/m2': 'GLOBAL_HORIZONTAL_IRRADIANCE',
    'Direct normal irradiance / kJ/m2': 'DIRECT_NORAML_IRRADIANCE',
    'Diffuse horizontal irradiance / kJ/m2': 'DIFFUSE_HORIZONTAL_IRRADIANCE',
    'Global horizontal illuminance / 100 lux': 'GLOBAL_HORIZONTAL_LLUMINANCE',
    'Direct normal illuminance / 100 lux': 'DIRECT_NORAML_ILLUMINANCE',
    'Diffuse horizontal illuminance / 100 lux': 'DIFFUSED_HORIZONTAL_ILLUMINANCE',
    'Zenith luminance / 100 Cd/m2': 'ZEENATH_LUMINANCE',
    'Minutes of sunshine / 0-60 minutes': 'SUNSHINE_MIN',
    'Ceiling height / 10 m': 'CEILING_HEIGHT',
    'Visibility / 100 m': 'VISIBILITY',
    'Station pressure / 10 Pa': 'PRESSURE',
    'Dry bulb temperature / 0.1 C': 'DRY_BULB_TEMPERATURE',
    'Dew point temperature / 0.1 C': 'DEW_POINT_TEMPERATURE',
    'Wind direction / 0-359 degrees': 'WIND_DIRECTION',
    'Wind speed / 0.1 m/s': 'WIND_SPEED',
    'Total sky cover / 0-10 in tenths': 'TOTAL_SKY_COVER',
    'Opaque sky cover / 0-10 in tenths': 'OPAQUE_SKY_COVER',
    'Snow cover (0 = no snow cover 1 = snow cover)': 'SNOW',
    'Sky condition_0': 'SKY_CONDITION_0',
    'Sky condition_1': 'SKY_CONDITION_1',
    'Sky condition_2': 'SKY_CONDITION_2',
    'Sky condition_3': 'SKY_CONDITION_3',
    'Present Weather_0': 'WEATHER_0',
    'Present Weather_1': 'WEATHER_1',
    'Present Weather_2': 'WEATHER_2',
    'Present Weather_3': 'WEATHER_3',
    'Present Weather_4': 'WEATHER_4',
    'Present Weather_5': 'WEATHER_5',
    'Present Weather_6': 'WEATHER_6',
    'Present Weather_7': 'WEATHER_7'
 }

In [9]:
# Feature columns = every renamed column after the first three entries of the mapping
# (CLIMATE_ID, TIMESTAMP, DATE).
WEATHER_DATA_COLUMN_LIST_PROCESSED = [*WEATHER_COLUMN_RENAME_MAPPING.values()][3:]

# Fail loudly if an edit to the rename mapping changes the feature count.
assert 31 == len(WEATHER_DATA_COLUMN_LIST_PROCESSED), "There should be a total of 31 features in CWEEDS !!!"

# last expression -> displayed as the cell output
WEATHER_DATA_COLUMN_LIST_PROCESSED

['EXTRATERRESTRIAL_IRRADIANCE',
 'GLOBAL_HORIZONTAL_IRRADIANCE',
 'DIRECT_NORAML_IRRADIANCE',
 'DIFFUSE_HORIZONTAL_IRRADIANCE',
 'GLOBAL_HORIZONTAL_LLUMINANCE',
 'DIRECT_NORAML_ILLUMINANCE',
 'DIFFUSED_HORIZONTAL_ILLUMINANCE',
 'ZEENATH_LUMINANCE',
 'SUNSHINE_MIN',
 'CEILING_HEIGHT',
 'VISIBILITY',
 'PRESSURE',
 'DRY_BULB_TEMPERATURE',
 'DEW_POINT_TEMPERATURE',
 'WIND_DIRECTION',
 'WIND_SPEED',
 'TOTAL_SKY_COVER',
 'OPAQUE_SKY_COVER',
 'SNOW',
 'SKY_CONDITION_0',
 'SKY_CONDITION_1',
 'SKY_CONDITION_2',
 'SKY_CONDITION_3',
 'WEATHER_0',
 'WEATHER_1',
 'WEATHER_2',
 'WEATHER_3',
 'WEATHER_4',
 'WEATHER_5',
 'WEATHER_6',
 'WEATHER_7']

In [10]:
# Index of the hourly table: the first two renamed columns of the mapping.
WEATHER_DATA_INDEX_COLUMN = [*WEATHER_COLUMN_RENAME_MAPPING.values()][:2]

# Sanity checks on size and membership, so a reordered mapping is caught here.
assert 2 == len(WEATHER_DATA_INDEX_COLUMN), "There are only 2 index columns!!!"
assert WEATHER_DATA_INDEX_COLUMN.count('CLIMATE_ID') > 0, "CLIMATE_ID should be in index!!!"
assert WEATHER_DATA_INDEX_COLUMN.count('TIMESTAMP') > 0, "TIMESTAMP should be in index!!!"

# last expression -> displayed as the cell output
WEATHER_DATA_INDEX_COLUMN

['CLIMATE_ID', 'TIMESTAMP']

In [11]:
# Index of the aggregated table: entries 0 and 2 of the renamed columns
# (the 0:3:2 slice picks exactly those two positions).
WEATHER_DATA_AGG_INDEX_COLUMN = [*WEATHER_COLUMN_RENAME_MAPPING.values()][0:3:2]

# Sanity checks on size and membership, so a reordered mapping is caught here.
assert 2 == len(WEATHER_DATA_AGG_INDEX_COLUMN), "There are only 2 index columns!!!"
assert WEATHER_DATA_AGG_INDEX_COLUMN.count('CLIMATE_ID') > 0, "CLIMATE_ID should be in index!!!"
assert WEATHER_DATA_AGG_INDEX_COLUMN.count('DATE') > 0, "DATE should be in index!!!"

# last expression -> displayed as the cell output
WEATHER_DATA_AGG_INDEX_COLUMN

['CLIMATE_ID', 'DATE']

## Helper Functions

In [12]:
def handel_null_values(
    df: pd.DataFrame,
    map: dict = WEATHER_DATA_NON_DIGIT_CODE_FLAG_MAPPING
) -> pd.DataFrame:
    """Replace flagged-as-missing readings with the sentinel -9999.

    Args:
        df: frame holding both the flag columns and the measurement columns named
            in ``map``. It is modified in place.
        map: ``{flag column: measurement column}`` pairs to process.

    Returns:
        The same ``df`` object, with each measurement set to -9999 on the rows
        where its flag column equals the string "9".
    """
    missing_sentinel = -9999
    for flag_name, value_name in map.items():
        # one vectorised pass per column pair (np.where) instead of row-wise updates
        is_missing = df[flag_name] == "9"
        df[value_name] = np.where(is_missing, missing_sentinel, df[value_name])
    return df

In [14]:
def expand_digit_code(
    df:pd.DataFrame,
    map:dict = WEATHER_DATA_DIGIT_CODE_FLAG_MAPPING
)->pd.DataFrame:
    """Split each multi-digit code column into one int16 column per character.

    Args:
        df: frame containing the string-typed code columns named by the keys of
            ``map``. It is modified in place.
        map: ``{column: (number of characters, flag column)}``; only the character
            count is used here.

    Returns:
        The same ``df`` object, where every ``column`` has been replaced by
        ``column_0`` .. ``column_{n-1}`` cast to ``int16``.
    """
    for source_column, (digit_count, _flag) in map.items():
        digit_columns = [
            f"{source_column}_{position}" for position in range(digit_count)
        ]

        # pull one character per new column through the vectorised .str accessor
        code_strings = df[source_column]
        for position, digit_column in enumerate(digit_columns):
            df[digit_column] = code_strings.str[position]

        # the packed source column is no longer needed once it has been expanded
        df.drop(source_column, axis = 1, inplace = True)
        df[digit_columns] = df[digit_columns].astype('int16')
    return df

In [15]:
def process_column_header_names(
    columns:list
)->list:
    """Normalise header names: upper-case them and replace separators with "_".

    Every " / " is turned into a single "_" first, then each remaining space
    becomes "_".

    Args:
        columns: header names as strings.

    Returns:
        A new list with the normalised names, in the same order.
    """
    normalised = []
    for name in columns:
        normalised.append(name.upper().replace(" / ", "_").replace(" ", "_"))
    return normalised

# Establish Database Connection

In [16]:
# Shared database handle for the rest of the notebook.
# preprocess_file binds this object as the default value of its `db` parameter, so this
# cell has to run before the cell that defines preprocess_file.
db = Database()

Connection Established!!!
	Engine(postgresql://wireaiadmin:***@localhost:5434/weather_db)


# Data Pre-Processing

In [17]:
def preprocess_file(
    file_path:str,
    db:Database = db,
    header:pd.DataFrame = None,
    raw_data_table_name:str = TABLE_WEATHER_DATA,
    df_dtypes:dict = WEATHER_DATA_DTYPES,
    db_dtype:dict = WEATHER_DATA_DB_DTYPES,
    patter_weather:str = WEATHER_DATA_PATTERN,
    features:list = WEATHER_DATA_COLUMN_LIST_PROCESSED,
    agg_index_list:list = WEATHER_DATA_AGG_INDEX_COLUMN,
    colum_mapper:dict = WEATHER_COLUMN_RENAME_MAPPING
):
    """Load one station file, normalise it and append it to the raw readings table.

    Steps: read the data rows, drop the trailing artifact column, attach the header
    names, decode the "YYYYMMDDHH" stamp into a datetime, expand the digit-code
    columns, rename the columns and send the frame to the database.

    Null handling and the hourly -> daily aggregation are not done here; they are
    performed in the database.

    Args:
        file_path: path of the station CSV file.
        db: database wrapper; only ``send_df_to_db`` is called on it.
        header: empty frame whose columns are the file's header names. When None
            the header is read from ``file_path`` itself (row index 2, no data rows).
        raw_data_table_name: table the rows are appended to.
        df_dtypes: positional dtypes passed to ``pd.read_csv``.
        db_dtype: column -> SQL type overrides passed to ``send_df_to_db``.
        patter_weather: regex with year/month/day/hour groups for the stamp column.
        features: unused; kept so existing callers keep working.
        agg_index_list: unused; kept so existing callers keep working.
        colum_mapper: source column name -> database column name.

    Returns:
        None. A failure while writing to the database is printed together with the
        file path and swallowed, so one bad file does not stop the whole load.
    """
    timestamp_column = "Year Month Day Hour (YYYYMMDDHH)"

    # the header default used to crash on `header.columns`; fall back to the file itself
    if header is None:
        header = pd.read_csv(
            file_path,
            nrows = 0,                  # names only, no data rows
            header = 2                  # header sits on the third line
        )

    # Read the data rows only. The stamp column is deliberately NOT passed through
    # `parse_dates`: it is decoded below with `.str.extract`, which requires the raw
    # strings and would fail if pandas had already converted the column.
    weather_df = pd.read_csv(
        file_path,
        sep = ",",
        skiprows = [0, 1, 2],           # metadata lines + header line
        header = None,                  # header is supplied separately
        dtype = df_dtypes
    )

    # column 44 is an artifact of parsing (it follows the 44 typed columns 0-43)
    weather_df = weather_df.drop(columns = [44])

    # attach the header names to the positional columns
    weather_df = weather_df.rename(columns = dict(enumerate(header.columns)))

    # split the stamp into year/month/day/hour parts and assemble a datetime from them
    timestamp_parts = weather_df[timestamp_column].str.extract(
        patter_weather,
        expand = True
    )
    weather_df[timestamp_column] = pd.to_datetime(timestamp_parts)

    # one int16 column per digit of the packed code columns
    weather_df = expand_digit_code(
        df = weather_df
    )

    # switch to the database column names
    weather_df = weather_df.rename(columns = colum_mapper)

    try:
        db.send_df_to_db(
            df = weather_df,
            table_name = raw_data_table_name,
            if_exists = 'append',
            dtypes = db_dtype
        )
    except Exception as e:
        # best effort: report the failing file and carry on with the next one
        print(f"{file_path}:   {e}")

In [18]:
def preprocess_worker(
    worker_id:int,
    dir:str,
    root_data_dir:str = CWEEDS_STATION_DATA_DIR,
    header:pd.DataFrame = None
):
    """Process every station file of one provincial directory (one worker thread).

    Args:
        worker_id: position of this worker's progress bar, so parallel bars do not
            overwrite each other.
        dir: name of the provincial directory inside ``root_data_dir``.
        root_data_dir: root folder that holds the provincial directories.
        header: empty frame carrying the column names. When None it is read from
            the first file processed and reused for the remaining files.

    Returns:
        None. Each file is handed to ``preprocess_file``.
    """
    # provincial directory and the station files it contains
    dir_path = os.path.join(root_data_dir, dir)
    file_list = os.listdir(dir_path)

    progress_bar = tqdm(
        iterable = file_list,               # files to loop over
        desc = dir,                         # label shown next to the bar
        position = worker_id,               # one row per worker
        leave = True
    )

    # Iterate over the bar itself, not over `file_list`: tqdm only advances when it
    # is the object being iterated, otherwise the bar stays at 0%.
    for file_name in progress_bar:

        # show the file currently being processed
        progress_bar.set_postfix_str(file_name)

        file_path = os.path.join(dir_path, file_name)

        # read the header once and reuse it for the rest of the directory
        if header is None:
            header = pd.read_csv(
                file_path,                  # path to file
                nrows = 0,                  # names only, no data rows
                header = 2                  # header sits on the third line
            )

        preprocess_file(
            file_path = file_path,
            header = header
        )

## Pre-Processing

In [18]:
# Safety latch: the load appends to the table, so running it twice duplicates rows.
# Remove or comment out this line for the one-off ingestion run.
assert False, "Warning!!! This block should be run only ONCE..."

# provincial directories inside the station data dir; stray files are skipped because
# a worker would fail when it tries to list them as a directory
provience_list = [
    entry for entry in os.listdir(CWEEDS_STATION_DATA_DIR)
    if os.path.isdir(os.path.join(CWEEDS_STATION_DATA_DIR, entry))
]

# Placeholder for a shared column-header cache. It is not handed to the workers:
# each worker reads the header from the first file it processes and reuses it.
weather_header_df = None


# one thread per provincial directory; `index` doubles as the progress bar position
threads = []
for index, provience_dir in enumerate(provience_list):
    thread = Thread(
        target = preprocess_worker,
        args = (index, provience_dir)
    )
    threads.append(thread)
    thread.start()

# wait for all threads to complete
for thread in threads:
    thread.join()

CWEEDS_2020_NL:   0%|          | 0/28 [00:00<?, ?it/s]

CWEEDS_2020_NT_Rev_20210324:   0%|          | 0/31 [00:00<?, ?it/s]

CWEEDS_2020_SK:   0%|          | 0/44 [00:00<?, ?it/s]

CWEEDS_2020_NB:   0%|          | 0/13 [00:00<?, ?it/s]

CWEEDS_2020_MB:   0%|          | 0/39 [00:00<?, ?it/s]

CWEEDS_2020_AB:   0%|          | 0/102 [00:00<?, ?it/s]

CWEEDS_2020_NS:   0%|          | 0/29 [00:00<?, ?it/s]

CWEEDS_2020_BC:   0%|          | 0/78 [00:00<?, ?it/s]

CWEEDS_2020_YT_Rev_20210324:   0%|          | 0/14 [00:00<?, ?it/s]

CWEEDS_2020_NU:   0%|          | 0/37 [00:00<?, ?it/s]

CWEEDS_2020_QC:   0%|          | 0/81 [00:00<?, ?it/s]

CWEEDS_2020_ON:   0%|          | 0/61 [00:00<?, ?it/s]

CWEEDS_2020_PE:   0%|          | 0/7 [00:00<?, ?it/s]

## Adding Keys and Index

In [20]:
# add primary key for faster retrival
primary_key_statement = f'ALTER TABLE "{TABLE_WEATHER_DATA}" ADD PRIMARY KEY ("{WEATHER_DATA_INDEX_COLUMN[0]}", "{WEATHER_DATA_INDEX_COLUMN[1]}");'
db.execute_sql(primary_key_statement)

In [22]:
# add foreign key to link to metadata
foreign_key_statement = f'ALTER TABLE "{TABLE_WEATHER_DATA}" ADD FOREIGN KEY ("{WEATHER_DATA_INDEX_COLUMN[0]}") REFERENCES "{TABLE_WEATHER_METADATA}" ("{WEATHER_DATA_INDEX_COLUMN[0]}");'
db.execute_sql(foreign_key_statement)

Execution started --> ALTER TABLE "weather_readings" ADD FOREIGN KEY ("CLIMATE_ID") REFERENCES "weather_metadata" ("CLIMATE_ID");
Exectution completed --> ALTER TABLE "weather_readings" ADD FOREIGN KEY ("CLIMATE_ID") REFERENCES "weather_metadata" ("CLIMATE_ID");


In [24]:
# add index on ids
index_climate_id_statement = f'CREATE INDEX weather_readings_index_climate_id ON "{TABLE_WEATHER_DATA}" ("{WEATHER_DATA_INDEX_COLUMN[0]}");'
db.execute_sql(index_climate_id_statement)

index_timestamp_statement = f'CREATE INDEX weather_readings_index_timestamp ON "{TABLE_WEATHER_DATA}" ("{WEATHER_DATA_INDEX_COLUMN[1]}");'
db.execute_sql(index_timestamp_statement)

Execution started --> CREATE INDEX weather_readings_index_climate_id ON "weather_readings" ("CLIMATE_ID");
Exectution completed --> CREATE INDEX weather_readings_index_climate_id ON "weather_readings" ("CLIMATE_ID");
Execution started --> CREATE INDEX weather_readings_index_timestamp ON "weather_readings" ("TIMESTAMP");
Exectution completed --> CREATE INDEX weather_readings_index_timestamp ON "weather_readings" ("TIMESTAMP");
