# Import Pyspark

In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
spark=(SparkSession.builder
       .master('local[*]')
       .appName('homework')
       .getOrCreate())

## Question 1. Checking spark version

In [3]:
spark.version

'3.3.2'

## Question 2. Reading csv to spark df

In [4]:
df_fhv=spark.read.option('header','true').csv('fhvhv_tripdata_2021-06.csv.gz')

In [5]:
# Check schema
df_fhv.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [6]:
df_fhv.show(2)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-01 00:02:41|2021-06-01 00:07:46|         174|          18|      N|                B02764|
|              B02764|2021-06-01 00:16:16|2021-06-01 00:21:14|          32|         254|      N|                B02764|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
only showing top 2 rows



In [7]:
# import types
from pyspark.sql import types

In [8]:
# Fixed schema
fhv_schema=types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True), 
    types.StructField('pickup_datetime', types.TimestampType(), True), 
    types.StructField('dropoff_datetime', types.TimestampType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('SR_Flag', types.StringType(), True), 
    types.StructField('Affiliated_base_number', types.StringType(), True)
    ])

In [9]:
transformed_df=(spark.read
                .option('header','true')
                .schema(fhv_schema)
                .csv('fhvhv_tripdata_2021-06.csv.gz')
               )

In [10]:
transformed_df.repartition(12).write.parquet('pq_files/fhvhv/2021/06/')

AnalysisException: path file:/home/rbbel/notebooks/pq_files/fhvhv/2021/06 already exists.

## Question 3. Count Records

In [14]:
df_fhv_pq=spark.read.parquet('pq_files/fhvhv/2021/06/*')

In [15]:
df_fhv_pq.createOrReplaceTempView('fhv')

In [18]:
result=spark.sql("""
SELECT COUNT(*) as trip_number

FROM fhv

WHERE pickup_datetime LIKE "2021-06-15%"
""")

In [19]:
result.show()

+-----------+
|trip_number|
+-----------+
|     452470|
+-----------+



### Question 4. Longest Trip 

In [20]:
# make a function computing the pickup - dropoff
import pyspark.sql.functions as F 

In [21]:
# New df with added trip_duration column
trips=df_fhv_pq \
        .withColumn('trip_duration', (df_fhv_pq['dropoff_datetime']) - (df_fhv_pq['pickup_datetime']))

In [22]:
# Df with only the trip_duration column
duration=trips \
        .select('trip_duration') \
        .orderBy(trips['trip_duration'].desc())


In [27]:
# Check trip_duration column type
duration.printSchema

<bound method DataFrame.printSchema of DataFrame[trip_duration: interval day to second]>

In [51]:
# Df with converted trip_duration column () from interval day to second to interval hour to minute
duration_hrs=duration \
     .withColumn('duration_hrs', ((duration['trip_duration'].cast('INTERVAL MINUTE')).cast('INTEGER')/60))

In [52]:
duration_hrs.orderBy(duration_hrs['duration_hrs'].desc()).show(5)

+--------------------+------------------+
|       trip_duration|      duration_hrs|
+--------------------+------------------+
|INTERVAL '2 18:52...| 66.86666666666666|
|INTERVAL '1 01:32...|25.533333333333335|
|INTERVAL '0 19:58...|19.966666666666665|
|INTERVAL '0 18:11...|18.183333333333334|
|INTERVAL '0 16:28...|16.466666666666665|
+--------------------+------------------+
only showing top 5 rows



## Question 6 Most frequent pickup location zone

In [53]:
taxi_zone=spark.read.option('header','true').csv('taxi_zone_lookup.csv')

In [54]:
taxi_zone.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows



In [55]:
df_fhv_pq.show(2)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B02764|2021-06-30 20:30:27|2021-06-30 20:46:23|         247|         168|      N|                B02764|
|              B02876|2021-06-08 14:42:05|2021-06-08 15:05:06|          76|          17|      N|                B02876|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
only showing top 2 rows



In [56]:
taxi_zone.createOrReplaceTempView('zones')

In [57]:
zones.show(5)

NameError: name 'zones' is not defined

In [58]:
query=spark.sql("""
SELECT z1.Zone, COUNT(*) as Frequency

FROM fhv
INNER JOIN zones AS z1 on z1.LocationID=fhv.DOLocationID
INNER JOIN zones AS z2 on z2.LocationID=fhv.DOLocationID

WHERE z1.Zone != 'NA'
GROUP BY z1.Zone
ORDER BY Frequency DESC
""")

In [59]:
query.show(5)

+-------------------+---------+
|               Zone|Frequency|
+-------------------+---------+
|Crown Heights North|   236244|
|        JFK Airport|   224571|
|     Bushwick South|   187946|
|      East New York|   186038|
|  LaGuardia Airport|   182694|
+-------------------+---------+
only showing top 5 rows

