In [4]:
import os
import uuid
import random
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
import zipfile

# ------------------
# Setup
# ------------------
# Folder that receives every generated CSV; created if it does not exist yet.
output_dir = "synthetic_trial_dataset_final"
os.makedirs(output_dir, exist_ok=True)

# Seed both generators used below: NumPy's global state and the stdlib one.
np.random.seed(42)
random.seed(42)

def random_date(start, end):
    """Return ``start`` moved forward by a random whole number of days.

    The day offset is drawn with ``random.randint`` from 0 up to and
    including ``(end - start).days``, so either endpoint may be returned.
    """
    span_days = (end - start).days
    offset = random.randint(0, span_days)
    return start + timedelta(days=offset)

# ------------------
# Reference Data
# ------------------
# Street names combined with a random house number to build addresses.
street_names = [
    "Main St",
    "Broadway",
    "Oak Ave",
    "Pine St",
    "Maple Rd",
    "Cedar Ln",
    "Elm St",
    "Park Ave",
]

# (city, state) pairs used for patient addresses and clinical sites.
cities_states = [
    ("New York", "NY"),
    ("Los Angeles", "CA"),
    ("Chicago", "IL"),
    ("Houston", "TX"),
    ("Phoenix", "AZ"),
    ("Philadelphia", "PA"),
    ("San Antonio", "TX"),
    ("San Diego", "CA"),
    ("Dallas", "TX"),
    ("San Jose", "CA"),
]

# ------------------
# Patients
# ------------------
n_patients = 800

genders = ["male", "female", "other"]
gender_weights = [0.40, 0.55, 0.05]

ethnicities = ["white", "hispanic", "black", "asian"]
# NOTE: these sum to 0.95, not 1.0. random.choices treats weights as relative,
# so the effective probabilities are each weight divided by 0.95.
ethnicity_weights = [0.55, 0.15, 0.20, 0.05]

# Birthdates are bounded so that every patient is born before any other date
# generated in this script (condition onsets start at 2000-01-01, screening
# and encounters in 2020). The previous 30000-day window reached into 2022
# and produced patients born after their own screening date.
earliest_birthdate = datetime(1940, 1, 1)
latest_birthdate = datetime(1999, 12, 31)
birthdate_span_days = (latest_birthdate - earliest_birthdate).days

# One row per patient: sequential ID, demographics, a synthetic address and a
# randomly assigned enrolling site (O001..O010, matching the organization IDs).
patients = []
for i in range(1, n_patients + 1):
    patient_id = f"P{i:04d}"
    birthdate = earliest_birthdate + timedelta(days=random.randint(0, birthdate_span_days))
    gender = random.choices(genders, gender_weights)[0]
    ethnicity = random.choices(ethnicities, ethnicity_weights)[0]
    city, state = random.choice(cities_states)
    address = f"{random.randint(100,999)} {random.choice(street_names)}, {city}, {state}"
    enrolling_site_id = f"O{random.randint(1,10):03d}"
    patients.append([patient_id, birthdate.date(), gender, ethnicity, address, city, state, enrolling_site_id])

patients_df = pd.DataFrame(patients, columns=["patient_id", "birthdate", "gender", "ethnicity", "address", "city", "state", "enrolling_site_id"])
patients_df.to_csv(os.path.join(output_dir, "patients.csv"), index=False)

# ------------------
# Organizations
# ------------------
# One clinical site per (city, state) pair, limited to the first ten pairs and
# numbered O001, O002, ... in list order.
orgs = [
    [f"O{site_no:03d}", f"{site_city} Clinical Site", site_city, site_state]
    for site_no, (site_city, site_state) in enumerate(cities_states[:10], 1)
]

organizations_df = pd.DataFrame(
    orgs,
    columns=["organization_id", "name", "city", "state"],
)
organizations_path = os.path.join(output_dir, "organizations.csv")
organizations_df.to_csv(organizations_path, index=False)

# ------------------
# Providers
# ------------------
specialties = ["Oncology", "Cardiology", "Neurology", "Endocrinology", "Pulmonology"]

# 500 providers with sequential IDs, a random specialty and a random
# organization ID in the range O001..O010.
providers = []
for i in range(1, 501):
    provider_id = f"PR{i:04d}"
    # Placeholder surname: six hex digits, capitalised. Drawn from the seeded
    # stdlib RNG rather than uuid4(), which ignores random.seed() and made the
    # provider names differ on every run.
    name = f"Dr. {format(random.getrandbits(24), '06x').capitalize()}"
    specialty = random.choice(specialties)
    org_id = f"O{random.randint(1,10):03d}"
    providers.append([provider_id, name, specialty, org_id])

providers_df = pd.DataFrame(providers, columns=["provider_id", "name", "specialty", "organization_id"])
providers_df.to_csv(os.path.join(output_dir, "providers.csv"), index=False)

# ------------------
# Trial Recruitment
# ------------------
statuses = ["screen_failed", "enrolled", "randomized", "dropped"]

# One recruitment row per patient. Screening happens during 2020; every status
# except "screen_failed" gets an enrolment date 1-30 days after screening, and
# "randomized" patients additionally get a randomisation date 1-30 days after
# enrolment. Dates that do not apply are left as empty strings.
recruitment = []
for p in patients_df["patient_id"]:
    # Convert to a plain date immediately so all three date columns are
    # written as YYYY-MM-DD (previously enrolled/randomized dates were
    # datetimes and serialised with a trailing " 00:00:00").
    screening_date = random_date(datetime(2020, 1, 1), datetime(2020, 12, 31)).date()
    status = random.choices(statuses, [0.2, 0.3, 0.4, 0.1])[0]
    enrolled_date = ""
    randomized_date = ""
    if status in ("enrolled", "randomized", "dropped"):
        enrolled_date = screening_date + timedelta(days=random.randint(1, 30))
    if status == "randomized":
        randomized_date = enrolled_date + timedelta(days=random.randint(1, 30))
    recruitment.append([p, screening_date, enrolled_date, randomized_date, status])

recruitment_df = pd.DataFrame(recruitment, columns=["patient_id", "screening_date", "enrolled_date", "randomized_date", "status"])
recruitment_df.to_csv(os.path.join(output_dir, "trial_recruitment.csv"), index=False)

# ------------------
# Encounters
# ------------------
# Materialise the provider IDs once; the list does not change while encounters
# are generated, so rebuilding it for every encounter was wasted work.
provider_ids = providers_df["provider_id"].tolist()

# Each patient gets 2-7 encounters on random dates in 2020-2021, each with a
# randomly chosen provider. Encounter IDs are sequential across all patients.
encounters = []
encounter_id = 1
for p in patients_df["patient_id"]:
    n_enc = random.randint(2, 7)
    for _ in range(n_enc):
        date = random_date(datetime(2020, 1, 1), datetime(2021, 12, 31))
        provider_id = random.choice(provider_ids)
        encounters.append([f"E{encounter_id:05d}", p, provider_id, date.date()])
        encounter_id += 1

encounters_df = pd.DataFrame(encounters, columns=["encounter_id", "patient_id", "provider_id", "visit_date"])
encounters_df.to_csv(os.path.join(output_dir, "encounters.csv"), index=False)

# ------------------
# Medications
# ------------------
medications_list = ["DrugA", "DrugB", "DrugC", "ComparatorX", "Placebo"]

# Roughly 70% of patients receive exactly one medication record, starting in
# 2020 and lasting 30-365 days.
medications = []
used_medication_ids = set()
for p in patients_df["patient_id"]:
    if random.random() < 0.7:  # 70% get a medication
        med = random.choice(medications_list)
        start_date = random_date(datetime(2020, 1, 1), datetime(2020, 12, 31))
        stop_date = start_date + timedelta(days=random.randint(30, 365))
        # IDs keep the "M" + six hex digits shape but come from the seeded
        # stdlib RNG (uuid4() ignores random.seed(), so IDs changed per run).
        # Redraw on the rare collision so medication_id stays unique.
        medication_id = f"M{random.getrandbits(24):06x}"
        while medication_id in used_medication_ids:
            medication_id = f"M{random.getrandbits(24):06x}"
        used_medication_ids.add(medication_id)
        medications.append([medication_id, p, med, start_date.date(), stop_date.date()])

medications_df = pd.DataFrame(medications, columns=["medication_id", "patient_id", "medication_name", "start_date", "stop_date"])
medications_df.to_csv(os.path.join(output_dir, "medications.csv"), index=False)

# ------------------
# Conditions
# ------------------
conditions_list = ["Hypertension", "Diabetes", "Asthma", "COPD", "Cancer"]

# Each patient gets 0-3 condition rows with an onset between 2000 and 2019.
# About 20% of rows also get a stop date 30-365 days after onset; the rest
# leave "stop" as an empty string.
conditions = []
for p in patients_df["patient_id"]:
    n_cond = random.randint(0, 3)
    for _ in range(n_cond):
        cond = random.choice(conditions_list)
        # Work with plain dates so "start" and "stop" serialise identically
        # (previously "stop" was a datetime and gained a " 00:00:00" suffix).
        start_date = random_date(datetime(2000, 1, 1), datetime(2019, 12, 31)).date()
        stop_date = ""
        if random.random() < 0.2:
            stop_date = start_date + timedelta(days=random.randint(30, 365))
        conditions.append([p, cond, start_date, stop_date])

conditions_df = pd.DataFrame(conditions, columns=["patient_id", "condition_name", "start", "stop"])
conditions_df.to_csv(os.path.join(output_dir, "conditions.csv"), index=False)

# ------------------
# Adherence Tracking
# ------------------
# One adherence row per encounter. The visit-completed and on-medication flags
# are independent coin flips; a medication name and a taken flag are drawn only
# when the on-medication flag is 1, otherwise both stay empty strings.
adherence = []
for enc in encounters_df.itertuples():
    completed_flag = random.choice([0, 1])
    medicated_flag = random.choice([0, 1])
    if medicated_flag:
        drug_name = random.choice(medications_list)
        taken_flag = random.choice([0, 1])
    else:
        drug_name = ""
        taken_flag = ""
    adherence.append(
        [enc.encounter_id, enc.patient_id, completed_flag, medicated_flag, drug_name, taken_flag]
    )

adherence_columns = [
    "encounter_id",
    "patient_id",
    "visit_completed",
    "on_medication",
    "medication_name",
    "medication_taken",
]
adherence_df = pd.DataFrame(adherence, columns=adherence_columns)
adherence_path = os.path.join(output_dir, "adherence_tracking.csv")
adherence_df.to_csv(adherence_path, index=False)

# ------------------
# Observations
# ------------------
# Attach one observation to each of up to 1200 randomly sampled encounters.
# The sample size is clamped to the number of encounters because
# DataFrame.sample raises when asked for more rows than exist (without
# replacement), which would happen if fewer encounters were generated.
n_observations = min(1200, len(encounters_df))

observations = []
for e in encounters_df.sample(n_observations).itertuples():
    obs_name = random.choice(["BP", "HR", "Weight", "Height", "Glucose"])
    # The same 50-200 range is used for every observation type.
    value = round(random.uniform(50, 200), 1)
    observations.append([e.encounter_id, e.patient_id, obs_name, value])

observations_df = pd.DataFrame(observations, columns=["encounter_id", "patient_id", "observation_name", "value"])
observations_df.to_csv(os.path.join(output_dir, "observations.csv"), index=False)

# ------------------
# Procedures
# ------------------
procedures_list = ["Biopsy", "MRI", "CT Scan", "Blood Test", "ECG"]

# Attach one procedure to each of up to 400 randomly sampled encounters. The
# sample size is clamped to the number of encounters because DataFrame.sample
# raises when asked for more rows than exist (without replacement).
n_procedures = min(400, len(encounters_df))

procedures = []
for e in encounters_df.sample(n_procedures).itertuples():
    proc = random.choice(procedures_list)
    procedures.append([e.encounter_id, e.patient_id, proc])

procedures_df = pd.DataFrame(procedures, columns=["encounter_id", "patient_id", "procedure_name"])
procedures_df.to_csv(os.path.join(output_dir, "procedures.csv"), index=False)

# ------------------
# Create ZIP
# ------------------
# Bundle the generated files into a single archive, stored flat (no folder
# prefix inside the zip).
zip_path = "synthetic_trial_dataset_final.zip"
# ZIP_DEFLATED compresses the entries; the default (ZIP_STORED) only
# concatenates them uncompressed.
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
    # Sorted so the entry order does not depend on os.listdir's arbitrary order.
    for file in sorted(os.listdir(output_dir)):
        file_path = os.path.join(output_dir, file)
        # Skip sub-directories (e.g. editor or notebook checkpoint folders).
        if os.path.isfile(file_path):
            zf.write(file_path, arcname=file)

print(f"Dataset generated and saved to {zip_path}")


Dataset generated and saved to synthetic_trial_dataset_final.zip


In [None]:
import os
import uuid
import random
import pandas as pd
from datetime import datetime, timedelta
import numpy as np
import zipfile

# ------------------
# Setup
# ------------------
# Folder that receives every generated CSV; created if it does not exist yet.
output_dir = "synthetic_trial_dataset_final"
os.makedirs(output_dir, exist_ok=True)

# Seed both generators used below: NumPy's global state and the stdlib one.
np.random.seed(42)
random.seed(42)

def random_date(start, end):
    """Return ``start`` moved forward by a random whole number of days.

    The day offset is drawn with ``random.randint`` from 0 up to and
    including ``(end - start).days``, so either endpoint may be returned.
    """
    span_days = (end - start).days
    offset = random.randint(0, span_days)
    return start + timedelta(days=offset)

# ------------------
# Reference Data
# ------------------
# Street names combined with a random house number to build addresses.
street_names = [
    "Main St",
    "Broadway",
    "Oak Ave",
    "Pine St",
    "Maple Rd",
    "Cedar Ln",
    "Elm St",
    "Park Ave",
]

# (city, state) pairs used for patient addresses and clinical sites.
cities_states = [
    ("New York", "NY"),
    ("Los Angeles", "CA"),
    ("Chicago", "IL"),
    ("Houston", "TX"),
    ("Phoenix", "AZ"),
    ("Philadelphia", "PA"),
    ("San Antonio", "TX"),
    ("San Diego", "CA"),
    ("Dallas", "TX"),
    ("San Jose", "CA"),
]

# ------------------
# Patients
# ------------------
n_patients = 800

genders = ["male", "female", "other"]
gender_weights = [0.40, 0.55, 0.05]

ethnicities = ["white", "hispanic", "black", "asian"]
# NOTE: these sum to 0.95, not 1.0. random.choices treats weights as relative,
# so the effective probabilities are each weight divided by 0.95.
ethnicity_weights = [0.55, 0.15, 0.20, 0.05]

# Birthdates are bounded so that every patient is born before any other date
# generated in this script (condition onsets start at 2000-01-01, screening
# and encounters in 2020). The previous 30000-day window reached into 2022
# and produced patients born after their own screening date.
earliest_birthdate = datetime(1940, 1, 1)
latest_birthdate = datetime(1999, 12, 31)
birthdate_span_days = (latest_birthdate - earliest_birthdate).days

# One row per patient: sequential ID, demographics, a synthetic address and a
# randomly assigned enrolling site (O001..O010, matching the organization IDs).
patients = []
for i in range(1, n_patients + 1):
    patient_id = f"P{i:04d}"
    birthdate = earliest_birthdate + timedelta(days=random.randint(0, birthdate_span_days))
    gender = random.choices(genders, gender_weights)[0]
    ethnicity = random.choices(ethnicities, ethnicity_weights)[0]
    city, state = random.choice(cities_states)
    address = f"{random.randint(100,999)} {random.choice(street_names)}, {city}, {state}"
    enrolling_site_id = f"O{random.randint(1,10):03d}"
    patients.append([patient_id, birthdate.date(), gender, ethnicity, address, city, state, enrolling_site_id])

patients_df = pd.DataFrame(patients, columns=["patient_id", "birthdate", "gender", "ethnicity", "address", "city", "state", "enrolling_site_id"])
patients_df.to_csv(os.path.join(output_dir, "patients.csv"), index=False)

# ------------------
# Organizations
# ------------------
# One clinical site per (city, state) pair, limited to the first ten pairs and
# numbered O001, O002, ... in list order.
orgs = [
    [f"O{site_no:03d}", f"{site_city} Clinical Site", site_city, site_state]
    for site_no, (site_city, site_state) in enumerate(cities_states[:10], 1)
]

organizations_df = pd.DataFrame(
    orgs,
    columns=["organization_id", "name", "city", "state"],
)
organizations_path = os.path.join(output_dir, "organizations.csv")
organizations_df.to_csv(organizations_path, index=False)

# ------------------
# Providers
# ------------------
specialties = ["Oncology", "Cardiology", "Neurology", "Endocrinology", "Pulmonology"]

# 500 providers with sequential IDs, a random specialty and a random
# organization ID in the range O001..O010.
providers = []
for i in range(1, 501):
    provider_id = f"PR{i:04d}"
    # Placeholder surname: six hex digits, capitalised. Drawn from the seeded
    # stdlib RNG rather than uuid4(), which ignores random.seed() and made the
    # provider names differ on every run.
    name = f"Dr. {format(random.getrandbits(24), '06x').capitalize()}"
    specialty = random.choice(specialties)
    org_id = f"O{random.randint(1,10):03d}"
    providers.append([provider_id, name, specialty, org_id])

providers_df = pd.DataFrame(providers, columns=["provider_id", "name", "specialty", "organization_id"])
providers_df.to_csv(os.path.join(output_dir, "providers.csv"), index=False)

# ------------------
# Trial Recruitment
# ------------------
statuses = ["screen_failed", "enrolled", "randomized", "dropped"]

# One recruitment row per patient. Screening happens during 2020; every status
# except "screen_failed" gets an enrolment date 1-30 days after screening, and
# "randomized" patients additionally get a randomisation date 1-30 days after
# enrolment. Dates that do not apply are left as empty strings.
recruitment = []
for p in patients_df["patient_id"]:
    # Convert to a plain date immediately so all three date columns are
    # written as YYYY-MM-DD (previously enrolled/randomized dates were
    # datetimes and serialised with a trailing " 00:00:00").
    screening_date = random_date(datetime(2020, 1, 1), datetime(2020, 12, 31)).date()
    status = random.choices(statuses, [0.2, 0.3, 0.4, 0.1])[0]
    enrolled_date = ""
    randomized_date = ""
    if status in ("enrolled", "randomized", "dropped"):
        enrolled_date = screening_date + timedelta(days=random.randint(1, 30))
    if status == "randomized":
        randomized_date = enrolled_date + timedelta(days=random.randint(1, 30))
    recruitment.append([p, screening_date, enrolled_date, randomized_date, status])

recruitment_df = pd.DataFrame(recruitment, columns=["patient_id", "screening_date", "enrolled_date", "randomized_date", "status"])
recruitment_df.to_csv(os.path.join(output_dir, "trial_recruitment.csv"), index=False)

# ------------------
# Encounters
# ------------------
# Materialise the provider IDs once; the list does not change while encounters
# are generated, so rebuilding it for every encounter was wasted work.
provider_ids = providers_df["provider_id"].tolist()

# Each patient gets 2-7 encounters on random dates in 2020-2021, each with a
# randomly chosen provider. Encounter IDs are sequential across all patients.
encounters = []
encounter_id = 1
for p in patients_df["patient_id"]:
    n_enc = random.randint(2, 7)
    for _ in range(n_enc):
        date = random_date(datetime(2020, 1, 1), datetime(2021, 12, 31))
        provider_id = random.choice(provider_ids)
        encounters.append([f"E{encounter_id:05d}", p, provider_id, date.date()])
        encounter_id += 1

encounters_df = pd.DataFrame(encounters, columns=["encounter_id", "patient_id", "provider_id", "visit_date"])
encounters_df.to_csv(os.path.join(output_dir, "encounters.csv"), index=False)

# ------------------
# Medications
# ------------------
medications_list = ["DrugA", "DrugB", "DrugC", "ComparatorX", "Placebo"]

# Roughly 70% of patients receive exactly one medication record, starting in
# 2020 and lasting 30-365 days.
medications = []
used_medication_ids = set()
for p in patients_df["patient_id"]:
    if random.random() < 0.7:  # 70% get a medication
        med = random.choice(medications_list)
        start_date = random_date(datetime(2020, 1, 1), datetime(2020, 12, 31))
        stop_date = start_date + timedelta(days=random.randint(30, 365))
        # IDs keep the "M" + six hex digits shape but come from the seeded
        # stdlib RNG (uuid4() ignores random.seed(), so IDs changed per run).
        # Redraw on the rare collision so medication_id stays unique.
        medication_id = f"M{random.getrandbits(24):06x}"
        while medication_id in used_medication_ids:
            medication_id = f"M{random.getrandbits(24):06x}"
        used_medication_ids.add(medication_id)
        medications.append([medication_id, p, med, start_date.date(), stop_date.date()])

medications_df = pd.DataFrame(medications, columns=["medication_id", "patient_id", "medication_name", "start_date", "stop_date"])
medications_df.to_csv(os.path.join(output_dir, "medications.csv"), index=False)

# ------------------
# Conditions
# ------------------
conditions_list = ["Hypertension", "Diabetes", "Asthma", "COPD", "Cancer"]

# Each patient gets 0-3 condition rows with an onset between 2000 and 2019.
# About 20% of rows also get a stop date 30-365 days after onset; the rest
# leave "stop" as an empty string.
conditions = []
for p in patients_df["patient_id"]:
    n_cond = random.randint(0, 3)
    for _ in range(n_cond):
        cond = random.choice(conditions_list)
        # Work with plain dates so "start" and "stop" serialise identically
        # (previously "stop" was a datetime and gained a " 00:00:00" suffix).
        start_date = random_date(datetime(2000, 1, 1), datetime(2019, 12, 31)).date()
        stop_date = ""
        if random.random() < 0.2:
            stop_date = start_date + timedelta(days=random.randint(30, 365))
        conditions.append([p, cond, start_date, stop_date])

conditions_df = pd.DataFrame(conditions, columns=["patient_id", "condition_name", "start", "stop"])
conditions_df.to_csv(os.path.join(output_dir, "conditions.csv"), index=False)

# ------------------
# Adherence Tracking
# ------------------
# One adherence row per encounter. The visit-completed and on-medication flags
# are independent coin flips; a medication name and a taken flag are drawn only
# when the on-medication flag is 1, otherwise both stay empty strings.
adherence = []
for enc in encounters_df.itertuples():
    completed_flag = random.choice([0, 1])
    medicated_flag = random.choice([0, 1])
    if medicated_flag:
        drug_name = random.choice(medications_list)
        taken_flag = random.choice([0, 1])
    else:
        drug_name = ""
        taken_flag = ""
    adherence.append(
        [enc.encounter_id, enc.patient_id, completed_flag, medicated_flag, drug_name, taken_flag]
    )

adherence_columns = [
    "encounter_id",
    "patient_id",
    "visit_completed",
    "on_medication",
    "medication_name",
    "medication_taken",
]
adherence_df = pd.DataFrame(adherence, columns=adherence_columns)
adherence_path = os.path.join(output_dir, "adherence_tracking.csv")
adherence_df.to_csv(adherence_path, index=False)

# ------------------
# Observations
# ------------------
# Attach one observation to each of up to 1200 randomly sampled encounters.
# The sample size is clamped to the number of encounters because
# DataFrame.sample raises when asked for more rows than exist (without
# replacement), which would happen if fewer encounters were generated.
n_observations = min(1200, len(encounters_df))

observations = []
for e in encounters_df.sample(n_observations).itertuples():
    obs_name = random.choice(["BP", "HR", "Weight", "Height", "Glucose"])
    # The same 50-200 range is used for every observation type.
    value = round(random.uniform(50, 200), 1)
    observations.append([e.encounter_id, e.patient_id, obs_name, value])

observations_df = pd.DataFrame(observations, columns=["encounter_id", "patient_id", "observation_name", "value"])
observations_df.to_csv(os.path.join(output_dir, "observations.csv"), index=False)

# ------------------
# Procedures
# ------------------
procedures_list = ["Biopsy", "MRI", "CT Scan", "Blood Test", "ECG"]

# Attach one procedure to each of up to 400 randomly sampled encounters. The
# sample size is clamped to the number of encounters because DataFrame.sample
# raises when asked for more rows than exist (without replacement).
n_procedures = min(400, len(encounters_df))

procedures = []
for e in encounters_df.sample(n_procedures).itertuples():
    proc = random.choice(procedures_list)
    procedures.append([e.encounter_id, e.patient_id, proc])

procedures_df = pd.DataFrame(procedures, columns=["encounter_id", "patient_id", "procedure_name"])
procedures_df.to_csv(os.path.join(output_dir, "procedures.csv"), index=False)

# ------------------
# Create ZIP
# ------------------
# Bundle the generated files into a single archive, stored flat (no folder
# prefix inside the zip).
zip_path = "synthetic_trial_dataset_final.zip"
# ZIP_DEFLATED compresses the entries; the default (ZIP_STORED) only
# concatenates them uncompressed.
with zipfile.ZipFile(zip_path, "w", compression=zipfile.ZIP_DEFLATED) as zf:
    # Sorted so the entry order does not depend on os.listdir's arbitrary order.
    for file in sorted(os.listdir(output_dir)):
        file_path = os.path.join(output_dir, file)
        # Skip sub-directories (e.g. editor or notebook checkpoint folders).
        if os.path.isfile(file_path):
            zf.write(file_path, arcname=file)

print(f"Dataset generated and saved to {zip_path}")


Dataset generated and saved to synthetic_trial_dataset_final.zip


: 