In [24]:
#Required Imports
import pyspark
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql import types

In [10]:
pyspark.__file__

'/home/gary/spark/spark-3.0.3-bin-hadoop3.2/python/pyspark/__init__.py'

In [11]:
#Make Session
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [12]:
#Q1 - Get Version
spark.version

'3.0.3'

In [6]:
#Download Dataset
#!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-02-24 18:13:58--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.72.60
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.72.60|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv’


2022-02-24 18:15:40 (6.92 MB/s) - ‘fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



In [1]:
#Check file
!ls

 fhvhv_tripdata_2021-02.csv  'HomeWork Week5.ipynb'


In [16]:
#Gzip file
!gzip fhvhv_tripdata_2021-02.csv

In [17]:
#Check gzip was successful
!ls

 04_pyspark.ipynb		 head.csv
 de-engineering-spark.ipynb	'HomeWork Week5.ipynb'
 fhvhv_tripdata_2021-02.csv.gz


In [20]:
#Read first 1000 rows in to get schema info
df_pandas = pd.read_csv('fhvhv_tripdata_2021-02.csv.gz', nrows=1000)

In [21]:
#Show the schema
spark.createDataFrame(df_pandas).schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(SR_Flag,DoubleType,true)))

In [22]:
#Function to help make schema more readable
def formatSchema(frame):
    ip = str(frame).split('StructField')
    print('StructType(')
    for i in ip:
        if '(' in i and ')' in i:
            if ')))' in i:
                i=i.replace(')))',')')
            print(f"    StructField {i}")
    print(')')

In [23]:
#Print out schema with adjustments
formatSchema(spark.createDataFrame(df_pandas).schema)

StructType(
    StructField (hvfhs_license_num,StringType,true),
    StructField (dispatching_base_num,StringType,true),
    StructField (pickup_datetime,StringType,true),
    StructField (dropoff_datetime,StringType,true),
    StructField (PULocationID,LongType,true),
    StructField (DOLocationID,LongType,true),
    StructField (SR_Flag,DoubleType,true)
)


In [26]:
#Create schema for fhv data
fhv_schema = types.StructType([
    types.StructField ("hvfhs_license_num",types.StringType(),True),
    types.StructField ("dispatching_base_num",types.StringType(),True),
    types.StructField ("pickup_datetime",types.TimestampType(),True),
    types.StructField ("dropoff_datetime",types.TimestampType(),True),
    types.StructField ("PULocationID",types.LongType(),True),
    types.StructField ("DOLocationID",types.LongType(),True),
    types.StructField ("SR_Flag",types.DoubleType(),True)])

In [27]:
#Check that there are actually headers
df_pandas.head()

Unnamed: 0,hvfhs_license_num,dispatching_base_num,pickup_datetime,dropoff_datetime,PULocationID,DOLocationID,SR_Flag
0,HV0003,B02764,2021-02-01 00:10:40,2021-02-01 00:21:09,35,39,
1,HV0003,B02764,2021-02-01 00:27:23,2021-02-01 00:44:01,39,35,
2,HV0005,B02510,2021-02-01 00:28:38,2021-02-01 00:38:27,39,91,
3,HV0005,B02510,2021-02-01 00:43:37,2021-02-01 01:23:20,91,228,
4,HV0003,B02872,2021-02-01 00:08:42,2021-02-01 00:17:57,126,250,


In [28]:
#Create pyspark dataframe
df = spark.read \
    .option("header","true") \
    .schema(fhv_schema) \
    .csv('fhvhv_tripdata_2021-02.csv.gz')

In [29]:
#Get record count
df.count()

                                                                                

11613942

In [30]:
#Repartition dataframe
df = df.repartition(24)

In [32]:
#Make directory for parquet files
#This was not needed
!mkdir fhvh_data

In [33]:
#Check schema is correct
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: long (nullable = true)
 |-- DOLocationID: long (nullable = true)
 |-- SR_Flag: double (nullable = true)



In [34]:
#Write parquet files to directory
df.write.parquet('fhvh_data/',mode='overwrite')

                                                                                

In [71]:
#Get total parquet file size
!du -sh fhvh_data/

220M	fhvh_data/


In [65]:
#Read in data from parquet files
df = spark.read.parquet('fhvh_data/*')

In [66]:
#Check record count matches original
df.count()

11613942

In [67]:
#Register temp table
df.registerTempTable('fhv')

In [68]:
#Get number of trips on 2/15/2021
df_trips = spark.sql("""
SELECT COUNT(*)
FROM fhv
WHERE date(pickup_datetime) = '2021-02-15'
""")

In [69]:
df_trips.show()

[Stage 37:>                                                         (0 + 4) / 4]

+--------+
|count(1)|
+--------+
|  367170|
+--------+



                                                                                

In [58]:
longest_trip = spark.sql("""
SELECT max(unix_timestamp(dropoff_datetime) -unix_timestamp(pickup_datetime)) as diff
FROM fhv
""")

In [59]:
longest_trip.show()

+-----+
| diff|
+-----+
|75540|
+-----+



In [62]:
longest = spark.sql("""
SELECT *
FROM fhv
WHERE unix_timestamp(dropoff_datetime) -unix_timestamp(pickup_datetime) = (SELECT max(unix_timestamp(dropoff_datetime) -unix_timestamp(pickup_datetime)) as diff
FROM fhv)
""")

In [63]:
longest.show()

[Stage 28:>                                                         (0 + 4) / 4]                                                                                

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0005|              B02510|2021-02-11 13:40:44|2021-02-12 10:39:44|         247|          41|   null|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+



In [106]:
most_freq_dispatch = spark.sql("""
SELECT dispatching_base_num
,count(*) as Cnt
FROM fhv
GROUP BY dispatching_base_num
ORDER BY Cnt Desc""")

In [107]:
most_freq_dispatch.show()

+--------------------+-------+
|dispatching_base_num|    Cnt|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
|              B02878| 305185|
|              B02682| 303255|
|              B02617| 274510|
|              B02883| 251617|
|              B02884| 244963|
|              B02882| 232173|
|              B02876| 215693|
|              B02879| 210137|
|              B02867| 200530|
|              B02877| 198938|
+--------------------+-------+
only showing top 20 rows



In [76]:
!wget https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv

--2022-02-26 17:50:09--  https://s3.amazonaws.com/nyc-tlc/misc/taxi+_zone_lookup.csv
Resolving s3.amazonaws.com (s3.amazonaws.com)... 52.217.228.136
Connecting to s3.amazonaws.com (s3.amazonaws.com)|52.217.228.136|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 12322 (12K) [application/octet-stream]
Saving to: ‘taxi+_zone_lookup.csv’


2022-02-26 17:50:09 (72.0 MB/s) - ‘taxi+_zone_lookup.csv’ saved [12322/12322]



In [77]:
!ls

 04_pyspark.ipynb		 head.csv
 de-engineering-spark.ipynb	'HomeWork Week5.ipynb'
 fhvh_data			 taxi+_zone_lookup.csv
 fhvhv_tripdata_2021-02.csv.gz


In [81]:
!gzip taxi+_zone_lookup.csv

In [82]:
!ls

 04_pyspark.ipynb		 head.csv
 de-engineering-spark.ipynb	'HomeWork Week5.ipynb'
 fhvh_data			 taxi+_zone_lookup.csv.gz
 fhvhv_tripdata_2021-02.csv.gz


In [84]:
df_zones = pd.read_csv('taxi+_zone_lookup.csv.gz',nrows=100)

In [86]:
formatSchema(spark.createDataFrame(df_zones).schema)

StructType(
    StructField (LocationID,LongType,true),
    StructField (Borough,StringType,true),
    StructField (Zone,StringType,true),
    StructField (service_zone,StringType,true)
)


In [88]:
zones_schema = types.StructType([
    types.StructField ("LocationID",types.LongType(),True),
    types.StructField ("Borough",types.StringType(),True),
    types.StructField ("Zone",types.StringType(),True),
    types.StructField ("service_zone",types.StringType(),True)])    

In [92]:
spark_zones = spark.read \
    .option("header","true") \
    .schema(zones_schema) \
    .csv('taxi+_zone_lookup.csv.gz')

In [93]:
spark_zones = spark_zones.repartition(8)

In [94]:
spark_zones.write.parquet('zone_data/',mode='overwrite')

In [95]:
zones = spark.read.parquet('zone_data/')

In [98]:
zones.show()

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|       127|    Manhattan|              Inwood|   Boro Zone|
|       252|       Queens|          Whitestone|   Boro Zone|
|       104|    Manhattan|Governor's Island...| Yellow Zone|
|       206|Staten Island|Saint George/New ...|   Boro Zone|
|       233|    Manhattan| UN/Turtle Bay South| Yellow Zone|
|       119|        Bronx|          Highbridge|   Boro Zone|
|        62|     Brooklyn| Crown Heights South|   Boro Zone|
|       199|        Bronx|       Rikers Island|   Boro Zone|
|       107|    Manhattan|            Gramercy| Yellow Zone|
|       186|    Manhattan|Penn Station/Madi...| Yellow Zone|
|        96|       Queens|Forest Park/Highl...|   Boro Zone|
|       138|       Queens|   LaGuardia Airport|    Airports|
|       244|    Manhattan|Washington Height...|   Boro Zone|
|        17|     Brookly

In [97]:
zones.registerTempTable('zones')

In [99]:
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02872|2021-02-08 14:44:38|2021-02-08 15:15:00|         129|          98|   null|
|           HV0003|              B02887|2021-02-07 08:19:00|2021-02-07 08:32:43|         188|          35|   null|
|           HV0005|              B02510|2021-02-03 07:58:56|2021-02-03 08:22:21|          75|          50|   null|
|           HV0003|              B02869|2021-02-13 09:18:39|2021-02-13 09:41:07|          61|         162|   null|
|           HV0003|              B02872|2021-02-06 15:55:43|2021-02-06 16:50:06|         125|         220|   null|
|           HV0005|              B02510|2021-02-12 23:29:09|2021-02-12 23:44:24|

In [104]:
most_common = spark.sql("""
with mostcomm as(SELECT fhv.PULocationID
,fhv.DOLocationID
,COUNT(*) as Count
FROM fhv
GROUP BY PULocationID, DOLocationID
ORDER BY Count Desc
LIMIT 1)


SELECT a.Zone
,b.Zone
FROM mostcomm inner join zones a
on PULocationID = a.LocationID
inner join zones b
on DOLocationID = b.LocationID
""")

In [105]:
most_common.show()



+-------------+-------------+
|         Zone|         Zone|
+-------------+-------------+
|East New York|East New York|
+-------------+-------------+



