In [68]:
sc

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

<SparkContext master=yarn appName=livy-session-1>

In [69]:
sc.list_packages()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Package                    Version
-------------------------- -------
beautifulsoup4             4.8.1  
boto                       2.49.0 
cycler                     0.10.0 
descartes                  1.1.0  
jmespath                   0.9.4  
kiwisolver                 1.1.0  
lxml                       4.4.2  
matplotlib                 3.2.1  
mysqlclient                1.4.6  
nltk                       3.4.5  
nose                       1.3.4  
numpy                      1.14.5 
pandas                     1.0.3  
pip                        20.0.2 
py-dateutil                2.2    
pyparsing                  2.4.6  
pyshp                      2.1.0  
python-dateutil            2.8.1  
python36-sagemaker-pyspark 1.2.6  
pytz                       2019.3 
PyYAML                     3.11   
setuptools                 46.0.0 
Shapely                    1.7.0  
six                        1.13.0 
soupsieve                  1.9.5  
SQLAlchemy                 1.3.15 
wheel               

#### step 1: load raw data from s3 and do some basic clean and polishing 

In [70]:
from pyspark.sql.types import *
from pyspark.sql import SparkSession
import pyspark.sql.functions as f
import warnings; warnings.simplefilter('ignore')
import datetime

# config spark to build context
spark=SparkSession.builder.appName("NYC.TLC-Green-Car-Data-ETL").getOrCreate()
spark.conf.set("spark.sql.session.timeZone", "America/New_York")

# read in our raw dataset
# add new column :pickup_hour
# add new column :dropoff_hour
# add new column :duration
# add new column :minute_rate
# add new column :average_speed

# remove colum :store_and_fwd_flag
# remove colum :ehail_fee
# remove colum :congestion_surcharge

df = spark.read.option('header', 'true') \
    .option('mode', 'FAILFAST') \
    .option('timeStampFormat', 'yyyy-MM-dd HH:mm:ss') \
    .option('columnNameOfCorruptRecord', 'error') \
    .csv('s3://data-etl-o-original-raw/green/*.csv') \
    .filter(f.year('lpep_pickup_datetime') == 2019) \
    .withColumn('trip_distance',f.col('trip_distance').cast(DoubleType())) \
    .withColumn('pickup_hour',f.hour('lpep_pickup_datetime')) \
    .withColumn('dropoff_hour',f.hour('lpep_dropoff_datetime')) \
    .withColumn('duration', f.unix_timestamp('lpep_dropoff_datetime') - f.unix_timestamp('lpep_pickup_datetime')) \
    .withColumn('minute_rate',f.when(f.col('total_amount') <= 0.001,0).otherwise(f.col('total_amount')/f.col('duration') * 60)) \
    .withColumn('average_speed',f.when(f.col('trip_distance') < 0.001,0).otherwise(f.col('trip_distance')/f.col('duration') * 60 * 60)) \
    .drop('store_and_fwd_flag') \
    .drop('ehail_fee') \
    .drop('congestion_surcharge')


VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [71]:
df.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- VendorID: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- dropoff_hour: integer (nullable = true)
 |-- duration: long (nullable = true)
 |-- minute_rate: double (nullable = true)
 |-- average_speed: double (nullable = true)

In [72]:
sc.install_pypi_package("pandas")
sc.install_pypi_package("pyshp")
sc.install_pypi_package("shapely")
sc.install_pypi_package("descartes")
sc.install_pypi_package("sqlalchemy")

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

Package already installed for current Spark context!
Traceback (most recent call last):
  File "/usr/lib/spark/python/lib/pyspark.zip/pyspark/context.py", line 1110, in install_pypi_package
    raise ValueError("Package already installed for current Spark context!")
ValueError: Package already installed for current Spark context!



In [73]:
import pandas as pd
import numpy as np
import urllib.request
import zipfile
import random
import itertools
import math
import shapefile
from shapely.geometry import Polygon
from descartes.patch import PolygonPatch
import matplotlib as mpl
import matplotlib.pyplot as plt
plt.style.use('ggplot')

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

#### step 2: cache data im memory for comparing some key properties

In [74]:
dfcache = df.cache()

# dfcache
## check schema structure
df.printSchema(),dfcache.printSchema()

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

root
 |-- VendorID: string (nullable = true)
 |-- lpep_pickup_datetime: string (nullable = true)
 |-- lpep_dropoff_datetime: string (nullable = true)
 |-- RatecodeID: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- passenger_count: string (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- fare_amount: string (nullable = true)
 |-- extra: string (nullable = true)
 |-- mta_tax: string (nullable = true)
 |-- tip_amount: string (nullable = true)
 |-- tolls_amount: string (nullable = true)
 |-- improvement_surcharge: string (nullable = true)
 |-- total_amount: string (nullable = true)
 |-- payment_type: string (nullable = true)
 |-- trip_type: string (nullable = true)
 |-- pickup_hour: integer (nullable = true)
 |-- dropoff_hour: integer (nullable = true)
 |-- duration: long (nullable = true)
 |-- minute_rate: double (nullable = true)
 |-- average_speed: double (nullable = true)

root
 |-- VendorID: strin

### step 3: processing further polishing and transformation

In [None]:
## fix colums with regular names

for c in df.columns:
    df = df.withColumnRenamed(c, c.replace(' ', '_'))

## polish data

## remove data which is tooooo small and ilegal value
df = df.filter(df['trip_distance'] >= 0.001)

## round minute_rate and average_speed to scale 3 

df = df.withColumn('minute_rate', f.round(df['minute_rate'],4))

df = df.withColumn('average_speed', f.round(df['average_speed'],4))

df = df.withColumn('average_speed', f.round(df['trip_distance'],4))

# rename 'VendorID'
# df = df.withColumnRenamed('VendorID', 'vendor_id')

# rename passenger_count to pcount shortly
df = df.withColumnRenamed('passenger_count', 'p_count').withColumn('p_count',f.col('p_count').cast(IntegerType())) 

# df.show(10)

df.select(['VendorID', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'duration','total_amount','trip_distance','minute_rate','average_speed']) \
.show(10)

## Write Data Back to S3 using AWS API
# currenttime = datetime.datetime.now()

# df.write.option('compression', 'snappy') \
#    .save( "s3://data-etl-o-target-0/result/green/green_car_data_processed_{date}/".format(date=currenttime.strftime('%Y%m%d%H%M%s')))

VBox()

FloatProgress(value=0.0, bar_style='info', description='Progress:', layout=Layout(height='25px', width='50%'),…

In [None]:
## caculate and comparing the count of the target data 

df.columns,dfcache.columns

df.count(), dfcache.count()

#### step 4: load taxi zone data from s3 for future merging using

In [None]:
#read in our raw dataset
dfz = spark.read.option('header', 'true') \
    .option('mode', 'FAILFAST') \
    .option('columnNameOfCorruptRecord', 'error') \
    .csv('s3://data-etl-o-original-raw/zone/nyc.tlc.taxi_zone_lookup.csv')

dfzcache = dfz.cache()

# dfz.count(),dfz.columns

dfz.show(20)

In [None]:
tmp = df.select(['VendorID','PULocationID', 'DOLocationID', 'p_count', 'lpep_pickup_datetime', 'lpep_dropoff_datetime', 'duration','total_amount','trip_distance','minute_rate','average_speed'])
tmp.show(20)

#### step 5: handle taxi_zones shape files for further UI visualization

In [None]:
## download files from internets to prepare data VI

import zipfile
import os

## define dir to use
nyc_data_dir_path = '/tmp/nyc'

if not os.path.exists(nyc_data_dir_path):
    os.makedirs(nyc_data_dir_path)

target_shapefile_path = '/tmp/nyc/taxi_zones.zip'

if not os.path.exists(target_shapefile_path):  
    urllib.request.urlretrieve("https://s3.amazonaws.com/nyc-tlc/misc/taxi_zones.zip", target_shapefile_path)

target_shapefile_dir = '/tmp/nyc/shape'
if not os.path.exists(target_shapefile_dir): 
    with zipfile.ZipFile(target_shapefile_path,'r') as zip_ref:
        zip_ref.extractall(target_shapefile_dir)

In [None]:
# define processing to extract LocationID、longitude、latitude

In [None]:
# define proc to extract "LocationID", "longitude", "latitude"

def get_lat_lon(sf,shp_dic):
    content = []
    for sr in sf.shapeRecords():
        shape = sr.shape
        rec = sr.record
        loc_id = rec[shp_dic['LocationID']]
        x = (shape.bbox[0]+shape.bbox[2])/2
        y = (shape.bbox[1]+shape.bbox[3])/2
        content.append((loc_id, x, y))
    return pd.DataFrame(content, columns=["LocationID", "longitude", "latitude"]).replace(np.NaN, '')


In [None]:
## read previous file we download and extracted to /tmp/nyc/shape/
sf = shapefile.Reader("/tmp/nyc/shape/taxi_zones.shp")
fields_name = [field[0] for field in sf.fields[1:]]
shp_dic = dict(zip(fields_name, list(range(len(fields_name)))))
attributes = sf.records()
shp_attr = [dict(zip(fields_name, attr)) for attr in attributes]

df_loc = pd.DataFrame(shp_attr).join(get_lat_lon(sf,shp_dic).set_index("LocationID"), on="LocationID")

## df_loc.head()
## ported to spark to analyze
spark_dfl = spark.createDataFrame(df_loc)
spark_dfl.show(10)

### define processing for draw region map and draw zone map by plot

In [None]:
def draw_zone_map(ax, sf, heat={}, text=[], arrows=[]):
    continent = [235/256, 151/256, 78/256]
    ocean = (89/256, 171/256, 227/256)
    theta = np.linspace(0, 2*np.pi, len(text)+1).tolist()
    ax.set_facecolor(ocean)
    
    # colorbar
    if len(heat) != 0:
        norm = mpl.colors.Normalize(vmin=min(heat.values()),vmax=max(heat.values())) #norm = mpl.colors.LogNorm(vmin=1,vmax=max(heat))
        cm=plt.get_cmap('Reds')
        sm = plt.cm.ScalarMappable(cmap=cm, norm=norm)
        sm.set_array([])
        plt.colorbar(sm, ticks=np.linspace(min(heat.values()),max(heat.values()),8),
                     boundaries=np.arange(min(heat.values())-10,max(heat.values())+10,.1))
    
    for sr in sf.shapeRecords():
        shape = sr.shape
        rec = sr.record
        loc_id = rec[shp_dic['LocationID']]
        zone = rec[shp_dic['zone']]
        
        if len(heat) == 0:
            col = continent
        else:
            if loc_id not in heat:
                R,G,B,A = cm(norm(0))
            else:
                R,G,B,A = cm(norm(heat[loc_id]))
            col = [R,G,B]

        # check number of parts (could use MultiPolygon class of shapely?)
        nparts = len(shape.parts) # total parts
        if nparts == 1:
            polygon = Polygon(shape.points)
            patch = PolygonPatch(polygon, facecolor=col, alpha=1.0, zorder=2)
            ax.add_patch(patch)
        else: # loop over parts of each shape, plot separately
            for ip in range(nparts): # loop over parts, plot separately
                i0 = shape.parts[ip]
                if ip < nparts-1:
                    i1 = shape.parts[ip+1]-1
                else:
                    i1 = len(shape.points)

                polygon = Polygon(shape.points[i0:i1+1])
                patch = PolygonPatch(polygon, facecolor=col, alpha=1.0, zorder=2)
                ax.add_patch(patch)
        
        x = (shape.bbox[0]+shape.bbox[2])/2
        y = (shape.bbox[1]+shape.bbox[3])/2
        if (len(text) == 0 and rec[shp_dic['Shape_Area']] > 0.0001):
            plt.text(x, y, str(loc_id), horizontalalignment='center', verticalalignment='center')            
        elif len(text) != 0 and loc_id in text:
            #plt.text(x+0.01, y-0.01, str(loc_id), fontsize=12, color="white", bbox=dict(facecolor='black', alpha=0.5))
            eta_x = 0.05*np.cos(theta[text.index(loc_id)])
            eta_y = 0.05*np.sin(theta[text.index(loc_id)])
            ax.annotate("[{}] {}".format(loc_id, zone), xy=(x, y), xytext=(x+eta_x, y+eta_y),
                        bbox=dict(facecolor='black', alpha=0.5), color="white", fontsize=12,
                        arrowprops=dict(facecolor='black', width=3, shrink=0.05))
    if len(arrows)!=0:
        for arr in arrows:
            ax.annotate('', xy = arr['dest'], xytext = arr['src'], size = arr['cnt'],
                    arrowprops=dict(arrowstyle="fancy", fc="0.6", ec="none"))
    
    # display
    limits = get_boundaries(sf)
    plt.xlim(limits[0], limits[1])
    plt.ylim(limits[2], limits[3])

# Draw Borough region

def draw_region_map(ax, sf, heat={}):
    continent = [235/256, 151/256, 78/256]
    ocean = (89/256, 171/256, 227/256)    
    
    reg_list={'Staten Island':1, 'Queens':2, 'Bronx':3, 'Manhattan':4, 'EWR':5, 'Brooklyn':6}
    reg_x = {'Staten Island':[], 'Queens':[], 'Bronx':[], 'Manhattan':[], 'EWR':[], 'Brooklyn':[]}
    reg_y = {'Staten Island':[], 'Queens':[], 'Bronx':[], 'Manhattan':[], 'EWR':[], 'Brooklyn':[]}
    
    # colorbar
    if len(heat) != 0:
        norm = mpl.colors.Normalize(vmin=math.sqrt(min(heat.values())), vmax=math.sqrt(max(heat.values()))) #norm = mpl.colors.LogNorm(vmin=1,vmax=max(heat))
        cm=plt.get_cmap('Reds')
        #sm = plt.cm.ScalarMappable(cmap=cm, norm=norm)
        #sm.set_array([])
        #plt.colorbar(sm, ticks=np.linspace(min(heat.values()),max(heat.values()),8), \
        #             boundaries=np.arange(min(heat.values())-10,max(heat.values())+10,.1))
    
    ax.set_facecolor(ocean)
    for sr in sf.shapeRecords():
        shape = sr.shape
        rec = sr.record
        reg_name = rec[shp_dic['borough']]
        
        if len(heat) == 0:
            norm = mpl.colors.Normalize(vmin=1,vmax=6) #norm = mpl.colors.LogNorm(vmin=1,vmax=max(heat))
            cm=plt.get_cmap('Pastel1')
            R,G,B,A = cm(norm(reg_list[reg_name]))
            col = [R,G,B]
        else:
            R,G,B,A = cm(norm(math.sqrt(heat[reg_name])))
            col = [R,G,B]
            
        # check number of parts (could use MultiPolygon class of shapely?)
        nparts = len(shape.parts) # total parts
        if nparts == 1:
            polygon = Polygon(shape.points)
            patch = PolygonPatch(polygon, facecolor=col, alpha=1.0, zorder=2)
            ax.add_patch(patch)
        else: # loop over parts of each shape, plot separately
            for ip in range(nparts): # loop over parts, plot separately
                i0 = shape.parts[ip]
                if ip < nparts-1:
                    i1 = shape.parts[ip+1]-1
                else:
                    i1 = len(shape.points)

                polygon = Polygon(shape.points[i0:i1+1])
                patch = PolygonPatch(polygon, facecolor=col, alpha=1.0, zorder=2)
                ax.add_patch(patch)
                
        reg_x[reg_name].append((shape.bbox[0]+shape.bbox[2])/2)
        reg_y[reg_name].append((shape.bbox[1]+shape.bbox[3])/2)
        
    for k in reg_list:
        if len(heat)==0:
            plt.text(np.mean(reg_x[k]), np.mean(reg_y[k]), k, horizontalalignment='center', verticalalignment='center',
                        bbox=dict(facecolor='black', alpha=0.5), color="white", fontsize=12)     
        else:
            plt.text(np.mean(reg_x[k]), np.mean(reg_y[k]), "{}\n({}K)".format(k, heat[k]/1000), horizontalalignment='center', 
                     verticalalignment='center',bbox=dict(facecolor='black', alpha=0.5), color="white", fontsize=12)       

    # display
    limits = get_boundaries(sf)
    plt.xlim(limits[0], limits[1])
    plt.ylim(limits[2], limits[3])

# get boundaries of zone
# %matplotlib inline

def get_boundaries(sf):
    lat, lon = [], []
    for shape in list(sf.iterShapes()):
        lat.extend([shape.bbox[0], shape.bbox[2]])
        lon.extend([shape.bbox[1], shape.bbox[3]])

    margin = 0.01 # buffer to add to the range
    lat_min = min(lat) - margin
    lat_max = max(lat) + margin
    lon_min = min(lon) - margin
    lon_max = max(lon) + margin

    return lat_min, lat_max, lon_min, lon_max

In [None]:
# plot Borough and zones
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(15,8))
ax = plt.subplot(1, 2, 1)
ax.set_title("Borough Area in NYC")
draw_region_map(ax, sf)

ax = plt.subplot(1, 2, 2)
ax.set_title("Zones in NYC")
draw_zone_map(ax, sf)

%matplot plt

### for business trending ,we need to exploring some data from merged data

### Q1: Which zone have most pickups and drop-offs?

In [None]:
## count().orderBy('count', ascending=False).limit(10)

df_pu = df.select(f.col("PULocationID").alias("LocationID")).groupby('LocationID').count().withColumnRenamed('count', 'pu_count')
df_do = df.select(f.col("DOLocationID").alias("LocationID")).groupby('LocationID').count().withColumnRenamed('count', 'do_count')

## use taxi zone talbe
joined1 = df_pu.join(df_do, 'LocationID', 'left').withColumn('total_count', df_pu.pu_count + df_do.do_count)

joined1.show(5)

In [None]:
dfz.show(5)

In [None]:
# order tables
joined2 = joined1.join(dfz,'LocationID', 'left') \
    .withColumn("LocationID", f.col('LocationID').cast(IntegerType())) \
    .orderBy('LocationID')

joined2.show(10)

In [None]:
# joined2.select('*').limit(10).show(10)
# joinedtemplate = joined2.createOrReplaceTempView("joinedtemplate")
#df_putp5 = joined2.orderBy('pu_count', ascending=False).select('*').limit(5)

df_putp5 = joined2.orderBy('pu_count', ascending=False).limit(5)
df_putp5.show()

In [None]:

#df_dotp5 = joined2.orderBy('do_count', ascending=False).select(['LocationID','do_count','pu_count','Borough','Zone'])

df_dotp5 = joined2.orderBy('do_count', ascending=False).limit(5)

df_dotp5.show()


In [None]:
# get UI parameters

# [Row(LocationID=74, pu_count=457848), Row(LocationID=75, pu_count=368752), Row(LocationID=41, pu_count=333623)]
#list_borough =  [row.LocationID for row in joined2.select(['LocationID','pu_count']).collect()]

list_borough = joined2.select("borough").rdd.flatMap(lambda x: x).collect()
list_pu_count = joined2.select("pu_count").rdd.flatMap(lambda x: x).collect()
list_do_count = joined2.select("do_count").rdd.flatMap(lambda x: x).collect()

#list_LocationID[1],list_pu_count[1]
# get the pick mount from different zone 

#pu_mount_list = dict(zip(list_borough, list_pu_count))
pu_mount_list= joined2.select(['borough','pu_count']).rdd.collectAsMap()

list_index_df_putp5 = df_putp5.select("LocationID").rdd.flatMap(lambda x: x).collect()

#do_mount_list = dict(zip(list_borough, list_do_count))
do_mount_list= joined2.select(['borough','do_count']).rdd.collectAsMap()

list_index_df_dotp5 = df_dotp5.select("LocationID").rdd.flatMap(lambda x: x).collect()

pu_mount_list,do_mount_list,list_index_df_putp5,list_index_df_dotp5

In [None]:
fig, ax = plt.subplots(nrows=1, ncols=2, figsize=(18,8))
ax = plt.subplot(1, 2, 1)
ax.set_title("Boroughs with most pick-ups")
#draw_zone_map(ax, sf, heat = pu_mount_list,text=list_index_df_putp5)
draw_region_map(ax, sf, heat = pu_mount_list)
ax = plt.subplot(1, 2, 2)
ax.set_title("Boroughs with most drop-offs")
#draw_zone_map(ax, sf, heat = do_mount_list,text=list_index_df_dotp5)
draw_region_map(ax, sf, heat = do_mount_list)
%matplot plt

In [None]:
#df_do = df.select(f.col("DOLocationID").alias("LocationID")).groupby('LocationID').count().withColumnRenamed('count', 'do_count')

df_pu_busy_hour = df.select(f.col('pickup_hour').alias('time')).groupby('time').count().withColumnRenamed('count','pu_count')
df_do_busy_hour = df.select(f.col('dropoff_hour').alias('time')).groupby('time').count().withColumnRenamed('count','do_count')

df_busy_hours = df_pu_busy_hour.join(df_do_busy_hour, on='time').orderBy('time')

df_busy_hours.show(24)

In [None]:
list_hours_time = df_busy_hours.select('time').rdd.flatMap(lambda x: x).collect()
list_pu_hours_datas = df_busy_hours.select('pu_count').rdd.flatMap(lambda x: x).collect()
list_do_hours_datas = df_busy_hours.select('do_count').rdd.flatMap(lambda x: x).collect()

list_hours_time,list_pu_hours_datas,list_do_hours_datas

In [None]:
#ax = df_q2.plot(x='time', y=['Pick-ups', 'Drop-offs'], kind='line', style="-o", figsize=(15,1))

fig, ax = plt.subplots(figsize=(12,5))
ax.set_title('peak hours in 2019')

ax.plot(list_hours_time, list_pu_hours_datas, '-o', label='Pick-ups')

ax.plot(list_hours_time, list_do_hours_datas, '-o', label='Drop-offs')

ax.set_xlabel('time')
ax.set_ylabel('count')


ax.legend()

plt.show()

%matplot plt

### Q3: What are the differences between short and long distance trips of taking taxi?

In [None]:
# avoid distance less than 0

#df_dist = pd.read_sql_query('SELECT trip_distance FROM table_record WHERE trip_distance > 0', nyc_database)
#df_dist['trip_distance'].describe()

# get all distance

df_trip_distance = df.where(f.col('trip_distance') > 0.0001).select('trip_distance')

df_trip_distance.show(10)

In [None]:
df_trip_distance.printSchema()

In [None]:
#from decimal import Decimal
# lambda x: round(Decimal(x),3)

list_trip_distance = df_trip_distance.select('trip_distance').rdd.flatMap(lambda x: x).collect()

for i in range(10) :
    list_trip_distance[i]

In [None]:
df_trip_distance.printSchema()

In [None]:

fig, ax = plt.subplots(figsize=(15,5))

ax.set_title('trip distance (miles)')

ax.set_yscale('log')
ax.set_xlabel('trip distance (miles)')
ax.set_ylabel('total distant count')
ax.hist(list_trip_distance,bins=30,label='trip hist')

ax.legend()
plt.show()

%matplot plt

In [None]:
df_q3_short_count = df.where(f.col('trip_distance') < 30 ).where(f.col('trip_distance') > 0).select('trip_distance').count()

df_q3_long_count = df.where(f.col('trip_distance') >= 30 ).select('trip_distance').count()

df_q3_short_count,df_q3_long_count

In [None]:
df_short_count = df.where(f.col('trip_distance') < 30 ).where(f.col('trip_distance') > 0) \
    .select(['pickup_hour','dropoff_hour']) \
    .groupBy(['pickup_hour','dropoff_hour']) \
    .count() \
    .withColumnRenamed('count','count(short trip)') \
    .withColumnRenamed('pickup_hour','pickup_time') \
    .withColumnRenamed('dropoff_hour','dropoff_time') 
df_long_count = df.where(f.col('trip_distance') > 30 ) \
    .select(['pickup_hour','dropoff_hour']) \
    .groupBy(['pickup_hour','dropoff_hour']) \
    .count() \
    .withColumnRenamed('count','count(long trip)') \
    .withColumnRenamed('pickup_hour','pickup_time') \
    .withColumnRenamed('dropoff_hour','dropoff_time') 

In [None]:
df_short_count.show()

In [None]:
df_long_count.show()

In [None]:
#df_short_count = df_short_count.
df_trip_hours_theta = df_long_count.join(df_short_count,['pickup_time','dropoff_time'])
df_trip_hours_theta.show(10)


In [None]:
df_trip_hours_theta_pu  = df_trip_hours_theta.groupBy('pickup_time').agg({'count(short trip)': 'sum', 'count(long trip)':'sum'}).orderBy('pickup_time')
df_trip_hours_theta_pu.show(10)


In [None]:
df_trip_hours_theta_do = df_trip_hours_theta.groupBy('dropoff_time').agg({'count(short trip)': 'sum', 'count(long trip)':'sum'}).orderBy('dropoff_time')
df_trip_hours_theta_do.show(10)

In [None]:
df_trip_hours_theta_do.printSchema()

In [None]:
# render clock time

def plt_clock(ax, radii, title, color):
    
    N = 24
    bottom = 2
    # create theta for 24 hours
    theta = np.linspace(0.0, 2 * np.pi, N, endpoint=False)

    # width of each bin on the plot
    width = (2*np.pi) / N   
    bars = ax.bar(theta, radii, width=width, bottom=bottom, color=color, edgecolor="#999999")

    # set the lable go clockwise and start from the top
    ax.set_theta_zero_location("N")
    # clockwise
    ax.set_theta_direction(-1)

    # set the label
    ax.set_xticks(theta)
    ticks = ["{}:00".format(x) for x in range(24)]
    ax.set_xticklabels(ticks)
    ax.set_title(title)

In [None]:
#setup 4 ax plates and plot them

fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(18,18))

df_trip_count_list = df_trip_hours_theta_pu.select('sum(count(short trip))').rdd.flatMap(lambda x: x).collect()
ax = plt.subplot(2,2,1, polar=True)    
plt_clock(ax, df_trip_count_list, "Pickup Time for Short Trips in 2019", "#dc143c")

df_trip_count_list = df_trip_hours_theta_pu.select('sum(count(long trip))').rdd.flatMap(lambda x: x).collect()
ax = plt.subplot(2,2,2, polar=True)
plt_clock(ax, df_trip_count_list, 'Pickup Time for Long Trips in 2019', "#56B4E9")

df_trip_count_list = df_trip_hours_theta_do.select('sum(count(short trip))').rdd.flatMap(lambda x: x).collect()
ax = plt.subplot(2,2,3, polar=True)
plt_clock(ax, df_trip_count_list, 'Dropoff Time for Short Trips in 2019', "#dc143c")

df_trip_count_list = df_trip_hours_theta_do.select('sum(count(long trip))').rdd.flatMap(lambda x: x).collect()
ax = plt.subplot(2,2,4, polar=True)
plt_clock(ax, df_trip_count_list, 'Dropoff Time for Long Trips in 2019', "#56B4E9")

%matplot plt

### Q4 Which Zones with most pickups for Long or short Trips

In [None]:
sc

In [None]:
df_short_zone = df.where(f.col('trip_distance') < 30 ).where(f.col('trip_distance') > 0) \
    .select(['PULocationID','DOLocationID']) \
    .groupBy(['PULocationID','DOLocationID']) \
    .count() \
    .withColumnRenamed('count','count(short trip)')
df_long_zone = df.where(f.col('trip_distance') >= 30 ) \
    .select(['PULocationID','DOLocationID']) \
    .groupBy(['PULocationID','DOLocationID']) \
    .count() \
    .withColumnRenamed('count','count(long trip)')
#    .withColumnRenamed('count','count(short trip)') \
#    .withColumnRenamed('pickup_hour','pickup_time') \
#    .withColumnRenamed('dropoff_hour','dropoff_time') 

In [None]:
df_long_zone.show(5)

In [None]:
df_long_zone.printSchema()

In [None]:
df_short_zone.show(5)

In [None]:
df_two_zone_counts = df_short_zone \
    .join(df_long_zone,on=['PULocationID', 'DOLocationID']) \
    .join(dfz,f.col('PULocationID') == dfz.LocationID) \
    .drop('service_zone','LocationID') \
    .withColumnRenamed('Borough','From_Borough') \
    .withColumnRenamed('Zone','From_Zone') 

df_two_zone_counts.show(5)

In [None]:
dfz.printSchema()

In [None]:
df_two_zone_all = df_two_zone_counts.join(dfz,df_two_zone_counts.DOLocationID == dfz.LocationID ) \
    .drop('LocationID') \
    .withColumnRenamed('Borough','To_Borough') \
    .withColumnRenamed('Zone','To_Zone') \
    .withColumnRenamed('count(short trip)','short_trip') \
    .withColumnRenamed('count(long trip)','long_trip') \
    .withColumn('PULocationID', f.col('PULocationID').cast(LongType())) \
    .withColumn('DOLocationID', f.col('DOLocationID').cast(LongType()))
#df_two_zone_all.show(5)
df_two_zone_all.printSchema()

In [None]:
df_long_tip_top5 = df_two_zone_all.select(['long_trip','From_Zone','To_Zone']).orderBy(f.desc('long_trip'))
df_long_tip_top5.show(5)

In [None]:
## agg the count between 2 zones
df_two_zone_pu = df_two_zone_all.groupBy('PULocationID') \
    .agg({'short_trip':'sum', 'long_trip':'sum'}) \
    .withColumnRenamed('sum(long_trip)','long_trip') \
    .withColumnRenamed('sum(short_trip)','short_trip') 
df_two_zone_do = df_two_zone_all.groupBy('DOLocationID') \
    .agg({'short_trip':'sum', 'long_trip':'sum'}) \
    .withColumnRenamed('sum(long_trip)','long_trip') \
    .withColumnRenamed('sum(short_trip)','short_trip') 

In [None]:
#pu_mount_list = dict(zip(list_borough, list_pu_count))
df_two_zone_pu_short_dic =df_two_zone_pu.select(['PULocationID','short_trip']).rdd.collectAsMap()
df_two_zone_pu_long_dic =df_two_zone_pu.select(['PULocationID','long_trip']).rdd.collectAsMap()

df_two_zone_do_short_dic =df_two_zone_do.select(['DOLocationID','short_trip']).rdd.collectAsMap()
df_two_zone_do_long_dic =df_two_zone_do.select(['DOLocationID','long_trip']).rdd.collectAsMap()

#df_two_zone_pu_short_dic,df_two_zone_pu_long_dic,df_two_zone_do_short_dic,df_two_zone_do_long_dic

In [None]:
df_two_zone_pu.printSchema()
#shp_dic
#df_two_zone_pu_short_dic
#,df_two_zone_pu_long_dic,df_two_zone_do_short_dic,df_two_zone_do_long_dic

In [None]:
# get top 5 data list to plot
df_two_zone_pu_short = df_two_zone_pu.orderBy(f.desc('short_trip')).limit(5)
df_two_zone_pu_long = df_two_zone_pu.orderBy(f.desc('long_trip')).limit(5)

# get top 5 data list to plot
df_two_zone_do_short = df_two_zone_do.orderBy(f.desc('short_trip')).limit(5)
df_two_zone_do_long = df_two_zone_do.orderBy(f.desc('long_trip')).limit(5)

In [None]:
# PUcount_short = dict(zip(df_q3_PU['PULocationID'].tolist(), df_q3_PU['short trips'].tolist()))

df_two_zone_pu_short_index_list = df_two_zone_pu_short.select('PULocationID').rdd.flatMap(lambda x : x).collect()
df_two_zone_pu_long_index_list = df_two_zone_pu_long.select('PULocationID').rdd.flatMap(lambda x : x).collect()

df_two_zone_do_short_index_list = df_two_zone_do_short.select('DOLocationID').rdd.flatMap(lambda x : x).collect()
df_two_zone_do_long_index_list = df_two_zone_do_long.select('DOLocationID').rdd.flatMap(lambda x : x).collect()

df_two_zone_pu_short_index_list,df_two_zone_pu_long_index_list,df_two_zone_do_short_index_list,df_two_zone_do_long_index_list

In [None]:
# plot data 

fig, ax = plt.subplots(nrows=2, ncols=2, figsize=(18,18))
ax = plt.subplot(2, 2, 1)
ax.set_title("Zones with most pickups for Short Trips")
draw_zone_map(ax, sf, heat=df_two_zone_pu_short_dic, text=df_two_zone_pu_short_index_list)
#draw_region_map(ax, sf, heat=df_two_zone_pu_short_dic)
ax = plt.subplot(2, 2, 2)
ax.set_title("Zones with most pickups for Long Trips")
draw_zone_map(ax, sf, heat=df_two_zone_pu_long_dic, text=df_two_zone_pu_long_index_list)
ax = plt.subplot(2, 2, 3)
ax.set_title("Zones with most drop-offs for Short Trips")
draw_zone_map(ax, sf, heat=df_two_zone_do_short_dic, text=df_two_zone_do_short_index_list)
ax = plt.subplot(2, 2, 4)
ax.set_title("Zones with most drop-offs for Long Trips")
draw_zone_map(ax, sf, heat=df_two_zone_do_long_dic, text=df_two_zone_do_long_index_list)

%matplot plt

In [None]:
# this exploring more 
df.printSchema()

In [None]:
def filterAndCaculateTargetFields(target_field_attr):    
    #handle short trip 
    #.where(f.col('p_count') >=1 ) \
    df_short_trips_groups = df.where(f.col('trip_distance') < 30 ).where(f.col('trip_distance') > 0) \
            .select(target_field_attr) \
            .groupBy(target_field_attr) \
            .agg({target_field_attr:'sum'}) \
            .withColumnRenamed('sum('+ target_field_attr +')','short_trip_count') 

    # hndle long trip
    # filter target trip and begain to handle long trip
    df_long_trips_groups = df.where(f.col('trip_distance') >= 30 ) \
        .select(target_field_attr,'trip_distance') \
        .groupBy(target_field_attr).agg({target_field_attr:'sum','trip_distance':'avg'}) \
        .withColumnRenamed('avg(trip_distance)','av_trip_distance') \
        .withColumnRenamed('sum('+ target_field_attr +')','long_trip_count') 

    # merge tables
    df_trips_groups_merged = df_short_trips_groups.join(df_long_trips_groups,on=target_field_attr).orderBy(target_field_attr)

    short_trips_sum = df_trips_groups_merged.select('short_trip_count').rdd.map(lambda x:x[0]).reduce(lambda x,y: x + y)
    long_trips_sum= df_trips_groups_merged.select('long_trip_count').rdd.map(lambda x:x[0]).reduce(lambda x,y: x + y)
    #short_trips_sum,long_trips_sum
    #df_sum_short_trip = df_short_trips_group_merged.withColumn('rate_short_trip_count',f.col('short_trip_count')/f.sum('short_trip_count'))

    df_trips_groups_merged_all = df_trips_groups_merged \
        .withColumn('rate_of_short_trip',f.col('short_trip_count')/short_trips_sum)\
        .withColumn('rate_of_long_trip',f.col('long_trip_count')/long_trips_sum) \
        .withColumn('av_trip_distance', f.round(f.col('av_trip_distance'),4))  \
        .withColumn('rate_of_short_trip', f.round(f.col('rate_of_short_trip'),3))  \
        .withColumn('rate_of_long_trip', f.round(f.col('rate_of_long_trip'),3)) 
    return df_trips_groups_merged_all

df_trips_groups_merged_all = filterAndCaculateTargetFields('p_count')

#df_trips_target_data_table.show(10)
df_trips_groups_merged_all.show(10)

In [None]:

## here we can create a pie
df_trips_groups_merged_all.show()

In [None]:
#long_trip_count,short_trip_count we can create a pie plot
#short_trips_sum,long_trips_sum
df_trips_groups_merged_all.printSchema()

In [None]:
def diff_short_long_trip_on(attr_field,df_trips_groups_merged_all):

    x_axis_label_list = df_trips_groups_merged_all.select(attr_field).rdd.map(lambda x:x[0]).collect()
    y_axis_short_trip_rate_list = df_trips_groups_merged_all.select('rate_of_short_trip').rdd.map(lambda x:x[0]).collect()
    y_axis_long_trip_rate_list = df_trips_groups_merged_all.select('rate_of_long_trip').rdd.map(lambda x:x[0]).collect()

    
    fig, ax = plt.subplots(figsize=(10,5))

    width = 0.35  # the width of the bars

    x = np.arange(len(x_axis_label_list))  # the label locations

    ax.set_ylabel('rate of trip distance')
    ax.set_xticks(x)
    ax.set_xticklabels(x_axis_label_list)
    ax.set_title(attr_field.replace('_', ' ')+' difference in short/long trip')

    rects1 = ax.bar(x - width/2, y_axis_short_trip_rate_list, width, label='short trip')
    rects2 = ax.bar(x + width/2, y_axis_long_trip_rate_list, width, label='long trip')

    ax.legend()

    def stamplabel(rects):
        for rect in rects:
            height = rect.get_height()
            ax.annotate('{}'.format(height), xy=(rect.get_x() + rect.get_width() / 2, height), textcoords="offset points", ha='center')

    stamplabel(rects1)
    stamplabel(rects2)

    fig.tight_layout()
    plt.show()

diff_short_long_trip_on('p_count',df_trips_groups_merged_all)
%matplot plt

In [None]:
# 1=Standard rate
# 2=JFK
# 3=Newark
# 4=Nassau or Westchester
# 5=Negotiated fare
# 6=Group ride

tmpt = filterAndCaculateTargetFields('RatecodeID')
diff_short_long_trip_on('RatecodeID',tmpt)
%matplot plt

In [None]:
# 1=Credit card
# 2=Cash
# 3=No charge
# 4=Dispute

tmpt = filterAndCaculateTargetFields('payment_type')
diff_short_long_trip_on('payment_type',tmpt)
%matplot plt


#### Q5 Explorer average speed between citys with some complicate logic

In [None]:
# .withColumnRenamed('count','count(short trip)')
    
df_zone_statics = df.where(f.col('trip_distance') > 0.001) \
    .select(['PULocationID','DOLocationID','average_speed','minute_rate']) \
    .groupBy(['PULocationID','DOLocationID']) \
    .agg(f.avg('average_speed'),f.avg('minute_rate')) \
    .withColumn('avg(average_speed)',f.round('avg(average_speed)',3)) \
    .withColumn('avg(minute_rate)',f.round('avg(minute_rate)',3)) \
    .withColumnRenamed('avg(average_speed)','average_speed') \
    .withColumnRenamed('avg(minute_rate)','minute_rate') 

df_zone_statics.show(10)

In [None]:
df_zone_speed_statics_all_1  = dfz.join(df_zone_statics,f.col('LocationID') == df_zone_statics.PULocationID) \
    .drop('service_zone','LocationID','PULocationID') \
    .withColumnRenamed('Borough','From_Borough') \
    .withColumnRenamed('Zone','From_Zone') 

#df_zone_speed_statics_all_1.show(10)

df_zone_speed_statics_all = dfz.join(df_zone_speed_statics_all_1,f.col('LocationID') == df_zone_speed_statics_all_1.DOLocationID) \
    .drop('service_zone','LocationID','DOLocationID') \
    .withColumnRenamed('Borough','To_Borough') \
    .withColumnRenamed('Zone','To_Zone') 

#    .withColumn('PULocationID', f.col('PULocationID').cast(LongType())) \
#    .withColumn('DOLocationID', f.col('DOLocationID').cast(LongType()))

df_zone_speed_statics_all.show(10)

In [None]:
df_zone_speed_statics_bro = df_zone_speed_statics_all.groupBy(['To_Borough','From_Borough']).agg({'average_speed':'avg','minute_rate':'avg'}) \
    .withColumn('avg(average_speed)',f.round('avg(average_speed)',3)) \
    .withColumn('avg(minute_rate)',f.round('avg(minute_rate)',3)) \
    .withColumnRenamed('avg(average_speed)','average_speed') \
    .withColumnRenamed('avg(minute_rate)','minute_rate') 
    
df_zone_speed_statics_bro.show(10)

In [None]:
all_avg_speeds_sum = df_zone_speed_statics_bro.select('average_speed').rdd.map(lambda x:x[0]).reduce(lambda x,y: x + y)

df_zone_speed_statics_tags10 = df_zone_speed_statics_bro.withColumn('driving_direction',f.concat_ws( ' To ','From_Borough','To_Borough')) \
    .withColumn('speed_rate',f.col('average_speed')/all_avg_speeds_sum) \
    .withColumn('speed_rate', f.round(f.col('speed_rate'),4)) \
    .orderBy(f.desc('average_speed')).select('*').limit(10)

df_zone_speed_statics_tags10.show()

In [None]:
df_zone_speed_statics_tags10.show()

In [None]:
#all_avg_speeds_sum_top10 = df_zone_speed_statics_tags10.select('average_speed').rdd.map(lambda x:x[0]).reduce(lambda x,y: x + y)

df_zone_speed_statics_top10_size_list = df_zone_speed_statics_tags10.select('average_speed').rdd.map(lambda x:x[0]).collect()
df_zone_speed_statics_top10_tags_list  = df_zone_speed_statics_tags10_rank.select(['driving_direction']).rdd.map(lambda x:x[0]).collect()

df_zone_speed_statics_top10_size_list,df_zone_speed_statics_top10_tags_list

In [None]:
fig1, ax1 = plt.subplots(figsize=(15,9))

#explode =  np.arange(len(df_zone_speed_statics_top10_size_list))  # the label locations

explode =  [x* 0 for x in range(len(df_zone_speed_statics_top10_size_list))]  # the label locations

explode[0] = 0.1 # set explode 1st

wedges, texts, autotexts = ax1.pie(df_zone_speed_statics_top10_size_list, 
        explode=explode, 
        labels=df_zone_speed_statics_top10_tags_list, 
        autopct='%2.2f%%',
        shadow=True, startangle=90)

ax1.set_title("Top 10 Speed Between Broughs as PIE")

plt.axis('equal') # # Equal aspect ratio ensures that pie is drawn as a circle.

ax1.legend(wedges, df_zone_speed_statics_top10_tags_list,
          title="Ingredients",
          loc="center left",
          bbox_to_anchor=(-0.15, 0.3, 0.5, 1))

plt.show()

%matplot plt

## that is all