# Fabric Lakehouse Data Population

This notebook demonstrates how to populate a Fabric Lakehouse with sample data for testing and development purposes.

## Overview
- Create sample datasets
- Write data to lakehouse tables
- Verify data ingestion
- Set up basic data structures for analytics

In [None]:
# Import required libraries
import builtins
import random
from datetime import datetime, timedelta

import numpy as np
import pandas as pd
from pyspark.sql import SparkSession
from pyspark.sql.types import *
# NOTE: this star import rebinds built-in names such as round, sum, min and
# max to Spark column functions; call builtins.round(...) etc. when the
# plain-Python behaviour is needed.
from pyspark.sql.functions import *

# Initialize Spark session
# getOrCreate() hands back the session that is already running when there is
# one, so re-running this cell does not start a second session.
spark = SparkSession.builder.appName("LakehouseDataPopulation").getOrCreate()

# Confirmation output so it is obvious the setup cell finished
print("Libraries imported successfully")
print(f"Spark version: {spark.version}")

In [None]:
# Create sample customer data
def generate_customer_data(num_customers=10000):
    """Generate synthetic customer records as a pandas DataFrame.

    Every record gets a sequential ``customer_id`` starting at 1, a random
    first and last name, a unique ``user<N>@example.com`` address, a city
    together with its matching state, a registration timestamp 1-365 days
    in the past, a random active flag and a ``customer_value`` between
    100 and 10000 rounded to two decimals.

    Args:
        num_customers: Number of rows to generate.

    Returns:
        pandas.DataFrame with one row per customer; empty when
        ``num_customers`` is not positive.
    """

    # Sample data for names and locations
    first_names = ['John', 'Jane', 'Michael', 'Sarah', 'David', 'Emily', 'Robert', 'Jessica', 'William', 'Ashley']
    last_names = ['Smith', 'Johnson', 'Williams', 'Brown', 'Jones', 'Garcia', 'Miller', 'Davis', 'Rodriguez', 'Martinez']
    cities = ['New York', 'Los Angeles', 'Chicago', 'Houston', 'Phoenix', 'Philadelphia', 'San Antonio', 'San Diego', 'Dallas', 'San Jose']
    states = ['NY', 'CA', 'IL', 'TX', 'AZ', 'PA', 'TX', 'CA', 'TX', 'CA']

    # cities[i] belongs to states[i]; drawing them as a pair keeps every row
    # geographically consistent (never "New York, CA").
    locations = list(zip(cities, states))

    # One reference time so all registration dates are relative to the same "now".
    now = datetime.now()

    customers = []
    for customer_id in range(1, num_customers + 1):
        city, state = random.choice(locations)
        customers.append({
            'customer_id': customer_id,
            'first_name': random.choice(first_names),
            'last_name': random.choice(last_names),
            'email': f"user{customer_id}@example.com",
            'city': city,
            'state': state,
            'registration_date': now - timedelta(days=random.randint(1, 365)),
            'is_active': random.choice([True, False]),
            # builtins.round: the star import of pyspark.sql.functions
            # rebinds the bare name ``round`` to a Spark column function.
            'customer_value': builtins.round(random.uniform(100, 10000), 2),
        })

    return pd.DataFrame(customers)

# Generate customer data
# 5,000 rows; later cells draw customer ids from this frame.
customers_df = generate_customer_data(5000)
print(f"Generated {len(customers_df)} customer records")
print("\nSample data:")
# head() prints the first five rows as a quick sanity check
print(customers_df.head())

In [None]:
# Create sample product data
def generate_product_data(num_products=1000):
    """Generate a synthetic product catalog as a pandas DataFrame.

    Every record gets a sequential ``product_id`` starting at 1, a name of
    the form ``Product <N>``, a random category and brand, a ``price``
    (10-500) and ``cost`` (5-300) rounded to two decimals, a stock quantity
    (0-1000), a launch timestamp 30-1095 days in the past and a
    discontinued flag.

    Note that ``price`` and ``cost`` are drawn independently, so a product's
    cost can exceed its price.

    Args:
        num_products: Number of rows to generate.

    Returns:
        pandas.DataFrame with one row per product; empty when
        ``num_products`` is not positive.
    """

    categories = ['Electronics', 'Clothing', 'Books', 'Home & Garden', 'Sports', 'Toys', 'Health', 'Beauty']
    brands = ['Brand A', 'Brand B', 'Brand C', 'Brand D', 'Brand E']

    # One reference time so all launch dates are relative to the same "now".
    now = datetime.now()

    products = []
    for product_id in range(1, num_products + 1):
        products.append({
            'product_id': product_id,
            'product_name': f"Product {product_id}",
            'category': random.choice(categories),
            'brand': random.choice(brands),
            # builtins.round: the star import of pyspark.sql.functions
            # rebinds the bare name ``round`` to a Spark column function.
            'price': builtins.round(random.uniform(10, 500), 2),
            'cost': builtins.round(random.uniform(5, 300), 2),
            'stock_quantity': random.randint(0, 1000),
            'launch_date': now - timedelta(days=random.randint(30, 1095)),
            # 10% of products get a coin flip, so roughly 5% end up discontinued.
            'is_discontinued': random.choice([True, False]) if random.random() < 0.1 else False,
        })

    return pd.DataFrame(products)

# Generate product data
# 1,000 rows; the sales generator looks up unit prices in this frame.
products_df = generate_product_data(1000)
print(f"Generated {len(products_df)} product records")
print("\nSample data:")
# head() prints the first five rows as a quick sanity check
print(products_df.head())

In [None]:
# Create sample sales transaction data
def generate_sales_data(num_transactions=50000, customers_df=None, products_df=None):
    """Generate synthetic sales transactions as a pandas DataFrame.

    Each transaction links a random customer to a random product, is dated
    1-365 days in the past, has a quantity of 1-5 and, for about 20% of
    rows, a discount of up to 30%. ``unit_price`` is the product's price
    and ``total_amount`` is ``quantity * unit_price * (1 - discount)``
    rounded to two decimals.

    Args:
        num_transactions: Number of rows to generate.
        customers_df: DataFrame with a ``customer_id`` column.
        products_df: DataFrame with ``product_id`` and ``price`` columns.

    Returns:
        pandas.DataFrame with one row per transaction; empty when
        ``num_transactions`` is not positive.

    Raises:
        ValueError: If ``customers_df`` or ``products_df`` is ``None``.
    """

    if customers_df is None or products_df is None:
        raise ValueError("Customer and product data required")

    customer_ids = customers_df['customer_id'].tolist()
    product_ids = products_df['product_id'].tolist()

    # Build the id -> price lookup once instead of filtering the whole
    # product frame for every transaction. setdefault keeps the first
    # occurrence of a duplicated id, matching a "first matching row" lookup.
    price_by_product = {}
    for pid, price in zip(products_df['product_id'], products_df['price']):
        price_by_product.setdefault(pid, price)

    payment_methods = ['Credit Card', 'Debit Card', 'Cash', 'PayPal', 'Bank Transfer']
    sales_channels = ['Online', 'In-Store', 'Mobile App', 'Phone']

    # One reference time so all transaction dates are relative to the same "now".
    now = datetime.now()

    sales = []
    for transaction_id in range(1, num_transactions + 1):
        transaction_date = now - timedelta(days=random.randint(1, 365))
        customer_id = random.choice(customer_ids)
        product_id = random.choice(product_ids)

        # Get product price for this transaction
        product_price = price_by_product[product_id]
        quantity = random.randint(1, 5)
        # builtins.round: the star import of pyspark.sql.functions rebinds
        # the bare name ``round`` to a Spark column function.
        discount = builtins.round(random.uniform(0, 0.3), 2) if random.random() < 0.2 else 0

        sales.append({
            'transaction_id': transaction_id,
            'customer_id': customer_id,
            'product_id': product_id,
            'transaction_date': transaction_date,
            'quantity': quantity,
            'unit_price': product_price,
            'discount_percent': discount,
            'total_amount': builtins.round(quantity * product_price * (1 - discount), 2),
            'payment_method': random.choice(payment_methods),
            'sales_channel': random.choice(sales_channels),
        })

    return pd.DataFrame(sales)

# Generate sales data
# 25,000 transactions drawn from the customer and product frames built above.
sales_df = generate_sales_data(25000, customers_df, products_df)
print(f"Generated {len(sales_df)} sales transaction records")
print("\nSample data:")
# head() prints the first five rows as a quick sanity check
print(sales_df.head())

In [None]:
# Write data to Lakehouse tables
def write_to_lakehouse(df, table_name, mode="overwrite"):
    """Save a pandas DataFrame as a Delta table and print the table's schema.

    Failures are reported on stdout rather than raised, so a failed write
    does not stop the rest of the notebook.

    Args:
        df: pandas DataFrame to persist.
        table_name: Name of the destination table.
        mode: Save mode handed to the Spark writer (default "overwrite").
    """
    try:
        # Spark writes its own DataFrame type, so convert first
        converted = spark.createDataFrame(df)

        # Configure the Delta writer step by step, then save
        delta_writer = converted.write.format("delta").mode(mode)
        delta_writer = delta_writer.option("mergeSchema", "true")
        delta_writer.saveAsTable(table_name)

        print(f"Successfully wrote {len(df)} records to table '{table_name}'")

        # Echo the schema of the table that was just written
        schema_rows = spark.sql(f"DESCRIBE TABLE {table_name}")
        print(f"\nTable schema for '{table_name}':")
        schema_rows.show()

    except Exception as e:
        print(f"Error writing to table '{table_name}': {str(e)}")

# Persist each generated dataset as its own lakehouse table
write_jobs = [
    ("Writing customer data to lakehouse...", customers_df, "customers"),
    ("\nWriting product data to lakehouse...", products_df, "products"),
    ("\nWriting sales data to lakehouse...", sales_df, "sales_transactions"),
]
for announcement, dataset, target_table in write_jobs:
    print(announcement)
    write_to_lakehouse(dataset, target_table)

In [None]:
# Verify data ingestion and run sample queries
print("=== DATA VERIFICATION ===\n")

# Check table counts
tables = ["customers", "products", "sales_transactions"]

for table in tables:
    try:
        # COUNT(*) yields a single row; read its 'count' column
        count = spark.sql(f"SELECT COUNT(*) as count FROM {table}").collect()[0]['count']
        print(f"Table '{table}': {count:,} records")
    except Exception as e:
        # Report the failure and continue with the remaining tables
        print(f"Error querying table '{table}': {str(e)}")

print("\n=== SAMPLE QUERIES ===\n")

# Sample query 1: Top customers by total spend
# Inner join, so customers without any transaction do not appear in the result.
print("Top 10 customers by total spend:")
top_customers = spark.sql("""
    SELECT 
        c.customer_id,
        c.first_name,
        c.last_name,
        c.email,
        SUM(s.total_amount) as total_spent,
        COUNT(s.transaction_id) as transaction_count
    FROM customers c
    JOIN sales_transactions s ON c.customer_id = s.customer_id
    GROUP BY c.customer_id, c.first_name, c.last_name, c.email
    ORDER BY total_spent DESC
    LIMIT 10
""")
top_customers.show()

# Sample query 2: Sales by category
# One row per product category: transaction count, revenue and average
# transaction value, highest revenue first.
print("\nSales by product category:")
sales_by_category = spark.sql("""
    SELECT 
        p.category,
        COUNT(s.transaction_id) as transaction_count,
        SUM(s.total_amount) as total_revenue,
        AVG(s.total_amount) as avg_transaction_value
    FROM products p
    JOIN sales_transactions s ON p.product_id = s.product_id
    GROUP BY p.category
    ORDER BY total_revenue DESC
""")
sales_by_category.show()

# Sample query 3: Monthly sales trend
# Groups by calendar year and month; newest first, and LIMIT 12 keeps only
# the twelve most recent year/month groups.
print("\nMonthly sales trend:")
monthly_sales = spark.sql("""
    SELECT 
        YEAR(transaction_date) as year,
        MONTH(transaction_date) as month,
        COUNT(transaction_id) as transaction_count,
        SUM(total_amount) as total_revenue
    FROM sales_transactions
    GROUP BY YEAR(transaction_date), MONTH(transaction_date)
    ORDER BY year DESC, month DESC
    LIMIT 12
""")
monthly_sales.show()

In [None]:
# Create additional time series data for analytics
def generate_daily_metrics(start_date, end_date):
    """Generate one row of synthetic website metrics per day.

    Traffic starts from a baseline of 1000 visits, is scaled by 1.3 on
    weekends and by 2.0 on December 24, 25 and 31, and then gets Gaussian
    noise added. The remaining metrics are drawn uniformly from fixed
    ranges.

    Args:
        start_date: First day to generate (a ``datetime``), inclusive.
        end_date: Last day to generate (a ``datetime``), inclusive.

    Returns:
        pandas.DataFrame with one row per day from ``start_date`` through
        ``end_date``; empty when ``start_date`` is after ``end_date``.
    """

    current_date = start_date
    metrics = []

    while current_date <= end_date:
        # Simulate seasonal patterns and weekday effects
        is_weekend = current_date.weekday() >= 5
        is_holiday = current_date.month == 12 and current_date.day in [24, 25, 31]

        base_visits = 1000
        if is_weekend:
            base_visits *= 1.3
        if is_holiday:
            base_visits *= 2.0

        # random.gauss(mean, stddev) supplies the noise; the stdlib random
        # module has no ``normal`` function. builtins.round is used because
        # the star import of pyspark.sql.functions rebinds the bare name
        # ``round`` to a Spark column function.
        metrics.append({
            'date': current_date,
            'website_visits': int(base_visits + random.gauss(0, 100)),
            'unique_visitors': int(base_visits * 0.7 + random.gauss(0, 50)),
            'page_views': int(base_visits * 3.2 + random.gauss(0, 200)),
            'bounce_rate': builtins.round(random.uniform(0.3, 0.7), 3),
            'avg_session_duration_minutes': builtins.round(random.uniform(2, 8), 2),
            'conversion_rate': builtins.round(random.uniform(0.02, 0.08), 4),
            'ad_spend': builtins.round(random.uniform(500, 2000), 2),
            'organic_traffic_percent': builtins.round(random.uniform(0.4, 0.8), 3),
        })
        current_date += timedelta(days=1)

    return pd.DataFrame(metrics)

# Generate daily metrics for the last year
# Both endpoints are included by the generator's loop, so 365 days back
# through now spans 366 daily steps.
start_date = datetime.now() - timedelta(days=365)
end_date = datetime.now()

daily_metrics_df = generate_daily_metrics(start_date, end_date)
print(f"Generated {len(daily_metrics_df)} daily metric records")
print("\nSample data:")
# head() prints the first five rows as a quick sanity check
print(daily_metrics_df.head())

# Write to lakehouse
print("\nWriting daily metrics to lakehouse...")
write_to_lakehouse(daily_metrics_df, "daily_website_metrics")

## Data Population Summary

The lakehouse has been successfully populated with the following tables:

### Core Business Tables
1. **customers** - Customer information and profiles
2. **products** - Product catalog with pricing and inventory
3. **sales_transactions** - Transaction records linking customers and products

### Analytics Tables
4. **daily_website_metrics** - Daily web analytics and performance metrics

### Data Volumes
- 5,000 customer records
- 1,000 product records  
- 25,000 sales transactions
- 366 daily website-metric records (the past 365 days through today, both endpoints included)

### Next Steps
- Run analytics queries to explore the data
- Create additional derived tables as needed
- Set up scheduled data refreshes
- Implement data quality monitoring

The lakehouse is now ready for analytics, reporting, and machine learning workloads!