# **Silver Layer Scripts**

### Loading ZedaFleet Bronze Lakehouse Tables

In [1]:
df_customers = spark.read.table("ZedaFleet_Bronze.customers")
df_customer_interactions = spark.read.table("ZedaFleet_Bronze.customer_interactions")
df_sales = spark.read.table("ZedaFleet_Bronze.sales")
df_rentals = spark.read.table("ZedaFleet_Bronze.rentals")
df_maintenance = spark.read.table("ZedaFleet_Bronze.vehicle_maintenance")
df_vehicles = spark.read.table("ZedaFleet_Bronze.vehicles")
df_contracts = spark.read.table("ZedaFleet_Bronze.leasing_contracts")
df_fleet_telemetry = spark.read.table("ZedaFleet_Bronze.fleet_telemetry")

StatementMeta(, b89d9a14-c458-4f52-90c1-e15127579734, 3, Finished, Available, Finished)

### Import Libraries

In [2]:
from pyspark.sql.functions import *
from pyspark.sql.types import *
from pyspark.sql.functions import col, coalesce, lit, round as spark_round, trim

StatementMeta(, b89d9a14-c458-4f52-90c1-e15127579734, 4, Finished, Available, Finished)

### Transformation and load ZedaFleet Lakehouse Tables

In [None]:
# Customers 
customers_df = df_customers.select(
    regexp_extract(col("crm_customer_id"), r"_([A-Za-z0-9]+)", 1).alias("customer_id"),
    trim(col("name")).alias("customer_name"),
    lower(trim(col("email"))).alias("email"),
    col("phone"),
    trim(col("address")).alias("address"),
    trim(col("city")).alias("city"),
    trim(col("country")).alias("country"),
    col("customer_type"),
    col("registration_date"),
    coalesce(col("corporate_discount"), lit(0)).alias("corporate_discount"),
    coalesce(col("loyalty_points"), lit(0)).alias("loyalty_points"),
    trim(col("preferred_branch")).alias("preferred_branch"),
    col("last_interaction_date"),
    when(upper(trim(col("marketing_consent"))) == "Y", "YES")
        .when(upper(trim(col("marketing_consent"))) == "N", "NO")
        .otherwise(None).alias("marketing_consent"),
    col("customer_segment")
)

df.write.format("delta")\
            .mode("overwrite")\
            .saveAsTable("customers")

In [4]:
# Customer Interactions
customer_interactions_df = df_customer_interactions.select(
    regexp_extract(col("crm_interaction_id"), r"_([A-Za-z0-9]+)", 1).alias("interaction_id"),
    regexp_extract(col("crm_customer_id"), r"_([A-Za-z0-9]+)", 1).alias("customer_id"),
    col("interaction_date").cast("timestamp"),
    col("interaction_type"),
    col("interaction_channel"),
    col("subject_matter"),
    col("outcome"),
    trim(col("assigned_agent")).alias("assigned_agent"),
    when(upper(trim(col("followup_required"))) == "Y", "YES")
        .when(upper(trim(col("followup_required"))) == "N", "NO")
        .otherwise(None).alias("followup_required"),
    col("followup_date")
)

customer_interactions_df.write.format("delta")\
            .mode("overwrite")\
            .saveAsTable("customer_interactions")

StatementMeta(, b89d9a14-c458-4f52-90c1-e15127579734, 6, Finished, Available, Finished)

In [10]:
# Car Sales
sales_df = df_sales.select(
    regexp_extract(col("erp_sale_id"), r"_([A-Za-z0-9]+)", 1).alias("sale_id"),
    regexp_extract(col("erp_vehicle_id"), r"_([A-Za-z0-9]+)", 1).alias("vehicle_id"),
    col("sale_date"),
    col("sale_price"),
    col("original_cost"),
    col("sale_channel"),
    col("buyer_type"),
    col("days_on_market"),
    col("profit_margin"),
    col("dealer_discount"),
    trim(col("sales_person")).alias("sales_person"),
    col("finance_option")
)

sales_df.write.format("delta")\
            .mode("overwrite")\
            .saveAsTable("sales")

StatementMeta(, 3e8fa50b-b006-4c0d-98f8-293c341470aa, 11, Finished, Available, Finished)

In [11]:
# Vehicles
vehicles_df = df_vehicles.select(
    regexp_extract(col("erp_vehicle_id"), r"_([A-Za-z0-9]+)", 1).alias("vehicle_id"),
    col("registration"),
    trim(col("make")).alias("make"),
    col("model"),
    col("model_year"),
    col("vehicle_type"),
    col("segment"),
    trim(col("branch")).alias("branch"),
    col("acquisition_date"),
    col("purchase_price"),
    col("current_value"),
    col("status"),
    coalesce(col("odometer"), lit(0)).alias("odometer"),
    col("warranty_expiry"),
    col("insurance_policy")
)

vehicles_df.write.format("delta")\
            .mode("overwrite")\
            .saveAsTable("vehicles")

StatementMeta(, 3e8fa50b-b006-4c0d-98f8-293c341470aa, 12, Finished, Available, Finished)

In [12]:
# Leasing Contracts
contracts_df = df_contracts.select(
    regexp_extract(col("erp_contract_id"), r"_([A-Za-z0-9]+)", 1).alias("contract_id"),
    regexp_extract(col("crm_customer_id"), r"_([A-Za-z0-9]+)", 1).alias("customer_id"),
    regexp_extract(col("erp_vehicle_id"), r"_([A-Za-z0-9]+)", 1).alias("vehicle_id"),
    col("start_dt"),
    col("end_dt"),
    col("monthly_payment"),
    trim(col("branch")).alias("branch"),
    col("contract_type"),
    col("km_limit"),
    when(upper(trim(col("maintenance_included"))) == 'Y', 'YES')
     .when(upper(trim(col("maintenance_included"))) == 'N', 'NO')
     .otherwise(None).alias("maintenance_included"),
    when(upper(trim(col("insurance_included"))) == 'Y', 'YES')
     .when(upper(trim(col("insurance_included"))) == 'N', 'NO')
     .otherwise(None).alias("insurance_included"),
    col("payment_method"),
    col("contract_status")
)

contracts_df.write.format("delta")\
            .mode("overwrite")\
            .saveAsTable("contracts")

StatementMeta(, 3e8fa50b-b006-4c0d-98f8-293c341470aa, 13, Finished, Available, Finished)

In [13]:
# Vehicle Maintenance
maintenance_df = df_maintenance.select(
    regexp_extract(col("erp_maintenance_id"), r"_([A-Za-z0-9]+)", 1).alias("maintenance_id"),
    regexp_extract(col("erp_vehicle_id"), r"_([A-Za-z0-9]+)", 1).alias("vehicle_id"),
    col("service_date"),
    col("service_type"),
    coalesce(col("odometer"), lit(0)).alias("odometer"),
    col("cost"),
    trim(col("branch")).alias("branch"),
    col("service_provider"),
    coalesce(col("downtime_days"), lit(0)).alias("downtime_days"),
    col("main_description"),
    when(upper(trim(col("warranty_claim"))) == 'Y', 'YES')
     .when(upper(trim(col("warranty_claim"))) == 'N', 'NO')
     .otherwise(None).alias("warranty_claim")
)

maintenance_df.write.format("delta")\
            .mode("overwrite")\
            .saveAsTable("maintenance")

StatementMeta(, 3e8fa50b-b006-4c0d-98f8-293c341470aa, 14, Finished, Available, Finished)

In [14]:
# Rentals
rentals_df = df_rentals.select(
    regexp_extract(col("erp_rental_id"), r"_([A-Za-z0-9]+)", 1).alias("rental_id"),
    regexp_extract(col("crm_customer_id"), r"_([A-Za-z0-9]+)", 1).alias("customer_id"),
    regexp_extract(col("erp_vehicle_id"), r"_([A-Za-z0-9]+)", 1).alias("vehicle_id"),
    trim(col("branch")).alias("branch"),
    col("segment"),
    col("duration_days"),
    col("rate_per_day"),
    col("total_amount"),
    col("revenue_channel"),
    col("rental_status"),
    col("insurance_option"),
    col("payment_status"),
    col("invoice_number")
)

rentals_df.write.format("delta")\
            .mode("overwrite")\
            .saveAsTable("rentals")

StatementMeta(, 3e8fa50b-b006-4c0d-98f8-293c341470aa, 15, Finished, Available, Finished)

In [15]:
# Fleet Telemetry
telemetry_df = df_fleet_telemetry.select(
    regexp_extract(col("iot_telemetry_id"), r"_([A-Za-z0-9]+)", 1).alias("telemetry_id"),
    regexp_extract(col("erp_vehicle_id"), r"_([A-Za-z0-9]+)", 1).alias("vehicle_id"),
    col("record_date"),
    coalesce(col("odometer"), lit(0)).alias("odometer"),
    spark_round(col("fuel_level"), 2).alias("fuel_level"),
    spark_round(col("fuel_consumption"), 2).alias("fuel_consumption"),
    spark_round(col("location_lat"), 6).alias("location_lat"),
    spark_round(col("location_long"), 6).alias("location_long"),
    trim(col("branch")).alias("branch"),
    col("engine_status"),
    col("speed"),
    spark_round(col("battery_voltage"), 2).alias("battery_voltage"),
    spark_round(col("tire_pressure"), 2).alias("tire_pressure")
)

telemetry_df.write.format("delta")\
            .mode("overwrite")\
            .saveAsTable("telemetry")

StatementMeta(, 3e8fa50b-b006-4c0d-98f8-293c341470aa, 16, Finished, Available, Finished)