In [1]:
import findspark
findspark.init()
import pyspark
from pyspark.sql import SparkSession

# Create (or reuse) a local Spark session that uses every available core.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

In [2]:
# Display the Spark version backing this session.
spark.version

'3.0.3'

In [3]:
# Load the gzipped June 2021 trip CSV; the first row is the header and
# column types are inferred by Spark rather than declared up front.
trips_csv_path = 'fhvhv_tripdata_2021-06.csv.gz'
df = (
    spark.read
    .option("header", "true")
    .option("inferSchema", "true")
    .csv(trips_csv_path)
)

In [4]:
# Inspect the schema Spark inferred from the CSV.
df.schema

StructType(List(StructField(dispatching_base_num,StringType,true),StructField(pickup_datetime,StringType,true),StructField(dropoff_datetime,StringType,true),StructField(PULocationID,IntegerType,true),StructField(DOLocationID,IntegerType,true),StructField(SR_Flag,StringType,true),StructField(Affiliated_base_number,StringType,true)))

In [5]:
# Reshuffle the DataFrame into 12 partitions.
df = df.repartition(12)

In [None]:
# Persist the trips as Parquet. The default save mode raises if the target
# directory already exists, which breaks every re-run of the notebook, so
# overwrite explicitly to keep this cell re-runnable.
df.write.mode('overwrite').parquet('fhvhv/2021/06/')

In [7]:
# SQL Queries

In [8]:
# Expose the trips DataFrame to Spark SQL as the temporary view `trips_data`.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0; it is also safe to re-run.
df.createOrReplaceTempView('trips_data')

In [11]:
# Count trips picked up on 15 June 2021.
# Filter on the full calendar date: DAY(...) = 15 alone would also match the
# 15th of any other month or year that happens to be present in the data.
spark.sql("""
SELECT COUNT(1) FROM trips_data
WHERE to_date(pickup_datetime) = DATE '2021-06-15'
""").show()

+--------+
|count(1)|
+--------+
|  452470|
+--------+



In [18]:
from pyspark.sql import functions as F

In [29]:
# Longest trip, in hours.
# Both datetime columns are parsed to timestamps and cast to double (seconds
# since the epoch); the difference is divided by 3600 to convert to hours,
# and the maximum over all trips is reported.
longest_trip_sql = """
select max((double(to_timestamp(dropoff_datetime))-double(to_timestamp(pickup_datetime)))/3600) as hours
from trips_data
"""
longest_trip = spark.sql(longest_trip_sql)
longest_trip.show()

+----------------+
|           hours|
+----------------+
|66.8788888888889|
+----------------+



In [None]:
# Most frequent pickup location zone

In [30]:
# Load the zones table from the local 'zones/' Parquet directory.
df_zones = spark.read.parquet('zones/')

In [34]:
# Attach zone attributes to every trip by matching the trip's pickup
# location ID against the zones table (inner join: unmatched trips drop out).
pickup_matches_zone = df.PULocationID == df_zones.LocationID
df_join = df.join(df_zones, on=pickup_matches_zone, how='inner')

In [35]:
# Expose the joined DataFrame to Spark SQL as `trips_data_joined`.
# createOrReplaceTempView replaces registerTempTable, which has been
# deprecated since Spark 2.0; it is also safe to re-run.
df_join.createOrReplaceTempView('trips_data_joined')

In [41]:
# Most frequent pickup zone: count trips per pickup location in a single
# aggregation, order by that count, and keep the top row. This avoids scanning
# the joined table twice (scalar subquery + outer filter) and de-duplicating
# the matching rows with an arbitrary LIMIT 1.
# Grouping by PULocationID as well as zone keeps distinct locations that
# share a zone name from being merged into one bucket.
spark.sql("""
SELECT zone
FROM trips_data_joined
GROUP BY PULocationID, zone
ORDER BY COUNT(*) DESC
LIMIT 1
""").show()

+-------------------+
|               zone|
+-------------------+
|Crown Heights North|
+-------------------+

