In [None]:
# Cell 1: Configuration
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, sum, count, avg, to_date

# Deployment-time settings: each double-curly-brace token below is a placeholder
# that is expected to be replaced with an environment-specific value on deploy.
storage_account = "{{storage_account}}"
data_lake_path = "{{data_lake_path}}"
log_level = "{{log_level}}"

# Echo the effective configuration, one setting per line.
print(
    f"Storage Account: {storage_account}",
    f"Data Lake Path: {data_lake_path}",
    f"Log Level: {log_level}",
    sep="\n",
)

In [None]:
# Cell 2: Load Raw Sales Data
# Build the raw CSV location under the data lake root. A "/" is inserted only
# when the configured root is non-empty and does not already end with one, so
# the path is correct whether or not the deployed value has a trailing slash.
path_separator = "" if not data_lake_path or data_lake_path.endswith("/") else "/"
raw_sales_path = f"{data_lake_path}{path_separator}raw/sales.csv"

# NOTE(review): `spark` is not created in the visible cells; presumably the
# notebook runtime injects the active SparkSession -- confirm.
# The first CSV row is treated as the header, and column types are inferred
# from the data rather than declared.
df_sales = (
    spark.read.format("csv")
    .option("header", "true")
    .option("inferSchema", "true")
    .load(raw_sales_path)
)

# Sanity output: row count, inferred schema, and a 5-row preview.
print(f"Loaded {df_sales.count()} sales records")
df_sales.printSchema()
df_sales.show(5)

In [None]:
# Cell 3: Data Transformation
# Keep only rows whose amount and quantity are both strictly positive, then
# derive two columns:
#   sale_date  - the "date" column parsed with the yyyy-MM-dd pattern
#   unit_price - amount divided by quantity
df_cleaned = (
    df_sales
    .where((col("amount") > 0) & (col("quantity") > 0))
    .withColumn("sale_date", to_date(col("date"), "yyyy-MM-dd"))
    .withColumn("unit_price", col("amount") / col("quantity"))
)

# Report how many rows survived the filters and preview the first five.
print(f"Cleaned data: {df_cleaned.count()} records")
df_cleaned.show(5)

In [None]:
# Cell 4: Create Aggregations
# Regional sales summary: one row per (region, sale_date) with the summed
# amount, the number of rows, and the mean amount, sorted by region then date.
# `sum`, `count` and `avg` here are the pyspark.sql.functions versions imported
# at the top of the notebook, not the Python builtins.
df_regional_summary = (
    df_cleaned
    .groupBy("region", "sale_date")
    .agg(
        sum("amount").alias("total_sales"),
        count("*").alias("transaction_count"),
        avg("amount").alias("avg_transaction_value"),
    )
    .sort("region", "sale_date")
)

# Preview the first ten summary rows.
print("Regional Sales Summary:")
df_regional_summary.show(10)

In [None]:
# Cell 5: Product category analysis
# One row per category with summed amount, summed quantity and mean unit
# price, ordered from the highest total_sales to the lowest.
df_category_summary = (
    df_cleaned
    .groupBy("category")
    .agg(
        sum("amount").alias("total_sales"),
        sum("quantity").alias("total_quantity"),
        avg("unit_price").alias("avg_unit_price"),
    )
    .orderBy("total_sales", ascending=False)
)

# Display the summary using show()'s default row limit.
print("Product Category Summary:")
df_category_summary.show()

In [None]:
# Cell 6: Write Processed Data
# NOTE(review): these writes run before the data quality checks in Cell 7, so
# existing output is overwritten even if those checks fail afterwards --
# confirm this ordering is intended.

# Resolve the output root under the data lake. A "/" is inserted only when the
# configured root is non-empty and does not already end with one, so the paths
# are correct whether or not the deployed value has a trailing slash.
path_separator = "" if not data_lake_path or data_lake_path.endswith("/") else "/"
processed_root = f"{data_lake_path}{path_separator}processed"

# Save regional summary in Delta format, replacing whatever is at the path.
processed_path = f"{processed_root}/sales_regional_summary"
df_regional_summary.write.format("delta").mode("overwrite").save(processed_path)

print(f"Regional summary saved to: {processed_path}")

# Save category summary in Delta format, replacing whatever is at the path.
category_path = f"{processed_root}/sales_category_summary"
df_category_summary.write.format("delta").mode("overwrite").save(category_path)

print(f"Category summary saved to: {category_path}")

In [None]:
# Cell 7: Data Quality Checks
# Compute all three metrics in a single aggregation so the df_cleaned lineage
# (not cached in the visible cells) is evaluated once instead of once per metric.
quality_row = df_cleaned.agg(
    sum("amount").alias("total_amount"),
    count("*").alias("record_count"),
    sum(col("amount").isNull().cast("int")).alias("null_count"),
).collect()[0]

record_count = quality_row["record_count"]
total_amount = quality_row["total_amount"]
# A SUM over zero rows is NULL (None); treat that as "no null amounts found".
null_count = quality_row["null_count"] or 0

# Fail fast on an empty result: the report below divides by record_count, and
# total_amount is None when there are no rows, which the ",.2f" format cannot
# render. Checking first lets the intended message surface instead.
assert record_count > 0, "No records to process"

print("Data Quality Report:")
print(f"  Total Records: {record_count}")
print(f"  Total Sales Amount: ${total_amount:,.2f}")
print(f"  Null Amount Records: {null_count}")
print(f"  Data Quality Score: {((record_count - null_count) / record_count * 100):.2f}%")

# Assert quality thresholds
# NOTE(review): Cell 3 keeps only rows with amount > 0, which already drops
# null amounts, so this is a post-condition on df_cleaned rather than a check
# of the raw input -- consider validating df_sales if raw nulls matter.
assert null_count == 0, "Found null values in amount column"

print("\n✅ All data quality checks passed!")