### Customers

In [0]:
from pyspark.sql.functions import *
from pyspark.sql.types import *

In [0]:
import os, sys

repo_root = os.getcwd() 
if repo_root not in sys.path:
    sys.path.append(repo_root)

In [0]:
df_cust = spark.read.table("pysparkdbt.bronze.customers")

In [0]:
display(df_cust)

In [0]:
df_cust = df_cust.withColumn("domain", split(col("email"), "@").getItem(1))
display(df_cust)

In [0]:
# Replace all non digits letters in the phone number column with nothing.
df_cust = df_cust.withColumn(
    "phone_number",
    regexp_replace(col("phone_number"), "[^0-9]", "")
)
display(df_cust)

In [0]:
# Created full_name column by concatinating first_name and last_name, dropping the original after.
df_cust = df_cust.withColumn("full_name", concat_ws((" "), col("first_name"), col("last_name")))
display(df_cust)
df_cust = df_cust.drop("first_name", "last_name")

In [0]:
from utilities.custom_utils import Transformations
t = Transformations()
df_transformed = t.deduplicate(df_cust,["customer_id"],"last_updated_timestamp", "ingestion_timestamp")

display(df_transformed)

In [0]:
df_transformed = t.process_timestamp(df_transformed)
display(df_transformed)

In [0]:
from pyspark.sql.functions import col

table = "pysparkdbt.silver.customers"
cdc_col = "last_updated_timestamp"
t.upsert(
        df=df_transformed,
        key_cols=["customer_id"],
        target_table=table,
        cdc=cdc_col
    )

In [0]:
display(df_transformed) # Rerun and should constantly be 200 if no new records have been inserted.

### Drivers Table

In [0]:
df_driver = spark.read.table("pysparkdbt.bronze.drivers")
display(df_driver)

In [0]:
df_driver = df_driver.withColumn("full_name", concat_ws((" "), col("first_name"), col("last_name")))
display(df_driver)
df_driver = df_driver.drop("first_name", "last_name")

In [0]:
# Replace all non digits letters in the phone number column with nothing.
df_driver = df_driver.withColumn(
    "phone_number",
    regexp_replace(col("phone_number"), "[^0-9]", "")
)
display(df_driver)

In [0]:
# Checking for duplicates

dup_primary = t.duplicates_report(df_driver, ["driver_id"])
dup_temporal = t.duplicates_report(df_driver, ["driver_id", "last_updated_timestamp"])
display(dup_primary)
display(dup_temporal)

In [0]:
nulls = t.nulls_report(df_driver, ["driver_id", "last_updated_timestamp", "phone_number", "vehicle_id", "driver_rating", "city"	])
display(nulls)

In [0]:
# Split the data into valid and rejected by assuring non nulls
split = t.require_non_null(df_driver, ["driver_id" , "last_updated_timestamp"])
valid = split["valid"]
rejected = split["rejected"]

In [0]:
# Filters by valid ranges on the driver ratings [0.0,5.0]
valid = t.filter_by_ranges(valid, {"driver_rating": (0.0, 5.0)}, inclusive=True)["valid"]

In [0]:
# Deduplication by last updated timestamp and if needed ingestion timestamp as a tie breaker
df_silver = t.deduplicate_by_recency(
    df = valid,
    keys = ["driver_id"],
    cdc = "last_updated_timestamp",
    tie_breaker = "ingestion_timestamp"
)

In [0]:
display(df_silver)

In [0]:
t.upsert(
    df = df_silver,
    key_cols = ["driver_id"],
    target_table = "pysparkdbt.silver.drivers",
    cdc = "last_updated_timestamp"
)

### Vehicles

In [0]:
df_vehicles = spark.read.table("pysparkdbt.bronze.vehicles")
display(df_vehicles)

In [0]:
# Clean Strings
import pyspark.sql.functions as F
df_vehicles = t.sanitize_string(df_vehicles, ["license_plate"], pattern=r"[^A-Za-z0-9]", replacement="")
df_vehicles = df_vehicles.withColumn("license_plate", F.upper(F.col("license_plate")))
display(df_vehicles)


In [0]:
df_vehicles = t.sanitize_string(df_vehicles, ["make"], pattern=r",", replacement=" ")
df_vehicles = t.sanitize_string(df_vehicles, ["make"], pattern=r"[^A-Za-z0-9 .&'\-]", replacement="")
df_vehicles = df_vehicles.withColumn("make", F.regexp_replace(F.col("make"), r"\s+", " "))
df_vehicles = df_vehicles.withColumn("make", F.initcap(F.trim(F.col("make"))))
display(df_vehicles)

In [0]:
df_vehicles = t.sanitize_string(df_vehicles, ["model"], pattern=r"[^A-Za-z0-9 \-]", replacement="")
df_vehicles = df_vehicles.withColumn("model", F.regexp_replace(F.col("model"), r"\s+", " "))
df_vehicles = df_vehicles.withColumn("model", F.trim(F.col("model")))
display(df_vehicles)

In [0]:
%sql
SELECT DISTINCT vehicle_type FROM pysparkdbt.bronze.vehicles

In [0]:
valid_types = F.array([F.lit(x) for x in ["Sedan","Suv","Hatchback","Van","Luxury"]])
df_vehicles = df_vehicles.withColumn("vehicle_type", F.initcap(F.trim(F.col("vehicle_type"))))

display(df_vehicles)

In [0]:
from datetime import datetime
max_year = datetime.now().year
res_year = t.filter_by_ranges(df_vehicles, {"year": (1980.0, float(max_year))}, inclusive=True)
df_vehicles = res_year["valid"]
df_vrejected = res_year["rejected"]
display(df_vrejected)
display(df_vehicles)

In [0]:
duplicated = t.duplicates_report(df_vehicles, ["license_plate", "vehicle_id"])
display(duplicated)

nulls = t.nulls_report(df_vehicles, ["vehicle_id","license_plate","year","vehicle_type"])
display(nulls)

In [0]:
split = t.require_non_null(df_vehicles, ["vehicle_id"])
valid = split["valid"]
rejected = split["rejected"]

display(valid)
display(rejected)

In [0]:
# Deduplication on the vehicle id key by recency used last updated timestmamp or the ingestion timestamp
df_silver = t.deduplicate_by_recency(
    df=valid,
    keys=["vehicle_id"],
    cdc="last_updated_timestamp",
    tie_breaker="ingestion_timestamp"
) if "last_updated_timestamp" in valid.columns else t.deduplicate_by_recency(valid, ["vehicle_id"], cdc="vehicle_id")

In [0]:
t.upsert(
    df=df_silver,
    key_cols=["vehicle_id"],
    target_table="pysparkdbt.silver.vehicles",
    cdc="last_updated_timestamp" if "last_updated_timestamp" in df_silver.columns else "vehicle_id"
)

In [0]:
%sql
SELECT COUNT(*) FROM pysparkdbt.silver.vehicles

### Locations

In [0]:
df_locations = spark.read.table("pysparkdbt.bronze.locations")
display(df_locations)

In [0]:
df_locations = t.sanitize_string(df_locations,["city", "state", "country"], pattern =r"[^A-Za-z0-9 \-]", replacement = "")

In [0]:
cols = ["city", "state", "country"]
exprs = {c: F.trim(F.regexp_replace(F.col(c), r"\s+", " ")) for c in cols}
df_locations = df_locations.withColumns(exprs)

In [0]:
# Dict with valid ranges
ranges = {
    "latitude": (-90.0, 90.0),
    "longitude": (-180.0, 180.0)
}

# Filtering the df by valid ranges of latitude and longitude
res = t.filter_by_ranges(df_locations, ranges, inclusive=True)

df_locations = res["valid"]
df_rejected = res["rejected"]

display(df_locations)
display(df_rejected.limit(5))

In [0]:
duplicated = t.duplicates_report(df_locations, ["location_id", "city"])
display(duplicated)

nulls = t.nulls_report(df_locations, ["location_id","city","state","country", "latitude", "longitude"])
display(nulls)

In [0]:
split = t.require_non_null(df_locations, ["city", "state", "country", "latitude", "longitude" , "location_id"])
df_locations = split["valid"]
rejected = split["rejected"]

display(df_locations)
display(rejected)

In [0]:
# Deduplication on the vehicle id key by recency used last updated timestmamp or the ingestion timestamp
df_locations = t.deduplicate_by_recency(
    df=df_locations,
    keys=["location_id","city"],
    cdc="last_updated_timestamp",
    tie_breaker="ingestion_timestamp"
)

In [0]:
# Filters by valid ranges of latitude and longitude
df_locations = t.filter_by_ranges(df_locations, {"latitude": (-90.0, 90.0)}, inclusive=True)["valid"]
df_locations = t.filter_by_ranges(df_locations, {"longitude": (-180.0, 180.0)}, inclusive=True)["valid"]

In [0]:
display(df_locations)

In [0]:
t.upsert(
    df=df_locations,
    key_cols=["location_id"],
    target_table="pysparkdbt.silver.locations",
    cdc="last_updated_timestamp" if "last_updated_timestamp" in df_locations.columns else "location_id"
)

### Payments
Silver transformations for the Payments table

In [0]:
df_payments = spark.read.table("pysparkdbt.bronze.payments")
display(df_payments)

In [0]:
%sql
SELECT DISTINCT payment_status
FROM pysparkdbt.bronze.payments

In [0]:
df_payments = df_payments.withColumn("payment_method", F.upper(F.trim(F.col("payment_method"))))
df_payments = df_payments.withColumn("payment_status", F.upper(F.trim(F.col("payment_status"))))

In [0]:
dup_primary = t.duplicates_report(df_payments, ["payment_id"])
dup_temporal = t.duplicates_report(df_payments, ["payment_id", "last_updated_timestamp"])
display(dup_primary)
display(dup_temporal)

nulls = t.nulls_report(df_payments, ["payment_id","customer_id","trip_id","transaction_time","last_updated_timestamp","amount","payment_method","payment_status"])
display(nulls)


In [0]:
split_req = t.require_non_null(df_payments, ["payment_id","customer_id","trip_id","last_updated_timestamp","amount","payment_method","payment_status"])
df_payments = split_req["valid"]
rejected_pay = split_req["rejected"]


In [0]:
# allowed statuses
allowed_status = ["SUCCESS", "FAILED", "PENDING"]

# normalize first (if not already)
df_payments = df_payments.withColumn("payment_status", F.upper(F.trim(F.col("payment_status"))))

# filter to keep only valid statuses
df_valid = df_payments.filter(F.col("payment_status").isin(allowed_status))

# optionally collect invalid ones for auditing
df_invalid = df_payments.filter(~F.col("payment_status").isin(allowed_status))

display(df_valid)
display(df_invalid)


In [0]:
df_silver = t.deduplicate_by_recency(
    df=df_valid,
    keys=["payment_id"],
    cdc="last_updated_timestamp",
    tie_breaker="ingestion_timestamp"
)


In [0]:
t.upsert(
    df=df_silver,
    key_cols=["payment_id"],
    target_table="pysparkdbt.silver.payments",
    cdc="last_updated_timestamp"
)


### Trips


In [0]:
df_trips = spark.read.table("pysparkdbt.bronze.trips")
display(df_trips)

In [0]:
df_trips = df_trips.withColumn("trip_id", F.col("trip_id").cast(LongType()))
df_trips = df_trips.withColumn("driver_id", F.col("driver_id").cast(LongType()))
df_trips = df_trips.withColumn("customer_id", F.col("customer_id").cast(LongType()))
df_trips = df_trips.withColumn("vehicle_id", F.col("vehicle_id").cast(LongType()))
df_trips = df_trips.withColumn("distance_km", F.col("distance_km").cast(DoubleType()))
df_trips = df_trips.withColumn("fare_amount", F.col("fare_amount").cast(DoubleType()))
df_trips = df_trips.withColumn("trip_start_time", F.col("trip_start_time").cast(TimestampType()))
df_trips = df_trips.withColumn("trip_end_time", F.col("trip_end_time").cast(TimestampType()))
df_trips = df_trips.withColumn("last_updated_timestamp", F.col("last_updated_timestamp").cast(TimestampType()))
df_trips = df_trips.withColumn("ingestion_timestamp", F.col("ingestion_timestamp").cast(TimestampType()))

In [0]:
df_trips = t.sanitize_string(
    df_trips,
    ["start_location", "end_location"],
    pattern=r"[^A-Za-z0-9 \-]",
    replacement=""
)

df_trips = df_trips.withColumns({
    "start_location": F.trim(F.regexp_replace(F.col("start_location"), r"\s+", " ")),
    "end_location":   F.trim(F.regexp_replace(F.col("end_location"),   r"\s+", " "))
})

df_trips = df_trips.withColumn("payment_method", F.upper(F.trim(F.col("payment_method"))))
df_trips = df_trips.withColumn("trip_status", F.upper(F.trim(F.col("trip_status"))))
display(df_trips)

In [0]:
allowed_methods = ["CASH","CARD","WALLET"]
allowed_status  = ["COMPLETED","CANCELLED","ONGOING"]

df_trips_valid_enum = df_trips.filter(
    F.col("payment_method").isin(allowed_methods) & F.col("trip_status").isin(allowed_status)
)
df_trips_rejected_enum = df_trips.filter(
    ~F.col("payment_method").isin(allowed_methods) | ~F.col("trip_status").isin(allowed_status)
)

In [0]:
req_cols = ["trip_id","driver_id","customer_id","vehicle_id","trip_start_time","trip_end_time","last_updated_timestamp","distance_km","fare_amount"]
split_req = t.require_non_null(df_trips_valid_enum, req_cols)
df_trips_valid_req = split_req["valid"]
df_trips_rejected_req = split_req["rejected"]

df_trips_valid_req = df_trips_valid_req.withColumn(
    "duration_min",
    (F.col("trip_end_time").cast("long") - F.col("trip_start_time").cast("long")) / F.lit(60.0)
)

time_ok = (F.col("duration_min") >= F.lit(0.0)) & (F.col("duration_min") <= F.lit(360.0))
df_trips_valid_time = df_trips_valid_req.filter(time_ok)
df_trips_rejected_time = df_trips_valid_req.filter(~time_ok)


In [0]:
ranges = {
    "distance_km": (0.0, 1000.0),
    "fare_amount": (0.0, 10000.0)
}
rng_split = t.filter_by_ranges(df_trips_valid_time, ranges, inclusive=True)
df_trips_valid_rng = rng_split["valid"]
df_trips_rejected_rng = rng_split["rejected"]


In [0]:
df_trips_silver = t.deduplicate_by_recency(
    df=df_trips_valid_rng,
    keys=["trip_id"],
    cdc="last_updated_timestamp",
    tie_breaker="ingestion_timestamp"
)

In [0]:
t.upsert(
    df=df_trips_silver,
    key_cols=["trip_id"],
    target_table="pysparkdbt.silver.trips",
    cdc="last_updated_timestamp"
)