In [26]:
# Processing date used below to select which bronze rows to load
# (compared, as a string, with the `processing_date` column).
# NOTE(review): left empty here; presumably injected at run time as a
# notebook/pipeline parameter — confirm before running this cell manually.
today_date = ''

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 28, Finished, Available, Finished)

In [2]:
from pyspark.sql.functions import col, to_date, col, trim, regexp_replace, when, expr, to_timestamp, date_format, lit, isnull, hour, concat_ws, sha2, dayofweek
# OneLake location of the bronze yellow-taxi data.
nyctaxiyellow_bronze_path = "abfss://NycTaxiProject@onelake.dfs.fabric.microsoft.com/NycTaxiProject.Lakehouse/Tables/nyctaxiyellow_bronze"

# Read the bronze data as headered CSV. No schema is supplied or inferred, so
# the columns are loaded as strings and are typed further down the notebook.
bronze_reader = spark.read.format('csv').option("header", "true")
bronze_rows = bronze_reader.load(nyctaxiyellow_bronze_path)

# Keep only the rows that belong to the requested processing date.
df_yellow = bronze_rows.filter(col('processing_date') == str(today_date))



StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 4, Finished, Available, Finished)

**Data Cleaning**


**Creating Data Quality flag for numeric columns**

**Invalid Numeric Handling**

In [4]:
# Flag rows whose key numeric measures are missing, unparseable or negative.
#
# At this point the columns are still the strings read from CSV. Comparing a
# string column directly with the integer literal 0 relies on Spark's implicit
# coercion, which can truncate the fractional part (so "-0.5" is treated as 0
# and is not flagged). Casting to double explicitly makes the comparison
# numeric. With ANSI mode off, a failed cast yields NULL, so the isNull() test
# also catches non-numeric text in addition to genuinely missing values.
numeric_dq_columns = ["trip_distance", "fare_amount", "total_amount"]

invalid_numeric_condition = None
for column_name in numeric_dq_columns:
    numeric_value = col(column_name).cast("double")
    column_is_invalid = numeric_value.isNull() | (numeric_value < 0)
    invalid_numeric_condition = (
        column_is_invalid
        if invalid_numeric_condition is None
        else invalid_numeric_condition | column_is_invalid
    )

df_yellow = df_yellow.withColumn(
    "invalid_numeric_flag",
    when(invalid_numeric_condition, lit("Y")).otherwise(lit("N"))
)


StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 6, Finished, Available, Finished)

**Creating data quality flag for pickup and dropoff location**

In [5]:
# A row has an invalid location when either the pickup or the drop-off
# location id is missing.
location_is_missing = col("PULocationID").isNull() | col("DOLocationID").isNull()

df_yellow = df_yellow.withColumn(
    "invalid_location_flag",
    when(location_is_missing, lit("Y")).otherwise(lit("N")),
)

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 7, Finished, Available, Finished)

**Handling Missing Values**

**Drop rows with missing values**

In [6]:
# Columns that must be present on every trip; rows missing any of them are dropped.
required_columns = ['VendorID', 'tpep_pickup_datetime', 'tpep_dropoff_datetime', 'PULocationID', 'DOLocationID']

rows_before_dropna = df_yellow.count()
print("Before dropping na", rows_before_dropna)

df_yellow = df_yellow.dropna(subset=required_columns)

rows_after_dropna = df_yellow.count()
print('After dropping na', rows_after_dropna)

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 8, Finished, Available, Finished)

Before dropping na 4591845
After dropping na 4591845


**Business Transformation**

**1. Deriving Custom Columns**

**2. Data Enrichment**

**3. Renaming Column headers**

**4. Metadata: source name**

In [7]:
def _code_to_label(code_column, labels_by_code, fallback):
    """Build a CASE-style column that maps each code to its label.

    Codes are tested in the order they appear in ``labels_by_code``; any value
    that matches none of them (including NULL) yields ``fallback``.
    """
    case_expr = None
    for code, label in labels_by_code.items():
        matches = col(code_column) == code
        case_expr = when(matches, label) if case_expr is None else case_expr.when(matches, label)
    return case_expr.otherwise(fallback)


# Lookup tables: numeric code -> descriptive name.
vendor_names = {
    1: "Creative Mobile Technologies, LLC",
    2: "Curb Mobility, LLC",
    6: "Myle Technologies Inc",
    7: "Helix",
}

payment_type_names = {
    0: "Flex Fare trip",
    1: "Credit card",
    2: "Cash",
    3: "No charge",
    4: "Dispute",
    5: "Unknown",
    6: "Voided trip",
}

rate_code_names = {
    1: "Standard rate",
    2: "JFK",
    3: "Newark",
    4: "Nassau or Westchester",
    5: "Negotiated fare",
    6: "Group ride",
}

# Enrich the trips with the descriptive name for each coded column.
df_yellow = (
    df_yellow
    .withColumn("vendor_name", _code_to_label('VendorID', vendor_names, "Unknown Vendor"))
    .withColumn("payment_type_name", _code_to_label("payment_type", payment_type_names, "n/a"))
    .withColumn("final_rate_code_name", _code_to_label("RatecodeID", rate_code_names, "n/a"))
)

    

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 9, Finished, Available, Finished)

**Convert the pickup and drop-off datetime strings to timestamps (the date and time columns are derived from them in the cells below)**

In [8]:
# Replace the pickup and drop-off datetime strings with parsed timestamps,
# keeping the original column names.
for datetime_column in ("tpep_pickup_datetime", "tpep_dropoff_datetime"):
    df_yellow = df_yellow.withColumn(
        datetime_column,
        to_timestamp(col(datetime_column)),
    )


StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 10, Finished, Available, Finished)

**Deriving New Column**

In [9]:

# Derive trip duration and calendar/time-of-day attributes from the pickup
# timestamp.
pickup_ts = col("tpep_pickup_datetime")
pickup_hour = hour(pickup_ts)

# Five-hour bands over the day. A NULL pickup timestamp matches none of the
# bands and falls through to "unknown".
pickup_hour_range = (
    when(pickup_hour.between(0, 4), "00:00-04:59")
    .when(pickup_hour.between(5, 9), "05:00-09:59")
    .when(pickup_hour.between(10, 14), "10:00-14:59")
    .when(pickup_hour.between(15, 19), "15:00-19:59")
    .when(pickup_hour.between(20, 23), "20:00-23:59")
    .otherwise("unknown")
)

df_yellow = (
    df_yellow
    # Whole minutes between pickup and drop-off.
    .withColumn(
        "trip_duration_minutes",
        expr("timestampdiff(MINUTE, tpep_pickup_datetime, tpep_dropoff_datetime)"),
    )
    # Full weekday name, e.g. "Monday".
    .withColumn("pickup_day_of_week", date_format(pickup_ts, "EEEE"))
    # dayofweek() numbers Sunday as 1 and Saturday as 7.
    .withColumn(
        "pickup_week_day_type",
        when(dayofweek(pickup_ts).isin(1, 7), "weekend").otherwise("weekday"),
    )
    # Full month name, e.g. "January".
    .withColumn("pickup_month", date_format(pickup_ts, "MMMM"))
    .withColumn("pickup_month_year", date_format(pickup_ts, "MM-yyyy"))
    # Hour of day as an integer (0-23). The silver table declares this column
    # as int, so produce an int here instead of a zero-padded "HH" string that
    # would have to be cast implicitly when the data is merged.
    .withColumn("pickup_time_of_day", pickup_hour)
    .withColumn("pickup_time_of_day_range", pickup_hour_range)
)

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 11, Finished, Available, Finished)

**Add Metadata source_name**

In [10]:
# Metadata: tag every row with the name of the feed it was loaded from.
source_name_value = lit("src_yellowtaxi")
df_yellow = df_yellow.withColumn("source_name", source_name_value)
    

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 12, Finished, Available, Finished)

**Timestamp data quality check**

In [11]:
# A trip fails the timestamp check when either timestamp is missing (which
# includes strings that could not be parsed) or the pickup is after the drop-off.
pickup_at = col("tpep_pickup_datetime")
dropoff_at = col("tpep_dropoff_datetime")

timestamps_are_invalid = (
    pickup_at.isNull()
    | dropoff_at.isNull()
    | (pickup_at > dropoff_at)
)

df_yellow = df_yellow.withColumn(
    "date_dq_flag",
    when(timestamps_are_invalid, lit("Y")).otherwise(lit("N")),
)

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 13, Finished, Available, Finished)

**Convert Columns to the appropriate data type**

In [12]:
from pyspark.sql import functions as F
# Columns to convert to INT
cols_to_int = [
    "VendorID",
    "RatecodeID",
    "PULocationID",
    "DOLocationID",
    "passenger_count",
    "payment_type",
    "trip_type"
]

# Columns to convert to DECIMAL
cols_to_decimal = [
    "trip_distance",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "cbd_congestion_fee",
]

# Target Spark type for every column that needs converting. The two lists do
# not overlap, so one lookup covers both conversions.
target_type_by_column = {name: "int" for name in cols_to_int}
target_type_by_column.update({name: "decimal(10,2)" for name in cols_to_decimal})


def _with_target_type(column_name):
    """Return the column cast to its target type, or unchanged if it has none.

    A trailing ".0" is stripped from the text before casting.
    """
    target_type = target_type_by_column.get(column_name)
    if target_type is None:
        return F.col(column_name)
    without_trailing_zero = F.regexp_replace(F.col(column_name), r"\.0$", "")
    return without_trailing_zero.cast(target_type).alias(column_name)


# Listed columns that are absent from the DataFrame are simply ignored, since
# only the DataFrame's own columns are iterated.
df_yellow = df_yellow.select(
    *[_with_target_type(column_name) for column_name in df_yellow.columns]
)


StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 14, Finished, Available, Finished)

**Drop unwanted columns**

In [13]:
# Columns that are not carried forward into the silver table.
cols_to_drop = ["store_and_fwd_flag", "Airport_fee"]

# Remove the listed columns from the DataFrame.
df_yellow = df_yellow.drop(*cols_to_drop)

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 15, Finished, Available, Finished)

**Deriving new columns**

In [14]:
# Classify each trip by the hour it started: 06-09 is the morning peak,
# 16-19 the evening peak, and everything else (including a NULL hour) off peak.
pickup_hour_of_day = hour(col("tpep_pickup_datetime"))

time_bucket = (
    when(pickup_hour_of_day.between(6, 9), "Morning Peak")
    .when(pickup_hour_of_day.between(16, 19), "Evening Peak")
    .otherwise("Off Peak")
)

df_yellow = df_yellow.withColumn("pickup_time_bucket", time_bucket)


StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 16, Finished, Available, Finished)

**Creating a business key for the table**

In [15]:
# Compact "yyMMddHHmmss" text forms of the pickup and drop-off timestamps;
# they become components of the business key.
compact_timestamp_format = "yyMMddHHmmss"

df_yellow = (
    df_yellow
    .withColumn(
        "pickup_time_string",
        date_format(col("tpep_pickup_datetime"), compact_timestamp_format),
    )
    .withColumn(
        "dropoff_time_string",
        date_format(col("tpep_dropoff_datetime"), compact_timestamp_format),
    )
)

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 17, Finished, Available, Finished)

In [16]:
# Components of the business key, in the order they are joined with "-".
business_key_parts = [
    "source_name",
    "PULocationID",
    "DOLocationID",
    "pickup_time_string",
    "dropoff_time_string",
    "VendorID",
    "payment_type",
]

df_yellow = df_yellow.withColumn(
    "trip_business_key",
    concat_ws("-", *[col(part) for part in business_key_parts]),
)

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 18, Finished, Available, Finished)

**Creating a trip natural key**

In [17]:
# The natural key is the SHA-256 hash of the business key.
business_key_hash = sha2(col("trip_business_key"), 256)
df_yellow = df_yellow.withColumn("trip_natural_key", business_key_hash)

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 19, Finished, Available, Finished)

**Removing invalid rows**

In [18]:
# Keep only the rows that passed every data-quality check, printing the row
# count before and after so the number of rejected rows is visible.
print("before data quality filter", df_yellow.count())

# Each flag is "N" when the corresponding check passed. The date flag was
# previously tested twice; the redundant duplicate predicate has been removed.
df_yellow = df_yellow.filter(
    (col("invalid_numeric_flag") == "N") &
    (col("invalid_location_flag") == "N") &
    (col("date_dq_flag") == "N")
)

print("after data quanlity filter", df_yellow.count())

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 20, Finished, Available, Finished)

before data quality filter 4591845
after data quanlity filter 4281737


**Removing duplicates using the trip_natural_key**

In [19]:
# Keep a single row per natural key, reporting the row count on either side.
rows_before_dedup = df_yellow.count()
print("before dropping duplicates", rows_before_dedup)

df_yellow = df_yellow.dropDuplicates(["trip_natural_key"])

rows_after_dedup = df_yellow.count()
print("after dropping duplicates", rows_after_dedup)

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 21, Finished, Available, Finished)

before dropping duplicates 4281737
after dropping duplicates 4281730


In [21]:
# Display the final column list so it can be checked against the silver table definition.
df_yellow.columns

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 23, Finished, Available, Finished)

['VendorID',
 'tpep_pickup_datetime',
 'tpep_dropoff_datetime',
 'passenger_count',
 'trip_distance',
 'RatecodeID',
 'PULocationID',
 'DOLocationID',
 'payment_type',
 'fare_amount',
 'extra',
 'mta_tax',
 'tip_amount',
 'tolls_amount',
 'improvement_surcharge',
 'total_amount',
 'congestion_surcharge',
 'cbd_congestion_fee',
 'processing_date',
 'invalid_numeric_flag',
 'invalid_location_flag',
 'vendor_name',
 'payment_type_name',
 'final_rate_code_name',
 'trip_duration_minutes',
 'pickup_day_of_week',
 'pickup_week_day_type',
 'pickup_month',
 'pickup_month_year',
 'pickup_time_of_day',
 'pickup_time_of_day_range',
 'source_name',
 'date_dq_flag',
 'pickup_time_bucket',
 'pickup_time_string',
 'dropoff_time_string',
 'trip_business_key',
 'trip_natural_key']

**Writing to Silver Table**

In [22]:
# Expose the cleaned DataFrame to Spark SQL; the MERGE statement reads it under this name.
df_yellow.createOrReplaceTempView('t_silver_yellow_new_data')

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 24, Finished, Available, Finished)

**Creating a Silver table**

In [23]:
%%sql
-- Create the silver yellow-taxi table on first run; a no-op when it already exists.
-- Columns follow the order of the cleaned DataFrame. Several are renamed in the
-- MERGE (e.g. VendorID -> vendor_id, tpep_pickup_datetime -> pickup_datetime).
CREATE TABLE IF NOT EXISTS yellowtaxi_silver (
   vendor_id int,
   pickup_datetime timestamp,
   dropoff_datetime timestamp,
   passenger_count int,
   trip_distance decimal(10,2),
   rate_code_id int,
   pickup_location_id int,
   dropoff_location_id int,
   payment_type int, 
   fare_amount decimal(10,2),
   extra decimal(10,2),
   mta_tax decimal(10,2),
   tip_amount decimal(10,2),
   tolls_amount decimal(10,2),
   improvement_surcharge decimal(10,2),
   total_amount decimal(10,2),
   congestion_surcharge decimal(10,2),
   cbd_congestion_fee decimal(10,2),
   processing_date date,
   -- Data-quality flags ('Y' / 'N') and descriptive names derived in the notebook.
   invalid_numeric_flag string,
   invalid_location_flag string,
   vendor_name string,
   payment_type_name string,
   final_rate_code_name string,
   trip_duration_minutes int,
   pickup_day_of_week string,
   pickup_week_day_type string,
   pickup_month string,
   pickup_month_year string,
   pickup_time_of_day int,
   pickup_time_of_day_range string,
   source_name string,
   date_dq_flag string,
   pickup_time_bucket string,
   pickup_time_string string,
   dropoff_time_string string,
   -- trip_natural_key is the column the MERGE matches on.
   trip_business_key string,
   trip_natural_key string
)

-- Stored as a Delta table, partitioned by processing date.
USING DELTA
PARTITIONED BY (processing_date);

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 25, Finished, Available, Finished)

<Spark SQL result set with 0 rows and 0 fields>

In [24]:
# Upsert the cleaned trips into the silver table, matching on trip_natural_key:
# matched rows have every column overwritten, unmatched rows are inserted.
#
# The UPDATE assignments, the INSERT column list and the VALUES list are all
# generated from the single column list below. Previously they were written
# out by hand three times, and one VALUES entry (pickup_time_of_day_range) had
# lost its `source.` qualifier; generating them keeps the lists aligned and
# consistently qualified.

# Silver-table columns, in table order.
silver_columns = [
    "vendor_id",
    "pickup_datetime",
    "dropoff_datetime",
    "passenger_count",
    "trip_distance",
    "rate_code_id",
    "pickup_location_id",
    "dropoff_location_id",
    "payment_type",
    "fare_amount",
    "extra",
    "mta_tax",
    "tip_amount",
    "tolls_amount",
    "improvement_surcharge",
    "total_amount",
    "congestion_surcharge",
    "cbd_congestion_fee",
    "processing_date",
    "invalid_numeric_flag",
    "invalid_location_flag",
    "vendor_name",
    "payment_type_name",
    "final_rate_code_name",
    "trip_duration_minutes",
    "pickup_day_of_week",
    "pickup_week_day_type",
    "pickup_month",
    "pickup_month_year",
    "pickup_time_of_day",
    "pickup_time_of_day_range",
    "source_name",
    "date_dq_flag",
    "pickup_time_bucket",
    "pickup_time_string",
    "dropoff_time_string",
    "trip_business_key",
    "trip_natural_key",
]

# Silver columns whose name differs from the column in the source view; every
# other silver column is fed by the source column of the same name.
source_column_by_silver_column = {
    "vendor_id": "VendorID",
    "pickup_datetime": "tpep_pickup_datetime",
    "dropoff_datetime": "tpep_dropoff_datetime",
    "rate_code_id": "RatecodeID",
    "pickup_location_id": "PULocationID",
    "dropoff_location_id": "DOLocationID",
}


def _source_ref(silver_column):
    """Return the qualified source-view column that feeds ``silver_column``."""
    source_column = source_column_by_silver_column.get(silver_column, silver_column)
    return f"source.{source_column}"


_separator = ",\n    "
update_assignments = _separator.join(
    f"target.{silver_column} = {_source_ref(silver_column)}"
    for silver_column in silver_columns
)
insert_columns = _separator.join(silver_columns)
insert_values = _separator.join(
    _source_ref(silver_column) for silver_column in silver_columns
)

sql_statement = f"""MERGE INTO yellowtaxi_silver as target
USING t_silver_yellow_new_data as source
on target.trip_natural_key = source.trip_natural_key

WHEN MATCHED THEN
  UPDATE set
    {update_assignments}
WHEN NOT MATCHED THEN
  INSERT (
    {insert_columns}
  )
  VALUES (
    {insert_values}
  )"""

# Run the merge and display the affected/updated/deleted/inserted row counts.
spark.sql(sql_statement).show()


StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 26, Finished, Available, Finished)

+-----------------+----------------+----------------+-----------------+
|num_affected_rows|num_updated_rows|num_deleted_rows|num_inserted_rows|
+-----------------+----------------+----------------+-----------------+
|          4281730|         4281730|               0|                0|
+-----------------+----------------+----------------+-----------------+



In [25]:
# Sanity check: total number of rows now stored in the silver table.
silver_query = "SELECT * FROM NycTaxiProject.yellowtaxi_silver"
df = spark.sql(silver_query)
df.count()

StatementMeta(, cfdf57b2-2fef-4ffc-a7d4-eea667839a1d, 27, Finished, Available, Finished)

26425613