In [1]:
# pathlib
from pathlib import Path
from pyspark.sql import SparkSession
from pyspark.sql import types as T
from pyspark.sql import functions as F
# from typing import List
import pandas as pd

In [13]:
# Directory holding the input files, relative to the notebook.
data_path = Path('../apache-spark/data')

# Local-mode Spark session for this notebook; only ERROR-level logs are shown.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('adis')
    .getOrCreate()
)
spark.sparkContext.setLogLevel('ERROR')

# Turn off Parquet schema merging for this session.
spark.conf.set('spark.sql.parquet.mergeSchema', 'false')

# Disabling Parquet summary metadata is a separate setting (left disabled):
# spark.hadoopConfiguration.set("parquet.enable.summary-metadata", "false")

In [14]:
def pd_infer_schema(file: str, file_extension: str, sample_rows: int = 5):
    """Infer a Spark schema from the first few rows of a file.

    A small pandas sample is read and handed to ``spark.createDataFrame`` so
    that the resulting Spark DataFrame's schema can be returned.

    Args:
        file: Location of the file to sample. Anything ``pandas.read_csv``
            accepts works (the notebook passes a ``pathlib.Path``).
        file_extension: Format of ``file``. Only ``"csv"`` is supported.
        sample_rows: Number of leading rows to read for the inference.

    Returns:
        The ``schema`` attribute of the Spark DataFrame built from the sample.

    Raises:
        ValueError: If ``file_extension`` is anything other than ``"csv"``.
    """
    if file_extension != "csv":
        # Fail fast with a clear message instead of reaching the conversion
        # below without a sample having been loaded.
        raise ValueError(
            f"Unsupported file extension {file_extension!r}; only 'csv' is supported."
        )

    sample_pdf = pd.read_csv(file, nrows=sample_rows)
    return spark.createDataFrame(sample_pdf).schema

    

In [15]:
# Infer a schema from a small sample of the October 2019 FHV trip file.
fhv_sample_file = data_path / "fhv_tripdata_2019-10.csv.gz"
schema_df = pd_infer_schema(fhv_sample_file, file_extension='csv')
schema_df

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', LongType(), True), StructField('DOlocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [16]:
# Explicit schema used when reading the FHV trip CSV: (column name, Spark type)
# pairs in column order. Every field is declared nullable.
# NOTE(review): the inferred schema shown above lists a seventh CSV column,
# Affiliated_base_number, which is not declared here - confirm that is intended.
_fhv_fields = [
    ("dispatching_base_num", T.StringType()),
    ("pickup_date", T.TimestampType()),
    ("dropOff_datetime", T.TimestampType()),
    ("PUlocationID", T.IntegerType()),
    ("DOlocationID", T.IntegerType()),
    ("SR_Flag", T.StringType()),
    # ("Dispatching_base_number", T.StringType()),
]
data_schema = T.StructType(
    [T.StructField(field_name, field_type, True) for field_name, field_type in _fhv_fields]
)

In [17]:
# Read only the FHV trip CSV files. A bare "*" glob also matches the other
# entries under data_path (taxi_zone_lookup.csv and the fhv_tripdata_2019-10
# Parquet directory), which would then be parsed with the trip schema.
df_spark = spark.read.csv(
    str(data_path / "fhv_tripdata_*.csv.gz"),
    header=True,
    schema=data_schema,
)
df_spark.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_date: timestamp (nullable = true)
 |-- dropOff_datetime: timestamp (nullable = true)
 |-- PUlocationID: integer (nullable = true)
 |-- DOlocationID: integer (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [18]:
# Normalise the column names to snake_case.
# Each key is a name from data_schema; a rename whose source column is absent
# has no effect, so only columns that actually exist are listed.
column_renames = {
    "dispatching_base_num": "dispatching_base_number",
    "pickup_date": "pickup_datetime",
    "dropOff_datetime": "dropoff_datetime",
    "PUlocationID": "pickup_location_id",
    "DOlocationID": "dropoff_location_id",
    "SR_Flag": "sr_flag",
}
for old_name, new_name in column_renames.items():
    df_spark = df_spark.withColumnRenamed(old_name, new_name)

df_spark.printSchema()

root
 |-- dispatching_base_number: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- sr_flag: string (nullable = true)



In [19]:
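# Count the NULL values in each column (left disabled).
# F.count() only counts non-null inputs and isNull() itself is never null, so
# the check is wrapped in F.when() to produce NULL for rows that must not count.
# df_spark.select(
#     [
#         F.count(F.when(F.col(col).isNull(), col)).alias(f"{col}_null") for col in df_spark.columns
#     ]
# ).show()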

In [21]:
# Register the trips DataFrame as the "fhv_tripdata" temp view for spark.sql().
# createOrReplaceTempView() returns None, so its result is not kept.
df_spark.createOrReplaceTempView("fhv_tripdata")

In [22]:
# Print the column names and types of df_spark.
df_spark.printSchema()

root
 |-- dispatching_base_number: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- sr_flag: string (nullable = true)



In [23]:
# List the contents of the fhv_tripdata_2019-10 directory with file sizes.
!ls -lh ../apache-spark/data/fhv_tripdata_2019-10/

total 36M
-rwxrwxrwx 1 root root    0 Feb 27 11:47 _SUCCESS
-rwxrwxrwx 1 root root 6.0M Feb 27 11:47 part-00000-1120756e-6103-4db1-a3b5-e09111852157-c000.snappy.parquet
-rwxrwxrwx 1 root root 6.0M Feb 27 11:47 part-00001-1120756e-6103-4db1-a3b5-e09111852157-c000.snappy.parquet
-rwxrwxrwx 1 root root 6.0M Feb 27 11:47 part-00002-1120756e-6103-4db1-a3b5-e09111852157-c000.snappy.parquet
-rwxrwxrwx 1 root root 6.0M Feb 27 11:47 part-00003-1120756e-6103-4db1-a3b5-e09111852157-c000.snappy.parquet
-rwxrwxrwx 1 root root 6.0M Feb 27 11:47 part-00004-1120756e-6103-4db1-a3b5-e09111852157-c000.snappy.parquet
-rwxrwxrwx 1 root root 6.0M Feb 27 11:47 part-00005-1120756e-6103-4db1-a3b5-e09111852157-c000.snappy.parquet


In [24]:
# Count the trips whose pickup date is 2019-10-15.
trips_on_oct_15_query = """
    select count(*) as trip_oct_i5 from fhv_tripdata
    where date(pickup_datetime) = '2019-10-15'
    
    """
spark.sql(trips_on_oct_15_query).show()

                                                                                

+-----------+
|trip_oct_i5|
+-----------+
|      62610|
+-----------+



In [None]:
# Preview five rows of the fhv_tripdata view.
preview_query = """
    select * from fhv_tripdata
    limit 5
    """
spark.sql(preview_query).show()

+-----------------------+-------------------+-------------------+------------------+-------------------+-------+
|dispatching_base_number|    pickup_datetime|   dropoff_datetime|pickup_location_id|dropoff_location_id|sr_flag|
+-----------------------+-------------------+-------------------+------------------+-------------------+-------+
|                 B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|               264|                264|   NULL|
|                 B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|               264|                264|   NULL|
|                 B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|               264|                264|   NULL|
|                 B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|               264|                264|   NULL|
|                 B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|               264|                264|   NULL|
+-----------------------+-------------------+-------------------+------------------+------------

In [25]:
# Find the longest trip, measured in hours between pickup and drop-off:
# compute the hour difference per trip, sort descending, keep the top row.
longest_trip_query = """
    with trip_hours as (
        select DATEDIFF(hour, pickup_datetime, dropoff_datetime) as trip_hours
        from fhv_tripdata
    )
    
    select 
        trip_hours 
    from trip_hours 
        order by trip_hours desc limit 1
    """
spark.sql(longest_trip_query).show()



+----------+
|trip_hours|
+----------+
|    631152|
+----------+



                                                                                

In [26]:
# Load the taxi zone lookup CSV, rename LocationID and Borough to snake_case,
# and register the result as the "taxi_zone_lookup" temp view.
lookup_csv = str(data_path / "taxi_zone_lookup.csv")
df_lookup = (
    spark.read.csv(lookup_csv, header=True)
    .withColumnRenamed("LocationID", "location_id")
    .withColumnRenamed("Borough", "borough")
)
df_lookup.createOrReplaceTempView("taxi_zone_lookup")

In [27]:
# Show both schemas. The calls are separate statements because printSchema()
# returns None; combining them in a tuple makes the cell echo "(None, None)".
df_lookup.printSchema()
df_spark.printSchema()

root
 |-- location_id: string (nullable = true)
 |-- borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)

root
 |-- dispatching_base_number: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- pickup_location_id: integer (nullable = true)
 |-- dropoff_location_id: integer (nullable = true)
 |-- sr_flag: string (nullable = true)



(None, None)

In [52]:
# Find the pickup zone with the fewest trips.
# The join condition is stated in an explicit ON clause rather than a WHERE
# filter over an unconditioned join, and the zone name breaks ties so the
# single returned row is deterministic.
spark.sql(
    """
        select
            pickup.zone, count(pickup.zone) as pickup_zone_count
        from
            fhv_tripdata as fhv
        join
            taxi_zone_lookup as pickup
        on
            fhv.pickup_location_id = pickup.location_id
        group by
            1
        order by
            2 asc, 1 asc
        limit 1
    """
).show()




+-----------+-----------------+
|       zone|pickup_zone_count|
+-----------+-----------------+
|Jamaica Bay|                1|
+-----------+-----------------+



                                                                                