In [0]:
# Databricks notebook source
import pandas as pd
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import TimestampType, MapType, StringType
from pyspark.sql.functions import monotonically_increasing_id, hour, dayofmonth, month, year, date_format, dayofweek, create_map, col, lit

In [0]:
# Initialize Spark session 
spark = SparkSession.builder.appName("ReadBlobCSV").getOrCreate()

# Define Azure storage account parameters
storage_account = "nycyellowtaxidataset"
container = "nycyellowtaxisdatasets"
access_key = "your_acccount_key"
filepath = "/FinalCSVFile/nycyellowtaxidataset.csv"  

# Connect to Azure Blob Storage
wasbs_path = "wasbs://%s@%s.blob.core.windows.net/%s" % (container, storage_account, filepath)

spark.conf.set(
  "fs.azure.account.key.%s.blob.core.windows.net" % storage_account, access_key)

# Read the CSV file from Blob storage into Spark DataFrame 
df = spark.read.option("header", "true") \
    .option("delimiter",",") \
    .csv(wasbs_path, header=True, inferSchema=True) 
df = df.coalesce(1)

In [0]:
df.describe().show()

In [0]:
df = df.withColumn("tpep_pickup_datetime", 
                   df["tpep_pickup_datetime"].cast(TimestampType()))
df = df.withColumn("tpep_dropoff_datetime",  
                   df["tpep_dropoff_datetime"].cast(TimestampType()))

In [0]:
df= df.filter(df['RatecodeID'] != 99)
df = df.filter((df['PULocationID'] != 264) & (df['PULocationID'] != 265))

In [0]:
# Drop duplicates
df = df.dropDuplicates()
# Reset index, remove existing index column
df = df.withColumn("tmp_idx", monotonically_increasing_id())
df = df.drop("index") # index column if exists
df = df.withColumnRenamed("tmp_idx", "index")
# Add new trip_id column based on row number 
df = df.withColumn("trip_id", monotonically_increasing_id())

In [0]:
datetime_dim = df.select("tpep_pickup_datetime", "tpep_dropoff_datetime")

# Extract pickup attributes
datetime_dim = datetime_dim.withColumn("pick_hour", hour(df.tpep_pickup_datetime))
datetime_dim = datetime_dim.withColumn("pick_day", dayofmonth(df.tpep_pickup_datetime)) 
datetime_dim = datetime_dim.withColumn("pick_month", month(df.tpep_pickup_datetime))
datetime_dim = datetime_dim.withColumn("pick_year", year(df.tpep_pickup_datetime))
datetime_dim = datetime_dim.withColumn("pick_weekday", dayofweek(df.tpep_pickup_datetime))

# Similarly extract drop off attributes
# Add sequential ID column
datetime_dim = datetime_dim.withColumn("datetime_id", monotonically_increasing_id())

# Select desired columns
datetime_dim = datetime_dim.select(["datetime_id", "tpep_pickup_datetime","tpep_dropoff_datetime", 
                                  "pick_hour","pick_day","pick_month","pick_year","pick_weekday"])

#datetime_dim.show(5)

+-----------+--------------------+---------------------+---------+--------+----------+---------+------------+
|datetime_id|tpep_pickup_datetime|tpep_dropoff_datetime|pick_hour|pick_day|pick_month|pick_year|pick_weekday|
+-----------+--------------------+---------------------+---------+--------+----------+---------+------------+
|          0| 2023-08-01 00:26:44|  2023-08-01 00:45:25|        0|       1|         8|     2023|           3|
|          1| 2023-07-01 00:29:59|  2023-07-01 00:40:15|        0|       1|         7|     2023|           7|
|          2| 2023-08-01 00:55:42|  2023-08-01 01:00:53|        0|       1|         8|     2023|           3|
|          3| 2023-08-01 00:32:04|  2023-08-01 01:09:03|        0|       1|         8|     2023|           3|
|          4| 2023-08-01 00:13:37|  2023-08-01 00:41:15|        0|       1|         8|     2023|           3|
+-----------+--------------------+---------------------+---------+--------+----------+---------+------------+
only showi

In [0]:
df = df.withColumnRenamed('PULocationID', 'pickup_location_id')
df = df.withColumnRenamed('DOLocationID', 'dropoff_location_id')
df = df.withColumnRenamed('payment_type', 'payment_type_id')
df = df.withColumnRenamed('RatecodeID', 'rate_code_id')

In [0]:
# Create rate code map 
rate_code_map = create_map([
    lit(1), lit("Standard rate"), 
    lit(2), lit("JFK"),
    lit(3), lit("Newark"),
    lit(4), lit("Nassau or Westchester"),
    lit(5), lit("Negotiated fare"),
    lit(6), lit("Group ride")
])
# Rate Code Dim Table
rate_code_dim = df.select("rate_code_id").distinct()
rate_code_dim = rate_code_dim.withColumn("rate_code_name", rate_code_map.getItem(col("rate_code_id")).cast(StringType()))
rate_code_dim = rate_code_dim.select("rate_code_id","rate_code_name")



In [0]:
#unique_payment_types = df.select("payment_type_id").distinct()
#unique_payment_types.show()

+---------------+
|payment_type_id|
+---------------+
|              1|
|              3|
|              2|
|              4|
|              0|
+---------------+



In [0]:
# Create payment type name map
payment_map = create_map([
    lit(0), lit("Credit card"),
    lit(1), lit("Cash"),
    lit(2), lit("No charge"),
    lit(3), lit("Dispute"),
    lit(4), lit("Unknown"),
    lit(5), lit("Voided trip")
])

# Payment type dim table
payment_type_dim = df.select("payment_type_id").distinct()
payment_type_dim = payment_type_dim.withColumn("payment_type", payment_map.getItem(col("payment_type_id")))
payment_type_dim = payment_type_dim.select("payment_type_id", "payment_type")

In [0]:
#payment_type_dim.show()

+---------------+------------+
|payment_type_id|payment_type|
+---------------+------------+
|              1|        Cash|
|              3|     Dispute|
|              2|   No charge|
|              4|     Unknown|
+---------------+------------+



In [0]:
lookup_filepath = "/FinalCSVFile/zone_coordinates.csv"  
lookup_path = "wasbs://%s@%s.blob.core.windows.net/%s" % (container, storage_account, lookup_filepath)
spark.conf.set("fs.azure.account.key.%s.blob.core.windows.net" % storage_account, access_key)
zone_coordinates = spark.read.option("header", "true") \
    .option("delimiter",",") \
    .csv(lookup_path, header=True, inferSchema=True) 
zone_coordinates = zone_coordinates.coalesce(1)

lookup_filepath2 = "/FinalCSVFile/taxizone_lookup.csv"  
lookup_path = "wasbs://%s@%s.blob.core.windows.net/%s" % (container, storage_account, lookup_filepath2)
spark.conf.set("fs.azure.account.key.%s.blob.core.windows.net" % storage_account, access_key)
taxizone_lookup = spark.read.option("header", "true") \
    .option("delimiter",",") \
    .csv(lookup_path, header=True, inferSchema=True) 
taxizone_lookup = taxizone_lookup.coalesce(1)

In [0]:
pickup_location_dim = df.select("pickup_location_id").distinct()
pickup_location_dim = pickup_location_dim.select("pickup_location_id") \
    .join(zone_coordinates.alias("zone"), pickup_location_dim["pickup_location_id"] == zone_coordinates["LocationID"] , how="left_outer") \
    .join(taxizone_lookup.alias("taxi"), pickup_location_dim["pickup_location_id"] == taxizone_lookup["LocationID"] ,how="left_outer")
pickup_location_dim = pickup_location_dim.select("pickup_location_id", col("zone.Zone"),col("zone.Latitude"),col("zone.Longitude"), col("taxi.Borough"))
pickup_location_dim = pickup_location_dim.withColumnsRenamed({"Zone":"pickup_zone","Borough":"pickup_borough", "Latitude":"pickup_latitude", "Longitude":"pickup_longitude"})

dropoff_location_dim = df.select("dropoff_location_id").distinct()
dropoff_location_dim = dropoff_location_dim.select("dropoff_location_id") \
    .join(zone_coordinates.alias("zone"), df["dropoff_location_id"] == zone_coordinates["LocationID"] , how="left_outer") \
    .join(taxizone_lookup.alias("taxi"), dropoff_location_dim["dropoff_location_id"] == taxizone_lookup["LocationID"] , how="left_outer")
dropoff_location_dim = dropoff_location_dim.select("dropoff_location_id", col("zone.Zone"),col("zone.Latitude"),col("zone.Longitude"), col("taxi.Borough"))
dropoff_location_dim = dropoff_location_dim.withColumnsRenamed({"Zone":"dropoff_zone","Borough":"dropoff_borough", "Latitude":"dropoff_latitude","Longitude":"dropoff_longitude"})

In [0]:
# Perform Spark SQL joins with aliases
fact_table = df.join(datetime_dim, df["trip_id"] == datetime_dim["datetime_id"], how = "leftouter") \
    .join(rate_code_dim.alias("rate"), df["rate_code_id"] == rate_code_dim["rate_code_id"], how = "leftouter") \
    .join(payment_type_dim.alias("payment"), df["payment_type_id"] == payment_type_dim["payment_type_id"], how = "leftouter") \
    .join(pickup_location_dim.alias("pickup"), df["pickup_location_id"] == pickup_location_dim["pickup_location_id"], how = "leftouter") \
    .join(dropoff_location_dim.alias("dropoff"), df["dropoff_location_id"] == dropoff_location_dim["dropoff_location_id"], how = "leftouter") \
    .select(
        "trip_id", "VendorID", "datetime_id", "passenger_count",
        "trip_distance","store_and_fwd_flag", col("rate.rate_code_id"), 
        col("pickup.pickup_location_id"),col("dropoff.dropoff_location_id"),
        col("payment.payment_type_id"), "fare_amount", "extra", "mta_tax", 
        "tip_amount", "tolls_amount", "improvement_surcharge", "total_amount"
    )

In [0]:
fact_table = fact_table.filter(fact_table["VendorID"] == 1)

In [0]:
# Define JDBC URL
jdbc_url = "jdbc:sqlserver://nycyellowtaxi.database.windows.net:1433;database=nyc_yellow_taxi_data;user=yourusername;password=yourpassword;encrypt=true;trustServerCertificate=false;hostNameInCertificate=*.database.windows.net;loginTimeout=30;"
fact_table.write.format("jdbc").option("url", jdbc_url).option("dbtable", "fact_table").mode("overwrite").save()
payment_type_dim.write.format("jdbc").option("url", jdbc_url).option("dbtable", "payment_type_dim").mode("overwrite").save()
pickup_location_dim.write.format("jdbc").option("url", jdbc_url).option("dbtable", "pickup_location_dim").mode("overwrite").save()
dropoff_location_dim.write.format("jdbc").option("url", jdbc_url).option("dbtable", "dropoff_location_dim").mode("overwrite").save()
rate_code_dim.write.format("jdbc").option("url", jdbc_url).option("dbtable", "rate_code_dim").mode("overwrite").save() 
datetime_dim.write.format("jdbc").option("url", jdbc_url).option("dbtable", "datetime_dim").mode("overwrite").save()