In [1]:
import pyspark
from pyspark.sql import SparkSession

# Local Spark session using all available cores ("local[*]"); getOrCreate()
# returns the already-running session if this kernel has one.
spark = (
    SparkSession.builder
    .master("local[*]")
    .appName('test')
    .getOrCreate()
)

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/03/03 13:27:56 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
# Spark version this notebook was executed with.
spark.sparkContext.version

'3.5.1'

In [7]:
!gunzip -c fhv_tripdata_2019-10.csv.gz > fhv_tripdata_2019.csv

In [8]:
# Line count of the decompressed CSV. This includes the header row, so the
# number of trip records is one less (assuming no quoted embedded newlines).
!wc -l fhv_tripdata_2019.csv

 1897494 fhv_tripdata_2019.csv


In [9]:
# First look at the raw CSV. Without a schema, Spark reads every column as a
# string.
df = spark.read.csv('fhv_tripdata_2019.csv', header=True)

df.show(n=20)

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   NULL|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   NULL|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   NULL|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   NULL|                B00014|
|     B00021         |2019-10-01 00:00:4

In [10]:
# Schema of the raw read: every column is StringType, which is why an explicit
# schema is defined in the next cells.
df.schema

StructType([StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropOff_datetime', StringType(), True), StructField('PUlocationID', StringType(), True), StructField('DOlocationID', StringType(), True), StructField('SR_Flag', StringType(), True), StructField('Affiliated_base_number', StringType(), True)])

In [11]:
from pyspark.sql import types

In [12]:
# Explicit schema for the FHV trips CSV: timestamps and location IDs get real
# types instead of strings. When an explicit schema is supplied, Spark uses these
# column names instead of the header's. As a result, the header's
# 'PUlocationID'/'DOlocationID' come through as 'PULocationID'/'DOLocationID'.
schema = (
    types.StructType()
    .add('dispatching_base_num', types.StringType(), True)
    .add('pickup_datetime', types.TimestampType(), True)
    .add('dropOff_datetime', types.TimestampType(), True)
    .add('PULocationID', types.IntegerType(), True)
    .add('DOLocationID', types.IntegerType(), True)
    .add('SR_Flag', types.StringType(), True)
    .add('Affiliated_base_number', types.StringType(), True)
)

In [13]:
# Re-read the CSV with the explicit schema so columns arrive already typed.
df_test = spark.read.csv('fhv_tripdata_2019.csv', header=True, schema=schema)

In [14]:
# Shuffle the trips into 6 partitions; each partition becomes one parquet
# part-file when written out.
df_partition = df_test.repartition(numPartitions=6)

In [None]:
# Persist the repartitioned trips as parquet, replacing any previous run's output.
df_partition.write.mode('overwrite').parquet('fhv/2019/10/')

Checked in the Spark UI (localhost:4040): the parquet output totals 36.1 MB across the 6 partitions, i.e. roughly 6 MB per partition file.

In [16]:
# Expose the typed trips to Spark SQL as the temp view `fhv_trips_data`.
# createOrReplaceTempView supersedes registerTempTable (deprecated since
# Spark 2.0). It replaces an existing view of the same name, so re-running is safe.
df_partition.createOrReplaceTempView('fhv_trips_data')



In [25]:
# How many trips were picked up on 15 October 2019?
trips_on_2019_10_15_sql = """
SELECT 
    count(1)
FROM
    fhv_trips_data
WHERE 
    to_date(pickup_datetime) = '2019-10-15'
"""

spark.sql(trips_on_2019_10_15_sql).show()

+--------+
|count(1)|
+--------+
|   62610|
+--------+



In [33]:
# Longest trip, measured in whole hours between pickup and drop-off.
# NOTE(review): the recorded result (631152 h, about 72 years) suggests corrupt
# drop-off timestamps in the source data; consider filtering such outliers.
longest_trip_hours_sql = """
SELECT 
    MAX(TIMESTAMPDIFF(HOUR, pickup_datetime, dropOff_datetime)) AS max_diff_hours
FROM
    fhv_trips_data
"""

spark.sql(longest_trip_hours_sql).show()



+--------------+
|max_diff_hours|
+--------------+
|        631152|
+--------------+



                                                                                

In [34]:
# Taxi zone lookup table. It is read without a schema, so LocationID stays a string.
df_zones = spark.read.csv('taxi_zone_lookup.csv', header=True)

In [35]:
# Save the zone lookup as parquet. mode='overwrite' keeps the cell re-runnable;
# the default save mode ('error') fails once the 'zones' directory exists.
df_zones.write.parquet('zones', mode='overwrite')

In [42]:
# Number of trips per (pickup hour, pickup zone) in October 2019.
# Despite the variable name, this counts records; the FHV schema has no fare column.
# The upper bound is half-open (< 2019-11-01) so a timestamp inside October's
# last second (e.g. 23:59:59.5) is not silently excluded.
df_fhv_revenue = spark.sql("""
SELECT 
    date_trunc('hour', pickup_datetime) AS hour, 
    PULocationID AS zone_fhv,
    COUNT(1) AS number_records
FROM
    fhv_trips_data
WHERE
    pickup_datetime >= '2019-10-01 00:00:00'
    AND
    pickup_datetime < '2019-11-01 00:00:00'
GROUP BY
    1, 2
""")

In [43]:
# Attach zone names by matching the pickup zone ID to the lookup's LocationID.
# This is an inner join: hour/zone rows with no matching LocationID are dropped.
# NOTE(review): zone_fhv is an int and LocationID a string here; Spark coerces
# the types for the comparison.
join_condition = df_fhv_revenue.zone_fhv == df_zones.LocationID
df_result = df_fhv_revenue.join(df_zones, on=join_condition, how='inner')

In [44]:
# Persist the zone-annotated hourly counts, replacing any earlier output.
df_result.write.mode('overwrite').parquet('data/report/revenue/fhv/2019/10')

                                                                                

In [48]:
# Reload the report from parquet so later queries read the materialized result.
df_result = spark.read.format('parquet').load('data/report/revenue/fhv/2019/10')

In [49]:
# Show the reloaded DataFrame's column names and types.
df_result

DataFrame[hour: timestamp, zone_fhv: int, number_records: bigint, LocationID: string, Borough: string, Zone: string, service_zone: string]

In [50]:
# Register the hourly zone report for Spark SQL as the temp view
# `fhv_trips_data_2019_10`. createOrReplaceTempView supersedes the deprecated
# registerTempTable and can be re-run safely.
df_result.createOrReplaceTempView('fhv_trips_data_2019_10')

In [53]:
# Least frequent pickup zone in October 2019 (among zones with at least one trip).
# Each row of fhv_trips_data_2019_10 is one (hour, zone) bucket, so trips are
# summed from number_records; COUNT(1) would count hour buckets, not trips.
# Zone is a secondary sort key so ties resolve deterministically.
spark.sql("""
SELECT 
    Zone,
    SUM(number_records) AS total_count
FROM
    fhv_trips_data_2019_10
GROUP BY 
    Zone
ORDER BY 
    total_count,
    Zone
LIMIT 1
""").show()

+-----------+-----------+
|       Zone|total_count|
+-----------+-----------+
|Jamaica Bay|          1|
+-----------+-----------+

