### Imports

In [2]:
import pandas as pd
import numpy as np
import os
import random
import string
from glob import glob
from datetime import date, datetime, timedelta
from dateutil.relativedelta import relativedelta

### Episodes Column Definitions

In [2]:
def e_gen_nhs_number_and_episode_id(num_records=100000, start_episode_id=100000, start_nhs_number=9000000000):
    # Initialize variables
    episode_id = start_episode_id
    nhs_number = start_nhs_number
    data = []

    while len(data) < num_records:
        # Determine the number of episodes for this NHS number
        rand = random.random()
        if rand < 0.2:
            num_episodes = 1
        elif rand < 0.5:
            num_episodes = 2
        else:
            num_episodes = 3

        # Ensure we don't exceed the total number of records
        if len(data) + num_episodes > num_records:
            num_episodes = num_records - len(data)

        # Add records for this NHS number
        for _ in range(num_episodes):
            data.append({"nhs_number": nhs_number, "episode_id": episode_id})
            episode_id += 1
        nhs_number += 1
    
    return data


def e_gen_appointment_made(df, prob_true=0.95):
    df['appointment_made'] = np.random.choice([True, False], size=len(df), p=[prob_true, 1-prob_true])
    return df


def e_gen_date_of_foa(df):

    df['date_of_foa'] = None

    first_ep_start = datetime(2024, 4, 1)
    first_ep_end = datetime(2025, 3, 31)

    for nhs_number, group in df.groupby('nhs_number'):
        if group.empty:
            continue

        # Generate the first episode date
        first_ep_date = first_ep_start + timedelta(days=np.random.randint(0, (first_ep_end - first_ep_start).days + 1))

        # Assign dates to episodes
        for idx, row in group.iterrows():
            if idx == group.index[0]:
                # First episode
                df.at[idx, 'date_of_foa'] = first_ep_date.date() if row['appointment_made'] else None
            elif idx == group.index[1]:
                # Second episode
                offset = np.random.choice([35, 37], p=[0.9, 0.1])
                second_ep_date = first_ep_date - relativedelta(months=offset)
                df.at[idx, 'date_of_foa'] = second_ep_date.date() if row['appointment_made'] else None
            elif idx == group.index[2]:
                # Third episode
                offset = np.random.choice([35, 37], p=[0.9, 0.1])
                third_ep_date = second_ep_date - relativedelta(months=offset)
                df.at[idx, 'date_of_foa'] = third_ep_date.date() if row['appointment_made'] else None

    return df


def e_gen_date_of_as(df):
    df['date_of_as'] = None

    for idx, row in df.iterrows():
        if pd.isna(row['date_of_foa']):
            df.at[idx, 'date_of_as'] = None
        else:
            rand = random.random()
            if rand < 0.15:
                df.at[idx, 'date_of_as'] = None
            elif rand < 0.20:
                offset = random.randint(6, 7)
                df.at[idx, 'date_of_as'] = row['date_of_foa'] + relativedelta(months=offset)
            else:
                offset = random.randint(0, 6)
                df.at[idx, 'date_of_as'] = row['date_of_foa'] + relativedelta(months=offset)

    return df


def e_gen_episode_date(df):
    df['episode_date'] = None
    random_start = datetime(2024, 4, 1)
    random_end = datetime(2025, 3, 31)

    for idx, row in df.iterrows():
        if pd.notna(row['date_of_foa']):
            df.at[idx, 'episode_date'] = row['date_of_foa'] - timedelta(weeks=2)
        else:
            df.at[idx, 'episode_date'] = (random_start + timedelta(days=np.random.randint(0, (random_end - random_start).days + 1))).date()

    return df


def e_gen_early_recall_date(df):
    df['early_recall_date'] = None

    for idx, row in df.iterrows():
        rand = random.random()
        if rand < 0.05:
           df.at[idx, 'early_recall_date'] = df.at[idx, 'date_of_foa']
        else:
           df.at[idx, 'early_recall_date'] = None
    return df   


def e_gen_change_db_date_time(df):
    df['change_db_date_time'] = None

    for idx, row in df.iterrows():
          # Generate a random time on the `episode_date`
          random_hour = random.randint(0, 23)
          random_minute = random.randint(0, 59)
          random_second = random.randint(0, 59)
          df.at[idx, 'change_db_date_time'] = datetime.combine(row['episode_date'], datetime.min.time()) + timedelta(hours=random_hour, minutes=random_minute, seconds=random_second)

    return df


def e_gen_episode_type(df, episode_type_mapping_df):

    non_r_codes = episode_type_mapping_df[episode_type_mapping_df['code'] != 'R']['code'].tolist()

    df['episode_type'] = None

    for idx in df.index:
        if random.random() < 0.8:
            df.at[idx, 'episode_type'] = 'R'
        else:
            df.at[idx, 'episode_type'] = random.choice(non_r_codes)

    return df


def e_gen_end_code(df):
    df['end_code'] = None

    for idx, row in df.iterrows():
        if not row['appointment_made']:
            df.at[idx, 'end_code'] = 'DNR'
        else:
            if pd.notna(row['date_of_as']):
                df.at[idx, 'end_code'] = 'SC'
            else:
                df.at[idx, 'end_code'] = 'DNA'
    return df


def e_gen_call_recall_status_authorised_by(df):
    df['call_recall_status_authorised_by'] = None

    for idx, row in df.iterrows():
        if row['episode_type'] == 'R':
            df.at[idx, 'call_recall_status_authorised_by'] = random.choice(['GP', 'SCREENING_OFFICE'])
        else:
            df.at[idx, 'call_recall_status_authorised_by'] = None

    return df


def e_gen_end_code_last_updated(df):
    df['end_code_last_updated'] = None

    for idx, row in df.iterrows():
       df.at[idx, 'end_code_last_updated'] = row['change_db_date_time'] + relativedelta(months=8)
    
    return df


def e_gen_bso_organisation_code(df, bso_organisations_mapping_df):
    
    df['bso_organisation_code'] = None

    for nhs_number, group in df.groupby('nhs_number'):
      org_code = bso_organisations_mapping_df['bso_organisation_code'].sample(1).iloc[0]
      df.loc[group.index, 'bso_organisation_code'] = org_code

    return df


def e_gen_bso_batch_id(df):
    df['bso_batch_id'] = None

    for idx, row in df.iterrows():
        random_numbers = ''.join(random.choices(string.digits, k=6))
        random_letter = random.choice(string.ascii_uppercase)
        df.at[idx, 'bso_batch_id'] = f"{row['bso_organisation_code']}{random_numbers}{random_letter}"

    return df


def e_gen_reason_closed_code(df):
    df['reason_closed_code'] = None

    for idx, row in df.iterrows():
        if row['end_code'] == 'DNR':
            df.at[idx, 'reason_closed_code'] = 'NA'
        elif row['end_code'] == 'DNA':
            df.at[idx, 'reason_closed_code'] = 'NA'
        elif row['end_code'] == 'SC':
            df.at[idx, 'reason_closed_code'] = 'R'

    return df


def e_gen_end_point(df):
    df['end_point'] = 'S-,W+,H-'
    return df


def e_gen_final_action_code(df, final_action_code_mapping_df):
    df['final_action_code'] = None

    non_rr_codes = final_action_code_mapping_df[final_action_code_mapping_df['code'] != 'RR']['code'].tolist()

    for idx, row in df.iterrows():
        if row['reason_closed_code'] == 'R':
            if random.random() < 0.9:
                df.at[idx, 'final_action_code'] = 'RR'
            else:
                random_code = random.choice(non_rr_codes)
                df.at[idx, 'final_action_code'] = random_code

    return df

### Subject Column Definitions

In [3]:
def s_gen_initial_values(df):
    output_df = df[['nhs_number', 'change_db_date_time', 'bso_organisation_code', 'early_recall_date', 'date_of_as', 'date_of_foa', 'episode_date']] # same values as episodes
    output_df = output_df.copy()
    output_df['superseded_nhs_number'] = None # blank for simplicity
    output_df['removal_reason'] = None # blank for simplicity
    output_df['removal_date'] = None # blank for simplicity
    output_df['subject_status_code'] = 'Normal' # for simplicity
    output_df['ntdd_calculation_method'] = 'Routine' # for simplicity

    return output_df


def s_gen_is_higher_risk(df):
    df['is_higher_risk'] = None

    for idx in df.index:
        if random.random() < 0.05:
            df.at[idx, 'is_higher_risk'] = True
        else:
            df.at[idx, 'is_higher_risk'] = False

    return df


def s_gen_preferred_language(df, preferred_language_mapping_df):
    df['preferred_language'] = None

    non_en_codes = preferred_language_mapping_df[preferred_language_mapping_df['code'] != 'EN']['code'].tolist()

    for idx, row in df.iterrows():
           if random.random() < 0.9:
              df.at[idx, 'preferred_language'] = 'EN'
           else:
            random_code = random.choice(non_en_codes)
            df.at[idx, 'preferred_language'] = random_code

    return df


def s_gen_gp_practice_code(df, bso_gp_mapping_df):
    df['gp_practice_code'] = None
    for (nhs_number, bso_organisation_code), group in df.groupby(['nhs_number', 'bso_organisation_code']):
      # Filter the mapping table for the current bso_organisation_code
      matching_rows = bso_gp_mapping_df[bso_gp_mapping_df['bso_organisation_code'] == bso_organisation_code]
      if not matching_rows.empty:
          # Select a random gp_practice_code from the matching rows
          random_gp_code = matching_rows['gp_practice_code'].sample(1).iloc[0]
          df.loc[group.index, 'gp_practice_code'] = random_gp_code
    
    return df


def s_gen_next_test_due_date(df):
    df['next_test_due_date'] = None

    for idx, row in df.iterrows():
        if pd.notna(row['date_of_as']):
            df.at[idx, 'next_test_due_date'] = row['date_of_as'] + relativedelta(years=3)
        else:
            df.at[idx, 'next_test_due_date'] = row['episode_date'] + relativedelta(years=3)

    return df


def s_gen_latest_invitation_date(df):
    df['latest_invitation_date'] = None

    for idx, row in df.iterrows():
        if pd.notna(row['date_of_foa']):
            df.at[idx, 'latest_invitation_date'] = row['date_of_foa']
        else:
            df.at[idx, 'latest_invitation_date'] = row['episode_date']

    return df


def s_gen_reason_for_ceasing_code(df, cease_reason_mapping_df):
    df['reason_for_ceasing_code'] = None
    most_recent_df = df.loc[df.groupby('nhs_number')['change_db_date_time'].idxmax()]

    for idx, row in most_recent_df.iterrows():
        if random.random() < 0.05:
            random_code = cease_reason_mapping_df['code'].sample(1).iloc[0]
            df.at[idx, 'reason_for_ceasing_code'] = random_code

    return df


def s_gen_higher_risk_next_test_due_date(df):
    df['higher_risk_next_test_due_date'] = None

    for idx, row in df.iterrows():
        if row['is_higher_risk'] == True:
            df.at[idx, 'higher_risk_next_test_due_date'] = row['next_test_due_date']

    return df


def s_gen_hr_recall_due_date(df):
    df['hr_recall_due_date'] = None

    for idx, row in df.iterrows():
        if row['is_higher_risk'] == True and pd.notna(row['early_recall_date']):
             df.at[idx, 'hr_recall_due_date'] = row['early_recall_date']

    return df


def s_gen_higher_risk_referral_reason_code(df, higher_risk_referral_reasons_mapping_df):
    df['higher_risk_referral_reason_code'] = None

    for idx, row in df.iterrows():
        if row['is_higher_risk'] == True:
            random_reason = higher_risk_referral_reasons_mapping_df['code'].sample(1).iloc[0]
            df.at[idx, 'higher_risk_referral_reason_code'] = random_reason

    return df


def s_gen_date_irradiated(df):
    df['date_irradiated'] = None

    radiotherapy_codes = [
        'RADIOTHERAPY_BELOW_30',
        'RADIOTHERAPY_LOWER',
        'RADIOTHERAPY_UPPER'
    ]

    for idx, row in df.iterrows():
        if row['higher_risk_referral_reason_code'] in radiotherapy_codes and pd.notna(row['date_of_as']):
            df.at[idx, 'date_irradiated'] = row['date_of_as'] + relativedelta(months=1)

    return df


def s_gen_is_higher_risk_active(df):
    df['is_higher_risk_active'] = None
    
    for idx, row in df.iterrows():
        if row['is_higher_risk'] == True:
            df.at[idx, 'is_higher_risk_active'] = False
        else:
            df.at[idx, 'is_higher_risk_active'] = True
    
    return df


def s_gen_gene_code(df, higher_risk_genes_mapping_df):
    df['gene_code'] = None
    
    valid_codes = [
        'BRCA_RISK',
        'BRCA_TESTED',
        'HR_GENE_UNTESTED',
        'OTHER_GENE_MUTATIONS'
    ]
    
    for idx, row in df.iterrows():
        if row['higher_risk_referral_reason_code'] in valid_codes:
            random_gene = higher_risk_genes_mapping_df['code'].sample(1).iloc[0]
            df.at[idx, 'gene_code'] = random_gene
    
    return df


def s_gen_subject_postcode(df, bso_gp_mapping_df):
    df['subject_postcode'] = None
    
    for idx, row in df.iterrows():
        matching_rows = bso_gp_mapping_df[bso_gp_mapping_df['gp_practice_code'] == row['gp_practice_code']]
        if not matching_rows.empty:
            # Select the gp_practice_postcode from the matching rows
            postcode = matching_rows['gp_practice_postcode'].iloc[0]
            df.at[idx, 'subject_postcode'] = postcode
    
    return df

### Generate Tables Definitions

In [4]:
def gen_episodes(mapping_tables, num_records=100000):
    episodes_df = pd.DataFrame(e_gen_nhs_number_and_episode_id(num_records))
    episodes_df = e_gen_appointment_made(episodes_df)
    episodes_df = e_gen_date_of_foa(episodes_df)
    episodes_df = e_gen_date_of_as(episodes_df)
    episodes_df = e_gen_episode_date(episodes_df)
    episodes_df = e_gen_early_recall_date(episodes_df)
    episodes_df = e_gen_change_db_date_time(episodes_df)
    episodes_df = e_gen_episode_type(episodes_df, mapping_tables['EpisodeType'])
    episodes_df = e_gen_end_code(episodes_df)
    episodes_df = e_gen_call_recall_status_authorised_by(episodes_df)
    episodes_df = e_gen_end_code_last_updated(episodes_df)
    episodes_df = e_gen_bso_organisation_code(episodes_df, mapping_tables['bso_organisations'])
    episodes_df = e_gen_bso_batch_id(episodes_df)
    episodes_df = e_gen_reason_closed_code(episodes_df)
    episodes_df = e_gen_end_point(episodes_df)
    episodes_df = e_gen_final_action_code(episodes_df, mapping_tables['FinalActionCode'])
    return episodes_df


def gen_subjects(episodes_df, mapping_tables):
    subjects_df = s_gen_initial_values(episodes_df)
    subjects_df = s_gen_is_higher_risk(subjects_df)
    subjects_df = s_gen_preferred_language(subjects_df, mapping_tables['Language'])
    subjects_df = s_gen_gp_practice_code(subjects_df, mapping_tables['bso_gp_mapping'])
    subjects_df = s_gen_next_test_due_date(subjects_df)
    subjects_df = s_gen_latest_invitation_date(subjects_df)
    subjects_df = s_gen_reason_for_ceasing_code(subjects_df, mapping_tables['CeaseReason'])
    subjects_df = s_gen_higher_risk_next_test_due_date(subjects_df)
    subjects_df = s_gen_hr_recall_due_date(subjects_df)
    subjects_df = s_gen_higher_risk_referral_reason_code(subjects_df, mapping_tables['higher_risk_referral_reasons'])
    subjects_df = s_gen_date_irradiated(subjects_df)
    subjects_df = s_gen_is_higher_risk_active(subjects_df)
    subjects_df = s_gen_gene_code(subjects_df, mapping_tables['higher_risk_genes'])
    subjects_df = s_gen_subject_postcode(subjects_df, mapping_tables['bso_gp_mapping'])
    subjects_df = subjects_df.drop(columns=['date_of_as', 'date_of_foa', 'episode_date'])
    return subjects_df

### Load Mapping Tables

In [5]:
mapping_tables_path = "../mapping_tables"
mapping_files = glob(os.path.join(mapping_tables_path, "*.csv"))
mapping_tables = {os.path.basename(file).replace(".csv", ""): pd.read_csv(file) for file in mapping_files}

### Generate Episodes and Subjects

In [None]:
episodes_df = gen_episodes(mapping_tables)
subjects_df = gen_subjects(episodes_df, mapping_tables)

### Validate Columns and Output CSVs

In [10]:
output_templates_path = "../output_templates"
test_episodes_path = os.path.join(output_templates_path, "test_episodes_20241009.csv")
test_subjects_path = os.path.join(output_templates_path, "test_subjects_20241009.csv")
test_episodes = pd.read_csv(test_episodes_path, nrows=0)
test_subjects = pd.read_csv(test_subjects_path, nrows=0)
test_subjects['subject_postcode'] = None

In [11]:
assert set(test_episodes.columns) == set(episodes_df.columns), "Episodes columns error"
assert set(test_subjects.columns) == set(subjects_df.columns), "Subject columns error"
print("Checks passed")

Checks passed


In [12]:
datestamp = datetime.now().strftime('%Y%m%d')
output_folder = "../output_data"
episodes_file = os.path.join(output_folder, f"audit_episodes_{datestamp}.csv")
subjects_file = os.path.join(output_folder, f"audit_subjects_{datestamp}.csv")
episodes_df.to_csv(episodes_file, index=False)
subjects_df.to_csv(subjects_file, index=False)

### Cheat Join Episodes and Subjects for Simplicity

In [15]:
episodes_df = pd.read_csv("../output_data/audit_episodes_20250523.csv")
subjects_df = pd.read_csv("../output_data/audit_subjects_20250523.csv")

In [16]:
episodes_df = episodes_df.rename(columns={'change_db_date_time': 'e_change_db_date_time'})
subjects_df = subjects_df.rename(columns={'change_db_date_time': 's_change_db_date_time'})

In [17]:
episodes_df.columns

Index(['nhs_number', 'episode_id', 'appointment_made', 'date_of_foa',
       'date_of_as', 'episode_date', 'early_recall_date',
       'e_change_db_date_time', 'episode_type', 'end_code',
       'call_recall_status_authorised_by', 'end_code_last_updated',
       'bso_organisation_code', 'bso_batch_id', 'reason_closed_code',
       'end_point', 'final_action_code'],
      dtype='object')

In [18]:
subjects_df.columns

Index(['nhs_number', 's_change_db_date_time', 'bso_organisation_code',
       'early_recall_date', 'superseded_nhs_number', 'removal_reason',
       'removal_date', 'subject_status_code', 'ntdd_calculation_method',
       'is_higher_risk', 'preferred_language', 'gp_practice_code',
       'next_test_due_date', 'latest_invitation_date',
       'reason_for_ceasing_code', 'higher_risk_next_test_due_date',
       'hr_recall_due_date', 'higher_risk_referral_reason_code',
       'date_irradiated', 'is_higher_risk_active', 'gene_code',
       'subject_postcode'],
      dtype='object')

In [None]:
combined_df = pd.concat([subjects_df, episodes_df], axis=1)
combined_df = combined_df.loc[:,~combined_df.columns.duplicated()]
file = os.path.join(output_folder, f"bs-select_combined_{datestamp}.csv")
combined_df.to_csv(file, index=False)