In [1]:
from pyspark.sql import SparkSession 
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark import StorageLevel

In [2]:
spark = SparkSession.builder \
    .appName("NYC TLC Merge") \
    .config("spark.sql.extensions", "io.delta.sql.DeltaSparkSessionExtension") \
    .config("spark.sql.catalog.spark_catalog", "org.apache.spark.sql.delta.catalog.DeltaCatalog") \
    .config("spark.hadoop.fs.s3a.endpoint", "http://minio:9000") \
    .config("spark.hadoop.fs.s3a.access.key", "minio") \
    .config("spark.hadoop.fs.s3a.secret.key", "minio123") \
    .config("spark.hadoop.fs.s3a.path.style.access", "true") \
    .config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem") \
    .config("spark.driver.memory", "4g") \
    .config("spark.executor.memory", "4g") \
    .config("spark.executor.memoryOverhead", "2g") \
    .config("spark.driver.memoryOverhead", "2g") \
    .getOrCreate()

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
25/07/22 08:05:27 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [3]:
df_yellow = spark.read.parquet("s3a://tlc/yellow_tripdata_2024-*.parquet")


25/07/22 08:05:30 WARN MetricsConfig: Cannot locate configuration: tried hadoop-metrics2-s3a-file-system.properties,hadoop-metrics2.properties
                                                                                

In [4]:
df_green = spark.read.parquet("s3a://tlc/green_tripdata_2024-*.parquet")

In [5]:
df_yellow = df_yellow \
    .withColumnRenamed("tpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("tpep_dropoff_datetime", "dropoff_datetime") \
    .withColumn("taxi_type", lit("yellow"))

df_green = df_green \
    .withColumnRenamed("lpep_pickup_datetime", "pickup_datetime") \
    .withColumnRenamed("lpep_dropoff_datetime", "dropoff_datetime") \
    .withColumn("taxi_type", lit("green"))

common_cols = list(set(df_yellow.columns).intersection(set(df_green.columns)))
df_yellow = df_yellow.select(common_cols)
df_green = df_green.select(common_cols)

df = df_yellow.unionByName(df_green)
#=====================================================================
##=====================================================================

In [6]:
df.printSchema()

root
 |-- RatecodeID: long (nullable = true)
 |-- extra: double (nullable = true)
 |-- passenger_count: long (nullable = true)
 |-- dropoff_datetime: timestamp_ntz (nullable = true)
 |-- tolls_amount: double (nullable = true)
 |-- pickup_datetime: timestamp_ntz (nullable = true)
 |-- congestion_surcharge: double (nullable = true)
 |-- DOLocationID: integer (nullable = true)
 |-- PULocationID: integer (nullable = true)
 |-- trip_distance: double (nullable = true)
 |-- total_amount: double (nullable = true)
 |-- improvement_surcharge: double (nullable = true)
 |-- mta_tax: double (nullable = true)
 |-- store_and_fwd_flag: string (nullable = true)
 |-- tip_amount: double (nullable = true)
 |-- VendorID: integer (nullable = true)
 |-- payment_type: long (nullable = true)
 |-- taxi_type: string (nullable = false)
 |-- fare_amount: double (nullable = true)



In [7]:
df = df.drop('store_and_fwd_flag')

In [8]:
#df.count()

In [9]:
#for i in df.columns :
#    print(f"=================================== {i} ============================== ")
#    df.select(
#        sum(when(col(i).isNull(), 1).otherwise(0)).alias("null_count")
#    ).show()

In [10]:
#for i in df.columns:
#    print(f"================================= {i} ============================================")
#    df.groupBy(i).agg(count("*").alias("value_count")).orderBy("value_count", ascending=False).show()


In [11]:
#before_cleaning=df.count()


In [12]:
#============================================== drop Nulls =========================================================
df = df.dropna(subset=[
    "congestion_surcharge",
    "payment_type",
    "RatecodeID",
    "passenger_count",
   
    
])

In [13]:
#============================================== dropDuplicates =========================================================

df = df.dropDuplicates([
    "pickup_datetime", 
    "dropoff_datetime", 
    "passenger_count", 
    "trip_distance", 
    "PULocationID", 
    "DOLocationID"
])

In [14]:

df = df.withColumn("trip_duration", (unix_timestamp("dropoff_datetime") - unix_timestamp("pickup_datetime")))
df = df.withColumn("trip_speed_mph", round(col("trip_distance") / (col("trip_duration") / 3600), 2))

In [15]:
#============================================== filter =========================================================

df = df.filter(
    (col("passenger_count") > 0) &
    (col("passenger_count") <= 6) &
    (col("trip_distance") > 0) &
    (col("trip_distance") < 150) &
    (col("fare_amount") >= 0) &
    (col("total_amount") >= 0) &
    (col("tip_amount") >= 0) &
    (col("extra") >= 0) &
    (col("mta_tax") >= 0) &
    (col("improvement_surcharge") >= 0) &
    (col("congestion_surcharge") >= 0) &
    (col("tolls_amount") >= 0) &
    (col("RatecodeID").isin([1, 2, 3, 4, 5, 6])) &  
    (col("payment_type").isin([1, 2, 3, 4, 5, 6])) &
    (col("trip_duration") >= 60) & 
    (col("trip_duration") <= 10800) &
    (col("trip_speed_mph") >= 1) &
    (col("trip_speed_mph") <= 80) &
    (col("PULocationID").between(1, 263)) &
    (col("DOLocationID").between(1, 263))
)


In [16]:
#============================================== Encoding =========================================================
# VendorID => vendor_name
df = df.withColumn("vendor_name", when(col("VendorID") == 1, "Creative Mobile Technologies")
                             .when(col("VendorID") == 2, "Curb Mobility")
                             .when(col("VendorID") == 6, "Myle Technologies")
                             .when(col("VendorID") == 7, "Helix")
                             .otherwise("Unknown"))

# RatecodeID => rate_code_desc
df = df.withColumn("rate_code_desc", when(col("RatecodeID") == 1, "Standard rate")
                              .when(col("RatecodeID") == 2, "JFK")
                              .when(col("RatecodeID") == 3, "Newark")
                              .when(col("RatecodeID") == 4, "Nassau or Westchester")
                              .when(col("RatecodeID") == 5, "Negotiated fare")
                              .when(col("RatecodeID") == 6, "Group ride")
                              .when(col("RatecodeID") == 99, "Unknown")
                              .otherwise("Unknown"))

# payment_type => payment_type_desc
df = df.withColumn("payment_type_desc", when(col("payment_type") == 0, "Flex Fare")
                                 .when(col("payment_type") == 1, "Credit card")
                                 .when(col("payment_type") == 2, "Cash")
                                 .when(col("payment_type") == 3, "No charge")
                                 .when(col("payment_type") == 4, "Dispute")
                                 .when(col("payment_type") == 5, "Unknown")
                                 .when(col("payment_type") == 6, "Voided trip")
                                 .otherwise("Unknown"))




In [17]:
#for i in df.columns:
#    print(f"================================= {i} ============================================")
#    df.groupBy(i).agg(count("*").alias("value_count")).orderBy("value_count", ascending=False).show()

In [18]:
zones = spark.read.csv("s3a://tlc/taxi_zone_lookup.csv", header=True, inferSchema=True)
#zones.show()

                                                                                

In [19]:
df.createOrReplaceTempView("trips")
zones.createOrReplaceTempView("zones")

df = spark.sql("""
SELECT
    trips.*,
    pu.Borough         AS PUBorough,
    pu.Zone            AS PUZone,
    pu.service_zone    AS PU_service_zone,
    do.Borough         AS DOBorough,
    do.Zone            AS DOZone,
    do.service_zone    AS DO_service_zone
FROM trips
LEFT JOIN zones pu
    ON trips.PULocationID = pu.LocationID
LEFT JOIN zones do
    ON trips.DOLocationID = do.LocationID
""")



In [20]:
#============================================== Add Columns =========================================================
#===================================================================================================================================
df = df.withColumn("pickup_hour", hour(col("pickup_datetime")))
df = df.withColumn(
    "pickup_period",
    when(col("pickup_hour").between(0, 5), "Late Night")
    .when(col("pickup_hour").between(6, 11), "Morning")
    .when(col("pickup_hour").between(12, 16), "Afternoon")
    .when(col("pickup_hour").between(17, 20), "Evening")
    .otherwise("Night")
)
    
df = df.withColumn("day_of_week_name", date_format(col("pickup_datetime"), "E"))
df = df.withColumn("is_weekend", (col("day_of_week_name") == "Fri" ) | (col("day_of_week_name") == "Sat"))



df = df.withColumn("trip_length_category",
                   when(col("trip_duration") < 300, "Very Short")
                   .when((col("trip_duration") >= 300) & (col("trip_duration") < 900), "Short")
                   .when((col("trip_duration") >= 900) & (col("trip_duration") < 1800), "Medium")
                   .otherwise("Long"))

df = df.withColumn("IsWeekendText", 
    when(col("is_weekend") == True, "Weekend")
    .otherwise("Weekday"))
#===================================================================================================================================
# 1. fare_per_mile
df = df.withColumn("fare_per_mile", 
                   when(col("trip_distance") > 0, col("fare_amount") / col("trip_distance")))

# 2. fare_per_minute (duration بالثواني)
df = df.withColumn("fare_per_minute", 
                   when(col("trip_duration") > 0, col("fare_amount") / (col("trip_duration") / 60)))

# 3. tip_ratio
df = df.withColumn("tip_ratio", 
                   when(col("total_amount") > 0, col("tip_amount") / col("total_amount")))

# 4. trip_efficiency (miles per second)
df = df.withColumn("trip_efficiency", 
                   when(col("trip_duration") > 0, col("trip_distance") / col("trip_duration")))

# 6. is_airport_trip (based on pickup/dropoff borough)
df = df.withColumn("is_airport_trip",
                   (col("PUBorough").contains("Airport")) | (col("DOBorough").contains("Airport")))

# 7. is_suspicious (مسافة صغيرة جدًا ومبلغ عالي)
df = df.withColumn("is_suspicious",
                   (col("trip_distance") < 0.2) & (col("total_amount") > 30))

In [21]:
#df.printSchema()


In [22]:
df = df.repartition(500)
df.persist(StorageLevel.MEMORY_AND_DISK)

#after_cleaning=df.count()


25/07/22 08:05:40 WARN package: Truncated the string representation of a plan since it was too large. This behavior can be adjusted by setting 'spark.sql.debug.maxToStringFields'.


DataFrame[RatecodeID: bigint, extra: double, passenger_count: bigint, dropoff_datetime: timestamp_ntz, tolls_amount: double, pickup_datetime: timestamp_ntz, congestion_surcharge: double, DOLocationID: int, PULocationID: int, trip_distance: double, total_amount: double, improvement_surcharge: double, mta_tax: double, tip_amount: double, VendorID: int, payment_type: bigint, taxi_type: string, fare_amount: double, trip_duration: bigint, trip_speed_mph: double, vendor_name: string, rate_code_desc: string, payment_type_desc: string, PUBorough: string, PUZone: string, PU_service_zone: string, DOBorough: string, DOZone: string, DO_service_zone: string, pickup_hour: int, pickup_period: string, day_of_week_name: string, is_weekend: boolean, trip_length_category: string, IsWeekendText: string, fare_per_mile: double, fare_per_minute: double, tip_ratio: double, trip_efficiency: double, is_airport_trip: boolean, is_suspicious: boolean]

In [23]:
#print((after_cleaning / before_cleaning)*100 )

In [24]:
#===============================================================================================================
#===============================================================================================================
#===============================================================================================================
df = df.withColumnRenamed("RatecodeID", "ratecode_id")
df = df.withColumnRenamed("payment_type", "payment_type_id")
df = df.withColumnRenamed("VendorID", "vendor_id")
#===============================================================================================================
#===============================================================================================================
#===============================================================================================================


In [25]:
#===============================================================================================================
#===============================================================================================================
#===============================================================================================================
#====================================== Modeling ===============================================================
#===============================================================================================================
#===============================================================================================================
#===============================================================================================================
#===============================================================================================================
#===============================================================================================================


In [26]:
dim_vendor = df.select("vendor_id", "vendor_name").dropDuplicates()

In [27]:
dim_rate_code = df.select("ratecode_id", "rate_code_desc").dropDuplicates()

In [28]:
dim_payment_type = df.select("payment_type_id", "payment_type_desc").dropDuplicates()

In [29]:
dim_pickup_location = df.select(
    col("PULocationID"),
    col("PUBorough"),
    col("PUZone"),
    col("PU_service_zone")
).dropDuplicates()


In [30]:
dim_dropoff_location = df.select(
    col("DOLocationID"),
    col("DOBorough"),
    col("DOZone"),
    col("DO_service_zone")
).dropDuplicates()


In [31]:
dim_time = df.select(
    col("pickup_datetime").alias("pickup_datetime"),
    year("pickup_datetime").alias("year"),
    month("pickup_datetime").alias("month"),
    dayofmonth("pickup_datetime").alias("day"),
    hour("pickup_datetime").alias("hour"),
    col("pickup_hour"),
    col("pickup_period"),
    col("day_of_week_name"),
    col("is_weekend"),
    col("IsWeekendText")
).dropDuplicates()

dim_time = dim_time.withColumn("time_id", monotonically_increasing_id())

dim_time = dim_time.withColumn("pickup_datetime", col("pickup_datetime").cast("timestamp"))

In [32]:
dim_trip_category = df.select(
    "trip_length_category",
    "is_airport_trip",
    "is_suspicious"
).dropDuplicates().withColumn("trip_category_id", monotonically_increasing_id())


In [33]:
fact_trips = df.select(
    "pickup_datetime", "dropoff_datetime", "trip_distance", "trip_duration", "trip_speed_mph", 
    "trip_efficiency", "fare_amount", "tip_amount", "tolls_amount", "total_amount", "passenger_count",
    "fare_per_mile", "fare_per_minute", "congestion_surcharge", "mta_tax", "extra", "improvement_surcharge",
    "ratecode_id", "vendor_id", "payment_type_id", "PULocationID", "DOLocationID", 
    
    # المهمين للربط
    "trip_length_category", "is_airport_trip", "is_suspicious", "taxi_type" 
)
fact_trips = fact_trips.withColumn("trip_id", monotonically_increasing_id())




In [34]:
#============================================== Lookup ==============================================
fact_trips = fact_trips.join(
    dim_time.select("pickup_datetime", "time_id"),
    fact_trips.pickup_datetime == dim_time.pickup_datetime,
    "left"
).drop("pickup_datetime")

fact_trips = fact_trips.join(
    dim_trip_category,
    on=["trip_length_category", "is_airport_trip", "is_suspicious"],
    how="left"
).drop("trip_length_category", "is_airport_trip", "is_suspicious")


fact_trips = fact_trips.withColumn("dropoff_datetime", col("dropoff_datetime").cast("timestamp"))


In [35]:
#fact_trips.printSchema()

In [36]:
#for i in df.columns :
#    print(f"=================================== {i} ============================== ")
#    df.select(
#        sum(when(col(i).isNull(), 1).otherwise(0)).alias("null_count")
#    ).show()

In [37]:
#===============================================================================================================
#===============================================================================================================
#===============================================================================================================
#====================================== snowflake ==============================================================
#===============================================================================================================
#===============================================================================================================
#===============================================================================================================
#===============================================================================================================
#===============================================================================================================

In [38]:
#spark = SparkSession.builder \
#    .appName("snowflake-test") \
#    .config("spark.driver.memory", "2g") \
#    .config("spark.executor.memory", "2g") \
#    .config("spark.executor.memoryOverhead", "2g") \
#    .config("spark.driver.memoryOverhead", "2g") \
#    .config("spark.jars.packages", 
#            "net.snowflake:spark-snowflake_2.12:2.9.0-spark_3.2,net.snowflake:snowflake-jdbc:3.13.31") \
#    .getOrCreate()

In [39]:
#sfOptions = {
#  "sfURL" : "AKGEMQF-VN65042.snowflakecomputing.com",
#  "sfUser" : "LOL",
#  "sfPassword" : "A123456789a123",
#  "sfDatabase" : "NYC_TLC",
#  "sfSchema" : "PUBLIC",
#  "sfWarehouse" : "COMPUTE_WH",
#  "sfRole": "ACCOUNTADMIN"
#}


In [40]:
#dim_vendor.write \
#    .format("snowflake") \
#    .options(**sfOptions) \
#    .option("dbtable", "dim_vendor") \
#    .mode("append") \
#    .save()


In [41]:
#dim_rate_code.write \
#    .format("snowflake") \
#    .options(**sfOptions) \
#    .option("dbtable", "dim_rate_code") \
#    .mode("append") \
#    .save()


In [42]:
#dim_payment_type.write \
#    .format("snowflake") \
#    .options(**sfOptions) \
#    .option("dbtable", "dim_payment_type") \
#    .mode("append") \
#    .save()

In [43]:
#dim_pickup_location.write \
#    .format("snowflake") \
#    .options(**sfOptions) \
#    .option("dbtable", "dim_pickup_location") \
#    .mode("append") \
#    .save()

In [44]:
#dim_dropoff_location.write \
#    .format("snowflake") \
#    .options(**sfOptions) \
#    .option("dbtable", "dim_dropoff_location") \
#    .mode("append") \
#    .save()

In [45]:
#dim_time.write \
#    .format("snowflake") \
#    .options(**sfOptions) \
#    .option("dbtable", "dim_time") \
#    .mode("append") \
#    .save()

In [46]:
#dim_trip_category.write \
#   .format("snowflake") \
#    .options(**sfOptions) \
#    .option("dbtable", "dim_trip_category") \
#    .mode("append") \
#    .save()

In [47]:
#fact_trips.write \
#    .format("snowflake") \
#    .options(**sfOptions) \
#    .option("dbtable", "fact_trips") \
#    .mode("append") \
#    .save()



In [None]:
jdbc_url = "jdbc:postgresql://postgres:5432/admin"
connection_properties = {
   "user": "admin",
   "password": "password",
   "driver": "org.postgresql.Driver"
}

#List of DataFrames and table names
tables = {
   #"fact_trips": fact_trips,
   "dim_trip_category": dim_trip_category,
   "dim_time": dim_time,
   "dim_dropoff_location": dim_dropoff_location,
   "dim_pickup_location": dim_pickup_location,
   "dim_payment_type": dim_payment_type,
   "dim_rate_code": dim_rate_code,
   "dim_vendor": dim_vendor
}

#Loop over each table and write to PostgreSQL
for table_name, df in tables.items():
   print(f"Writing table: {table_name}")
   
   df.write \
     .option("batchsize", 10000) \
     .option("numPartitions", 10) \
     .jdbc(
        url=jdbc_url,
         table=table_name,
         mode="overwrite",  
         properties=connection_properties
     )


Writing table: dim_trip_category


                                                                                

Writing table: dim_time


                                                                                

Writing table: dim_dropoff_location


                                                                                

Writing table: dim_pickup_location


                                                                                

Writing table: dim_payment_type


                                                                                

Writing table: dim_rate_code


                                                                                

Writing table: dim_vendor


                                                                                

In [None]:
fact_trips.write \
  .option("batchsize", 10000) \
  .option("numPartitions", 10) \
  .jdbc(
     url=jdbc_url,
      table="fact_trips",
      mode="overwrite",  
      properties=connection_properties
  )

                                                                                

# 