In [0]:

from datetime import datetime
import logging


log_file_path = "abfss://silver@logisticsandtransport.dfs.core.windows.net/logs/processing_log.txt"
logger = logging.getLogger("BronzeDataPipeline")
logger.setLevel(logging.INFO)

class ADLSHandler(logging.Handler):
    def emit(self, record):
        from pyspark.dbutils import DBUtils
        dbutils = DBUtils(spark)
        message = self.format(record)
        timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
        full_message = f"[{timestamp}] {message}\n"
        try:
            existing_logs = dbutils.fs.head(log_file_path, 1048576) if dbutils.fs.exists(log_file_path) else ""
            updated_logs = existing_logs.rstrip() + "\\n" + full_message
            dbutils.fs.put(log_file_path, updated_logs, overwrite=True)
        except Exception as e:
            print(f"Failed to write log: {e}")

log_handler = ADLSHandler()
formatter = logging.Formatter('%(levelname)s - %(message)s')
log_handler.setFormatter(formatter)
logger.addHandler(log_handler)
logger.info("Logging initialized.")


Failed to write log: 'FSHandler' object has no attribute 'exists'


In [0]:
try:
    #importing necessary packages
    from pyspark.sql import*
    from pyspark.sql.functions import*
    from pyspark.sql.types import*
    logger.info("Packages imported successfully.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise

In [0]:
try:
    spark.conf.set(
        "fs.azure.account.key.logisticsandtransport.dfs.core.windows.net",
        "AztFZBkLKu6aGzdDi4r7aT7UA7G4UF5oSkFFTwzGjWNM79CRvggV3PuK5iAyZGCbYobManm6J5Ny+AStqinx7A==")
    logger.info("Access key Verified.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise

In [0]:
try:
    #setting up the paths of both bronze and silver data containers 
    bronze_path="abfss://bronze@logisticsandtransport.dfs.core.windows.net/"
    silver_path="abfss://silver@logisticsandtransport.dfs.core.windows.net/"
    logger.info("Paths set successfully.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise

In [0]:
try:
    #load all datasets from bronze path
    delivery_bronze=spark.read.format("parquet").options(header='true',inferSchema='true').load(f"{bronze_path}delivery_data_bronze.parquet")
    driver_bronze=spark.read.format("parquet").options(header='true',inferSchema='true').load(f"{bronze_path}driver_data_bronze.parquet")
    route_bronze=spark.read.format("parquet").options(header='true',inferSchema='true').load(f"{bronze_path}route_data_bronze.parquet")
    vehicle_bronze=spark.read.format("parquet").options(header='true',inferSchema='true').load(f"{bronze_path}vehicle_data_bronze.parquet")
    logger.info("Bronze dataset loaded.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise

>Type Conversions for columns that need changes

In [0]:
try:
    delivery_bronze = delivery_bronze .withColumn("delivery_time", col("delivery_time").cast(FloatType())) \
        .withColumn("distance_covered", col("distance_covered").cast(FloatType()))
    logger.info("Type Cast successfull.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise

In [0]:
try:
    driver_bronze = driver_bronze.withColumn("rating", col("rating").cast(FloatType()))
    logger.info("Type Cast successfull.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise

In [0]:
try:
    route_bronze = route_bronze.withColumn("distance", col("distance").cast(FloatType()))
    logger.info("Type Cast successfull.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise

In [0]:
try:
    vehicle_bronze = vehicle_bronze.withColumn("fuel_efficiency", col("fuel_efficiency").cast(FloatType()))
    logger.info("Type Cast successfull.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise

>Data Cleaning

In [0]:
try:
    #Drop Null Values
    delivery_bronze = delivery_bronze.dropna(subset=["delivery_id", "vehicle_id", "route_id", "driver_id","delivery_time","distance_covered","delivery_status"])
    vehicle_bronze = vehicle_bronze.dropna(subset=["vehicle_id","vehicle_type","fuel_efficiency"])
    route_bronze = route_bronze.dropna(subset=["route_id","start_location","end_location","distance"])
    driver_bronze = driver_bronze.dropna(subset=["driver_id", "driver_name","rating"])
    logger.info("Null values dropped.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise

In [0]:
try:
    #creating route_name column
    route_bronze = route_bronze.withColumn("route_name", concat(col("start_location"), lit(" to "), col("end_location")))
    logger.info("route_name created successfully.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise

In [0]:
try:
    #joining datasets to create the silver layer
    silver_layer_data = delivery_bronze \
        .join(vehicle_bronze, on="vehicle_id", how="inner") \
        .join(route_bronze, on="route_id", how="inner") \
        .join(driver_bronze, on="driver_id", how="inner") \
        .select(
            delivery_bronze["delivery_id"],
            delivery_bronze["vehicle_id"],
            vehicle_bronze["vehicle_type"],
            driver_bronze["driver_name"],
            driver_bronze["rating"],
            route_bronze["route_name"],
            route_bronze["distance"],
            delivery_bronze["delivery_time"],
            delivery_bronze["distance_covered"],
            delivery_bronze["delivery_status"],
            (col("distance_covered") / col("fuel_efficiency")).alias("fuel_consumed"),
            current_timestamp().alias("processed_date")
        )
    
    silver_layer_data.display()
    logger.info("silver dataset created successfully.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise

delivery_id,vehicle_id,vehicle_type,driver_name,rating,route_name,distance,delivery_time,distance_covered,delivery_status,fuel_consumed,processed_date
5,383,Truck,Joseph Wilson,4.2,New Rogerton to North Jennifer,693.0,12.0,84.86,Failed,6.5478395339947,2025-04-04T19:05:08.614636Z
6,313,Van,Elizabeth Stout,1.4,North Susan to Bellport,372.0,12.0,500.0,Completed,37.09198939016063,2025-04-04T19:05:08.614636Z
52,405,Car,Mr. Patrick Adams III,1.4,North John to Robinbury,121.0,4.72,500.0,Completed,50.709940913780585,2025-04-04T19:05:08.614636Z
93,311,Van,Mrs. Lisa Clark,4.9,South Tylerchester to Luisland,470.0,12.0,500.0,Failed,57.4052810344618,2025-04-04T19:05:08.614636Z
130,100,Truck,Alexander Marsh,3.8,Joshuabury to Nicolestad,898.0,12.0,500.0,Failed,62.111799770573214,2025-04-04T19:05:08.614636Z
145,256,Truck,Darlene Turner,1.2,Haasland to East Scottville,60.0,5.84,141.12,Failed,11.289599609375,2025-04-04T19:05:08.614636Z
152,302,Van,Albert Lynch,2.2,West Robert to South Andrew,763.0,3.38,147.51,Completed,14.021861985564344,2025-04-04T19:05:08.614636Z
166,8,Car,Michael Bell,1.0,East Edwinfort to Lake Josephland,812.0,1.48,84.71,Failed,6.233259612461569,2025-04-04T19:05:08.614636Z
173,247,Bus,Christopher Hahn,4.0,Vegamouth to Port Dawnshire,151.0,4.32,500.0,Completed,65.96306134994488,2025-04-04T19:05:08.614636Z
175,371,Car,Colleen Russell,1.8,South Rachel to Brettton,214.0,4.72,162.58,Failed,17.787745702857425,2025-04-04T19:05:08.614636Z


In [0]:
silver_layer_data.columns

['delivery_id',
 'vehicle_id',
 'vehicle_type',
 'driver_name',
 'rating',
 'route_name',
 'distance',
 'delivery_time',
 'distance_covered',
 'delivery_status',
 'fuel_consumed',
 'processed_date']

In [0]:
silver_layer_data.printSchema()  

root
 |-- delivery_id: integer (nullable = true)
 |-- vehicle_id: integer (nullable = true)
 |-- vehicle_type: string (nullable = true)
 |-- driver_name: string (nullable = true)
 |-- rating: float (nullable = true)
 |-- route_name: string (nullable = true)
 |-- distance: float (nullable = true)
 |-- delivery_time: float (nullable = true)
 |-- distance_covered: float (nullable = true)
 |-- delivery_status: string (nullable = true)
 |-- fuel_consumed: double (nullable = true)
 |-- processed_date: timestamp (nullable = false)



In [0]:
try:
    silver_layer_data.write.mode("overwrite").parquet(f"{silver_path}delivery_data_silver.parquet")
    logger.info("parquet file generated successfully.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise

In [0]:
try:
    jdbc_url = "jdbc:mysql://serlog.mysql.database.azure.com:3306/silver_db"
    connection_properties = {
        "user": "login", 
        "password": "amal@gmail.2002",
        "driver": "com.mysql.cj.jdbc.Driver"
    }
    logger.info("database connected successfully.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise

In [0]:
try:
    silver_layer_data.write \
        .format("jdbc") \
        .option("url", jdbc_url) \
        .option("dbtable", "delivery_data_silver") \
        .option("user", "login") \
        .option("password", "amal@gmail.2002") \
        .option("driver", "com.mysql.cj.jdbc.Driver") \
        .mode("overwrite") \
        .save()
    logger.info("table created successfully.")
except Exception as e:
    logger.error(f"Error: {str(e)}")
    raise