In [0]:

from datetime import datetime
import logging

# Set up logging
log_file_path = "abfss://bronze@genbiz123.dfs.core.windows.net/logs/processing_log_bronze.log"
logger = logging.getLogger("RawDataPipeline")
logger.setLevel(logging.INFO)

class ADLSHandler(logging.Handler):
    def emit(self, record):
        try:
            from pyspark.dbutils import DBUtils
            dbutils = DBUtils(spark)
            message = self.format(record)
            timestamp = datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            full_message = f"[{timestamp}] {message}\n"

            try:
                existing_logs = dbutils.fs.head(log_file_path, 1048576)
            except Exception:
                existing_logs = ""

            updated_logs = existing_logs + full_message
            dbutils.fs.put(log_file_path, updated_logs, overwrite=True)

        except Exception as e:
            print(f"Failed to write log: {e}")

log_handler = ADLSHandler()
formatter = logging.Formatter('%(levelname)s - %(message)s')
log_handler.setFormatter(formatter)

if not logger.handlers:
    logger.addHandler(log_handler)

logger.info("Logging initialized.")


Wrote 229 bytes.


In [0]:
try:
    #importing necessary packages
    from pyspark.sql import*
    from pyspark.sql.functions import*
    import logging
    import os
    logger.info("Library imported successfully.")
except Exception as e:
    logger.error(f"Error in Cell 1: {str(e)}")
    raise


Wrote 289 bytes.


In [0]:
# Define file paths
try:
    source_path = "abfss://raw@blobbizproject.dfs.core.windows.net/"
    destination_base_path = "abfss://bronze@genbiz123.dfs.core.windows.net/"
    logger.info("paths set successfully.")
except Exception as e:
    logger.error(f"Error in Cell 4: {str(e)}")
    raise


Wrote 342 bytes.


In [0]:
try:
    spark.conf.set(
    "fs.azure.account.key.blobbizproject.dfs.core.windows.net",
    "5E4WuM/5pxbR5Jf7K3GXrD68mYIf1pm4eQvYSX0hunq8iMEpTh7n285i20HBmAyM0U+GGjBmL+Df+ASt4S4EZQ==")

    spark.conf.set(
    "fs.azure.account.key.genbiz123.dfs.core.windows.net",
    "sv0PmGaLbFlPWbpuUOMhs0EjBV1H+FdY4q92f2xR2CCd4ZwUxYh7gjtlqdLJ4i2JukVV/DX3YzDu+AStouL1lw==")
    logger.info("Acess key verified successfully.")
except Exception as e:
    logger.error(f"Error in Cell 3: {str(e)}")
    raise

Wrote 404 bytes.


In [0]:
# Process each CSV file and save as Parquet with audit columns
# 1. Delivery Data
try:
    try:
        delivery_df = spark.read.format('csv').option('header','true').option('inferSchema','true').load(f"{source_path}delivery_data.csv")
        delivery_bronze = delivery_df \
        .withColumn("ingestion_date", current_timestamp()) \
        .withColumn("source_file", lit("delivery_data.csv"))
        delivery_bronze.write.mode("overwrite").parquet(f"{destination_base_path}delivery_data_bronze.parquet")
        logger.info("Read operation successful.")
    except Exception as e:
        logger.error(f"Error in reading data: {e}")
    logger.info("Parquet generated successfully.")
except Exception as e:
    logger.error(f"Error in Cell 5: {str(e)}")
    raise

Wrote 460 bytes.
Wrote 521 bytes.


In [0]:
# 2. Vehicle Data
try:
    try:  

        vehicle_df = spark.read.option("header", "true").csv(f"{source_path}vehicle_data.csv")
        vehicle_bronze = vehicle_df \
        .withColumn("ingestion_date", current_timestamp()) \
        .withColumn("source_file", lit("vehicle_data.csv"))
        vehicle_bronze.write.mode("overwrite").parquet(f"{destination_base_path}vehicle_data_bronze.parquet")
        logger.info("Read operation successful.")
    except Exception as e:
        logger.error(f"Error in reading data: {e}")
    logger.info("Parquet generated successfully.")
except Exception as e:
    logger.error(f"Error in Cell 8: {str(e)}")
    raise

Wrote 577 bytes.
Wrote 638 bytes.


In [0]:
# 3. Route Data
try:
    try:
       route_df = spark.read.option("header", "true").csv(f"{source_path}route_data.csv")
       route_bronze = route_df \
       .withColumn("ingestion_date", current_timestamp()) \
       .withColumn("source_file", lit("route_data.csv"))
       route_bronze.write.mode("overwrite").parquet(f"{destination_base_path}route_data_bronze.parquet")
       logger.info("Read operation successful.")
    except Exception as e:
        logger.error(f"Error in reading data: {e}")
    logger.info("Parquet generated successfully.")
except Exception as e:
    logger.error(f"Error in Cell 7: {str(e)}")
    raise

Wrote 694 bytes.
Wrote 755 bytes.


In [0]:
# 4. Driver Data
try:
    try:
        driver_df = spark.read.option("header", "true").csv(f"{source_path}driver_data.csv")
        driver_bronze = driver_df \
        .withColumn("ingestion_date", current_timestamp()) \
        .withColumn("source_file", lit("driver_data.csv"))
        driver_bronze.write.mode("overwrite").parquet(f"{destination_base_path}driver_data_bronze.parquet")
        logger.info("Read operation successful.")
    except Exception as e:
        logger.error(f"Error in reading data: {e}")
    logger.info("Parquet generated successfully.")
except Exception as e:
    logger.error(f"Error in Cell 6: {str(e)}")
    raise


Wrote 811 bytes.
Wrote 872 bytes.
