In [2]:
import findspark

In [3]:
findspark.init()

In [4]:
import pyspark

In [5]:
from pyspark.sql import SparkSession

In [5]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .config("spark.driver.extraClassPath", "C:\spark\spark-3.5.0-bin-hadoop3")\
    .getOrCreate()

In [None]:
!wget https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhvhv/fhvhv_tripdata_2021-01.csv.gz

In [6]:
!wc -l fhvhv_tripdata_2021-01.csv

'wc' is not recognized as an internal or external command,
operable program or batch file.


In [25]:
df = spark.read \
    .option("header", "true") \
    .csv('fhvhv_tripdata_2021-01.csv')

In [26]:
df.show()

+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|hvfhs_license_num|dispatching_base_num|    pickup_datetime|   dropoff_datetime|PULocationID|DOLocationID|SR_Flag|
+-----------------+--------------------+-------------------+-------------------+------------+------------+-------+
|           HV0003|              B02682|2021-01-01 00:33:44|2021-01-01 00:49:07|         230|         166|   NULL|
|           HV0003|              B02682|2021-01-01 00:55:19|2021-01-01 01:18:21|         152|         167|   NULL|
|           HV0003|              B02764|2021-01-01 00:23:56|2021-01-01 00:38:05|         233|         142|   NULL|
|           HV0003|              B02764|2021-01-01 00:42:51|2021-01-01 00:45:50|         142|         143|   NULL|
|           HV0003|              B02764|2021-01-01 00:48:14|2021-01-01 01:08:42|         143|          78|   NULL|
|           HV0005|              B02510|2021-01-01 00:06:59|2021-01-01 00:43:01|

In [27]:
df.head(5)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:33:44', dropoff_datetime='2021-01-01 00:49:07', PULocationID='230', DOLocationID='166', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime='2021-01-01 00:55:19', dropoff_datetime='2021-01-01 01:18:21', PULocationID='152', DOLocationID='167', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:23:56', dropoff_datetime='2021-01-01 00:38:05', PULocationID='233', DOLocationID='142', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:42:51', dropoff_datetime='2021-01-01 00:45:50', PULocationID='142', DOLocationID='143', SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime='2021-01-01 00:48:14', dropoff_datetime='2021-01-01 01:08:42', PULocationID='143', DOLocationID='78', SR_Flag=None)]

In [10]:
# Read the first 1001 lines from the file
with open('fhvhv_tripdata_2021-01.csv', 'r', encoding='utf-8') as f:
    lines = f.readlines()[:1001]

In [11]:
len(lines)

1001

In [12]:
# Write the lines to a new file
with open('head.csv', 'w', encoding='utf-8') as f:
    f.writelines(lines)

In [13]:
import pandas as pd

In [14]:
df_pandas = pd.read_csv('head.csv')

In [15]:
df_pandas.dtypes

hvfhs_license_num        object
dispatching_base_num     object
pickup_datetime          object
dropoff_datetime         object
PULocationID              int64
DOLocationID              int64
SR_Flag                 float64
dtype: object

In [16]:
spark.createDataFrame(df_pandas).schema

StructType([StructField('hvfhs_license_num', StringType(), True), StructField('dispatching_base_num', StringType(), True), StructField('pickup_datetime', StringType(), True), StructField('dropoff_datetime', StringType(), True), StructField('PULocationID', LongType(), True), StructField('DOLocationID', LongType(), True), StructField('SR_Flag', DoubleType(), True)])

In [17]:
from pyspark.sql import types

In [18]:
schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True)
])

In [19]:
df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv('fhvhv_tripdata_2021-01.csv')

In [20]:
df.head(10)

[Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 33, 44), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 49, 7), PULocationID=230, DOLocationID=166, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02682', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 55, 19), dropoff_datetime=datetime.datetime(2021, 1, 1, 1, 18, 21), PULocationID=152, DOLocationID=167, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 23, 56), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 38, 5), PULocationID=233, DOLocationID=142, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_datetime=datetime.datetime(2021, 1, 1, 0, 42, 51), dropoff_datetime=datetime.datetime(2021, 1, 1, 0, 45, 50), PULocationID=142, DOLocationID=143, SR_Flag=None),
 Row(hvfhs_license_num='HV0003', dispatching_base_num='B02764', pickup_dat

In [21]:
df.write.mode("overwrite").parquet('zones')

## Partitions

In [28]:
# create 24 partitions in our dataframe
df = df.repartition(24)

In [31]:
# parquetize and write to fhvhv/2021/01/ folder
df.write.mode("overwrite").parquet('D:/data_engenering/week4/fhvhv/2021/01')

## Spark dataframes

In [32]:
df = spark.read.parquet('D:/data_engenering/week4/fhvhv/2021/01')

In [33]:
df.printSchema()

root
 |-- hvfhs_license_num: string (nullable = true)
 |-- dispatching_base_num: string (nullable = true)
 |-- pickup_datetime: string (nullable = true)
 |-- dropoff_datetime: string (nullable = true)
 |-- PULocationID: string (nullable = true)
 |-- DOLocationID: string (nullable = true)
 |-- SR_Flag: string (nullable = true)



In [36]:
new_df1 = df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID')

In [37]:
new_df2 = df.select('pickup_datetime', 'dropoff_datetime', 'PULocationID', 'DOLocationID').filter(df.hvfhs_license_num == 'HV0003')

## Functions and User Defined Functions (UDFs)

In [40]:
from pyspark.sql import functions as F

In [41]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .select('pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+-----------+------------+------------+------------+
|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-----------+------------+------------+------------+
| 2021-01-02|  2021-01-02|           7|         223|
| 2021-01-03|  2021-01-03|         254|         167|
| 2021-01-04|  2021-01-04|          74|          41|
| 2021-01-04|  2021-01-04|         142|         116|
| 2021-01-03|  2021-01-03|          20|         174|
| 2021-01-06|  2021-01-06|         161|         246|
| 2021-01-01|  2021-01-01|         129|         258|
| 2021-01-03|  2021-01-03|         162|          79|
| 2021-01-05|  2021-01-05|         143|         132|
| 2021-01-01|  2021-01-01|         197|         205|
| 2021-01-01|  2021-01-01|          61|          62|
| 2021-01-04|  2021-01-04|         263|          41|
| 2021-01-02|  2021-01-02|         258|          77|
| 2021-01-03|  2021-01-03|          61|         181|
| 2021-01-05|  2021-01-05|          86|         117|
| 2021-01-03|  2021-01-03|          61|       

In [42]:
# A crazy function that changes values when they're divisible by 7 or 3
def crazy_stuff(base_num):
    num = int(base_num[1:])
    if num % 7 == 0:
        return f's/{num:03x}'
    elif num % 3 == 0:
        return f'a/{num:03x}'
    else:
        return f'e/{num:03x}'

# Creating the actual UDF
crazy_stuff_udf = F.udf(crazy_stuff, returnType=types.StringType())

In [43]:
df \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .withColumn('dropoff_date', F.to_date(df.dropoff_datetime)) \
    .withColumn('base_id', crazy_stuff_udf(df.dispatching_base_num)) \
    .select('base_id', 'pickup_date', 'dropoff_date', 'PULocationID', 'DOLocationID') \
    .show()

+-------+-----------+------------+------------+------------+
|base_id|pickup_date|dropoff_date|PULocationID|DOLocationID|
+-------+-----------+------------+------------+------------+
|  e/9ce| 2021-01-02|  2021-01-02|           7|         223|
|  s/b3d| 2021-01-03|  2021-01-03|         254|         167|
|  s/b13| 2021-01-04|  2021-01-04|          74|          41|
|  e/9ce| 2021-01-04|  2021-01-04|         142|         116|
|  e/b47| 2021-01-03|  2021-01-03|          20|         174|
|  s/acd| 2021-01-06|  2021-01-06|         161|         246|
|  e/9ce| 2021-01-01|  2021-01-01|         129|         258|
|  e/b38| 2021-01-03|  2021-01-03|         162|          79|
|  a/b49| 2021-01-05|  2021-01-05|         143|         132|
|  e/a39| 2021-01-01|  2021-01-01|         197|         205|
|  e/9ce| 2021-01-01|  2021-01-01|          61|          62|
|  e/9ce| 2021-01-04|  2021-01-04|         263|          41|
|  e/b38| 2021-01-02|  2021-01-02|         258|          77|
|  e/9ce| 2021-01-03|  2

Enter taxi type (e.g., yellow):  green


In [8]:
from pyspark.sql import types
import os
import requests
import gzip

In [7]:
spark = SparkSession.builder \
    .master("local[*]") \
    .appName('test') \
    .getOrCreate()

In [8]:
green_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("lpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("lpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("ehail_fee", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("trip_type", types.IntegerType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

yellow_schema = types.StructType([
    types.StructField("VendorID", types.IntegerType(), True),
    types.StructField("tpep_pickup_datetime", types.TimestampType(), True),
    types.StructField("tpep_dropoff_datetime", types.TimestampType(), True),
    types.StructField("passenger_count", types.IntegerType(), True),
    types.StructField("trip_distance", types.DoubleType(), True),
    types.StructField("RatecodeID", types.IntegerType(), True),
    types.StructField("store_and_fwd_flag", types.StringType(), True),
    types.StructField("PULocationID", types.IntegerType(), True),
    types.StructField("DOLocationID", types.IntegerType(), True),
    types.StructField("payment_type", types.IntegerType(), True),
    types.StructField("fare_amount", types.DoubleType(), True),
    types.StructField("extra", types.DoubleType(), True),
    types.StructField("mta_tax", types.DoubleType(), True),
    types.StructField("tip_amount", types.DoubleType(), True),
    types.StructField("tolls_amount", types.DoubleType(), True),
    types.StructField("improvement_surcharge", types.DoubleType(), True),
    types.StructField("total_amount", types.DoubleType(), True),
    types.StructField("congestion_surcharge", types.DoubleType(), True)
])

In [None]:
output_path = "D:/data_engenering/week4/green2020"
TAXI_TYPE = input("Enter taxi type (e.g., yellow): ")
YEAR = input("Enter year (e.g., 2020): ")

URL_PREFIX = "https://s3.amazonaws.com/nyc-tlc/trip+data"

for MONTH in range(1, 13):
    FMONTH = str(MONTH).zfill(2)

    URL = f"{URL_PREFIX}/{TAXI_TYPE}_tripdata_{YEAR}-{FMONTH}.csv"

    LOCAL_PREFIX = f"data/raw/{TAXI_TYPE}/{YEAR}/{FMONTH}"
    LOCAL_FILE = f"D:/data_engenering/week4/green2020/{TAXI_TYPE}_tripdata_{YEAR}_{FMONTH}.csv"
    LOCAL_PATH = os.path.join(LOCAL_PREFIX, LOCAL_FILE)

    print(f"Downloading {URL} to {LOCAL_PATH}")
    os.makedirs(LOCAL_PREFIX, exist_ok=True)
    response = requests.get(URL)
    with open(LOCAL_PATH, 'wb') as f:
        f.write(response.content)

    print(f"Compressing {LOCAL_PATH}")
    with open(LOCAL_PATH, 'rb') as f_in:
        with gzip.open(LOCAL_PATH + '.gz', 'wb') as f_out:
            f_out.writelines(f_in)
            
    # Read data into DataFrame and write to Parquet
        df = spark.read \
            .option("header", "true") \
            .schema(schema) \
            .csv(local_path)
        
        df.repartition(4).write.mode("overwrite").parquet(output_path)


In [17]:
# Input and output paths
output_path = "D:/data_engenering/week4/green2020"
URL_PREFIX = "https://s3.amazonaws.com/nyc-tlc/trip+data"
# Define function to download and process data
def download_and_process_data(TAXI_TYPE, YEAR, schema,folder):
    for MONTH in range(1, 13):
        
        FMONTH = str(MONTH).zfill(2)
    
        URL = f"{URL_PREFIX}/{TAXI_TYPE}_tripdata_{YEAR}-{FMONTH}.csv"
    
        LOCAL_PREFIX = f"data/raw/{TAXI_TYPE}/{YEAR}/{FMONTH}"
        LOCAL_FILE = f"D:/data_engenering/week4/{folder}/{TAXI_TYPE}_tripdata_{YEAR}_{FMONTH}.csv"
        LOCAL_PATH = os.path.join(LOCAL_PREFIX, LOCAL_FILE)
    
        print(f"Downloading {URL} to {LOCAL_PATH}")
        os.makedirs(LOCAL_PREFIX, exist_ok=True)
        response = requests.get(URL)
        with open(LOCAL_PATH, 'wb') as f:
            f.write(response.content)
    
        print(f"Compressing {LOCAL_PATH}")
        with open(LOCAL_PATH, 'rb') as f_in:
            with gzip.open(LOCAL_PATH + '.gz', 'wb') as f_out:
                f_out.writelines(f_in)

        # Read data into DataFrame and write to Parquet
        #df = spark.read \
         #   .option("header", "true") \
         #   .schema(schema) \
         #   .csv(LOCAL_PATH)
        
       # df.repartition(4).write.mode("overwrite").parquet(output_path)


In [20]:
# Download and process data for green taxis
download_and_process_data("green", "2020", green_schema,"green2020")

Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2020-01.csv to D:/data_engenering/week4/green2020/green_tripdata_2020_01.csv
Compressing D:/data_engenering/week4/green2020/green_tripdata_2020_01.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2020-02.csv to D:/data_engenering/week4/green2020/green_tripdata_2020_02.csv
Compressing D:/data_engenering/week4/green2020/green_tripdata_2020_02.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2020-03.csv to D:/data_engenering/week4/green2020/green_tripdata_2020_03.csv
Compressing D:/data_engenering/week4/green2020/green_tripdata_2020_03.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2020-04.csv to D:/data_engenering/week4/green2020/green_tripdata_2020_04.csv
Compressing D:/data_engenering/week4/green2020/green_tripdata_2020_04.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2020-05.csv to D:/data_engenering/week4/green2

In [21]:
# Download and process data for green taxis
download_and_process_data("green", "2021", green_schema,"green2021")

Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-01.csv to D:/data_engenering/week4/green2021/green_tripdata_2021_01.csv
Compressing D:/data_engenering/week4/green2021/green_tripdata_2021_01.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-02.csv to D:/data_engenering/week4/green2021/green_tripdata_2021_02.csv
Compressing D:/data_engenering/week4/green2021/green_tripdata_2021_02.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-03.csv to D:/data_engenering/week4/green2021/green_tripdata_2021_03.csv
Compressing D:/data_engenering/week4/green2021/green_tripdata_2021_03.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-04.csv to D:/data_engenering/week4/green2021/green_tripdata_2021_04.csv
Compressing D:/data_engenering/week4/green2021/green_tripdata_2021_04.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/green_tripdata_2021-05.csv to D:/data_engenering/week4/green2

In [22]:
# Download and process data for yellow taxis
download_and_process_data("yellow", "2020", yellow_schema,"yellow2020")

Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2020-01.csv to D:/data_engenering/week4/yellow2020/yellow_tripdata_2020_01.csv
Compressing D:/data_engenering/week4/yellow2020/yellow_tripdata_2020_01.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2020-02.csv to D:/data_engenering/week4/yellow2020/yellow_tripdata_2020_02.csv
Compressing D:/data_engenering/week4/yellow2020/yellow_tripdata_2020_02.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2020-03.csv to D:/data_engenering/week4/yellow2020/yellow_tripdata_2020_03.csv
Compressing D:/data_engenering/week4/yellow2020/yellow_tripdata_2020_03.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2020-04.csv to D:/data_engenering/week4/yellow2020/yellow_tripdata_2020_04.csv
Compressing D:/data_engenering/week4/yellow2020/yellow_tripdata_2020_04.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2020-05.csv to D:/data_en

In [23]:
# Download and process data for yellow taxis
download_and_process_data("yellow", "2021", yellow_schema,"yellow2021")

Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2021-01.csv to D:/data_engenering/week4/yellow2021/yellow_tripdata_2021_01.csv
Compressing D:/data_engenering/week4/yellow2021/yellow_tripdata_2021_01.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2021-02.csv to D:/data_engenering/week4/yellow2021/yellow_tripdata_2021_02.csv
Compressing D:/data_engenering/week4/yellow2021/yellow_tripdata_2021_02.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2021-03.csv to D:/data_engenering/week4/yellow2021/yellow_tripdata_2021_03.csv
Compressing D:/data_engenering/week4/yellow2021/yellow_tripdata_2021_03.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2021-04.csv to D:/data_engenering/week4/yellow2021/yellow_tripdata_2021_04.csv
Compressing D:/data_engenering/week4/yellow2021/yellow_tripdata_2021_04.csv
Downloading https://s3.amazonaws.com/nyc-tlc/trip+data/yellow_tripdata_2021-05.csv to D:/data_en

In [29]:
year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'D:/data_engenering/week4/green2020/green_tripdata_{year}_{month:02d}.csv'
    output_path = f'D:/data_engenering/week4/green/{year}/{month:02d}/'

    df_green = spark.read \
        .option("header", "true") \
        .schema(green_schema) \
        .csv(input_path)

    df_green \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2020/1
processing data for 2020/2
processing data for 2020/3
processing data for 2020/4
processing data for 2020/5
processing data for 2020/6
processing data for 2020/7
processing data for 2020/8
processing data for 2020/9
processing data for 2020/10
processing data for 2020/11
processing data for 2020/12


In [31]:
year = 2021

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'D:/data_engenering/week4/green2021/green_tripdata_{year}_{month:02d}.csv'
    output_path = f'D:/data_engenering/week4/green/{year}/{month:02d}/'

    df_green = spark.read \
        .option("header", "true") \
        .schema(green_schema) \
        .csv(input_path)

    df_green \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2021/1
processing data for 2021/2
processing data for 2021/3
processing data for 2021/4
processing data for 2021/5
processing data for 2021/6
processing data for 2021/7
processing data for 2021/8
processing data for 2021/9
processing data for 2021/10
processing data for 2021/11
processing data for 2021/12


In [33]:
year = 2020

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'D:/data_engenering/week4/yellow2020/yellow_tripdata_{year}_{month:02d}.csv'
    output_path = f'D:/data_engenering/week4/yellow/{year}/{month:02d}/'

    df_green = spark.read \
        .option("header", "true") \
        .schema(yellow_schema) \
        .csv(input_path)

    df_green \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2020/1
processing data for 2020/2
processing data for 2020/3
processing data for 2020/4
processing data for 2020/5
processing data for 2020/6
processing data for 2020/7
processing data for 2020/8
processing data for 2020/9
processing data for 2020/10
processing data for 2020/11
processing data for 2020/12


In [34]:
year = 2021

for month in range(1, 13):
    print(f'processing data for {year}/{month}')

    input_path = f'D:/data_engenering/week4/yellow2021/yellow_tripdata_{year}_{month:02d}.csv'
    output_path = f'D:/data_engenering/week4/yellow/{year}/{month:02d}/'

    df_green = spark.read \
        .option("header", "true") \
        .schema(yellow_schema) \
        .csv(input_path)

    df_green \
        .repartition(4) \
        .write.parquet(output_path)

processing data for 2021/1
processing data for 2021/2
processing data for 2021/3
processing data for 2021/4
processing data for 2021/5
processing data for 2021/6
processing data for 2021/7
processing data for 2021/8
processing data for 2021/9
processing data for 2021/10
processing data for 2021/11
processing data for 2021/12


### Spark SQL

In [66]:
df_green = spark.read.parquet('D:/data_engenering/week4/green/*/*')

In [67]:
df_green = df_green \
    .withColumnRenamed('lpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('lpep_dropoff_datetime', 'dropoff_datetime')

In [68]:
df_yellow = spark.read.parquet('D:/data_engenering/week4/yellow/*/*')

In [69]:
df_yellow = df_yellow \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime')

In [70]:
set(df_green.columns) & set(df_yellow.columns)

{'DOLocationID',
 'PULocationID',
 'RatecodeID',
 'VendorID',
 'congestion_surcharge',
 'dropoff_datetime',
 'extra',
 'fare_amount',
 'improvement_surcharge',
 'mta_tax',
 'passenger_count',
 'payment_type',
 'pickup_datetime',
 'store_and_fwd_flag',
 'tip_amount',
 'tolls_amount',
 'total_amount',
 'trip_distance'}

In [71]:
common_colums = []

yellow_columns = set(df_yellow.columns)

for col in df_green.columns:
    if col in yellow_columns:
        common_colums.append(col)

In [72]:
common_colums

['VendorID',
 'pickup_datetime',
 'dropoff_datetime',
 'store_and_fwd_flag',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'passenger_count',
 'trip_distance',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'payment_type',
 'congestion_surcharge']

In [73]:
from pyspark.sql import functions as F

df_green_sel = df_green \
    .select(common_colums) \
    .withColumn('service_type', F.lit('green'))

df_yellow_sel = df_yellow \
    .select(common_colums) \
    .withColumn('service_type', F.lit('yellow'))

In [74]:
df_green_sel

DataFrame[VendorID: int, pickup_datetime: timestamp, dropoff_datetime: timestamp, store_and_fwd_flag: string, RatecodeID: int, PULocationID: int, DOLocationID: int, passenger_count: int, trip_distance: double, fare_amount: double, extra: double, mta_tax: double, tip_amount: double, tolls_amount: double, improvement_surcharge: double, total_amount: double, payment_type: int, congestion_surcharge: double, service_type: string]

In [75]:
df_green_sel.registerTempTable('green')



In [76]:
df_yellow_sel.registerTempTable('yellow')

In [62]:
spark.sql("""
SELECT
   *
FROM
    green
""").show()

+--------+---------------+----------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|VendorID|pickup_datetime|dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|congestion_surcharge|service_type|
+--------+---------------+----------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|    NULL|           NULL|            NULL|              NULL|      NULL|        NULL|        NULL|           NULL|         NULL|       NULL| NULL|   NULL|      NULL|        NULL|                 NULL|        NULL|

In [77]:
df_trips_data = df_green_sel.unionAll(df_yellow_sel)

In [78]:
df_trips_data.groupBy('service_type').count().show()

+------------+-----+
|service_type|count|
+------------+-----+
|       green|   24|
|      yellow|   24|
+------------+-----+



In [79]:
df_trips_data.registerTempTable('trips_data')

In [47]:
spark.sql("""
SELECT
    service_type,
    count(1)
FROM
    trips_data
GROUP BY 
    service_type
""").show()

+------------+--------+
|service_type|count(1)|
+------------+--------+
|       green|      24|
|      yellow|      24|
+------------+--------+



In [80]:
spark.sql("""
SELECT
    count(*) AS total_trips
FROM
    trips_data
WHERE pickup_datetime BETWEEN '2019-10-15 00:00:00' AND '2019-10-15 23:59:59'
""").show()

+-----------+
|total_trips|
+-----------+
|          0|
+-----------+



In [49]:
spark.sql("""
SELECT
   *
FROM
    trips_data
""").show()

+--------+---------------+----------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|VendorID|pickup_datetime|dropoff_datetime|store_and_fwd_flag|RatecodeID|PULocationID|DOLocationID|passenger_count|trip_distance|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|payment_type|congestion_surcharge|service_type|
+--------+---------------+----------------+------------------+----------+------------+------------+---------------+-------------+-----------+-----+-------+----------+------------+---------------------+------------+------------+--------------------+------------+
|    NULL|           NULL|            NULL|              NULL|      NULL|        NULL|        NULL|           NULL|         NULL|       NULL| NULL|   NULL|      NULL|        NULL|                 NULL|        NULL|

In [50]:
df_result = spark.sql("""
SELECT 
    -- Reveneue grouping 
    PULocationID AS revenue_zone,
    date_trunc('month', pickup_datetime) AS revenue_month, 
    service_type, 

    -- Revenue calculation 
    SUM(fare_amount) AS revenue_monthly_fare,
    SUM(extra) AS revenue_monthly_extra,
    SUM(mta_tax) AS revenue_monthly_mta_tax,
    SUM(tip_amount) AS revenue_monthly_tip_amount,
    SUM(tolls_amount) AS revenue_monthly_tolls_amount,
    SUM(improvement_surcharge) AS revenue_monthly_improvement_surcharge,
    SUM(total_amount) AS revenue_monthly_total_amount,
    SUM(congestion_surcharge) AS revenue_monthly_congestion_surcharge,

    -- Additional calculations
    AVG(passenger_count) AS avg_montly_passenger_count,
    AVG(trip_distance) AS avg_montly_trip_distance
FROM
    trips_data
GROUP BY
    1, 2, 3
""")

In [51]:
df_result

DataFrame[revenue_zone: int, revenue_month: timestamp, service_type: string, revenue_monthly_fare: double, revenue_monthly_extra: double, revenue_monthly_mta_tax: double, revenue_monthly_tip_amount: double, revenue_monthly_tolls_amount: double, revenue_monthly_improvement_surcharge: double, revenue_monthly_total_amount: double, revenue_monthly_congestion_surcharge: double, avg_montly_passenger_count: double, avg_montly_trip_distance: double]

In [57]:
df_result.write.mode("overwrite").parquet('D:/data_engenering/week4/data/report/revenue')

In [58]:
df_result.coalesce(1).write.mode("overwrite").parquet('D:/data_engenering/week4/data/report/revenue', mode='overwrite')

In [65]:
df_green_revenue = spark.sql("""
SELECT 
    date_trunc('hour', pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2  
""")

In [73]:
df_yellow_revenue = spark.sql("""
SELECT 
    date_trunc('hour', pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    yellow
WHERE
    pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2  
""")

In [None]:
df_green_revenue = spark.sql("""
SELECT 
    date_trunc('hour', pickup_datetime) AS hour, 
    PULocationID AS zone,

    SUM(total_amount) AS amount,
    COUNT(1) AS number_records
FROM
    green
WHERE
    pickup_datetime >= '2020-01-01 00:00:00'
GROUP BY
    1, 2  
""")

In [66]:
df_green_revenue.show()

+----+----+------+--------------+
|hour|zone|amount|number_records|
+----+----+------+--------------+
+----+----+------+--------------+



### Join

In [74]:
df_green_revenue_tmp = df_green_revenue \
    .withColumnRenamed('amount', 'green_amount') \
    .withColumnRenamed('number_records', 'green_number_records')

df_yellow_revenue_tmp = df_yellow_revenue \
    .withColumnRenamed('amount', 'yellow_amount') \
    .withColumnRenamed('number_records', 'yellow_number_records')

In [75]:
df_join = df_green_revenue_tmp.join(df_yellow_revenue_tmp, on=['hour', 'zone'], how='outer')

In [76]:
df_join.show()

+----+----+------------+--------------------+-------------+---------------------+
|hour|zone|green_amount|green_number_records|yellow_amount|yellow_number_records|
+----+----+------------+--------------------+-------------+---------------------+
+----+----+------------+--------------------+-------------+---------------------+



### Homework

In [81]:

spark = SparkSession.builder \
    .master("local[*]") \
    .appName('process-taxi-data') \
    .getOrCreate()

# Q1 spark version
print(f"Spark Version - {spark.version}")


Spark Version - 3.5.0


In [78]:
!wget  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz

--2024-03-03 01:19:26--  https://github.com/DataTalksClub/nyc-tlc-data/releases/download/fhv/fhv_tripdata_2019-10.csv.gz
Resolving github.com (github.com)... 140.82.121.4
Connecting to github.com (github.com)|140.82.121.4|:443... connected.
HTTP request sent, awaiting response... 302 Found
Location: https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-44d1-a138-4e8ea3c3a3b6?X-Amz-Algorithm=AWS4-HMAC-SHA256&X-Amz-Credential=AKIAVCODYLSA53PQK4ZA%2F20240302%2Fus-east-1%2Fs3%2Faws4_request&X-Amz-Date=20240302T221925Z&X-Amz-Expires=300&X-Amz-Signature=87dbcad51f5969307138c17b3cde72e14f8d2ccc7442913a39125062a164aa71&X-Amz-SignedHeaders=host&actor_id=0&key_id=0&repo_id=513814948&response-content-disposition=attachment%3B%20filename%3Dfhv_tripdata_2019-10.csv.gz&response-content-type=application%2Foctet-stream [following]
--2024-03-03 01:19:27--  https://objects.githubusercontent.com/github-production-release-asset-2e65be/513814948/efdfcf82-6d5c-

In [82]:

schema = types.StructType([
    types.StructField('hvfhs_license_num', types.StringType(), True),
    types.StructField('dispatching_base_num', types.StringType(), True),
    types.StructField('pickup_datetime', types.TimestampType(), True),
    types.StructField('dropoff_datetime', types.TimestampType(), True),
    types.StructField('PULocationID', types.IntegerType(), True),
    types.StructField('DOLocationID', types.IntegerType(), True),
    types.StructField('SR_Flag', types.StringType(), True),
    types.StructField('Affiliated_base_number', types.StringType(), True)
])
#  Load the dataframe
file_path = 'D:/data_engenering/week4/fhvhv/2019/fhv_tripdata_2019-10.csv'
print(f"Reading - {file_path}")

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv(file_path)

#  Partition the data frame
folder_path = 'D:/data_engenering/week4/fhvhv/partitions'
print(f"Creating partitions - 6 folder {folder_path}")
df.head()
df = df.repartition(6)
df.write.mode('overwrite').parquet(folder_path, compression='gzip')


Reading - D:/data_engenering/week4/fhvhv/2019/fhv_tripdata_2019-10.csv
Creating partitions - 6 folder D:/data_engenering/week4/fhvhv/partitions


In [83]:
!dir /S /B D:\data_engenering\week4\fhvhv\partitions\*.parquet > D:/data_engenering/week4/fhvhv/size.log

In [84]:
!dir /S /B D:\data_engenering\week4\fhvhv\partitions\*.parquet

D:\data_engenering\week4\fhvhv\partitions\part-00000-6358915c-23b3-4fbd-a2cb-ea6c0aa4856e-c000.gz.parquet
D:\data_engenering\week4\fhvhv\partitions\part-00001-6358915c-23b3-4fbd-a2cb-ea6c0aa4856e-c000.gz.parquet
D:\data_engenering\week4\fhvhv\partitions\part-00002-6358915c-23b3-4fbd-a2cb-ea6c0aa4856e-c000.gz.parquet
D:\data_engenering\week4\fhvhv\partitions\part-00003-6358915c-23b3-4fbd-a2cb-ea6c0aa4856e-c000.gz.parquet
D:\data_engenering\week4\fhvhv\partitions\part-00004-6358915c-23b3-4fbd-a2cb-ea6c0aa4856e-c000.gz.parquet
D:\data_engenering\week4\fhvhv\partitions\part-00005-6358915c-23b3-4fbd-a2cb-ea6c0aa4856e-c000.gz.parquet


In [85]:
import os

# Path to the directory containing Parquet files
directory = 'D:/data_engenering/week4/fhvhv/partitions'

# List all Parquet files in the directory
parquet_files = [file for file in os.listdir(directory) if file.endswith('.parquet')]

total_size_bytes = 0

# Iterate over each Parquet file
for file in parquet_files:
    # Get the file path
    file_path = os.path.join(directory, file)
    
    # Get the size of the file in bytes
    file_size_bytes = os.path.getsize(file_path)
    
    # Add the file size to the total siz
    total_size_bytes += file_size_bytes

# Calculate the average size in MB
average_size_mb = total_size_bytes / len(parquet_files) / (1024 * 1024)

print(f'Average size of Parquet files: {average_size_mb:.2f} MB')


Average size of Parquet files: 4.31 MB


In [86]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Initialize SparkSession
spark = SparkSession.builder \
    .appName("TaxiTripCount") \
    .getOrCreate()

# Path to the directory containing Parquet files
directory = 'D:/data_engenering/week4/fhvhv/partitions'

# Read Parquet files into DataFrame
df = spark.read.parquet(directory)

# Filter data to include only trips on the 15th of October
df_filtered = df.filter(col("pickup_datetime").between("2019-10-15 00:00:00", "2019-10-15 23:59:59"))

# Count the number of records
trip_count = df_filtered.count()

print(f"Number of taxi trips on the 15th of October: {trip_count}")


Number of taxi trips on the 15th of October: 62295


In [87]:
pickup_dt = '2019-10-15'
from pyspark.sql import functions as F

# Convert pickup_datetime to date
df = df.withColumn('pickup_date', F.to_date(df.pickup_datetime))

# Filter data for the given pickup date and count the number of records
trip_count = df.filter(F.col('pickup_date') == pickup_dt).count()

print(f"Number of taxi trips on {pickup_dt}: {trip_count}")


Number of taxi trips on 2019-10-15: 62295


In [88]:
# using SQL syntax
df.createOrReplaceTempView('fhvhv_tripdata')
pickup_dt = '2019-10-15'

spark.sql(f"""
SELECT
    COUNT(1)
FROM 
    fhvhv_tripdata
WHERE
    to_date(pickup_datetime) = '{pickup_dt}'
""").show()


+--------+
|count(1)|
+--------+
|   62295|
+--------+



In [90]:
spark.sql("""
SELECT
    count(*) AS total_trips
FROM
    fhvhv_tripdata
WHERE pickup_datetime BETWEEN '2019-10-15 00:00:00' AND '2019-10-15 23:59:59'
""").show()

+-----------+
|total_trips|
+-----------+
|      62295|
+-----------+



In [37]:
spark.sql(f"""
SELECT
    *
FROM 
    fhvhv_tripdata
WHERE
    to_date(dropoff_datetime) is not null
""").show()


+-----------------+--------------------+---------------+----------------+------------+------------+-------+----------------------+-----------+
|hvfhs_license_num|dispatching_base_num|pickup_datetime|dropoff_datetime|PULocationID|DOLocationID|SR_Flag|Affiliated_base_number|pickup_date|
+-----------------+--------------------+---------------+----------------+------------+------------+-------+----------------------+-----------+
+-----------------+--------------------+---------------+----------------+------------+------------+-------+----------------------+-----------+



In [33]:
df.columns
['hvfhs_license_num',
 'dispatching_base_num',
 'pickup_datetime',
 'dropoff_datetime',
 'PULocationID',
 'DOLocationID',
 'SR_Flag']
df \
    .withColumn('duration', (df.dropoff_datetime.cast('long') - df.pickup_datetime.cast('long'))/( 60 * 60 )) \
    .withColumn('pickup_date', F.to_date(df.pickup_datetime)) \
    .groupBy('pickup_date') \
        .max('duration') \
    .orderBy('max(duration)', ascending=False) \
    .limit(5) \
    .show()

+-----------+-------------+
|pickup_date|max(duration)|
+-----------+-------------+
| 2019-11-01|         NULL|
| 2019-10-05|         NULL|
| 2019-10-24|         NULL|
| 2019-10-01|         NULL|
| 2019-10-22|         NULL|
+-----------+-------------+

+-----------+--------+
|pickup_date|duration|
+-----------+--------+
| 2019-11-01|    NULL|
| 2019-10-05|    NULL|
| 2019-10-24|    NULL|
| 2019-10-01|    NULL|
| 2019-10-22|    NULL|
+-----------+--------+



In [42]:
spark.sql("""
SELECT
    to_date(pickup_datetime) AS pickup_date,
    MAX((UNIX_TIMESTAMP(COALESCE(dropoff_datetime, '2019-10-15 00:00:00')) - UNIX_TIMESTAMP(pickup_datetime)) / (60 * 60)) AS duration
FROM 
    fhvhv_tripdata
GROUP BY
    1
ORDER BY
    2 DESC
LIMIT 5;
""").show()

+-----------+------------------+
|pickup_date|          duration|
+-----------+------------------+
| 2019-10-01| 335.9661111111111|
| 2019-10-02|311.99833333333333|
| 2019-10-03|             288.0|
| 2019-10-04|             264.0|
| 2019-10-05|             240.0|
+-----------+------------------+



In [44]:
schema = types.StructType([
    types.StructField('LocationID', types.IntegerType(), True),
    types.StructField('Borough', types.StringType(), True),
    types.StructField('Zone', types.StringType(), True),
    types.StructField('service_zone', types.StringType(), True)
])
#  Load the dataframe
file_path = 'D:/data_engenering/week4/fhvhv/2019/taxi_zone_lookup.csv'
print(f"Reading - {file_path}")

df = spark.read \
    .option("header", "true") \
    .schema(schema) \
    .csv(file_path)

#  Partition the data frame
folder_path = 'D:/data_engenering/week4/fhvhv/zons'
print(f"Creating partitions - 6 folder {folder_path}")
df.head()
df = df.repartition(6)
df.write.mode('overwrite').parquet(folder_path, compression='gzip')


Reading - D:/data_engenering/week4/fhvhv/2019/taxi_zone_lookup.csv
Creating partitions - 6 folder D:/data_engenering/week4/fhvhv/zons


In [45]:
# Initialize SparkSession
spark = SparkSession.builder \
    .appName("zones_data") \
    .getOrCreate()

# Path to the directory containing 
directory = 'D:/data_engenering/week4/fhvhv/zons'

# Read Parquet files into DataFrame
df = spark.read.parquet(directory)

In [47]:
df.createOrReplaceTempView('zones_data')

In [55]:
spark.sql("""
SELECT
   Zone
FROM 
    zones_data  
GROUP BY 
    1
""").show()

#East Chelsea

#Jamaica Bay

#Union Sq

#Crown Heights North

+--------------------+
|                Zone|
+--------------------+
|Governor's Island...|
|         Westerleigh|
|Charleston/Totten...|
|Heartland Village...|
|       Dyker Heights|
|     Jackson Heights|
|             Bayside|
|      Yorkville West|
|Flushing Meadows-...|
|Riverdale/North R...|
|  Stuyvesant Heights|
|Upper West Side N...|
|Upper East Side N...|
|       Prospect Park|
|       Starrett City|
|Long Island City/...|
|        Bloomingdale|
|        Midtown East|
|Downtown Brooklyn...|
|Saint George/New ...|
+--------------------+
only showing top 20 rows



In [59]:
from pyspark.sql.functions import max

# Group by 'Zone' and calculate the maximum duration for each zone
max_duration_per_zone = df.groupBy('Zone')

# Order by max duration in descending order, limit to top 5, and show the result
#max_duration_per_zone.orderBy('max_duration', ascending=False).limit(5).show()


In [60]:
max_duration_per_zone

GroupedData[grouping expressions: [Zone], value: [LocationID: int, Borough: string ... 2 more fields], type: GroupBy]

In [62]:
spark.sql("""
SELECT
   pul.Zone,
   COUNT(1) as Total
FROM 
    fhvhv_tripdata fhv 
    INNER JOIN zones_data pul ON fhv.PULocationID = pul.LocationID  
WHERE pul.Zone = "East Chelsea"
GROUP BY 
    1
""").show()

+------------+-----+
|        Zone|Total|
+------------+-----+
|East Chelsea| 2391|
+------------+-----+



In [63]:
spark.sql("""
SELECT
   pul.Zone,
   COUNT(1) as Total
FROM 
    fhvhv_tripdata fhv 
    INNER JOIN zones_data pul ON fhv.PULocationID = pul.LocationID  
WHERE pul.Zone = "Jamaica Bay"
GROUP BY 
    1
""").show()

+-----------+-----+
|       Zone|Total|
+-----------+-----+
|Jamaica Bay|   14|
+-----------+-----+



In [64]:
spark.sql("""
SELECT
   pul.Zone,
   COUNT(1) as Total
FROM 
    fhvhv_tripdata fhv 
    INNER JOIN zones_data pul ON fhv.PULocationID = pul.LocationID  
WHERE pul.Zone = "Union Sq"
GROUP BY 
    1
""").show()

+--------+-----+
|    Zone|Total|
+--------+-----+
|Union Sq| 2102|
+--------+-----+



In [65]:
spark.sql("""
SELECT
   pul.Zone,
   COUNT(1) as Total
FROM 
    fhvhv_tripdata fhv 
    INNER JOIN zones_data pul ON fhv.PULocationID = pul.LocationID  
WHERE pul.Zone = "Crown Heights North"
GROUP BY 
    1
""").show()

+-------------------+-----+
|               Zone|Total|
+-------------------+-----+
|Crown Heights North|15701|
+-------------------+-----+

