In [1]:
import pyspark
print(pyspark.__file__)
print(pyspark.__version__)

c:\Users\Owner\anaconda3\Lib\site-packages\pyspark\__init__.py
3.5.5


In [2]:
from pyspark.sql import SparkSession
from pyspark.sql import Row

In [3]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [4]:
df = spark.read \
    .parquet('data/fhvhv_tripdata_2024-01.parquet')

In [5]:
df.show()

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [6]:
df.schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('originating_base_num', StringType(), True), StructField('request_datetime', TimestampNTZType(), True), StructField('on_scene_datetime', TimestampNTZType(), True), StructField('pickup_datetime', TimestampNTZType(), True), StructField('dropoff_datetime', TimestampNTZType(), True), StructField('PULocationID', IntegerType(), True), StructField('DOLocationID', IntegerType(), True), StructField('trip_miles', DoubleType(), True), StructField('trip_time', LongType(), True), StructField('base_passenger_fare', DoubleType(), True), StructField('tolls', DoubleType(), True), StructField('bcf', DoubleType(), True), StructField('sales_tax', DoubleType(), True), StructField('congestion_surcharge', DoubleType(), True), StructField('airport_fee', DoubleType(), True), StructField('tips', DoubleType(), True), StructField('driver_pay', DoubleType(), True), StructField('sha

In [7]:
df = df.repartition(24)

In [9]:
df.write.csv('fhvhv/2024/01/', mode='overwrite')

We can create a dataframe from the parquet files we created in the previous section:



In [10]:
test_df = spark.read.option("header", "true").csv('fhvhv/2024/01/')

In [11]:
test_df.show()

+------+-------+-------+-----------------------+-----------------------+-----------------------+-----------------------+---+---+-----+----+-----+-----+----+----+-----+-----+-----+-----+---+---+---+---+---+
|HV0003|B034041|B034042|2024-01-01T22:58:00.000|2024-01-01T23:02:44.000|2024-01-01T23:03:53.000|2024-01-01T23:13:39.000| 39| 72|  1.7| 586|10.74|0.012| 0.3|0.95|0.015|0.016|0.017| 8.12|N19|N20|N21|N22|N23|
+------+-------+-------+-----------------------+-----------------------+-----------------------+-----------------------+---+---+-----+----+-----+-----+----+----+-----+-----+-----+-----+---+---+---+---+---+
|HV0003| B03404| B03404|   2024-01-01T01:09:...|   2024-01-01T01:19:...|   2024-01-01T01:19:...|   2024-01-01T01:32:...|197|258| 1.91| 769|16.07|  0.0|0.44|1.43|  0.0|  0.0|  0.0|12.99|  N|  N|  N|  N|  N|
|HV0003| B03404| B03404|   2024-01-01T07:34:...|   2024-01-01T07:41:...|   2024-01-01T07:42:...|   2024-01-01T08:05:...| 36|249| 6.09|1383|39.61|  0.0|1.09|3.52| 2.75|  0.0|  0

In [12]:
df = spark.read \
    .parquet('data/fhvhv_tripdata_2024-01.parquet')

Unlike CSV files, parquet files contain the schema of the dataset, so there is no need to specify a schema like we previously did when reading the CSV file. You can check the schema like this:

In [13]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- originating_base_num: string (nullable = true)
 |-- request_datetime: timestamp_ntz (nullable = true)
 |-- on_scene_datetime: timestamp_ntz (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- trip_miles: double (nullable = true)
 |-- trip_time: long (nullable = true)
 |-- base_passenger_fare: double (nullable = true)
 |-- tolls: double (nullable = true)
 |-- bcf: double (nullable = true)
 |-- sales_tax: double (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- airport_fee: double (nullable = true)
 |-- tips: double (nullable = true)
 |-- driver_pay: double (nullable = true)
 |-- shared_request_flag: string (nullable = true)
 |-- shared_match_flag: string (nullable = true)
 |-- access_a_

### Pandas-like operations

#### 1. Display & Inspect Data

In [14]:
df.show()  # Show first 20 rows

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [15]:
df.show(5)  # Show first 5 rows

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [16]:
df.describe().show()  # Summary statistics

+-------+-----------------+--------------------+--------------------+------------------+-----------------+-----------------+------------------+-------------------+------------------+-----------------+------------------+--------------------+-------------------+------------------+------------------+-------------------+-----------------+------------------+----------------+--------------+
|summary|hvfhs_license_num|dispatching_base_num|originating_base_num|      PULocationID|     DOLocationID|       trip_miles|         trip_time|base_passenger_fare|             tolls|              bcf|         sales_tax|congestion_surcharge|        airport_fee|              tips|        driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-------+-----------------+--------------------+--------------------+------------------+-----------------+-----------------+------------------+-------------------+------------------+-----------------+------------------+-----

#### 2. Selecting Columns

In [17]:
df.select("dispatching_base_num").show()

+--------------------+
|dispatching_base_num|
+--------------------+
|              B03404|
|              B03404|
|              B03404|
|              B03404|
|              B03404|
|              B03404|
|              B03404|
|              B03404|
|              B03404|
|              B03404|
|              B03406|
|              B03406|
|              B03404|
|              B03406|
|              B03406|
|              B03406|
|              B03406|
|              B03406|
|              B03406|
|              B03404|
+--------------------+
only showing top 20 rows



In [18]:
df.select(["PULocationID", "DOLocationID"]).show(6)

+------------+------------+
|PULocationID|DOLocationID|
+------------+------------+
|         161|         158|
|         137|          79|
|          79|         186|
|         234|         148|
|         148|          97|
|         255|          95|
+------------+------------+
only showing top 6 rows



#### 3. Filtering Data (Equivalent to df[df['col'] == value] in Pandas)


In [19]:
df.filter(df["PULocationID"] == 255).show()

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+-----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee| tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+

In [20]:
df.filter(df["PULocationID"] > 200).show()

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee|tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+--

In [21]:
df.filter((df["PULocationID"] > 200) & (df["DOLocationID"] > 200)).show()

+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+-----+----------+-------------------+-----------------+------------------+----------------+--------------+
|hvfhs_license_num|dispatching_base_num|originating_base_num|   request_datetime|  on_scene_datetime|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|trip_miles|trip_time|base_passenger_fare|tolls| bcf|sales_tax|congestion_surcharge|airport_fee| tips|driver_pay|shared_request_flag|shared_match_flag|access_a_ride_flag|wav_request_flag|wav_match_flag|
+-----------------+--------------------+--------------------+-------------------+-------------------+-------------------+-------------------+------------+------------+----------+---------+-------------------+-----+----+---------+--------------------+-----------+

In [22]:
# Length
df.filter((df["PULocationID"] > 200) & (df["DOLocationID"] > 200)).count()

1897405

### Functions and User Defined Functions (UDFs)
In Apache Spark, functions are used to manipulate data within DataFrames. Spark provides built-in functions for common operations, but when those are not enough, you can create User-Defined Functions (UDFs).

#### Built-in Functions

In [23]:
from pyspark.sql import functions as F

In [24]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2024-01-01|  2024-01-01|         161|         158|
| 2024-01-01|  2024-01-01|         137|          79|
| 2024-01-01|  2024-01-01|          79|         186|
| 2024-01-01|  2024-01-01|         234|         148|
| 2024-01-01|  2024-01-01|         148|          97|
| 2024-01-01|  2024-01-01|         255|          95|
| 2024-01-01|  2024-01-01|          95|         212|
| 2024-01-01|  2024-01-01|         213|          47|
| 2024-01-01|  2024-01-01|         209|         114|
| 2024-01-01|  2024-01-01|         113|         209|
| 2024-01-01|  2024-01-01|          88|         255|
| 2024-01-01|  2024-01-01|         255|          79|
| 2024-01-01|  2024-01-01|         195|         112|
| 2024-01-01|  2024-01-01|         229|          87|
| 2024-01-01|  2024-01-01|         209|         249|
| 2024-01-01|  2024-01-01|         249|       