In [220]:
import pyspark
from pyspark.sql import SparkSession
import pandas as pd
from pyspark.sql import types
from pyspark.sql.functions import *

In [221]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('fhvtaxi') \
    .getOrCreate() 
    
spark.sparkContext.setLogLevel("ERROR")

### Question 1. Install Spark and PySpark###

In [222]:
spark.version

'3.2.1'

In [5]:
!wget https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv

--2022-02-22 20:11:52--  https://nyc-tlc.s3.amazonaws.com/trip+data/fhvhv_tripdata_2021-02.csv
Resolving nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)... 52.217.64.140
Connecting to nyc-tlc.s3.amazonaws.com (nyc-tlc.s3.amazonaws.com)|52.217.64.140|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 733822658 (700M) [text/csv]
Saving to: ‘fhvhv_tripdata_2021-02.csv’


2022-02-22 20:24:53 (918 KB/s) - ‘fhvhv_tripdata_2021-02.csv’ saved [733822658/733822658]



In [18]:
!head -n 100 fhvhv_tripdata_2021-02.csv > head.csv

In [20]:
!wc -l head.csv

     100 head.csv


In [33]:
df_pandas = pd.read_csv('head.csv')

In [22]:
spark.createDataFrame(df_pandas).schema

StructType(List(StructField(hvfhs_license_num,StringType,true),StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,LongType,true),StructField(DOLocationID,LongType,true),StructField(SR_Flag,DoubleType,true)))

In [80]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [24]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-02.csv')

### Question 2. HVFHW February 2021###

In [28]:
df = df.repartition(24)


In [29]:
df.write.parquet('fhvhv/2021/02/')

                                                                                

In [226]:
#!ls -lrt
!du -sh fhvhv

207M	fhvhv


In [227]:
df= spark.read.parquet('fhvhv/2021/02/')

In [228]:
df.show(n=5)

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02764|2021-02-02 22:33:30|2021-02-02 22:51:06|         192|          53|   null|
|           HV0003|              B02876|2021-02-03 06:23:38|2021-02-03 06:43:30|         244|          50|   null|
|           HV0005|              B02510|2021-02-03 17:17:27|2021-02-03 18:13:06|         132|         265|   null|
|           HV0003|              B02877|2021-02-02 16:06:44|2021-02-02 16:32:44|         140|         160|   null|
|           HV0003|              B02866|2021-02-04 14:16:27|2021-02-04 14:59:42|          68|          80|   null|
+-----------------+--------------------+-------------------+-------------------+

### Question 3. Count records ###

In [229]:
 df_date= df \
    .withColumn("pickup_date",to_date("pickup_datetime"))

In [230]:
df_date.groupBy("pickup_date").count().filter(df_date.pickup_date =='2021-02-15').show()
                                               

+-----------+------+
|pickup_date| count|
+-----------+------+
| 2021-02-15|367170|
+-----------+------+





### Question 4. Longest trip for each day ###

#Unix_timestamp approach
#.withColumn('DiffInSecondsUnix',unix_timestamp("dropoff_datetime") - unix_timestamp('pickup_datetime')) \

#cast to long approach converts to seconds

In [140]:

df_sec= df_date.withColumn('DiffInSeconds', col('dropoff_datetime').cast("long") - col('pickup_datetime').cast("long"))
    


In [141]:
df_time= df_sec.withColumn('Time_Diff_MS',round(col('DiffInSeconds')/60,2))


In [149]:
df_time.select("pickup_date","pickup_datetime", "dropoff_datetime", "Time_Diff_MS") \
        .orderBy('Time_Diff_MS', ascending=False).show(1)




+-----------+-------------------+-------------------+------------+
|pickup_date|    pickup_datetime|   dropoff_datetime|Time_Diff_MS|
+-----------+-------------------+-------------------+------------+
| 2021-02-11|2021-02-11 13:40:44|2021-02-12 10:39:44|      1259.0|
+-----------+-------------------+-------------------+------------+
only showing top 1 row



                                                                                

### Question 5. Most frequent dispatching_base_num ###

In [231]:
df_time.groupBy("dispatching_base_num").count().orderBy("count", ascending=False).show(1)

+--------------------+-------+
|dispatching_base_num|  count|
+--------------------+-------+
|              B02510|3233664|
+--------------------+-------+
only showing top 1 row



### Question 6. Most common locations pair ###

In [175]:
df_time.count()

11613942

df_zone= spark.read.parquet('zones/')

df_time.filter(df_time.PULocationID == df_time.DOLocationID).show(1)

df_zone.filter(df_zone.LocationID == '192').show()


In [176]:
df_time.createOrReplaceTempView('fhv_trips')

In [178]:
df_zone.createOrReplaceTempView('taxi_zone')

In [219]:

spark.sql("""
SELECT PULocationID, count(PULocationID)
   
FROM
    fhv_trips
where PULocationID=DOLocationID
group by PULocationID
order by count(PULocationID) desc limit 1

""").show()

+------------+-------------------+
|PULocationID|count(PULocationID)|
+------------+-------------------+
|          76|              45041|
+------------+-------------------+



#### solve with spark sql ####

In [217]:
spark.sql("""
with cte as (
select Y.PULocationID, Y.DOLocationID,
CONCAT(D.Zone, '/', P.Zone) As PU_DO_Pair
from fhv_trips Y
join taxi_zone D on D.LocationID = Y.PULocationID
join taxi_zone P on P.LocationID = Y.DOLocationID
)
select PULocationID, DOLocationID, PU_DO_Pair, count(PU_DO_Pair) as Total from cte
group by PULocationID, DOLocationID, pu_do_pair
order by Total  desc limit 1
""").show(truncate=False)



+------------+------------+---------------------------+-----+
|PULocationID|DOLocationID|PU_DO_Pair                 |Total|
+------------+------------+---------------------------+-----+
|76          |76          |East New York/East New York|45041|
+------------+------------+---------------------------+-----+





#### Solve with pyspark ####

In [213]:
df_join = df_time.alias("main").join(df_zone.alias("zone1"), \
        col("main.PULocationID") == col("zone1.LocationID"),"inner") \
        .join(df_zone.alias("zone2"), \
         col("main.DOLocationID") == col("zone2.LocationID"),"inner") \
        .select("PULocationID","DOLocationID", concat_ws(' / ', col("zone1.zone"), col("zone2.zone")).alias("PU_DO_Pair")) \
        #.show(truncate = False)


In [216]:
df_join.groupBy("PU_DO_Pair").count().orderBy("count", ascending=False).show(n=1, truncate = False)



+-----------------------------+-----+
|PU_DO_Pair                   |count|
+-----------------------------+-----+
|East New York / East New York|45041|
+-----------------------------+-----+
only showing top 1 row



