In [0]:
# Specify the catalog and schema you want to work with as variables
CATALOG_NAME = "dbdemos_atit" 
SCHEMA_NAME = "customer360_telco"

spark.sql(f"USE CATALOG {CATALOG_NAME}")
spark.sql(f"USE SCHEMA {SCHEMA_NAME}")

# Optional: verify the current context
display(spark.sql("SELECT current_catalog() AS catalog, current_database() AS schema"))

In [0]:
# Install required packages
%pip install faker

In [0]:
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta
import uuid
from pyspark.sql import SparkSession
from pyspark.sql.functions import *
from pyspark.sql.types import *
import matplotlib.pyplot as plt
import seaborn as sns

# Set random seeds for reproducibility
np.random.seed(42)
random.seed(42)

# Configuration
NUM_CUSTOMERS = 10000  # Number of synthetic customers to generate
DATA_START_DATE = datetime(2023, 1, 1)
DATA_END_DATE = datetime(2024, 11, 1)

print(f"Synthetic Telco Data Generation Framework Initialized")
print(f"Target customers: {NUM_CUSTOMERS:,}")
print(f"Data period: {DATA_START_DATE.strftime('%Y-%m-%d')} to {DATA_END_DATE.strftime('%Y-%m-%d')}")
print(f"Random seed set to 42 for reproducibility")

In [0]:
from faker import Faker
fake = Faker()
Faker.seed(42)

# Generate customer demographics
def generate_customer_demographics(num_customers):
    customers = []
    
    # Define realistic distributions
    age_weights = [0.15, 0.25, 0.30, 0.20, 0.08, 0.02]  # 18-25, 26-35, 36-45, 46-55, 56-75, +75
    gender_weights = [0.505, 0.495]  # Female, Male
    employment_types = ['Employed', 'Self-Employed', 'Student', 'Unemployed', 'Retired']
    employment_weights = [0.45, 0.20, 0.15, 0.15, 0.05]
    
    # Cities and regions (Philippines)
    cities = ['Manila', 'Quezon City', 'Davao', 'Cebu City', 'Zamboanga', 'Antipolo', 'Pasig', 'Taguig', 'Cagayan de Oro', 'Paranaque', 'Makati', 'Iloilo City']
    
    for i in range(num_customers):
        # Generate customer ID
        customer_id = f"CUST_{str(i+1).zfill(8)}"
        
        # Generate phone number (Philippines format)
        phone_number = f"+63{random.randint(9000000000, 9999999999)}"
        
        # Age distribution
        age_group = np.random.choice(range(6), p=age_weights)
        if age_group == 0:
            age = random.randint(18, 25)
        elif age_group == 1:
            age = random.randint(26, 35)
        elif age_group == 2:
            age = random.randint(36, 45)
        elif age_group == 3:
            age = random.randint(46, 55)
        elif age_group == 4:
            age = random.randint(56, 75)
        else:
            age = random.randint(76, 105)
        
        
        # Gender
        gender = np.random.choice(['Female', 'Male'], p=gender_weights)
        
        # Employment status
        employment_status = np.random.choice(employment_types, p=employment_weights)
        
        # SIM age (how long they've been a customer)
        sim_age_days = random.randint(30, 2000)  # 1 month to ~5.5 years
        sim_registration_date = DATA_END_DATE - timedelta(days=sim_age_days)
        
        # Location
        city = random.choice(cities)
        
        # Income estimation based on employment and age (Philippine Peso)
        if employment_status == 'Employed':
            base_income = random.randint(25000, 150000)  # PHP 25k-150k monthly
        elif employment_status == 'Self-Employed':
            base_income = random.randint(15000, 100000)  # PHP 15k-100k monthly
        elif employment_status == 'Student':
            base_income = random.randint(0, 20000)  # PHP 0-20k monthly
        elif employment_status == 'Retired':
            base_income = random.randint(10000, 50000)  # PHP 10k-50k monthly
        else:  # Unemployed
            base_income = random.randint(0, 15000)  # PHP 0-15k monthly
        
        # Adjust income by age (experience factor)
        if age > 35:
            base_income = int(base_income * random.uniform(1.1, 1.5))
        
        customers.append({
            'customer_id': customer_id,
            'phone_number': phone_number,
            'age': age,
            'gender': gender,
            'employment_status': employment_status,
            'estimated_monthly_income': base_income,
            'city': city,
            'sim_registration_date': sim_registration_date,
            'sim_age_days': sim_age_days,
            'customer_segment': 'Premium' if base_income > 80000 else 'Standard' if base_income > 35000 else 'Basic'
        })
    
    return pd.DataFrame(customers)

# Generate the customer base
customers_df = generate_customer_demographics(NUM_CUSTOMERS)

print(f"Generated {len(customers_df):,} customer profiles")
print(f"\nCustomer Demographics Summary:")
print(f"Age range: {customers_df['age'].min()} - {customers_df['age'].max()} years")
print(f"Gender distribution:\n{customers_df['gender'].value_counts()}")
print(f"\nEmployment status distribution:\n{customers_df['employment_status'].value_counts()}")
print(f"\nCustomer segments:\n{customers_df['customer_segment'].value_counts()}")

# Display sample
display(customers_df.head(10))

In [0]:
# Generate device and handset data
def generate_device_data(customers_df):
    devices = []
    
    # Device brands and models with price tiers
    device_data = {
        'Premium': {
            'brands': ['iPhone', 'Samsung Galaxy S', 'Google Pixel', 'OnePlus'],
            'models': {
                'iPhone': ['iPhone 15 Pro', 'iPhone 14 Pro', 'iPhone 13 Pro', 'iPhone 12 Pro'],
                'Samsung Galaxy S': ['Galaxy S24 Ultra', 'Galaxy S23 Ultra', 'Galaxy S22 Ultra', 'Galaxy S21 Ultra'],
                'Google Pixel': ['Pixel 8 Pro', 'Pixel 7 Pro', 'Pixel 6 Pro'],
                'OnePlus': ['OnePlus 12', 'OnePlus 11', 'OnePlus 10 Pro']
            },
            'os_versions': {
                'iPhone': ['iOS 17.1', 'iOS 16.7', 'iOS 15.8'],
                'Samsung Galaxy S': ['Android 14', 'Android 13', 'Android 12'],
                'Google Pixel': ['Android 14', 'Android 13', 'Android 12'],
                'OnePlus': ['Android 14', 'Android 13', 'Android 12']
            },
            'price_range': (40000, 120000)  # PHP 40k-120k
        },
        'Standard': {
            'brands': ['Samsung Galaxy A', 'Xiaomi', 'Oppo', 'Vivo', 'Realme'],
            'models': {
                'Samsung Galaxy A': ['Galaxy A54', 'Galaxy A34', 'Galaxy A24', 'Galaxy A14'],
                'Xiaomi': ['Redmi Note 12', 'Redmi Note 11', 'Mi 11 Lite', 'Redmi 10'],
                'Oppo': ['Oppo A78', 'Oppo A58', 'Oppo A38', 'Oppo A18'],
                'Vivo': ['Vivo Y36', 'Vivo Y27', 'Vivo Y16', 'Vivo Y02'],
                'Realme': ['Realme C55', 'Realme C35', 'Realme C25', 'Realme C21']
            },
            'os_versions': {
                'Samsung Galaxy A': ['Android 13', 'Android 12', 'Android 11'],
                'Xiaomi': ['MIUI 14', 'MIUI 13', 'MIUI 12'],
                'Oppo': ['ColorOS 13', 'ColorOS 12', 'ColorOS 11'],
                'Vivo': ['Funtouch OS 13', 'Funtouch OS 12'],
                'Realme': ['Realme UI 4.0', 'Realme UI 3.0']
            },
            'price_range': (8000, 40000)  # PHP 8k-40k
        },
        'Basic': {
            'brands': ['Samsung Galaxy M', 'Infinix', 'Tecno', 'Cherry Mobile', 'Nokia'],
            'models': {
                'Samsung Galaxy M': ['Galaxy M14', 'Galaxy M04', 'Galaxy A04'],
                'Infinix': ['Infinix Hot 30', 'Infinix Smart 7', 'Infinix Hot 20'],
                'Tecno': ['Tecno Spark 10', 'Tecno Pop 7', 'Tecno Camon 20'],
                'Cherry Mobile': ['Flare S9', 'Aqua S10', 'Desire R8'],
                'Nokia': ['Nokia C32', 'Nokia C12', 'Nokia G21']
            },
            'os_versions': {
                'Samsung Galaxy M': ['Android 12', 'Android 11', 'Android 10'],
                'Infinix': ['Android 12', 'Android 11'],
                'Tecno': ['Android 12', 'Android 11'],
                'Cherry Mobile': ['Android 11', 'Android 10'],
                'Nokia': ['Android 12', 'Android 11']
            },
            'price_range': (3000, 15000)  # PHP 3k-15k
        }
    }
    
    for _, customer in customers_df.iterrows():
        segment = customer['customer_segment']
        
        # Select device tier based on customer segment with some variation
        if segment == 'Premium':
            # 70% premium devices, 25% standard, 5% basic
            device_tier = np.random.choice(['Premium', 'Standard', 'Basic'], p=[0.70, 0.25, 0.05])
        elif segment == 'Standard':
            # 10% premium, 70% standard, 20% basic
            device_tier = np.random.choice(['Premium', 'Standard', 'Basic'], p=[0.10, 0.70, 0.20])
        else:  # Basic
            # 2% premium, 28% standard, 70% basic
            device_tier = np.random.choice(['Premium', 'Standard', 'Basic'], p=[0.02, 0.28, 0.70])
        
        # Select brand and model
        brand = random.choice(device_data[device_tier]['brands'])
        model = random.choice(device_data[device_tier]['models'][brand])
        os_version = random.choice(device_data[device_tier]['os_versions'][brand])
        
        # Generate device price within tier range
        price_min, price_max = device_data[device_tier]['price_range']
        device_price = random.randint(price_min, price_max)
        
        # Device age (how old the device is)
        device_age_months = random.randint(1, 36)  # 1 month to 3 years
        purchase_date = DATA_END_DATE - timedelta(days=device_age_months * 30)
        
        # Storage capacity based on device tier
        if device_tier == 'Premium':
            storage_gb = random.choice([128, 256, 512, 1024])
        elif device_tier == 'Standard':
            storage_gb = random.choice([64, 128, 256])
        else:
            storage_gb = random.choice([32, 64, 128])
        
        # RAM based on device tier
        if device_tier == 'Premium':
            ram_gb = random.choice([8, 16, 32])
        elif device_tier == 'Standard':
            ram_gb = random.choice([4, 8, 16])
        else:
            ram_gb = random.choice([2, 4, 8])
        
        devices.append({
            'customer_id': customer['customer_id'],
            'device_brand': brand,
            'device_model': model,
            'device_tier': device_tier,
            'os_version': os_version,
            'device_price_php': device_price,
            'storage_gb': storage_gb,
            'ram_gb': ram_gb,
            'device_age_months': device_age_months,
            'purchase_date': purchase_date,
            'is_smartphone': True,  # Assuming all are smartphones
            'has_dual_sim': random.choice([True, False]),
            'supports_5g': device_tier in ['Premium', 'Standard'] and random.random() > 0.3
        })
    
    return pd.DataFrame(devices)

# Generate device data
devices_df = generate_device_data(customers_df)

print(f"Generated device data for {len(devices_df):,} customers")
print(f"\nDevice Distribution by Tier:")
print(devices_df['device_tier'].value_counts())
print(f"\nTop Device Brands:")
print(devices_df['device_brand'].value_counts().head(10))
print(f"\n5G Support: {devices_df['supports_5g'].sum():,} devices ({devices_df['supports_5g'].mean()*100:.1f}%)")

# Display sample
display(devices_df.head(10))

In [0]:
# Generate Call Detail Records (CDR)
def generate_cdr_data(customers_df, num_days=30):
    cdr_records = []
    
    # Call patterns by customer segment and demographics
    call_patterns = {
        'Premium': {'daily_calls': (8, 25), 'avg_duration': (180, 600)},  # 3-10 min calls
        'Standard': {'daily_calls': (4, 15), 'avg_duration': (120, 480)},  # 2-8 min calls
        'Basic': {'daily_calls': (2, 8), 'avg_duration': (60, 300)}       # 1-5 min calls
    }
    
    # Call types and their probabilities
    call_types = ['Voice', 'Video', 'Conference']
    call_type_weights = [0.75, 0.20, 0.05]  # Most calls are voice
    
    # Time patterns (hour of day weights) - normalized to sum to 1.0
    hourly_weights = np.array([
        0.01, 0.01, 0.01, 0.01, 0.01, 0.02,  # 0-5 AM (low activity)
        0.03, 0.05, 0.07, 0.08, 0.09, 0.10,  # 6-11 AM (morning peak)
        0.09, 0.08, 0.07, 0.06, 0.07, 0.08,  # 12-5 PM (afternoon)
        0.09, 0.10, 0.08, 0.06, 0.04, 0.02   # 6-11 PM (evening peak)
    ])
    # Normalize to ensure sum equals 1.0
    hourly_weights = hourly_weights / hourly_weights.sum()
    
    for _, customer in customers_df.iterrows():
        customer_id = customer['customer_id']
        segment = customer['customer_segment']
        age = customer['age']
        employment = customer['employment_status']
        
        # Adjust call patterns based on demographics
        base_calls = call_patterns[segment]['daily_calls']
        base_duration = call_patterns[segment]['avg_duration']
        
        # Age adjustments
        if age < 25:  # Young people call more
            daily_calls_range = (base_calls[0] + 2, base_calls[1] + 5)
        elif age > 55:  # Older people call less frequently but longer
            daily_calls_range = (builtins.max(1, base_calls[0] - 2), base_calls[1] - 2)
            base_duration = (base_duration[0] + 60, base_duration[1] + 120)
        else:
            daily_calls_range = base_calls
        
        # Employment adjustments
        if employment == 'Employed':
            # More calls during business hours
            business_hour_boost = 1.3
        elif employment == 'Student':
            # More calls in evening
            daily_calls_range = (daily_calls_range[0] + 1, daily_calls_range[1] + 3)
        else:
            business_hour_boost = 1.0
        
        # Generate calls for the specified period
        for day in range(num_days):
            call_date = DATA_END_DATE - timedelta(days=num_days - day - 1)
            
            # Skip some days randomly (not everyone calls every day)
            if random.random() < 0.15:  # 15% chance of no calls on a given day
                continue
            
            # Determine number of calls for this day
            daily_calls = random.randint(*daily_calls_range)
            
            # Weekend adjustment (fewer calls on weekends for employed people)
            if call_date.weekday() >= 5 and employment == 'Employed':
                daily_calls = builtins.max(1, int(daily_calls * 0.7))
            
            for call_num in range(daily_calls):
                # Generate call time
                hour = np.random.choice(range(24), p=hourly_weights)
                minute = random.randint(0, 59)
                second = random.randint(0, 59)
                
                call_datetime = call_date.replace(hour=hour, minute=minute, second=second)
                
                # Generate called number (simplified)
                called_number = f"+63{random.randint(9000000000, 9999999999)}"
                
                # Call type
                call_type = np.random.choice(call_types, p=call_type_weights)
                
                # Call duration (in seconds)
                if call_type == 'Video':
                    duration_multiplier = 1.5  # Video calls tend to be longer
                elif call_type == 'Conference':
                    duration_multiplier = 2.0  # Conference calls are longest
                else:
                    duration_multiplier = 1.0
                
                duration_seconds = int(random.randint(*base_duration) * duration_multiplier)
                
                # Call direction
                direction = random.choice(['Outgoing', 'Incoming'])
                
                # Call status
                call_status = np.random.choice(
                    ['Completed', 'Missed', 'Busy', 'Failed'], 
                    p=[0.75, 0.15, 0.07, 0.03]
                )
                
                # If call was not completed, duration should be 0 or very short
                if call_status != 'Completed':
                    duration_seconds = random.randint(0, 30)
                
                # Location (cell tower)
                tower_id = f"TOWER_{random.randint(1000, 9999)}"
                
                cdr_records.append({
                    'customer_id': customer_id,
                    'call_datetime': call_datetime,
                    'called_number': called_number,
                    'call_type': call_type,
                    'direction': direction,
                    'duration_seconds': duration_seconds,
                    'call_status': call_status,
                    'tower_id': tower_id,
                    'date': call_date.date(),
                    'hour': hour,
                    'day_of_week': call_date.strftime('%A')
                })
    
    return pd.DataFrame(cdr_records)

# Import builtins to avoid PySpark function conflicts
import builtins

# Generate CDR data for the last 30 days
print("Generating CDR data... (this may take a moment)")
cdr_df = generate_cdr_data(customers_df, num_days=30)

print(f"Generated {len(cdr_df):,} call records")
print(f"\nCall Statistics:")
print(f"Total customers with calls: {cdr_df['customer_id'].nunique():,}")
print(f"Average calls per customer: {len(cdr_df) / cdr_df['customer_id'].nunique():.1f}")
print(f"\nCall Status Distribution:")
print(cdr_df['call_status'].value_counts())
print(f"\nCall Type Distribution:")
print(cdr_df['call_type'].value_counts())
print(f"\nAverage call duration: {cdr_df[cdr_df['call_status'] == 'Completed']['duration_seconds'].mean():.1f} seconds")

# Display sample
display(cdr_df.head(10))

In [0]:
# Generate SMS Usage Patterns
def generate_sms_data(customers_df, num_days=30):
    sms_records = []
    
    # SMS patterns by customer segment and demographics
    sms_patterns = {
        'Premium': {'daily_sms': (15, 50), 'peak_hours': [9, 12, 18, 21]},
        'Standard': {'daily_sms': (8, 30), 'peak_hours': [8, 12, 17, 20]},
        'Basic': {'daily_sms': (3, 15), 'peak_hours': [7, 12, 18, 19]}
    }
    
    # SMS types and their probabilities
    sms_types = ['Personal', 'Business', 'OTP', 'Marketing', 'Promotional']
    sms_type_weights = [0.45, 0.20, 0.15, 0.12, 0.08]
    
    # Time patterns for SMS (different from calls)
    hourly_weights = np.array([
        0.005, 0.002, 0.001, 0.001, 0.002, 0.01,  # 0-5 AM (very low)
        0.03, 0.06, 0.08, 0.09, 0.08, 0.07,      # 6-11 AM (morning)
        0.06, 0.05, 0.04, 0.05, 0.06, 0.08,      # 12-5 PM (afternoon)
        0.09, 0.12, 0.10, 0.08, 0.05, 0.02       # 6-11 PM (evening peak)
    ])
    hourly_weights = hourly_weights / hourly_weights.sum()
    
    for _, customer in customers_df.iterrows():
        customer_id = customer['customer_id']
        segment = customer['customer_segment']
        age = customer['age']
        employment = customer['employment_status']
        
        # Adjust SMS patterns based on demographics
        base_sms = sms_patterns[segment]['daily_sms']
        
        # Age adjustments for SMS behavior
        if age < 30:  # Younger people text more
            daily_sms_range = (base_sms[0] + 5, base_sms[1] + 15)
        elif age > 60:  # Older people text less
            daily_sms_range = (builtins.max(1, base_sms[0] - 3), base_sms[1] - 5)
        else:
            daily_sms_range = base_sms
        
        # Employment adjustments
        if employment == 'Student':
            # Students text significantly more
            daily_sms_range = (daily_sms_range[0] + 8, daily_sms_range[1] + 20)
        elif employment == 'Employed':
            # Business SMS during work hours
            business_sms_boost = 1.2
        
        # Generate SMS for the specified period
        for day in range(num_days):
            sms_date = DATA_END_DATE - timedelta(days=num_days - day - 1)
            
            # Skip some days randomly (SMS usage varies)
            if random.random() < 0.08:  # 8% chance of no SMS on a given day
                continue
            
            # Determine number of SMS for this day
            daily_sms = random.randint(*daily_sms_range)
            
            # Weekend adjustment (more personal SMS on weekends)
            if sms_date.weekday() >= 5:
                if employment == 'Student':
                    daily_sms = int(daily_sms * 1.3)  # Students text more on weekends
                else:
                    daily_sms = int(daily_sms * 0.9)  # Others text slightly less
            
            for sms_num in range(daily_sms):
                # Generate SMS time
                hour = np.random.choice(range(24), p=hourly_weights)
                minute = random.randint(0, 59)
                second = random.randint(0, 59)
                
                sms_datetime = sms_date.replace(hour=hour, minute=minute, second=second)
                
                # Generate recipient number
                recipient_number = f"+63{random.randint(9000000000, 9999999999)}"
                
                # SMS type based on time and customer profile
                if 9 <= hour <= 17 and employment == 'Employed':
                    # More business SMS during work hours
                    sms_type = np.random.choice(sms_types, p=[0.25, 0.40, 0.15, 0.12, 0.08])
                elif hour >= 22 or hour <= 6:
                    # Mostly personal SMS late night/early morning
                    sms_type = np.random.choice(sms_types, p=[0.70, 0.05, 0.15, 0.05, 0.05])
                else:
                    # Normal distribution
                    sms_type = np.random.choice(sms_types, p=sms_type_weights)
                
                # SMS direction
                direction = random.choice(['Sent', 'Received'])
                
                # SMS status
                sms_status = np.random.choice(
                    ['Delivered', 'Failed', 'Pending'], 
                    p=[0.92, 0.06, 0.02]
                )
                
                # Message length (characters)
                if sms_type == 'OTP':
                    message_length = random.randint(20, 50)
                elif sms_type == 'Marketing':
                    message_length = random.randint(100, 160)
                elif sms_type == 'Business':
                    message_length = random.randint(50, 140)
                else:  # Personal
                    message_length = random.randint(10, 120)
                
                # Cost (in PHP centavos)
                if direction == 'Sent':
                    if sms_type in ['Marketing', 'Promotional']:
                        cost_centavos = 0  # Usually free/bundled
                    else:
                        cost_centavos = random.randint(100, 150)  # 1-1.5 PHP
                else:
                    cost_centavos = 0  # Receiving is free
                
                sms_records.append({
                    'customer_id': customer_id,
                    'sms_datetime': sms_datetime,
                    'recipient_number': recipient_number,
                    'sms_type': sms_type,
                    'direction': direction,
                    'message_length': message_length,
                    'sms_status': sms_status,
                    'cost_centavos': cost_centavos,
                    'date': sms_date.date(),
                    'hour': hour,
                    'day_of_week': sms_date.strftime('%A')
                })
    
    return pd.DataFrame(sms_records)

# Generate SMS data for the last 30 days
print("Generating SMS usage data... (this may take a moment)")
sms_df = generate_sms_data(customers_df, num_days=30)

print(f"Generated {len(sms_df):,} SMS records")
print(f"\nSMS Statistics:")
print(f"Total customers with SMS: {sms_df['customer_id'].nunique():,}")
print(f"Average SMS per customer: {len(sms_df) / sms_df['customer_id'].nunique():.1f}")
print(f"\nSMS Status Distribution:")
print(sms_df['sms_status'].value_counts())
print(f"\nSMS Type Distribution:")
print(sms_df['sms_type'].value_counts())
print(f"\nDirection Distribution:")
print(sms_df['direction'].value_counts())
print(f"\nAverage message length: {sms_df['message_length'].mean():.1f} characters")
print(f"Total SMS cost: PHP {sms_df['cost_centavos'].sum() / 100:.2f}")

# Display sample
display(sms_df.head(10))

In [0]:
# Generate Data Usage Patterns
def generate_data_usage(customers_df, devices_df, num_days=30):
    data_usage_records = []
    
    # Merge customer and device data
    customer_device_df = customers_df.merge(devices_df, on='customer_id')
    
    # Data usage patterns by customer segment
    usage_patterns = {
        'Premium': {'daily_mb': (500, 2000), 'apps_per_day': (8, 15)},
        'Standard': {'daily_mb': (200, 800), 'apps_per_day': (5, 10)},
        'Basic': {'daily_mb': (50, 300), 'apps_per_day': (3, 7)}
    }
    
    # Popular apps and their data consumption patterns
    apps_data = {
        'Social Media': {
            'apps': ['Facebook', 'Instagram', 'TikTok', 'Twitter', 'Snapchat'],
            'mb_per_session': (5, 50),
            'sessions_per_day': (3, 12),
            'peak_hours': [12, 18, 20, 21]
        },
        'Messaging': {
            'apps': ['WhatsApp', 'Messenger', 'Telegram', 'Viber'],
            'mb_per_session': (1, 10),
            'sessions_per_day': (5, 20),
            'peak_hours': [9, 12, 17, 20]
        },
        'Entertainment': {
            'apps': ['YouTube', 'Netflix', 'Spotify', 'TikTok', 'Twitch'],
            'mb_per_session': (20, 200),
            'sessions_per_day': (2, 8),
            'peak_hours': [19, 20, 21, 22]
        },
        'Gaming': {
            'apps': ['Mobile Legends', 'PUBG Mobile', 'Call of Duty Mobile', 'Genshin Impact'],
            'mb_per_session': (10, 100),
            'sessions_per_day': (1, 6),
            'peak_hours': [18, 19, 20, 21]
        },
        'Productivity': {
            'apps': ['Gmail', 'Google Drive', 'Microsoft Office', 'Zoom', 'Teams'],
            'mb_per_session': (2, 30),
            'sessions_per_day': (2, 8),
            'peak_hours': [9, 10, 14, 16]
        },
        'E-commerce': {
            'apps': ['Shopee', 'Lazada', 'GrabFood', 'Foodpanda'],
            'mb_per_session': (3, 25),
            'sessions_per_day': (1, 5),
            'peak_hours': [12, 18, 19, 20]
        }
    }
    
    for _, customer in customer_device_df.iterrows():
        customer_id = customer['customer_id']
        segment = customer['customer_segment']
        age = customer['age']
        employment = customer['employment_status']
        device_tier = customer['device_tier']
        supports_5g = customer['supports_5g']
        
        # Adjust usage based on demographics
        base_usage = usage_patterns[segment]['daily_mb']
        base_apps = usage_patterns[segment]['apps_per_day']
        
        # Age adjustments
        if age < 25:  # Young people use more data
            daily_mb_range = (base_usage[0] + 100, base_usage[1] + 500)
            apps_range = (base_apps[0] + 2, base_apps[1] + 5)
        elif age > 60:  # Older people use less data
            daily_mb_range = (builtins.max(10, base_usage[0] - 100), builtins.max(base_usage[0], base_usage[1] - 200))
            apps_range = (builtins.max(1, base_apps[0] - 2), builtins.max(base_apps[0], base_apps[1] - 3))
        else:
            daily_mb_range = base_usage
            apps_range = base_apps
        
        # Device tier adjustments
        if device_tier == 'Premium':
            daily_mb_range = (daily_mb_range[0] + 200, daily_mb_range[1] + 800)
        elif device_tier == 'Basic':
            daily_mb_range = (builtins.max(10, daily_mb_range[0] - 50), builtins.max(daily_mb_range[0], daily_mb_range[1] - 100))
        
        # 5G boost
        if supports_5g:
            daily_mb_range = (daily_mb_range[0] + 100, daily_mb_range[1] + 400)
        
        # Ensure valid range
        daily_mb_range = (builtins.min(daily_mb_range), builtins.max(daily_mb_range))
        apps_range = (builtins.min(apps_range), builtins.max(apps_range))
        
        # Generate data usage for each day
        for day in range(num_days):
            usage_date = DATA_END_DATE - timedelta(days=num_days - day - 1)
            
            # Some days have no usage (phone off, no connectivity)
            if random.random() < 0.05:  # 5% chance of no usage
                continue
            
            # Determine daily data usage
            daily_mb = random.randint(*daily_mb_range)
            
            # Weekend adjustment
            if usage_date.weekday() >= 5:
                if employment == 'Student':
                    daily_mb = int(daily_mb * 1.4)  # Students use more on weekends
                elif employment == 'Employed':
                    daily_mb = int(daily_mb * 0.8)  # Less work-related usage
            
            # Distribute usage across apps
            num_apps = random.randint(*apps_range)
            remaining_mb = daily_mb
            
            for app_session in range(num_apps):
                if remaining_mb <= 0:
                    break
                
                # Select app category based on user profile
                if age < 25:
                    category_weights = [0.35, 0.15, 0.25, 0.15, 0.05, 0.05]  # More social/entertainment
                elif employment == 'Employed':
                    category_weights = [0.20, 0.20, 0.15, 0.05, 0.30, 0.10]  # More productivity
                elif employment == 'Student':
                    category_weights = [0.30, 0.20, 0.30, 0.10, 0.05, 0.05]  # Social/entertainment
                else:
                    category_weights = [0.25, 0.25, 0.20, 0.10, 0.10, 0.10]  # Balanced
                
                categories = list(apps_data.keys())
                category = np.random.choice(categories, p=category_weights)
                
                # Select specific app
                app_name = random.choice(apps_data[category]['apps'])
                
                # Generate session details
                mb_range = apps_data[category]['mb_per_session']
                session_mb = builtins.min(remaining_mb, random.randint(*mb_range))
                
                # Generate session time (prefer peak hours)
                peak_hours = apps_data[category]['peak_hours']
                if random.random() < 0.6:  # 60% chance of peak hour usage
                    hour = random.choice(peak_hours)
                else:
                    hour = random.randint(6, 23)  # Avoid very late night
                
                minute = random.randint(0, 59)
                session_datetime = usage_date.replace(hour=hour, minute=minute)
                
                # Session duration (minutes)
                if category == 'Entertainment':
                    duration_minutes = random.randint(10, 120)
                elif category == 'Gaming':
                    duration_minutes = random.randint(15, 90)
                elif category == 'Social Media':
                    duration_minutes = random.randint(5, 45)
                else:
                    duration_minutes = random.randint(2, 30)
                
                # Connection type
                if supports_5g and random.random() < 0.4:
                    connection_type = '5G'
                elif random.random() < 0.7:
                    connection_type = '4G'
                else:
                    connection_type = '3G'
                
                data_usage_records.append({
                    'customer_id': customer_id,
                    'session_datetime': session_datetime,
                    'app_name': app_name,
                    'app_category': category,
                    'data_mb': session_mb,
                    'duration_minutes': duration_minutes,
                    'connection_type': connection_type,
                    'date': usage_date.date(),
                    'hour': hour,
                    'day_of_week': usage_date.strftime('%A')
                })
                
                remaining_mb -= session_mb
    
    return pd.DataFrame(data_usage_records)

# Generate data usage patterns
print("Generating data usage patterns... (this may take a moment)")
data_usage_df = generate_data_usage(customers_df, devices_df, num_days=30)

print(f"Generated {len(data_usage_df):,} data usage sessions")
print(f"\nData Usage Statistics:")
print(f"Total customers with data usage: {data_usage_df['customer_id'].nunique():,}")
print(f"Average sessions per customer: {len(data_usage_df) / data_usage_df['customer_id'].nunique():.1f}")
print(f"Total data consumed: {data_usage_df['data_mb'].sum():,.0f} MB ({data_usage_df['data_mb'].sum()/1024:.1f} GB)")
print(f"Average data per session: {data_usage_df['data_mb'].mean():.1f} MB")
print(f"\nTop App Categories:")
print(data_usage_df['app_category'].value_counts())
print(f"\nConnection Type Distribution:")
print(data_usage_df['connection_type'].value_counts())
print(f"\nTop Apps:")
print(data_usage_df['app_name'].value_counts().head(10))

# Display sample
display(data_usage_df.head(10))

In [0]:
# Generate Recharge/Payment History
def generate_recharge_payment_history(customers_df, num_months=12):
    recharge_records = []
    
    # Recharge patterns by customer segment
    recharge_patterns = {
        'Premium': {
            'monthly_recharges': (4, 12),
            'amount_range': (500, 2000),
            'payment_methods': ['Credit Card', 'Bank Transfer', 'Digital Wallet', 'Cash'],
            'method_weights': [0.40, 0.30, 0.25, 0.05]
        },
        'Standard': {
            'monthly_recharges': (2, 8),
            'amount_range': (200, 800),
            'payment_methods': ['Digital Wallet', 'Cash', 'Credit Card', 'Bank Transfer'],
            'method_weights': [0.35, 0.30, 0.25, 0.10]
        },
        'Basic': {
            'monthly_recharges': (1, 4),
            'amount_range': (50, 300),
            'payment_methods': ['Cash', 'Digital Wallet', 'Credit Card'],
            'method_weights': [0.50, 0.35, 0.15]
        }
    }
    
    # Recharge types and their characteristics
    recharge_types = {
        'Regular Load': {'min_amount': 50, 'max_amount': 500, 'validity_days': 30},
        'Data Bundle': {'min_amount': 100, 'max_amount': 1000, 'validity_days': 30},
        'Call/SMS Bundle': {'min_amount': 75, 'max_amount': 400, 'validity_days': 7},
        'Unlimited Bundle': {'min_amount': 300, 'max_amount': 1500, 'validity_days': 30},
        'Emergency Load': {'min_amount': 20, 'max_amount': 100, 'validity_days': 3}
    }
    
    for _, customer in customers_df.iterrows():
        customer_id = customer['customer_id']
        segment = customer['customer_segment']
        age = customer['age']
        employment = customer['employment_status']
        income = customer['estimated_monthly_income']
        
        # Get recharge pattern for this customer segment
        pattern = recharge_patterns[segment]
        
        # Adjust patterns based on demographics
        monthly_recharges_range = pattern['monthly_recharges']
        amount_range = pattern['amount_range']
        
        # Age adjustments
        if age < 25:  # Young people recharge more frequently, smaller amounts
            monthly_recharges_range = (monthly_recharges_range[0] + 2, monthly_recharges_range[1] + 4)
            amount_range = (amount_range[0] - 50, amount_range[1] - 100)
        elif age > 60:  # Older people recharge less frequently, larger amounts
            monthly_recharges_range = (builtins.max(1, monthly_recharges_range[0] - 1), monthly_recharges_range[1] - 1)
            amount_range = (amount_range[0] + 100, amount_range[1] + 200)
        
        # Employment adjustments
        if employment == 'Student':
            amount_range = (builtins.max(20, amount_range[0] - 100), amount_range[1] - 200)
        elif employment == 'Employed' and income > 100000:
            amount_range = (amount_range[0] + 200, amount_range[1] + 500)
        
        # Ensure valid ranges
        amount_range = (builtins.max(20, amount_range[0]), builtins.max(amount_range[0] + 50, amount_range[1]))
        monthly_recharges_range = (builtins.max(1, monthly_recharges_range[0]), builtins.max(monthly_recharges_range[0], monthly_recharges_range[1]))
        
        # Generate payment reliability score (affects delinquency)
        if segment == 'Premium':
            reliability_score = random.uniform(0.85, 0.98)
        elif segment == 'Standard':
            reliability_score = random.uniform(0.70, 0.90)
        else:
            reliability_score = random.uniform(0.50, 0.80)
        
        # Generate recharge history for the specified period
        for month_offset in range(num_months):
            # Calculate the month date
            month_date = DATA_END_DATE - timedelta(days=30 * month_offset)
            
            # Determine number of recharges for this month
            monthly_recharges = random.randint(*monthly_recharges_range)
            
            # Seasonal adjustments (more recharges during holidays)
            if month_date.month in [12, 1]:  # December/January
                monthly_recharges = int(monthly_recharges * 1.3)
            elif month_date.month in [6, 7, 8]:  # Summer months
                monthly_recharges = int(monthly_recharges * 1.1)
            
            for recharge_num in range(monthly_recharges):
                # Generate recharge date within the month
                day_of_month = random.randint(1, 28)  # Safe range for all months
                recharge_date = month_date.replace(day=day_of_month)
                
                # Generate recharge time
                hour = random.randint(6, 23)  # Avoid very early morning
                minute = random.randint(0, 59)
                recharge_datetime = recharge_date.replace(hour=hour, minute=minute)
                
                # Select recharge type based on customer profile
                if segment == 'Premium':
                    type_weights = [0.20, 0.35, 0.15, 0.25, 0.05]
                elif segment == 'Standard':
                    type_weights = [0.30, 0.30, 0.20, 0.15, 0.05]
                else:  # Basic
                    type_weights = [0.45, 0.25, 0.15, 0.10, 0.05]
                
                recharge_type = np.random.choice(list(recharge_types.keys()), p=type_weights)
                
                # Generate recharge amount
                type_info = recharge_types[recharge_type]
                min_amt = builtins.max(type_info['min_amount'], amount_range[0])
                max_amt = builtins.min(type_info['max_amount'], amount_range[1])
                
                if min_amt >= max_amt:
                    recharge_amount = min_amt
                else:
                    recharge_amount = random.randint(min_amt, max_amt)
                
                # Payment method
                payment_method = np.random.choice(pattern['payment_methods'], p=pattern['method_weights'])
                
                # Payment status based on reliability score
                if random.random() < reliability_score:
                    payment_status = 'Successful'
                    payment_delay_days = 0
                else:
                    # Failed or delayed payment
                    if random.random() < 0.7:  # 70% of failures are delays
                        payment_status = 'Delayed'
                        payment_delay_days = random.randint(1, 15)
                    else:
                        payment_status = 'Failed'
                        payment_delay_days = 0
                        recharge_amount = 0  # Failed payments don't add credit
                
                # Transaction ID
                transaction_id = f"TXN_{random.randint(100000000, 999999999)}"
                
                # Channel (where the recharge was made)
                if payment_method == 'Cash':
                    channel = random.choice(['Retail Store', 'Convenience Store', 'Agent'])
                elif payment_method == 'Digital Wallet':
                    channel = random.choice(['GCash', 'PayMaya', 'Mobile App'])
                else:
                    channel = random.choice(['Mobile App', 'Website', 'ATM'])
                
                recharge_records.append({
                    'customer_id': customer_id,
                    'transaction_id': transaction_id,
                    'recharge_datetime': recharge_datetime,
                    'recharge_type': recharge_type,
                    'amount_php': recharge_amount,
                    'payment_method': payment_method,
                    'payment_status': payment_status,
                    'payment_delay_days': payment_delay_days,
                    'channel': channel,
                    'validity_days': type_info['validity_days'],
                    'date': recharge_date.date(),
                    'month': recharge_date.month,
                    'year': recharge_date.year
                })
    
    return pd.DataFrame(recharge_records)

# Generate recharge/payment history for the last 12 months
print("Generating recharge/payment history... (this may take a moment)")
recharge_df = generate_recharge_payment_history(customers_df, num_months=12)

print(f"Generated {len(recharge_df):,} recharge/payment records")
print(f"\nRecharge Statistics:")
print(f"Total customers with recharges: {recharge_df['customer_id'].nunique():,}")
print(f"Average recharges per customer: {len(recharge_df) / recharge_df['customer_id'].nunique():.1f}")
print(f"Total recharge value: PHP {recharge_df['amount_php'].sum():,.2f}")
print(f"Average recharge amount: PHP {recharge_df['amount_php'].mean():.2f}")
print(f"\nPayment Status Distribution:")
print(recharge_df['payment_status'].value_counts())
print(f"\nPayment Method Distribution:")
print(recharge_df['payment_method'].value_counts())
print(f"\nRecharge Type Distribution:")
print(recharge_df['recharge_type'].value_counts())

# Display sample
display(recharge_df.head(10))

In [0]:
# Import builtins to avoid conflicts
import builtins

# Generate Location and Movement Data (optimized for smaller dataset)
def generate_location_movement_data(customers_df, num_days=7):  # Reduced to 7 days
    location_records = []
    
    # Sample only 1000 customers to avoid cluster overload
    sample_customers = customers_df.sample(n=1000, random_state=42)
    
    # Define location types and coordinates for Philippine cities
    location_data = {
        'Manila': {'center_lat': 14.5995, 'center_lon': 120.9842},
        'Quezon City': {'center_lat': 14.6760, 'center_lon': 121.0437},
        'Cebu City': {'center_lat': 10.3157, 'center_lon': 123.8854},
        'Davao': {'center_lat': 7.1907, 'center_lon': 125.4553}
    }
    
    # Default location for other cities
    default_location = {'center_lat': 14.5995, 'center_lon': 120.9842}
    
    for _, customer in sample_customers.iterrows():
        customer_id = customer['customer_id']
        city = customer['city']
        employment = customer['employment_status']
        age = customer['age']
        
        # Get location data for customer's city
        city_data = location_data.get(city, default_location)
        center_lat = city_data['center_lat']
        center_lon = city_data['center_lon']
        
        # Generate consistent home and work locations
        home_lat = center_lat + random.uniform(-0.05, 0.05)
        home_lon = center_lon + random.uniform(-0.05, 0.05)
        
        if employment in ['Employed', 'Self-Employed']:
            work_lat = center_lat + random.uniform(-0.08, 0.08)
            work_lon = center_lon + random.uniform(-0.08, 0.08)
        else:
            work_lat, work_lon = home_lat, home_lon
        
        # Movement patterns based on demographics
        if employment == 'Employed':
            mobility_score = random.uniform(0.7, 1.0)
            work_frequency = 0.8
        elif employment == 'Student':
            mobility_score = random.uniform(0.8, 1.0)
            work_frequency = 0.7
        else:
            mobility_score = random.uniform(0.3, 0.6)
            work_frequency = 0.2
        
        # Age adjustments
        if age < 30:
            mobility_score *= 1.2
        elif age > 60:
            mobility_score *= 0.7
        
        # Generate location data for each day (reduced frequency)
        for day in range(num_days):
            location_date = DATA_END_DATE - timedelta(days=num_days - day - 1)
            is_weekend = location_date.weekday() >= 5
            
            # Reduced number of pings per day
            daily_pings = int(random.randint(5, 15) * mobility_score)
            
            for ping in range(daily_pings):
                # Generate timestamp
                hour = random.randint(6, 23)
                minute = random.randint(0, 59)
                timestamp = location_date.replace(hour=hour, minute=minute)
                
                # Determine location type
                if is_weekend:
                    if 6 <= hour <= 10:
                        location_weights = [0.70, 0.05, 0.10, 0.10, 0.05]
                    elif 11 <= hour <= 18:
                        location_weights = [0.30, 0.10, 0.25, 0.25, 0.10]
                    else:
                        location_weights = [0.60, 0.05, 0.15, 0.15, 0.05]
                else:
                    if 9 <= hour <= 17 and random.random() < work_frequency:
                        location_weights = [0.10, 0.70, 0.05, 0.10, 0.05]
                    else:
                        location_weights = [0.60, 0.10, 0.10, 0.10, 0.10]
                
                location_types = ['Home', 'Work', 'Shopping', 'Restaurant', 'Transport']
                location_type = np.random.choice(location_types, p=location_weights)
                
                # Generate coordinates
                if location_type == 'Home':
                    lat = home_lat + random.uniform(-0.001, 0.001)
                    lon = home_lon + random.uniform(-0.001, 0.001)
                elif location_type == 'Work':
                    lat = work_lat + random.uniform(-0.002, 0.002)
                    lon = work_lon + random.uniform(-0.002, 0.002)
                else:
                    lat = center_lat + random.uniform(-0.05, 0.05)
                    lon = center_lon + random.uniform(-0.05, 0.05)
                
                # Generate other attributes
                tower_id = f"TOWER_{random.randint(1000, 9999)}"
                signal_strength = random.randint(-100, -50)
                
                if location_type == 'Transport':
                    speed_kmh = random.randint(20, 80)
                elif location_type in ['Home', 'Work']:
                    speed_kmh = random.randint(0, 5)
                else:
                    speed_kmh = random.randint(0, 15)
                
                location_records.append({
                    'customer_id': customer_id,
                    'timestamp': timestamp,
                    'latitude': builtins.round(lat, 6),
                    'longitude': builtins.round(lon, 6),
                    'location_type': location_type,
                    'tower_id': tower_id,
                    'signal_strength_dbm': signal_strength,
                    'speed_kmh': speed_kmh,
                    'city': city,
                    'date': location_date.date(),
                    'hour': hour,
                    'day_of_week': location_date.strftime('%A'),
                    'is_weekend': is_weekend
                })
    
    return pd.DataFrame(location_records)

# Generate location and movement data (reduced dataset)
print("Generating location and movement data... (optimized for cluster stability)")
location_df = generate_location_movement_data(customers_df, num_days=7)

print(f"Generated {len(location_df):,} location records")
print(f"\nLocation Statistics:")
print(f"Total customers with location data: {location_df['customer_id'].nunique():,}")
print(f"Average location pings per customer: {len(location_df) / location_df['customer_id'].nunique():.1f}")
print(f"\nLocation Type Distribution:")
print(location_df['location_type'].value_counts())
print(f"\nAverage signal strength: {location_df['signal_strength_dbm'].mean():.1f} dBm")
print(f"Average movement speed: {location_df['speed_kmh'].mean():.1f} km/h")

# Display sample
display(location_df.head(10))

In [0]:
# Import required libraries
import pandas as pd
import numpy as np
import random
from datetime import datetime, timedelta
import builtins

# Set random seed for reproducibility
random.seed(42)
np.random.seed(42)

# Redefine catalog and schema names
CATALOG_NAME = "dbdemos_atit"
SCHEMA_NAME = "customer360_telco"

# Reload customer data from the table we created
print("Reloading customer data from existing table...")
customers_spark_df = spark.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.customers")
customers_df = customers_spark_df.toPandas()

print(f"Reloaded {len(customers_df):,} customer records")
print(f"Customer segments: {customers_df['customer_segment'].value_counts().to_dict()}")

# Redefine date constants
DATA_END_DATE = datetime(2024, 11, 1)

# Generate Contact Network Data (simplified)
def generate_contact_network_data(customers_df, num_contacts=5):
    contact_records = []
    
    # Sample customers to avoid memory issues
    sample_customers = customers_df.sample(n=2000, random_state=42)
    
    for _, customer in sample_customers.iterrows():
        customer_id = customer['customer_id']
        segment = customer['customer_segment']
        age = customer['age']
        
        # Number of frequent contacts based on segment
        if segment == 'Premium':
            num_frequent_contacts = random.randint(8, 15)
        elif segment == 'Standard':
            num_frequent_contacts = random.randint(5, 10)
        else:
            num_frequent_contacts = random.randint(3, 7)
        
        # Generate frequent contacts
        for i in range(num_frequent_contacts):
            contact_number = f"+63{random.randint(9000000000, 9999999999)}"
            
            # Contact relationship
            if i < 2:  # First 2 are family
                relationship = 'Family'
                contact_frequency = random.randint(20, 50)  # calls per month
            elif i < 5:  # Next 3 are friends
                relationship = 'Friend'
                contact_frequency = random.randint(10, 30)
            else:  # Rest are colleagues/others
                relationship = random.choice(['Colleague', 'Business', 'Other'])
                contact_frequency = random.randint(5, 20)
            
            # Call duration patterns
            avg_call_duration = random.randint(120, 600)  # seconds
            
            # SMS frequency
            sms_frequency = random.randint(5, 30)  # SMS per month
            
            contact_records.append({
                'customer_id': customer_id,
                'contact_number': contact_number,
                'relationship': relationship,
                'call_frequency_monthly': contact_frequency,
                'avg_call_duration_seconds': avg_call_duration,
                'sms_frequency_monthly': sms_frequency,
                'contact_strength': random.uniform(0.3, 1.0),  # 0-1 scale
                'first_contact_date': DATA_END_DATE - timedelta(days=random.randint(30, 1000))
            })
    
    return pd.DataFrame(contact_records)

# Generate Mobile Money Transactions (simplified with fixed ranges)
def generate_mobile_money_data(customers_df, num_months=6):
    money_records = []
    
    # Sample customers
    sample_customers = customers_df.sample(n=3000, random_state=42)
    
    transaction_types = ['Send Money', 'Receive Money', 'Cash In', 'Cash Out', 'Bill Payment', 'Buy Load']
    
    for _, customer in sample_customers.iterrows():
        customer_id = customer['customer_id']
        segment = customer['customer_segment']
        income = customer['estimated_monthly_income']
        
        # Transaction frequency based on segment
        if segment == 'Premium':
            monthly_transactions = random.randint(10, 25)
        elif segment == 'Standard':
            monthly_transactions = random.randint(5, 15)
        else:
            monthly_transactions = random.randint(2, 8)
        
        for month in range(num_months):
            month_date = DATA_END_DATE - timedelta(days=30 * month)
            
            for _ in range(monthly_transactions):
                transaction_date = month_date - timedelta(days=random.randint(0, 29))
                
                # Transaction type
                transaction_type = random.choice(transaction_types)
                
                # Amount based on income and transaction type with safe ranges
                if transaction_type in ['Send Money', 'Receive Money']:
                    max_amount = builtins.max(200, builtins.min(5000, income // 10))
                    amount = random.randint(100, max_amount)
                elif transaction_type in ['Cash In', 'Cash Out']:
                    max_amount = builtins.max(1000, builtins.min(10000, income // 5))
                    amount = random.randint(500, max_amount)
                elif transaction_type == 'Bill Payment':
                    max_amount = builtins.max(500, builtins.min(3000, income // 15))
                    amount = random.randint(200, max_amount)
                else:  # Buy Load
                    amount = random.randint(50, 500)
                
                # Transaction status
                status = random.choices(['Successful', 'Failed'], weights=[0.95, 0.05])[0]
                
                money_records.append({
                    'customer_id': customer_id,
                    'transaction_id': f"MM_{random.randint(100000000, 999999999)}",
                    'transaction_date': transaction_date,
                    'transaction_type': transaction_type,
                    'amount_php': amount if status == 'Successful' else 0,
                    'status': status,
                    'recipient_number': f"+63{random.randint(9000000000, 9999999999)}" if 'Money' in transaction_type else None,
                    'fee_php': random.randint(5, 25) if status == 'Successful' else 0
                })
    
    return pd.DataFrame(money_records)

# Generate Linked Accounts Data (simplified)
def generate_linked_accounts_data(customers_df):
    linked_records = []
    
    platforms = ['Facebook', 'Google', 'Apple ID', 'GCash', 'PayMaya', 'Shopee', 'Lazada', 'Grab']
    
    # Sample customers
    sample_customers = customers_df.sample(n=5000, random_state=42)
    
    for _, customer in sample_customers.iterrows():
        customer_id = customer['customer_id']
        age = customer['age']
        segment = customer['customer_segment']
        
        # Number of linked accounts based on age and segment
        if age < 30:
            num_accounts = random.randint(4, 8)
        elif age < 50:
            num_accounts = random.randint(2, 6)
        else:
            num_accounts = random.randint(1, 4)
        
        # Select random platforms
        selected_platforms = random.sample(platforms, builtins.min(num_accounts, len(platforms)))
        
        for platform in selected_platforms:
            link_date = DATA_END_DATE - timedelta(days=random.randint(30, 1000))
            
            # Account status
            is_active = random.choices([True, False], weights=[0.85, 0.15])[0]
            
            # Usage frequency
            if platform in ['Facebook', 'Google']:
                usage_frequency = 'Daily' if random.random() > 0.3 else 'Weekly'
            elif platform in ['GCash', 'PayMaya']:
                usage_frequency = random.choice(['Daily', 'Weekly', 'Monthly'])
            else:
                usage_frequency = random.choice(['Weekly', 'Monthly', 'Rarely'])
            
            linked_records.append({
                'customer_id': customer_id,
                'platform_name': platform,
                'account_id': f"{platform.lower().replace(' ', '')}_{random.randint(100000, 999999)}",
                'link_date': link_date,
                'is_active': is_active,
                'usage_frequency': usage_frequency,
                'verification_status': random.choice(['Verified', 'Pending', 'Unverified'])
            })
    
    return pd.DataFrame(linked_records)

# Generate all remaining datasets
print("\nGenerating Contact Network Data...")
contact_df = generate_contact_network_data(customers_df)
print(f"Generated {len(contact_df):,} contact network records")

print("\nGenerating Mobile Money Transactions...")
mobile_money_df = generate_mobile_money_data(customers_df)
print(f"Generated {len(mobile_money_df):,} mobile money records")

print("\nGenerating Linked Accounts Data...")
linked_accounts_df = generate_linked_accounts_data(customers_df)
print(f"Generated {len(linked_accounts_df):,} linked account records")

print("\n" + "="*60)
print("üéâ All remaining datasets generated successfully!")
print("="*60)

In [0]:
# Helper function to safely create tables
def safe_create_table(df, table_name, description):
    try:
        # Check if table exists
        existing_tables = spark.sql(f"SHOW TABLES IN {CATALOG_NAME}.{SCHEMA_NAME}").collect()
        table_exists = any(row.tableName == table_name.split('.')[-1] for row in existing_tables)
        
        if table_exists:
            print(f"‚ö†Ô∏è  Table {table_name} already exists. Skipping creation.")
            return False
        else:
            # Create temp view and table
            temp_view_name = f"temp_{table_name.split('.')[-1]}"
            spark_df = spark.createDataFrame(df)
            spark_df.createOrReplaceTempView(temp_view_name)
            spark.sql(f"CREATE TABLE {table_name} AS SELECT * FROM {temp_view_name}")
            print(f"‚úì Created new table {table_name} with {len(df):,} records")
            return True
    except Exception as e:
        print(f"‚ùå Error creating table {table_name}: {str(e)}")
        return False

print("Saving new datasets as Delta tables...")

# 1. Reload and save recharge data (if it exists in memory from previous generation)
try:
    recharge_spark_df = spark.table(f"{CATALOG_NAME}.{SCHEMA_NAME}.recharge_history")
    print(f"‚úì Recharge history table already exists")
except:
    print(f"‚ö†Ô∏è  Recharge history table not found - will need to be regenerated")

# 2. Save Contact Network Data
print("\n1. Saving contact network data...")
safe_create_table(contact_df, f"{CATALOG_NAME}.{SCHEMA_NAME}.contact_network", "Contact network data")

# 3. Save Mobile Money Transactions
print("\n2. Saving mobile money transactions...")
safe_create_table(mobile_money_df, f"{CATALOG_NAME}.{SCHEMA_NAME}.mobile_money_transactions", "Mobile money transactions")

# 4. Save Linked Accounts Data
print("\n3. Saving linked accounts data...")
safe_create_table(linked_accounts_df, f"{CATALOG_NAME}.{SCHEMA_NAME}.linked_accounts", "Linked accounts data")

# 5. Generate and save a simplified location data (since the full version had cluster issues)
print("\n4. Generating and saving simplified location data...")

# Simple location data generation
def generate_simple_location_data(customers_df, num_records=50000):
    location_records = []
    
    # Sample customers
    sample_customers = customers_df.sample(n=1000, random_state=42)
    
    # Philippine cities with coordinates
    city_coords = {
        'Manila': (14.5995, 120.9842),
        'Quezon City': (14.6760, 121.0437),
        'Cebu City': (10.3157, 123.8854),
        'Davao': (7.1907, 125.4553)
    }
    
    for _, customer in sample_customers.iterrows():
        customer_id = customer['customer_id']
        city = customer['city']
        
        # Get city coordinates or use Manila as default
        if city in city_coords:
            base_lat, base_lon = city_coords[city]
        else:
            base_lat, base_lon = city_coords['Manila']
        
        # Generate 50 location points per customer
        for i in range(50):
            # Random date in last 30 days
            days_ago = random.randint(0, 29)
            location_date = DATA_END_DATE - timedelta(days=days_ago)
            
            # Random time
            hour = random.randint(6, 23)
            minute = random.randint(0, 59)
            timestamp = location_date.replace(hour=hour, minute=minute)
            
            # Location type based on time
            if 9 <= hour <= 17:
                location_type = random.choice(['Work', 'Shopping', 'Restaurant'])
            elif 18 <= hour <= 22:
                location_type = random.choice(['Home', 'Restaurant', 'Shopping'])
            else:
                location_type = 'Home'
            
            # Generate coordinates around the city
            lat = base_lat + random.uniform(-0.05, 0.05)
            lon = base_lon + random.uniform(-0.05, 0.05)
            
            location_records.append({
                'customer_id': customer_id,
                'timestamp': timestamp,
                'latitude': round(lat, 6),
                'longitude': round(lon, 6),
                'location_type': location_type,
                'city': city,
                'date': location_date.date(),
                'hour': hour
            })
    
    return pd.DataFrame(location_records)

# Generate simple location data
location_simple_df = generate_simple_location_data(customers_df)
print(f"Generated {len(location_simple_df):,} location records")

# Save location data
safe_create_table(location_simple_df, f"{CATALOG_NAME}.{SCHEMA_NAME}.location_data", "Location and movement data")

# Show all tables in the schema
print("\n" + "="*60)
print("üìä FINAL SUMMARY: All telco datasets created")
print("="*60)
print(f"Schema: {CATALOG_NAME}.{SCHEMA_NAME}")

tables_df = spark.sql(f"SHOW TABLES IN {CATALOG_NAME}.{SCHEMA_NAME}")
tables_list = [row.tableName for row in tables_df.collect()]

print(f"\nTables created ({len(tables_list)} total):")
for i, table in enumerate(sorted(tables_list), 1):
    print(f"  {i:2d}. {table}")

print("\n‚úÖ Comprehensive synthetic telco dataset generation completed!")
print("üöÄ Ready for Customer 360 analytics and ML modeling!")