# Weather Data Pipeline for [C2A2](C2A2.ipynb)

[GHCN_Daily Readme](ghcn_daily_readme.txt)

[GHCN_Daily by year Readme](ghcn_daily_by_year_readme.txt)

## [GHCN-D Downloader](GHCN-D Downloader.ipynb)

Downloads the GHCN-D data and cleans it up.

In [None]:
import pandas as pd
import numpy as np
from tqdm import tqdm

# These are the years we want to get the data for
years = np.arange(11)+2005

ghcnd = []

# For each year, download the corresponding data, and perform cleaning
for year in tqdm(years):
    df = pd.read_csv('ftp://ftp.ncdc.noaa.gov/pub/data/ghcn/daily/by_year/{}.csv.gz'.format(year),
                        compression='gzip', 
                        index_col=0, 
                        header=None,
                        usecols=[0,1,2,3,5],
                        names=['ID', 'Date', 'Element', 'Data_Value', 'M-FLAG', 'Q-FLAG', 'S-FLAG', 'OBS-TIME'],
                        parse_dates = [1],
                        infer_datetime_format=True)
    df = df[(df['Element'].isin(['TMAX', 'TMIN'])) & (df['Q-FLAG'].isnull()) & (df['Data_Value'].notnull())]
    df = df.drop('Q-FLAG', axis=1)
    ghcnd.append(df)

# Create master DataFrame
ghcnd_df = pd.concat(ghcnd)

# Save to hdf5
ghcnd_df.to_hdf('GHCND_10Year.h5', 'ghcn_df')

## [Binning](Binning.ipynb)

Splits the surface of the earth into equal area bins. Creates folders by bin size and in each folder are csvs of the binned data.

In [None]:
import os
import pandas as pd
import numpy as np
import hashlib
from tqdm import tqdm

def reproject(latitude, longitude):
        """Returns the x & y coordinates in meters using a sinusoidal projection"""
        earth_radius = 6371009 # in meters
        lat_dist = np.pi * earth_radius / 180.0

        y = latitude*lat_dist
        x = longitude * lat_dist * np.cos(np.radians(latitude))
        return x, y

    
def hash_it(s):
    return hashlib.sha224(s.encode('utf-8')).hexdigest()    


def Bin_and_Split(d):

    stations = pd.read_csv('ghcnd-stations.csv', index_col=0)
    
    # Reduce the stations to the ones that exist in ghcn_df (from GHCN-D Downloader.ipynb)
    stations = stations[stations.index.isin(stations_with_data)]

    stations['x'], stations['y'] = reproject(stations['LATITUDE'], stations['LONGITUDE'])

    step_size = (stations['x'].max()-stations['x'].min())/d

    # Bin stations 
    labels = [ "{0} to {1}".format(i, i + step_size) for i in np.arange(stations['x'].min(), stations['x'].max(), step_size) ]
    stations['x_group'] = pd.cut(stations.x, np.arange(stations['x'].min(), stations['x'].max()+step_size, step_size), right=False, labels=labels)
    stations['y_group'] = pd.cut(stations.y, np.arange(stations['x'].min(), stations['x'].max()+step_size, step_size), right=False, labels=labels)
    stations['xy_group'] = stations['x_group'].astype('str') + ', ' + stations['y_group'].astype('str')
    
    # Find hashing from xy_group
    stations['hash']=stations['xy_group'].apply(hash_it)
    
    # Map hashing to ghcn data, sort for faster querying
    ghcn2 = ghcn_df.copy()
    ghcn2['hash'] = stations['hash']
    ghcn2 = ghcn2.sort_values('hash')
    ghcn3 = ghcn2.drop('hash', axis=1)

    # Create the folder that will hold the csvs for that specific step size
    new_folder = 'BinnedCsvs_d{}'.format(d)
    os.mkdir(new_folder)
    
    # For each hash, select from ghcn data and save to csv
    sorted_hashes = np.sort(stations['hash'].unique())
    for hashid in sorted_hashes:
        left, = ghcn2['hash'].searchsorted(hashid, 'left')
        right, = ghcn2['hash'].searchsorted(hashid, 'right')
        df_by_bin = ghcn3.iloc[left:right]
        df_by_bin.to_csv('./'+new_folder+'/{}.csv'.format(hashid))

    stations.to_csv('BinSize_d{}.csv'.format(d))

In [None]:
# These are the step sizes we want to use
step_sizes = [400, 200, 100, 50, 25, 18, 12.5]

# Load ghcn data
ghcn_df = pd.read_hdf('GHCND_10Year.h5')

# Pull out the set of stations that have passed the initial requirements from `GHCN-D Downloader`
stations_with_data = set(ghcn_df.index)

for s in tqdm(step_sizes):
    Bin_and_Split(s)

## [CSV_Checker](CSV_Checker.ipynb)

Checks each csv for three conditions:

* make sure the datafile has data for years 2005-2015 and correct columns
* length of `lastyear_maxmin['TMAX']`, `lastyear_maxmin['TMIN']`, `previousyears_maxmin['TMAX']`, and `previousyears_maxmin['TMIN']` should be 365 or 366.
* There are at least `x` `tmin` outliers + `tmax` outliers in the last year (`x` default is 1)

In [None]:
import pandas as pd
import numpy as np
from tqdm import tqdm
import os

# This is the number the step size is divided by (aka larger d yields smaller bin size)
# Step sizes used: [400, 200, 100, 50, 25, 18, 12.5]
d = 18

# Get a list of the csvs in the directory
destdir = 'BinnedCsvs_d{}'.format(d)
csvs = [ os.path.join(destdir,f) for f in os.listdir(destdir) ]

In [None]:
def main_check(csv, reduced=False):
    '''Check csv, if reduced == True, check csv when it has been reduced to the top 10 stations'''
    try:
        csv_passes = True

        # Read in the csv (Created by GHCN_Binned.ipynb)
        df = pd.read_csv(csv, index_col=0)
        # Only want TMAX and TMIN elements
        # df = df[df.element.isin(['TMAX', 'TMIN'])]

        # New binning used different variable names, for compatability
        df.reset_index(0, inplace=True)
        df.columns = ['id', 'date', 'element', 'value']

        # Use datetime for easy indexing
        df['date'] = pd.to_datetime(df['date'])
        
        
        ################### REDUCE #########################
        # If we want to perform the checks using only the top 10 most frequent stations in each bin
        if reduced == True:
            station_count_df = df.groupby('id')['id'].agg({'station_count':'count'})
            station_list_sorted = list(station_count_df.sort_values('station_count', ascending=False).index)
            top10_stations = station_list_sorted[:10]

            df = df[df['id'].isin(top10_stations)]
        ####################################################

        # Convert table from long format to wide
        df_wide = df.pivot_table(index=['id', 'date'], columns=['element'], values='value')

        # Convert temperature readings to degrees C and precipitation readings to mm 
        # (not needed for checking)
        # df_wide['TMAX'] /= 10
        # df_wide['TMIN'] /= 10

        df_wide.reset_index(0, inplace=True)

        # Check 1: make sure the datafile has data for years 2005-2015 and correct columns
        # Returns True if the data passes the check
        def check_1():
            a = set(df_wide.columns) >= {'TMAX', 'TMIN', 'id'}
            b = set(np.arange(2005,2016)) <= set(df_wide.index.year)
            return a and b
        
        # Update csv_passes to reflect result from the first check
        csv_passes = check_1()
    
        # If the first check is failed, return
        if csv_passes == False:
            return (csv, csv_passes)
    
        #########################################################################

        # Number of years to aggregate over that will be compared against final_year
        num_years = 10

        # The final year that will be used to check for broken records
        final_year = 2015

        #########################################################################

        # Split the data into the last year and the previous years
        previousyears = df_wide[str(final_year-num_years):str(final_year-1)]
        lastyear = df_wide[str(final_year)]

        # Calculate previous years' record highs and lows
        previousyears_maxmin = pd.groupby(previousyears, by=[previousyears.index.month,previousyears.index.day]).agg({'TMAX':'max', 'TMIN':'min'})

        # Calculate the last year's record highs and lows
        lastyear_maxmin = pd.groupby(lastyear, by=[lastyear.index.month,lastyear.index.day]).agg({'TMAX':'max', 'TMIN':'min'})


        # Only do check 2 if check 1 passes
        # check 2: length of lastyear_maxmin['TMAX'] lastyear_maxmin['TMIN'] previousyears_maxmin['TMAX'] 
        # and previousyears_maxmin['TMIN'] should be 365 or 366.
        # Returns True if the data passes the check
        def check_2():
            # Get lengths
            n1 = lastyear_maxmin['TMAX'].count()
            n2 = lastyear_maxmin['TMIN'].count()
            n3 = previousyears_maxmin['TMAX'].count()
            n4 = previousyears_maxmin['TMIN'].count()
            # Check that lengths are 365 or 366
            return set([n1, n2, n3, n4]) <= set([365, 366])


        # Only do check 3 if check 2 passes
        # check 3: There are at least x tmin outliers + tmax outliers in the last year
        # Returns True if the data passes the check
        def check_3(x=1):
            # Deal with leap years
            leap_day_missing = False

            try:
                leap_day_missing = False if previousyears_maxmin.xs([2,29]).count() == 2 else True
            except:
                leap_day_missing = True

            if not leap_day_missing:
                previousyears_WLY = previousyears_maxmin.loc[(previousyears_maxmin.index.get_level_values(0) != 2) | 
                                                             (previousyears_maxmin.index.get_level_values(1) != 29)]
            else:
                previousyears_WLY = previousyears_maxmin

            # Find record highs and lows (outliers)
            record_high_lastyear = lastyear_maxmin['TMAX'].where(lastyear_maxmin['TMAX'] >= previousyears_WLY['TMAX'])
            record_low_lastyear = lastyear_maxmin['TMIN'].where(lastyear_maxmin['TMIN'] <= previousyears_WLY['TMIN'])

            total_number_of_outliers = record_high_lastyear.count()+record_low_lastyear.count()

            return total_number_of_outliers >= x


        csv_passes = check_2()

        if csv_passes == False:
            return (csv, csv_passes)
        else:
            csv_passes = check_3()
            return (csv, csv_passes)
    except:
        return '{} failed!'.format(csv)

In [None]:
# Iterate through all the csvs and test them using main_check
results = []
for csv in tqdm(csvs):
    results.append(main_check(csv))

In [None]:
# These are the csvs that raised an error
error_csvs = []
for i in results:
    if type(i) != tuple:
        csv_name = i.split()[0]
        error_csvs.append(csv_name)
print('There is/are {} csv/s that produced an error.'.format(len(error_csvs)))

In [None]:
# These are the csvs that failed (should be deleted)
failed_csvs = []
for i in results:
    if i[1] == False:
        failed_csvs.append(i[0])
print('There are {} csvs that failed.'.format(len(failed_csvs)))

In [None]:
# These are the csvs that passed (should be used for the assignment)
passed_csvs = []
for i in results:
    if i[1] == True:
        passed_csvs.append(i[0])
print('There are {} csvs that passed.'.format(len(passed_csvs)))

In [None]:
# Sanity check
len(failed_csvs) + len(passed_csvs) + len(error_csvs) == len(csvs)

In [None]:
# check what stations are in the passed_csvs
passed_stations = []
for csv in passed_csvs:
    passed_df = pd.read_csv(csv, index_col=0)
    passed_stations += list(passed_df.index.unique())

print('There are {} stations left!'.format(len(passed_stations)))

In [None]:
# Save the stations that passed to csv
#pd.Series(passed_stations).to_csv('passed_stations_d{}.csv'.format(d))

## [Plotting](Plot_Bin.ipynb)

Plotting functions to show the learner where the data is coming from.

### Plot Stations (Basemap)

In [None]:
from mpl_toolkits.basemap import Basemap
import matplotlib.pyplot as plt
import pandas as pd
import numpy as np

def plot_stations(binsize, hashid):

    df = pd.read_csv('BinSize_d{}.csv'.format(binsize))

    station_locations_by_hash = df[df['hash'] == hashid]

    lons = station_locations_by_hash['LONGITUDE'].tolist()
    lats = station_locations_by_hash['LATITUDE'].tolist()

    mean_lat = np.mean(lats)
    mean_lon = np.mean(lons)
    max_lat = np.max(lats)
    min_lat = np.min(lats)
    max_lon = np.max(lons)
    min_lon = np.min(lons)
    buffer = 3

    map = Basemap(projection='merc', resolution='l', 
                  lat_0=mean_lat, lon_0=mean_lon,
                  llcrnrlon=min_lon-buffer, llcrnrlat=min_lat-buffer,
                  urcrnrlon=max_lon+buffer, urcrnrlat=max_lat+buffer)

    map.drawmapboundary(fill_color='darkblue')
    map.fillcontinents(color='white',lake_color='darkblue', zorder=0)
    map.drawcoastlines()

    x, y = map(lons,lats)

    map.scatter(x, y, marker='o',color='r', alpha=0.7, zorder=10)

    plt.show()

In [None]:
plot_stations(50, '7a12fe430a08c938daaaeab98da6a60744de60df8d6159d82fe3a764')

### Leaflet

#### Plot Stations

In [None]:
import matplotlib.pyplot as plt
import mplleaflet
import pandas as pd

def leaflet_plot_stations(binsize, hashid):

    df = pd.read_csv('BinSize_d{}.csv'.format(binsize))

    station_locations_by_hash = df[df['hash'] == hashid]

    lons = station_locations_by_hash['LONGITUDE'].tolist()
    lats = station_locations_by_hash['LATITUDE'].tolist()

    plt.figure(figsize=(8,8))

    plt.scatter(lons, lats, c='r', alpha=0.7, s=200)

    return mplleaflet.display()

In [None]:
leaflet_plot_stations(400, 'ba06c674e1bbe085f70f2cde04a98e78f46a1d7ae56ebdcd2a61c6ff')

#### Plot Hull

In [None]:
from scipy.spatial import ConvexHull
import matplotlib.pyplot as plt
import mplleaflet
import pandas as pd
import numpy as np


def leaflet_plot_hull(binsize, hashid):
    
    df = pd.read_csv('BinSize_d{}.csv'.format(binsize))

    station_locations_by_hash = df[df['hash'] == hashid]

    lons = station_locations_by_hash['LONGITUDE'].tolist()
    lats = station_locations_by_hash['LATITUDE'].tolist()

    points = np.array([lons,lats]).T
    hull = ConvexHull(points)

    plt.figure(figsize=(8,8))

    plt.fill(points[hull.vertices,0], points[hull.vertices,1], 'k', alpha=0.3)

    for simplex in hull.simplices:
        plt.plot(points[simplex, 0], points[simplex, 1], 'k-', lw=5)

    return mplleaflet.display()

In [None]:
leaflet_plot_hull(400, 'ba06c674e1bbe085f70f2cde04a98e78f46a1d7ae56ebdcd2a61c6ff')

## ~Additional~

In [None]:
import pandas as pd
import numpy as np

def reproject(latitude, longitude):
    """Returns the x & y coordinates in meters using a sinusoidal projection"""
    earth_radius = 6371009 # in meters
    lat_dist = np.pi * earth_radius / 180.0

    y = latitude*lat_dist
    x = longitude * lat_dist * np.cos(np.radians(latitude))
    return x, y

def deproject(x,y):
    '''Returns the latitude and longitude coordinates from x and y'''
    earth_radius = 6371009 # in meters
    lat_dist = np.pi * earth_radius / 180.0

    latitude = y / lat_dist
    longitude = x / (np.cos(np.radians(latitude))* lat_dist)
    return latitude, longitude

def latlon_from_xygroup(xygroup):
    '''Returns corner 1 latitude and longitude, and corner 2 latitude and longitude.''' 
    '''xygroup looks like '-6571743.681047112 to -6474410.252330746, 1896264.6172766648 to 1993598.0459930308'''
    left_x = float(xygroup.split()[0])
    right_x = float(xygroup.split()[2][:-1])
    left_y = float(xygroup.split()[3])
    right_y = float(xygroup.split()[5])
    c1_lat, c1_lon = deproject(left_x, left_y)
    c2_lat, c2_lon = deproject(right_x, right_y)
    return c1_lat, c1_lon, c2_lat, c2_lon