In [None]:
# Display the version of the active Spark session.
# NOTE(review): `spark` is only created in a later cell of this notebook, so
# this cell fails on a fresh kernel unless that cell has been run first.
spark.version

Question 2 - What's the size of the folder with results (in MB)?

In [48]:
import pyspark
from pyspark.sql import SparkSession

In [49]:
# Create (or reuse) a Spark session running locally; "local[*]" asks Spark
# to use all available cores on this machine.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)



In [3]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-03-01 14:47:58--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.77.172
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.77.172|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv.1’

2021-02.csv.1        22%[===>                ] 160.15M  28.4MB/s    eta 21s    ^C


In [3]:
# Load the raw trip CSV, treating its first row as the header.
# No schema is supplied here, so the columns come back as strings.
df = (
    spark.read
    .option('header', 'true')
    .csv('fhvhv_tripdata_2021-02.csv')
)


[Stage 0:>                                                          (0 + 1) / 1]

                                                                                

In [None]:
# Split the data into 24 partitions and persist it as Parquet.
# mode('overwrite') keeps this cell re-runnable: with the default save mode
# the write raises an error when 'data/pq/fhvhv' already exists.
(
    df
    .repartition(24)
    .write
    .mode('overwrite')
    .parquet('data/pq/fhvhv')
)

Question 3 - Count records

In [4]:
# Read the Parquet dataset under data/pq/fhvhv/ back into a DataFrame.
# NOTE(review): the name `df_fhvhv` is reassigned further down to a
# CSV-backed frame with an explicit schema.
df_fhvhv = spark.read.parquet('data/pq/fhvhv/')

In [5]:
# Print the column names and types of the Parquet-backed frame.
df_fhvhv.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [50]:
from pyspark.sql import types

In [51]:
# Explicit schema for the HVFHV trip CSV: timestamps for the pickup/dropoff
# times, integers for the location ids, strings elsewhere. Every field is
# nullable.
fhvhv_schema = types.StructType([
    types.StructField(field_name, field_type, True)
    for field_name, field_type in [
        ("hvfhs_license_num", types.StringType()),
        ("dispatching_base_num", types.StringType()),
        ("pickup_datetime", types.TimestampType()),
        ("dropoff_datetime", types.TimestampType()),
        ("PULocationID", types.IntegerType()),
        ("DOLocationID", types.IntegerType()),
        ("SR_Flag", types.StringType()),
    ]
])

In [131]:
# Re-read the trip CSV, this time applying the explicit schema so that the
# datetime and location-id columns are typed instead of plain strings.
df_fhvhv = (
    spark.read
    .option('header', 'true')
    .schema(fhvhv_schema)
    .csv('fhvhv_tripdata_2021-02.csv')
)

In [132]:
# Print the schema again to confirm the explicit column types were applied.
df_fhvhv.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [63]:
# Expose the trips DataFrame to Spark SQL under the name 'fhvhv_data_sql'.
# createOrReplaceTempView replaces the deprecated registerTempTable and is
# safe to re-run because it replaces any existing view with the same name.
df_fhvhv.createOrReplaceTempView('fhvhv_data_sql')

In [12]:
# Count the trips whose pickup time falls on 2021-02-15, using a half-open
# range: on or after 2021-02-15 and strictly before 2021-02-16.
df_result = spark.sql("""
select 
    count(0) 
from 
    fhvhv_data_sql
where pickup_datetime >= '2021-02-15' and pickup_datetime < '2021-02-16'
""")

In [13]:
# Run the query and print the trip count.
df_result.show()



+--------+
|count(0)|
+--------+
|  367170|
+--------+




                                                                                

Question 4 - Longest trip for each day

In [53]:
from pyspark.sql import functions as F

In [152]:
# Add a 'DiffInMinutes' column holding each trip's duration: the difference
# between the dropoff and pickup Unix timestamps (seconds) divided by 60.
dr_result_duration = (
    df_fhvhv
    .withColumn('pickup_datetime', F.to_timestamp('pickup_datetime'))
    .withColumn('dropoff_datetime', F.to_timestamp('dropoff_datetime'))
    .withColumn(
        'DiffInMinutes',
        (F.unix_timestamp("dropoff_datetime") - F.unix_timestamp('pickup_datetime')) / 60,
    )
)
 

In [154]:
# Show the pickup time of the single longest trip.
# Uses F.col because a bare `col` is never imported in this notebook (only
# `functions as F` is), so the previous form raised NameError on a fresh
# kernel. Sorting happens before the projection so the sort key is still
# present in the frame being ordered.
(
    dr_result_duration
    .orderBy(F.col("DiffInMinutes").desc())
    .select("pickup_datetime")
    .limit(1)
    .show()
)



+-------------------+
|    pickup_datetime|
+-------------------+
|2021-02-11 13:40:44|
+-------------------+




                                                                                

Question 5 - Most frequent dispatching_base_num

In [61]:
# Count trips per dispatching_base_num and sort the bases from most to
# least frequent.
df_result = spark.sql("""
select 
    dispatching_base_num,
    count(0) as Num
from 
    fhvhv_data_sql
group by
    dispatching_base_num
Order by 
    Num desc
""")

In [62]:
# Print the result; show() displays only the first 20 rows, so the most
# frequent base is on the first line.
df_result.show()



+--------------------+-------+
|dispatching_base_num|    Num|
+--------------------+-------+
|              B02510|3233664|
|              B02764| 965568|
|              B02872| 882689|
|              B02875| 685390|
|              B02765| 559768|
|              B02869| 429720|
|              B02887| 322331|
|              B02871| 312364|
|              B02864| 311603|
|              B02866| 311089|
|              B02878| 305185|
|              B02682| 303255|
|              B02617| 274510|
|              B02883| 251617|
|              B02884| 244963|
|              B02882| 232173|
|              B02876| 215693|
|              B02879| 210137|
|              B02867| 200530|
|              B02877| 198938|
+--------------------+-------+
only showing top 20 rows




                                                                                

Question 6 - Most common location pair

In [64]:
# Load the zones dataset from Parquet; the cells below use its LocationID,
# Zone, Borough and service_zone columns.
df_zones = spark.read.parquet('zones/')

In [143]:
# Two views of the zones table whose 'Zone' column is renamed per role, so
# the pickup and dropoff zone names stay distinguishable after both joins.
df_pickup_zones = df_zones.withColumnRenamed('Zone', 'PickupZone')
df_dropoff_zones = df_zones.withColumnRenamed('Zone', 'DropoffZone')

In [144]:
# Attach the pickup zone name to every trip by matching PULocationID to the
# zone's LocationID, then drop the zone columns that are no longer needed.
df_join_data = (
    df_fhvhv
    .join(df_pickup_zones, on=df_fhvhv.PULocationID == df_pickup_zones.LocationID)
    .drop('LocationID', 'Borough', 'service_zone')
)

In [145]:
# Attach the dropoff zone name by matching DOLocationID to the zone's
# LocationID, then drop the zone columns that are no longer needed.
df_join_data_new = (
    df_join_data
    .join(df_dropoff_zones, on=df_join_data.DOLocationID == df_dropoff_zones.LocationID)
    .drop('LocationID', 'Borough', 'service_zone')
)

In [147]:
# Expose the zone-enriched trips to Spark SQL as 'df_join_data_sql'.
# createOrReplaceTempView replaces the deprecated registerTempTable and is
# safe to re-run because it replaces any existing view with the same name.
df_join_data_new.createOrReplaceTempView('df_join_data_sql')

In [148]:
# Top five pickup/dropoff zone pairs by trip count. Each pair is rendered as
# "<PickupZone> / <DropoffZone>", with a null zone name shown as 'Unknown'.
df_join_result = spark.sql("""
select 
    CONCAT(nvl(PickupZone,'Unknown'),' / ',nvl(DropoffZone,'Unknown')) as ZoneMatch,
    Count(0) as Num
from 
    df_join_data_sql
group by
    CONCAT(nvl(PickupZone,'Unknown'),' / ',nvl(DropoffZone,'Unknown'))
Order by 
    Num desc
limit 5
""")

In [149]:
# Print the five most common zone pairs (long names are truncated by show()).
df_join_result.show()



+--------------------+-----+
|           ZoneMatch|  Num|
+--------------------+-----+
|East New York / E...|45041|
|Borough Park / Bo...|37329|
| Canarsie / Canarsie|28026|
|Crown Heights Nor...|25976|
|Bay Ridge / Bay R...|17934|
+--------------------+-----+





                                                                                