In [19]:
import pandas as pd
import numpy as np
import uuid
import random
from datetime import datetime, timedelta
from faker import Faker
from tqdm import tqdm
from pathlib import Path

fake = Faker()

# Seed every seedable random source used below. NumPy drives the numeric
# draws; Faker has its own generator (used for `state` and `join_date`), so
# seeding NumPy alone leaves those columns different on every run.
# NOTE: uuid.uuid4() customer ids are not controlled by either seed.
np.random.seed(11)
Faker.seed(11)

# Number of synthetic customers to generate.
NUM_CUSTOMERS = 1000  # reduced sample size; 500_000 appears to be the full-scale value
# Simulation window used for transaction/interaction timestamps and as the
# reference date for churn labelling.
START_DATE = datetime(2022, 1, 1)
END_DATE = datetime(2025, 5, 30)
PRODUCTS = ['checking', 'savings', 'credit_card', 'loan', 'debit']

def generate_customer_profiles(num_customers):
    """Build a DataFrame of synthetic customer profiles, one row per customer.

    Args:
        num_customers: Number of profiles to generate.

    Returns:
        pandas.DataFrame with the columns ``customer_id`` (UUID4 string),
        ``age`` (drawn from 18 up to but excluding 85), ``state``,
        ``has_credit_card``, ``has_loan``, ``has_checking`` (always 1),
        ``has_savings``, ``join_date`` and ``gender``.
    """
    binary = [0, 1]
    genders = ['M', 'F', 'Other']

    rows = []
    progress = tqdm(range(num_customers), desc="Generating customer profiles")
    for _ in progress:
        # The draws below are made in a fixed order so that a seeded run
        # always consumes the random streams the same way.
        customer_id = str(uuid.uuid4())
        age = np.random.randint(18, 85)
        state = fake.state_abbr()
        owns_credit_card = np.random.choice(binary)
        owns_loan = np.random.choice(binary)
        owns_savings = np.random.choice(binary)
        joined_on = fake.date_between(start_date='-10y', end_date='-1y')
        gender = np.random.choice(genders)

        rows.append({
            "customer_id": customer_id,
            "age": age,
            "state": state,
            "has_credit_card": owns_credit_card,
            "has_loan": owns_loan,
            "has_checking": 1,  # every customer holds a checking account
            "has_savings": owns_savings,
            "join_date": joined_on,
            "gender": gender,
        })

    return pd.DataFrame(rows)

def generate_transactions(customers, avg_tx_per_customer=100):
    """Simulate product transactions for every customer.

    Each customer gets a Poisson-distributed number of transactions spread
    between a per-customer start date and ``END_DATE``. The start date is
    ``START_DATE`` plus a random offset of fewer than 180 days.

    Args:
        customers: DataFrame with ``customer_id``, ``has_credit_card``,
            ``has_loan`` and ``has_savings`` columns.
        avg_tx_per_customer: Mean of the Poisson distribution used for the
            per-customer transaction count.

    Returns:
        pandas.DataFrame with columns ``customer_id``, ``product``,
        ``amount``, ``txn_type`` and ``timestamp``. ``amount`` is negative
        for debit, checking and credit-card transactions and positive for
        loan and savings transactions. The columns are present even when no
        transactions were generated.
    """
    columns = ["customer_id", "product", "amount", "txn_type", "timestamp"]
    outflow_products = ("debit", "checking", "credit_card")
    txn_types = ['purchase', 'payment', 'deposit', 'withdrawal']

    # Loop invariants: length of the simulation window and the largest
    # allowed per-customer start offset (never beyond the window itself).
    total_days = (END_DATE - START_DATE).days
    max_start_offset = min(180, total_days)

    txns = []
    for _idx, row in tqdm(customers.iterrows(), total=len(customers), desc="Generating transactions"):
        # Determine which products the customer has
        available_products = ["checking", "debit"]  # always included
        if row["has_credit_card"]:
            available_products.append("credit_card")
        if row["has_loan"]:
            available_products.append("loan")
        if row["has_savings"]:
            available_products.append("savings")

        n_tx = int(np.random.poisson(avg_tx_per_customer))
        start_offset = int(np.random.randint(0, max_start_offset))
        base_date = START_DATE + timedelta(days=start_offset)
        # Draw transaction offsets only from the days left after base_date.
        # Using the full window length here would push timestamps past
        # END_DATE by up to the start offset.
        days_remaining = total_days - start_offset

        for _ in range(n_tx):
            txn_date = base_date + timedelta(days=int(np.random.randint(0, days_remaining)))
            product = np.random.choice(available_products)
            amount = np.round(np.random.exponential(scale=100), 2)

            txns.append({
                "customer_id": row["customer_id"],
                "product": product,
                "amount": -amount if product in outflow_products else amount,
                "txn_type": np.random.choice(txn_types),
                "timestamp": txn_date
            })

    # Explicit columns keep the schema stable when `txns` is empty.
    return pd.DataFrame(txns, columns=columns)

def generate_interactions(customers, avg_int_per_customer=20):
    """Simulate customer interaction events (logins, support calls, email clicks).

    The number of events per customer is Poisson-distributed. The mix of
    event types depends on which products the customer owns, and each event
    gets a random date inside the ``START_DATE``..``END_DATE`` window.

    Args:
        customers: DataFrame with ``customer_id``, ``has_credit_card``,
            ``has_loan``, ``has_savings`` and ``has_checking`` columns.
        avg_int_per_customer: Mean of the Poisson distribution used for the
            per-customer event count.

    Returns:
        pandas.DataFrame with one row per event and the keys
        ``customer_id``, ``interaction_type`` and ``timestamp``.
    """
    kinds = ['login', 'support_call', 'email_click']
    window_days = (END_DATE - START_DATE).days

    events = []
    progress = tqdm(customers.iterrows(), total=len(customers), desc="Generating interactions")
    for _idx, customer in progress:
        event_count = int(np.random.poisson(avg_int_per_customer))

        # Baseline relative weights for each interaction type.
        login_w = 1.0
        support_w = 0.1
        email_w = 0.2

        # Product ownership shifts the mix. The additions are applied in a
        # fixed order so the floating-point results stay reproducible.
        if customer["has_credit_card"]:
            support_w += 0.3
            email_w += 0.2
        if customer["has_loan"]:
            support_w += 0.4
            email_w += 0.1
        if customer["has_savings"]:
            email_w += 0.2
        if customer["has_checking"]:
            login_w += 0.5

        # Turn the weights into probabilities, in the same order as `kinds`.
        raw_weights = [login_w, support_w, email_w]
        weight_sum = sum(raw_weights)
        probs = [w / weight_sum for w in raw_weights]

        customer_id = customer["customer_id"]
        for _ in range(event_count):
            # Draw the type first and the date second.
            kind = np.random.choice(kinds, p=probs)
            day_offset = np.random.randint(0, window_days)
            events.append({
                "customer_id": customer_id,
                "interaction_type": kind,
                "timestamp": START_DATE + timedelta(days=day_offset),
            })

    return pd.DataFrame(events)


def generate_churn_labels(customers, transactions, as_of=None, inactivity_days=180):
    """Label each customer as churned (1) or active (0) based on inactivity.

    A customer is churned when their most recent transaction is more than
    ``inactivity_days`` days before ``as_of``, or when they have no
    transactions at all.

    Args:
        customers: DataFrame with a ``customer_id`` column.
        transactions: DataFrame with ``customer_id`` and ``timestamp``
            columns. May be empty, in which case every customer is churned.
        as_of: Reference date for measuring inactivity. Defaults to the
            module-level ``END_DATE`` when omitted.
        inactivity_days: Inactivity threshold in days; strictly more than
            this many idle days counts as churn.

    Returns:
        pandas.DataFrame with columns ``customer_id`` and ``churned``
        (int64, 0 or 1), one row per row of ``customers``.
    """
    if as_of is None:
        as_of = END_DATE

    # With no transactions there is nothing to group, and an empty frame
    # may not even have the expected columns, so label everyone as churned.
    if transactions.empty:
        labels = customers[["customer_id"]].reset_index(drop=True)
        labels["churned"] = 1
        return labels

    latest_tx = transactions.groupby("customer_id")["timestamp"].max().reset_index()
    latest_tx.columns = ["customer_id", "last_tx_date"]

    # The left join keeps customers without transactions; their
    # last_tx_date is missing.
    merged = pd.merge(customers, latest_tx, on="customer_id", how="left")
    days_idle = (as_of - merged["last_tx_date"]).dt.days

    # A missing idle time means the customer never transacted, so treat it
    # as churned explicitly instead of relying on a large sentinel value.
    churned = days_idle.isna() | (days_idle > inactivity_days)
    merged["churned"] = churned.astype("int64")
    return merged[["customer_id", "churned"]]

# Build the four datasets. The generators share NumPy's global random state,
# so the call order below determines the data produced by a seeded run.
customer_df = generate_customer_profiles(NUM_CUSTOMERS)
transaction_df = generate_transactions(customer_df, avg_tx_per_customer=100)
interaction_df = generate_interactions(customer_df, avg_int_per_customer=20)
churn_df = generate_churn_labels(customer_df, transaction_df)

# Write each dataset as Parquet (for PySpark) into ../raw, relative to the
# current working directory.
save_dir = Path("..", "raw")
save_dir.mkdir(exist_ok=True)  # no error if the directory already exists

_outputs = {
    "customers.parquet": customer_df,
    "transactions.parquet": transaction_df,
    "interactions.parquet": interaction_df,
    "churn_labels.parquet": churn_df,
}
for _file_name, _frame in _outputs.items():
    _frame.to_parquet(save_dir / _file_name, index=False)


Generating customer profiles: 100%|██████████| 1000/1000 [00:00<00:00, 18298.64it/s]
Generating transactions: 100%|██████████| 1000/1000 [00:01<00:00, 544.73it/s]
Generating interactions: 100%|██████████| 1000/1000 [00:00<00:00, 4272.64it/s]
