## Data Generation Code - Healthcare attributes example

Step 1: Import Required Libraries

In [8]:
import pandas as pd
import random
from datetime import datetime, timedelta
import os

Step 2: Configuration

In [9]:
# Directory for the generated CSV files; a relative path, so it is resolved
# against the notebook's current working directory.
LOCAL_OUTPUT_DIR = "generated_data"
# Create the directory up front. exist_ok=True means re-running this cell does
# not raise when the directory is already there.
os.makedirs(LOCAL_OUTPUT_DIR, exist_ok=True)


Step 3: Define Data Schema (Array)

In [10]:
# Schema for one patient record: an ordered list of {"field_name", "type"}
# dicts. The order here is the order in which fields are generated and,
# therefore, the column order of the resulting table.
fields_array = [
    {"field_name": column, "type": kind}
    for column, kind in (
        ("PatientID", "string"),
        ("Name", "string"),
        ("Age", "integer"),
        ("Gender", "string"),
        ("HospitalID", "string"),
        ("Location", "string"),
        ("AdmissionDate", "date"),
        ("DischargeDate", "date"),
        ("Department", "string"),
        ("Vitals_BP", "string"),       # Flattened nested field
        ("Vitals_Pulse", "integer"),   # Flattened nested field
    )
]

Step 4: Function to Generate Record

In [11]:
def generate_patient_record(fields=None):
    """Build one synthetic patient record as a flat dict.

    Each schema entry is a dict with a ``field_name`` and a ``type``
    (``'string'``, ``'integer'`` or ``'date'``). Values are drawn with the
    ``random`` module, so seed it beforehand for reproducible output.

    Args:
        fields: Optional list of schema dicts. When omitted (the original
            call style), the module-level ``fields_array`` is used.

    Returns:
        dict: one key per schema field, in schema order, followed by
        ``record_created_date`` (``YYYY-MM-DD``) and
        ``last_updated_timestamp`` (``YYYY-MM-DD HH:MM:SS``). Date fields
        are ``YYYY-MM-DD`` strings between 1 and 1000 days before now.
        Entries whose ``type`` is not one of the three known types are
        skipped.

    Notes:
        * When both ``AdmissionDate`` and ``DischargeDate`` are generated as
          dates, the discharge is never earlier than the admission.
        * String fields without a dedicated rule (e.g. ``PatientID``) get
          ``<field_name>_<4 random digits>``; such values are NOT guaranteed
          to be unique across records.
    """
    schema = fields_array if fields is None else fields

    # Fixed value pools for the categorical string fields.
    value_pools = {
        'Name': ['John Doe', 'Alice Smith', 'Raj Patel', 'Emily Wang'],
        'Gender': ['Male', 'Female', 'Other'],
        'HospitalID': ['HOSP1001', 'HOSP1002', 'HOSP1003'],
        'Location': ['New York', 'San Francisco', 'Chicago'],
        'Department': ['Cardiology', 'Neurology', 'Pediatrics', 'Oncology'],
    }

    # Read the clock once so every date-derived value in this record is
    # computed from the same instant.
    now = datetime.now()
    record = {}
    date_fields = set()

    for field in schema:
        name = field['field_name']
        kind = field['type']

        if kind == 'string':
            if name in value_pools:
                record[name] = random.choice(value_pools[name])
            elif name == 'Vitals_BP':
                # Rendered as "systolic/diastolic".
                record[name] = f"{random.randint(90, 130)}/{random.randint(60, 90)}"
            else:
                record[name] = f"{name}_{random.randint(1000, 9999)}"

        elif kind == 'integer':
            if name == 'Vitals_Pulse':
                record[name] = random.randint(60, 100)
            else:
                record[name] = random.randint(1, 100)

        elif kind == 'date':
            days_back = random.randint(1, 1000)
            record[name] = (now - timedelta(days=days_back)).strftime('%Y-%m-%d')
            date_fields.add(name)

    # The two dates are drawn independently, so roughly half the time the
    # discharge would precede the admission. ISO-formatted strings sort
    # chronologically, so a plain comparison and swap restores a valid stay.
    if {'AdmissionDate', 'DischargeDate'} <= date_fields:
        if record['DischargeDate'] < record['AdmissionDate']:
            record['AdmissionDate'], record['DischargeDate'] = (
                record['DischargeDate'],
                record['AdmissionDate'],
            )

    # Add partitioning and tracking fields
    record['record_created_date'] = now.strftime('%Y-%m-%d')  # For BQ partitioning
    record['last_updated_timestamp'] = now.strftime('%Y-%m-%d %H:%M:%S')  # For auditing

    return record

Step 5: Generate, Save to CSV, and run the code

In [12]:
def main(num_records=1000, output_dir=None):
    """Generate synthetic patient records and write them to a CSV file.

    Args:
        num_records: Number of records to generate. Defaults to 1000, the
            value that was previously hard-coded.
        output_dir: Directory to write into. Defaults to the module-level
            ``LOCAL_OUTPUT_DIR``.

    Returns:
        None. Progress and the saved file path are printed.

    Notes:
        The file is named ``healthcare_patients_<YYYY-MM-DD-HH>.csv``, so a
        second run within the same clock hour overwrites the earlier file.
    """
    target_dir = LOCAL_OUTPUT_DIR if output_dir is None else output_dir
    # Create the directory here as well, so the function does not rely on an
    # earlier notebook cell having created it (or on it still existing).
    os.makedirs(target_dir, exist_ok=True)

    print("Generating patient data records...")
    # Build all rows first and construct the DataFrame once.
    records = [generate_patient_record() for _ in range(num_records)]
    df = pd.DataFrame(records)

    timestamp = datetime.now().strftime("%Y-%m-%d-%H")
    filename = f"healthcare_patients_{timestamp}.csv"
    local_path = os.path.join(target_dir, filename)

    df.to_csv(local_path, index=False)
    print(f"File saved locally at: {local_path}")


In [13]:
# Run the generate-and-save pipeline; the saved CSV path is printed below.
main()

Generating patient data records...
File saved locally at: generated_data/healthcare_patients_2025-05-04-23.csv
