📌 Логіка обробки у Spark job

Spark job виконує наступні кроки:

✅ 1. Фільтрація та очищення даних.

* Видалення поїздок з нульовою або від’ємною відстанню (trip_distance <= 0).
* Відсіювання аномальних поїздок з тарифом ≤ 0 або > 500.
* Відсіювання записів з некоректною кількістю пасажирів (0 або більше 6).

✅ 2. Стандартизація колонок та форматів

* Уніфікація назв колонок до snake_case.
* Конвертація дат у формат timestamp.
* Обчислення додаткових полів:
  * Тривалість поїздки у хвилинах (trip_duration_minutes).
  * Година доби початку поїздки (pickup_hour).

✅ 3. Збагачення географічними ознаками

* Об’єднання з довідником зон за PULocationID та DOLocationID:
  * Отримання назв районів (наприклад, pickup_borough, dropoff_borough).

✅ 4. Запис результатів у PostgreSQL

* Основний датафрейм записується у таблицю yellow_taxi_stage, green_taxi_stage, taxi_lookup_zone_stage
* Формат завантаження: overwrite, залежно від налаштувань.

In [5]:
from pyspark.sql import SparkSession


spark = (
    SparkSession.builder
    .appName("lavreniuk-hw6")
    .config("spark.jars", "./artifacts/postgresql-42.7.7.jar")
    .getOrCreate()
)


25/09/19 23:30:58 WARN Utils: Your hostname, Air-M4.local resolves to a loopback address: 127.0.0.1; using 172.20.10.2 instead (on interface en0)
25/09/19 23:30:58 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
25/09/19 23:30:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [1]:
import os

FILES_PATH = "./unified_data/"

GREEN_TAXI_DIR_PATH = os.path.join(FILES_PATH, "green_taxi")
YELLOW_TAXI_DIR_PATH = os.path.join(FILES_PATH, "yellow_taxi")

In [16]:
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, LongType, TimestampNTZType, IntegerType, TimestampType

common_schema = StructType([
    StructField("VendorID", LongType(), True),
    StructField("PULocationID", LongType(), True), # green
    StructField("trip_distance", DoubleType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("DOLocationID", LongType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("congestion_surcharge", DoubleType(), True),  # Use Double here
])

yellow_schema = StructType([
    StructField("RatecodeID", LongType(), True),
    StructField("payment_type", LongType(), True),
    StructField("passenger_count", DoubleType(), True),
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("airport_fee", DoubleType(), True),
    StructField("trip_type", DoubleType(), True),
    *common_schema,
])

green_schema = StructType([
    StructField("RatecodeID", LongType(), True),
    StructField("payment_type", LongType(), True),
    StructField("passenger_count", DoubleType(), True),
    StructField("lpep_pickup_datetime", TimestampType(), True),
    StructField("lpep_dropoff_datetime", TimestampType(), True),
    StructField("ehail_fee", DoubleType(), True),
    *common_schema,
])


In [17]:
green_df = (
    spark.read.options(
        mergeSchema="true",
        recursiveFileLookup="true",
    )
    .schema(green_schema)
    .parquet(GREEN_TAXI_DIR_PATH)
    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("lpep_dropoff_datetime","dropoff_datetime")
    .withColumnRenamed("ehail_fee", "fee")
)

yellow_df = (
    spark.read.options(
        mergeSchema="true",
        recursiveFileLookup="true",
    )
    .schema(yellow_schema)
    .parquet(YELLOW_TAXI_DIR_PATH)
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime")
    .withColumnRenamed("tpep_dropoff_datetime","dropoff_datetime")
)

In [18]:
# * Видалення поїздок з нульовою або від’ємною відстанню (trip_distance <= 0).
# * Відсіювання аномальних поїздок з тарифом ≤ 0 або > 500.
# * Відсіювання записів з некоректною кількістю пасажирів (0 або більше 6).

filter_query = """
    SELECT * FROM {df}
    WHERE 
        trip_distance > 0
        AND total_amount > 0
        AND total_amount <= 500
        AND passenger_count BETWEEN 0 AND 6;   
"""
yellow_filtered_df = spark.sql(filter_query, df=yellow_df)
green_filtered_df = spark.sql(filter_query, df=green_df)

yellow_filtered_df.show(5)
green_filtered_df.show(5)

+----------+------------+---------------+-------------------+-------------------+-----------+---------+--------+------------+-------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|RatecodeID|payment_type|passenger_count|    pickup_datetime|   dropoff_datetime|airport_fee|trip_type|VendorID|PULocationID|trip_distance|store_and_fwd_flag|DOLocationID|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|
+----------+------------+---------------+-------------------+-------------------+-----------+---------+--------+------------+-------------+------------------+------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+
|         1|           1|            1.0|2020-02-01 02:17:35|2020-02-01 02:30:32|       null|     null|       1|         145|          2.6|                 N|        

In [19]:
# * Уніфікація назв колонок до snake_case.
# * Конвертація дат у формат timestamp. - Already done
# * Обчислення додаткових полів:
#   * Тривалість поїздки у хвилинах (trip_duration_minutes).
#   * Година доби початку поїздки (pickup_hour).

import re
from pyspark.sql.functions import unix_timestamp, hour, col
from pyspark.sql.functions import round as sp_round

def to_snake_case(name: str) -> str:
    name = re.sub("(.)([A-Z][a-z]+)", r"\1_\2", name)
    name = re.sub("([a-z0-9])([A-Z])", r"\1_\2", name)
    return name.lower()

additional_columns = {
    "pickup_hour": hour("pickup_datetime"),
    "trip_duration_minutes": sp_round(
        (unix_timestamp(col("dropoff_datetime")) - unix_timestamp(col("pickup_datetime"))) / 60,
        4
    )
}

yellow_snaked_df = (
    yellow_filtered_df.toDF(*[to_snake_case(c) for c in yellow_filtered_df.columns])
    .withColumns(additional_columns)
)

green_snaked_df = (
    green_filtered_df.toDF(*[to_snake_case(c) for c in green_filtered_df.columns])
    .withColumns(additional_columns)
)

In [8]:
yellow_snaked_df.show(5)

green_snaked_df.show(5)

+-----------+------------+---------------+-------------------+-------------------+-----------+---------+---------+--------------+-------------+------------------+--------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------------+
|ratecode_id|payment_type|passenger_count|    pickup_datetime|   dropoff_datetime|airport_fee|trip_type|vendor_id|pu_location_id|trip_distance|store_and_fwd_flag|do_location_id|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|pickup_hour|trip_duration_minutes|
+-----------+------------+---------------+-------------------+-------------------+-----------+---------+---------+--------------+-------------+------------------+--------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------------+
|          1|           1|              1|2020

In [9]:
# * Об’єднання з довідником зон за PULocationID та DOLocationID:
# * Отримання назв районів (наприклад, pickup_borough, dropoff_borough).

TAXI_ZONE_FILE_PATH = os.path.join(FILES_PATH, "taxi_zone_lookup.csv")
taxi_zone_lookup_df = spark.read.options(header="true").csv(TAXI_ZONE_FILE_PATH)
taxi_zone_lookup_df.show(5)

+----------+-------------+--------------------+------------+
|LocationID|      Borough|                Zone|service_zone|
+----------+-------------+--------------------+------------+
|         1|          EWR|      Newark Airport|         EWR|
|         2|       Queens|         Jamaica Bay|   Boro Zone|
|         3|        Bronx|Allerton/Pelham G...|   Boro Zone|
|         4|    Manhattan|       Alphabet City| Yellow Zone|
|         5|Staten Island|       Arden Heights|   Boro Zone|
+----------+-------------+--------------------+------------+
only showing top 5 rows


In [10]:
from pyspark.sql.functions import broadcast
from pyspark.sql import DataFrame

def join_zones(df: DataFrame, zone_df: DataFrame) -> DataFrame:
    return (
        df.join(
            broadcast(
                zone_df.select(
                    col("Borough").alias("pickup_borough"),
                    col("LocationID").alias("pu_location_id"),
                )
            ),
            how="left",
            on="pu_location_id"
        ).join(
            broadcast(
                zone_df.select(
                    col("Borough").alias("dropoff_borough"),
                    col("LocationID").alias("do_location_id"),
                )
            ),
            how="left",
            on="do_location_id" 
        )
    )

yellow_taxi_stage_df = join_zones(yellow_snaked_df, taxi_zone_lookup_df)
green_taxi_stage_df = join_zones(green_snaked_df, taxi_zone_lookup_df)
    

yellow_taxi_stage_df.show(5)
green_taxi_stage_df.show(5)

+--------------+--------------+-----------+------------+---------------+-------------------+-------------------+-----------+---------+---------+-------------+------------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+---------------------+--------------+---------------+
|do_location_id|pu_location_id|ratecode_id|payment_type|passenger_count|    pickup_datetime|   dropoff_datetime|airport_fee|trip_type|vendor_id|trip_distance|store_and_fwd_flag|fare_amount|extra|mta_tax|tip_amount|tolls_amount|improvement_surcharge|total_amount|congestion_surcharge|pickup_hour|trip_duration_minutes|pickup_borough|dropoff_borough|
+--------------+--------------+-----------+------------+---------------+-------------------+-------------------+-----------+---------+---------+-------------+------------------+-----------+-----+-------+----------+------------+---------------------+------------+--------------------+-----------+-------

In [None]:
# * Основний датафрейм записується у таблицю yellow_taxi_stage, green_taxi_stage, taxi_lookup_zone_stage
# * Формат завантаження: overwrite, залежно від налаштувань.

DB_USER = "airflow"
DB_PASSWORD = "airflow"
JDBC_URL = "jdbc:postgresql://localhost:8432/hw6"

def write_psql(
        writeable_df: DataFrame,
        *,
        table: str,
        connection_str: str,
        username: str,
        password: str,
        mode: str = "overwrite"
) -> None:
    properties = {"user":username, "password":password, "driver": "org.postgresql.Driver"}
    writeable_df.write.jdbc(url=connection_str, table=table, properties=properties, mode=mode)

write_psql(yellow_taxi_stage_df.limit(20000), table="yellow_taxi_stage", connection_str=JDBC_URL, username=DB_USER, password=DB_PASSWORD)
write_psql(green_taxi_stage_df.limit(20000), table="green_taxi_stage", connection_str=JDBC_URL, username=DB_USER, password=DB_PASSWORD)
write_psql(taxi_zone_lookup_df.toDF(*[to_snake_case(c) for c in taxi_zone_lookup_df.columns]), table="taxi_lookup_zone_stage", connection_str=JDBC_URL, username=DB_USER, password=DB_PASSWORD)



                                                                                