In [1]:
import pyspark

In [2]:
from pyspark.sql import SparkSession

In [3]:
spark = SparkSession.builder \
        .master("local[*]") \
        .appName('fhv analysis') \
        .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/01 09:37:39 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
24/03/01 09:37:40 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
24/03/01 09:37:40 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.


1.spark.version

In [32]:
spark.version

'3.5.0'

In [6]:
fhv_df=spark \
        .read \
        .option("header", "true") \
        .csv('./data/raw/fhv/fhv_tripdata_2019-10.csv.gz')

In [11]:
fhv_df.printSchema

<bound method DataFrame.printSchema of DataFrame[dispatching_base_num: string, pickup_datetime: string, dropOff_datetime: string, PUlocationID: string, DOlocationID: string, SR_Flag: string, Affiliated_base_number: string]>

In [12]:
import pandas as pd

In [13]:
fhv_pd=pd.read_csv('./data/raw/fhv/fhv_tripdata_2019-10.csv.gz')

In [14]:
fhv_pd.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [16]:
spark.createDataFrame(fhv_pd).schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', DoubleType(), True), StructField('DOlocationID', DoubleType(), True), StructField('SR_Flag', DoubleType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [17]:
from pyspark.sql import types

In [19]:
fhv_schema=types.StructType([
 types.StructField('dispatching_base_num', types.IntegerType(), True),
 types.StructField('pickup_datetime', types.TimestampType(), True),
 types.StructField('dropOff_datetime', types.TimestampType(), True),
 types.StructField('PUlocationID', types.IntegerType(), True),
 types.StructField('DOlocationID', types.IntegerType(), True),
 types.StructField('SR_Flag', types.IntegerType(), True),
 types.StructField('Affiliated_base_number', types.StringType(), True)
 ])

In [20]:
df=spark \
        .read \
        .option("header", "true") \
        .schema(fhv_schema) \
        .csv('./data/raw/fhv/fhv_tripdata_2019-10.csv.gz')

In [21]:
df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|                NULL|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|                NULL|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|                NULL|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|                NULL|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|                NULL|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|                NULL|2019-10-01 00:00:4

In [23]:
df.printSchema()

root
 |-- dispatching_base_num: integer (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



2.Repartition into 6

In [28]:
df=df.repartition(6)

In [29]:
df.write.parquet('./data/pq/fhv')

                                                                                

3. select count(*)

In [30]:
df_fhv_2019=spark.read.parquet('./data/pq/fhv')

In [46]:
df_fhv_2019.take(1)

[Row(dispatching_base_num=None, pickup_datetime=datetime.datetime(2019, 10, 11, 6, 49, 28), dropOff_datetime=datetime.datetime(2019, 10, 11, 7, 19, 28), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B02839')]

In [47]:
from pyspark.sql.functions import year,month,dayofmonth

In [64]:
columns=['pickup_datetime','dropOff_datetime','PUlocationID','DOlocationID']

result=df_fhv_2019 \
    .select(columns) \
    .filter(year("pickup_datetime")==2019) \
    .filter(month("pickup_datetime")==10) \
    .filter(dayofmonth("pickup_datetime")==15)

In [66]:
result.count()

62610

4.Longest trip

In [78]:
from pyspark.sql.functions import unix_timestamp

In [82]:
df_fhv_2019=df_fhv_2019.withColumn('duration',unix_timestamp('dropOff_datetime')-unix_timestamp('pickup_datetime'))

In [84]:
df_fhv_2019.take(1)

[Row(dispatching_base_num=None, pickup_datetime=datetime.datetime(2019, 10, 11, 6, 49, 28), dropOff_datetime=datetime.datetime(2019, 10, 11, 7, 19, 28), PUlocationID=264, DOlocationID=264, SR_Flag=None, Affiliated_base_number='B02839', duration=1800)]

In [85]:
df_fhv_2019.createOrReplaceTempView('fhv_2019')

In [94]:
duration_analysis=spark.sql("""
select *
from fhv_2019
order by duration desc
limit 10;
""")

In [95]:
duration_analysis.show()

[Stage 61:>                                                         (0 + 6) / 6]

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|  duration|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+----------+
|                NULL|2019-10-11 18:00:00|2091-10-11 18:30:00|         264|         264|   NULL|                B02832|2272149000|
|                NULL|2019-10-28 09:00:00|2091-10-28 09:30:00|         264|         264|   NULL|                B02832|2272149000|
|                NULL|2019-10-31 23:46:33|2029-11-01 00:13:00|        NULL|        NULL|   NULL|                B02416| 315620787|
|                NULL|2019-10-01 21:43:42|2027-10-01 21:45:23|         159|         264|   NULL|       B00746         | 252460901|
|                NULL|2019-10-17 14:00:00|2020-10-18 00:00:00|        NULL|        

                                                                                

6. Join two tables to get the least pickup zone name

In [98]:
zone=spark.read \
    .option("header","true") \
    .csv('./data/raw/fhv/taxi_zone_lookup.csv')

In [101]:
zone.createOrReplaceTempView('zone_lookup')

In [113]:
result=spark.sql("""
select Zone,count(*) 
from fhv_2019
inner join zone_lookup on fhv_2019.PUlocationID=zone_lookup.LocationID
group by Zone
order by count(*)
limit  3;
""")

In [114]:
result.show()

+--------------------+--------+
|                Zone|count(1)|
+--------------------+--------+
|         Jamaica Bay|       1|
|Governor's Island...|       2|
| Green-Wood Cemetery|       5|
+--------------------+--------+

