In [1]:
import json
import re
import time
from ast import literal_eval
from datetime import datetime, timedelta
from pprint import pprint
from urllib.parse import quote_plus

import numpy as np
import pandas as pd
import psycopg2
from sqlalchemy import create_engine, text

from config import db_password
# Show every column when displaying wide frames.
pd.set_option("display.max_columns", None)

## Open and import our 2 files into dataframes

In [2]:
file_dir = "Resources"

# Observation data returned by the API call
weather_file = "../" + file_dir + "/rcc_data.json"

# Parameters that were used for the API call
params_file = "../" + file_dir + "/params_list.json"

# County reference (currently unused), downloaded from
# https://www2.census.gov/geo/docs/reference/codes/files/st27_mn_cou.txt
# county_file = f'../{file_dir}/mn_county_ref.csv'

In [3]:

# Observation payload: a JSON object whose "data" key holds one record per station.
with open(weather_file) as weather_fh:
    weather_raw = json.load(weather_fh)

# Parameters that were sent with the API request (date range, requested elements, ...).
with open(params_file) as params_fh:
    params_data = json.load(params_fh)

# # open the MN county reference
# county_df = pd.read_csv(county_file)


In [5]:
# Raw payload as a DataFrame (the parsing below walks weather_raw directly, not this frame).
weather_df = pd.DataFrame(data=weather_raw)

In [7]:
# The API parameters supply the requested date range and the order of the values
# inside each daily observation array.
params_df = pd.DataFrame(data=params_data)

In [8]:
# Dates are implicit in the response: each station's "data" array holds one entry per
# day starting at the request's sdate, so the dates are rebuilt from the params file.
startDate = pd.to_datetime(params_df['sdate'][0])
endDate = pd.to_datetime(params_df['edate'][0])

stations_list = []
observations_list = []

for station_record in weather_raw['data']:
    station_meta = station_record["meta"]
    stations_list.append(station_meta)

    # The n-th entry of the array belongs to startDate + n days.
    for day_offset, daily_values in enumerate(station_record["data"]):
        observations_list.append({
            "station_uid": station_meta['uid'],
            "date": startDate + timedelta(days=day_offset),
            "data": daily_values,
        })

# Total number of daily observations parsed (kept for debugging).
counter = len(observations_list)


In [9]:

# One row per station, and one row per station-day observation.
stations_df, observations_df = pd.DataFrame(stations_list), pd.DataFrame(observations_list)


In [10]:
# Peek at the last parsed observations ("data" still holds the raw per-day array).
observations_df.tail(n=5)

Unnamed: 0,station_uid,date,data
3266594,63155,2022-05-12,"[M, M, 0.96, M, M, M]"
3266595,63155,2022-05-13,"[M, M, 0.11, M, M, M]"
3266596,63155,2022-05-14,"[M, M, M, M, M, M]"
3266597,63155,2022-05-15,"[M, M, M, M, M, M]"
3266598,63155,2022-05-16,"[M, M, M, M, M, M]"


## Clean the stations data

In [11]:
# Split the station coordinate pair "ll" into separate columns.
# RCC-ACIS returns "ll" as [longitude, latitude] (longitude first), so the first element
# is labelled longitude. The previous labelling had the two swapped.
# NOTE(review): sanity check - Minnesota longitudes should be negative (about -97..-89).
coords_df = pd.DataFrame(
    stations_df['ll'].to_list(),
    columns=['longitude', 'latitude'],
    index=stations_df.index,
)
stations_df = pd.concat([stations_df.drop(columns=['ll']), coords_df], axis=1)

In [12]:
# The "sids" column is not used downstream, so drop it rather than expanding it into columns.
stations_df = stations_df.drop(columns="sids")


In [None]:
# Store the county field as a number instead of a string.
stations_df['county'] = pd.to_numeric(stations_df['county'])

In [13]:
# Keep only the station columns used downstream, in a fixed order
# (valid_daterange and sid_dates are deliberately left out).
column_list = "['uid','name','county','state','latitude','longitude','climdiv']"
columnArray = literal_eval(column_list)

stations_df = stations_df.loc[:, columnArray].rename(columns={'uid': 'station_uid'})


## Clean the observations dataframe

In [14]:
# Re-check the tail before the observation columns are cleaned.
observations_df.tail(n=5)

Unnamed: 0,station_uid,date,data
3266594,63155,2022-05-12,"[M, M, 0.96, M, M, M]"
3266595,63155,2022-05-13,"[M, M, 0.11, M, M, M]"
3266596,63155,2022-05-14,"[M, M, M, M, M, M]"
3266597,63155,2022-05-15,"[M, M, M, M, M, M]"
3266598,63155,2022-05-16,"[M, M, M, M, M, M]"


In [15]:
# Inspect the first rows recorded for one station (uid 96447).
df2 = observations_df[observations_df['station_uid'].eq(96447)]
df2.head()

Unnamed: 0,station_uid,date,data
3058251,96447,2002-01-01,"[M, M, M, M, M, M]"
3058252,96447,2002-01-02,"[M, M, M, M, M, M]"
3058253,96447,2002-01-03,"[M, M, M, M, M, M]"
3058254,96447,2002-01-04,"[M, M, M, M, M, M]"
3058255,96447,2002-01-05,"[M, M, M, M, M, M]"


In [16]:
# The params file stores the requested elements as one comma-separated string
# (presumably like "maxt,mint,pcpn,..."). Each element is one position in the
# per-day data array. Splitting directly avoids building a Python literal by string
# splicing and literal_eval (which broke on quotes or stray whitespace in a name).
elems = str(params_df["elems"][0])
columnArray = [element.strip() for element in elems.split(",")]

# Split each row's data array into one column per requested element.
split_df = pd.DataFrame(observations_df['data'].to_list(), columns=columnArray)


In [17]:
# "M" marks a missing value; turn it into a real NaN so pandas treats it as missing.
# np.nan is used because the np.NaN alias was removed in NumPy 2.0.
split_df = split_df.replace("M", np.nan)

In [18]:
# "T" marks a trace amount of precipitation; count it as 0.
split_df = split_df.replace("T", 0)

In [19]:
# Replace the raw per-day "data" arrays with the per-element columns built in split_df.
observations_df = pd.concat([observations_df.drop(columns=["data"]), split_df], axis=1)

In [20]:
# Non-null counts per column before rows without a minimum temperature are dropped.
observations_df.count(axis="index")

station_uid    3266599
date           3266599
maxt            126528
mint            126467
pcpn            525442
snow            326223
snwd            135202
avgt            126073
dtype: int64

In [21]:
# Keep only observations that have a minimum temperature (freeze detection relies on it).
observations_df = observations_df.dropna(axis="index", subset=["mint"])

In [22]:
# Non-null counts per column after the missing-mint rows were dropped.
observations_df.count(axis="index")

station_uid    126467
date           126467
maxt           126073
mint           126467
pcpn           115512
snow            91141
snwd            85409
avgt           126073
dtype: int64

In [23]:
# Convert the element columns to numbers; anything non-numeric becomes NaN.
# The conversion runs column-by-column (vectorised) instead of row-by-row (axis=1),
# which was very slow on ~126k rows. The float64 cast keeps every element column
# floating-point even when it has no NaNs left (e.g. mint), matching outputs such as
# coldest_day = -5.0.
observations_df[columnArray] = (
    observations_df[columnArray]
    .apply(pd.to_numeric, errors='coerce')
    .astype('float64')
)

In [24]:
# Station counts before restricting to stations that still have observations.
stations_df.count(axis="index")

station_uid    439
name           439
county         439
state          439
latitude       439
longitude      439
climdiv        439
dtype: int64

In [25]:
# Keep only stations that still have at least one observation after the mint filter.
stations_df = stations_df.loc[stations_df['station_uid'].isin(observations_df['station_uid'].unique())]

In [26]:
# Station counts after the restriction.
stations_df.count(axis="index")

station_uid    27
name           27
county         27
state          27
latitude       27
longitude      27
climdiv        27
dtype: int64

In [27]:
# Show the column dtypes of the filtered stations table.
stations_df.dtypes

station_uid      int64
name            object
county          object
state           object
latitude       float64
longitude      float64
climdiv         object
dtype: object

## Create calculated fields for dashboard and ML

### Add columns to observation table

In [28]:
# Daily flags: freeze_day = 1 when the minimum temperature is at or below 32;
# above_freezing = 1 when the maximum is above 32 (a missing maxt yields 0).
observations_df['freeze_day'] = np.where(observations_df['mint'] <= 32, 1, 0)
observations_df['above_freezing'] = np.where(observations_df['maxt'] > 32, 1, 0)

# Parse the date once and expose its parts as integer columns for later grouping/filtering.
date_accessor = pd.to_datetime(observations_df['date']).dt
for column_name, date_part in (
    ('obs_year', 'year'),
    ('obs_month', 'month'),
    ('obs_day', 'day'),
    ('obs_dayofyear', 'dayofyear'),
):
    observations_df[column_name] = getattr(date_accessor, date_part)

### Station/Year dataframe creation and calculations

In [30]:
# One row per (station, year): every retained station is paired with every year that
# appears in the observations, indexed by (station_uid, obs_year).
years = pd.to_datetime(observations_df['date']).dt.year.unique()
years_df = pd.DataFrame({'obs_year': years})
station_yearly_metrics_df = (
    stations_df[['station_uid']]
    .merge(years_df, how='cross')
    .set_index(['station_uid', 'obs_year'])
)

In [31]:
# Freeze days (freeze_day == 1), split at day-of-year 180 into spring and fall halves.
freeze_days_df = observations_df.loc[observations_df['freeze_day'] == 1,
                                     ['station_uid', 'date', 'obs_year', 'obs_dayofyear']]
spring_freezes_df = freeze_days_df[freeze_days_df['obs_dayofyear'] < 180]
fall_freezes_df = freeze_days_df[freeze_days_df['obs_dayofyear'] >= 180]

# Last spring freeze of each station/year: latest freeze day before day-of-year 180.
last_freeze_df = (
    spring_freezes_df
    .groupby(["station_uid", "obs_year"])[['date', 'obs_dayofyear']]
    .max()
    .rename(columns={'date': 'last_freeze_date', 'obs_dayofyear': 'last_freeze_dayofyear'})
)

# First fall freeze of each station/year: earliest freeze day on or after day-of-year 180.
first_freeze_df = (
    fall_freezes_df
    .groupby(["station_uid", "obs_year"])[['date', 'obs_dayofyear']]
    .min()
    .rename(columns={'date': 'first_freeze_date', 'obs_dayofyear': 'first_freeze_dayofyear'})
)

# Days with a recorded minimum temperature in months 4-6, per station/year, to judge how
# complete the spring record is.
# NOTE(review): the names say "april_to_may" but months 4 through 6 (April-June) are
# counted - confirm which range is intended.
april_to_may_days_recorderd_df = (
    observations_df.loc[observations_df['obs_month'].between(4, 6), ['station_uid', 'obs_year', 'mint']]
    .groupby(['station_uid', 'obs_year'])['mint']
    .count()
    .to_frame('observations_recorded_april_to_may')
)

In [32]:
# Lowest minimum temperature per station/year.
coldest_day_of_year = observations_df.groupby(["station_uid", "obs_year"]).agg(coldest_day=('mint', 'min'))
coldest_day_of_year.head()

Unnamed: 0_level_0,Unnamed: 1_level_0,coldest_day
station_uid,obs_year,Unnamed: 2_level_1
10392,2002,-5.0
10392,2003,-16.0
10392,2004,-11.0
10392,2005,-16.0
10392,2006,-14.0


In [33]:
# Find the day-of-year on which each station/year's coldest temperature occurred;
# when several days tie, the latest day in the year is kept.
coldest_day_df = coldest_day_of_year.merge(
    observations_df,
    how='left',
    left_on=['station_uid', 'obs_year', 'coldest_day'],
    right_on=["station_uid", "obs_year", "mint"],
)
coldest_day_of_year_df = (
    coldest_day_df
    .groupby(["station_uid", "obs_year", 'coldest_day'])['obs_dayofyear']
    .max()
    .to_frame('coldest_dayofyear')
    .reset_index()
)
coldest_day_of_year_df.head()

Unnamed: 0,station_uid,obs_year,coldest_day,coldest_dayofyear
0,10392,2002,-5.0,18
1,10392,2003,-16.0,38
2,10392,2004,-11.0,359
3,10392,2005,-16.0,14
4,10392,2006,-14.0,49


In [34]:
# Attach the coldest-day metrics to the station/year grid (index-aligned left join).
coldest_day_of_year_df = coldest_day_of_year_df.set_index(['station_uid', 'obs_year'])
station_yearly_metrics_df = station_yearly_metrics_df.merge(
    coldest_day_of_year_df, how='left', left_index=True, right_index=True
)

In [35]:
#station_yearly_metrics_df.head()

In [36]:
# df2 = station_yearly_metrics_df.loc[(station_yearly_metrics_df['station_uid'] == 40882)]
# df2.head()

In [37]:
# Highest maximum temperature per station/year, and the day-of-year it occurred on.
# When several days tie, the latest day in the year is kept.
hottest_day_of_year = observations_df.groupby(["station_uid", "obs_year"]).agg(hottest_day=('maxt', 'max'))
hottest_day_df = hottest_day_of_year.merge(
    observations_df,
    how='left',
    left_on=['station_uid', 'obs_year', 'hottest_day'],
    right_on=["station_uid", "obs_year", "maxt"],
)
hottest_day_of_year_df = (
    hottest_day_df
    .groupby(["station_uid", "obs_year", 'hottest_day'])['obs_dayofyear']
    .max()
    .to_frame('hottest_dayofyear')
    .reset_index()
    .set_index(['station_uid', 'obs_year'])
)
station_yearly_metrics_df = station_yearly_metrics_df.merge(
    hottest_day_of_year_df, how='left', left_index=True, right_index=True
)

In [38]:
#station_yearly_metrics_df.head()

In [39]:
# Attach the spring last-freeze, fall first-freeze and months 4-6 observation counts to
# each station/year row (left joins on the (station_uid, obs_year) index of each part).
for yearly_part_df in (last_freeze_df, first_freeze_df, april_to_may_days_recorderd_df):
    station_yearly_metrics_df = pd.merge(
        station_yearly_metrics_df,
        yearly_part_df,
        how='left',
        left_on=['station_uid', 'obs_year'],
        right_index=True,
    )

In [40]:
#station_yearly_metrics_df.head(20)

### Start of calculations for the Stations dataframe

In [41]:
def _dayofyear_to_mm_dd(day_of_year):
    """Format day-of-year numbers as 'mm/dd' strings.

    Days are placed in 1900, a non-leap year and the year strptime('%j') defaults to,
    so 60 -> '03/01'. NaN inputs produce NaN. Uses explicit date arithmetic rather
    than pushing float values through to_datetime(format='%j').
    """
    dates = pd.Timestamp("1900-01-01") + pd.to_timedelta(day_of_year - 1, unit="D")
    return dates.dt.strftime('%m/%d')


# Per-station spread of the spring last-freeze day-of-year across all years.
last_freeze_by_station = station_yearly_metrics_df.groupby(['station_uid'])['last_freeze_dayofyear']

# Mean last freeze day-of-year (rounded to a whole day) and its mm/dd form.
avg_last_freeze_df = last_freeze_by_station.mean().round(0).to_frame('avg_last_freeze_dayofyear')
avg_last_freeze_df["avg_last_freeze_mm_dd"] = _dayofyear_to_mm_dd(avg_last_freeze_df["avg_last_freeze_dayofyear"])

# Median last freeze day-of-year (rounded to a whole day) and its mm/dd form.
median_last_freeze_df = last_freeze_by_station.median().round(0).to_frame('median_last_freeze_dayofyear')
median_last_freeze_df["median_last_freeze_mm_dd"] = _dayofyear_to_mm_dd(median_last_freeze_df["median_last_freeze_dayofyear"])

# Attach both summaries to the station attributes (inner joins on station_uid).
station_metrics_df = (
    stations_df
    .merge(avg_last_freeze_df, on=['station_uid'])
    .merge(median_last_freeze_df, on=['station_uid'])
)

In [42]:
# Turn (station_uid, obs_year) back into ordinary columns for the merges below.
station_yearly_metrics_df = station_yearly_metrics_df.reset_index()

In [43]:
# Combine each station/year row with that station's overall metrics (avg/median last
# freeze). The join key is given once; it was previously listed twice.
merged_station_and_yearly_df = station_yearly_metrics_df.merge(
    station_metrics_df, how="left", on="station_uid"
)

In [44]:
# Number of station/year rows per station.
stations_years_count = merged_station_and_yearly_df.groupby("station_uid")["obs_year"].count()

last_doy = merged_station_and_yearly_df["last_freeze_dayofyear"]
avg_doy = merged_station_and_yearly_df['avg_last_freeze_dayofyear']

# Years whose last freeze fell on or before the station's average day
# (years with no last freeze compare False and are not counted in either bucket).
stations_count_at_or_before_avg_last_freeze = (
    merged_station_and_yearly_df[last_doy <= avg_doy].groupby("station_uid")["obs_year"].count()
)

# Years whose last freeze fell after the station's average day.
stations_count_later_than_avg_last_freeze = (
    merged_station_and_yearly_df[last_doy > avg_doy].groupby("station_uid")["obs_year"].count()
)



In [45]:
# Per-station year counts side by side; stations missing from a bucket get NaN.
station_count_columns = {
    "years_included": stations_years_count,
    "count_at_or_before_avg_last_freeze": stations_count_at_or_before_avg_last_freeze,
    "count_later_than_avg_last_freeze": stations_count_later_than_avg_last_freeze,
}
station_calc_values_df = pd.DataFrame(station_count_columns)

In [46]:
# Add the per-station year counts to the station metrics. The join key is given once;
# it was previously listed twice. station_calc_values_df is indexed by station_uid,
# which `on` resolves as an index level.
station_metrics_full_df = station_metrics_df.merge(
    station_calc_values_df, how="left", on="station_uid"
)

### Start of Observation Day Calculations

In [47]:
def observation_calcs_loop(date_range, observations=None):
    """Summarise the ``date_range`` days preceding each observation at the same station.

    For each row the window is ``[date - date_range days, date)``. It contains only
    earlier observations from the same ``station_uid``; the current day is excluded.
    This is the same window the original per-row filter used. The work is done with
    time-based rolling windows per station instead of re-filtering the whole frame
    for every row (which was O(n^2)).

    Parameters
    ----------
    date_range : int or float
        Window length in days; also used verbatim in the output column names.
    observations : pandas.DataFrame, optional
        Frame with ``station_uid``, ``date`` (datetime64), and numeric ``maxt``,
        ``mint``, ``avgt`` and ``pcpn`` columns. Defaults to the notebook-level
        ``observations_df``, so existing ``observation_calcs_loop(n)`` calls still work.

    Returns
    -------
    pandas.DataFrame
        Indexed by ``source_index`` (the row label in ``observations``) with columns
        ``maxt_{n}_day`` (max of maxt), ``mint_{n}_day`` (min of mint),
        ``avgt_{n}_day`` (mean of avgt), ``precip_{n}_day`` (sum of pcpn, missing
        values counted as 0) and ``obs_count_{n}_day`` (rows in the window). Rows
        whose window holds no earlier observation are omitted, so a left merge on
        the index leaves them NaN.
    """
    if observations is None:
        observations = observations_df

    column_array = [f'maxt_{date_range}_day', f'mint_{date_range}_day', f'avgt_{date_range}_day',
                    f'precip_{date_range}_day', f'obs_count_{date_range}_day']
    window = pd.Timedelta(days=date_range)

    def _rolling(series, min_periods=None):
        # closed="left" -> window [date - date_range days, date): current day excluded.
        # min_periods=None means 1 for time-based windows, so all-NaN windows give NaN.
        return series.rolling(window, closed="left", min_periods=min_periods)

    station_frames = []
    # Time-based rolling windows need each station's dates in ascending order.
    ordered = observations.sort_values(["station_uid", "date"])
    for _, station_obs in ordered.groupby("station_uid", sort=False):
        by_date = station_obs.set_index("date")
        row_flags = pd.Series(1.0, index=by_date.index)
        station_frames.append(pd.DataFrame(
            {
                column_array[0]: _rolling(by_date["maxt"]).max().to_numpy(),
                column_array[1]: _rolling(by_date["mint"]).min().to_numpy(),
                column_array[2]: _rolling(by_date["avgt"]).mean().to_numpy(),
                # min_periods=0: an all-NaN window sums to 0, like groupby().sum().
                column_array[3]: _rolling(by_date["pcpn"], 0).sum().to_numpy(),
                column_array[4]: _rolling(row_flags, 0).sum().to_numpy(),
            },
            # Rolling keeps row order, so results line up positionally with the source rows.
            index=station_obs.index,
        ))

    if not station_frames:
        return pd.DataFrame(columns=column_array).rename_axis("source_index")

    observation_calcs_df = pd.concat(station_frames).sort_index()
    # Rows with no earlier observation in the window get no summary row.
    observation_calcs_df = observation_calcs_df[observation_calcs_df[column_array[4]] > 0]
    return observation_calcs_df.rename_axis("source_index")


In [48]:
# Add the trailing 7-day window metrics to every observation; rows without earlier
# data in the window stay NaN after the left join on the row index.
observations_calcs_7_day = observation_calcs_loop(7)
observations_new_df = observations_df.merge(
    observations_calcs_7_day, how="left", left_index=True, right_index=True
)

In [50]:
# Add the trailing 30-day window metrics alongside the 7-day ones (left join on row index).
observations_calcs_30_day = observation_calcs_loop(30)
observations_new_df = observations_new_df.merge(
    observations_calcs_30_day, how="left", left_index=True, right_index=True
)

In [51]:
# Write the output files (CSV) into the same folder the inputs were read from,
# using the configured file_dir instead of repeating the folder name.
output_station_file = f"../{file_dir}/station_data.csv"
output_station_year_file = f"../{file_dir}/station_year_data.csv"
output_observation_file = f"../{file_dir}/observation_data.csv"

for output_df, output_path in (
    (station_metrics_full_df, output_station_file),
    (station_yearly_metrics_df, output_station_year_file),
    (observations_new_df, output_observation_file),
):
    output_df.to_csv(output_path, index=False)



In [62]:
# Connect to the PostgreSQL last_freeze_analysis database.
# The password is URL-encoded so characters such as "@", ":" or "/" cannot corrupt the URL.
db_string = f"postgresql://postgres:{quote_plus(db_password)}@127.0.0.1:5432/last_freeze_analysis"
engine = create_engine(db_string)

# String SQL is wrapped in text() and run inside engine.begin(), which commits on exit.
# This works on SQLAlchemy 1.4 and 2.0; 2.0 rejects plain strings in execute() and
# does not autocommit.
with engine.begin() as con:
    # Clean up tables left over from an older set of table names.
    con.execute(text("DROP TABLE IF EXISTS stations;"))
    con.execute(text("DROP TABLE IF EXISTS observations;"))
    # The view selects from observation, so drop it before that table is replaced;
    # it is recreated afterwards.
    con.execute(text("DROP VIEW IF EXISTS v_days_until_freeze_calcs_0_to_180_days;"))

# Recreate the tables with data. observation goes first: it carries the FK to station,
# and replacing it removes that constraint so station can then be replaced too.
observations_new_df.to_sql(name='observation', con=engine, if_exists='replace', index=False)
station_yearly_metrics_df.to_sql(name='station_yearly', con=engine, if_exists='replace', index=False)
station_metrics_full_df.to_sql(name='station', con=engine, if_exists='replace', index=False)

# Add primary keys
with engine.begin() as con:
    con.execute(text("ALTER TABLE station ADD PRIMARY KEY (station_uid);"))
    con.execute(text("ALTER TABLE station_yearly ADD PRIMARY KEY (station_uid,obs_year);"))
    con.execute(text("ALTER TABLE observation ADD PRIMARY KEY (station_uid,date);"))



In [59]:
# View over spring observations (day-of-year below 180): days remaining until the
# station's average last-freeze day and until that year's actual last-freeze day.
# Plain-string SQL is wrapped in text() and committed via engine.begin() (SQLAlchemy 1.4/2.0).
days_until_freeze_view_sql = text("""
    CREATE OR REPLACE VIEW v_days_until_freeze_calcs_0_to_180_days AS
    SELECT o.station_uid,
           o.date,
           s.avg_last_freeze_dayofyear - o.obs_dayofyear AS days_until_avg_freeze_dayofyear,
           sy.last_freeze_dayofyear - o.obs_dayofyear AS days_until_current_year_last_freeze_dayofyear
    FROM observation o
    JOIN station_yearly sy ON sy.station_uid = o.station_uid AND sy.obs_year = o.obs_year
    JOIN station s ON sy.station_uid = s.station_uid
    WHERE o.obs_dayofyear >= 0 AND o.obs_dayofyear < 180;
""")

with engine.begin() as con:
    con.execute(days_until_freeze_view_sql)


In [60]:
# Add foreign keys.
# station_yearly deliberately gets no FK to station: it is reporting data, not source data.
# Plain-string SQL is wrapped in text() and committed via engine.begin() (SQLAlchemy 1.4/2.0).
with engine.begin() as con:
    con.execute(text(
        "ALTER TABLE observation ADD CONSTRAINT obs_station_uid_fk "
        "FOREIGN KEY (station_uid) REFERENCES station (station_uid);"
    ))
        

In [56]:
# Signal that the notebook ran to completion.
print("end of script")

end of script
