In [37]:
import random
import math
import csv
from dataclasses import dataclass
from typing import Tuple, List
import numpy as np
import os


In [38]:
# Annotation aliases for the (low, high) pairs carried by every Segment.
_IntBounds = Tuple[int, int]
_FloatBounds = Tuple[float, float]


@dataclass
class Segment:
    """Sampling parameters for one named segment of synthetic rows.

    Each ``*_range`` field is a ``(low, high)`` pair. How the fields are
    consumed is defined by ``generate_feature_row``:

    * ``income_range``  - bounds handed to ``sample_lognormal`` for
      ``avgMonthlyIncome``.
    * ``cv_range``      - bounds of the uniform draw for ``incomeCV``.
    * ``expense_range`` - its midpoint is the mean of the normal draw for
      ``expenseRatio``.
    * ``emi_range``     - its midpoint is the mean of the normal draw for
      ``emiRatio``.
    * ``bounce_lambda`` - Poisson rate used for ``bounceCount``.
    * ``vintage_range`` - inclusive integer bounds for ``accountAgeMonths``.
    """

    # Label identifying the segment.
    name: str

    # (low, high) pairs, see the class docstring for how each one is used.
    income_range: _IntBounds
    cv_range: _FloatBounds
    expense_range: _FloatBounds
    emi_range: _FloatBounds

    # Rate parameter of the Poisson draw.
    bounce_lambda: float

    # Inclusive (low, high) integer bounds.
    vintage_range: _IntBounds


In [39]:
# Table of segment definitions consumed by generate_dataset, which emits
# rows for the segments in the order they are listed here.
SEGMENTS: List[Segment] = [
    Segment(
        name="SUPER_PRIME",
        income_range=(300000, 500000),
        cv_range=(0.02, 0.05),
        expense_range=(0.30, 0.40),
        emi_range=(0.10, 0.20),
        bounce_lambda=0.1,
        vintage_range=(72, 120),
    ),
    Segment(
        name="PRIME_SALARIED",
        income_range=(180000, 300000),
        cv_range=(0.05, 0.10),
        expense_range=(0.40, 0.55),
        emi_range=(0.20, 0.30),
        bounce_lambda=0.3,
        vintage_range=(48, 96),
    ),
    Segment(
        name="PRIME_MSME",
        income_range=(150000, 250000),
        cv_range=(0.12, 0.18),
        expense_range=(0.45, 0.60),
        emi_range=(0.25, 0.35),
        bounce_lambda=0.5,
        vintage_range=(36, 84),
    ),

    Segment(
        name="NEAR_PRIME_SPENDER",
        income_range=(120000, 200000),
        cv_range=(0.10, 0.15),
        expense_range=(0.60, 0.65),
        emi_range=(0.30, 0.40),
        bounce_lambda=0.6,
        vintage_range=(24, 72),
    ),
    Segment(
        name="NEW_TO_CREDIT",
        income_range=(120000, 180000),
        cv_range=(0.08, 0.12),
        expense_range=(0.50, 0.60),
        emi_range=(0.00, 0.10),
        bounce_lambda=0.2,
        vintage_range=(3, 12),
    ),

    Segment(
        name="FREELANCER",
        income_range=(80000, 150000),
        cv_range=(0.25, 0.35),
        expense_range=(0.60, 0.70),
        emi_range=(0.30, 0.45),
        bounce_lambda=0.8,
        vintage_range=(18, 60),
    ),
    Segment(
        name="SEASONAL_BUSINESS",
        income_range=(50000, 300000),
        cv_range=(0.35, 0.50),
        expense_range=(0.65, 0.75),
        emi_range=(0.35, 0.45),
        bounce_lambda=1.0,
        vintage_range=(24, 84),
    ),
    Segment(
        name="OVERLEVERAGED_DISCIPLINED",
        income_range=(150000, 350000),
        cv_range=(0.10, 0.20),
        expense_range=(0.40, 0.50),
        emi_range=(0.45, 0.50),
        bounce_lambda=0.1,
        vintage_range=(48, 120),
    ),

    Segment(
        name="CASH_STRUGGLER",
        income_range=(50000, 90000),
        cv_range=(0.40, 0.60),
        expense_range=(0.75, 0.90),
        emi_range=(0.45, 0.55),
        bounce_lambda=1.5,
        vintage_range=(24, 72),
    ),
    Segment(
        name="REPEATED_BOUNCES",
        income_range=(60000, 100000),
        cv_range=(0.35, 0.55),
        expense_range=(0.65, 0.85),
        emi_range=(0.40, 0.50),
        bounce_lambda=2.5,
        vintage_range=(24, 60),
    ),

    Segment(
        name="STRUCTURING",
        income_range=(70000, 150000),
        cv_range=(0.60, 0.90),
        expense_range=(1.20, 1.60),
        emi_range=(0.20, 0.40),
        bounce_lambda=0.4,
        vintage_range=(12, 48),
    ),
    Segment(
        name="SALARY_CASHOUT",
        income_range=(120000, 250000),
        cv_range=(0.05, 0.10),
        expense_range=(0.90, 1.10),
        emi_range=(0.20, 0.30),
        bounce_lambda=0.2,
        vintage_range=(24, 72),
    ),
    Segment(
        name="SUDDEN_SPIKE",
        income_range=(100000, 1000000),
        cv_range=(0.80, 1.20),
        expense_range=(0.20, 0.50),
        emi_range=(0.00, 0.20),
        bounce_lambda=0.1,
        vintage_range=(6, 36),
    ),

    Segment(
        name="LOW_INCOME_DISCIPLINED",
        income_range=(40000, 60000),
        cv_range=(0.05, 0.10),
        expense_range=(0.30, 0.40),
        emi_range=(0.10, 0.20),
        bounce_lambda=0.1,
        vintage_range=(48, 120),
    ),
    Segment(
        name="HIGH_INCOME_CHAOTIC",
        income_range=(300000, 600000),
        cv_range=(0.40, 0.70),
        expense_range=(0.80, 1.00),
        emi_range=(0.30, 0.50),
        bounce_lambda=1.2,
        vintage_range=(24, 72),
    ),
]


In [40]:
def sample_lognormal(min_val, max_val, sigma=0.4):
    """Draw one log-normal sample centred on the range midpoint, clamped to the range.

    The draw uses NumPy's global random state (``np.random``).

    Args:
        min_val: Lower clamp bound.
        max_val: Upper clamp bound; must be >= ``min_val``.
        sigma: Standard deviation of the underlying normal distribution
            (non-negative). Defaults to 0.4.

    Returns:
        float: The sample, limited to ``[min_val, max_val]``. Draws that fall
        outside the range are moved onto the nearest bound, so the bounds
        themselves can be returned.

    Raises:
        ValueError: If ``min_val > max_val`` or the midpoint of the range is
            not strictly positive (its logarithm would be undefined).
    """
    if min_val > max_val:
        raise ValueError(
            f"min_val ({min_val}) must not exceed max_val ({max_val})"
        )

    midpoint = (min_val + max_val) / 2
    if midpoint <= 0:
        raise ValueError(
            f"midpoint of [{min_val}, {max_val}] must be positive, got {midpoint}"
        )

    # np.random.lognormal is parameterised by the mean of the *underlying
    # normal*, so log(midpoint) places the distribution's median (not its
    # arithmetic mean) at the midpoint of the range.
    mu = math.log(midpoint)
    value = np.random.lognormal(mu, sigma)

    # Always hand back a plain float, whether or not the draw was clamped.
    return float(min(max(value, min_val), max_val))


def sample_uniform(a, b):
    """Return one uniform draw between ``a`` and ``b``.

    Delegates to ``random.uniform``, so the draw comes from the standard
    library's global ``random`` generator.
    """
    draw = random.uniform(a, b)
    return draw


def sample_poisson(lam):
    """Return one Poisson draw with rate ``lam``.

    Delegates to ``np.random.poisson``, so the draw comes from NumPy's
    global random state.
    """
    count = np.random.poisson(lam)
    return count


In [41]:
def generate_feature_row(segment: Segment) -> dict:
    """Sample one synthetic feature row from the given segment's parameters.

    The random draws are made in a fixed order (income, CV, expense ratio,
    EMI ratio, balance multiplier, bounce count, account age, then the two
    noise multipliers), mixing the ``random`` module and ``np.random``.

    Args:
        segment: Segment whose ranges and Poisson rate drive the draws.

    Returns:
        dict with the keys ``avgMonthlyIncome``, ``incomeCV``,
        ``expenseRatio``, ``emiRatio``, ``avgMonthlyBalance``,
        ``bounceCount`` and ``accountAgeMonths``. Monetary values are rounded
        to 2 decimals, ratios to 3, and the two counts are cast to ``int``.
    """
    income = sample_lognormal(*segment.income_range)
    income_cv = sample_uniform(*segment.cv_range)

    # Ratios: normal draw around the midpoint of the segment's range
    # (standard deviation 0.05), then clipped to fixed global limits.
    expense_mid = sum(segment.expense_range) / 2
    expense_ratio = np.clip(random.normalvariate(expense_mid, 0.05), 0.20, 1.80)

    emi_mid = sum(segment.emi_range) / 2
    emi_ratio = np.clip(random.normalvariate(emi_mid, 0.05), 0.00, 0.90)

    # Balance is a random multiple of the income as sampled, i.e. before the
    # income noise below is applied.
    balance = income * random.uniform(0.2, 1.2)

    bounces = sample_poisson(segment.bounce_lambda)
    account_age = random.randint(*segment.vintage_range)

    # Noise injection: +/-5% on income and +/-10% on balance.
    income = income * random.uniform(0.95, 1.05)
    balance = balance * random.uniform(0.9, 1.1)

    row = {
        "avgMonthlyIncome": round(income, 2),
        "incomeCV": round(income_cv, 3),
        "expenseRatio": round(expense_ratio, 3),
        "emiRatio": round(emi_ratio, 3),
        "avgMonthlyBalance": round(balance, 2),
        "bounceCount": int(bounces),
        "accountAgeMonths": int(account_age),
    }
    return row


In [45]:
import csv
import os

def generate_dataset(
    filename="../data/synthetic/features_only.csv",
    rows_per_segment=2000,
    seed=None,
):
    """Write a CSV of synthetic feature rows, ``rows_per_segment`` per segment.

    Rows are produced by ``generate_feature_row`` for every entry of
    ``SEGMENTS``, in list order, and written under a single header row.
    Any existing file at ``filename`` is overwritten.

    Args:
        filename: Destination CSV path. Missing parent directories are
            created; a bare filename writes to the current directory.
        rows_per_segment: Number of rows generated for each segment.
        seed: Optional integer. When given, both the ``random`` module and
            NumPy's global random state are seeded with it before any row is
            generated (the row generator draws from both), so the output can
            be reproduced. When ``None`` the generators are left untouched.

    Raises:
        ValueError: If a generated row lacks one of the expected columns.
    """
    fieldnames = [
        "avgMonthlyIncome",
        "incomeCV",
        "expenseRatio",
        "emiRatio",
        "avgMonthlyBalance",
        "bounceCount",
        "accountAgeMonths"
    ]

    if seed is not None:
        random.seed(seed)
        np.random.seed(seed)

    # os.path.dirname() returns "" for a bare filename and os.makedirs("")
    # raises FileNotFoundError, so only create a directory when one is named.
    out_dir = os.path.dirname(filename)
    if out_dir:
        os.makedirs(out_dir, exist_ok=True)

    with open(filename, "w", newline="", encoding="utf-8") as f:
        writer = csv.DictWriter(f, fieldnames=fieldnames)
        writer.writeheader()

        for segment in SEGMENTS:
            for _ in range(rows_per_segment):
                row = generate_feature_row(segment)

                # DictWriter would silently write empty cells for missing
                # keys, so fail loudly instead.
                if not all(key in row for key in fieldnames):
                    raise ValueError(
                        f"❌ Missing fields in generated row for segment {segment}"
                    )

                writer.writerow(row)

    print("Feature dataset generated successfully")


In [46]:
# Write the dataset to the default location: 2000 rows for every segment.
generate_dataset(filename="../data/synthetic/features_only.csv", rows_per_segment=2000)

Feature dataset generated successfully
