In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *

## Creating the Spark Session

In [3]:
spark = SparkSession.builder \
    .appName("Ecommerce Silver Layer Transformation") \
    .config("spark.sql.legacy.timeParserPolicy", "LEGACY") \
    .getOrCreate()

print(f"Spark Version: {spark.version}")
print("Spark session created!")

Spark Version: 3.5.1
Spark session created!


## Loading Data

In [4]:
df_raw = spark.read.csv("data.csv", header=True, inferSchema=True)
print("="*60)
print("RAW DATA LOADED")
print("="*60)
print(f"\nTotal rows: {df_raw.count():,}")
print("\nSchema:")
df_raw.printSchema()
print("\nFirst 3 rows:")
df_raw.show(3, truncate=False)

RAW DATA LOADED

Total rows: 541,909

Schema:
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = true)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)


First 3 rows:
+---------+---------+----------------------------------+--------+--------------+---------+----------+--------------+
|InvoiceNo|StockCode|Description                       |Quantity|InvoiceDate   |UnitPrice|CustomerID|Country       |
+---------+---------+----------------------------------+--------+--------------+---------+----------+--------------+
|536365   |85123A   |WHITE HANGING HEART T-LIGHT HOLDER|6       |12/1/2010 8:26|2.55     |17850     |United Kingdom|
|536365   |71053    |WHITE METAL LANTERN               |6       |12/1/2010 8:26|3.39     |17850     |United Kingdom|
|536365   |84406

In [5]:
# Cell 4: Silver Layer Transformation

print("="*60)
print("APPLYING TRANSFORMATIONS")
print("="*60)

# Step 1: Remove bad records
print("\nStep 1: Removing bad records...")

# Remove internal adjustments (negative qty, no 'C', NULL customer)
df_clean = df_raw.filter(
    ~((col("Quantity") < 0) &
      (~col("InvoiceNo").startswith("C")) &
      (col("CustomerID").isNull()))
)
print(f"   Removed internal adjustments")

# Remove all bad prices (price <= 0)
df_clean = df_clean.filter(col("UnitPrice") > 0)
print(f"   Removed bad prices")

rows_removed = df_raw.count() - df_clean.count()
print(f"   Total rows removed: {rows_removed:,}")
print(f"   Rows remaining: {df_clean.count():,}")

# Step 2: Create customer_type column
print("\nStep 2: Creating customer_type column...")
df_clean = df_clean.withColumn(
    "customer_type",
    when(col("CustomerID").isNull(), "Guest")
    .otherwise("Registered")
)
print("   ✓ customer_type created")

# Step 3: Create is_return flag
print("\nStep 3: Creating is_return flag...")
df_clean = df_clean.withColumn(
    "is_return",
    when(col("InvoiceNo").startswith("C"), True)
    .otherwise(False)
)
print("   ✓ is_return flag created")

# Step 4: Fill NULL descriptions
print("\nStep 4: Filling NULL descriptions...")
df_clean = df_clean.fillna({"Description": "Unknown Product"})
print("   ✓ NULL descriptions filled")

# Step 5: Create TotalPrice column
print("\nStep 5: Creating TotalPrice column...")
df_clean = df_clean.withColumn(
    "TotalPrice",
    col("Quantity") * col("UnitPrice")
)
print("   ✓ TotalPrice created")

# Step 6: Parse InvoiceDate
print("\nStep 6: Parsing InvoiceDate...")
df_clean = df_clean.withColumn(
    "InvoiceDate_parsed",
    to_timestamp(col("InvoiceDate"), "M/d/yyyy H:mm")
)

df_clean = df_clean.withColumn("Year", year(col("InvoiceDate_parsed"))) \
                   .withColumn("Month", month(col("InvoiceDate_parsed"))) \
                   .withColumn("Day", dayofmonth(col("InvoiceDate_parsed"))) \
                   .withColumn("Hour", hour(col("InvoiceDate_parsed")))

print("   ✓ Date columns created (Year, Month, Day, Hour)")

print("\n" + "="*60)
print("TRANSFORMATION COMPLETE!")
print("="*60)
print(f"\nFinal row count: {df_clean.count():,}")

APPLYING TRANSFORMATIONS

Step 1: Removing bad records...
   Removed internal adjustments
   Removed bad prices
   Total rows removed: 2,517
   Rows remaining: 539,392

Step 2: Creating customer_type column...
   ✓ customer_type created

Step 3: Creating is_return flag...
   ✓ is_return flag created

Step 4: Filling NULL descriptions...
   ✓ NULL descriptions filled

Step 5: Creating TotalPrice column...
   ✓ TotalPrice created

Step 6: Parsing InvoiceDate...
   ✓ Date columns created (Year, Month, Day, Hour)

TRANSFORMATION COMPLETE!

Final row count: 539,392


In [6]:
# Cell 5: Verification

print("="*60)
print("SILVER LAYER - DATA QUALITY CHECK")
print("="*60)

# Check new columns exist
print("\nNew Schema:")
df_clean.printSchema()

# Check customer_type distribution
print("\nCustomer Type Distribution:")
df_clean.groupBy("customer_type").count().show()

# Check is_return distribution
print("\nReturn Flag Distribution:")
df_clean.groupBy("is_return").count().show()

# Check no NULL descriptions remain
null_desc = df_clean.filter(col("Description").isNull()).count()
print(f"\nNULL Descriptions remaining: {null_desc}")

# Check no bad prices remain
bad_prices = df_clean.filter(col("UnitPrice") <= 0).count()
print(f"Bad prices remaining: {bad_prices}")

# Sample of cleaned data
print("\nSample of cleaned data:")
df_clean.select("InvoiceNo", "Description", "Quantity", "UnitPrice",
                "TotalPrice", "customer_type", "is_return", "Year", "Month").show(5)

print("\n SILVER LAYER READY!")

SILVER LAYER - DATA QUALITY CHECK

New Schema:
root
 |-- InvoiceNo: string (nullable = true)
 |-- StockCode: string (nullable = true)
 |-- Description: string (nullable = false)
 |-- Quantity: integer (nullable = true)
 |-- InvoiceDate: string (nullable = true)
 |-- UnitPrice: double (nullable = true)
 |-- CustomerID: integer (nullable = true)
 |-- Country: string (nullable = true)
 |-- customer_type: string (nullable = false)
 |-- is_return: boolean (nullable = false)
 |-- TotalPrice: double (nullable = true)
 |-- InvoiceDate_parsed: timestamp (nullable = true)
 |-- Year: integer (nullable = true)
 |-- Month: integer (nullable = true)
 |-- Day: integer (nullable = true)
 |-- Hour: integer (nullable = true)


Customer Type Distribution:
+-------------+------+
|customer_type| count|
+-------------+------+
|        Guest|132603|
|   Registered|406789|
+-------------+------+


Return Flag Distribution:
+---------+------+
|is_return| count|
+---------+------+
|     true|  9288|
|    false|

In [7]:
print("="*60)
print("SAVING SILVER LAYER")
print("="*60)


output_path = "ecommerce_silver_layer.parquet"
df_clean.write.mode("overwrite").parquet(output_path)
print(f"\n Silver Layer saved to: {output_path}")
print(f"   Format: Parquet (compressed, columnar)")
print(f"   Rows: {df_clean.count():,}")
print(f"   Columns: {len(df_clean.columns)}")

# Also save as CSV for easy viewing
csv_path = "ecommerce_silver_layer.csv"
df_clean.coalesce(1).write.mode("overwrite") \
    .option("header", "true") \
    .csv(csv_path)

print(f"\n Also saved as CSV: {csv_path}")
print("\n" + "="*60)
print("SILVER LAYER COMPLETE!")
print("="*60)

SAVING SILVER LAYER

 Silver Layer saved to: ecommerce_silver_layer.parquet
   Format: Parquet (compressed, columnar)
   Rows: 539,392
   Columns: 16

 Also saved as CSV: ecommerce_silver_layer.csv

SILVER LAYER COMPLETE!


In [8]:
# Cell 7: Gold Layer - Business Analytics

print("="*60)
print("CREATING GOLD LAYER - BUSINESS ANALYTICS")
print("="*60)

# Filter out returns for sales analysis
df_sales = df_clean.filter(col("is_return") == False)

# 1. Sales by Country
print("\n1. Creating Sales by Country...")
sales_by_country = df_sales.groupBy("Country") \
    .agg(
        sum("TotalPrice").alias("TotalRevenue"),
        count("InvoiceNo").alias("OrderCount"),
        countDistinct("CustomerID").alias("UniqueCustomers")
    ) \
    .orderBy(col("TotalRevenue").desc())

print("   Top 5 Countries by Revenue:")
sales_by_country.show(5)

# 2. Top 10 Products
print("\n2. Creating Top Products...")
top_products = df_sales.groupBy("Description", "StockCode") \
    .agg(
        sum("Quantity").alias("TotalQuantitySold"),
        sum("TotalPrice").alias("TotalRevenue")
    ) \
    .orderBy(col("TotalRevenue").desc()) \
    .limit(10)

print("   Top 10 Products:")
top_products.show(10, truncate=False)

# 3. Monthly Revenue Trend
print("\n3. Creating Monthly Revenue Trend...")
monthly_revenue = df_sales.groupBy("Year", "Month") \
    .agg(
        sum("TotalPrice").alias("MonthlyRevenue"),
        count("InvoiceNo").alias("OrderCount")
    ) \
    .orderBy("Year", "Month")

print("   Monthly Revenue:")
monthly_revenue.show(12)

# Save Gold Layer tables
print("\n" + "="*60)
print("SAVING GOLD LAYER TABLES")
print("="*60)

sales_by_country.write.mode("overwrite").parquet("gold_sales_by_country")
top_products.write.mode("overwrite").parquet("gold_top_products")
monthly_revenue.write.mode("overwrite").parquet("gold_monthly_revenue")

print(" Gold Layer tables saved!")
print("\n BRONZE → SILVER → GOLD COMPLETE!")

CREATING GOLD LAYER - BUSINESS ANALYTICS

1. Creating Sales by Country...
   Top 5 Countries by Revenue:
+--------------+------------------+----------+---------------+
|       Country|      TotalRevenue|OrderCount|UniqueCustomers|
+--------------+------------------+----------+---------------+
|United Kingdom| 9025222.084000144|    485123|           3920|
|   Netherlands|285446.33999999927|      2359|              9|
|          EIRE| 283453.9599999998|      7890|              3|
|       Germany|228867.14000000007|      9040|             94|
|        France|         209715.11|      8407|             87|
+--------------+------------------+----------+---------------+
only showing top 5 rows


2. Creating Top Products...
   Top 10 Products:
+----------------------------------+---------+-----------------+------------------+
|Description                       |StockCode|TotalQuantitySold|TotalRevenue      |
+----------------------------------+---------+-----------------+------------------+
|D

In [13]:
# Cell 8: Package files for AWS upload

from google.colab import files
import os

print("Packaging files for AWS upload...")
print("\nDownloading Silver Layer (Parquet)...")

# For Parquet folders, we need to zip them
!zip -r silver_layer.zip ecommerce_silver_layer.parquet/
!zip -r gold_country.zip gold_sales_by_country/
!zip -r gold_products.zip gold_top_products/
!zip -r gold_monthly.zip gold_monthly_revenue/

print("\n Files ready for download!")
print("\nDownload these files:")
files.download('silver_layer.zip')
files.download('gold_country.zip')
files.download('gold_products.zip')
files.download('gold_monthly.zip')

Packaging files for AWS upload...

Downloading Silver Layer (Parquet)...
  adding: ecommerce_silver_layer.parquet/ (stored 0%)
  adding: ecommerce_silver_layer.parquet/.part-00000-ae09d19d-b9ab-47f9-b4b2-b62919b76a3c-c000.snappy.parquet.crc (stored 0%)
  adding: ecommerce_silver_layer.parquet/part-00001-ae09d19d-b9ab-47f9-b4b2-b62919b76a3c-c000.snappy.parquet (deflated 16%)
  adding: ecommerce_silver_layer.parquet/part-00000-ae09d19d-b9ab-47f9-b4b2-b62919b76a3c-c000.snappy.parquet (deflated 16%)
  adding: ecommerce_silver_layer.parquet/._SUCCESS.crc (stored 0%)
  adding: ecommerce_silver_layer.parquet/_SUCCESS (stored 0%)
  adding: ecommerce_silver_layer.parquet/.part-00001-ae09d19d-b9ab-47f9-b4b2-b62919b76a3c-c000.snappy.parquet.crc (stored 0%)
  adding: gold_sales_by_country/ (stored 0%)
  adding: gold_sales_by_country/.part-00000-d09ee803-04fb-4d46-9bca-771cf02285d6-c000.snappy.parquet.crc (stored 0%)
  adding: gold_sales_by_country/part-00000-d09ee803-04fb-4d46-9bca-771cf02285d6-c0

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [15]:
files.download('silver_layer.zip')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [16]:
files.download('gold_country.zip')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [17]:
files.download('gold_products.zip')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [18]:
files.download('gold_monthly.zip')


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

In [1]:
!pip install plotly

import plotly.express as px
import plotly.graph_objects as go

print("✅ Ready to visualize!")

✅ Ready to visualize!


In [9]:
# Convert Spark DataFrames to Pandas for visualization
df_country = sales_by_country.toPandas()
df_products = top_products.toPandas()
df_monthly = monthly_revenue.toPandas()

print("✅ Data converted to Pandas!")
print(f"Countries: {len(df_country)}")
print(f"Products: {len(df_products)}")
print(f"Monthly: {len(df_monthly)}")

# Visualization 1: Top 10 Countries by Revenue
fig1 = px.bar(
    df_country.head(10),
    x='Country',
    y='TotalRevenue',
    title='🌍 Top 10 Countries by Revenue',
    labels={'TotalRevenue': 'Revenue ($)', 'Country': 'Country'},
    color='TotalRevenue',
    color_continuous_scale='Viridis',
    text='TotalRevenue'
)
fig1.update_traces(texttemplate='$%{text:,.0f}', textposition='outside')
fig1.update_layout(height=600, showlegend=False)
fig1.show()

# Visualization 2: Top 10 Products
fig2 = px.bar(
    df_products,
    x='TotalRevenue',
    y='Description',
    orientation='h',
    title='📦 Top 10 Products by Revenue',
    labels={'TotalRevenue': 'Revenue ($)', 'Description': 'Product'},
    color='TotalRevenue',
    color_continuous_scale='Blues'
)
fig2.update_layout(height=600, showlegend=False, yaxis={'categoryorder':'total ascending'})
fig2.show()

# Visualization 3: Monthly Revenue Trend
df_monthly['MonthYear'] = df_monthly['Year'].astype(str) + '-' + df_monthly['Month'].astype(str).str.zfill(2)

fig3 = px.line(
    df_monthly,
    x='MonthYear',
    y='MonthlyRevenue',
    title='📈 Monthly Revenue Trend',
    labels={'MonthlyRevenue': 'Revenue ($)', 'MonthYear': 'Month'},
    markers=True
)
fig3.update_layout(height=500)
fig3.show()

print("\n🎉 ALL VISUALIZATIONS COMPLETE!")

✅ Data converted to Pandas!
Countries: 38
Products: 10
Monthly: 13



🎉 ALL VISUALIZATIONS COMPLETE!


ValueError: 
Image export using the "kaleido" engine requires the kaleido package,
which can be installed using pip:
    $ pip install -U kaleido


In [11]:
!pip install -U kaleido

Collecting kaleido
  Downloading kaleido-1.1.0-py3-none-any.whl.metadata (5.6 kB)
Collecting choreographer>=1.0.10 (from kaleido)
  Downloading choreographer-1.2.0-py3-none-any.whl.metadata (6.8 kB)
Collecting logistro>=1.0.8 (from kaleido)
  Downloading logistro-2.0.0-py3-none-any.whl.metadata (3.9 kB)
Collecting pytest-timeout>=2.4.0 (from kaleido)
  Downloading pytest_timeout-2.4.0-py3-none-any.whl.metadata (20 kB)
Downloading kaleido-1.1.0-py3-none-any.whl (66 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m66.3/66.3 kB[0m [31m4.4 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading choreographer-1.2.0-py3-none-any.whl (56 kB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m56.4/56.4 kB[0m [31m6.5 MB/s[0m eta [36m0:00:00[0m
[?25hDownloading logistro-2.0.0-py3-none-any.whl (8.5 kB)
Downloading pytest_timeout-2.4.0-py3-none-any.whl (14 kB)
Installing collected packages: logistro, pytest-timeout, choreographer, kaleido
Successfully installed choreogr

In [18]:
# Create HTML files first
fig1.write_html("sales_by_country.html")
fig2.write_html("top_products.html")
fig3.write_html("monthly_revenue.html")

print("✅ HTML files created!")

# Now download them
from google.colab import files
files.download('sales_by_country.html')
files.download('top_products.html')
files.download('monthly_revenue.html')

print("✅ Downloaded!")

✅ HTML files created!


<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

<IPython.core.display.Javascript object>

✅ Downloaded!
