In [1]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql import types
from pyspark.sql import functions as F

In [2]:
# Start (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName("de-hw")
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


23/03/03 22:02:25 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Relative location of the input trip CSV (.csv.gz) that is read with
# spark.read in the next cell.
data_path = "data/fhvhv_tripdata_2021-06.csv.gz"

In [4]:
# Load the trip CSV, taking column names from its first line. No schema is
# supplied here, so the columns are cast to proper types in a later cell.
df = (
    spark.read
    .option("header", "true")
    .csv(data_path)
)

In [5]:
# Show the column names and types Spark assigned when loading the CSV.
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [6]:
# Target Spark type for every column of the trip CSV; the reader above leaves
# them all as strings because no schema was supplied.
schema = {
    "dispatching_base_num": types.StringType(),
    "pickup_datetime": types.TimestampType(),
    "dropoff_datetime": types.TimestampType(),
    "PULocationID": types.IntegerType(),
    "DOLocationID": types.IntegerType(),
    "SR_Flag": types.StringType(),
    "Affiliated_base_number": types.StringType(),
}

# Cast all columns in one projection. Chaining withColumn() in a loop adds one
# projection per column to the plan, which the PySpark docs advise against;
# a single select() is equivalent here and keeps the plan flat. The explicit
# alias keeps each column's original name, and schema[name] still raises
# KeyError if the file contains a column that is not listed above.
df = df.select(
    [df[name].cast(schema[name]).alias(name) for name in df.columns]
)

In [7]:
# Repartition into 12 partitions; the next cell writes df out as Parquet.
df = df.repartition(12)

In [None]:
# Persist the typed, repartitioned trips as Parquet.
# The writer's default save mode raises if the target path already exists,
# which breaks a second "Run All"; overwrite makes this cell re-runnable.
# df is computed from the source CSV, not from this directory, so replacing
# the directory contents is safe.
df.write.mode("overwrite").parquet("data/partitioned/")

In [8]:
from datetime import datetime

# Keep only the trips whose pickup falls on 15 June 2021.
df_f = df.where(
    F.to_date(F.col("pickup_datetime")) == datetime(2021, 6, 15)
)

In [9]:
# Number of trips picked up on 2021-06-15 (runs the filter defined above).
df_f.count()

                                                                                

452470

In [10]:
# Longest trip in hours: the difference between drop-off and pickup epoch
# seconds, divided by 3600, then the maximum over all rows.
(
    df
    .withColumn(
        "trip_length",
        (
            F.unix_timestamp(F.col("dropoff_datetime"))
            - F.unix_timestamp(F.col("pickup_datetime"))
        ) / 3600,
    )
    .select(F.max("trip_length"))
    .show()
)



+----------------+
|max(trip_length)|
+----------------+
|66.8788888888889|
+----------------+



                                                                                

In [11]:
# Load the taxi zone lookup table, taking column names from its first line.
df_zones = (
    spark.read
    .option("header", "true")
    .csv("data/taxi+_zone_lookup.csv")
)

In [12]:
# Show the column names and types Spark assigned to the zone lookup table.
df_zones.printSchema()

root
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [13]:
# LocationID is read as a string; convert it to an integer so it has the same
# type as the integer location IDs on the trips frame.
df_zones = df_zones.withColumn(
    "LocationID",
    F.col("LocationID").cast(types.IntegerType()),
)

In [14]:
# Trips per pickup location, enriched with the zone details. The lookup's
# LocationID is renamed so both sides share the PULocationID join key.
df_count_per_location = (
    df
    .groupBy("PULocationID")
    .count()
    .join(
        df_zones.withColumnRenamed("LocationID", "PULocationID"),
        on="PULocationID",
    )
)

In [15]:
# The single pickup zone with the highest trip count.
df_count_per_location.orderBy(F.col("count").desc()).take(1)

                                                                                

[Row(PULocationID=61, count=231279, Borough='Brooklyn', Zone='Crown Heights North', service_zone='Boro Zone')]