In [1]:
import sys
import os
import warnings
import traceback
import logging
import time
import dotenv

# Load environment variables (MinIO credentials, optional path overrides) from .env
# so that secrets do not have to be hardcoded in the notebook.
dotenv.load_dotenv(".env")

# Make the project's helper modules importable. The default is a machine-specific
# absolute path; set NYC_UTILS_PATH to point elsewhere on other machines.
utils_path = os.getenv("NYC_UTILS_PATH", "/Users/benminh1201/Downloads/nyc_new/utils")
if utils_path not in sys.path:  # re-running the cell should not keep growing sys.path
    sys.path.append(utils_path)
from helpers import load_cfg
from minio_utils import MinIOClient

from pyspark.sql import SparkSession

# MinIO (S3A) connection settings. Credentials are read from the environment instead
# of being hardcoded here (hardcoded keys end up in version control and shared outputs).
MINIO_ACCESS_KEY = os.getenv("MINIO_ACCESS_KEY")
MINIO_SECRET_KEY = os.getenv("MINIO_SECRET_KEY")
MINIO_ENDPOINT = os.getenv("MINIO_ENDPOINT", "http://localhost:9000")
if not MINIO_ACCESS_KEY or not MINIO_SECRET_KEY:
    raise RuntimeError(
        "MINIO_ACCESS_KEY and MINIO_SECRET_KEY must be set (e.g. in .env) "
        "before creating the Spark session."
    )

# Create a SparkSession with Iceberg support:
#   * `spark_catalog` is replaced by an Iceberg SparkCatalog; no `type` is configured
#     for it here — NOTE(review): confirm which catalog backend it is meant to use.
#   * `local` is a Hadoop (file-based) Iceberg catalog rooted at s3a://raw/demo.
spark = (
    SparkSession.builder
    .config("spark.driver.memory", "8g")
    .config("spark.sql.extensions", "org.apache.iceberg.spark.extensions.IcebergSparkSessionExtensions")
    .config("spark.sql.catalog.spark_catalog", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local", "org.apache.iceberg.spark.SparkCatalog")
    .config("spark.sql.catalog.local.type", "hadoop")
    .config("spark.sql.catalog.local.warehouse", "s3a://raw/demo")
    .config("spark.sql.parquet.enableVectorizedReader", "false")
    .config("spark.sql.parquet.writeLegacyFormat", "true")
    .master("local[8]")
    .getOrCreate()
)
# Get the SparkContext from the SparkSession
sc = spark.sparkContext

# Point the S3A filesystem at MinIO. The values are set on the live Hadoop
# configuration of the (possibly pre-existing) session returned by getOrCreate().
s3a_settings = {
    "fs.s3a.access.key": MINIO_ACCESS_KEY,
    "fs.s3a.secret.key": MINIO_SECRET_KEY,
    "fs.s3a.endpoint": MINIO_ENDPOINT,
    "fs.s3a.path.style.access": "true",
    "fs.s3a.connection.ssl.enabled": "false",
    "fs.s3a.impl": "org.apache.hadoop.fs.s3a.S3AFileSystem",
}
hadoop_conf = sc._jsc.hadoopConfiguration()
for key, value in s3a_settings.items():
    hadoop_conf.set(key, value)

25/04/18 18:40:51 WARN Utils: Your hostname, benminh1201.local resolves to a loopback address: 127.0.0.1; using 192.168.1.13 instead (on interface en0)
25/04/18 18:40:51 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/04/18 18:40:51 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [None]:
# Does the Iceberg table already exist in the `local` catalog?
taxi_table_name = "local.nyc.taxi"
spark.catalog.tableExists(taxi_table_name)

In [None]:
from pyspark.sql.types import *

# Target schema for the raw NYC TLC yellow-taxi parquet files.
# The pickup/dropoff columns carry a date *and* a time of day, so they are
# TimestampType; DateType would discard the time (and a parquet timestamp column
# presumably cannot be read as DATE directly — confirm against the raw files).
# NOTE(review): the numeric types below may not match every month's physical
# parquet types (e.g. passenger_count / RatecodeID are sometimes doubles).
yellow_tripdata_schema = StructType([
    StructField("VendorID", LongType(), True),
    StructField("tpep_pickup_datetime", TimestampType(), True),
    StructField("tpep_dropoff_datetime", TimestampType(), True),
    StructField("passenger_count", IntegerType(), True),
    StructField("trip_distance", DoubleType(), True),
    StructField("RatecodeID", IntegerType(), True),
    StructField("store_and_fwd_flag", StringType(), True),
    StructField("PULocationID", IntegerType(), True),
    StructField("DOLocationID", IntegerType(), True),
    StructField("payment_type", IntegerType(), True),
    StructField("fare_amount", DoubleType(), True),
    StructField("extra", DoubleType(), True),
    StructField("mta_tax", DoubleType(), True),
    StructField("tip_amount", DoubleType(), True),
    StructField("tolls_amount", DoubleType(), True),
    StructField("improvement_surcharge", DoubleType(), True),
    StructField("total_amount", DoubleType(), True),
    StructField("congestion_surcharge", DoubleType(), True),
    StructField("airport_fee", DoubleType(), True)
])

In [2]:
# The processed yellow-trip data is stored as an Iceberg table (its files sit under
# the table's `data/` directory). Reading `data/` as plain parquet bypasses Iceberg's
# metadata, so it can pick up files that are no longer part of the current snapshot
# (e.g. after an overwrite). Load the table through Iceberg instead.
yellow_tripdata_table_path = "s3a://processed/iceberg/nyc/yellow_tripdata"
try:
    df = spark.read.format("iceberg").load(yellow_tripdata_table_path)
except Exception as exc:  # path-based loading presumably needs a Hadoop-table layout
    warnings.warn(
        f"Could not load {yellow_tripdata_table_path} as an Iceberg table ({exc!r}); "
        "falling back to reading its data files as plain parquet."
    )
    df = spark.read.parquet(f"{yellow_tripdata_table_path}/data/")

25/04/18 18:41:03 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [3]:
# Number of rows in the loaded frame (runs a full Spark job).
yellow_trip_count = df.count()
yellow_trip_count

                                                                                

45103875

In [None]:
from pyspark.sql.functions import col

# `schema` is not a parameter of `DataFrameReader.parquet`: its keyword arguments are
# parquet reader options, so `schema=...` was silently ignored and the file kept its
# own types. Conform the columns to `yellow_tripdata_schema` explicitly with casts,
# which also tolerates months whose physical parquet types differ.
raw_yellow_path = "s3a://raw/yellow_tripdata/2023/yellow_tripdata_2023-10.parquet"
df2 = spark.read.parquet(raw_yellow_path).select(
    [
        col(field.name).cast(field.dataType).alias(field.name)
        for field in yellow_tripdata_schema.fields
    ]
)
df2.show(5, False)

In [None]:
# Inspect the column names and types of the loaded frame.
df.printSchema()

In [3]:
# Preview the first rows (long values are truncated).
df.show(n=10)

                                                                                

+---------+----------+-----------------+------------------+---------------+----------------+---------------+-------------+---------+------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+--------------------+-----------+---------+--------------------+-----------+------------+
|vendor_id|ratecodeid|pickup_locationId|dropoff_locationId|pickup_datetime|dropoff_datetime|passenger_count|trip_distance|trip_type|payment_type|fare_amount|extra|mta_tax|tip_amount|tolls_amount|ehail_fee|improvement_surcharge|total_amount|congestion_surcharge|airport_fee|taxi_type|             trip_id|pickup_year|pickup_month|
+---------+----------+-----------------+------------------+---------------+----------------+---------------+-------------+---------+------------+-----------+-----+-------+----------+------------+---------+---------------------+------------+--------------------+-----------+---------+--------------------+-----------+------------+
|        1

In [None]:
from pyspark.sql.functions import col, lit

# Project the raw yellow-taxi columns onto the unified trip schema.
# Pickup/dropoff are cast to TIMESTAMP rather than DATE: a DATE cast silently drops
# the time of day. `ratecodeid` is a double, matching the other conversions of the
# same columns in this notebook.
df = df.select(
    # identifiers
    col("VendorID").cast("long").alias("vendor_id"),
    col("RatecodeID").cast("double").alias("ratecodeid"),
    col("PULocationID").cast("int").alias("pickup_locationId"),
    col("DOLocationID").cast("int").alias("dropoff_locationId"),
    # timestamps
    col("tpep_pickup_datetime").cast("timestamp").alias("pickup_datetime"),
    col("tpep_dropoff_datetime").cast("timestamp").alias("dropoff_datetime"),
    # trip info
    col("passenger_count").cast("double").alias("passenger_count"),
    col("trip_distance").cast("double").alias("trip_distance"),
    lit(1).alias("trip_type"),
    # payment info
    col("payment_type").cast("int").alias("payment_type"),
    col("fare_amount").cast("double").alias("fare_amount"),
    col("extra").cast("double").alias("extra"),
    col("mta_tax").cast("double").alias("mta_tax"),
    col("tip_amount").cast("double").alias("tip_amount"),
    col("tolls_amount").cast("double").alias("tolls_amount"),
    lit(0.0).alias("ehail_fee"),
    col("improvement_surcharge").cast("double").alias("improvement_surcharge"),
    col("total_amount").cast("double").alias("total_amount"),
    col("congestion_surcharge").cast("double").alias("congestion_surcharge"),
    col("airport_fee").cast("double").alias("airport_fee"),
)
df.show(5, False)

In [None]:
from pyspark.sql.functions import col, unix_timestamp, year

# Derive the pickup year. After the unified select, the raw `tpep_pickup_datetime`
# column has been renamed to `pickup_datetime`; use whichever one the frame has so
# this cell works on both the raw and the unified frame.
pickup_source_col = (
    "pickup_datetime" if "pickup_datetime" in df.columns else "tpep_pickup_datetime"
)
df = df.withColumn("pickup_year", year(col(pickup_source_col)))

In [None]:
from pyspark.sql.functions import col, isnan, when, count

# For each inspected column, count rows whose value is NaN or NULL.
df_Columns = ["airport_fee"]
missing_counts = [
    count(when(isnan(name) | col(name).isNull(), name)).alias(name)
    for name in df_Columns
]
df.select(missing_counts).show()

In [None]:
from pyspark.sql.functions import col, lit

# Convert the raw yellow-taxi columns to the unified trip schema in ONE `select`.
#
# The previous `withColumn` + `drop` version lost `ratecodeid`: with Spark's default
# case-insensitive name resolution, `withColumn("ratecodeid", ...)` replaced the existing
# `RatecodeID` column in place, and the later `drop("RatecodeID")` then removed it.
# A single projection also avoids the large plans that chained `withColumn` calls build.

# raw column -> (unified column, Spark SQL type)
rename_and_cast = {
    "VendorID": ("vendor_id", "long"),
    "RatecodeID": ("ratecodeid", "double"),
    "PULocationID": ("pickup_locationId", "int"),
    "DOLocationID": ("dropoff_locationId", "int"),
    "tpep_pickup_datetime": ("pickup_datetime", "timestamp"),
    "tpep_dropoff_datetime": ("dropoff_datetime", "timestamp"),
}
# columns that keep their name but get a fixed type
cast_in_place = {
    "passenger_count": "double",
    "trip_distance": "double",
    "payment_type": "int",
    "fare_amount": "double",
    "extra": "double",
    "mta_tax": "double",
    "tip_amount": "double",
    "tolls_amount": "double",
    "improvement_surcharge": "double",
    "total_amount": "double",
    "congestion_surcharge": "double",
    "airport_fee": "double",
}
# The renamed source columns (kept under the old variable name for compatibility).
columns_to_drop = list(rename_and_cast)

# Names produced above are excluded from the pass-through of remaining columns.
# The comparison is case-insensitive to mirror Spark's default name resolution.
handled = {
    name.lower()
    for name in [
        *rename_and_cast,
        *(target for target, _ in rename_and_cast.values()),
        *cast_in_place,
        "trip_type",
        "ehail_fee",
    ]
}
df = df.select(
    *[col(src).cast(dtype).alias(dst) for src, (dst, dtype) in rename_and_cast.items()],
    *[col(name).cast(dtype).alias(name) for name, dtype in cast_in_place.items()],
    # any other columns (e.g. store_and_fwd_flag) pass through unchanged
    *[col(name) for name in df.columns if name.lower() not in handled],
    lit(1).alias("trip_type"),
    lit(0.0).alias("ehail_fee"),
)

In [None]:
from pyspark.sql.functions import col, concat_ws, sha2, unix_timestamp, lit

# Unified trip schema for yellow taxis, listed in output order. Each entry is either
# (raw column, unified column, Spark SQL type) or a ready-made constant column.
yellow_to_unified = [
    # identifiers
    ("VendorID", "vendor_id", "long"),
    ("RatecodeID", "ratecodeid", "double"),
    ("PULocationID", "pickup_locationId", "int"),
    ("DOLocationID", "dropoff_locationId", "int"),
    # timestamps
    ("tpep_pickup_datetime", "pickup_datetime", "timestamp"),
    ("tpep_dropoff_datetime", "dropoff_datetime", "timestamp"),
    # trip info
    ("passenger_count", "passenger_count", "double"),
    ("trip_distance", "trip_distance", "double"),
    lit(1).alias("trip_type"),
    # payment info
    ("payment_type", "payment_type", "int"),
    ("fare_amount", "fare_amount", "double"),
    ("extra", "extra", "double"),
    ("mta_tax", "mta_tax", "double"),
    ("tip_amount", "tip_amount", "double"),
    ("tolls_amount", "tolls_amount", "double"),
    lit(0.0).alias("ehail_fee"),
    ("improvement_surcharge", "improvement_surcharge", "double"),
    ("total_amount", "total_amount", "double"),
    ("congestion_surcharge", "congestion_surcharge", "double"),
    ("airport_fee", "airport_fee", "double"),
]
projection = [
    col(spec[0]).cast(spec[2]).alias(spec[1]) if isinstance(spec, tuple) else spec
    for spec in yellow_to_unified
]
# Tag every row with its taxi type after projecting.
df = df.select(*projection).withColumn("taxi_type", lit("yellow"))

In [None]:
# Verify the unified schema (names and types) before writing.
df.printSchema()

In [None]:
# Preview a few unified rows without truncating long values.
df.show(n=5, truncate=False)

In [None]:
# Export the unified frame as plain parquet.
# Do NOT write to s3a://raw/demo/: that is the warehouse root of the `local` Iceberg
# catalog, and mode("overwrite") replaces the target directory's contents, which would
# wipe every Iceberg table (data and metadata) stored there, e.g. local.nyc.taxi.
parquet_export_path = "s3a://raw/demo_parquet/yellow_tripdata/"

# Guard: refuse to overwrite anything inside the Iceberg warehouse.
local_warehouse = spark.conf.get("spark.sql.catalog.local.warehouse", "").rstrip("/")
if local_warehouse and (parquet_export_path.rstrip("/") + "/").startswith(local_warehouse + "/"):
    raise ValueError(
        f"Refusing to overwrite {parquet_export_path}: it is inside the Iceberg "
        f"warehouse {local_warehouse}"
    )

(
    df.write.format("parquet")
    .mode("overwrite")
    .option("path", parquet_export_path)
    .save()
)

In [None]:
# Read every processed yellow-trip parquet file and preview a few rows.
processed_yellow_glob = "s3a://processed/yellow_tripdata/*"
df2 = spark.read.parquet(processed_yellow_glob)
df2.show(n=5, truncate=False)

In [None]:
# Create (or atomically replace) the Iceberg table from df2.
iceberg_table = "local.nyc.taxi"
df2.writeTo(iceberg_table).createOrReplace()

In [None]:
# load packages
from pyspark.sql.functions import *
from datetime import datetime, timedelta

# COMMAND ----------

# define boundaries: start is inclusive, end is exclusive
startdate = datetime.strptime('2019-01-01', '%Y-%m-%d')
enddate = datetime.strptime('2025-01-01', '%Y-%m-%d')

# COMMAND ----------

# define column names and its transformation rules on the Date column
column_rule_df = spark.createDataFrame([
    ("DateID", "cast(date_format(date, 'yyyyMMdd') as int)"),  # 20230101
    ("Year", "year(date)"),  # 2023
    ("Quarter", "quarter(date)"),  # 1
    ("Month", "month(date)"),  # 1
    ("Day", "day(date)"),  # 1
    ("Week", "weekofyear(date)"),  # 1
    ("QuarterNameShort", "date_format(date, 'QQQ')"),  # Q1
    ("QuarterNumberString", "date_format(date, 'QQ')"),  # 01
    ("MonthNameLong", "date_format(date, 'MMMM')"),  # January
    ("MonthNameShort", "date_format(date, 'MMM')"),  # Jan
    ("MonthNumberString", "date_format(date, 'MM')"),  # 01
    ("DayNumberString", "date_format(date, 'dd')"),  # 01
    ("WeekNameLong", "concat('week', lpad(weekofyear(date), 2, '0'))"),  # week01
    ("WeekNameShort", "concat('w', lpad(weekofyear(date), 2, '0'))"),  # w01
    ("WeekNumberString", "lpad(weekofyear(date), 2, '0')"),  # 01
    ("DayOfWeek", "dayofweek(date)"),  # 1
    ("YearMonthString", "date_format(date, 'yyyy/MM')"),  # 2023/01
    ("DayOfWeekNameLong", "date_format(date, 'EEEE')"),  # Sunday
    ("DayOfWeekNameShort", "date_format(date, 'EEE')"),  # Sun
    ("DayOfMonth", "cast(date_format(date, 'd') as int)"),  # 1
    ("DayOfYear", "cast(date_format(date, 'D') as int)")  # 1
], ["new_column_name", "expression"])

# COMMAND ----------

# One row per calendar day in [startdate, enddate).
# The days are generated directly as DATE values with `sequence` (1-day step by
# default for dates) instead of stepping epoch seconds: a naive `datetime.timestamp()`
# uses the OS time zone while casting to DATE uses Spark's session time zone, so the
# old approach could shift every date by a day when the two differ.
last_day = (enddate - timedelta(days=1)).date()
df = spark.range(1).select(
    explode(sequence(lit(startdate.date()), lit(last_day))).alias("Date")
)
df.show()


In [None]:
# this loops over all rules defined in column_rule_df adding the new columns
for row in column_rule_df.collect():
    new_column_name = row["new_column_name"]
    expression = expr(row["expression"])
    df = df.withColumn(new_column_name, expression)
display(df)

# COMMAND ----------

# display(df.withColumn("Playground", expr("date_format(date, 'yyyyMMDD')")))

In [None]:
# Print every (new_column_name, expression) rule, one Row per line.
rule_rows = column_rule_df.collect()
for rule_row in rule_rows:
    print(rule_row)