Authors: Abhishek Bhatia, Tomas McIntee, John Powers  
© 2025, The University of North Carolina at Chapel Hill. Permission is granted to use in accordance with the MIT license.  
The code is licensed under the MIT license.  


***Model Training Process***

For the training process, three data source tables were combined with two OMOP concept tables and a variable rollup table borrowed from a previously published model.

**full_cohort**, the dataset generated by the Diagnosis-cohort-building pipeline.
* *person_id*, an identifier used to link persons.
* *index_date*, assigned as earliest date of diagnosis with ME/CFS or PASC for known positives and otherwise assigned randomly within a patient's temporal record.
* *window_start*, assigned in this training process as 30 days prior to *index_date*.
* *window_end*, assigned in this training process as 30 days prior to *index_date*
* *cohort_label*, coded as PASC, CFS, PASC_CFS, or CTRL.
* *pre_covid*, an indicator of whether or not the index date is before the COVID-19 era.
* *PASC_index*, date of PASC diagnosis, usually null.
* *CFS_index*, date of CFS diagnosis, usually null.
* *is_female* Used for matching and as a model variable.
* *age_bin* Used for matching and as a model variable.
* *ethnicity* Used for matching only.
* *race* Used for matching only.
* *cci_score* Used for matching only.

**infections_cohort**, a dataset of infections containing the following columns:
* *person_id*, an identifier used to link persons.
* *infection_date*, date of a COVID-19 infection. In our data, based on a minimum date between diagnosis and prescriptions for either Paxlovid or Remdesivir.
* *infection_count*, a within-person index of how many prior infections the person has had.
* *blackout_begin*, set in this work at 7 days prior to infection index.
* *blackout_end*, set in this work at 28 days prior to infection index.

**condition_occurrence**, an OMOP-formatted table containing the following important columns:
* *person_id*, an identifier used to link persons.
* *condition_start_date*, used 
* *condition_concept_id*, a unique identifier for a condition
* *condition_concept_name*, a human-readable condition name useful for model evaluation and interpretation.

**rollup_lookup**, a table designating which rarer conditions should be combined. This was generated based on statistically desirable minimum counts and was used to simplify the **condition_occurrence** table. The following four data columns were utilized:
* *ancestor_concept_id* The ID number of a common condition.
* *descendant_concept_id* The ID number of a rare condition to be rolled up.
* *anc_concept_name* The name of the common condition.
* *des_concept_name* The name of the rare condition.

In [None]:
# Python imports
from foundry_ml import Model, Stage
from pandas import DataFrame
import shap
import matplotlib
from matplotlib import pyplot
import pandas
import numpy
from xgboost.sklearn import XGBClassifier

from pyspark.sql import functions as F
from pyspark.sql.types import IntegerType, FloatType

In [None]:
# R requirements
require(MatchIt)
require(dplyr)

***Cohort generation***
Matched training cohorts were generated with the *MatchIt* package. This took place within R, although alternative matching techniques are available in Python.

In [None]:
#Set seed and training fraction for replicability
set.seed(37)
training_fraction <- 0.8
# Original Python code included for reference. R equivalent code provided for convenience.

# seed_choice = 37 # Replicability, but note that Spark poses issues.
# For patients who are diagnosed with both PASC and CFS, index date needs to be corrected as necessary.
# CFS_cohort = full_cohort.filter("cohort_label == 'CFS' or cohort_label == 'PASC'") \
#    .withColumn("window_start",full_cohort.window_start + (full_cohort.CFS_index - full_cohort.index_date)) \
#    .withColumn("window_end",full_cohort.window_end + (full_cohort.CFS_index - full_cohort.index_date)) \
#    .withColumn("pre_covid",full_cohort.pre_covid)
# PASC_cohort = full_cohort.filter("cohort_label == 'PASC' or cohort_label == 'PASC'") \
#    .withColumn("window_start",full_cohort.window_start + (full_cohort.PASC_index - full_cohort.index_date)) \
#    .withColumn("window_end",full_cohort.window_end + (full_cohort.PASC_index - full_cohort.index_date)) \
#    .withColumn("pre_covid",full_cohort.pre_covid_PASC)
#
# Base cohorts:
CTRL_cohort <- full_cohort %>%
    filter(cohort_label == "CTRL")
# For patients who are diagnosed with both PASC and CFS, index date needs to be corrected as necessary.
PASC_cohort <- full_cohort %>%
    filter(cohort_label == "PASC" | cohort_label == "PASC_CFS") %>%
    mutate(window_start = window_start + PASC_index - index_date,
        window_end = window_end + PASC_index - index_date)
CFS_cohort <- full_cohort %>%
    filter(cohort_label == "CFS" | cohort_label == "PASC_CFS") %>%
    mutate(window_start = window_start + CFS_index - index_date,
        window_end = window_end + CFS_index - index_date)


# Random sampling
sick_count = nrow(full_cohort) - nrow(CTRL_cohort)

CTRL_train = CTRL_cohort.sample %>%
    slice_sample(n = 2*sick_count) #Optional. In our case, this was required by performance. A larger multiple can (and should) be used if it is needed to find matches for the PASC / CFS training subsets, otherwise selection effects become troublesome.

PASC_train = PASC_cohort %>%
    filter(index_date == PASC_index) %>% # Recommended but not necessarily required. Restricts training set to PASC-first patients.
    slice_sample(prop = training_fraction)  %>%
    rbind(PASC_train) %>%
    filter(pre_covid == FALSE) %>% 
    mutate(sick = (cohort_label == "PASC" | cohort_label == "PASC_CFS"))

CFS_train = CFS_cohort %>%
    filter(index_date == CFS_index) %>% # Recommended but not necessarily required. Restricts training set to CFS-first patients.
    slice_sample(prop = training_fraction) %>%
    rbind(CFS_train) %>%
    #filter(pre_covid == TRUE) %>% #This is the one line that controls whether or not CFS model training is restricted to pre_covid.
    mutate(sick = (cohort_label == "CFS" | cohort_label == "PASC_CFS"))

#Run matching. Exact matching on already-coarse demographic variables is in this case was very easy. In particular, training on data matched on age, sex, and CCI changes the model significantly.

common_matched <- function(cohort_df)
{
    matchy <- matchit(formula = sick ~ 1,
        data = cohort_df,
        exact = ~ ethnicity + race + is_female + age_bin + pre_covid + cci_score, # This list of matching variables can be revisited. 
        # exact = ~ ethnicity + race + is_female + age_bin + cci_score, # Main variation, as precovid status was thought to be likely significant in the distribution of conditions, partly through changes to data coding. 
        replace = FALSE,
        m.order = "random")
    print("matching complete")
    matched <- get_matches(matchy,data = cohort_df)
    return(matched)
}
matched_CFS <- common_matched(matchy_CFS)
matched_PASC <- common_matched(matched_PASC)

# We tested but did not find useful a PASC vs CFS classifier, even when not using matching on pre-covid era:

PVC_train <- PASC_train %>%
    rbind(CFS_train) %>%
    select(person_id,cohort_label,index_date,window_start,window_end,is_female,age_bin,ethnicity,race,cci_score,pre_covid) %>% 
    mutate(pre_covid == 0) %>% # Optional "soft delete" of pre_covid variable for matching purposes. In this case, the model mainly picks up on "new" conditions as weakly predictive of PASC over CFS.
    filter(cohort_label != "PASC_CFS") %>%
    mutate(sick = (cohort_label == "CFS"))

matched_PVC <- common_matched(PVC_train)

# All patients not used in training processes are labeled as the test cohort. Selection effect problems arise if the training fraction is high and the matching process misses significant numbers of CFS or PASC patients.
test_cohort <- full_cohort %>%
    select(person_id,cohort_label,index_date,window_start,window_end,is_female,age_bin,ethnicity,race,cci_score,pre_covid) %>%
    anti_join(matched_CFS %>% select(person_id)) %>%
    anti_join(matched_PASC %>% select(person_id))

# At this point, the code will transition to Python. In our pipeline, this was handled by the environment.
# Running code locally, it may be necessary to take additional steps to load the R dataframes into Python, such as saving to CSV and then loading. Code below is an example for convenience.

write.csv(matched_CFS,"matched_CFS.csv")
write.csv(matched_PASC,"matched_PASC.csv")
write.csv(matched_PVC,"matched_PVC.csv")
write.csv(test_cohort,"test_cohort.csv")

In [None]:
# This would then load CSVs into Python (Pyspark):

matched_CFS = spark.read.format("csv").option("header", "true").load("matched_CFS.csv")
matched_PASC = spark.read.format("csv").option("header", "true").load("matched_PASC.csv")
matched_PVC = spark.read.format("csv").option("header", "true").load("matched_PVC.csv")
test_cohort = spark.read.format("csv").option("header", "true").load("test_cohort.csv")

***Condition preparation***
From the base OMOP-format *condition_occurrence* table, uncommon conditions are rolled up in order to improve model performance and efficiency using the *rollup_lookup* table developed for the LCM 2.0 model (see Crosskey, M., McIntee, T., Preiss, S., Brannock, D., Yoo, Y. J., Hadley, E., ... & Pfaff, E. (2023). Reengineering a machine learning phenotype to adapt to the changing COVID-19 landscape: A study from the N3C and RECOVER consortia. medRxiv, 2023-12.), with the exception of the roll-up of pregnancy concepts. This is an optional step, although for performance purposes, uncommon conditions should either be eliminated from the data or rolled up to more common conditions.

The final rollup lookup table uses the following columns:
* *ancestor_concept_id*
* *descendant_concept_id*
* *anc_concept_name*

A second step in the process is selecting features

In [None]:
CREATE OR REPLACE TABLE pregnancy_concepts AS 
SELECT ca.ancestor_concept_id, 
    rr1.concept_name as anc_concept_name, 
    ca.descendant_concept_id, 
    rr2.concept_name as des_concept_name, 
    ca.min_levels_of_separation, 
    ca.max_levels_of_separation
FROM recover_release_concept_ancestor ca JOIN recover_release_concept rr1 ON ca.ancestor_concept_id = rr1.concept_id
    JOIN recover_release_concept rr2 ON ca.descendant_concept_id = rr2.concept_id
WHERE rr1.vocabulary_id = 'SNOMED' 
    AND rr1.standard_concept = 'S' 
    AND rr1.domain_id = 'Condition'
    AND (rr1.concept_name = 'Finding related to pregnancy' OR rr1.concept_name = 'Delivery finding');

CREATE OR REPLACE TABLE modified_rollup_lookup AS
SELECT DISTINCT nvl(pregnancy_concepts.descendant_concept_id,rolled_lookup.descendant_concept_id) AS descendant_concept_id,
    nvl(pregnancy_concepts.des_concept_name,rolled_lookup.des_concept_name) AS des_concept_name,
    nvl(pregnancy_concepts.ancestor_concept_id,rolled_lookup.ancestor_concept_id) AS ancestor_concept_id,
    nvl(pregnancy_concepts.anc_concept_name,rolled_lookup.anc_concept_name) AS anc_concept_name
FROM rolled_lookup FULL OUTER JOIN pregnancy_concepts ON rolled_lookup.descendant_concept_id = pregnancy_concepts.descendant_concept_id

In [None]:
# Helper functions for condition preparation:

# Merges conditions and cohort. In our actual pipeline, merging was performed via SQL query (in comment below).
# SELECT cohort.*,
#    conditions_blacked_out.condition_concept_id,
#    conditions_blacked_out.condition_concept_name,
#    nvl(conditions_blacked_out.condition_end_date, conditions_blacked_out.condition_start_date) AS condition_end_date,
#    conditions_blacked_out.condition_start_date
# FROM cohort INNER JOIN conditions_blacked_out ON cohort.person_id = conditions_blacked_out.person_id
# WHERE (conditions_blacked_out.condition_start_date BETWEEN cohort.window_start AND cohort.window_end)
#    OR (conditions_blacked_out.condition_end_date BETWEEN cohort.window_start AND cohort.window_end)

def merge_cond_person(cohort,conditions):
    merged_df = cohort.join(cohort, conditions, on='person_id')
    merged_df['condition_end_date'] = merged_df['condition_end_date'].fillna(filtered_df['condition_start_date']) # In our data, condition_end_date was frequently null, but start date never was. YMMV.
    filtered_df = merged_df[
        (merged_df['condition_start_date'].between(merged_df['window_start'], merged_df['window_end'])) |
        (merged_df['condition_end_date'].between(merged_df['window_start'], merged_df['window_end']))
    ]
    result_df = filtered_df[['person_id', 'condition_concept_id', 'condition_concept_name', 'condition_end_date', 'condition_start_date']]
    return result_df

# This helper function performs rollup processes:
def condition_rollup(condition_table, lookup_table):
    rolled_up = lookup_table \
        .join(lookup_table, condition_table['condition_concept_id'] == lookup_table['descendant_concept_id'])
    rolled_up = rolled_up[['person_id','ancestor_concept_id','anc_concept_name','sick','cohort_label','pre_covid','condition_start_date','condition_end_date']] \
        .withColumnRenamed('ancestor_concept_id','condition_concept_id') \
        .withColumnRenamed('anc_concept_name','condition_concept_name') \
        .distinct()
    return rolled_up

# Helper function for feature selection:
def select_features(conditions_table, number_features):
    # List of conditions to drop that would be unhelpful as features for various reasons
    drop_conditions = [
        # Roll-up misc bin:
        '4041283',  # General finding of observation of patient
        # COVID conditions:        
        '37311061', # COVID-19
        '4100065',  # Disease due to Coronaviridae
        '3661408',  # Pneumonia due to SARS-COV-2
        '4195694',  # Acute respiratory distress syndrome (ARDS).
        # Explicit sequelae diagnosis list follows:
        '36714927', # Sequelae of infectious disease
        '432738',   # Chronic fatigue syndrome
        '44793521', # Also CFS
        '44793522', # Also CFS
        '44793523', # Also CFS
        '40405599', # Fibromyalgia
        '444205',   # Disorder following viral disease
        '4202045',  # Postviral fatigue syndrome (G9.93)
        '705076',   # PASC (U09.9)
        '4159659'   # Postural orthostatic tachycardia syndrome (POTS, G90.A)
        ] 
    #Drop extra columns, make distinct().
    conditions_table = conditions_table[['person_id','sick','condition_concept_id','condition_concept_name']].distinct()
    #Calculations:
    obs = conditions_table[['person_id']].distinct().count()
    pos = conditions_table.filter(conditions_table.sick)[['person_id']].distinct().count()
    ev = pos/obs
    conditions_filtered = conditions_table.filter(conditions_table['condition_concept_id'].isin(drop_conditions) == False)
    #Add means:
    with_means = conditions_filtered.groupBy('condition_concept_id').count()\
        .withColumn('mean',functions.col('count') / obs)\
        .drop('count')\
        .join(conditions_filtered, on='condition_concept_id')
    #Calculate covariance
    cov_calced = with_means.withColumn('label', with_means.sick.cast("float"))\
        .withColumn('value', (1 - functions.col('mean')) * (functions.col('label') - ev) / obs)\
        .groupBy('condition_concept_name','condition_concept_id')\
        .agg(functions.abs(functions.sum('value')).alias('abs_covariance'),functions.sum('value').alias('covariance'))\
        .toPandas()\
        .sort_values(by = 'abs_covariance', ascending = False)
    
    cov_calced_short = cov_calced\
        .sort_values(by = 'abs_covariance', ascending = False)\
        .head(number_features)
    cov_calced_short['covariance_indicator'] = numpy.where((cov_calced_short.covariance < 0), 'negative', 'positive')
    return(cov_calced_short)
# In our actual pipeline, SQL was used to convert conditions to counts:
# CREATE OR REPLACE TABLE counts_PASC AS
# SELECT DISTINCT conditions_PASC.person_id,
#    conditions_PASC.condition_concept_id,
#    CASE WHEN MAX(conditions_PASC.condition_start_date) = MIN(conditions_PASC.condition_start_date) AND
#        MAX(conditions_PASC.condition_end_date) = MIN(conditions_PASC.condition_end_date) THEN 1
#    ELSE 1 --Change to 2 for ternary conditions
#    END AS one_or_many_conds
# FROM conditions_PASC
# GROUP BY conditions_PASC.person_id, conditions_PASC.condition_concept_id;

# This logic has been incorporated for replication convenience in the below helper function
def model_style_data(features, cohort, conditions):
    grouped_df = conditions.groupBy('person_id', 'condition_concept_id')
    conds_counts = grouped_df.agg(
        when(
            (spark_max('condition_start_date') == spark_min('condition_start_date')) &
            (spark_max('condition_end_date') == spark_min('condition_end_date')),
            1
        ).otherwise(1).alias('one_or_many_conds')  # Change to 2 for ternary conditions
    )
    converted = features.join(conds_counts, on = 'condition_concept_id')\
        .groupBy('person_id')\
        .pivot('condition_concept_id')\
        .sum('one_or_many_conds')\
        .join(cohort['person_id','age_bin','is_female','sick'], on = 'person_id', how = 'outer')\
        .na.fill(value = 0)
    return(converted)

# Training function
def train_the_model(training_set, seed):
    df = training_set.toPandas()
    y = (df["sick"].to_numpy())
    X = df.drop(columns = ["sick", "person_id"])
    X = X.to_numpy()
    model =  XGBClassifier(colsample_bytree=0.1, gamma=0.4, learning_rate=0.09, max_depth=12, min_child_weight=0, n_estimators=400, subsample=0.9, random_state=seed)
    model.fit(X, y)
    return Model(Stage(model))

In [None]:
# This script cuts down the table of conditions based on blackout dates and base cohort inclusion criteria.

# Include cohort members without infections, do not include infections outside of cohort, trimming horizontally to minimize memory usage:
person_date = full_cohort[['person_id']].join(infections_cohort, on='person_id',how='left')[['person_id','blackout_begin','blackout_end']]

# Trim conditions table horizontally for performance:
cond = condition_occurrence[['person_id','condition_occurrence_id','condition_start_date','condition_end_date','condition_concept_id','condition_concept_name']]

# Identify blacked out conditions:
blacked_out_ids = person_date.join(cond, on='person_id') \
    .filter(df['condition_start_date'] >= df['blackout_begin']) \
    .filter(df['condition_start_date'] <= df['blackout_end'])[['condition_occurrence_id']]

# Anti join to rid ourselves of the blacked out visits
conditions_blacked_out = cond.join(blacked_out_ids, on='condition_occurrence_id', how='leftanti')

# Combine cohort and condition data:
merged_CFS = merge_cond_person(CFS_train,conditions_blacked_out)
merged_PASC = merge_cond_person(PASC_train,conditions_blacked_out)
merged_PVC = merge_cond_person(PVC_train,conditions_blacked_out)
merged_test = merge_cond_person(test_cohort,conditions_blacked_out) #This is not used for model training, but will be used later.

# Roll up rare conditions:
conditions_CFS = condition_rollup(merged_CFS,modified_rollup_lookup)
conditions_PASC = condition_rollup(merged_PASC,modified_rollup_lookup)
conditions_PVC = condition_rollup(merged_PVC,modified_rollup_lookup)
conditions_test = condition_rollup(merged_test,modified_rollup_lookup)



***Model training***
The above condition processing is model-agnostic. Below is the model-specific processing of data and the model training process:

In [None]:
# This script performs model-related processing.

# Identify selected features:
selected_features_CFS = select_features(conditions_CFS,200)\
    .withColumnRenamed('abs_covariance','cfs_abs_covariance')\
    .withColumnRenamed('covariance','cfs_covariance')
selected_features_PASC = select_features(conditions_PASC,200)\
    .withColumnRenamed('abs_covariance','pasc_abs_covariance')\
    .withColumnRenamed('covariance','pasc_covariance')
selected_features_PVC = select_features(conditions_PVC,200)

# Use common feature pool for CFS and PASC classifiers for apples-to-apples universe:
selected_features = selected_features_CFS.merge(selected_features_PASC, 
    on = ['condition_concept_name','condition_concept_id','pasc_abs_covariance','cfs_abs_covariance','pasc_covariance','cfs_covariance'], 
    how = 'outer'
    )
selected_features['covariance_indicator'] = numpy.where((selected_features.pasc_covariance < 0) & (selected_features.cfs_covariance < 0), 'negative', 
    numpy.where((selected_features.pasc_covariance > 0) & (selected_features.cfs_covariance > 0), 'positive', 'mixed'))

# Convert all data to wide format:
training_set_PASC = model_style_data(selected_features,matched_PASC,conditions_PASC)
training_set_CFS = model_style_data(selected_features,matched_CFS,conditions_CFS)
testing_set = model_style_data(selected_features,cohort_test,conditions_test)

# PVC classifier:
training_set_PVC = model_style_data(selected_features_PVC,matched_PVC,conditions_PVC)
testing_set_alt = model_style_data(selected_features_PVC,cohort_test,conditions_test)

In [None]:

# Actual model training. Separate because it can be quite long.
model_CFS = train_the_model(training_set_CFS,42)
model_PASC = train_the_model(training_set_PASC,42)
model_PVC= train_the_model(training_set_PVC,42)

***Model application***
Having trained the model, this may be then used to produce predictions. The predictions are float-valued model scores. These scores are not reasonably interpretable as probability estimates, as the training datasets are divided evenly between positives and negatives.

In [None]:
# Helper function:
def apply_model(dataset, trained_model):
    #Standard model setup:
    model = trained_model.stages[0].model
    #count_features = F.udf(lambda row: len([x for x in row if x == 1]), IntegerType())
    #Internal fn:
    def run_model(row):
        arr = np.array([x for x in row])[None,:]
        y_pred =  model.predict_proba(arr)
        value = float(y_pred.squeeze()[1]) # prob of class=1
        return value
    #Define 
    model_udf = F.udf(run_model, FloatType())
    #Now apply 
    predictions = dataset.withColumn("model_score", model_udf(F.struct([dataset[x] for x in dataset.columns if x not in ['person_id']])))
    #Output narrow model score:
    predictions = predictions[['person_id','model_score']]
    return predictions

In [None]:
# Depending on size of data and environment, these datasets may need to be chunked up substantially before running the models.

predictions_CFS = apply_model(testing_set, model_CFS)
predictions_PASC = apply_model(testing_set, model_PASC)
predictions_PVC = apply_model(testing_set_alt, model_PVC)