Exercise 3: ETL Pipeline for E-commerce Analytics

Scenario

You're building an ETL pipeline for an e-commerce platform that needs to process daily
transaction data and generate analytics reports.

Task

Create a system that can process raw transaction data, transform it into analytical
formats, and generate summary reports.

In [4]:
import json
from collections import defaultdict

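def read_json(file_path):
    """Load and return the parsed contents of a JSON file."""
    # An explicit encoding avoids depending on the platform default.
    with open(file_path, 'r', encoding='utf-8') as f:
        transactions = json.load(f)
    return transactions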


file_path = r'C:\Users\aravi\Desktop\Assessment_Python\assessments\20241219\sample-dataset-3.json'  
transactions = read_json(file_path)

In [None]:
transactions

1. Create a function that transforms raw transaction data:
Flatten nested JSON structures using lambda functions
Calculate total transaction values
Extract unique product categories using map() and set()

In [1]:

def transform_transaction(transactions):
    """Flatten each raw transaction into a single-level analytical record."""
    transformed_transactions = []
    for transaction in transactions:
        transformed_transaction = {
            "transaction_id": transaction["transaction_id"],
            "timestamp": transaction["timestamp"],
            # Pull the nested customer fields up to the top level.
            "customer_id": transaction["customer"]["id"],
            "region": transaction["customer"].get("region", "Unknown"),
            "payment_method": transaction["payment_method"],
            "status": transaction["status"],
            # Total value is the sum of price * quantity over all line items.
            "total_value": sum(item["price"] * item["quantity"] for item in transaction["items"]),
            # set() removes duplicate categories; the resulting order is not guaranteed.
            "product_categories": list(set(item["category"] for item in transaction["items"]))
        }
        transformed_transactions.append(transformed_transaction)
    return transformed_transactions

In [6]:
trans = transform_transaction(transactions)
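
The task asks for lambda-based flattening and for map() with set(), while the function above uses a loop and generator expressions. The following is a minimal sketch of the same transformation written in the requested style. The name `flatten` and the sample record are hypothetical: the field names follow the code above, but the values are invented for illustration.

```python
# Hypothetical sample record; field names follow transform_transaction,
# values are invented for illustration.
sample = {
    "transaction_id": "T1",
    "timestamp": "2024-12-01T10:00:00",
    "customer": {"id": "C1", "region": "North"},
    "payment_method": "card",
    "status": "completed",
    "items": [
        {"product_id": "P1", "category": "Books", "price": 10.0, "quantity": 2},
        {"product_id": "P2", "category": "Toys", "price": 5.0, "quantity": 1},
        {"product_id": "P3", "category": "Books", "price": 7.5, "quantity": 2},
    ],
}

# Flatten one nested transaction into a single-level dict.
flatten = lambda t: {
    "transaction_id": t["transaction_id"],
    "timestamp": t["timestamp"],
    "customer_id": t["customer"]["id"],
    "region": t["customer"].get("region", "Unknown"),
    "payment_method": t["payment_method"],
    "status": t["status"],
    # map() computes each line total; sum() adds them up.
    "total_value": sum(map(lambda i: i["price"] * i["quantity"], t["items"])),
    # map() extracts categories, set() deduplicates, sorted() fixes the order.
    "product_categories": sorted(set(map(lambda i: i["category"], t["items"]))),
}

# Apply the lambda to every transaction in the input list.
flat_sample = list(map(flatten, [sample]))
```

Sorting the categories is a design choice in this sketch, not something the exercise requires; it makes the output reproducible across runs.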

2. Create analysis functions that:
Group transactions by region and calculate regional sales
Find top-selling products using sorted() with custom key
Calculate average transaction value by payment method

In [None]:
def analyze_regional_sales(transactions):
    """Sum total_value per region. Expects transformed transactions."""
    regional_sales = defaultdict(float)
    for transaction in transactions:
        regional_sales[transaction["region"]] += transaction["total_value"]
    return regional_sales

def find_top_selling_products(transactions, top_n=5):
    """Return the top_n (product_id, quantity) pairs. Expects raw transactions with "items"."""
    product_quantities = defaultdict(int)
    for transaction in transactions:
        for item in transaction["items"]:
            product_quantities[item["product_id"]] += item["quantity"]
    # Sort by quantity sold, highest first.
    sorted_products = sorted(product_quantities.items(), key=lambda item: item[1], reverse=True)
    return sorted_products[:top_n]

def calculate_average_transaction_value(transactions):
    """Average total_value per payment method. Expects transformed transactions."""
    payment_method_totals = defaultdict(lambda: {"total_value": 0, "count": 0})
    for transaction in transactions:
        payment_method = transaction["payment_method"]
        payment_method_totals[payment_method]["total_value"] += transaction["total_value"]
        payment_method_totals[payment_method]["count"] += 1
    average_transaction_values = {
        payment_method: totals["total_value"] / totals["count"]
        for payment_method, totals in payment_method_totals.items()
        if totals["count"] > 0
    }
    return average_transaction_values

In [None]:
analyze_regional_sales(trans)

In [None]:
# Pass the raw transactions here: the transformed records no longer contain "items".
find_top_selling_products(transactions)


In [None]:
calculate_average_transaction_value(trans)
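
A custom key for sorted() can also break ties, which the single-value key in find_top_selling_products does not do explicitly. The sketch below ranks products by quantity (highest first) and then by product ID (ascending). The data in `raw` is hypothetical and contains only the fields this example needs.

```python
from collections import defaultdict

# Hypothetical raw transactions; values are invented for illustration.
raw = [
    {"items": [{"product_id": "P2", "quantity": 3}, {"product_id": "P1", "quantity": 1}]},
    {"items": [{"product_id": "P1", "quantity": 2}, {"product_id": "P3", "quantity": 5}]},
]

# Total quantity sold per product.
quantities = defaultdict(int)
for t in raw:
    for item in t["items"]:
        quantities[item["product_id"]] += item["quantity"]

# Negating the quantity sorts it in descending order, while product_id
# stays ascending, so products with equal quantities have a stable rank.
ranked = sorted(quantities.items(), key=lambda kv: (-kv[1], kv[0]))
top_two = ranked[:2]
```

Here P1 and P2 both sell 3 units, so the tuple key places P1 ahead of P2.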

3. Create a report generation function that:
Filters completed transactions using filter()
Sorts data by multiple criteria using lambda
Generates summary statistics for different time periods

In [42]:
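def generate_summary_report(transactions, start_date=None, end_date=None, status=None):
    """Summarize transformed transactions, optionally filtered by status and date range.

    Timestamps are compared directly with start_date and end_date, so all three
    must share one type and format (for strings, a sortable one such as ISO 8601).
    """
    filtered_transactions = transactions
    # Optional status filter, e.g. to keep only completed transactions.
    if status:
        filtered_transactions = list(filter(lambda t: t["status"] == status, filtered_transactions))
    if start_date:
        filtered_transactions = list(filter(lambda t: t["timestamp"] >= start_date, filtered_transactions))
    if end_date:
        filtered_transactions = list(filter(lambda t: t["timestamp"] <= end_date, filtered_transactions))

    total_transactions = len(filtered_transactions)
    total_revenue = sum(t["total_value"] for t in filtered_transactions)

    # Number of transactions per region.
    transactions_by_region = defaultdict(int)
    for transaction in filtered_transactions:
        transactions_by_region[transaction["region"]] += 1

    report = {
        "total_transactions": total_transactions,
        "total_revenue": total_revenue,
        "transactions_by_region": dict(transactions_by_region)
    }
    return report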

In [None]:
generate_summary_report(trans)
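
The three requirements of this task (filtering completed transactions with filter(), sorting by multiple criteria with a lambda, and summarizing by time period) can be sketched together as follows. This is an illustration under stated assumptions, not the reference solution: the records in `records` are hypothetical, and both the `"completed"` status value and the ISO 8601 timestamp format are assumptions about the dataset.

```python
from collections import defaultdict

# Hypothetical transformed records. The "completed" status value and the
# ISO 8601 timestamp format are assumptions about the dataset.
records = [
    {"timestamp": "2024-12-01T09:00:00", "region": "North", "status": "completed", "total_value": 40.0},
    {"timestamp": "2024-12-01T15:30:00", "region": "South", "status": "pending", "total_value": 25.0},
    {"timestamp": "2024-12-02T11:00:00", "region": "North", "status": "completed", "total_value": 10.0},
    {"timestamp": "2024-12-02T12:00:00", "region": "East", "status": "completed", "total_value": 30.0},
]

# Keep only completed transactions.
completed = list(filter(lambda t: t["status"] == "completed", records))

# Sort by region (ascending), then by total_value (descending).
ordered = sorted(completed, key=lambda t: (t["region"], -t["total_value"]))

# Group by day: the first 10 characters of an ISO 8601 timestamp are the date.
daily = defaultdict(lambda: {"count": 0, "revenue": 0.0})
for t in completed:
    day = t["timestamp"][:10]
    daily[day]["count"] += 1
    daily[day]["revenue"] += t["total_value"]

# Add the average transaction value for each day, in chronological order.
daily_summary = {
    day: {**stats, "average": stats["revenue"] / stats["count"]}
    for day, stats in sorted(daily.items())
}
```

Slicing the timestamp string keeps the sketch free of date parsing. If the real timestamps use another format, parsing them with `datetime` before grouping would be the safer choice.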