<a href="https://colab.research.google.com/github/IkerZha0401/INST0001/blob/main/individual_and_survey_generation.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import csv
import random
from datetime import datetime, timedelta

# Next survey_id to hand out. Kept at module level so that IDs stay unique
# across every call made during one run of the script.
_SURVEY_ID_COUNTER = 1


def generate_survey_id():
    """Return the next sequential survey ID, starting at 1.

    Raises:
        ValueError: once more than 24000 IDs have been requested. 24000
            equals 20 regions x 200 respondents x 6 survey years, which is
            what this script generates with its default settings.
    """
    global _SURVEY_ID_COUNTER
    issued_id = _SURVEY_ID_COUNTER
    if issued_id > 24000:
        raise ValueError("Survey ID超过最大值24000")
    # Advance only after the limit check, so a failed call does not burn an ID.
    _SURVEY_ID_COUNTER = issued_id + 1
    return issued_id

# Step 1: Define Regions

# Region identifiers: each three-letter prefix is paired with the numbers
# 001-004, giving 20 IDs in total ("SSD001" ... "LEB004").
regions = [
    f"{prefix}{number:03d}"
    for prefix in ("SSD", "RCB", "PSE", "YEM", "LEB")
    for number in range(1, 5)
]


# Step 2: Generate Yearly Parameters

def generate_yearly_params():
    """Build the survey indicator rates for each year from 2019 to 2024.

    2019 is a fixed baseline. Every later year starts from the previous
    year's value, adds a uniform random step in [-0.02, 0.02] and clamps the
    result to [0.0, 1.0].

    Returns:
        dict mapping year -> {"literacy_rate", "numeracy_rate",
        "poverty_rate"}.
    """
    indicator_names = ("literacy_rate", "numeracy_rate", "poverty_rate")
    params = {2019: dict(zip(indicator_names, (0.60, 0.55, 0.40)))}

    for year in range(2020, 2025):
        last_year = params[year - 1]
        # One random draw per indicator, in the order listed above.
        params[year] = {
            name: max(0.0, min(1.0, last_year[name] + random.uniform(-0.02, 0.02)))
            for name in indicator_names
        }

    return params


# Step 3: Generate `date_of_birth`


def generate_date_of_birth():
    """Return a random birth date as a 'YYYY-MM-DD' string.

    The date is drawn uniformly from 1944-01-01 to 2005-01-01, both ends
    included.
    """
    earliest = datetime(1944, 1, 1)
    latest = datetime(2005, 1, 1)
    # randint is inclusive on both ends, so `latest` itself can be drawn.
    offset_days = random.randint(0, (latest - earliest).days)
    chosen = earliest + timedelta(days=offset_days)
    return chosen.date().isoformat()

# Step 4: Generate SURVEY Data with 20% Retention

def generate_survey_data(regions, yearly_params, sample_size=200):
    """
    Build survey records for 2019-2024, carrying part of each region's
    respondents over from one year to the next.

    For every year and region, up to int(sample_size * 0.2) people are drawn
    at random from that region's respondents of the previous year; the rest of
    the sample is filled with newly created individuals. A person's
    personal_id, region_id, sex and date_of_birth are set once at creation and
    reused in every later survey of that person.

    Args:
        regions: sequence of region identifiers (iterated once per year).
        yearly_params: mapping year -> dict holding "literacy_rate",
            "numeracy_rate" and "poverty_rate" probabilities.
        sample_size: number of respondents per region per year.

    Returns:
        (surveys, individuals): the list of survey-record dicts and the list
        of unique individual dicts, both in creation order.
    """
    survey_rows = []
    people = []  # every individual ever created, exactly once
    next_personal_id = 1
    last_panel_by_region = {}  # region_id -> respondents surveyed there last year
    # Inclusive randint bounds for unpaid domestic work time, keyed by sex.
    unpaid_work_bounds = {"male": (8, 13), "female": (30, 40)}

    for year in range(2019, 2025):
        for region_id in regions:
            # Carry over up to 20% of the sample from last year's respondents
            # (none are available in the first year).
            prior_panel = last_panel_by_region.get(region_id, [])
            carry_over_quota = int(sample_size * 0.2)
            carried_over = random.sample(prior_panel, min(carry_over_quota, len(prior_panel)))

            # Fill the remainder of the sample with brand-new individuals.
            fresh_count = sample_size - len(carried_over)
            newcomers = [
                {
                    "personal_id": pid,
                    "sex": random.choices(["male", "female", "prefer not to say"], weights=[49, 49, 2])[0],
                    "date_of_birth": generate_date_of_birth(),
                    "region_id": region_id,
                }
                for pid in range(next_personal_id, next_personal_id + fresh_count)
            ]
            next_personal_id += len(newcomers)

            panel = newcomers + carried_over

            # This year's full panel becomes next year's carry-over pool.
            last_panel_by_region[region_id] = list(panel)

            rates = yearly_params[year]

            for person in panel:
                literate = random.random() < rates["literacy_rate"]
                numerate = random.random() < rates["numeracy_rate"]

                # Age is judged by birth year only: anyone born after
                # (year - 18) is treated as below working age.
                birth_year = int(person["date_of_birth"][:4])
                if birth_year > (year - 18):
                    employment = "below_minimum_legal_work_age"
                else:
                    employment = random.choices(["employed", "unemployed"], weights=[70, 30])[0]

                in_poverty = random.random() < rates["poverty_rate"]

                # average_time_spent_on_unpaid_domestic_work
                if person["sex"] in unpaid_work_bounds:
                    low_bound, high_bound = unpaid_work_bounds[person["sex"]]
                    unpaid_work_time = random.randint(low_bound, high_bound)
                else:
                    # No sex recorded: draw one value from each range and
                    # pick either of the two.
                    male_like = random.randint(8, 13)
                    female_like = random.randint(30, 40)
                    unpaid_work_time = random.choice([male_like, female_like])

                survey_rows.append({
                    "survey_id": generate_survey_id(),
                    "personal_id": person["personal_id"],  # Foreign key to INDIVIDUAL
                    "region_id": person["region_id"],  # Fixed region for each individual
                    "proficiency_in_literacy": literate,
                    "proficiency_in_numeracy": numerate,
                    "employment_status": employment,
                    "below_poverty_line": in_poverty,
                    "average_time_spent_on_unpaid_domestic_work": unpaid_work_time,
                    "survey_year": year,
                })

            # Only newcomers are added, so carried-over people are not duplicated.
            people.extend(newcomers)

    return survey_rows, people

# Step 5: Export Data to CSV

def export_to_csv(filename, data, fieldnames):
    """Write the dictionaries in 'data' to 'filename' as a UTF-8 CSV file.

    A header row built from 'fieldnames' is written first, followed by one
    row per dictionary. An existing file at 'filename' is overwritten.
    """
    # newline='' lets the csv module control line endings itself.
    with open(filename, 'w', newline='', encoding='utf-8') as out_file:
        dict_writer = csv.DictWriter(out_file, fieldnames=fieldnames)
        dict_writer.writeheader()
        dict_writer.writerows(data)

# Step 6: Main Execution

def main():
    """Generate the synthetic dataset and save it as two CSV files.

    Writes 'individuals.csv' and 'surveys.csv' to the current working
    directory, then prints a confirmation message.
    """
    individual_columns = ['personal_id', 'region_id', 'sex', 'date_of_birth']
    survey_columns = [
        'survey_id', 'personal_id', 'region_id',
        'proficiency_in_literacy', 'proficiency_in_numeracy',
        'employment_status', 'below_poverty_line',
        'average_time_spent_on_unpaid_domestic_work', 'survey_year'
    ]

    surveys, individuals = generate_survey_data(
        regions, generate_yearly_params(), sample_size=200
    )

    # Export CSVs
    export_to_csv('individuals.csv', individuals, individual_columns)
    export_to_csv('surveys.csv', surveys, survey_columns)

    print("Data generation complete. 'individuals.csv' and 'surveys.csv' have been created.")

# Run the generator only when this file is executed directly, not when it is
# imported as a module.
if __name__ == "__main__":
    main()


Data generation complete. 'individuals.csv' and 'surveys.csv' have been created.
