## import library

In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F
import numpy as np
from datetime import datetime
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()


23/03/09 10:16:21 WARN Utils: Your hostname, david-HP-EliteBook-840-G6 resolves to a loopback address: 127.0.1.1; using 213.255.147.57 instead (on interface wlp0s20f3)
23/03/09 10:16:21 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/09 10:16:22 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


## Read data

In [2]:
df= spark.read.option("header", "true").csv('fhvhv_tripdata_2021-06.csv.gz')

[Stage 0:>                                                          (0 + 1) / 1]                                                                                

In [3]:
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', StringType(), True), StructField('DOLocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

## Create schema

In [8]:
schema = types.StructType([types.StructField('dispatching_base_num', types.StringType(), True), 
                           types.StructField('pickup_datetime', types.TimestampType(), True), 
                           types.StructField('dropoff_datetime', types.TimestampType(), True), 
                           types.StructField('PULocationID', types.StringType(), True), 
                           types.StructField('DOLocationID', types.StringType(), True), 
                           types.StructField('SR_Flag', types.StringType(), True), 
                           types.StructField('Affiliated_base_number', types.StringType(), True)])

## Repartition Data to 12

In [4]:
df=df.repartition(12)

In [5]:
df.rdd.getNumPartitions()

[Stage 1:>                                                          (0 + 1) / 1]

12

In [6]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [9]:
df= spark.read.schema(schema).option("header", "true").csv('fhvhv_tripdata_2021-06.csv.gz').repartition(12)

## save to parquet file

In [11]:
df.write.parquet('homework')

[Stage 4:>                                                         (0 + 8) / 12]

23/03/09 10:20:51 WARN MemoryManager: Total allocation exceeds 95.00% (1,020,054,720 bytes) of heap memory
Scaling row group sizes to 95.00% for 8 writers


                                                                                

## Read parquet file

In [12]:
df=spark.read.schema(schema).parquet('homework')

In [13]:
df.count()

14961892

In [14]:
df.take(1)

[Row(dispatching_base_num='B02888', pickup_datetime=datetime.datetime(2021, 6, 18, 12, 42, 21), dropoff_datetime=datetime.datetime(2021, 6, 18, 13, 36, 18), PULocationID='72', DOLocationID='113', SR_Flag='N', Affiliated_base_number='B02888')]

## trips started on June 15th

In [15]:
def fil(row):
    return row.pickup_datetime.month==6

df.filter((F.month(df.pickup_datetime)==6) & (F.dayofmonth(df.pickup_datetime)==15) ).count()

                                                                                

452470

## create dataframe with duration column

In [16]:
def crazy_stuff(a,b):
    c=b-a
    return c.total_seconds()/3600

crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.FloatType())
df2=df.withColumn('duration', crazy_stuff_udf(df.pickup_datetime,df.dropoff_datetime))

df2.show()

[Stage 12:>                                                         (0 + 1) / 1]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|   duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-----------+
|              B02888|2021-06-18 12:42:21|2021-06-18 13:36:18|          72|         113|      N|                B02888| 0.89916664|
|              B02764|2021-06-09 20:24:11|2021-06-09 20:41:41|         117|         265|      N|                B02764| 0.29166666|
|              B02883|2021-06-18 00:00:14|2021-06-18 00:06:28|          33|          25|      N|                B02883| 0.10388889|
|              B02835|2021-06-15 09:19:10|2021-06-15 09:39:35|          78|         259|      N|                B02835|  0.3402778|
|              B02879|2021-06-02 17:17:11|2021-06-02 17:36:30|          41| 

                                                                                

In [17]:
df2.take(1)

[Row(dispatching_base_num='B02888', pickup_datetime=datetime.datetime(2021, 6, 18, 12, 42, 21), dropoff_datetime=datetime.datetime(2021, 6, 18, 13, 36, 18), PULocationID='72', DOLocationID='113', SR_Flag='N', Affiliated_base_number='B02888', duration=0.8991666436195374)]

## longest trip in the dataset

In [18]:
df2.agg({"duration": "max"}).collect()[0]

                                                                                

Row(max(duration)=66.87889099121094)

## Merge with location data

In [20]:
location = spark.read.option("header", "true").csv('taxi_zone_lookup.csv')

In [21]:
location.head(5)

[Row(LocationID='1', Borough='EWR', Zone='Newark Airport', service_zone='EWR'),
 Row(LocationID='2', Borough='Queens', Zone='Jamaica Bay', service_zone='Boro Zone'),
 Row(LocationID='3', Borough='Bronx', Zone='Allerton/Pelham Gardens', service_zone='Boro Zone'),
 Row(LocationID='4', Borough='Manhattan', Zone='Alphabet City', service_zone='Yellow Zone'),
 Row(LocationID='5', Borough='Staten Island', Zone='Arden Heights', service_zone='Boro Zone')]

In [30]:
df=df.withColumnRenamed('PULocationID','LocationID')

In [35]:
df3 = df.join(location, on=['LocationID'], how='left')
df3.take(1)

[Row(LocationID='72', dispatching_base_num='B02888', pickup_datetime=datetime.datetime(2021, 6, 18, 12, 42, 21), dropoff_datetime=datetime.datetime(2021, 6, 18, 13, 36, 18), DOLocationID='113', SR_Flag='N', Affiliated_base_number='B02888', Borough='Brooklyn', Zone='East Flatbush/Remsen Village', service_zone='Boro Zone')]

In [44]:
df3.groupby('Zone').count().agg({'count':'max'}).collect()[0]

                                                                                

Row(max(count)=231279)

In [51]:
df3.groupby('Zone').count().agg({'count':'max'}).show()



+----------+
|max(count)|
+----------+
|    231279|
+----------+



                                                                                

## Getting location with max count

In [57]:
df3.groupby('Zone').count().filter(F.col('count')==231279).show()



+-------------------+------+
|               Zone| count|
+-------------------+------+
|Crown Heights North|231279|
+-------------------+------+



                                                                                