In [56]:
import pyspark
import pandas as pd
pd.DataFrame.iteritems = pd.DataFrame.items

In [57]:
from pyspark.sql import SparkSession

In [58]:
import sys
print(type(df_pandas))
print("PySpark version:", pyspark.__version__)
print("Pandas version:", pd.__version__)
print("Python version:", sys.version)

<class 'pandas.core.frame.DataFrame'>
PySpark version: 3.3.2
Pandas version: 2.0.3
Python version: 3.8.18 (default, Sep 11 2023, 13:40:15) 
[GCC 11.2.0]


In [59]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()


24/02/25 22:32:13 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.


In [60]:

df = spark.read \
    .option("header", "true") \
    .csv('data/fhv_tripdata_2019-10.csv')

df.show()

+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|dispatching_base_num|    pickup_datetime|   dropOff_datetime|PUlocationID|DOlocationID|SR_Flag|Affiliated_base_number|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+
|              B00009|2019-10-01 00:23:00|2019-10-01 00:35:00|         264|         264|   null|                B00009|
|              B00013|2019-10-01 00:11:29|2019-10-01 00:13:22|         264|         264|   null|                B00013|
|              B00014|2019-10-01 00:11:43|2019-10-01 00:37:20|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:56:29|2019-10-01 00:57:47|         264|         264|   null|                B00014|
|              B00014|2019-10-01 00:23:09|2019-10-01 00:28:27|         264|         264|   null|                B00014|
|     B00021         |2019-10-01 00:00:4

In [61]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropOff_datetime: string (nullable = true)
 |-- PUlocationID: string (nullable = true)
 |-- DOlocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [62]:
df_pandas = pd.read_csv('data/head.csv')

In [63]:
df_pandas.dtypes

dispatching_base_num       object
pickup_datetime            object
dropOff_datetime           object
PUlocationID              float64
DOlocationID              float64
SR_Flag                   float64
Affiliated_base_number     object
dtype: object

In [64]:
from pyspark.sql import types

In [65]:
schema = types.StructType([
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.IntegerType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True),
])

In [66]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('data/fhv_tripdata_2019-10.csv')

In [67]:
df = df.repartition(6)

In [68]:
df.write.parquet('data/fhv_pq/', mode='overwrite') 

                                                                                

In [69]:
df.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)



In [70]:
df_pq = spark.read.parquet('data/fhv_pq/*')

In [71]:
df_pq.registerTempTable('fhv')



In [72]:
spark.sql("""
SELECT COUNT(*) AS number_of_trips
FROM fhv
WHERE DAY(pickup_datetime) = 15 AND MONTH(pickup_datetime) = 10;
""").show()

+---------------+
|number_of_trips|
+---------------+
|          62610|
+---------------+



In [73]:
spark.sql("""
SELECT 
    *,
    TIMESTAMPDIFF(HOUR, pickup_datetime, dropoff_datetime) AS trip_duration_hours
FROM 
    fhv
ORDER BY 
    trip_duration_hours DESC
LIMIT 1;
""").show()



+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+
|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|trip_duration_hours|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+
|              B02832|2019-10-11 18:00:00|2091-10-11 18:30:00|         264|         264|   null|                B02832|             631152|
+--------------------+-------------------+-------------------+------------+------------+-------+----------------------+-------------------+



                                                                                

In [74]:
df_zones = spark.read.parquet('zones/')

In [75]:
df_result = df_pq.join(df_zones, df_pq.PULocationID == df_zones.LocationID)

In [76]:
df_result.printSchema()

root
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: timestamp (nullable = true)
 |-- dropoff_datetime: timestamp (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- SR_Flag: integer (nullable = true)
 |-- Affiliated_base_number: string (nullable = true)
 |-- LocationID: string (nullable = true)
 |-- Borough: string (nullable = true)
 |-- Zone: string (nullable = true)
 |-- service_zone: string (nullable = true)



In [77]:
df_result.registerTempTable('fhv_zone')

In [78]:
spark.sql("""
SELECT Zone, COUNT(*) AS occurrence
FROM fhv_zone
GROUP BY Zone
ORDER BY occurrence ASC
LIMIT 1;
""").show()

+-----------+----------+
|       Zone|occurrence|
+-----------+----------+
|Jamaica Bay|         1|
+-----------+----------+



In [79]:
spark.stop()