## Imports

In [3]:
import pandas as pd
import numpy as np
import codecs
import csv
import os
import urllib.request
from sqlalchemy import create_engine

# Base directory: one level above the current working directory
PARENT_DIRECTORY = os.pardir

# Raw and processed data folders, both located directly under the base directory
RAW_DATA_FOLDER, PROCESSED_DATA_FOLDER = (
    os.path.join(PARENT_DIRECTORY, folder_name)
    for folder_name in ('raw', 'processed')
)

## Get the Data

In [5]:
# Location of the local text file that holds the Visual Crossing API key
vc_key_file = 'visualcrossing_apikey.txt'
vc_key_filepath = os.path.join(PARENT_DIRECTORY, '..', vc_key_file)

# Stop early with a clear message when the key file is missing
key_file_found = os.path.exists(vc_key_filepath)
if not key_file_found:
    raise FileNotFoundError('Visual Crossing API key file not found! Please check directory.')

# The key is the first line of the file; expose it as an environment variable
with open(vc_key_filepath, 'r') as f:
    first_line = f.readline()
os.environ['vc_api_key'] = first_line.strip()

FileNotFoundError: Visual Crossing API key file not found! Please check directory.

In [None]:
# Make directory for storing downloaded weather data files
weather_dir = 'weather_data'
weather_dir_path = os.path.join(RAW_DATA_FOLDER, weather_dir)

# makedirs also creates RAW_DATA_FOLDER when it does not exist yet (os.mkdir
# would fail in that case); exist_ok=True makes re-running this cell a no-op
os.makedirs(weather_dir_path, exist_ok=True)

In [None]:
# Read in the longitude and latitude of the centroid of all the counties in New York state
mapping_file = 'mapping.csv'
mapping_file_path = os.path.join(PROCESSED_DATA_FOLDER, mapping_file)
counties = []
# newline='' leaves newline handling to the csv module, as its documentation recommends
with open(mapping_file_path, 'r', newline='') as csv_file:
    mapping_reader = csv.DictReader(csv_file)
    for row in mapping_reader:
        # The centroid is a parenthesised, comma-separated pair; remove the
        # parentheses and tolerate any amount of whitespace around the comma
        centroid_text = row['County Centroid'].replace('(', '').replace(')', '')
        long, lat = (part.strip() for part in centroid_text.split(','))
        # Coordinates are kept as strings; each entry is (county name, long, lat)
        counties.append((row['County'], long, lat))

In [None]:
# Sanity check: report how many counties were read and preview the first five
county_count = len(counties)
print(f"County list length = {county_count}")
preview = counties[:5]
for county in preview:
    print(county)

In [None]:
def get_daily_weather_data(county, start_date, end_date, long, lat, content_type='csv'):
    """Download daily weather data for one county and save it as a CSV file.

    Data is requested from the Visual Crossing timeline API for each day from
    start_date up to and including end_date. The response is written to
    ``<county with spaces replaced by underscores>.csv`` inside the
    module-level ``weather_dir_path`` directory. The API key is read from the
    ``vc_api_key`` environment variable.

    HTTP and URL errors are printed and the function returns without raising.

    Args:
        county (str): The name of the county you want data for
        start_date (str): The starting date to retrieve data
        end_date (str): The final day you want data for
        long: The longitude of the county (inserted into the URL as given)
        lat: The latitude of the county (inserted into the URL as given)
        content_type (str, optional): Value sent as the ``contentType`` query
            parameter. Defaults to 'csv'. The response is always parsed as CSV
            and saved with a ``.csv`` extension regardless of this value.

    Raises:
        FileExistsError: If the county's CSV file already exists, because the
            file is opened in exclusive-creation ('x') mode.
    """
    print(f"Downloading weather data for {county} county...")
    api_key = os.environ.get("vc_api_key")
    # NOTE(review): the location is sent as "{long},{lat}"; Visual Crossing's
    # documentation describes coordinates as "latitude,longitude" - confirm the
    # ordering of the values stored in the mapping file matches what the API expects.
    url = (
        'https://weather.visualcrossing.com/VisualCrossingWebServices/rest/services/timeline/'
        f'{long},{lat}/{start_date}/{end_date}'
        f'?unitGroup=us&include=days&key={api_key}&contentType={content_type}'
    )
    county_filename = county.replace(' ', '_') + '.csv'
    # weather_dir_path is a plain string, so the path must be built with
    # os.path.join (the "/" operator only works on pathlib.Path objects)
    csv_filepath = os.path.join(weather_dir_path, county_filename)

    try:
        # The context manager closes the HTTP response even if writing fails
        with urllib.request.urlopen(url) as response:
            # Parse the results as CSV
            csv_rows = csv.reader(codecs.iterdecode(response, 'utf-8'))
            # Create new CSV file and write to it ('x' refuses to overwrite)
            with open(csv_filepath, 'x', newline='') as csv_file:
                csv.writer(csv_file).writerows(csv_rows)
    except urllib.error.HTTPError as e:  # Must come first: HTTPError subclasses URLError
        error_info = e.read().decode()
        print('Error code: ', e.code, error_info)
    except urllib.error.URLError as e:
        # A plain URLError has no .code or .read(); only .reason is available
        print('URL error: ', e.reason)
    else:
        print(f"Weather data for {county} county successfully downloaded!")

In [None]:
# Fetch the raw daily weather file for every county in the list
start_date = '2020-01-01'  # YYYY-MM-DD
end_date = '2022-09-30'  # YYYY-MM-DD, included in the download
for county_name, long, lat in counties:
    get_daily_weather_data(county_name, start_date, end_date, long, lat)

## Clean the Data

In [None]:
# Make directory for storing processed/cleaned weather data file
cleaned_weather_dir_path = os.path.join(PROCESSED_DATA_FOLDER, 'weather_data')
cleaned_weather_csv = 'weather_data.csv'
cleaned_weather_csv_path = os.path.join(cleaned_weather_dir_path, cleaned_weather_csv)

# makedirs also creates PROCESSED_DATA_FOLDER when it is missing (os.mkdir
# would fail in that case); exist_ok=True tolerates an existing directory
os.makedirs(cleaned_weather_dir_path, exist_ok=True)

# Create new csv file for storing cleaned and aggregated weather data.
# 'x' mode raises FileExistsError when the file is already there, so data
# from an earlier run is never silently appended to.
with open(cleaned_weather_csv_path, 'x', newline=''):
    print(f"Created new CSV file at: {cleaned_weather_csv_path}")

In [None]:
def _most_frequent(values):
    """Return the most frequent value of a Series, or '' when it has no mode."""
    modes = values.mode()
    return '' if modes.empty else modes.iloc[0]


def process_weather_data(county, dest_csv_file_path, first=False):
    """Clean one county's raw daily weather CSV and append weekly aggregates to a CSV.

    Reads ``<county with spaces replaced by underscores>.csv`` from the
    module-level ``weather_dir_path`` directory, drops the 'description',
    'icon' and 'stations' columns, sets the 'name' column to the county name
    and renames it to 'county', trims rows at both ends, and aggregates every
    7 consecutive rows into one row. 'sunrise' and 'sunset' are left out of
    the aggregated output. The result is appended to dest_csv_file_path.

    Args:
        county (str): Name of the county
        dest_csv_file_path (str): path to the file where the data should be appended
        first (bool, optional): Writes the header line before the data if set
            to True. Otherwise no header is written. Defaults to False.

    Raises:
        KeyError: If the raw file lacks one of the expected columns or any of
            the hard-coded row labels that are dropped.
    """
    print(f"Processing weather data for {county} county...")
    csv_filename = county.replace(' ', '_') + '.csv'
    # weather_dir_path is a plain string, so the path must be built with
    # os.path.join (the "/" operator only works on pathlib.Path objects)
    df_county = pd.read_csv(os.path.join(weather_dir_path, csv_filename))

    # Remove redundant columns
    df_county = df_county.drop(labels=['description', 'icon', 'stations'], axis=1)
    df_county['name'] = county
    df_county = df_county.rename(columns={'name': 'county'})

    # Drop first few and last few rows to ensure conformity with weekly-basis.
    # The labels are hard-coded for a 1004-row file (2020-01-01 through
    # 2022-09-30): removing 4 leading and 6 trailing rows leaves 994 = 142 * 7.
    df_county = df_county.drop(
        labels=[0, 1, 2, 3, 998, 999, 1000, 1001, 1002, 1003], axis=0
    ).reset_index(drop=True)

    # Default every column to the weekly mean, then override the exceptions
    agg_col_map = {col: 'mean' for col in df_county.columns}
    # Columns removed from the map do not appear in the aggregated output
    del agg_col_map['sunrise']
    del agg_col_map['sunset']
    # Choose last appeared value for these columns
    agg_col_map['county'] = 'last'
    agg_col_map['datetime'] = 'last'
    # Choose the most frequent value for these columns
    agg_col_map['preciptype'] = _most_frequent
    agg_col_map['conditions'] = _most_frequent

    # Group data by every 7 days (integer division of the reset 0-based index)
    df_county_agg = df_county.groupby(df_county.index // 7).agg(agg_col_map)

    # Append dataframe to CSV file; only the first county writes the header row
    df_county_agg.to_csv(dest_csv_file_path, mode='a', index=False, header=bool(first))
    print(f"Appended cleaned weather data for {county} county to CSV!")

In [None]:
# Clean and aggregate every county's file; only the very first call asks for
# the header row to be written to the combined CSV
for county_idx, county_entry in enumerate(counties):
    county_name = county_entry[0]
    process_weather_data(county_name, cleaned_weather_csv_path, county_idx == 0)

## Upload to the Database

In [None]:
# Specify path to cleaned weather csv. The cleaning step writes the file to
# <PROCESSED_DATA_FOLDER>/weather_data/weather_data.csv, so the 'weather_data'
# sub-directory has to be part of the path.
cleaned_weather_csv_path = os.path.join(PROCESSED_DATA_FOLDER, 'weather_data', 'weather_data.csv')
# Read in as dataframe from csv
df_weather = pd.read_csv(cleaned_weather_csv_path)


In [None]:
# Display the weather DataFrame that was read from the cleaned CSV for a visual check
display(df_weather)

In [None]:
# Location of the local text file that holds the MySQL credentials
sql_pw_filepath = os.path.join(PARENT_DIRECTORY, '..', 'sql_password.txt')

# Stop early with a clear message when the credentials file is missing
credentials_file_found = os.path.exists(sql_pw_filepath)
if not credentials_file_found:
    raise FileNotFoundError('Local MySQL password file not found! Please check directory.')

# Line 1 of the file is the username and line 2 the password; expose both
# through environment variables
with open(sql_pw_filepath, 'r') as f:
    username_line, password_line = f.readline(), f.readline()
os.environ['sql_username'] = username_line.strip()
os.environ['sql_password'] = password_line.strip()

In [None]:
# Create the connection to the database
from urllib.parse import quote

# Percent-encode the credentials so characters such as '@', ':' or '/' in the
# username or password cannot corrupt the connection URL. Indexing os.environ
# directly fails immediately (KeyError) when the credentials were not loaded,
# instead of silently building a URL containing the text "None".
sql_username = quote(os.environ['sql_username'], safe='')
sql_password = quote(os.environ['sql_password'], safe='')
db_connection_str = f'mysql+pymysql://{sql_username}:{sql_password}@aipi510.mysql.database.azure.com:3306/project'
# Passed through to the driver as connection arguments to request TLS
db_connection_args = {'ssl': {'enable_tls': True}}
sql_engine = create_engine(db_connection_str, connect_args=db_connection_args)
db_connection = sql_engine.connect()

In [None]:
# Write the weather DataFrame to the database, replacing the table if it exists.
# Any failure is printed rather than raised so the notebook keeps running.
weather_table = 'weather'
try:
    df_weather.to_sql(name=weather_table, con=db_connection, if_exists='replace')
except Exception as upload_error:
    print(upload_error)
else:
    print(f'Table {weather_table} created successfully!')

In [None]:
# Read back the first ten rows of the weather table to confirm the upload worked
test_sql_query = 'SELECT * FROM weather LIMIT 10'
df_test = pd.read_sql(sql=test_sql_query, con=db_connection)
display(df_test)

In [None]:
# Close the database connection that was opened with sql_engine.connect()
db_connection.close()