# AutoCouncil — Fabric Synthetic Data Generator (PySpark)

Generates large-scale synthetic automotive datasets for the AutoCouncil scenario: dealers, models, customers, incentives, inventory, and sales fact data with campaign + logistics anomalies.

## Features
- 38M+ rows across 6 Delta tables
- Azure Motors EV SUV campaign uplift (Jun–Sep 2025)
- West/Pacific NW logistics delay (Aug–Sep 2025)
- Deterministic randomness (seed + hash)
- Fabric-ready (relative lakehouse paths)

## Run Steps
1. Attach & pin your Lakehouse
2. Run cells top → bottom
3. Inspect Tables and Files/reports

Estimated runtime: 10–20 minutes

## 1. Test Lakehouse Connectivity
Ensures we can read/write to the attached Lakehouse.
Prerequisites: Lakehouse attached + write permissions

In [1]:
import os, pandas as pd
tables_path = 'Tables'
files_path = 'Files'
print('📋 Fabric environment detected')
print('🔌 Testing Lakehouse connectivity...')
try:
    os.makedirs(tables_path, exist_ok=True)
    os.makedirs(files_path, exist_ok=True)
    test_path = f'{files_path}/.connectivity_test'
    with open(test_path,'w') as f: 
        f.write(f'Test at {pd.Timestamp.now()}')
    with open(test_path,'r') as f: 
        _ = f.read()
    print('✅ Lakehouse connectivity OK')
except Exception as e:
    print(f'❌ Connectivity failed: {e}')
    raise

StatementMeta(, 4825afcd-6c6c-4c32-a606-a141b2984f0a, 3, Finished, Available, Finished)

📋 Fabric environment detected
🔌 Testing Lakehouse connectivity...
✅ Lakehouse connectivity OK


## 2. Install Required Libraries
Uses Faker for realistic synthetic attributes.

In [2]:
%pip install faker
print('✅ Faker installed')

StatementMeta(, 4825afcd-6c6c-4c32-a606-a141b2984f0a, 9, Finished, Available, Finished)

Collecting faker
  Downloading faker-37.11.0-py3-none-any.whl.metadata (15 kB)
Downloading faker-37.11.0-py3-none-any.whl (2.0 MB)
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m2.0/2.0 MB[0m [31m18.5 MB/s[0m eta [36m0:00:00[0m00:01[0m0:01[0m
[?25hInstalling collected packages: faker
Successfully installed faker-37.11.0

[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m A new release of pip is available: [0m[31;49m24.0[0m[39;49m -> [0m[32;49m25.2[0m
[1m[[0m[34;49mnotice[0m[1;39;49m][0m[39;49m To update, run: [0m[32;49mpython -m pip install --upgrade pip[0m
Note: you may need to restart the kernel to use updated packages.
✅ Faker installed



In [3]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
from faker import Faker
import random, hashlib
from datetime import datetime, timedelta
Faker.seed(12345); random.seed(12345); faker = Faker()
try: spark; print('✅ Using existing Spark session')
except NameError: 
    spark = SparkSession.builder.appName('AutoCouncil Data Generator')\
        .config('spark.sql.extensions','io.delta.sql.DeltaSparkSessionExtension')\
        .config('spark.sql.catalog.spark_catalog','org.apache.spark.sql.delta.catalog.DeltaCatalog')\
        .getOrCreate(); print('✅ Created Spark session')
print('Spark version:', spark.version)

StatementMeta(, 4825afcd-6c6c-4c32-a606-a141b2984f0a, 11, Finished, Available, Finished)

✅ Using existing Spark session
Spark version: 3.5.1.5.4.20251001.1


## 3. Configuration
Scale + scenario setup

In [4]:
NUM_DEALERS = 1200
NUM_MODELS = 600
NUM_CUSTOMERS = 1_000_000
SALES_TARGET_ROWS = 20_000_000
MONTHS = [f'{y}-{m:02d}' for y in [2024,2025] for m in range(1,13)]
REGIONS = ['Northeast','Mid-Atlantic','Midwest','South','Mountain','West','Pacific NW','Southwest']
ANOMALY_MONTHS = ['2025-06','2025-07','2025-08','2025-09']
ANOMALY_BRAND = 'Azure Motors'
ANOMALY_BODYTYPE = 'EV SUV'
TABLES_PATH='Tables'; FILES_PATH='Files'
print('✅ Config loaded'); print('Months:', len(MONTHS),'Regions:',len(REGIONS))

StatementMeta(, 4825afcd-6c6c-4c32-a606-a141b2984f0a, 12, Finished, Available, Finished)

✅ Config loaded
Months: 24 Regions: 8


## 4. Helper Functions

In [5]:
def hash_random(*items):
    hv = int(hashlib.md5('|'.join(map(str,items)).encode()).hexdigest(),16)
    return (hv % 1_000_000) / 1_000_000.0
def write_delta(df,name,mode='overwrite'):
    path=f'{TABLES_PATH}/{name}'
    df.write.format('delta').mode(mode).save(path)
    print(f'   ✅ {name}: {df.count():,} rows')
    return path
print('✅ Helpers ready')

StatementMeta(, 4825afcd-6c6c-4c32-a606-a141b2984f0a, 13, Finished, Available, Finished)

✅ Helpers ready


## 5. Vehicle Models

In [7]:
body_types = ['Sedan','Hatchback','SUV','EV Sedan','EV SUV','Pickup','Van','Coupe','Wagon','Crossover']
brands=[ANOMALY_BRAND]
while len(brands)<40:
    b=f'{faker.company()} Motors'.replace(',','')
    if b not in brands:
        brands.append(b)

models=[]
# First, ensure Azure Motors has EV SUV models (critical for campaign)
for i in range(60):  # 60 Azure Motors EV SUV models
    trim=f'{faker.color_name()} {(i%18)+1}'
    models.append({
        'model_id':f'M{i:05d}',
        'brand':ANOMALY_BRAND,
        'model_name':f"{ANOMALY_BRAND.split()[0]} {ANOMALY_BODYTYPE} {trim}",
        'body_type':ANOMALY_BODYTYPE,
        'msrp':21000+(i%120)*600,
        'launch_date':'2024-01-01'
    })
# Fill remaining models with varied brands/body types
for i in range(60, NUM_MODELS):
    brand=brands[i%len(brands)]
    btype=body_types[i%len(body_types)]
    trim=f'{faker.color_name()} {(i%18)+1}'
    models.append({
        'model_id':f'M{i:05d}',
        'brand':brand,
        'model_name':f"{brand.split()[0]} {btype} {trim}",
        'body_type':btype,
        'msrp':21000+(i%120)*600,
        'launch_date':'2024-01-01'
    })

models_df=spark.createDataFrame(models)
write_delta(models_df,'models')
print(f'✅ Models created: {len(models)} across {len(brands)} brands (including {60} Azure Motors EV SUV models)')

StatementMeta(, 4825afcd-6c6c-4c32-a606-a141b2984f0a, 15, Finished, Available, Finished)

   ✅ models: 600 rows
✅ Models created: 600 across 40 brands (including 60 Azure Motors EV SUV models)


## 6. Dealers

In [8]:
dealer_types=['Auto','Motors','Cars','Dealers','Group']
dealers=[]
for i in range(NUM_DEALERS):
    region=REGIONS[i%len(REGIONS)]
    city=faker.city()
    dtype=random.choice(dealer_types)
    dealers.append({'dealer_id':f'D{i:05d}','dealer_name':f'{city} {dtype}','region':region,'lat':25.0+(i%30)*0.9,'lon':-124.0+(i%60)*1.2})
dealers_df=spark.createDataFrame(dealers); write_delta(dealers_df,'dealers')
print(f'✅ Dealers created: {len(dealers)}')

StatementMeta(, 4825afcd-6c6c-4c32-a606-a141b2984f0a, 16, Finished, Available, Finished)

   ✅ dealers: 1,200 rows
✅ Dealers created: 1200


## 7. Customers (1M)
Progress prints every 100K.

In [9]:
customers=[]
for i in range(NUM_CUSTOMERS):
    first=faker.first_name(); last=faker.last_name()
    customers.append({'customer_id':f'C{i:07d}','first_name':first,'last_name':last,'email':faker.email().lower().replace(' ','.'),'age':21+(i%60),'annual_income':35000+(i%120)*1200+(i%7)*900})
    if i and i%100000==0: print(f'   {i:,} customers...')
customers_df=spark.createDataFrame(customers); write_delta(customers_df,'customers')
print(f'✅ Customers created: {len(customers):,}')

StatementMeta(, 4825afcd-6c6c-4c32-a606-a141b2984f0a, 17, Finished, Available, Finished)

   100,000 customers...
   200,000 customers...
   300,000 customers...
   400,000 customers...
   500,000 customers...
   600,000 customers...
   700,000 customers...
   800,000 customers...
   900,000 customers...
   ✅ customers: 1,000,000 rows
✅ Customers created: 1,000,000


## 8. Incentives & Campaign

In [10]:
incentives=[]
for brand in brands:
  for bt in body_types:
    for m in MONTHS:
      base_rebate=int(hash_random(brand,bt,m)*3500); base_apr=(hash_random(brand,m)*5.0)+0.9
      rebate=base_rebate; apr=base_apr
      if brand==ANOMALY_BRAND and bt==ANOMALY_BODYTYPE and m in ANOMALY_MONTHS:
        rebate+=3000; apr=1.9
      incentives.append({'brand':brand,'body_type':bt,'month':m,'rebate':rebate,'apr':apr})
incentives_df=spark.createDataFrame(incentives); write_delta(incentives_df,'incentives')
print(f'✅ Incentives rows: {len(incentives):,}')

StatementMeta(, 4825afcd-6c6c-4c32-a606-a141b2984f0a, 18, Finished, Available, Finished)

   ✅ incentives: 9,600 rows
✅ Incentives rows: 9,600


## 9. Inventory (~17M+)
Includes regional logistics constraint (incoming *0.7 West/Pacific NW Aug–Sep 2025)

In [11]:
inventory=[]; count=0
for d in dealers:
  for m in models:
    for month in MONTHS:
      on_hand=80+int(hash_random(d['dealer_id'],m['model_id'],month)*80)
      incoming=10+int(hash_random(m['model_id'],month)*40)
      turn=(hash_random(d['dealer_id'],m['model_id'])*4.0)+0.8
      if d['region'] in ['West','Pacific NW'] and month in ['2025-08','2025-09']: incoming=int(incoming*0.7)
      inventory.append({'dealer_id':d['dealer_id'],'region':d['region'],'model_id':m['model_id'],'brand':m['brand'],'body_type':m['body_type'],'month':month,'on_hand':on_hand,'incoming':incoming,'turn_rate':turn})
      count+=1
      if count and count%1_000_000==0: print(f'   {count:,} inventory rows')
inventory_df=spark.createDataFrame(inventory); write_delta(inventory_df,'inventory')
print(f'✅ Inventory rows: {len(inventory):,}')

StatementMeta(, 4825afcd-6c6c-4c32-a606-a141b2984f0a, 19, Finished, Available, Finished)

   1,000,000 inventory rows
   2,000,000 inventory rows
   3,000,000 inventory rows
   4,000,000 inventory rows
   5,000,000 inventory rows
   6,000,000 inventory rows
   7,000,000 inventory rows
   8,000,000 inventory rows
   9,000,000 inventory rows
   10,000,000 inventory rows
   11,000,000 inventory rows
   12,000,000 inventory rows
   13,000,000 inventory rows
   14,000,000 inventory rows
   15,000,000 inventory rows
   16,000,000 inventory rows
   17,000,000 inventory rows
   ✅ inventory: 17,280,000 rows
✅ Inventory rows: 17,280,000


## 10. Sales Transactions (20M)
Campaign boost: Azure Motors EV SUV units *1.35 in campaign months

In [12]:
# BATCHED Sales Transactions Generation (avoids huge driver list)
from builtins import max
inc_lookup={(i['brand'],i['body_type'],i['month']):i for i in incentives}
per_combo=NUM_DEALERS*NUM_MODELS*len(MONTHS)
repeat=max(1,int((SALES_TARGET_ROWS/per_combo)+0.5))
TARGET=SALES_TARGET_ROWS
BATCH_SIZE=250_000  # tune if needed
sc=0
batch=[]
first_write=True
import math
def flush(batch_rows, first):
    if not batch_rows: return first
    df=spark.createDataFrame(batch_rows)
    mode='overwrite' if first else 'append'
    write_delta(df,'car_sales',mode=mode)
    return False
for d in dealers:
  for m in models:
    for month in MONTHS:
      for r in range(repeat):
        if sc>=TARGET: break
        rep=str(r)
        inc=inc_lookup.get((m['brand'],m['body_type'],month),{})
        units=1+int(hash_random(d['dealer_id'],m['model_id'],rep)*4)
        discount=int(hash_random(d['dealer_id'],m['model_id'],month)*2000)
        day=1+int(hash_random(d['dealer_id'],m['model_id'],rep)*28)
        year,mo=month.split('-'); sale_date=f'{year}-{mo}-{day:02d}'
        cust=customers[int(hash_random(d['dealer_id'], m['model_id'], rep) * len(customers))]['customer_id']
        if m['brand']==ANOMALY_BRAND and m['body_type']==ANOMALY_BODYTYPE and month in ANOMALY_MONTHS: units=int(units*1.35)
        final_price=m['msrp']-discount - (inc.get('rebate',0)*0.7)
        cogs=m['msrp']*(0.76+hash_random(m['model_id'],d['dealer_id'])*0.1)
        revenue=final_price*units; profit=revenue-(cogs*units)
        batch.append({'sale_id':f'S{sc:08d}','dealer_id':d['dealer_id'],'region':d['region'],'model_id':m['model_id'],'brand':m['brand'],'body_type':m['body_type'],'model_name':m['model_name'],'month':month,'sale_date':sale_date,'customer_id':cust,'units':units,'msrp':m['msrp'],'discount':discount,'rebate':inc.get('rebate',0),'apr':inc.get('apr',0.0),'final_price':final_price,'cogs':cogs,'revenue':revenue,'profit':profit})
        sc+=1
        if sc % 1_000_000 == 0: print(f'   {sc:,} sales rows (cumulative)')
        if len(batch) >= BATCH_SIZE:
            first_write=flush(batch, first_write)
            batch=[]
      if sc>=TARGET: break
    if sc>=TARGET: break
  if sc>=TARGET: break
# flush remaining
first_write=flush(batch, first_write)
print(f'✅ Sales rows written: {sc:,}')

StatementMeta(, 4825afcd-6c6c-4c32-a606-a141b2984f0a, 20, Finished, Available, Finished)

   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   1,000,000 sales rows (cumulative)
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   2,000,000 sales rows (cumulative)
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   3,000,000 sales rows (cumulative)
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   4,000,000 sales rows (cumulative)
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   5,000,000 sales rows (cumulative)
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   6,000,000 sales rows (cumulative)
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 rows
   ✅ car_sales: 250,000 

## 11. Example Analytic Questions & Queries
This section provides sample business questions you can answer with the generated data, along with SQL queries to run in Microsoft Fabric.

In [13]:
# Register Delta tables as temporary views for SQL queries
tables = ['models', 'dealers', 'customers', 'incentives', 'inventory', 'car_sales']
for table in tables:
    try:
        df = spark.read.format('delta').load(f'Tables/{table}')
        df.createOrReplaceTempView(table)
        print(f'✅ Registered view: {table}')
    except Exception as e:
        print(f'❌ Failed to register {table}: {e}')
print('✅ All tables registered for SQL queries')

StatementMeta(, 4825afcd-6c6c-4c32-a606-a141b2984f0a, 21, Finished, Available, Finished)

✅ Registered view: models
✅ Registered view: dealers
✅ Registered view: customers
✅ Registered view: incentives
✅ Registered view: inventory
✅ Registered view: car_sales
✅ All tables registered for SQL queries


### 13.0 Register Tables for SQL Queries
Create temporary views so Spark SQL can query the Delta tables.

### 13.A Campaign Effectiveness
How much did the Azure Motors EV SUV campaign increase sales and revenue during the campaign months?

In [None]:
# Campaign impact: Azure Motors EV SUV uplift with dynamic interpretation
query = """
SELECT
  CASE WHEN month IN ('2025-06','2025-07','2025-08','2025-09') THEN 'Campaign' ELSE 'Baseline' END AS period,
  SUM(units) AS total_units,
  SUM(revenue) AS total_revenue
FROM car_sales
WHERE brand = 'Azure Motors' AND body_type = 'EV SUV'
GROUP BY CASE WHEN month IN ('2025-06','2025-07','2025-08','2025-09') THEN 'Campaign' ELSE 'Baseline' END
ORDER BY period
"""

# Execute query and display results
df = spark.sql(query)
display(df)

# Calculate dynamic metrics from actual results
results = df.collect()
baseline = next((r for r in results if r['period'] == 'Baseline'), None)
campaign = next((r for r in results if r['period'] == 'Campaign'), None)

if baseline and campaign:
    baseline_months = 20  # Total months minus 4 campaign months
    campaign_months = 4
    
    baseline_avg_units = baseline['total_units'] / baseline_months
    campaign_avg_units = campaign['total_units'] / campaign_months
    pct_increase = ((campaign_avg_units - baseline_avg_units) / baseline_avg_units) * 100
    
    baseline_per_unit = baseline['total_revenue'] / baseline['total_units']
    campaign_per_unit = campaign['total_revenue'] / campaign['total_units']
    margin_diff = baseline_per_unit - campaign_per_unit
    
    # Display dynamic interpretation
    print("\n" + "="*70)
    print("📊 CAMPAIGN PERFORMANCE ANALYSIS")
    print("="*70)
    print(f"\n📈 VOLUME METRICS:")
    print(f"   • Baseline:  {baseline_avg_units:>10,.0f} units/month  (20 months)")
    print(f"   • Campaign:  {campaign_avg_units:>10,.0f} units/month  (Jun-Sep 2025)")
    print(f"   • Increase:  {pct_increase:>10.1f}% more units per month")
    
    print(f"\n💰 REVENUE PER UNIT:")
    print(f"   • Baseline:  ${baseline_per_unit:>10,.2f} per unit")
    print(f"   • Campaign:  ${campaign_per_unit:>10,.2f} per unit")
    print(f"   • Margin:    ${margin_diff:>10,.2f} lower per unit (rebates + 1.9% APR)")
    
    print(f"\n✅ STRATEGIC OUTCOME:")
    if pct_increase > 15:
        print(f"   🎯 Campaign SUCCESS! Gained {pct_increase:.1f}% more sales by trading")
        print(f"      ${margin_diff:.0f}/unit margin for significant volume growth.")
        print(f"   📌 Classic promotional trade-off: margin → market share")
    else:
        print(f"   ⚠️  Modest {pct_increase:.1f}% increase - evaluate ROI vs rebate cost")
    print("="*70 + "\n")
else:
    print("⚠️ Unable to calculate metrics - check if both Baseline and Campaign data exist")

### 13.B Regional Logistics Impact
How did logistics delays in the West/Pacific NW affect inventory levels and sales?

In [None]:
# Regional logistics delay effect with dynamic interpretation
query = """
SELECT
  region,
  month,
  SUM(incoming) AS total_incoming,
  SUM(on_hand) AS total_on_hand
FROM inventory
WHERE region IN ('West', 'Pacific NW') AND month IN ('2025-08', '2025-09')
GROUP BY region, month
ORDER BY region, month
"""

# Execute query and display results
df = spark.sql(query)
display(df)

# Compare with normal months for context
comparison_query = """
SELECT
  region,
  AVG(incoming) AS avg_incoming_normal
FROM inventory
WHERE region IN ('West', 'Pacific NW') 
  AND month NOT IN ('2025-08', '2025-09')
GROUP BY region
"""
normal_df = spark.sql(comparison_query)
normal_data = {r['region']: r['avg_incoming_normal'] for r in normal_df.collect()}

# Analyze delay impact
results = df.collect()
if results:
    print("\n" + "="*70)
    print("🚚 LOGISTICS DELAY IMPACT ANALYSIS")
    print("="*70)
    print("\n📦 INCOMING INVENTORY COMPARISON:")
    
    for row in results:
        region = row['region']
        month = row['month']
        incoming = row['total_incoming']
        normal_avg = normal_data.get(region, 0)
        
        if normal_avg > 0:
            pct_change = ((incoming - normal_avg) / normal_avg) * 100
            print(f"\n   {region} ({month}):")
            print(f"      • Incoming:        {incoming:>8,} units")
            print(f"      • Normal average:  {normal_avg:>8,.0f} units")
            print(f"      • Change:          {pct_change:>7.1f}% {'⬇️' if pct_change < 0 else '⬆️'}")
    
    # Calculate total impact
    delay_months = df.filter(df.month.isin(['2025-08', '2025-09']))
    total_incoming_delay = delay_months.agg({'total_incoming': 'sum'}).collect()[0][0]
    
    print(f"\n📊 OVERALL IMPACT:")
    print(f"   • Total incoming (Aug-Sep):  {total_incoming_delay:,} units")
    print(f"   • Expected reduction: ~30% due to carrier constraints")
    print(f"   • Affected regions: West, Pacific NW")
    
    print(f"\n⚠️  BUSINESS IMPLICATIONS:")
    print(f"   🔴 Potential stockouts in affected regions")
    print(f"   🔴 Extended lead times (+4 days estimated)")
    print(f"   🟡 May impact campaign fulfillment in these regions")
    print("="*70 + "\n")
else:
    print("⚠️ No data found for logistics delay period")

### 13.C Dealer Performance
Which dealers had the highest sales, revenue, or profit?

In [None]:
# Top 10 dealers by profit with dynamic interpretation
import builtins  # Avoid conflict with PySpark's sum()

query = """
SELECT
  dealer_id,
  region,
  SUM(profit) AS total_profit,
  SUM(units) AS total_units,
  SUM(revenue) AS total_revenue
FROM car_sales
GROUP BY dealer_id, region
ORDER BY total_profit DESC
LIMIT 10
"""

# Execute query and display results
df = spark.sql(query)
display(df)

# Analyze top performers
results = df.collect()
if results:
    # Calculate network-wide averages for comparison
    avg_query = """
    SELECT 
        AVG(dealer_profit) AS avg_profit,
        AVG(dealer_units) AS avg_units
    FROM (
        SELECT dealer_id, SUM(profit) AS dealer_profit, SUM(units) AS dealer_units
        FROM car_sales
        GROUP BY dealer_id
    )
    """
    avg_data = spark.sql(avg_query).collect()[0]
    network_avg_profit = avg_data['avg_profit']
    network_avg_units = avg_data['avg_units']
    
    top_dealer = results[0]
    total_top10_profit = builtins.sum(r['total_profit'] for r in results)
    
    print("\n" + "="*70)
    print("🏆 TOP DEALER PERFORMANCE ANALYSIS")
    print("="*70)
    
    print(f"\n🥇 TOP PERFORMER:")
    print(f"   • Dealer:          {top_dealer['dealer_id']}")
    print(f"   • Region:          {top_dealer['region']}")
    print(f"   • Total profit:    ${top_dealer['total_profit']:>12,.2f}")
    print(f"   • Units sold:      {top_dealer['total_units']:>12,}")
    print(f"   • Profit/unit:     ${top_dealer['total_profit']/top_dealer['total_units']:>12,.2f}")
    
    vs_avg = ((top_dealer['total_profit'] - network_avg_profit) / network_avg_profit) * 100
    print(f"   • vs Network avg:  {vs_avg:>11,.1f}% higher")
    
    # Regional distribution
    region_counts = {}
    for r in results:
        region_counts[r['region']] = region_counts.get(r['region'], 0) + 1
    
    print(f"\n📍 REGIONAL DISTRIBUTION (Top 10):")
    for region, count in sorted(region_counts.items(), key=lambda x: x[1], reverse=True):
        print(f"   • {region:20s} {count} dealer(s)")
    
    # Concentration analysis
    all_dealers_profit = spark.sql("SELECT SUM(profit) AS total FROM car_sales").collect()[0]['total']
    concentration = (total_top10_profit / all_dealers_profit) * 100
    
    print(f"\n💼 PERFORMANCE CONCENTRATION:")
    print(f"   • Top 10 dealers: ${total_top10_profit:>12,.2f} profit")
    print(f"   • Network total:  ${all_dealers_profit:>12,.2f} profit")
    print(f"   • Concentration:  {concentration:>11.1f}% of total profit")
    
    print(f"\n✅ INSIGHTS:")
    if concentration > 10:
        print(f"   ⚠️  High concentration ({concentration:.1f}%) - investigate success factors")
        print(f"   📌 Consider replicating top dealer strategies network-wide")
    else:
        print(f"   ✅ Balanced performance across dealer network")
    print("="*70 + "\n")
else:
    print("⚠️ No dealer data found")

### 13.D Customer Segmentation
What are the demographic profiles of customers buying EVs vs. other body types?

In [None]:
# Customer segmentation by income band with dynamic interpretation
import builtins  # Avoid conflict with PySpark's sum()

query = """
SELECT
  CASE
    WHEN annual_income < 50000 THEN 'Low'
    WHEN annual_income < 90000 THEN 'Medium'
    ELSE 'High'
  END AS income_band,
  COUNT(*) AS customer_count
FROM customers
GROUP BY CASE
    WHEN annual_income < 50000 THEN 'Low'
    WHEN annual_income < 90000 THEN 'Medium'
    ELSE 'High'
  END
ORDER BY customer_count DESC
"""

# Execute query and display results
df = spark.sql(query)
display(df)

# Analyze customer segmentation
results = df.collect()
if results:
    total_customers = builtins.sum(r['customer_count'] for r in results)
    
    # Get income stats
    income_stats = spark.sql("""
        SELECT 
            MIN(annual_income) AS min_income,
            MAX(annual_income) AS max_income,
            AVG(annual_income) AS avg_income,
            PERCENTILE_APPROX(annual_income, 0.5) AS median_income
        FROM customers
    """).collect()[0]
    
    print("\n" + "="*70)
    print("👥 CUSTOMER SEGMENTATION ANALYSIS")
    print("="*70)
    
    print(f"\n💰 INCOME DISTRIBUTION:")
    for row in results:
        band = row['income_band']
        count = row['customer_count']
        pct = (count / total_customers) * 100
        
        # Define income range
        if band == 'Low':
            range_txt = "< $50,000"
        elif band == 'Medium':
            range_txt = "$50,000 - $90,000"
        else:
            range_txt = "> $90,000"
        
        bar = "█" * int(pct / 2)  # Visual bar chart
        print(f"   {band:8s} ({range_txt:20s}): {count:>8,} ({pct:>5.1f}%) {bar}")
    
    print(f"\n📊 INCOME STATISTICS:")
    print(f"   • Total customers:  {total_customers:>10,}")
    print(f"   • Average income:   ${income_stats['avg_income']:>10,.2f}")
    print(f"   • Median income:    ${income_stats['median_income']:>10,.2f}")
    print(f"   • Income range:     ${income_stats['min_income']:>10,.2f} - ${income_stats['max_income']:>10,.2f}")
    
    # Age distribution
    age_stats = spark.sql("""
        SELECT 
            AVG(age) AS avg_age,
            MIN(age) AS min_age,
            MAX(age) AS max_age
        FROM customers
    """).collect()[0]
    
    print(f"\n👤 AGE PROFILE:")
    print(f"   • Average age:      {age_stats['avg_age']:>10.1f} years")
    print(f"   • Age range:        {age_stats['min_age']:>10.0f} - {age_stats['max_age']:.0f} years")
    
    # Marketing insights
    high_income = next((r for r in results if r['income_band'] == 'High'), None)
    high_income_pct = (high_income['customer_count'] / total_customers * 100) if high_income else 0
    
    print(f"\n🎯 MARKETING INSIGHTS:")
    if high_income_pct > 35:
        print(f"   💎 Strong high-income segment ({high_income_pct:.1f}%)")
        print(f"   📌 Target for luxury/premium models & EV SUV campaign")
    elif high_income_pct > 25:
        print(f"   ✅ Balanced income distribution")
        print(f"   📌 Diversified product portfolio recommended")
    else:
        print(f"   💵 Price-sensitive market ({100-high_income_pct:.1f}% under $90K)")
        print(f"   📌 Focus on value propositions & financing options")
    
    print("="*70 + "\n")
else:
    print("⚠️ No customer data found")

In [2]:
# Spark validation helper (run after generation)
tables=['models','dealers','customers','incentives','inventory','car_sales']
for t in tables:
  try:
    cnt=spark.read.format('delta').load(f'Tables/{t}').count()
    print(f'{t:12s} -> {cnt:,} rows')
  except Exception as e:
    print(f'{t:12s} -> MISSING ({e})')

StatementMeta(, 00827c82-e21a-4066-bbe6-0d1aa33d7286, 4, Finished, Available, Finished)

models       -> 600 rows
dealers      -> 1,200 rows
customers    -> 1,000,000 rows
incentives   -> 9,600 rows
inventory    -> 17,280,000 rows
car_sales    -> 17,280,000 rows
