## 🔄 2. Gold Layer Data Transformations & Aggregations
 
This is where I:

- Clean, enrich, join, and model the data
- Build fact and dimension tables
- Apply business logic

In [15]:
import json
import random
import time

import requests
from pyspark.sql import Row
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col,
    input_file_name,
    lit,
    monotonically_increasing_id,
    regexp_extract,
    row_number,
    udf,
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType
from pyspark.sql.window import Window

# Reuse the active Spark session if one exists, otherwise create it with the app name "transformation".
spark = (
    SparkSession.builder
    .appName("transformation")
    .getOrCreate()
)

StatementMeta(, ab7c308b-5460-4aa7-a362-3d93bb8ea94c, 17, Finished, Available, Finished)

### ✅ Step 1: Ingest the combined table

In [2]:
from pyspark.sql.functions import col, max, to_date

# Read every parquet file under the processed-output directory into a single DataFrame.
# NOTE(review): the directory is named "delta". If it is a Delta Lake table (it contains a
# _delta_log folder), spark.read.parquet ignores the transaction log and can also read files
# that later overwrites have logically removed; spark.read.format("delta").load(...) is the
# safe reader in that case — confirm the directory layout.
SOURCE_PATH = "Files/processed_output/delta/"
df_all = spark.read.parquet(SOURCE_PATH)

# Set to True to keep only the rows of the most recent OrderDate;
# False keeps every row (the behaviour downstream cells were built against).
FILTER_TO_LATEST_ORDER_DATE = False

if FILTER_TO_LATEST_ORDER_DATE:
    # agg({...: "max"}) is used so this does not depend on which `max` is in scope
    # (pyspark's max is imported over the Python builtin in this notebook).
    latest_order_date = df_all.agg({"OrderDate": "max"}).collect()[0][0]
    df = df_all.filter(col("OrderDate") == latest_order_date)
else:
    df = df_all

StatementMeta(, ab7c308b-5460-4aa7-a362-3d93bb8ea94c, 4, Finished, Available, Finished)

## ✅ Step 2: Create Dimension Tables
Create dimension tables by selecting distinct values from relevant columns and adding a new surrogate key for each.

`customer_dim` is built from the customer fields in my dataset (CustomerId, Firstname, Lastname and the related contact/demographic columns):

#### ✅ Step a: Customer_Dim Table

In [3]:
# Customer dimension: one row per distinct combination of customer attributes, plus a
# surrogate key placed first.
# The key is row_number() over a total ordering of the dimension's own columns rather than
# monotonically_increasing_id(): this DataFrame is evaluated more than once (in the fact-table
# join and in its own saveAsTable), and monotonically_increasing_id() is non-deterministic after
# the distinct() shuffle, so separate evaluations are not guaranteed to give a row the same key.
# The cast keeps the key a long, the type monotonically_increasing_id() produced.
# An un-partitioned window moves all rows to one partition; acceptable for dimension-sized data.
customer_columns = [
    "CustomerId", "Firstname", "Lastname", "DateOfBirth", "Email",
    "Postcode", "Address", "PhoneNumber", "Gender",
]

customer_dim = (
    df.select(*customer_columns)
      .distinct()
      .withColumn("customer_key",
                  row_number().over(Window.orderBy(*customer_columns)).cast("long"))
      .select("customer_key", *customer_columns)
)

StatementMeta(, ab7c308b-5460-4aa7-a362-3d93bb8ea94c, 5, Finished, Available, Finished)

#### ✅ Step b: Product_Dim Table
Using ProductId, ProductName and ProductPrice:


In [4]:
# Product dimension: one row per distinct (ProductId, ProductName, ProductPrice), key first.
# Deterministic surrogate key (row_number over all dimension columns, cast to long) instead of
# monotonically_increasing_id(): the DataFrame is evaluated both for the fact-table join and for
# saveAsTable, and monotonically_increasing_id() after distinct() may assign different ids on
# each evaluation, breaking the fact -> dimension link.
product_columns = ["ProductId", "ProductName", "ProductPrice"]

product_dim = (
    df.select(*product_columns)
      .distinct()
      .withColumn("product_key",
                  row_number().over(Window.orderBy(*product_columns)).cast("long"))
      .select("product_key", *product_columns)
)

StatementMeta(, ab7c308b-5460-4aa7-a362-3d93bb8ea94c, 6, Finished, Available, Finished)

#### ✅ Step c: Account_Dim Table

Using AccountId, AccountType and AccountCategory:

In [5]:
# Account dimension: one row per distinct (AccountId, AccountType, AccountCategory), key first.
# Deterministic surrogate key (row_number over all dimension columns, cast to long) instead of
# monotonically_increasing_id(): the DataFrame is evaluated both for the fact-table join and for
# saveAsTable, and monotonically_increasing_id() after distinct() may assign different ids on
# each evaluation, breaking the fact -> dimension link.
account_columns = ["AccountId", "AccountType", "AccountCategory"]

account_dim = (
    df.select(*account_columns)
      .distinct()
      .withColumn("account_key",
                  row_number().over(Window.orderBy(*account_columns)).cast("long"))
      .select("account_key", *account_columns)
)

StatementMeta(, ab7c308b-5460-4aa7-a362-3d93bb8ea94c, 7, Finished, Available, Finished)

#### ✅ Step d: Payment_Dim Tables

Using PaymentStatus, and PaymentId / PaymentDate / PaymentSource

In [6]:
# Two payment dimensions, each with its surrogate key as the first column.
# Keys are row_number() over a total ordering of each dimension's columns, cast to long (the type
# monotonically_increasing_id() produced). monotonically_increasing_id() is not used because these
# DataFrames are evaluated both for the fact-table joins and for saveAsTable, and its values after
# the distinct() shuffle are non-deterministic, so the two evaluations could disagree.

# Payment status dimension: one row per distinct PaymentStatus value.
payment_status_columns = ["PaymentStatus"]
payment_status_dim = (
    df.select(*payment_status_columns)
      .distinct()
      .withColumn("payment_status_key",
                  row_number().over(Window.orderBy(*payment_status_columns)).cast("long"))
      .select("payment_status_key", *payment_status_columns)
)

# Payment source dimension: one row per distinct (PaymentId, PaymentDate, PaymentSource).
payment_source_columns = ["PaymentId", "PaymentDate", "PaymentSource"]
payment_source_dim = (
    df.select(*payment_source_columns)
      .distinct()
      .withColumn("payment_source_key",
                  row_number().over(Window.orderBy(*payment_source_columns)).cast("long"))
      .select("payment_source_key", *payment_source_columns)
)

StatementMeta(, ab7c308b-5460-4aa7-a362-3d93bb8ea94c, 8, Finished, Available, Finished)

#### ✅ Step e: Location_Dim Using External API

Steps

1. Extract unique IP addresses from my dataset. 

2. Call a suitable IP geolocation API for each unique IP address. A good option is IP-API.com, which is free for non-commercial use and does not require an API key, but is limited to 45 requests per minute from an IP address. For larger datasets, you might need to manage the rate limit carefully (e.g., introduce delays, use a paid plan, or consider an offline IP database).

3. Structure the results into a DataFrame and add a surrogate key.

In [18]:
# Placeholder values used when an IP cannot be geolocated (API error, rate limit, unknown IP).
random_region = ["region1", "region2", "region3"]
random_countries = ["country1", "country2", "country3"]
random_cities = ["city1", "city2", "city3"]

# IP to look up first: the free ip-api.com tier allows only 45 requests per minute,
# so this address is queried before any others.
priority_ip = "162.120.187.148"

# Distinct, non-empty IPs present in the dataset (collected to the driver).
all_ips = [
    row["LaptopIP_Address"]
    for row in df.select("LaptopIP_Address").distinct().collect()
    if row["LaptopIP_Address"]
]

# Every dataset IP except the priority one.
other_ips = [ip for ip in all_ips if ip != priority_ip]

# Query the priority IP first, but only if it actually occurs in the data; otherwise it would
# use up one rate-limited request and add a location row that no fact row can reference.
unique_ips = ([priority_ip] if priority_ip in all_ips else []) + other_ips

# Lookup failures, reported once all lookups have finished.
error_messages = []


def get_location_from_ip(ip):
    """Geolocate one IP address with the ip-api.com JSON endpoint.

    Args:
        ip: IP address string to look up.

    Returns:
        Tuple ``(ip, country, region, city)``. Any field the API does not supply is filled
        with a placeholder from the ``random_*`` lists; the placeholder choice is seeded by the
        IP string, so the same IP gets the same placeholders on every run.

    Failures (non-"success" status, network errors, unparsable responses) are appended to
    ``error_messages`` instead of being raised.
    """
    country = region = city = None
    try:
        response = requests.get(
            f"http://ip-api.com/json/{ip}?fields=status,message,country,regionName,city",
            timeout=5,
        )

        # ip-api.com reports the requests left in the current window in X-Rl and the seconds
        # until the window resets in X-Ttl. When the quota is used up, wait it out so the next
        # lookups are not rejected and silently replaced by placeholders.
        if response.headers.get("X-Rl") == "0":
            ttl = response.headers.get("X-Ttl", "")
            time.sleep((int(ttl) if ttl.isdigit() else 60) + 1)

        data = response.json()
        if data.get("status") == "success":
            country = data.get("country")
            region = data.get("regionName")
            city = data.get("city")
        else:
            error_messages.append(f"{ip} failed: {data.get('message')}")
    except Exception as e:
        error_messages.append(f"{ip} error: {str(e)}")

    # Per-IP seeded generator: placeholders are reproducible and independent of lookup order.
    fallback_rng = random.Random(ip)
    if not country:
        country = fallback_rng.choice(random_countries)
    if not region:
        region = fallback_rng.choice(random_region)
    if not city:
        city = fallback_rng.choice(random_cities)

    return (ip, country, region, city)


# Fetch location data for every IP, one request at a time.
location_data = [get_location_from_ip(ip) for ip in unique_ips]

# Surface lookup failures instead of leaving them only in error_messages.
if error_messages:
    print(f"{len(error_messages)} IP lookup(s) fell back to placeholder values; first few: {error_messages[:3]}")

# Explicit schema so an empty location_data list still produces a typed DataFrame.
schema = StructType([
    StructField("LaptopIP_Address", StringType(), True),
    StructField("country", StringType(), True),
    StructField("region", StringType(), True),
    StructField("city", StringType(), True)
])

# Surrogate key derived the same deterministic way as the other dimensions
# (row_number over the unique IP, cast to long), placed first.
location_dim = (
    spark.createDataFrame(location_data, schema=schema)
         .withColumn("location_key",
                     row_number().over(Window.orderBy("LaptopIP_Address")).cast("long"))
         .select("location_key", "LaptopIP_Address", "country", "region", "city")
)

StatementMeta(, ab7c308b-5460-4aa7-a362-3d93bb8ea94c, 20, Finished, Available, Finished)

#### ✅ Step f: Create The Fact Table

The fact table will contain the measures and foreign keys to the dimension tables. You'll join your original DataFrame with each of the dimension tables to replace the natural keys with the newly generated surrogate keys.

In [20]:
# Build the fact table. Starting from the source rows, each dimension is left-joined on its
# natural key. The natural-key and descriptive columns are then dropped, so only the dimension's
# surrogate key remains.
# NOTE(review): every dimension is built with distinct() over all of its columns, so a natural
# key that appears with more than one attribute combination yields several dimension rows, and
# the join then duplicates the matching fact rows. Also, rows whose natural key is null get a
# null surrogate key, because == never matches null. Confirm the natural keys are unique per
# dimension.

# (dimension DataFrame, natural-key column, columns to drop after the join)
fact_joins = [
    (customer_dim, "CustomerId",
     ["CustomerId", "Firstname", "Lastname", "DateOfBirth", "Email",
      "Postcode", "Address", "PhoneNumber", "Gender"]),
    (product_dim, "ProductId", ["ProductId", "ProductName", "ProductPrice"]),
    (account_dim, "AccountId", ["AccountId", "AccountType", "AccountCategory"]),
    (payment_source_dim, "PaymentId", ["PaymentId", "PaymentDate", "PaymentSource"]),
    (payment_status_dim, "PaymentStatus", ["PaymentStatus"]),
    # Also drops the location details fetched from the IP API.
    (location_dim, "LaptopIP_Address", ["LaptopIP_Address", "country", "region", "city"]),
]

fact_df = df
for dim_df, natural_key, drop_columns in fact_joins:
    fact_df = (
        fact_df.join(dim_df, fact_df[natural_key] == dim_df[natural_key], "left")
               .drop(*drop_columns)
    )

# Fact columns: surrogate keys, order/transaction attributes, and the measures
# (OrderTotalPrice, Quantity).
fact_columns = [
    "customer_key",
    "CustomerSegment",
    "product_key",
    "account_key",
    "payment_source_key",
    "payment_status_key",
    "location_key",
    "OrderDate",
    "OrderDetailsId",
    "OrderId",
    "OrderStatus",
    "ReferralSource",
    "ShippingAddress",
    "UserAgent",
    "DeviceType",
    "TransactionTimestamp",
    "OrderTotalPrice",
    "Quantity",
]

# fact_id: unique per row, but not contiguous.
fact_table = fact_df.select(*fact_columns).withColumn("fact_id", monotonically_increasing_id())

StatementMeta(, ab7c308b-5460-4aa7-a362-3d93bb8ea94c, 22, Finished, Available, Finished)

### ✅ Step 3: Save All DataFrames as Tables
Saving the DataFrames as tables is essential, because semantic models can only be built on tables.

In [22]:
# Persist each model DataFrame under its table name, replacing any existing contents.
# Dimensions are written first, then the fact table (dict order is preserved).
tables_to_save = {
    "customer_dim": customer_dim,
    "product_dim": product_dim,
    "account_dim": account_dim,
    "payment_source_dim": payment_source_dim,
    "payment_status_dim": payment_status_dim,
    "location_dim": location_dim,
    "fact_table": fact_table,
}

for table_name, table_df in tables_to_save.items():
    table_df.write.mode("overwrite").saveAsTable(table_name)

StatementMeta(, ab7c308b-5460-4aa7-a362-3d93bb8ea94c, 24, Finished, Available, Finished)