## Scenario/Stakeholder Based Analysis of NYC taxi rides data
##### Authors: Panini Mokrala, Dmitrii Danilov

In [None]:
from google.colab import drive
from os.path import join

# Google Drive mount point inside the Colab VM and the project folder below it.
ROOT = '/content/drive'
PROJ = 'MyDrive/Milestones/Milestone1'

# Mount Drive first, then switch the working directory to the project folder so
# the relative paths used later in the notebook ('auth.json', '2378375.csv',
# './cache/') resolve against it.
drive.mount(ROOT)
PROJECT_PATH = join(ROOT, PROJ)
%cd "$PROJECT_PATH"
%pwd

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).
/content/drive/MyDrive/Milestones/Milestone1


'/content/drive/MyDrive/Milestones/Milestone1'

In [None]:
!pip install geopandas
!pip install altair_data_server

import io
import json
import pandas as pd
import numpy as np
import geopandas as gpd
from shapely.ops import cascaded_union
from google.cloud import bigquery
from google.oauth2 import service_account
import datetime as dt
import altair as alt

alt.data_transformers.enable('data_server')



DataTransformerRegistry.enable('data_server')

In [None]:
# Taxi-zone polygons, reprojected to EPSG:4326 so centroids come out as lon/lat.
taxi_zones = gpd.read_file('https://s3.amazonaws.com/nyc-tlc/misc/taxi_zones.zip').to_crs(epsg=4326)
zone_centroids = taxi_zones['geometry'].centroid
taxi_zones['centroid_lon'] = zone_centroids.x
taxi_zones['centroid_lat'] = zone_centroids.y


def _borough_centroid(zone_geometries):
    """Merge all zone geometries of one borough and return the centroid of the union."""
    return cascaded_union(zone_geometries).centroid


# One row per borough; the 'geometry' column holds the borough centroid point.
borough_centroids = taxi_zones.groupby('borough')['geometry'].agg(_borough_centroid)
taxi_zones_b = borough_centroids.to_frame()
taxi_zones_b.columns = ['geometry']
taxi_zones_b = taxi_zones_b.reset_index()
borough_points = taxi_zones_b['geometry'].centroid
taxi_zones_b['centroid_lon'] = borough_points.x
taxi_zones_b['centroid_lat'] = borough_points.y



  This is separate from the ipykernel package so we can avoid doing imports until

  after removing the cwd from sys.path.


In [None]:
# GeoJSON feature lists: zone polygons and per-borough centroid points.
# `tz_geo` is reused by the label layers of the later map cells.
tz_geo = json.loads(taxi_zones.to_json())['features']
tz_geo_b = json.loads(taxi_zones_b.to_json())['features']

alt.themes.enable('opaque')

# Layer 1: zone polygons coloured by borough.
base = alt.Chart(alt.Data(values=tz_geo)).mark_geoshape(
    stroke='black',
    strokeWidth=1
).encode(
    color=alt.Color('properties.borough:N', legend=None)
).properties(
    width=800,
    height=800
)

# Layer 2: the LocationID of every zone, placed at the zone centroid.
labels = alt.Chart(alt.Data(values=tz_geo)).mark_text(
    baseline='top',
).properties(
    width=800,
    height=800
).encode(
    longitude='properties.centroid_lon:Q',
    latitude='properties.centroid_lat:Q',
    text='properties.LocationID:O',
    size=alt.value(8),
    opacity=alt.value(1)
)

# Layer 3: borough names at the borough centroids.
# The title is an `alt.TitleParams` object: `alt.Text` is the *text encoding
# channel* class and only produced a usable title because its extra keyword
# arguments were passed through unvalidated.
boroughs = alt.Chart(alt.Data(values=tz_geo_b)).mark_text(
    color='white',
    stroke='black',
    fontWeight='bold',
    strokeWidth=0.7,
    baseline='top'
).properties(
    width=800,
    height=800,
    title=alt.TitleParams(text="NYC boroughs and taxi zones", fontSize=22)
).encode(
    longitude='properties.centroid_lon:Q',
    latitude='properties.centroid_lat:Q',
    text='properties.borough:N',
    size=alt.value(26),
    opacity=alt.value(1)
)

# Last expression of the cell, so the layered chart is rendered.
base + labels + boroughs

In [None]:
# Authenticate against BigQuery with the service-account key stored next to the notebook.
key_path = 'auth.json'
credentials = service_account.Credentials.from_service_account_file(key_path)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

# Drop-off count per taxi zone for calendar year 2017.
# The date filter is a half-open interval [2017-01-01, 2018-01-01): BETWEEN is
# inclusive on both ends and would also count trips stamped exactly
# 2018-01-01 00:00:00.
sql = '''SELECT dropoff_location_id, count(*) as count
FROM bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2017
where dropoff_datetime >= '2017-01-01' and dropoff_datetime < '2018-01-01'
group by dropoff_location_id;'''
dropoff_2017_df = client.query(sql).to_dataframe()

In [None]:
# Align the query result with the zone table: same key name and integer dtype.
# Written as a reassignment (not inplace) so re-running the cell is harmless.
dropoff_2017_df = (
    dropoff_2017_df
    .rename(columns={'dropoff_location_id': 'LocationID'})
    .astype({'LocationID': 'int64'})
)

# Attach the counts to the zone polygons and convert to GeoJSON features.
# The inner merge keeps only zones that appear in the query result.
dropoff_2017 = taxi_zones.merge(dropoff_2017_df, on='LocationID')
dropoff_2017 = json.loads(dropoff_2017.to_json())['features']

# Choropleth of drop-off counts on a log colour scale.
# The title uses `alt.TitleParams`; `alt.Text` is the text *encoding channel*
# class and only worked as a title through unvalidated keyword passthrough.
base = alt.Chart(alt.Data(values=dropoff_2017)).mark_geoshape(
    stroke='black',
    strokeWidth=1
).encode(
    color=alt.Color('properties.count:Q', scale=alt.Scale(type='log'), legend=alt.Legend(title="Drop-off count"))
).properties(
    title=alt.TitleParams(text="NYC taxi drop-off zones popularity", fontSize=22),
    width=800,
    height=800
)

# Zone IDs drawn at the zone centroids.
labels = alt.Chart(alt.Data(values=tz_geo)).mark_text(
    baseline='top',
).properties(
    width=800,
    height=800
).encode(
    longitude='properties.centroid_lon:Q',
    latitude='properties.centroid_lat:Q',
    text='properties.LocationID:O',
    size=alt.value(8),
    opacity=alt.value(1)
)

base + labels

In [None]:
# Pickup count per taxi zone for calendar year 2017.
# Runs on the BigQuery `client` built in the drop-off query cell; the
# credentials reload that used to sit here never fed a client and was dead code.
# The date filter is a half-open interval [2017-01-01, 2018-01-01): BETWEEN is
# inclusive on both ends and would also count trips stamped exactly
# 2018-01-01 00:00:00.
sql = '''SELECT pickup_location_id, count(*) as count
FROM bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2017
where pickup_datetime >= '2017-01-01' and pickup_datetime < '2018-01-01'
group by pickup_location_id;'''
pickup_2017_df = client.query(sql).to_dataframe()

In [None]:
# Align the query result with the zone table: same key name and integer dtype.
# Written as a reassignment (not inplace) so re-running the cell is harmless.
pickup_2017_df = (
    pickup_2017_df
    .rename(columns={'pickup_location_id': 'LocationID'})
    .astype({'LocationID': 'int64'})
)

# Attach the counts to the zone polygons and convert to GeoJSON features.
# The inner merge keeps only zones that appear in the query result.
pickup_2017 = taxi_zones.merge(pickup_2017_df, on='LocationID')
pickup_2017 = json.loads(pickup_2017.to_json())['features']

# Choropleth of pickup counts on a log colour scale.
# The title uses `alt.TitleParams`; `alt.Text` is the text *encoding channel*
# class and only worked as a title through unvalidated keyword passthrough.
base = alt.Chart(alt.Data(values=pickup_2017)).mark_geoshape(
    stroke='black',
    strokeWidth=1
).encode(
    color=alt.Color('properties.count:Q', scale=alt.Scale(type='log'), legend=alt.Legend(title="Pickup count"))
).properties(
    title=alt.TitleParams(text="NYC taxi pickup zones popularity", fontSize=22),
    width=800,
    height=800
)

# Zone IDs drawn at the zone centroids.
labels = alt.Chart(alt.Data(values=tz_geo)).mark_text(
    baseline='top',
).properties(
    width=800,
    height=800
).encode(
    longitude='properties.centroid_lon:Q',
    latitude='properties.centroid_lat:Q',
    text='properties.LocationID:O',
    size=alt.value(8),
    opacity=alt.value(1)
)

base + labels

In [None]:
# Average fare per drop-off zone for calendar year 2018, ignoring non-positive
# fares and fares of 1000 or more.
# The date filter is the half-open interval [2018-01-01, 2019-01-01); a strict
# `>` on the lower bound would drop trips stamped exactly 2018-01-01 00:00:00.
sql = '''
SELECT 
dropoff_location_id, avg(fare_amount) as avg_fare
FROM 
bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2018
WHERE dropoff_datetime >= '2018-01-01' and dropoff_datetime < '2019-01-01'
AND fare_amount > 0 and fare_amount < 1000
GROUP BY dropoff_location_id;
'''
avg_fare_2018_df = client.query(sql).to_dataframe()

In [None]:
# Align the query result with the zone table (key name, integer key) and make
# the fare a plain float so it serialises cleanly to GeoJSON.
# Written as a reassignment (not inplace) so re-running the cell is harmless.
avg_fare_2018_df = (
    avg_fare_2018_df
    .rename(columns={'dropoff_location_id': 'LocationID'})
    .astype({'LocationID': 'int64', 'avg_fare': 'float64'})
)

# Attach the averages to the zone polygons and convert to GeoJSON features.
# The inner merge keeps only zones that appear in the query result.
avg_fare_2018 = taxi_zones.merge(avg_fare_2018_df, on='LocationID')
avg_fare_2018 = json.loads(avg_fare_2018.to_json())['features']

# Choropleth of the average fare per drop-off zone.
# The title uses `alt.TitleParams`; `alt.Text` is the text *encoding channel*
# class and only worked as a title through unvalidated keyword passthrough.
base = alt.Chart(alt.Data(values=avg_fare_2018)).mark_geoshape(
    stroke='black',
    strokeWidth=1
).encode(
    color=alt.Color('properties.avg_fare:Q', legend=alt.Legend(title="Avg. fare"))
).properties(
    title=alt.TitleParams(text="NYC average fare by taxi zone", fontSize=22),
    width=800,
    height=800
)

# Zone IDs drawn at the zone centroids.
labels = alt.Chart(alt.Data(values=tz_geo)).mark_text(
    baseline='top',
).properties(
    width=800,
    height=800
).encode(
    longitude='properties.centroid_lon:Q',
    latitude='properties.centroid_lat:Q',
    text='properties.LocationID:O',
    size=alt.value(8),
    opacity=alt.value(1)
)

base + labels

### Create weather data tables in BigQuery

In [24]:
# Authenticate against BigQuery with the service-account key stored next to the notebook.
key_path = 'auth.json'
credentials = service_account.Credentials.from_service_account_file(key_path)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)

# `exists_ok=True` on the create calls below makes this cell safe to re-run:
# without it, a second run raises Conflict because the dataset and tables
# already exist.

#TODO - change to 'weather'
client.create_dataset('mads-milestone-1.weather_test', exists_ok=True)

# Daily table: one row per day.
daily_schema = [
    bigquery.SchemaField("datetime", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("avg_temp", "FLOAT"),
    bigquery.SchemaField("precip_depth", "FLOAT"),
    bigquery.SchemaField("snow_depth", "FLOAT"),
    bigquery.SchemaField("snow_fall", "FLOAT")
]
# TODO -- change to 'daily'
daily_table_id = 'mads-milestone-1.weather_test.daily_test'

daily_table = bigquery.Table(daily_table_id, schema=daily_schema)
daily_table = client.create_table(daily_table, exists_ok=True)

# Hourly table: one row per hourly observation.
hourly_schema = [
    bigquery.SchemaField("datetime", "TIMESTAMP", mode="REQUIRED"),
    bigquery.SchemaField("avg_temp", "FLOAT"),
    bigquery.SchemaField("precip_type", "STRING"),
    bigquery.SchemaField("precip_depth", "FLOAT")
]
# TODO - change to 'hourly' 
hourly_table_id = 'mads-milestone-1.weather_test.hourly_test'

hourly_table = bigquery.Table(hourly_table_id, schema=hourly_schema)
hourly_table = client.create_table(hourly_table, exists_ok=True)



### Populate weather data tables

In [26]:
# Raw weather export; it mixes hourly and daily report rows, told apart by REPORT_TYPE.
weather = pd.read_csv('2378375.csv')

# Every cleaning step below is an explicit column assignment on a copied frame.
# The earlier `df[col].replace(..., inplace=True)` form mutated a temporary
# selection of a filtered slice: it triggers SettingWithCopy warnings and is a
# silent no-op once pandas Copy-on-Write is active.

# ---- Hourly observations (REPORT_TYPE 'FM-15') ----
weather_h = weather[weather['REPORT_TYPE'] == 'FM-15']
weather_hdf = (
    weather_h
    .filter(items=['HourlyDryBulbTemperature', 'HourlyPrecipitation', 'HourlyPresentWeatherType'])
    .rename(columns={'HourlyDryBulbTemperature': 'avg_temp',
                     'HourlyPrecipitation': 'precip_depth',
                     'HourlyPresentWeatherType': 'precip_type'})
    .copy()
)
weather_hdf['datetime'] = pd.to_datetime(weather_h['DATE'])
# An exact 'T' value becomes 0.0 (presumably the trace-amount marker - confirm
# against the data dictionary); any remaining letters are stripped so the
# values parse as floats.
weather_hdf['precip_depth'] = (
    weather_hdf['precip_depth']
    .replace(to_replace='T', value=0.0)
    .replace(to_replace='[a-zA-Z]', value='', regex=True)
    .astype('double')
)
weather_hdf['avg_temp'] = (
    weather_hdf['avg_temp']
    .replace(to_replace='[a-zA-Z]', value='', regex=True)
    .astype('double')
)
# Collapse the present-weather code string to 'rain' / 'snow' / missing;
# 'RA' is checked first, so a value containing both codes is labelled 'rain'.
weather_hdf['precip_type'] = [
    'rain' if 'RA' in code else 'snow' if 'SN' in code else np.nan
    for code in weather_hdf['precip_type'].astype('str')
]

# ---- Daily summaries (REPORT_TYPE 'SOD  ', trailing spaces are part of the value) ----
weather_d = weather[weather['REPORT_TYPE'] == 'SOD  ']
weather_ddf = (
    weather_d
    .filter(items=['DailyAverageDryBulbTemperature', 'DailyPrecipitation', 'DailySnowDepth', 'DailySnowfall'])
    .rename(columns={'DailyAverageDryBulbTemperature': 'avg_temp',
                     'DailyPrecipitation': 'precip_depth',
                     'DailySnowDepth': 'snow_depth',
                     'DailySnowfall': 'snow_fall'})
    .copy()
)
weather_ddf['datetime'] = pd.to_datetime(weather_d['DATE'])
for measure_col in ['precip_depth', 'snow_depth', 'snow_fall']:
    weather_ddf[measure_col] = (
        weather_ddf[measure_col]
        .replace(to_replace='T', value=0.0)
        .astype('double')
    )

# Upload both frames and block until each load job has finished.
# NOTE(review): re-running this cell presumably appends the rows again - confirm
# the load job's write disposition before running it twice.
bg_daily_job = client.load_table_from_dataframe(weather_ddf, daily_table)
bg_daily_job.result()

bg_hourly_job = client.load_table_from_dataframe(weather_hdf, hourly_table)
bg_hourly_job.result()

  interactivity=interactivity, compiler=compiler, result=result)


<google.cloud.bigquery.job.LoadJob at 0x7f16941b4908>

## Caching SQL queries

In [41]:
# Set to False to bypass the on-disk cache and always query BigQuery.
CACHE_SQL_RESULTS = True

# Dictionary that maps SQL query names to SQL queries
sql_dict = {
    'my_sql': 
'''
SELECT 
dropoff_location_id, avg(fare_amount) as avg_fare
FROM 
bigquery-public-data.new_york_taxi_trips.tlc_yellow_trips_2018
WHERE dropoff_datetime >= '2018-01-01' and dropoff_datetime < '2019-01-01'
AND fare_amount > 0 and fare_amount < 1000
GROUP BY dropoff_location_id;
'''
}

def run_cached(sql_name):
    """Return the result of the named query, using a pickle cache on disk.

    Args:
        sql_name: Key into ``sql_dict``; also the stem of the cache file
            ``./cache/<sql_name>.gz``.

    Returns:
        pandas.DataFrame with the query result, read from the cache file when
        caching is enabled and the file exists, otherwise fetched through the
        module-level BigQuery ``client``.

    Raises:
        KeyError: If the database has to be queried and ``sql_name`` is not in
            ``sql_dict``.
    """
    import os  # local import: only needed to create the cache directory

    if not CACHE_SQL_RESULTS:
        sql_query = sql_dict[sql_name]
        print('Caching is disabled, querying database...')
        return client.query(sql_query).to_dataframe()

    cache_path = ''.join(['./cache/', sql_name, '.gz'])
    try:
        print('Reading cached dataframe from pickle...')
        # NOTE: unpickling can execute arbitrary code - only load cache files
        # that this notebook wrote itself.
        return pd.read_pickle(cache_path)
    except FileNotFoundError:
        print('Cached dataframe not found, querying database..')
        sql_query = sql_dict[sql_name]
        df = client.query(sql_query).to_dataframe()
        print('Caching resulting dataframe...')
        # On first use ./cache does not exist yet; without this the write fails
        # and the freshly fetched result is lost.
        os.makedirs(os.path.dirname(cache_path), exist_ok=True)
        df.to_pickle(cache_path)
        print('Cached dataframe saved to file')
        return df


In [44]:
# Fetch the 'my_sql' result through the cache helper; as the cell's last
# expression the returned DataFrame is rendered below.
run_cached('my_sql')

Reading cached dataframe from pickle...


Unnamed: 0,dropoff_location_id,avg_fare
0,264,12.59518806
1,143,10.243478579
2,134,28.880852854
3,17,22.80154518
4,138,32.54891797
...,...,...
259,204,78.701125402
260,6,43.403077598
261,167,25.849132622
262,184,39.062620773
