In [None]:
#Task 1 Data Ingestion
from pyspark.sql import SparkSession
import os
import logging
# Create (or reuse) the Spark session shared by the rest of the notebook.
spark = SparkSession.builder.appName("Customer Data Ingestion").getOrCreate()
file_path = "/content/sample_data/customer_transactions.csv"
log_path = '/content/sample_data/logs/ingestion_log.log'

# basicConfig(filename=...) opens the log file right away and raises
# FileNotFoundError if the parent directory is missing, so create it first.
os.makedirs(os.path.dirname(log_path), exist_ok=True)
logging.basicConfig(filename=log_path, level=logging.INFO)

if os.path.exists(file_path):
    # Read the CSV using its first row as column names. No schema is supplied
    # or inferred here, so the columns are loaded with Spark's default types.
    customer_df = spark.read.format("csv").option("header", "true").load(file_path)

    # Persist the raw data as a Delta table, replacing any previous run's output.
    customer_df.write.format("delta").mode("overwrite").save("/content/sample_data/delta/customer_raw")
    logging.info("Customer data ingestion completed successfully.")
else:
    # Missing input is only logged; later cells that read customer_raw will
    # fail (or read stale data) if this branch is taken.
    logging.error(f"File {file_path} does not exist.")


In [None]:
#Task 2 Data Cleaning
from pyspark.sql.functions import col

# Reload the raw Delta table written by the ingestion step.
customer_df = spark.read.load(
    "/content/sample_data/delta/customer_raw",
    format="delta",
)

# Drop exact duplicate rows, then replace missing transaction amounts with 0.
cleaned_df = customer_df.dropDuplicates().fillna({"TransactionAmount": 0})

# Persist the cleaned data as its own Delta table, replacing any earlier output.
cleaned_df.write.save(
    "/content/sample_data/delta/customer_cleaned",
    format="delta",
    mode="overwrite",
)
print("Customer data cleaning completed successfully.")


In [None]:
#Task 3 Data Aggregation
from pyspark.sql.functions import sum
# Reload the cleaned Delta table produced by the cleaning step.
cleaned_df = spark.read.load(
    "/content/sample_data/delta/customer_cleaned",
    format="delta",
)

# `sum` here is pyspark.sql.functions.sum (imported above), not the builtin.
total_amount_expr = sum("TransactionAmount").alias("TotalTransactionAmount")

# One output row per product category with its summed transaction amount.
aggregated_df = cleaned_df.groupBy("ProductCategory").agg(total_amount_expr)

# Persist the per-category totals, replacing any earlier output.
aggregated_df.write.save(
    "/content/sample_data/delta/customer_aggregated",
    format="delta",
    mode="overwrite",
)
print("Customer data aggregation completed successfully.")


In [None]:
#Task 4 Pipeline Creation
import subprocess
import logging

import os

pipeline_log_path = '/content/sample_data/logs/pipeline_log.log'

# The FileHandler created by basicConfig opens the file immediately, so the
# log directory must exist beforehand.
os.makedirs(os.path.dirname(pipeline_log_path), exist_ok=True)

# basicConfig does nothing when the root logger already has handlers (which is
# the case once the ingestion cell has run), so without force=True the
# pipeline messages would keep going to the ingestion log file.
logging.basicConfig(filename=pipeline_log_path, level=logging.INFO, force=True)

# NOTE(review): these entries are the Delta table output directories written
# by the earlier cells, not notebook files, and `databricks workspace import`
# uploads a file to a workspace rather than executing it. The intended
# notebook paths / CLI command should be confirmed; the invocation is kept
# as-is here.
notebooks = [
    "/content/sample_data/delta/customer_raw",
    "/content/sample_data/delta/customer_cleaned",
    "/content/sample_data/delta/customer_aggregated"
]
for notebook in notebooks:
    try:
        # check=True turns a non-zero exit status into CalledProcessError.
        subprocess.run(["databricks", "workspace", "import", notebook], check=True)
        logging.info(f"Successfully executed {notebook}")
    except (subprocess.CalledProcessError, OSError) as e:
        # OSError covers the CLI being missing or not executable
        # (FileNotFoundError / PermissionError), which would otherwise abort
        # the loop with an unlogged traceback.
        logging.error(f"Error occurred while executing {notebook}: {e}")


In [None]:
#Task 5 Data Validation

import math

# Imported locally under an alias so this cell does not depend on an earlier
# cell having replaced the builtin `sum` with pyspark.sql.functions.sum.
from pyspark.sql import functions as F

cleaned_df = spark.read.format("delta").load("/content/sample_data/delta/customer_cleaned")
aggregated_df = spark.read.format("delta").load("/content/sample_data/delta/customer_aggregated")

# Grand total computed directly from the cleaned rows.
total_transactions = cleaned_df.agg(
    F.sum("TransactionAmount").alias("TotalTransactions")
).collect()[0]["TotalTransactions"]

# Grand total recomputed from the per-category subtotals.
total_aggregated_transactions = aggregated_df.agg(
    F.sum("TotalTransactionAmount").alias("TotalAggregatedTransactions")
).collect()[0]["TotalAggregatedTransactions"]

if total_transactions is None or total_aggregated_transactions is None:
    # A sum over zero rows (or only nulls) is None; the totals agree only if
    # both sides are empty in the same way.
    totals_match = total_transactions is None and total_aggregated_transactions is None
else:
    # Floating-point addition is order-dependent, so a total of rows and a
    # total of subtotals can differ by rounding error; compare with a
    # tolerance instead of exact equality.
    totals_match = math.isclose(
        float(total_transactions),
        float(total_aggregated_transactions),
        rel_tol=1e-9,
        abs_tol=1e-6,
    )

if totals_match:
    print(f"Data validation passed: {total_transactions} == {total_aggregated_transactions}")
else:
    print(f"Data validation failed: {total_transactions} != {total_aggregated_transactions}")
