# Reverse ETL and Personalization

This notebook demonstrates a complete **Reverse ETL** pipeline that:
1. **Extracts** aggregated loyalty data from the Data Warehouse
2. **Transforms** and updates customer profiles in Cosmos DB
3. **Builds personalized recommendations** using purchase history
4. **Loads** enriched customer data back to Cosmos DB

The process keeps operational data (Cosmos DB) synchronized with analytical insights from the warehouse.

In [None]:
#Install packages
%pip install azure-cosmos
%pip install azure-core

In [None]:
# Configuration - Update these endpoints for your environment
import base64, json
from typing import Any, Optional
from azure.cosmos import CosmosClient, PartitionKey, ThroughputProperties
from azure.core.credentials import TokenCredential, AccessToken

# ========== REQUIRED CONFIGURATION ==========
# Both endpoints default to empty strings; fill them in for your environment.

# 1. Cosmos DB endpoint (found in Settings > Connection),
#    e.g. "https://your-cosmos-account.documents.azure.com:443/"
COSMOS_ENDPOINT = ""

# 2. Warehouse SQL endpoint (found in Warehouse > Settings > SQL Endpoint),
#    e.g. "abcd1234-...-workspace.z01.datawarehouse.fabric.microsoft.com"
WAREHOUSE_SERVER = ""

# ========== DATABASE AND CONTAINER NAMES ==========
WAREHOUSE_NAME = "fc_commerce_warehouse"
COSMOS_DATABASE_NAME = "fc_commerce_cosmos"
COSMOS_CONTAINER_NAME = "customers"

In [None]:
# Authentication and Client Setup
class FabricTokenCredential(TokenCredential):
    """Token credential that hands Fabric notebook tokens to the Cosmos DB SDK.

    Each ``get_token`` call requests a token from ``notebookutils`` for the
    Cosmos DB scope and reads its expiry from the JWT ``exp`` claim. This class
    does no caching and no retrying of its own; a new token is only obtained
    when the caller invokes ``get_token`` again.
    """

    # Scope always requested from notebookutils; the ``scopes`` passed by the caller are not used.
    _COSMOS_SCOPE = "https://cosmos.azure.com/.default"

    @staticmethod
    def _expiry_from_jwt(token: str) -> int:
        """Return the ``exp`` claim of a JWT as an integer.

        Raises:
            ValueError: If the token has no payload segment, the payload is not
                base64url-encoded JSON object, or ``exp`` is missing or not numeric.
        """
        segments = token.split(".")
        if len(segments) < 2:
            raise ValueError("Invalid JWT format")

        payload_b64 = segments[1]
        # JWT segments omit base64 padding; restore it so the decoder accepts the value.
        payload_b64 += "=" * (-len(payload_b64) % 4)

        try:
            decoded = base64.urlsafe_b64decode(payload_b64.encode("utf-8")).decode("utf-8")
            payload = json.loads(decoded)
        except ValueError as err:
            # binascii.Error, UnicodeDecodeError and JSONDecodeError are all ValueError subclasses.
            raise ValueError("Invalid JWT payload") from err
        if not isinstance(payload, dict):
            raise ValueError("Invalid JWT payload")

        exp = payload.get("exp")
        if exp is None:
            raise ValueError("exp claim missing in token")
        try:
            # AccessToken.expires_on is annotated as an int, so normalise the claim.
            return int(exp)
        except (TypeError, ValueError) as err:
            raise ValueError("exp claim is not numeric") from err

    def get_token(self, *scopes: str, claims: Optional[str] = None, tenant_id: Optional[str] = None,
                  enable_cae: bool = False, **kwargs: Any) -> AccessToken:
        """Fetch a Cosmos DB access token and wrap it in an ``AccessToken``.

        All arguments are accepted for interface compatibility and ignored.

        Returns:
            AccessToken: The raw token plus the expiry taken from its ``exp`` claim.

        Raises:
            ValueError: If the returned token cannot be parsed as a JWT with an ``exp`` claim.
        """
        # NOTE(review): `notebookutils` is not imported in this cell; presumably it is a
        # global provided by the Fabric notebook runtime — confirm.
        access_token = notebookutils.credentials.getToken(self._COSMOS_SCOPE)
        return AccessToken(token=access_token, expires_on=self._expiry_from_jwt(access_token))

# Initialize Cosmos DB clients
# COSMOS_ENDPOINT defaults to an empty string; fail fast with an actionable message
# instead of letting the SDK reject the blank URL.
if not COSMOS_ENDPOINT:
    raise ValueError("COSMOS_ENDPOINT is not set. Paste your Cosmos DB endpoint into the configuration cell.")

COSMOS_CLIENT = CosmosClient(COSMOS_ENDPOINT, FabricTokenCredential())
DATABASE_CLIENT = COSMOS_CLIENT.get_database_client(COSMOS_DATABASE_NAME)
CONTAINER_CLIENT = DATABASE_CLIENT.get_container_client(COSMOS_CONTAINER_NAME)

In [None]:
# Table Names and Utility Functions
from pyspark.sql import functions as F
from datetime import datetime
from notebookutils.mssparkutils import credentials

# Data Warehouse table names
# Columns listed below are the ones this notebook reads from each table.
# Sales header: CustomerKey, TransactionId, LoyaltyPointsEarned, LoyaltyPointsRedeemed, CreatedAt
DW_FACT_SALES = "dbo.FactSales"
# Sales line items: TransactionId, MenuItemKey, Quantity
DW_FACT_SALES_LINE = "dbo.FactSalesLineItems"  
# Customer dimension: maps CustomerKey to CustomerId
DW_DIM_CUSTOMER = "dbo.DimCustomer"
# Menu item dimension: MenuItemKey, MenuItemId, MenuItemName
DW_DIM_MENU = "dbo.DimMenuItem"

def get_warehouse_connection():
    """Build the JDBC URL and connection properties for the data warehouse.

    A new access token is requested on every call.

    Returns:
        tuple: ``(jdbc_url, props)`` where ``props`` carries the SQL Server
        JDBC driver class name and the access token.

    Raises:
        ValueError: If ``WAREHOUSE_SERVER`` has not been configured.
    """
    # WAREHOUSE_SERVER defaults to an empty string; without this check the URL would be
    # "jdbc:sqlserver://:1433;..." and only fail later inside the JDBC driver.
    if not WAREHOUSE_SERVER:
        raise ValueError("WAREHOUSE_SERVER is not set. Paste your warehouse SQL endpoint into the configuration cell.")

    token = credentials.getToken("pbi")
    jdbc_url = (
        f"jdbc:sqlserver://{WAREHOUSE_SERVER}:1433;"
        f"database={WAREHOUSE_NAME};"
        "encrypt=true;"
        "trustServerCertificate=false;"
        "hostNameInCertificate=*.datawarehouse.fabric.microsoft.com;"
        "loginTimeout=30"
    )
    props = {
        "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver",
        "accessToken": token,
    }
    return jdbc_url, props

def read_dw_table(table_name: str):
    """Load one warehouse table over JDBC.

    Args:
        table_name: Name of the table to read, e.g. ``"dbo.FactSales"``.

    Returns:
        The DataFrame produced by ``spark.read.jdbc`` for that table.
    """
    url, connection_props = get_warehouse_connection()
    reader = spark.read
    return reader.jdbc(url, table=table_name, properties=connection_props)

def query_cosmos(query_text: str, parameters=None):
    """Run a cross-partition query against the Cosmos DB container.

    Args:
        query_text: Query to execute.
        parameters: Optional query parameters; an empty list is sent when omitted.

    Returns:
        A Spark DataFrame built from the returned items, or ``None`` when the
        query returns no items.
    """
    bound_params = [] if parameters is None else parameters

    # Materialise the result iterator so the items can be counted and reused.
    items = list(
        CONTAINER_CLIENT.query_items(
            query=query_text,
            parameters=bound_params,
            enable_cross_partition_query=True,
        )
    )
    print(f"Retrieved {len(items)} records from Cosmos.")

    if not items:
        return None
    return spark.createDataFrame(items)

def patch_cosmos_customer(customer_id: str, patch_operations: list):
    """Apply patch operations to a single customer document.

    ``customer_id`` is used as both the item id and the partition key.
    Failures are reported with ``print`` instead of being raised.

    Args:
        customer_id: Id of the customer document to patch.
        patch_operations: Patch operations passed through to ``patch_item``.

    Returns:
        ``True`` when the patch call completed, ``False`` when it raised.
    """
    try:
        CONTAINER_CLIENT.patch_item(
            item=customer_id,
            partition_key=customer_id,
            patch_operations=patch_operations,
        )
    except Exception as err:
        print(f"⚠️ Patch failed for {customer_id}: {err}")
        return False
    else:
        return True

In [None]:
# Test Query
query_text = """SELECT * FROM customers c"""

df = query_cosmos(query_text)
# query_cosmos returns None when the query yields no items, so test for None explicitly.
if df is not None:
    print(f"Customer recommendation status: {df.count()} records")
    # show() prints the rows itself and returns None, so it must not be wrapped in display().
    df.show(5)

## Part 1: Reverse ETL - Loyalty Point Synchronization

**Reverse ETL** moves processed data from analytical systems back to operational systems. Here we:
- Extract aggregated loyalty points from the Data Warehouse
- Calculate net loyalty balances (earned - redeemed)
- Update customer profiles in Cosmos DB with current loyalty totals

In [None]:
# Load warehouse data and calculate loyalty point updates
print("Loading data warehouse tables...")
fact_sales = read_dw_table(DW_FACT_SALES)
# Only the columns needed downstream are selected from the line-item and dimension tables.
fact_line = read_dw_table(DW_FACT_SALES_LINE).select("TransactionId", "MenuItemKey", "Quantity")
dim_menu = read_dw_table(DW_DIM_MENU).select("MenuItemKey", "MenuItemId", "MenuItemName")
dim_customer = read_dw_table(DW_DIM_CUSTOMER).select("CustomerKey", "CustomerId")

print("Tables loaded successfully!")
print(f"Fact Sales: {fact_sales.count()} records")
# The line-item DataFrame is named fact_line; the previous reference to fact_line_items raised NameError.
print(f"Fact Line Items: {fact_line.count()} records")

In [None]:
# Aggregate loyalty deltas from FactSales
# One row per CustomerKey: total points earned, total points redeemed (a null sum becomes 0),
# the latest CreatedAt, and the net balance (earned - redeemed).
points_earned = F.coalesce(F.sum("LoyaltyPointsEarned"), F.lit(0)).alias("PointsEarned")
points_redeemed = F.coalesce(F.sum("LoyaltyPointsRedeemed"), F.lit(0)).alias("PointsRedeemed")
last_purchase = F.max("CreatedAt").alias("LastPurchaseUtc")

loyalty_agg = (
    fact_sales
    .groupBy("CustomerKey")
    .agg(points_earned, points_redeemed, last_purchase)
    .withColumn("NewLoyaltyPoints", F.col("PointsEarned") - F.col("PointsRedeemed"))
)

In [None]:
# Load all customers from Cosmos DB
customers_df = query_cosmos("SELECT * FROM c")

# query_cosmos returns None when the container has no documents.
if customers_df is not None:
    customer_total = customers_df.count()
    print(f"Loaded {customer_total} customers from Cosmos DB")
    preview_columns = ["customerId", "name", "loyaltyPoints", "lastPurchaseDate"]
    customers_df.select(*preview_columns).show(5)

In [None]:
# Bring CustomerId to join with Cosmos (via DimCustomer)
dim_customer = read_dw_table(DW_DIM_CUSTOMER).select("CustomerKey", "CustomerId")

loyalty_by_id = (loyalty_agg
    .join(dim_customer, on="CustomerKey", how="inner")
    .select("CustomerId", "NewLoyaltyPoints", "LastPurchaseUtc")
)

# query_cosmos returns None when no customer documents were found; skip the preview
# instead of failing with AttributeError on `.alias`.
if customers_df is None:
    print("No customer documents were loaded from Cosmos DB; skipping the loyalty preview.")
else:
    # Preview only: join the warehouse totals onto the existing customer docs.
    # Keep all fields in the doc, only replace loyaltyPoints / lastPurchaseDate / updatedAt.
    # The left join keeps customers without warehouse sales, who retain their current values.
    cust_with_loyalty = (customers_df.alias("c")
        .join(loyalty_by_id.alias("l"), F.col("c.customerId") == F.col("l.CustomerId"), "left")
        .withColumn("loyaltyPoints", F.when(F.col("l.NewLoyaltyPoints").isNotNull(), F.col("l.NewLoyaltyPoints")).otherwise(F.col("c.loyaltyPoints")))
        .withColumn("lastPurchaseDate", F.when(F.col("l.LastPurchaseUtc").isNotNull(), F.col("l.LastPurchaseUtc").cast("timestamp")).otherwise(F.col("c.lastPurchaseDate")))
        .withColumn("updatedAt", F.current_timestamp())
        .drop("NewLoyaltyPoints", "LastPurchaseUtc", "CustomerKey")
    )

    display(cust_with_loyalty.limit(10))

In [None]:
# Calculate loyalty point updates from warehouse data
sales_by_customer = fact_sales.groupBy("CustomerKey")
loyalty_agg = sales_by_customer.agg(
    F.coalesce(F.sum("LoyaltyPointsEarned"), F.lit(0)).alias("PointsEarned"),
    F.coalesce(F.sum("LoyaltyPointsRedeemed"), F.lit(0)).alias("PointsRedeemed"),
    F.max("CreatedAt").alias("LastPurchaseUtc"),
).withColumn("NewLoyaltyPoints", F.col("PointsEarned") - F.col("PointsRedeemed"))

# Map CustomerKey to CustomerId through the customer dimension
customer_ids = dim_customer.select("CustomerKey", "CustomerId")
loyalty_by_id = loyalty_agg.join(customer_ids, on="CustomerKey", how="inner").select(
    "CustomerId", "NewLoyaltyPoints", "LastPurchaseUtc"
)

# Keep only customers whose latest purchase falls on the current date
# NOTE(review): current_date() follows the Spark session time zone while the column name
# suggests UTC — confirm both agree.
purchased_today = F.to_date("LastPurchaseUtc") == F.current_date()
today_updates = loyalty_by_id.filter(purchased_today).select(
    "CustomerId", "NewLoyaltyPoints", "LastPurchaseUtc"
)

todays_customer_count = today_updates.count()
print(f"Customers with purchases today: {todays_customer_count}")
today_updates.show(5)

In [None]:
# Update customer loyalty points in Cosmos DB
patched = 0
failed = 0

# Stream rows to the driver one at a time and patch each customer document.
for update_row in today_updates.toLocalIterator():
    customer_id = update_row["CustomerId"]
    loyalty_points = int(update_row["NewLoyaltyPoints"] or 0)
    purchase_ts = update_row["LastPurchaseUtc"]

    if purchase_ts.tzinfo:
        purchase_iso = purchase_ts.isoformat().replace("+00:00", "Z")
    else:
        # NOTE(review): naive timestamps are labelled as UTC here — confirm the values really are UTC.
        purchase_iso = purchase_ts.isoformat() + "Z"

    loyalty_ops = [
        {"op": "set", "path": "/loyaltyPoints", "value": loyalty_points},
        {"op": "set", "path": "/lastPurchaseDate", "value": purchase_iso},
        {"op": "set", "path": "/updatedAt", "value": datetime.utcnow().isoformat() + "Z"},
    ]

    succeeded = patch_cosmos_customer(customer_id, loyalty_ops)
    if not succeeded:
        failed += 1
    else:
        patched += 1

print(f"✅ Patched: {patched} | ❌ Failed: {failed}")

In [None]:
# Verify loyalty point updates
# Fetch the ten most recently updated customer documents.
check_query = """
SELECT TOP 10 c.customerId, c.name, c.loyaltyPoints, c.lastPurchaseDate, c.updatedAt
FROM c
WHERE IS_DEFINED(c.updatedAt)
ORDER BY c.updatedAt DESC
"""

df_check = query_cosmos(check_query)
# query_cosmos returns None when nothing matched.
if df_check is not None:
    updated_count = df_check.count()
    print(f"Retrieved {updated_count} recently updated customers")
    display(df_check)

## Part 2: Personalized Recommendations
### Step 1: Calculate Weighted Purchase Scores & Build Top Recommendations
- Join transaction data, apply exponential decay for recency, and normalize scores (0-1) per customer.
- Scores are min-max normalized per customer; Step 2 then keeps the top 4 items with scores ≥ 0.70 for each customer.

In [None]:
# Generate Personalized Recommendations
from pyspark.sql import Window
from math import log

# Recommendation algorithm parameters
TOPK = 4  # Maximum number of recommendations kept per customer
MIN_SCORE = 0.70  # Minimum normalized score an item needs to be recommended
HALF_LIFE_DAYS = 14  # Recency decay (set to 0 to disable)

# Exponential decay rate derived from the half-life; 0.0 turns decay off.
if HALF_LIFE_DAYS > 0:
    LAMBDA = log(2) / HALF_LIFE_DAYS
else:
    LAMBDA = 0.0

def calculate_recommendations():
    """Score every (customer, menu item) pair from purchase history.

    Reads the module-level ``fact_line`` and ``fact_sales`` DataFrames, weights
    each purchased quantity by ``exp(-LAMBDA * ageDays)`` when ``LAMBDA`` is
    positive, sums the weighted quantity per pair, and min-max normalizes the
    sums within each customer. Prints the number of distinct pairs.

    Returns:
        DataFrame with ``CustomerKey``, ``MenuItemKey`` and ``score`` (rounded
        to 4 decimals). A customer whose items all share the same weighted
        quantity gets a score of 1.0 for each of them.
    """
    # One row per purchased line item, tagged with its customer and sale time
    purchases = fact_line.join(fact_sales, on="TransactionId", how="inner").select(
        "CustomerKey", "MenuItemKey", "Quantity", "CreatedAt"
    )

    if LAMBDA > 0:
        # Older purchases count for less: weight = quantity * exp(-LAMBDA * age in days)
        age_days = F.datediff(F.current_timestamp(), F.col("CreatedAt"))
        weighted = (
            purchases
            .withColumn("ageDays", age_days)
            .withColumn("decay", F.exp(-F.lit(LAMBDA) * F.col("ageDays")))
            .withColumn("wQty", F.col("Quantity") * F.col("decay"))
        )
    else:
        # Decay disabled: use the raw quantity
        weighted = purchases.withColumn("wQty", F.col("Quantity"))

    # Total weighted quantity per (customer, item)
    pair_totals = weighted.groupBy("CustomerKey", "MenuItemKey").agg(F.sum("wQty").alias("wQty"))

    print(f"Unique (Customer, Item) pairs: {pair_totals.count()}")

    # Min-max normalize to 0..1 within each customer
    per_customer = Window.partitionBy("CustomerKey")
    spread = F.col("maxQ") - F.col("minQ")
    normalized = F.when(F.col("maxQ") == F.col("minQ"), F.lit(1.0)).otherwise(
        (F.col("wQty") - F.col("minQ")) / spread
    )

    with_bounds = (
        pair_totals
        .withColumn("minQ", F.min("wQty").over(per_customer))
        .withColumn("maxQ", F.max("wQty").over(per_customer))
    )
    return with_bounds.withColumn("score", normalized).select(
        "CustomerKey", "MenuItemKey", F.round("score", 4).alias("score")
    )

# Calculate recommendation scores
# Runs the scoring pipeline, then prints up to five rows of the result as a sanity check.
recommendation_scores = calculate_recommendations()
print("Recommendation scores calculated successfully!")
recommendation_scores.show(5)

### Step 2: Join with Dimension Tables & Create Recommendation Payload
- Join with the menu item and customer dimension tables to enrich the recommendation data.
- Keep the top-scoring items per customer and create a structured recommendations payload for each customer ID, including menu item ID, name, score, and reason.

In [None]:
# Prepare top recommendations for each customer
def prepare_recommendations(scored_df):
    """Build the per-customer recommendations array from scored items.

    Joins the scores with the module-level ``dim_customer`` and ``dim_menu``
    DataFrames, keeps items scoring at least ``MIN_SCORE``, takes the top
    ``TOPK`` per customer, and groups them into one array per customer.
    Prints the number of customers that end up with recommendations.

    Args:
        scored_df: DataFrame with ``CustomerKey``, ``MenuItemKey`` and ``score``.

    Returns:
        DataFrame with ``CustomerId`` and ``recommendations``, an array of
        structs (``menuItemId``, ``name``, ``score``, ``reason``) ordered from
        highest to lowest rank.
    """
    # Join with customer and menu dimensions
    # (left join on the menu: an item missing from DimMenuItem is kept with null id/name)
    scored_with_details = (scored_df
                          .join(dim_customer.select("CustomerKey", "CustomerId"),
                                on="CustomerKey", how="inner")
                          .join(dim_menu.select("MenuItemKey", "MenuItemId", "MenuItemName"),
                                on="MenuItemKey", how="left")
                          .select("CustomerId", "MenuItemId", "MenuItemName", "score"))

    # Filter and rank per customer. MenuItemId breaks ties between equal scores so that
    # repeated runs pick the same items instead of an arbitrary subset.
    win_rank = Window.partitionBy("CustomerId").orderBy(F.col("score").desc(), F.col("MenuItemId").asc())
    top_recs = (scored_with_details
               .filter(F.col("score") >= F.lit(MIN_SCORE))
               .withColumn("rnk", F.row_number().over(win_rank))
               .filter(F.col("rnk") <= TOPK))

    print(f"Customers with ≥{MIN_SCORE} score items: {top_recs.select('CustomerId').distinct().count()}")

    # Build recommendations array payload
    recs_ready = (top_recs
                 .withColumn("rec_item", F.struct(
                     F.col("MenuItemId").alias("menuItemId"),
                     F.col("MenuItemName").alias("name"),
                     F.col("score").alias("score"),
                     F.lit("High-frequency purchase").alias("reason")))
                 .groupBy("CustomerId")
                 # collect_list gives no ordering guarantee after a shuffle, so carry the rank
                 # along, sort on it, and then strip it back off.
                 .agg(F.sort_array(F.collect_list(F.struct(F.col("rnk"), F.col("rec_item")))).alias("ranked"))
                 .select("CustomerId", F.col("ranked.rec_item").alias("recommendations")))

    print(f"Total customers with recommendations: {recs_ready.count()}")
    return recs_ready

# Prepare final recommendations
# Builds one recommendations array per customer and previews up to five customers.
final_recommendations = prepare_recommendations(recommendation_scores)
display(final_recommendations.limit(5))

### Step 3: Update Customer Profiles
Patch Cosmos DB documents with recommendation arrays and timestamps.

In [None]:
# Update customer recommendations in Cosmos DB
from pyspark.sql.types import Row

def update_customer_recommendations(recommendations_df):
    """Patch each customer document with its recommendations array.

    Rows are streamed to the driver and patched one at a time; every patch
    sets ``/recommendations`` and ``/updatedAt``. Progress is printed after
    every 100 successful patches, followed by a final summary.

    Args:
        recommendations_df: DataFrame with ``CustomerId`` and ``recommendations``.

    Returns:
        tuple: ``(patched, failed)`` counts.
    """
    def _to_plain(entry):
        # Spark Row objects are converted to plain dicts; anything else is passed through.
        if isinstance(entry, Row):
            return entry.asDict(recursive=True)
        return entry

    patched = 0
    failed = 0

    for rec_row in recommendations_df.toLocalIterator():
        customer_id = rec_row["CustomerId"]
        payload = [_to_plain(entry) for entry in rec_row["recommendations"]]

        patch_ops = [
            {"op": "set", "path": "/recommendations", "value": payload},
            {"op": "set", "path": "/updatedAt", "value": datetime.utcnow().isoformat() + "Z"},
        ]

        if not patch_cosmos_customer(customer_id, patch_ops):
            failed += 1
            continue

        patched += 1
        if patched % 100 == 0:
            print(f"Patched {patched} customers so far...")

    print(f"✅ Patched {patched} customers, ❌ Failed {failed}")
    return patched, failed

# Update recommendations in Cosmos DB
# The call returns a (patched, failed) tuple.
update_customer_recommendations(final_recommendations)

In [None]:
# Verify recommendations were successfully added
# Fetch the ten most recently updated customers that have a non-empty recommendations array.
check_query = """
SELECT TOP 10 c.customerId, c.loyaltyPoints, c.recommendations, c.updatedAt
FROM c
WHERE IS_DEFINED(c.recommendations) AND ARRAY_LENGTH(c.recommendations) > 0
ORDER BY c.updatedAt DESC
"""

result_df = query_cosmos(check_query)
# query_cosmos returns None when nothing matched.
if result_df is not None:
    matched_count = result_df.count()
    print(f"Found {matched_count} customers with recommendations.")
    # Convert to pandas for better display of nested data
    result_pandas = result_df.toPandas()
    display(result_pandas)