In [None]:
import sys 
import os
import glob as glob
import dask.dataframe as dd
import dask.distributed
from dask import delayed
import pandas as pd
import geopandas as gpd
from shapely.geometry import Point
import numpy as np
import pyarrow as pa
import matplotlib.pyplot as plt
import seaborn as sns

```
Name: variables
Purpose: input file list and column names for the data set being processed (green taxi trips, 2013–2014)
```

In [None]:
# Input files: every green-taxi CSV for 2013-2014 under ./data2.
# glob.glob returns matches in arbitrary (file-system dependent) order; sorting
# makes the Dask partition order, and therefore the row order, reproducible.
green_2013_2014 = sorted(glob.glob(os.path.join(os.getcwd(), 'data2', 'green_tripdata_201[3-4]*.csv')))

# Column names assigned to the green 2013-2014 files. ReadData passes these as
# `names=` with `header=0`, so each file's own header row is replaced by them.
# NOTE(review): 'extra1'/'extra2' look like placeholders, presumably for
# trailing delimiters in the raw rows — confirm against the data files.
columnsGreen = [
    'vendor_id', 'pickup_datetime', 'dropoff_datetime',
    'Store_and_fwd_flag', 'RateCodeID', 'pickup_longitude', 'pickup_latitude',
    'dropoff_longitude', 'dropoff_latitude', 'Passenger_count', 'Trip_distance',
    'Fare_amount', 'Extra', 'MTA_tax', 'Tip_amount', 'Tolls_amount', 'Ehail_fee',
    'Total_amount', 'Payment_type', 'Trip_type', 'extra1', 'extra2',
]

```
Name: ReadData
Purpose: import taxi trip CSV data (here: green taxi 2013–2014) into a Dask DataFrame
```

In [None]:
def ReadData(globString, columns, usecols=None):
    """Lazily read taxi-trip CSV files into a Dask DataFrame.

    Parameters
    ----------
    globString : str or list of str
        File path(s) or glob pattern(s) accepted by ``dd.read_csv``.
    columns : list of str
        Names assigned to the CSV columns. The first row of each file is
        treated as a header and replaced by these names (``header=0`` +
        ``names=``).
    usecols : list of str, optional
        Subset of ``columns`` to load. Defaults to the pickup/dropoff
        timestamps and coordinates, which is all this notebook uses.

    Returns
    -------
    dask.dataframe.DataFrame
        Only ``usecols``; the two datetime columns (when loaded) are parsed,
        'NA' is read as missing, and coordinates are float64.
    """
    if usecols is None:
        usecols = ['pickup_datetime', 'dropoff_datetime',
                   'pickup_longitude', 'pickup_latitude',
                   'dropoff_longitude', 'dropoff_latitude']

    knownDtypes = {
        'vendor_id': object,            'passenger_count': object,         'trip_distance': np.float64,
        'pickup_longitude': np.float64, 'pickup_latitude': np.float64,     'rate_code_id': object,
        'store_and_fwd_flag': object,   'dropoff_longitude': np.float64,   'dropoff_latitude': np.float64,
        'payment_type': object,         'fare_amount': np.float64,         'extra': np.float64,
        'mta_tax': np.float64,          'tip_amount': np.float64,          'tolls_amount': np.float64,
        'total_amount': np.float64,     'extra1': np.float64,              'extra2': np.float64,
        'extra3': np.float64,           'improvement_surcharge': np.float64,
    }
    # NOTE(review): many keys above are snake_case while `columns` may use other
    # casing (e.g. 'Passenger_count', 'RateCodeID'); only exact-name matches can
    # apply. Restricting to the loaded columns makes the effective mapping explicit.
    dtype = {name: kind for name, kind in knownDtypes.items() if name in usecols}

    # Only ask for date parsing on datetime columns that are actually loaded.
    parseDates = [name for name in ('pickup_datetime', 'dropoff_datetime') if name in usecols]

    return dd.read_csv(globString,
                       header=0,
                       na_values=['NA'],
                       parse_dates=parseDates,
                       usecols=usecols,
                       dtype=dtype,
                       names=columns)


```
Name: joinLocIDBorough
Purpose: for one DataFrame partition, spatially join the CSV lon/lat points to the taxi-zone shapes and return the borough (pickup) or LocationID (dropoff)
```

In [None]:
def joinLocIDBorough(data, longitude, latitude, newVar, ttype, zones=None):
    """Spatially join one partition's points to the taxi-zone polygons.

    Meant to be used with ``DataFrame.map_partitions``. It returns a Series
    with one row per input row, holding either the zone's borough
    (``ttype='p'``) or its LocationID (``ttype='d'``).

    Parameters
    ----------
    data : pandas.DataFrame
        One partition containing the ``longitude`` / ``latitude`` columns.
    longitude, latitude : str
        Names of the coordinate columns (degrees).
    newVar : str
        Name given to the returned Series.
    ttype : {'p', 'd'}
        'p' returns the ``borough`` attribute; 'd' returns ``LocationID``.
    zones : geopandas.GeoDataFrame, optional
        Zone polygons. Defaults to the module-level ``taxiShape`` set by
        ``readShape()``. The points are tagged with ``zones.crs``, so the
        zones must already be in lon/lat degrees (``readShape`` reprojects to
        EPSG:4326).

    Returns
    -------
    pandas.Series
        NaN where a point lies in no zone. Missing coordinates are replaced by
        (0, 0), which lies outside every zone.

    Raises
    ------
    ValueError
        If ``ttype`` is neither 'p' nor 'd' (previously this silently returned
        None).
    """
    import inspect  # local: used only to pick the sjoin keyword for this geopandas version

    featureByType = {'p': 'borough', 'd': 'LocationID'}
    if ttype not in featureByType:
        raise ValueError("ttype must be 'p' (borough) or 'd' (LocationID), got %r" % (ttype,))
    if zones is None:
        zones = taxiShape

    # Point() cannot be built from NaN, so missing coordinates become (0, 0).
    coords = data[[longitude, latitude]].fillna(value=0.)

    # Reusing the zones' CRS guarantees both sides of the join agree.
    points = gpd.GeoDataFrame(
        coords,
        crs=zones.crs,
        geometry=[Point(xy) for xy in zip(coords[longitude], coords[latitude])])

    # geopandas >= 0.10 names the keyword `predicate`; 1.0 removed the old `op`.
    predicateKw = 'predicate' if 'predicate' in inspect.signature(gpd.sjoin).parameters else 'op'
    joined = gpd.sjoin(points, zones, how='left', **{predicateKw: 'within'})

    # A point matching several polygons yields repeated index labels; keep the
    # first so the result stays one-to-one with the partition's rows.
    joined = joined[~joined.index.duplicated(keep='first')]

    return joined[featureByType[ttype]].rename(newVar)

```
Name: readShape
Purpose: import the taxi-zone geometry (shapefile); the weather data is loaded later in ReadInData
```

In [None]:
def readShape(path=None):
    """Load the NYC taxi-zone polygons into the module-level ``taxiShape``.

    ``joinLocIDBorough`` reads ``taxiShape`` by default, so this must run
    before the trip-filtering step.

    Parameters
    ----------
    path : str, optional
        Shapefile to read. Defaults to ``./taxi_zones/taxi_zones.shp``.

    Returns
    -------
    geopandas.GeoDataFrame
        The same frame stored in ``taxiShape`` (returned so callers can pass
        it explicitly instead of relying on the global).
    """
    global taxiShape

    if path is None:
        path = os.path.join(os.getcwd(), 'taxi_zones', 'taxi_zones.shp')

    # Reproject to lon/lat degrees so the zones match the trip coordinates.
    taxiShape = gpd.read_file(path).to_crs({'init': 'epsg:4326'})
    return taxiShape

```
Name: filterData
Purpose: map lon/lat to pickup borough / dropoff LocationID, filter for Manhattan->JFK trips, and write them to Parquet
```

In [None]:
def filterData(data, folderName, dropoffLocationID=132, pickupBorough='Manhattan'):
    """Tag trips with pickup borough and dropoff LocationID, keep one route, write Parquet.

    NOTE(review): a later cell defines another ``filterData(filtered, weather)``.
    Once that cell has run, it shadows this function, so later calls of the
    form ``filterData(data, folderName)`` do not reach this code.

    Parameters
    ----------
    data : dask.dataframe.DataFrame
        Trips with pickup/dropoff longitude and latitude columns. The columns
        ``pickup_borough`` and ``dropoff_location_id`` are added to it in place.
    folderName : str
        Output sub-folder under ``./filtered``.
    dropoffLocationID : int or float, default 132
        Taxi-zone LocationID the trip must end in (132 is the JFK zone this
        notebook targets).
    pickupBorough : str, default 'Manhattan'
        Borough the trip must start in.

    Returns
    -------
    None
        The matching trips are written to Parquet (pyarrow, GZIP) as a side
        effect.
    """
    # Per partition, 'p' makes joinLocIDBorough return the zone's borough.
    data['pickup_borough'] = data.map_partitions(
        joinLocIDBorough, 'pickup_longitude', 'pickup_latitude',
        'pickup_borough', 'p', meta=('pickup_borough', object))
    # 'd' returns the zone's LocationID; float64 so unmatched points can be NaN.
    data['dropoff_location_id'] = data.map_partitions(
        joinLocIDBorough, 'dropoff_longitude', 'dropoff_latitude',
        'dropoff_location_id', 'd', meta=('dropoff_location_id', np.float64))

    # Keep only trips on the requested route; NaN (no zone) never matches.
    onRoute = ((data['dropoff_location_id'] == np.float64(dropoffLocationID))
               & (data['pickup_borough'] == pickupBorough))
    routeTrips = data[onRoute]

    routeTrips.to_parquet(os.path.join(os.getcwd(), 'filtered', folderName),
                          engine='pyarrow', compression='GZIP')

```
Name: ReadInData
Purpose: import the filtered trip data (Parquet) and the NOAA weather data
```

In [None]:
def ReadInData(folderName='green_2013_2014', weatherPath=None):
    """Load the filtered trips from Parquet and the NOAA daily weather CSV.

    Parameters
    ----------
    folderName : str, default 'green_2013_2014'
        Sub-folder of ``./filtered`` holding the Parquet output of the
        trip-filtering step.
    weatherPath : str, optional
        Weather CSV. Defaults to
        ``./documentation/NOAA_Central_Park_data.csv``.

    Returns
    -------
    (filtered, weatherData) : tuple of dask.dataframe.DataFrame
        ``filtered`` has ``pickup_datetime`` reduced to its calendar date
        (``.dt.date``). ``weatherData`` holds only ``DATE`` (parsed to
        datetime) and ``PRCP``.
    """
    trips = dd.read_parquet(os.path.join(os.getcwd(), 'filtered', folderName), engine='pyarrow')
    # Keep only the calendar day so trips can later be counted per day.
    trips['pickup_datetime'] = trips['pickup_datetime'].dt.date

    if weatherPath is None:
        weatherPath = os.path.join(os.getcwd(), 'documentation', 'NOAA_Central_Park_data.csv')
    weather = dd.read_csv(weatherPath, usecols=['DATE', 'PRCP'])
    weather['DATE'] = dd.to_datetime(weather['DATE'])

    return trips, weather

```
Name: filterData
Purpose: count the (already filtered) Manhattan->JFK trips per day and join the daily counts with the weather data
```

In [None]:
def filterData(filtered, weather):
    """Count trips per day and join the daily counts with the weather data.

    Despite its name (shared with the earlier trip-filter cell, which this
    definition shadows once run), no filtering happens here.

    Parameters
    ----------
    filtered : dask.dataframe.DataFrame
        Trips whose ``pickup_datetime`` holds calendar dates.
    weather : dask.dataframe.DataFrame
        Weather rows with a datetime ``DATE`` column.

    Returns
    -------
    dask.dataframe.DataFrame
        Columns ``pickup_datetime`` (datetime), ``count`` (trips that day), and
        every column of ``weather``, including ``DATE``. ``dd.merge`` defaults to
        an inner join, so days missing on either side are dropped.
    """
    dailyCounts = filtered[['pickup_datetime']].groupby(['pickup_datetime']).size().reset_index()
    dailyCounts['pickup_datetime'] = dd.to_datetime(dailyCounts['pickup_datetime'])
    dailyCounts.columns = ['pickup_datetime', 'count']

    # pickup_datetime stays a regular column (not the index) because
    # analyzeData selects merged['pickup_datetime'].
    return dd.merge(dailyCounts, weather, left_on=['pickup_datetime'], right_on=['DATE'])

```
Name: analyzeData
Purpose: find the Pearson correlation between the daily number of rides and precipitation (overall, winter, summer) and plot it
```

In [None]:
def analyzeData(merged):
    """Print and plot Pearson correlations between daily ride count and precipitation.

    Three subsets are analysed: all days, "winter" (October-March) and
    "summer" (April-September). Each prints its correlation matrix and draws
    a heatmap into one panel of a single 3x2 figure.

    Parameters
    ----------
    merged : DataFrame (Dask or pandas)
        Must contain a datetime ``pickup_datetime`` column and the numeric
        columns ``count`` and ``PRCP``.
    """
    months = merged['pickup_datetime'].dt.month
    subsets = [
        ('Overall', merged),
        ('Winter', merged[months.isin([10, 11, 12, 1, 2, 3])]),
        ('Summer', merged[months.isin([4, 5, 6, 7, 8, 9])]),
    ]

    # Materialise each (lazy, for Dask) correlation matrix once, so printing
    # and plotting do not each re-run the whole pipeline.
    matrices = []
    for label, subset in subsets:
        corr = subset[['count', 'PRCP']].corr(method='pearson')
        if hasattr(corr, 'compute'):
            corr = corr.compute()
        matrices.append((label, corr))

    for label, corr in matrices:
        print('%s Correlation Matrix:' % label)
        print(corr.head(2))

    fig = plt.figure(figsize=(30, 30))
    cmap = sns.diverging_palette(220, 10, as_cmap=True)
    for position, (label, corr) in enumerate(matrices, start=1):
        ax = fig.add_subplot(3, 2, position)
        # np.bool was removed in NumPy 1.24; the all-False mask it built hid
        # nothing, so it is omitted. NOTE(review): the colour scale is clipped
        # to +/-0.1, so the diagonal (always 1) saturates; presumably chosen to
        # make the small count/PRCP correlation visible.
        sns.heatmap(corr, vmin=-0.1, vmax=0.1, cmap=cmap, square=True, ax=ax)
        ax.set_title('%s correlation: count vs PRCP' % label)

```
Name: main
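Purpose: run the functions above as a pipeline (read trips → filter to Manhattan->JFK and save → merge daily counts with weather → analyze)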
```

In [None]:
def _filterTrips(data, folderName):
    """Tag pickup borough / dropoff LocationID, keep Manhattan -> zone 132 trips, write Parquet.

    The notebook defines ``filterData`` twice. The later ``(filtered, weather)``
    cell shadows the trip filter, so ``filterData(data, folderName)`` in main()
    used to run the weather merge with a string as ``weather`` and never
    wrote the Parquet output. This stage lives here under a name no other
    cell rebinds.
    """
    # Per partition: 'p' -> borough of the pickup zone, 'd' -> LocationID of the dropoff zone.
    data['pickup_borough'] = data.map_partitions(
        joinLocIDBorough, 'pickup_longitude', 'pickup_latitude',
        'pickup_borough', 'p', meta=('pickup_borough', object))
    data['dropoff_location_id'] = data.map_partitions(
        joinLocIDBorough, 'dropoff_longitude', 'dropoff_latitude',
        'dropoff_location_id', 'd', meta=('dropoff_location_id', np.float64))

    # Manhattan pickups ending in LocationID 132 (JFK); NaN never matches.
    onRoute = ((data['dropoff_location_id'] == np.float64(132))
               & (data['pickup_borough'] == 'Manhattan'))
    data[onRoute].to_parquet(os.path.join(os.getcwd(), 'filtered', folderName),
                             engine='pyarrow', compression='GZIP')


def main(client):
    """Run the full pipeline: read, filter and save trips, then merge with weather and analyze.

    ``client`` is accepted for interface compatibility; the body does not use
    it directly.
    """
    # Data processing: raw CSVs -> filtered Manhattan->JFK trips on disk.
    data = ReadData(green_2013_2014, columnsGreen)
    readShape()  # sets the global taxiShape used by joinLocIDBorough
    _filterTrips(data, 'green_2013_2014')

    # Data analysis: daily trip counts vs. precipitation.
    filtered, weatherData = ReadInData()
    merged = filterData(filtered, weatherData)  # the (filtered, weather) definition
    analyzeData(merged)

if __name__ == "__main__":
    client = dask.distributed.Client()
    main(client)
