In [1]:
import os
import pandas as pd
import geopandas as gpd
import pygeos as pg
import numpy as np
import tensorflow as tf
import sqlalchemy as sq
import calendar
from dotenv import load_dotenv
from IPython.display import clear_output
from matplotlib import pyplot as plt
from DataService import DataService

2022-12-17 00:59:40.474485: I tensorflow/core/platform/cpu_feature_guard.cc:193] This TensorFlow binary is optimized with oneAPI Deep Neural Network Library (oneDNN) to use the following CPU instructions in performance-critical operations:  AVX2 FMA
To enable them in other operations, rebuild TensorFlow with the appropriate compiler flags.
2022-12-17 00:59:40.583713: E tensorflow/stream_executor/cuda/cuda_blas.cc:2981] Unable to register cuBLAS factory: Attempting to register factory for plugin cuBLAS when one has already been registered


In [2]:
# Display options: short DataFrame previews, all columns, floats to one decimal.
pd.options.display.max_rows = 10
pd.options.display.float_format = "{:.1f}".format
pd.set_option('display.max_columns', None)

# NOTE(review): hard-coded absolute working directory (presumably the
# container's notebook root) — consider making this configurable.
os.chdir('/tf')

# Read a .env file (if one is found) so the Postgres credentials below can come
# from it. load_dotenv() does not override variables that are already set, so
# credentials injected through the environment keep working unchanged.
load_dotenv()
PGUSER = os.getenv('POSTGRES_USER')
PGPW = os.getenv('POSTGRES_PW')
PGDB = os.getenv('POSTGRES_DB')

# Sentinel value that cleanup() treats as a missing measurement.
NULLFLAG = -9999

In [3]:
# Two independent DataService connections to the same database:
# db_pull_con is used for reads (read_sql), db_push_con for writes (to_sql).
dbArgs = (PGDB, PGUSER, PGPW)

pullService = DataService(*dbArgs)
db_pull_con = pullService.connect()

pushService = DataService(*dbArgs)
db_push_con = pushService.connect()

In [4]:
def date_iter(year, month):
    """Yield the day numbers 1..N of ``month`` in ``year``, where N is the
    number of days in that month (leap years included)."""
    daysInMonth = calendar.monthrange(year, month)[1]
    yield from range(1, daysInMonth + 1)

In [5]:
def summaryDaily(df: pd.DataFrame) -> pd.DataFrame:
    """Summarise observations into one row per station and calendar day.

    Groups ``df`` by ClimateID, ProvinceCode, Year, Month and Day and computes
    mean/min/max of Temp, DewPointTemp, RelativeHumidity, StationPressure,
    WindSpeed and WindChill, the sum of PrecipAmount, and the mean of
    WindDirection. Other columns (e.g. Hour) are ignored.

    Args:
        df: Observations containing the grouping keys and measurement columns.

    Returns:
        A new DataFrame with flat, single-level column names: the five keys,
        then MeanX/MinX/MaxX for each measurement (X = Temp, DewPoint,
        Humidity, Pressure, WindSpeed, WindChill), TotalPrecip and
        MeanWindDirection.
    """
    keys = ['ClimateID', 'ProvinceCode', 'Year', 'Month', 'Day']
    # source column -> suffix used in the output names (Temp -> MeanTemp, ...)
    measurements = {
        'Temp': 'Temp',
        'DewPointTemp': 'DewPoint',
        'RelativeHumidity': 'Humidity',
        'StationPressure': 'Pressure',
        'WindSpeed': 'WindSpeed',
        'WindChill': 'WindChill',
    }
    named = {
        prefix + suffix: (column, func)
        for column, suffix in measurements.items()
        for prefix, func in (('Mean', 'mean'), ('Min', 'min'), ('Max', 'max'))
    }
    named['TotalPrecip'] = ('PrecipAmount', 'sum')
    named['MeanWindDirection'] = ('WindDirection', 'mean')
    # Named aggregation yields flat column names. The previous dict of
    # (name, func) tuples produced a two-level MultiIndex, which to_sql writes
    # as stringified tuples such as "('Temp', 'MeanTemp')".
    return df.groupby(keys, as_index=False).agg(**named)


In [6]:
def summaryMonthly(df: pd.DataFrame) -> pd.DataFrame:
    """Summarise observations into one row per station and calendar month.

    Groups ``df`` by ClimateID, ProvinceCode, Year and Month and computes
    mean/min/max of Temp, DewPointTemp, RelativeHumidity, StationPressure,
    WindSpeed and WindChill, the sum of PrecipAmount, and the mean of
    WindDirection. Other columns (e.g. Day, Hour) are ignored.

    Args:
        df: Observations containing the grouping keys and measurement columns.

    Returns:
        A new DataFrame with flat, single-level column names: the four keys,
        then MeanX/MinX/MaxX for each measurement (X = Temp, DewPoint,
        Humidity, Pressure, WindSpeed, WindChill), TotalPrecip and
        MeanWindDirection.
    """
    keys = ['ClimateID', 'ProvinceCode', 'Year', 'Month']
    # source column -> suffix used in the output names (Temp -> MeanTemp, ...)
    measurements = {
        'Temp': 'Temp',
        'DewPointTemp': 'DewPoint',
        'RelativeHumidity': 'Humidity',
        'StationPressure': 'Pressure',
        'WindSpeed': 'WindSpeed',
        'WindChill': 'WindChill',
    }
    named = {
        prefix + suffix: (column, func)
        for column, suffix in measurements.items()
        for prefix, func in (('Mean', 'mean'), ('Min', 'min'), ('Max', 'max'))
    }
    named['TotalPrecip'] = ('PrecipAmount', 'sum')
    named['MeanWindDirection'] = ('WindDirection', 'mean')
    # Named aggregation yields flat column names. The previous dict of
    # (name, func) tuples produced a two-level MultiIndex, which to_sql writes
    # as stringified tuples such as "('Temp', 'MeanTemp')".
    return df.groupby(keys, as_index=False).agg(**named)


In [7]:
def cleanup(df: pd.DataFrame, null_flag=None) -> pd.DataFrame:
    """Return a typed copy of ``df`` with missing measurements imputed.

    Steps:
      1. Cast the numeric key and measurement columns to int/float. Expected
         columns that are absent from ``df`` are skipped.
      2. Turn every cell equal to the null sentinel into NaN.
      3. Fill NaNs in numeric columns with that column's mean over ``df``.
         The caller decides the imputation window by what it passes in
         (e.g. a single station-day).

    Args:
        df: Observation rows; not modified.
        null_flag: Sentinel marking missing values. Defaults to the
            notebook-wide ``NULLFLAG`` when None.

    Returns:
        A new DataFrame. A column that is missing everywhere stays NaN
        (its mean is NaN).

    Raises:
        ValueError / TypeError from ``astype`` when a cast column contains
        values that cannot be converted (e.g. NaN in an int column).
    """
    flag = NULLFLAG if null_flag is None else null_flag

    # ClimateID / ProvinceCode are deliberately not cast to str: that would
    # turn missing IDs into the literal strings 'None' / 'nan'.
    dtypes = {
        'Year': 'int',
        'Month': 'int',
        'Day': 'int',
        'Hour': 'int',
        'Temp': 'float',
        'DewPointTemp': 'float',
        'PrecipAmount': 'float',
        'RelativeHumidity': 'float',
        'StationPressure': 'float',
        'WindChill': 'float',
        'WindDirection': 'float',
        'WindSpeed': 'float',
    }
    # astype returns a new frame; the previous version discarded it, so the
    # casts never took effect.
    typed = df.astype({col: t for col, t in dtypes.items() if col in df.columns})

    cleaned = typed.replace(flag, np.nan)
    # numeric_only=True: the string ID columns have no mean. Without it,
    # pandas >= 2 raises TypeError, and 1.x silently drops them with a warning.
    return cleaned.fillna(cleaned.mean(numeric_only=True))
   


In [8]:
# Distinct station IDs; the aggregation loops iterate over dfIDs['ClimateID'].
climateIdTable = "WeatherDataHourlyTwentyYear"
query = f'SELECT DISTINCT "ClimateID" FROM public."{climateIdTable}";'
dfIDs = pd.read_sql(query, db_pull_con)


In [None]:
# Aggregate hourly observations into one summary row per station per day.
# NOTE: rows are appended, so re-running this cell duplicates data in destTable.
srcTable = "WeatherDataHourlyTwentyYear"
destTable = "WeatherDataHourlyAggDaily"
years = range(2000, 2022)  # 2000-2021 inclusive

# Bound parameters instead of string-formatting the station ID into the SQL;
# "=" is an exact match, whereas LIKE would treat "_" and "%" as wildcards.
yearQuery = sq.text(
    'SELECT * FROM public."{}" WHERE "ClimateID" = :climate_id AND "Year" = :year;'.format(srcTable)
)

for climateId in dfIDs['ClimateID']:
    clear_output(wait=False)
    print("Processing ClimateID: {}".format(climateId))

    for year in years:
        dfYear = pd.read_sql(yearQuery, db_pull_con, params={"climate_id": climateId, "year": year})
        if dfYear.empty:
            continue

        # cleanup() imputes with the means of whatever frame it receives, so
        # clean each day on its own before summarising it.
        daySummaries = [
            summaryDaily(cleanup(dfDay))
            for _, dfDay in dfYear.groupby(['Month', 'Day'])
        ]
        if not daySummaries:
            continue

        # One insert per station-year instead of one per day.
        pd.concat(daySummaries, ignore_index=True).to_sql(
            destTable, db_push_con, if_exists='append', index=False
        )
        print("Processed ClimateID: {} for {}".format(climateId, year))

In [None]:
# Snapshot the whole daily-aggregate table to a local CSV file.
srcTable = "WeatherDataHourlyAggDaily"
dailyAgg = pd.read_sql(f'SELECT * FROM public."{srcTable}";', db_pull_con)
dailyAgg.to_csv("WeatherDataHourlyAggDaily.csv", index=False)

# Aggregate hourly observations into one summary row per station per month.
# NOTE: rows are appended, so re-running this cell duplicates data in destTable.
# NOTE(review): stations come from "WeatherDataHourlyTwentyYear" (dfIDs) but the
# hourly rows are read from "WeatherDataHourly", and only for 2010-2021;
# confirm the source table and year range are intended.
srcTable = "WeatherDataHourly"
destTable = "WeatherDataHourlyAggMonthly"
years = range(2010, 2022)  # 2010-2021 inclusive

# Bound parameters instead of string-formatting the station ID into the SQL;
# "=" is an exact match, whereas LIKE would treat "_" and "%" as wildcards.
yearQuery = sq.text(
    'SELECT * FROM public."{}" WHERE "ClimateID" = :climate_id AND "Year" = :year;'.format(srcTable)
)

for climateId in dfIDs['ClimateID']:
    clear_output(wait=False)
    print("Processing ClimateID: {}".format(climateId))

    for year in years:
        dfYear = pd.read_sql(yearQuery, db_pull_con, params={"climate_id": climateId, "year": year})
        if dfYear.empty:
            continue

        # cleanup() takes a DataFrame; the previous call passed id/year/table
        # names and always raised TypeError. Clean each month on its own so
        # gaps are filled with that month's means rather than the year's.
        monthFrames = [cleanup(dfMonth) for _, dfMonth in dfYear.groupby('Month')]
        if not monthFrames:
            continue
        dfSummary = summaryMonthly(pd.concat(monthFrames, ignore_index=True))

        dfSummary.to_sql(destTable, db_push_con, if_exists='append', index=False)
        print("Processed ClimateID: {} for {}".format(climateId, year))

In [4]:
from datetime import datetime

In [5]:
# Reload the daily-aggregate table. The following cells add a `utc` column and
# write it back under the same table name.
srcTable = destTable = "WeatherDataHourlyAggDaily"
query = f'SELECT * FROM public."{srcTable}";'
df = pd.read_sql(query, db_pull_con)

In [7]:
# utc = epoch seconds (UTC) at midnight of each row's Year/Month/Day.
# datetime(...).timestamp() on a naive datetime interprets it in the machine's
# local timezone, so the previous row-wise apply was only UTC when the kernel
# ran with TZ=UTC. pd.to_datetime builds tz-naive dates that are measured from
# the UTC epoch here, and it is vectorised instead of per-row.
dates = pd.to_datetime(df[['Year', 'Month', 'Day']].rename(columns=str.lower))
df['utc'] = (dates - pd.Timestamp('1970-01-01')) // pd.Timedelta(seconds=1)

In [8]:
# Overwrite the table with the frame that now carries `utc`. if_exists='replace'
# drops the existing table and recreates it from the DataFrame, so anything
# defined on the old table (indexes, constraints) is not kept.
df.to_sql(name=destTable, con=db_push_con, if_exists='replace', index=False)

790

In [9]:
# Reload FireWaterElev. The following cells add a `utc` column and write it
# back under the same table name.
srcTable = destTable = "FireWaterElev"
query = f'SELECT * FROM public."{srcTable}";'
df = pd.read_sql(query, db_pull_con)

In [10]:
# utc = epoch seconds (UTC) at midnight of each row's YEAR/MONTH/DAY.
# datetime(...).timestamp() on a naive datetime interprets it in the machine's
# local timezone, so the previous row-wise apply was only UTC when the kernel
# ran with TZ=UTC. pd.to_datetime builds tz-naive dates that are measured from
# the UTC epoch here, and it is vectorised instead of per-row.
dates = pd.to_datetime(df[['YEAR', 'MONTH', 'DAY']].rename(columns=str.lower))
df['utc'] = (dates - pd.Timestamp('1970-01-01')) // pd.Timedelta(seconds=1)

In [11]:
# Overwrite FireWaterElev with the frame that now carries `utc`.
# if_exists='replace' drops and recreates the table from the DataFrame, so
# anything defined on the old table (indexes, constraints) is not kept.
df.to_sql(name=destTable, con=db_push_con, if_exists='replace', index=False)

  self.meta.reflect(bind=self.connectable, only=[table_name], schema=schema)


428