In [1]:
import pyspark
from pyspark.sql import SparkSession, Window
from pyspark.sql import types
from pyspark.sql import functions as F
from pyspark.sql.functions import col
from pyspark.sql.window import Window
import os
import argparse

In [2]:
parser = argparse.ArgumentParser(description='Datetime data for transformation')
parser.add_argument('--year', type=int, required=True)
parser.add_argument('--month', type=int, required=True)

args = parser.parse_args()

usage: ipykernel_launcher.py [-h] --year YEAR --month MONTH
ipykernel_launcher.py: error: the following arguments are required: --year, --month


SystemExit: 2

  warn("To exit: use 'exit', 'quit', or Ctrl-D.", stacklevel=1)


In [3]:
year = args.year
month = args.month
catalog = 'nessie'
namespace = 'nyc_project_db'

NameError: name 'args' is not defined

In [4]:
s3_path = 's3a://nyc-project/raw-data/'

In [5]:
spark = SparkSession.builder \
    .master('local[*]') \
    .appName('data_transformation') \
    .getOrCreate()

sc = spark.sparkContext

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
24/12/24 17:51:04 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [102]:
# Create a new schema with less memory formats
new_schema = types.StructType([
    types.StructField('VendorID', types.IntegerType(), True), 
    types.StructField('tpep_pickup_datetime', types.TimestampType(), True), 
    types.StructField('tpep_dropoff_datetime', types.TimestampType(), True), 
    types.StructField('passenger_count', types.IntegerType(), True), 
    types.StructField('trip_distance', types.FloatType(), True), 
    types.StructField('RatecodeID', types.IntegerType(), True), 
    types.StructField('store_and_fwd_flag', types.StringType(), True), 
    types.StructField('PULocationID', types.IntegerType(), True), 
    types.StructField('DOLocationID', types.IntegerType(), True), 
    types.StructField('payment_type', types.IntegerType(), True), 
    types.StructField('fare_amount', types.FloatType(), True), 
    types.StructField('extra', types.FloatType(), True), 
    types.StructField('mta_tax', types.FloatType(), True), 
    types.StructField('tip_amount', types.FloatType(), True), 
    types.StructField('tolls_amount', types.FloatType(), True), 
    types.StructField('improvement_surcharge', types.FloatType(), True), 
    types.StructField('total_amount', types.FloatType(), True), 
    types.StructField('congestion_surcharge', types.FloatType(), True), 
    types.StructField('airport_fee', types.IntegerType(), True)])

In [103]:
# Read the raw data
df = spark.read.format('parquet').load(f'{s3_path}/2019/01/*.parquet')

In [104]:
old_schema = df.schema

In [105]:
# Get the dataframe in the new schema
for old_field, new_field in zip(old_schema.fields, new_schema.fields):
    df = df.withColumn(new_field.name, col(old_field.name).cast(new_field.dataType))

In [106]:
# Rename all the columns
df = df.withColumnRenamed('VendorID', 'vendor_id') \
    .withColumnRenamed('RatecodeID', 'ratecode_id') \
    .withColumnRenamed('payment_type', 'payment_type_id') \
    .withColumnRenamed('tpep_pickup_datetime', 'pickup_datetime') \
    .withColumnRenamed('tpep_dropoff_datetime', 'dropoff_datetime') \
    .withColumnRenamed('PULocationID', 'pickup_location_id') \
    .withColumnRenamed('DOLocationID', 'dropoff_location_id')

In [107]:
# Remove the data which doesn't make sense
df = df.filter((col('fare_amount') > 0) \
               & (col('trip_distance') > 0) \
               & (col('extra') > 0))

# Ratecode ID cannot be more than 6
df = df.filter((col('ratecode_id') <= 6))

# Replace all the Null values with 0
df = df.withColumn('congestion_surcharge', F.when(col('congestion_surcharge').isNull(), 0).otherwise(col('congestion_surcharge')))
df = df.withColumn('airport_fee', F.when(col('airport_fee').isNull(), 0).otherwise(col('airport_fee')))

In [108]:
# Repalce 0 passenger count with median

window_spec = Window.orderBy('passenger_count')

df_rn = df.select(['passenger_count']).withColumn('rn', F.row_number().over(window_spec))
total_rows = df.count()
                                                         
if total_rows % 2 == 0:
    lower_mid = total_rows // 2
    upper_mid = lower_mid + 1
else:
    lower_mid = total_rows // 2 + 1
    upper_mid = lower_mid

median_df = df_rn.filter((col('rn') == lower_mid) | (col('rn') == upper_mid))

median_value = median_df.agg(F.avg(col('passenger_count'))).collect()[0][0]

df = df.withColumn('passenger_count', F.when(col('passenger_count') == 0, median_value).otherwise(col('passenger_count')))

24/12/15 16:18:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/15 16:18:59 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/15 16:19:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
24/12/15 16:19:00 WARN WindowExec: No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.
                                                                                

In [109]:
# Create unique ID on the basis of timestamp

def index_id(date_column):
    year = F.year(date_column)
    month = F.lpad(F.month(date_column).cast("string"), 2, "0")
    day = F.lpad(F.dayofmonth(date_column).cast("string"), 2, "0")
    hour = F.lpad(F.hour(date_column).cast("string"), 2, "0")
    minute = F.lpad(F.minute(date_column).cast("string"), 2, "0")
    second = F.lpad(F.second(date_column).cast("string"), 2, "0")
    index = F.concat(year, month, day, hour, minute, second)
    return index.cast('long')

In [110]:
# Create Dimension Table for Pickup Datetime
pickup_datetime_dim = df.select(['pickup_datetime']) \
    .distinct() \
    .withColumn('pickup_datetime_id', index_id(col('pickup_datetime'))) \
    .withColumn('pickup_hour', F.hour(col('pickup_datetime'))) \
    .withColumn('pickup_day', F.dayofmonth(col('pickup_datetime'))) \
    .withColumn('pickup_month', F.month(col('pickup_datetime'))) \
    .withColumn('pickup_year', F.year(col('pickup_datetime'))) \
    .withColumn('pickup_weekday', F.date_format(col('pickup_datetime'), 'EEEE'))

pickup_datetime_dim = pickup_datetime_dim.select(
    'pickup_datetime_id',
    'pickup_datetime',
    'pickup_hour',
    'pickup_day',
    'pickup_month',
    'pickup_year',
    'pickup_weekday'
)

In [111]:
spark.sql(f"CREATE NAMESPACE IF NOT EXISTS {catalog}.{namespace}")

DataFrame[]

In [112]:
# Create the new pickup datetime table

columns_name = []
pickup_datetime_schema = pickup_datetime_dim.schema
for field in pickup_datetime_schema.fields:
    columns_name.append(f'{field.name} {field.dataType.simpleString().upper()}')
columns_sql = ", ".join(columns_name)
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {catalog}.{namespace}.pickup_datetime_table (
    {columns_sql}
    )
    USING iceberg
    PARTITIONED BY (pickup_year)
""")

DataFrame[]

In [113]:
# Write the dataframe to Iceberg tables
pickup_datetime_dim.write.format('iceberg') \
    .mode('overwrite') \
    .partitionBy('pickup_year') \
    .save(f'{catalog}.{namespace}.pickup_datetime_table')

                                                                                

In [114]:
# Create the Dimension Table for Dropoff Datetime 
dropoff_datetime_dim = df.select(['dropoff_datetime']) \
    .distinct() \
    .withColumn('dropoff_datetime_id', index_id(col('dropoff_datetime'))) \
    .withColumn('dropoff_hour', F.hour(col('dropoff_datetime'))) \
    .withColumn('dropoff_day', F.dayofmonth(col('dropoff_datetime'))) \
    .withColumn('dropoff_month', F.month(col('dropoff_datetime'))) \
    .withColumn('dropoff_year', F.year(col('dropoff_datetime'))) \
    .withColumn('dropoff_weekday', F.date_format(col('dropoff_datetime'), 'EEEE'))

dropoff_datetime_dim = dropoff_datetime_dim.select(
    'dropoff_datetime_id',
    'dropoff_datetime',
    'dropoff_hour',
    'dropoff_day',
    'dropoff_month',
    'dropoff_year',
    'dropoff_weekday'
)

In [115]:
# Create the new pickup datetime table

columns_name = []
dropoff_datetime_schema = dropoff_datetime_dim.schema
for field in dropoff_datetime_schema.fields:
    columns_name.append(f'{field.name} {field.dataType.simpleString().upper()}')
columns_sql = ", ".join(columns_name)
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {catalog}.{namespace}.dropoff_datetime_table (
    {columns_sql}
    )
    USING iceberg
    PARTITIONED BY (dropoff_year)
""")

DataFrame[]

In [116]:
# Write the dataframe to iceberg tables
dropoff_datetime_dim.write.format('iceberg') \
    .mode('overwrite') \
    .partitionBy('dropoff_year') \
    .save(f'{catalog}.{namespace}.dropoff_datetime_table')

                                                                                

In [117]:
# Add the foreign key of dropp datetime table & pickup datetime table into fact tables
df = df.withColumn('dropoff_datetime_id', index_id(col('dropoff_datetime'))) \
    .withColumn('pickup_datetime_id', index_id(col('pickup_datetime')))

In [118]:
df = df.select(['vendor_id',
 'pickup_datetime_id',
 'dropoff_datetime_id',
 'pickup_location_id',
 'dropoff_location_id',
 'ratecode_id',
 'passenger_count',
 'trip_distance',
 'payment_type_id',
 'store_and_fwd_flag',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'congestion_surcharge',
 'airport_fee',
 'total_amount'])

In [119]:
# Create the Fact Table in Iceberg

columns_name = []
df_schema = df.schema
for field in df_schema.fields:
    columns_name.append(f'{field.name} {field.dataType.simpleString().upper()}')
columns_sql = ", ".join(columns_name)
spark.sql(f"""
    CREATE TABLE IF NOT EXISTS {catalog}.{namespace}.fact_table (
    {columns_sql}
    )
    USING iceberg
""")

DataFrame[]

In [120]:
# Write the Fact Dataframe to Iceberg
df.write.format('iceberg') \
    .mode('overwrite') \
    .save(f'{catalog}.{namespace}.fact_table')

                                                                                