In [1]:
#note: must use a microsim kernel or a kernel that can load all necessary python modules

In [2]:
import csv
import os
import numpy as np
import pandas as pd
import logging
logging.basicConfig(level=logging.INFO)
import multiprocessing as mp
from pathlib import Path
import copy
from pandarallel import pandarallel

In [3]:
from microsim.alcohol_category import AlcoholCategory
from microsim.bp_treatment_strategies import *
from microsim.cohort_risk_model_repository import CohortRiskModelRepository
from microsim.cv_outcome_determination import CVOutcomeDetermination
from microsim.data_loader import (get_absolute_datafile_path,
                                  load_regression_model)
from microsim.education import Education
from microsim.gender import NHANESGender
from microsim.gfr_equation import GFREquation
from microsim.initialization_repository import InitializationRepository
from microsim.nhanes_risk_model_repository import NHANESRiskModelRepository
from microsim.outcome import Outcome, OutcomeType
from microsim.outcome_model_repository import OutcomeModelRepository
from microsim.outcome_model_type import OutcomeModelType
from microsim.person import Person
from microsim.qaly_assignment_strategy import QALYAssignmentStrategy
from microsim.race_ethnicity import NHANESRaceEthnicity
from microsim.smoking_status import SmokingStatus
from microsim.statsmodel_logistic_risk_factor_model import \
    StatsModelLogisticRiskFactorModel

from microsim.population import NHANESDirectSamplePopulation
#from microsim.trials.trial_description import TrialDescription
from microsim.trials.trial import Trial
from microsim.outcome_model_type import OutcomeModelType
from microsim.bp_treatment_strategies import SprintTreatment
from microsim.trials.logistic_regression_analysis import LogisticRegressionAnalysis
from microsim.trials.linear_regression_analysis import LinearRegressionAnalysis
from microsim.trials.outcome_assessor import OutcomeAssessor
from microsim.trials.attribute_outcome_assessor import AttributeOutcomeAssessor
from microsim.trials.attribute_outcome_assessor import AssessmentMethod
from microsim.outcome import OutcomeType
from microsim.trials.risk_filter import RiskFilter
from microsim.trials.trial_utils import get_analysis_name
from microsim.population import PersonListPopulation
from statsmodels.tools.sm_exceptions import PerfectSeparationError

In [4]:
#any microsim dir will work, just need to access the NHANES data
microsimDir = "/users/PAS2164/deligkaris/MICROSIM/CODE/microsim"
os.chdir(microsimDir)

In [5]:
#I had to disable pandarallel on this class which is why I copied the entire population.py file here 
#in method get_people_current_state_as_dataframe the variable parallel is used 
#to enable/disable pandarallel but parallel can mean other things as well
#for now I used variable pandarallelOn elsewhere
#you can probably safely skip this entire code box

class Population:
    """
    Unit of people subject to treatment program over time.

    (WIP) THe basic idea is that this is a generic superclass which will manage a group of
    people. Tangible subclasses will be needed to actually assign assumptions for a given
    population. As it stands, this class doesn't do anything (other than potentially being
    useful for tests), because it isn't tied to tangible risk models. Ultimately it might
    turn into an abstract class...
    """

    _ageStandards = {}

    def __init__(self, people):
        self._people = people
        self._ageStandards = {}
        # luciana tag: discuss with luciana...want to keep track of the sim wave htat is currently running, while running
        # and also the total number of years advanced...need to think about how to do this is a way that will be safe
        # this approach has major risks if you forget to update one of these variables
        self._totalWavesAdvanced = 0
        self._currentWave = 0
        self._bpTreatmentStrategy = None
        self.num_of_processes = 8

        self._riskFactors = [
            "sbp",
            "dbp",
            "a1c",
            "hdl",
            "ldl",
            "trig",
            "totChol",
            "bmi",
            "anyPhysicalActivity",
            "afib",
            "waist",
            "alcoholPerWeek",
            "creatinine",
        ]
        # , 'otherLipidLoweringMedicationCount']
        self._treatments = ["antiHypertensiveCount", "statin"]
        self._timeVaryingCovariates = copy.copy(self._riskFactors)
        self._timeVaryingCovariates.append("age")
        self._timeVaryingCovariates.extend(self._treatments)
        self._timeVaryingCovariates.append("bpMedsAdded")

    def reset_to_baseline(self):
        self._totalWavesAdvanced = 0
        self._currentWave = 0
        self._bpTreatmentStrategy = None
        for person in self._people:
            person.reset_to_baseline()

    # trying to work this out. if we do get it worked out, then we probably want to rebuild the person to use systematic data structrures
    # (i.e. static attributes, time-varying attributes)
    # also, will need to thikn about ways to make sure that the dataframe version of reality stays synced with teh "patient-based" version of reality
    # for now, will build a DF at the beginnign and then update the peopel at the end...
    def advance_vectorized(self, years):
        pandarallelOn = False #CD added
        # get dataframe of people...
        df = self.get_people_current_state_and_summary_as_dataframe()
        alive = df.loc[df.dead == False]
        if pandarallelOn is True: #CD added
            pandarallel.initialize(verbose=1) 
        # might not need this row...depends o n whethe we do an bulk update on people or an wave-abased update
        waveAtStartOfAdvance = self._currentWave

        for yearIndex in range(years):
            logging.info(f"processing year: {self._currentWave}")
            alive = alive.loc[alive.dead == False]
            # if everybody has died, break out of the loop, no need to keep moving forward
            if len(alive) == 0:
                break
            self._currentWave += 1

            riskFactorsAndTreatment = {}
            # advance risk factors
            #import pdb; pdb.set_trace()
            for rf in self._riskFactors:
                # print(f"### Risk Factor: {rf}")
                if pandarallelOn is True: #CD added
                    riskFactorsAndTreatment[rf + "Next"] = alive.parallel_apply(
                        self._risk_model_repository.get_model(rf).estimate_next_risk_vectorized,
                        axis="columns",
                    )
                else: #CD added
                    riskFactorsAndTreatment[rf + "Next"] = alive.apply(
                        self._risk_model_repository.get_model(rf).estimate_next_risk_vectorized,
                        axis="columns",
                    )

            # advance treatment
            for treatment in self._treatments:
                # print(f"### Treatment: {treatment}")
                riskFactorsAndTreatment[treatment + "Next"] = alive.apply(
                    self._risk_model_repository.get_model(treatment).estimate_next_risk_vectorized,
                    axis="columns",
                )

            # apply treatment modifications
            alive = pd.concat([alive.reset_index(drop=True), pd.DataFrame(riskFactorsAndTreatment).reset_index(drop=True)], axis='columns', ignore_index=False)
            
            # bp meds added in this wave are 0...
            medsData = pd.DataFrame({'bpMedsAddedNext' :  pd.Series(np.zeros(len(alive))), 
                                    'totalBPMedsAddedNext' : pd.Series(alive['totalBPMedsAdded'])})
            
            # total bp meds are carried forward from teh prior wave
            alive = pd.concat([alive.reset_index(drop=True), medsData], axis='columns', ignore_index=False)
            if self._bpTreatmentStrategy is not None:
                alive = alive.apply(
                    self._bpTreatmentStrategy.get_changes_vectorized, axis="columns"
                )
            # advance outcomes
            # first determine if there is a cv event
            # add these variables here to speed up performance...better than adding one at a time
            # in the advance method...
            outcomeVars = {}
            # first, setup outcome variables
            for outcome in ["stroke", "mi", "dementia", "dead", "cvDeath", 'nonCVDeath']:
                outcomeVars[outcome + "Next"] = [False] * len(alive)
            outcomeVars["strokeFatal"] = [False] * len(alive)
            outcomeVars["miFatal"] =[False] * len(alive)
            #outcomeVars["qalyNext"] =np.zeros(len(alive))
            #outcomeVars["ageAtFirstDementia"] = [np.nan] * len(df)
            alive = pd.concat([alive.reset_index(drop=True), pd.DataFrame(outcomeVars) ], axis='columns')
            alive = alive.apply(
                self._outcome_model_repository.assign_cv_outcome_vectorized, axis="columns"
            )

            gcp = {}
            gcp["gcpNext"] = alive.apply(
                self._outcome_model_repository.get_gcp_vectorized, axis="columns"
            )
            gcp["gcpSlope"] = gcp['gcpNext'] - alive['gcp']
            alive.drop(columns=['gcpSlope'], inplace=True)
            alive = pd.concat([alive.reset_index(drop=True), pd.DataFrame(gcp)], axis='columns')

            # if the whole popluation has demeentia, nobody new can get dementia...            
            alive.dementia = alive.dementia.astype('bool_')
            if len(alive.loc[~alive.dementia]) > 0:
                newDementia = alive.loc[~alive.dementia].apply(
                    self._outcome_model_repository.get_dementia_vectorized, axis="columns"
                )
                alive["dementiaNext"] = newDementia
            else:
                alive["dementiaNext"] = np.repeat(False, len(alive))

            alive.loc[alive["dementiaNext"] == 1, "ageAtFirstDementia"] = alive.age
            alive["dementia"] = newDementia | alive["dementia"]

            numberAliveBeforeRecal = len(alive)

            alive = self.apply_recalibration_standards(alive)
            if len(alive) != numberAliveBeforeRecal:
                raise Exception(
                    f"number alive: {len(alive)} not equal to alive before recal {numberAliveBeforeRecal}"
                )
            alive["cvDeathNext"] = alive["deadNext"]
            alive["nonCVDeathNext"] = alive.apply(
                self._outcome_model_repository.assign_non_cv_mortality_vectorized, axis="columns"
            )
            alive["deadNext"] = alive["nonCVDeathNext"] | alive["cvDeathNext"]

            qaly = {}
            qaly["qalyNext"] = alive.apply(
                QALYAssignmentStrategy().get_qalys_vectorized, axis="columns"
            )
            alive = pd.concat([alive.reset_index(drop=True), pd.DataFrame(qaly)], axis='columns')

            alive.loc[~alive.dead, "age"] = alive.age + 1
            self._totalWavesAdvanced += 1

            alive = self.move_people_df_forward(alive)

            # for efficieicny, we could try to do this all at the end...but, its a bit cleanear  to do it wave by wave
            alive.apply(self.push_updates_back_to_people, axis="columns")
            nextCols = [col for col in alive.columns if "Next" in col]
            alive.drop(columns=nextCols, inplace=True)
            fatalCols = [col for col in alive.columns if "Fatal" in col]
            alive.drop(columns=fatalCols, inplace=True)
        return alive, df

    def push_updates_back_to_people(self, x):
        updatedIndices = set()
        peopleSet = set()
        person = self._people.iloc[int(x.populationIndex)]
        if x.populationIndex in updatedIndices or person in peopleSet:
            raise Exception(f"Population index: {x.populationIndex} already updated")
        else:
            updatedIndices.add(x.populationIndex)
            peopleSet.add(person)
        return self.update_person(person, x)

    def update_person(self, person, x):
        if person.is_dead():
            raise Exception(f"Trying to update a dead person: {person}")
        for rf in self._riskFactors:
            attr = getattr(person, "_" + rf)
            attr.append(x[rf + str(self._currentWave)])

        for treatment in self._treatments:
            attr = getattr(person, "_" + treatment)
            attr.append(x[treatment + str(self._currentWave)])

        # advance outcomes - this will add CV eath
        for outcomeName, outcomeType in {
            "stroke": OutcomeType.STROKE,
            "mi": OutcomeType.MI,
            "dementia": OutcomeType.DEMENTIA,
        }.items():
            if x[outcomeName + "Next"]:
                fatal = False if outcomeName == "dementia" else x[outcomeName + "Fatal"]
                # only one dementia event per person
                if outcomeName == "dementia" and person._dementia:
                    break
                else:
                    person.add_outcome_event(Outcome(outcomeType, fatal))

        person._gcp.append(x.gcp)
        person._qalys.append(x.qalyNext)
        person._bpMedsAdded.append(x.bpMedsAddedNext)

        # add non CV death to person objects
        if x.nonCVDeathNext:
            person._alive.append(False)

        # only advance age in survivors
        if not x.deadNext:
            person._age.append(person._age[-1] + 1)
            person._alive.append(True)
        return person

    def move_people_df_forward(self, df):
        factorsToChange = copy.copy(self._riskFactors)
        factorsToChange.extend(self._treatments)
        
        newVariables = {}
        
        for rf in factorsToChange:
            # the curent value is stored in the variable name
            df[rf] = df[rf + "Next"]
            newVariables[rf + str(self._currentWave)] = df[rf + "Next"]
            df["mean" + rf.capitalize()] = (
                df["mean" + rf.capitalize()] * (df["totalYearsInSim"] + 1) + df[rf + "Next"]
            ) / (df["totalYearsInSim"] + 2)
        for outcome in ["mi", "stroke"]:
            df[outcome + "InSim"] = df[outcome + "InSim"] | df[outcome + "Next"]
            newVariables[outcome + str(self._currentWave)] = df[outcome + "Next"]
        df["dead"] = df["dead"] | df["deadNext"]
        newVariables["dead" + str(self._currentWave)] = df["deadNext"]

        df["totalYearsInSim"] = df["totalYearsInSim"] + 1
        df["current_diabetes"] = df["a1c"] > 6.5
        df['gcp'] = df['gcpNext']
        df["gfr"] = df.apply(GFREquation().get_gfr_for_person_vectorized, axis="columns")
        df["current_bp_treatment"] = df["antiHypertensiveCount"] >= 1
        df["totalQalys"] = df["totalQalys"] + df["qalyNext"]
        df["bpMedsAdded"] = df["bpMedsAddedNext"]
        df["totalBPMedsAdded"] = df["totalBPMedsAddedNext"]
        
        # assign ages for new events
        # df.loc[(df.ageAtFirstStroke.isnull()) & (df.strokeNext), 'ageAtFirstStroke'] = df.age
        # df.loc[(df.ageAtFirstMI.isnull()) & (df.miNext), 'ageAtFirstMI'] = df.age
        # df.loc[(df.ageAtFirstDementia.isnull()) & (df.dementiaNext), 'ageAtFirstDementia'] = df.age

        return pd.concat([df.reset_index(drop=True), pd.DataFrame(newVariables).reset_index(drop=True)], axis='columns', ignore_index=False)


    def set_bp_treatment_strategy(self, bpTreatmentStrategy):
        self._bpTreatmentStrategy = bpTreatmentStrategy
        for person in self._people:
            person._bpTreatmentStrategy = bpTreatmentStrategy

    def apply_recalibration_standards(self, recalibration_df):
        # treatment_standard is a dictionary of outcome types and effect sizees
        if self._bpTreatmentStrategy is not None:
            if self._bpTreatmentStrategy.get_treatment_recalibration_for_population() is not None:
                recalibration_df = self.recalibrate_bp_treatment(recalibration_df)
        return recalibration_df

    # should the estiamted treatment effect be based on the number of events in the population
    # (i.e. # events treated / # of events untreated)
    # of should it be based on teh predicted reisks
    # the problem with the first approach is that its going to depend a lot on small sample sizes...
    # and we don't necessarily want to take out that random error...that random error reflects
    # genuine uncertainty.
    # so, i thikn it should be based on the model-predicted risks...

    def recalibrate_bp_treatment(self, recalibration_df):
        #logging.info(f"*** before recalibration, mi count: {recalibration_df.miNext.sum()}, stroke count: {recalibration_df.strokeNext.sum()}")
        treatment_outcome_standard = (
            self._bpTreatmentStrategy.get_treatment_recalibration_for_population()
        )
        # estimate risk for the people alive at the start of the wave
        recalibration_df= self.estimate_risks(recalibration_df, "treated")

        # rollback the treatment effect.
        # redtag: would like to apply to this to a deeply cloned population, but i can't get that to work
        # so, for now, applying it to the actual population and then rolling the effect back later.
        recalibration_df = recalibration_df.apply(
            self._bpTreatmentStrategy.rollback_changes_vectorized, axis="columns"
        )

        # estimate risk after applying the treamtent effect
        recalibration_df = self.estimate_risks(recalibration_df, "untreated")

        # hacktag related to above — roll back the treatment effect...
        recalibration_df = recalibration_df.apply(
            self._bpTreatmentStrategy.get_changes_vectorized, axis="columns"
        )
        #logging.info(f"######## BP meds After redo: {recalibration_df.totalBPMedsAddedNext.value_counts()}")
        totalBPMedsAddedCapped = recalibration_df['totalBPMedsAddedNext']
        totalBPMedsAddedCapped.loc[totalBPMedsAddedCapped >= BaseTreatmentStrategy.MAX_BP_MEDS] = BaseTreatmentStrategy.MAX_BP_MEDS
        #recalibration_df.loc[recalibration_df['totalBPMedsAddedNext'] >= BaseTreatmentStrategy.MAX_BP_MEDS, 'totalBPMedsAddedCapped'] = BaseTreatmentStrategy.MAX_BP_MEDS
        recalibrationVars = {"rolledBackEventType" : [None] * len(recalibration_df),
                            'totalBPMedsAddedCapped' : totalBPMedsAddedCapped}       
        recalibration_df = pd.concat([recalibration_df.reset_index(drop=True), pd.DataFrame(recalibrationVars).reset_index(drop=True)], axis='columns', ignore_index=False)
        
        #recalibration_df["rolledBackEventType"] = None
        # total meds added represents the total number of medication effects that we'll recalibrate for
        # it is the lesser of the total number of BP meds actually added (totalBpMedsAdded) or the max cap
        # so, if a treamtent strategy adds 10 medications, they'll effect the BP...but, they 
        # wont' have an additional efect on event reduction over the medication cap
        #logging.info(f"######## BP meds After redo: {recalibration_df.totalBPMedsAddedNext.value_counts()}")

        # recalibrate within each group of added medicaitons so that we can stratify the treamtnet effects
        for i in range(1, BaseTreatmentStrategy.MAX_BP_MEDS + 1):
            #logging.info(f"Roll back for med count: {i}")
            recalibrationPopForMedCount = recalibration_df.loc[recalibration_df.totalBPMedsAddedCapped == i]
            # the change standards are for a single medication
            recalibration_standard_for_med_count = treatment_outcome_standard.copy()
            for key, value in recalibration_standard_for_med_count.items():
                recalibration_standard_for_med_count[key] = value**i

            if len(recalibrationPopForMedCount) > 0:
                # recalibrate stroke
                recalibratedForMedCount = self.create_or_rollback_events_to_correct_calibration(
                    recalibration_standard_for_med_count,
                    "treatedstrokeRisks",
                    "untreatedstrokeRisks",
                    "stroke",
                    OutcomeType.STROKE,
                    CVOutcomeDetermination()._will_have_fatal_stroke,
                    recalibrationPopForMedCount,
                )

                recalibration_df.loc[
                    recalibratedForMedCount.index, "strokeNext"
                ] = recalibratedForMedCount["strokeNext"]
                recalibration_df.loc[
                    recalibratedForMedCount.index, "strokeFatal"
                ] = recalibratedForMedCount["strokeFatal"]
                recalibration_df.loc[
                    recalibratedForMedCount.index, "deadNext"
                ] = recalibratedForMedCount["deadNext"]
                recalibration_df.loc[
                    recalibratedForMedCount.index, "ageAtFirstStroke"
                ] = recalibratedForMedCount["ageAtFirstStroke"]
                recalibration_df.loc[
                    recalibratedForMedCount.index, "rolledBackEventType"
                ] = recalibratedForMedCount["rolledBackEventType"]

                # recalibrate MI
                recalibratedForMedCount = self.create_or_rollback_events_to_correct_calibration(
                    recalibration_standard_for_med_count,
                    "treatedmiRisks",
                    "untreatedmiRisks",
                    "mi",
                    OutcomeType.MI,
                    CVOutcomeDetermination()._will_have_fatal_mi,
                    recalibrationPopForMedCount,
                )
                recalibration_df.loc[
                    recalibratedForMedCount.index, "miNext"
                ] = recalibratedForMedCount["miNext"]
                recalibration_df.loc[
                    recalibratedForMedCount.index, "miFatal"
                ] = recalibratedForMedCount["miFatal"]
                recalibration_df.loc[
                    recalibratedForMedCount.index, "deadNext"
                ] = recalibratedForMedCount["deadNext"]
                recalibration_df.loc[
                    recalibratedForMedCount.index, "ageAtFirstMI"
                ] = recalibratedForMedCount["ageAtFirstMI"]
                recalibration_df.loc[
                    recalibratedForMedCount.index, "rolledBackEventType"
                ] = recalibratedForMedCount["rolledBackEventType"]

        #logging.info(f"*** after recalibration, mi count: {recalibration_df.miNext.sum()}, stroke count: {recalibration_df.strokeNext.sum()}")
        recalibration_df.drop(columns=['treatedcombinedRisks', 'treatedstrokeProbabilities', 'treatedstrokeRisks', 'treatedmiRisks', 
                    'untreatedcombinedRisks', 'untreatedstrokeProbabilities', 'untreatedstrokeRisks', 'untreatedmiRisks', 'totalBPMedsAddedCapped', 'rolledBackEventType'], inplace=True)
        return recalibration_df

    def estimate_risks(self, recalibration_df, prefix):
        combinedRisks = recalibration_df.apply(
            self._outcome_model_repository.get_risk_for_person_vectorized,
            axis="columns",
            args=(OutcomeModelType.CARDIOVASCULAR, 1),
        )
        strokeProbabilities = recalibration_df.apply(
            CVOutcomeDetermination().get_stroke_probability, axis="columns", vectorized=True
        )
        strokeRisks = (
            combinedRisks * strokeProbabilities
        )
        miRisks = combinedRisks * (
            1 - strokeProbabilities
        )

        risksAndProbs = pd.DataFrame({prefix + "combinedRisks" : combinedRisks, prefix + "strokeProbabilities" : strokeProbabilities,
                        prefix + "strokeRisks" : strokeRisks, prefix + "miRisks" : miRisks})
        
        return pd.concat([recalibration_df.reset_index(drop=True), risksAndProbs.reset_index(drop=True)], axis='columns', ignore_index=False)


    def create_or_rollback_events_to_correct_calibration(
        self,
        treatment_outcome_standard,
        treatedRiskVar,
        untreatedRiskVar,
        eventVar,
        outcomeType,
        fatalityDetermination,
        recalibration_pop,
    ):
        #logging.info(f"create or rollback {outcomeType}, standard: {treatment_outcome_standard[outcomeType]}")

        modelEstimatedRR = (
            recalibration_pop[treatedRiskVar].mean() / recalibration_pop[untreatedRiskVar].mean()
        )
        nextEventVar = eventVar + "Next"
        ageAtFirstVar = (
            "ageAtFirst" + eventVar.upper()
            if len(eventVar) == 2
            else "ageAtFirst" + eventVar.capitalize()
        )
        # use the delta between that effect and the calibration standard to recalibrate the pop.
        delta = modelEstimatedRR - treatment_outcome_standard[outcomeType]
        eventsForPeople = recalibration_pop.loc[recalibration_pop[nextEventVar] == True]

        numberOfEventStatusesToChange = abs(
            int(round(delta * len(eventsForPeople) / modelEstimatedRR))
        )
        nonEventsForPeople = recalibration_pop.loc[recalibration_pop[nextEventVar] == False]
        # key assumption: "treatment" is applied to a population as opposed to individuals within a population
        # analyses can be setup either way...build two populations and then set different treatments
        # or build a ur-population adn then set different treamtents within them
        # this is, i thikn, the first time where a coding decision is tied to one of those structure.
        # it would not, i think, be hard to change. but, just spelling it out here.

        # if negative, the model estimated too few events, if positive, too mnany
        #logging.info(f"bp recalibration, delta: {delta}, number of statuses to change: {numberOfEventStatusesToChange}")

        if delta < 0:
            if numberOfEventStatusesToChange > 0:
                new_events = nonEventsForPeople.sample(
                    n=numberOfEventStatusesToChange,
                    replace=False,
                    weights=nonEventsForPeople[untreatedRiskVar].values,
                )
                recalibration_pop.loc[new_events.index, nextEventVar] = True
                recalibration_pop.loc[
                    new_events.index, eventVar + "Fatal"
                ] = recalibration_pop.loc[new_events.index].apply(
                    fatalityDetermination, axis="columns", args=(True,)
                )
                recalibration_pop.loc[new_events.index, ageAtFirstVar] = np.fmin(
                    recalibration_pop.loc[new_events.index].age,
                    recalibration_pop.loc[new_events.index][ageAtFirstVar],
                )

        elif delta > 0:
            if numberOfEventStatusesToChange > len(eventsForPeople):
                numberOfEventStatusesToChange = len(eventsForPeople)
            if numberOfEventStatusesToChange > 0:
                events_to_rollback = eventsForPeople.sample(
                    n=numberOfEventStatusesToChange,
                    replace=False,
                    weights=1 - eventsForPeople[untreatedRiskVar].values,
                )
                recalibration_pop.loc[events_to_rollback.index, nextEventVar] = False
                recalibration_pop.loc[events_to_rollback.index, eventVar + "Fatal"] = False
                recalibration_pop.loc[events_to_rollback.index, "deadNext"] = False
                recalibration_pop.loc[events_to_rollback.index, ageAtFirstVar] = np.minimum(
                    recalibration_pop.loc[events_to_rollback.index].age,
                    recalibration_pop.loc[events_to_rollback.index][ageAtFirstVar],
                )
                recalibration_pop.loc[events_to_rollback.index, "rolledBackEventType"] = eventVar
        return recalibration_pop

    def get_people_alive_at_the_start_of_the_current_wave(self):
        return self.get_people_alive_at_the_start_of_wave(self._currentWave)

    def get_people_alive_at_the_start_of_wave(self, wave):
        peopleAlive = []
        for person in self._people:
            if person.alive_at_start_of_wave(wave):
                peopleAlive.append(person)
        return pd.Series(peopleAlive)

    def get_people_that_are_currently_alive(self):
        return pd.Series([not person.is_dead() for _, person in self._people.items()])

    def get_number_of_patients_currently_alive(self):
        self.get_people_that_are_currently_alive().sum()

    def get_events_in_most_recent_wave(self, eventType):
        peopleWithEvents = []
        for _, person in self._people.items():
            if person.has_outcome_at_age(eventType, person._age[-1]):
                peopleWithEvents.append(person)
        return peopleWithEvents

    def generate_starting_mean_patient(self):
        df = self.get_people_initial_state_as_dataframe()
        return Person(
            age=int(round(df.age.mean())),
            gender=NHANESGender(df.gender.mode()),
            raceEthnicity=NHANESRaceEthnicity(df.raceEthnicity.mode()),
            sbp=df.sbp.mean(),
            dbp=df.dbp.mean(),
            a1c=df.a1c.mean(),
            hdl=df.hdl.mean(),
            totChol=df.totChol.mean(),
            bmi=df.bmi.mean(),
            ldl=df.ldl.mean(),
            trig=df.trig.mean(),
            waist=df.waist.mean(),
            anyPhysicalActivity=df.anyPhysicalActivity.mode(),
            education=Education(df.education.mode()),
            smokingStatus=SmokingStatus(df.smokingStatus.mode()),
            antiHypertensiveCount=int(round(df.antiHypetensiveCount().mean())),
            statin=df.statin.mode(),
            otherLipidLoweringMedicationCount=int(
                round(df.otherLipidLoweringMedicationCount.mean())
            ),
            initializeAfib=(lambda _: False),
            selfReportStrokeAge=None,
            selfReportMIAge=None,
            randomEffects=self._outcome_model_repository.get_random_effects(),
        )

    def get_event_rate_in_simulation(self, eventType, duration):
        events = [
            person.has_outcome_during_simulation_prior_to_wave(eventType, duration)
            for i, person in self._people.items()
        ]
        totalTime = [
            person.years_in_simulation() if person.years_in_simulation() < duration else duration
            for i, person in self._people.items()
        ]
        return np.array(events).sum() / np.array(totalTime).sum()

    def get_raw_incidence_by_age(self, eventType):
        popDF = self.get_people_current_state_as_dataframe()

        for year in range(1, self._totalWavesAdvanced + 1):
            eventVarName = "event" + str(year)
            ageVarName = "age" + str(year)
            popDF[ageVarName] = popDF["baseAge"] + year
            popDF[eventVarName] = [
                person.has_outcome_during_wave(year, OutcomeType.DEMENTIA)
                for person in self._people
            ]

        popDF = popDF[
            list(filter(lambda x: x.startswith("age") or x.startswith("event"), popDF.columns))
        ]
        popDF["id"] = popDF.index
        popDF.drop(columns=["age"], inplace=True)
        longAgesEvents = pd.wide_to_long(df=popDF, stubnames=["age", "event"], i="id", j="wave")

        agesAliveDF = self.get_people_current_state_as_dataframe()
        for year in range(1, self._totalWavesAdvanced + 1):
            aliveVarName = "alive" + str(year)
            ageVarName = "age" + str(year)
            agesAliveDF[ageVarName] = agesAliveDF["baseAge"] + year
            agesAliveDF[aliveVarName] = [
                person.alive_at_start_of_wave(year) for i, person in self._people.items()
            ]

        agesAliveDF = agesAliveDF[
            list(
                filter(lambda x: x.startswith("age") or x.startswith("alive"), agesAliveDF.columns)
            )
        ]
        agesAliveDF.drop(columns=["age"], inplace=True)
        agesAliveDF["id"] = agesAliveDF.index
        longAgesDead = pd.wide_to_long(
            df=agesAliveDF, stubnames=["age", "alive"], i="id", j="wave"
        )
        return (
            longAgesEvents.groupby("age")["event"].sum()
            / longAgesDead.groupby("age")["alive"].sum()
        )

    # refactorrtag: we should probably build a specific class that loads data files...

    def build_age_standard(self, yearOfStandardizedPopulation):
        if yearOfStandardizedPopulation in Population._ageStandards:
            return copy.deepcopy(Population._ageStandards[yearOfStandardizedPopulation])

        datafile_path = get_absolute_datafile_path("us.1969_2017.19ages.adjusted.txt")
        ageStandard = pd.read_csv(datafile_path, header=0, names=["raw"])
        # https://seer.cancer.gov/popdata/popdic.html
        ageStandard["year"] = ageStandard["raw"].str[0:4]
        ageStandard["year"] = ageStandard.year.astype(int)
        # format changes in 1990...so, we'll go forward from there...
        ageStandard = ageStandard.loc[ageStandard.year >= 1990]
        ageStandard["state"] = ageStandard["raw"].str[4:6]
        ageStandard["state"] = ageStandard["raw"].str[4:6]
        # 1 = white, 2 = black, 3 = american indian/alaskan, 4 = asian/pacific islander
        ageStandard["race"] = ageStandard["raw"].str[13:14]
        ageStandard["hispanic"] = ageStandard["raw"].str[14:15]
        ageStandard["female"] = ageStandard["raw"].str[15:16]
        ageStandard["female"] = ageStandard["female"].astype(int)
        ageStandard["female"] = ageStandard["female"].replace({1: 0, 2: 1})
        ageStandard["ageGroup"] = ageStandard["raw"].str[16:18]
        ageStandard["ageGroup"] = ageStandard["ageGroup"].astype(int)
        ageStandard["standardPopulation"] = ageStandard["raw"].str[18:26]
        ageStandard["standardPopulation"] = ageStandard["standardPopulation"].astype(int)
        ageStandard["lowerAgeBound"] = (ageStandard.ageGroup - 1) * 5
        ageStandard["upperAgeBound"] = (ageStandard.ageGroup * 5) - 1
        ageStandard["lowerAgeBound"] = ageStandard["lowerAgeBound"].replace({-5: 0, 0: 1})
        ageStandard["upperAgeBound"] = ageStandard["upperAgeBound"].replace({-1: 0, 89: 150})
        ageStandardYear = ageStandard.loc[ageStandard.year == yearOfStandardizedPopulation]
        ageStandardGroupby = ageStandardYear[
            ["female", "standardPopulation", "lowerAgeBound", "upperAgeBound", "ageGroup"]
        ].groupby(["ageGroup", "female"])
        ageStandardHeaders = ageStandardGroupby.first()[["lowerAgeBound", "upperAgeBound"]]
        ageStandardHeaders["female"] = ageStandardHeaders.index.get_level_values(1)
        ageStandardPopulation = ageStandardYear[["female", "standardPopulation", "ageGroup"]]
        ageStandardPopulation = ageStandardPopulation.groupby(["ageGroup", "female"]).sum()
        ageStandardPopulation = ageStandardHeaders.join(ageStandardPopulation, how="inner")
        # cache the age standard populations...they're not that big and it takes a while
        # to build one
        ageStandardPopulation["outcomeCount"] = 0
        ageStandardPopulation["simPersonYears"] = 0
        ageStandardPopulation["simPeople"] = 0
        Population._ageStandards[yearOfStandardizedPopulation] = copy.deepcopy(
            ageStandardPopulation
        )

        return ageStandardPopulation

    def tabulate_age_specific_rates(self, ageStandard):
        ageStandard["percentStandardPopInGroup"] = ageStandard["standardPopulation"] / (
            ageStandard["standardPopulation"].sum()
        )
        ageStandard["ageSpecificRate"] = (
            ageStandard["outcomeCount"] * 100000 / ageStandard["simPersonYears"]
        )
        ageStandard["ageSpecificContribution"] = (
            ageStandard["ageSpecificRate"] * ageStandard["percentStandardPopInGroup"]
        )
        return ageStandard

    # return the age standardized # of events per 100,000 person years
    def calculate_mean_age_sex_standardized_incidence(
        self,
        outcomeType,
        yearOfStandardizedPopulation=2016,
        subPopulationSelector=None,
        subPopulationDFSelector=None,
    ):

        # the age selector picks the first outcome (_outcomes(outcomeTYpe)[0]) and the age is the
        # first element within the returned tuple (the second [0])
        events = self.calculate_mean_age_sex_standardized_event(
            lambda x: x.has_outcome_during_simulation(outcomeType),
            lambda x: x.get_outcomes_during_simulation(outcomeType)[0][0] - x._age[0] + 1,
            yearOfStandardizedPopulation,
            subPopulationSelector,
            subPopulationDFSelector,
        )
        return (
            pd.Series([event[0] for event in events]).mean(),
            pd.Series([event[1] for event in events]).sum(),
        )

    def calculate_mean_age_sex_standardized_mortality(self, yearOfStandardizedPopulation=2016):
        events = self.calculate_mean_age_sex_standardized_event(
            lambda x: x.is_dead(), lambda x: x.years_in_simulation(), yearOfStandardizedPopulation
        )
        return pd.Series([event[0] for event in events]).mean()

    def get_events_for_event_type(
        self,
        eventSelector,
        eventAgeIdentifier,
        subPopulationSelector=None,
        subPopulationDFSelector=None,
    ):
        # build a dataframe to represent the population
        popDF = self.get_people_current_state_as_dataframe(parallel=False)
        popDF["female"] = popDF["gender"] - 1

        # calculated standardized event rate for each year
        for year in range(1, self._totalWavesAdvanced + 1):
            eventVarName = "event" + str(year)
            ageVarName = "age" + str(year)
            popDF[ageVarName] = popDF["baseAge"] + year
            if subPopulationDFSelector is not None:
                popDF["subpopFilter"] = popDF.apply(subPopulationDFSelector, axis="columns")
                popDF = popDF.loc[popDF.subpopFilter == 1]
            popDF[eventVarName] = [
                eventSelector(person) and eventAgeIdentifier(person) == year
                for person in filter(subPopulationSelector, self._people)
            ]
        return popDF

    def calculate_mean_age_sex_standardized_event(
        self,
        eventSelector,
        eventAgeIdentifier,
        yearOfStandardizedPopulation=2016,
        subPopulationSelector=None,
        subPopulationDFSelector=None,
    ):
        # calculated standardized event rate for each year
        popDF = self.get_events_for_event_type(
            eventSelector, eventAgeIdentifier, subPopulationSelector, subPopulationDFSelector
        )
        popDF["female"] = popDF["gender"] - 1

        eventsPerYear = []

        for year in range(1, self._totalWavesAdvanced + 1):
            eventVarName = "event" + str(year)
            ageVarName = "age" + str(year)
            popDF[ageVarName] = popDF["baseAge"] + year
            if subPopulationDFSelector is not None:
                popDF["subpopFilter"] = popDF.apply(subPopulationDFSelector, axis="columns")
                popDF = popDF.loc[popDF.subpopFilter == 1]
            popDF[eventVarName] = [
                eventSelector(person) and eventAgeIdentifier(person) == year
                for person in filter(subPopulationSelector, self._people)
            ]
            dfForAnnualEventCalc = popDF[[ageVarName, "female", eventVarName]]
            dfForAnnualEventCalc.rename(
                columns={ageVarName: "age", eventVarName: "event"}, inplace=True
            )
            eventsPerYear.append(
                self.get_standardized_events_for_year(
                    dfForAnnualEventCalc, yearOfStandardizedPopulation
                )
            )

        return eventsPerYear

    def get_standardized_events_for_year(self, peopleDF, yearOfStandardizedPopulation):
        ageStandard = self.build_age_standard(yearOfStandardizedPopulation)
        # limit to the years where there are people
        # if the simulation runs for 50 years...there will be empty cells in all of the
        # young person categories
        ageStandard = ageStandard.loc[ageStandard.lowerAgeBound >= peopleDF.age.min()]

        # take the dataframe of peoplein teh population and tabnulate events relative
        # to the age standard (max age is 85 in the age standard...)
        peopleDF.loc[peopleDF["age"] > 85, "age"] = 85
        peopleDF.loc[:, "ageGroup"] = (peopleDF["age"] // 5) + 1
        peopleDF.loc[:, "ageGroup"] = peopleDF["ageGroup"].astype(int)
        # tabulate events by group
        eventsByGroup = peopleDF.groupby(["ageGroup", "female"])["event"].sum()
        personYears = peopleDF.groupby(["ageGroup", "female"])["age"].count()
        # set those events on the age standard
        ageStandard["outcomeCount"] = eventsByGroup
        ageStandard["simPersonYears"] = personYears

        ageStandard = self.tabulate_age_specific_rates(ageStandard)
        return (ageStandard.ageSpecificContribution.sum(), ageStandard.outcomeCount.sum())

    def get_person_attributes_from_person(self, person, timeVaryingCovariates):
        attrForPerson = person.get_current_state_as_dict()
        try:
            attrForPerson["populationIndex"] = person._populationIndex
        except AttributeError:
            pass  # populationIndex is not necessary for advancing; can continue safely without it

        timeVaryingAttrsForPerson = person.get_tvc_state_as_dict(timeVaryingCovariates)
        attrForPerson.update(timeVaryingAttrsForPerson)
        return attrForPerson

    #def get_people_current_state_as_dataframe(self, parallel=True): #CD modified
    def get_people_current_state_as_dataframe(self, parallel=False):
        if parallel:
            pandarallel.initialize(verbose=1)
            return pd.DataFrame(
                list(
                    self._people.parallel_apply(
                        self.get_person_attributes_from_person,
                        timeVaryingCovariates=self._timeVaryingCovariates,
                    )
                )
            )
        else:
            return pd.DataFrame(
                list(
                    self._people.apply(
                        self.get_person_attributes_from_person,
                        timeVaryingCovariates=self._timeVaryingCovariates,
                    )
                )
            )

    def get_people_current_state_and_summary_as_dataframe(self):
        df = self.get_people_current_state_as_dataframe()
        # iterate through variables that vary over time
        tvcMeans = {}
        for var in self._timeVaryingCovariates:
            tvcMeans["mean" + var.capitalize()] = [
                pd.Series(getattr(person, "_" + var)).mean()
                for i, person in self._people.items()
            ]   
        return pd.concat([df, pd.DataFrame(tvcMeans)], axis=1)

    def get_people_initial_state_as_dataframe(self):
        return pd.DataFrame(
            {
                "age": [person._age[0] for person in self._people],
                "gender": [person._gender for person in self._people],
                "raceEthnicity": [person._raceEthnicity for person in self._people],
                "sbp": [person._sbp[0] for person in self._people],
                "dbp": [person._dbp[0] for person in self._people],
                "a1c": [person._a1c[0] for person in self._people],
                "hdl": [person._hdl[0] for person in self._people],
                "ldl": [person._ldl[0] for person in self._people],
                "trig": [person._trig[0] for person in self._people],
                "totChol": [person._totChol[0] for person in self._people],
                "creatinine": [person._creatinine[0] for person in self._people],
                "bmi": [person._bmi[0] for person in self._people],
                "anyPhysicalActivity": [person._anyPhysicalActivity[0] for person in self._people],
                "education": [person._education.value for person in self._people],
                "afib": [person._afib[0] for person in self._people],
                "antiHypertensiveCount": [
                    person._antiHypertensiveCount[0] for person in self._people
                ],
                "statin": [person._statin[0] for person in self._people],
                "otherLipidLoweringMedicationCount": [
                    person._otherLipidLoweringMedicationCount[0] for person in self._people
                ],
                "waist": [person._waist[0] for person in self._people],
                "smokingStatus": [person._smokingStatus for person in self._people],
                "miPriorToSim": [person._selfReportMIPriorToSim for person in self._people],
                "strokePriorToSim": [
                    person._selfReportStrokePriorToSim for person in self._people
                ],
                "totalQalys": [np.array(person._qalys).sum() for person in self._people],
                "totalBPMedsAdded" : [np.zeros(len(self._people))],
                "bpMedsAdded" : [np.zeros(len(self._people))]
            }
        )

    def get_summary_df(self):
        data = {}
        for year in range(1,self._currentWave+1):
            data[f'mi{year}'] = [x.has_mi_during_wave(year) for _, x in self._people.items()]
            data[f'stroke{year}'] = [x.has_stroke_during_wave(year) for _, x in self._people.items()]
            data[f'dead{year}'] = [x.dead_at_end_of_wave(year) for _, x in self._people.items()]
            data[f'dementia{year}'] = [x.has_outcome_during_wave(year, OutcomeType.DEMENTIA) for _, x in self._people.items()]
            data[f'gcp{year}'] = [np.nan if x.dead_at_start_of_wave(year) else x._gcp[year-1] for _, x in self._people.items()]
            data[f'sbp{year}'] = [np.nan if x.dead_at_start_of_wave(year) else x._sbp[year-1] for _, x in self._people.items()]
            data[f'dbp{year}'] = [np.nan if x.dead_at_start_of_wave(year) else x._dbp[year-1] for _, x in self._people.items()]  
            data[f'bpMeds{year}'] = [np.nan if x.dead_at_start_of_wave(year) else x._antiHypertensiveCount[year-1] for _, x in self._people.items()]
            data[f'bpMedsAdded{year}'] = [np.nan if x.dead_at_start_of_wave(year) else x._bpMedsAdded[year-1] for _, x in self._people.items()]
            data[f'totalBPMeds{year}'] = [np.nan if x.dead_at_start_of_wave(year) else x._bpMedsAdded[year-1]+x._antiHypertensiveCount[year-1] for _, x in self._people.items()]
            data[f'totalBPMedsAdded{year}'] = [np.array(x._bpMedsAdded).sum() for _, x in self._people.items()]

        data['baseAge'] = [x._age[0] for _, x in self._people.items()]
        data['id'] = [x._populationIndex  for _, x in self._people.items()] 
        data['finalAge'] = [x._age[-1]  for _, x in self._people.items()]
        data['education'] = [x._education  for _, x in self._people.items()]
        data['gender'] = [x._gender  for _, x in self._people.items()]
        data['raceEthnicity'] = [x._raceEthnicity  for _, x in self._people.items()]
        data['smokingStatus'] = [x._smokingStatus  for _, x in self._people.items()]

        data['baselineSBP'] = [x._sbp[0] for _, x in self._people.items()]
        data['baselineDBP'] = [x._dbp[0] for _, x in self._people.items()]
        data['black'] = [x._raceEthnicity==4 for _, x in self._people.items()]
        data['dementiaFreeYears'] = [x.get_age_at_first_outcome(OutcomeType.DEMENTIA) - x._age[0] if x._dementia else x._age[-1] - x._age[0]  for _, x in self._people.items()]
        data['deadAtEndOfSim'] = [x._alive[-1]==False for _, x in self._people.items()]
        return pd.DataFrame(data)
    


def initializeAFib(person):
    model = load_regression_model("BaselineAFibModel")
    statsModel = StatsModelLogisticRiskFactorModel(model)
    return statsModel.estimate_next_risk(person)


def build_person(x, outcome_model_repository, randomEffects=None):
    return Person(
        age=x.age,
        gender=NHANESGender(int(x.gender)),
        raceEthnicity=NHANESRaceEthnicity(int(x.raceEthnicity)),
        sbp=x.meanSBP,
        dbp=x.meanDBP,
        a1c=x.a1c,
        hdl=x.hdl,
        ldl=x.ldl,
        trig=x.trig,
        totChol=x.tot_chol,
        bmi=x.bmi,
        waist=x.waist,
        anyPhysicalActivity=x.anyPhysicalActivity,
        smokingStatus=SmokingStatus(int(x.smokingStatus)),
        alcohol=AlcoholCategory.get_category_for_consumption(x.alcoholPerWeek),
        education=Education(int(x.education)),
        antiHypertensiveCount=x.antiHypertensive,
        statin=x.statin,
        otherLipidLoweringMedicationCount=x.otherLipidLowering,
        creatinine=x.serumCreatinine,
        initializeAfib=initializeAFib,
        initializationRepository=InitializationRepository(),
        selfReportStrokeAge=x.selfReportStrokeAge,
        selfReportMIAge=np.random.randint(18, x.age)
        if x.selfReportMIAge == 99999
        else x.selfReportMIAge,
        randomEffects=outcome_model_repository.get_random_effects() if randomEffects is None else randomEffects,
        dfIndex=x.name,
        diedBy2015=x.diedBy2015 == True,
    )


def build_people_using_nhanes_for_sampling(
    nhanes, n, outcome_model_repository, filter=None, random_seed=None, weights=None
):
    if weights is None:
        weights = nhanes.WTINT2YR
    repeated_sample = nhanes.sample(n, weights=weights, random_state=random_seed, replace=True)
    pandarallel.initialize(verbose=1)
    people = repeated_sample.parallel_apply(
        build_person, outcome_model_repository=outcome_model_repository, axis="columns"
    )

    for i in range(0, len(people)):
        people.iloc[i]._populationIndex = i

    if filter is not None:
        people = people.loc[people.apply(filter)]

    return people

class NHANESDirectSamplePopulation(Population):
    """Simple base class to sample with replacement from 2015/2016 NHANES"""

    def __init__(
        self,
        n,
        year,
        filter=None,
        generate_new_people=True,
        model_reposistory_type="cohort",
        random_seed=None,
        weights=None,
    ):

        nhanes = pd.read_stata("microsim/data/fullyImputedDataset.dta")
        nhanes = nhanes.loc[nhanes.year == year]
        self._outcome_model_repository = OutcomeModelRepository()
        people = build_people_using_nhanes_for_sampling(
            nhanes,
            n,
            self._outcome_model_repository,
            filter=filter,
            random_seed=random_seed,
            weights=weights,
        )
        super().__init__(people)
        self._qaly_assignment_strategy = QALYAssignmentStrategy()
        self.n = n
        self.year = year
        self._initialize_risk_models(model_reposistory_type)

    def copy(self):
        newPop = NHANESDirectSamplePopulation(self.n, self.year, False)
        newPop._people = copy.deepcopy(self._people)
        return newPop

    def _initialize_risk_models(self, model_repository_type):
        if model_repository_type == "cohort":
            self._risk_model_repository = CohortRiskModelRepository()
        elif model_repository_type == "nhanes":
            self._risk_model_repository = NHANESRiskModelRepository()
        else:
            raise Exception("unknwon risk model repository type" + model_repository_type)

class PersonListPopulation(Population):
    def __init__(self, people):

        super().__init__(pd.Series(people))
        self.n = len(people)
        self._qaly_assignment_strategy = QALYAssignmentStrategy()
        self._outcome_model_repository = OutcomeModelRepository()
        self._risk_model_repository = CohortRiskModelRepository()
        # population index is used for efficiency in the population, need to set it on 
        # each person when a new population is setup
        for i, person in self._people.items():
            person._populationIndex = i
        # if the people have already been advanced, have the population start at that point
        self._currentWave = len(people[0]._age)-1


class NHANESAgeStandardPopulation(NHANESDirectSamplePopulation):
    def __init__(self, n, year):
        nhanes = pd.read_stata("microsim/data/fullyImputedDataset.dta")
        weights = self.get_weights(year)
        weights["gender"] = weights["female"] + 1
        weights = pd.merge(nhanes, weights, how="left", on=["age", "gender"]).popWeight
        super().__init__(n=n, year=year, weights=weights)

    def get_weights(self, year):
        standard = self.build_age_standard(year)
        return self.get_population_weighted_standard(standard)

    def get_population_weighted_standard(self, standard):
        rows = []
        for age in range(1, 151):
            for female in range(0, 2):
                dfRow = standard.loc[
                    (age >= standard.lowerAgeBound)
                    & (age <= standard.upperAgeBound)
                    & (standard.female == female)
                ]
                upperAge = dfRow["upperAgeBound"].values[0]
                lowerAge = dfRow["lowerAgeBound"].values[0]
                totalPop = dfRow["standardPopulation"].values[0]
                rows.append(
                    {"age": age, "female": female, "pop": totalPop / (upperAge - lowerAge + 1)}
                )
        df = pd.DataFrame(rows)
        df["popWeight"] = df["pop"] / df["pop"].sum()
        return df


class ClonePopulation(Population):
    """Simple class to build a "Population" seeded by mulitple copies of the same person"""

    def __init__(self, person, n):
        self._outcome_model_repository = OutcomeModelRepository()
        self._qaly_assignment_strategy = QALYAssignmentStrategy()
        self._risk_model_repository = CohortRiskModelRepository()
        self._initialization_repository = InitializationRepository()
        self.n = n

        # trying to make sure that cloned peopel are setup the same way as people are
        # when sampled from NHANES
        clonePerson = build_person(pd.Series({'age' : person._age[0],
                                            'gender': int(person._gender),
                                            'raceEthnicity':int(person._raceEthnicity),
                                            'meanSBP' :person._sbp[0],
                                            'meanDBP' :person._dbp[0],
                                            'a1c':person._a1c[0],
                                            'hdl':person._hdl[0],
                                            'ldl':person._ldl[0],
                                            'trig':person._trig[0],
                                            'tot_chol':person._totChol[0],
                                            'bmi':person._bmi[0],
                                            'waist':person._waist[0],
                                            'anyPhysicalActivity':person._anyPhysicalActivity[0],
                                            'smokingStatus': int(person._smokingStatus),
                                            'alcoholPerWeek': person._alcoholPerWeek[0],
                                            'education' : int(person._education),
                                            'antiHypertensive': person._antiHypertensiveCount[0],
                                            'statin': person._statin[0],
                                            'otherLipidLowering' : person._otherLipidLoweringMedicationCount[0],
                                            'serumCreatinine' : person._creatinine[0],
                                            'selfReportStrokeAge' : -1, 
                                            'selfReportMIAge' : -1,
                                            'diedBy2015' : 0}), 
                                            self._outcome_model_repository, 
                                            randomEffects=person._randomEffects)

        # for factors that were initialized on the first person, we have to set them the same way on teh clones
        clonePerson._afib[0] = person._afib[0]
        initializers = self._initialization_repository.get_initializers()
        for initializerName, _ in initializers.items():
            fromAttr = getattr(person, initializerName)
            toAttr = getattr(clonePerson, initializerName)
            toAttr.clear()
            toAttr.extend(fromAttr)

        
        people = pd.Series([copy.deepcopy(clonePerson) for i in range(0, n)])

        #pandarallel.initialize(verbose=1)
        for i in range(0, len(people)):
            people.iloc[i]._populationIndex = i
        super().__init__(people)

    

In [6]:
#trying a small population for now to get things moving quickly
pop = NHANESDirectSamplePopulation(10000, 1999)

In [7]:
alive, df = pop.advance_vectorized(1)

INFO:root:processing year: 0


In [8]:
vascularEventOrDeath = LogisticRegressionAnalysis(OutcomeAssessor([OutcomeAssessor.DEATH, OutcomeType.STROKE, OutcomeType.MI]))
anyEvent = LogisticRegressionAnalysis(OutcomeAssessor([OutcomeAssessor.DEATH, OutcomeType.STROKE, OutcomeType.MI, OutcomeType.DEMENTIA]))
death = LogisticRegressionAnalysis(OutcomeAssessor([OutcomeAssessor.DEATH]))
qalys = LinearRegressionAnalysis(AttributeOutcomeAssessor("_qalys", AssessmentMethod.SUM))
meanGCP = LinearRegressionAnalysis(AttributeOutcomeAssessor("_gcp", AssessmentMethod.MEAN))
lastGCP = LinearRegressionAnalysis(AttributeOutcomeAssessor("_gcp", AssessmentMethod.LAST))

In [9]:
#we will later define how many processes to launch, this is just for validation
#and so that we do not set the number of processes to a number greater than the number of available cores
print("code has access to cores ",len(os.sched_getaffinity(0)))

code has access to cores  4


In [10]:
#these are usually set from the input file, but to make things easier here 
#just set them on the notebook (which means no script will test for their meaningfulness)
inputSampleSizes = [500,1000] #for quick tests
#inputSampleSizes = [100, 200, 500, 1000, 5000, 10000, 15000, 20000]
inputDurations = [1,2] #for quick tests
#inputDurations = [3,5,10,15,20]
#inputDemThresholds = [2.4845839854531493e-08, 0.00018576417292080007, 0.0012917270937081809, 0.005870510161620921, 0.025739443157677927]
#inputCvThresholds = [1.167603052003119e-06, 0.0008193743487641601, 0.0026191105926681, 0.006091251406939853, 0.0132184645579298]
inputDemThresholds = [2.4845839854531493e-08] #for quick tests
inputCvThresholds = [1.167603052003119e-06] #for quick tests
inputTrialsetSize = 3
inputProcesses = 3

In [11]:
#perhaps these could go to trial_utils.py
#since I need them in the trial descriptions 

#this function can be generalized, passing the kind of distribution as argument,if other distributions may be useful
def randomizationSchemaUniform(x):
    return (np.random.uniform() < 0.5)

#riskDictionary will hold specific risk factor thresholds  
#each trial will set its own cv and dem thresholds on this dictionary during initialization
#also, should this function create the additional labels we include in results? 
#otherwise, is there a chance that the inclusion filter and labels be actually different?
def inclusionFilter(person,riskDictionary): #added riskDictionary to arguments
    #return RiskFilter({OutcomeModelType.DEMENTIA : dem , OutcomeModelType.CARDIOVASCULAR : cv}).exceedsThresholds(x)
    return RiskFilter(riskDictionary).exceedsThresholds(person)

In [12]:
class TrialDescription:
    #right now all trials of a set share a trial description and differ in their risk thresholds
    #as is, I cannot fully specify inclusionFilter (I mean I cannot provide arguments to inclusionFilter)
    #if we decide to have each trial a different trialdescription, differing in their risk 
    #factors and/or risk thresholds then we could fully 
    #specify here the inclusionFilter and risk thresholds
    def __init__(self, sampleSizes, durations, inclusionFilter, exclusionFilter, analyses, treatment,
                #python pickles everything when it transfers things to the processes and 
                #lambda functions are not picklable, so I need to define a function for this
                #randomizationSchema=lambda x : np.random.uniform() < 0.5): 
                randomizationSchema):
        self.sampleSizes = sampleSizes
        self.durations = durations
        self.inclusionFilter = inclusionFilter
        self.exclusionFilter = exclusionFilter
        self.randomizationSchema = randomizationSchema
        self.treatment = treatment
        self.analyses = analyses

In [13]:
class Trial:
    
    #included risk dictionary and additional labels as part of trial instances
    #riskDictionary is used during instance initialization in select_trial_population
    #and not used after that, so I am not sure if it needs to be an instance attribute or not
    #def __init__(self, trialDescription, targetPopulation):
    def __init__(self, trialDescription, targetPopulation, riskDictionary, additionalLabels=None): 
        self.trialDescription = trialDescription
        self.trialPopulation = self.select_trial_population(targetPopulation,
            trialDescription.inclusionFilter, trialDescription.exclusionFilter,riskDictionary)
        # select our patients from the population
        self.maxSampleSize = pd.Series(trialDescription.sampleSizes).max()
        self.treatedPop, self.untreatedPop = self.randomize(trialDescription.randomizationSchema)
        self.analyticResults = {}
        self.additionalLabels = additionalLabels #included labels as part of trial instances
    #def select_trial_population(self, targetPopulation, inclusionFilter, exclusionFilter):
    def select_trial_population(self, targetPopulation, inclusionFilter, exclusionFilter,riskDictionary):
        #filter takes only one argument so I need to rewrite this without using filter
        #so that I can pass both the person and the specific risk dictionary for this trial
        #filteredPeople = list(filter(inclusionFilter, list(targetPopulation._people))) 
        filteredPeople = [person for person in list(targetPopulation._people) 
                          if inclusionFilter(person,riskDictionary)]
        return PersonListPopulation(filteredPeople)
    def randomize(self, randomizationSchema):
        treatedList = []
        untreatedList = []
        randomizedCount = 0
        # might be able to make this more efficient by sampling from the filtered people...
        for i, person in self.trialPopulation._people.iteritems():
            while randomizedCount < self.maxSampleSize:
                if not person.is_dead():
                    if randomizationSchema(person):
                        treatedList.append(copy.deepcopy(person))
                    else:
                        untreatedList.append(copy.deepcopy(person))
                    randomizedCount+=1
                else:
                    continue
        return PersonListPopulation(treatedList), PersonListPopulation(untreatedList)
    def run(self):
        self.treatedPop._bpTreatmentStrategy = self.trialDescription.treatment
        lastDuration = 0
        for duration in self.trialDescription.durations:
            self.treatedDF, self.treatedAlive = self.treatedPop.advance_vectorized(duration-lastDuration)
            self.untreatedDF, self.untreatedAlive = self.untreatedPop.advance_vectorized(duration-lastDuration)
            self.analyze(duration, 
                         self.maxSampleSize, 
                         self.treatedPop._people.tolist(), 
                         self.untreatedPop._people.tolist())
            self.analyzeSmallerTrials(duration)
            lastDuration = duration
    def analyzeSmallerTrials(self, duration):
        for sampleSize in self.trialDescription.sampleSizes:
            numTrialsForSample = self.maxSampleSize // sampleSize
            for i in range(0, numTrialsForSample):
                if sampleSize==self.maxSampleSize:
                    continue
                sampleTreated = self.treatedPop._people.sample(int(sampleSize/2))
                sampleUntreated = self.untreatedPop._people.sample(int(sampleSize/2))
                self.analyze(duration, sampleSize, sampleTreated.tolist(), sampleUntreated.tolist())
    def analyze(self, duration, sampleSize, treatedPopList, untreatedPopList):
        for analysis in self.trialDescription.analyses:
            reg, se, pvalue = None, None, None
            try:
                reg, se, pvalue = analysis.analyze(treatedPopList, untreatedPopList)
            except PerfectSeparationError: # how to track these is not obvious, now now we'll enter "Nones"
                pass
            self.analyticResults[get_analysis_name(analysis, duration, sampleSize)] = \
                                                        {  'reg' : reg,
                                                            'se' : se,
                                                            'pvalue': pvalue,
                                                            'duration' : duration,
                                                            'sampleSize' : sampleSize,
                                                            'outcome' :  analysis.outcomeAssessor.get_name(),
                                                            'analysis' : analysis.name}
        return self.analyticResults


In [14]:
#I tried at the beginning to bring all of these functions in the existing Trialset class but it seemed too 
#complex of a task to start with
#so I started from scratch here another class
#with your input, I can make changes and eventually move these to Trialset

class TrialsetParallel:
    
    #demThresholds and cvThresholds are associated with riskDictionary and additionalLabels
    #there is a potential of merging and simplifying these perhaps
    def __init__(self, 
                 trialDescription, pop, trialCount, 
                 processesCount, 
                 demThresholds, cvThresholds, additionalLabels=None):
        self.trialDescription = trialDescription
        self.pop = pop
        self.trialCount = trialCount #this is per set of risk factors, this is not the total!
        self.processesCount = processesCount
        self.demThresholds = demThresholds
        self.cvThresholds = cvThresholds
        self.additionalLabels = additionalLabels  
    
    #prepare all arguments needed to run the entire trial set 
    def prepareArgsForParallelRun(self):
        argsForParallelRun = [] #arguments will be stored in a list of tuples (multiprocessing requirement)
        for dem in self.demThresholds:
            for cv in self.cvThresholds:
                for iTrial in range(0,self.trialCount):
                    #multiprocessing map functions accept only tuples to be sent to processes
                    #the description and population are now available to the class methods as attributes
                    #so the only thing I need to pass are the risk factors and thresholds
                    #for as many trials as the set asks for
                    argsForParallelRun.append(tuple((dem,cv)))
        return argsForParallelRun
    
    #having the population as an argument passed to each trial is the reason why python makes copies of the population
    #when it sends information to the cores running the processes with multiprocessing
    #currently, this adds a RAM cost = (number of processes) * (population size in RAM)
    #which suggests a solution: do not pass the actual population to the function the processes run but
    #some kind of index/link/list/function as an interface to the population
    def prepareRunAnalyzeTrial(self,dem,cv):
        
        print("starting a trial now") #just to see if the calculation is indeed done in parallel
        riskDictionary={OutcomeModelType.DEMENTIA : dem , OutcomeModelType.CARDIOVASCULAR : cv}
        
        trial = Trial(self.trialDescription, 
                      self.pop, 
                      riskDictionary,
                      additionalLabels={'dementiaRisk' : dem, 'cvRisk' : cv}) #initialize trial
        
        trial.run() #run trial
        
        resultsForTrial = [] 
        for analysis in trial.trialDescription.analyses:
            for duration in trial.trialDescription.durations:
                for sampleSize in trial.trialDescription.sampleSizes:
                    resultsForTrial.append(trial.analyticResults[get_analysis_name(analysis, duration, sampleSize)])
        dfForTrial = pd.DataFrame(resultsForTrial)
        if trial.additionalLabels is not None:
            for label, labelVal in trial.additionalLabels.items():
                dfForTrial[label] = labelVal
                
        #not sure if this is necessary in order to release the memory or not 
        #because python keeps track of references to objects anyway
        del trial 
        print("ending a trial now")
        return dfForTrial #return only what I need: results
    
    def run(self):
        
        with mp.Pool(self.processesCount) as myPool: #context manager will terminate this pool
            
            #run trials and get back the list of dataframes with the results (no instance is returned)
            resultsTrialsetList = myPool.starmap(self.prepareRunAnalyzeTrial, self.prepareArgsForParallelRun())
            #convert list of dataframes to a single dataframe
            resultsTrialsetPd = pd.concat(resultsTrialsetList).reset_index(drop=True)
            
        return resultsTrialsetPd

In [15]:
#this trial description will be the same for all trials in the trial set
#the risk factor thresholds are set in method prepareRunAnalyzeTrial of TrialsetParallel
desc = TrialDescription(sampleSizes= inputSampleSizes,
                                                durations = inputDurations,
                                                inclusionFilter=inclusionFilter,
                                                exclusionFilter=None,
                                                analyses=[death,
                                                        anyEvent,
                                                        vascularEventOrDeath,
                                                        qalys,
                                                        meanGCP,
                                                        lastGCP],
                                                treatment=SprintTreatment(),
                                                randomizationSchema=randomizationSchemaUniform)

In [16]:
#create the set
trialset = TrialsetParallel(desc,pop,inputTrialsetSize,inputProcesses,inputDemThresholds,inputCvThresholds)

In [17]:
if __name__ ==  '__main__': #launching processes with multiprocesses requires this
    results = trialset.run()

starting a trial now
starting a trial now
starting a trial now


  for i, person in self.trialPopulation._people.iteritems():
  for i, person in self.trialPopulation._people.iteritems():
  for i, person in self.trialPopulation._people.iteritems():
INFO:root:processing year: 1
INFO:root:processing year: 1
INFO:root:processing year: 1
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  totalBPMedsAddedCapped.loc[totalBPMedsAddedCapped >= BaseTreatmentStrategy.MAX_BP_MEDS] = BaseTreatmentStrategy.MAX_BP_MEDS
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  totalBPMedsAddedCapped.loc[totalBPMedsAddedCapped >= BaseTreatmentStrategy.MAX_BP_MEDS] = BaseTreatmentStrategy.MAX_BP_MEDS
A value is trying to be set on a copy of a slice from a DataFrame


INFO:root:processing year: 2
INFO:root:processing year: 2
INFO:root:processing year: 2
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  totalBPMedsAddedCapped.loc[totalBPMedsAddedCapped >= BaseTreatmentStrategy.MAX_BP_MEDS] = BaseTreatmentStrategy.MAX_BP_MEDS
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  totalBPMedsAddedCapped.loc[totalBPMedsAddedCapped >= BaseTreatmentStrategy.MAX_BP_MEDS] = BaseTreatmentStrategy.MAX_BP_MEDS
A value is trying to be set on a copy of a slice from a DataFrame

See the caveats in the documentation: https://pandas.pydata.org/pandas-docs/stable/user_guide/indexing.html#returning-a-view-versus-a-copy
  totalBPMedsAddedCapped.loc[totalBPMedsAdd



ending a trial now




ending a trial now
ending a trial now


In [18]:
results

Unnamed: 0,reg,se,pvalue,duration,sampleSize,outcome,analysis,dementiaRisk,cvRisk
0,3.089571e-16,1.417051,1.000000,1,500,death,logisticRegression-death,2.484584e-08,0.000001
1,-4.016604e-02,1.002007,0.968025,1,1000,death,logisticRegression-death,2.484584e-08,0.000001
2,,,,2,500,death,logisticRegression-death,2.484584e-08,0.000001
3,-4.476824e-01,0.915071,0.624677,2,1000,death,logisticRegression-death,2.484584e-08,0.000001
4,3.089571e-16,1.417051,1.000000,1,500,deathstroke-mi-dementia-,logisticRegression-deathstroke-mi-dementia-,2.484584e-08,0.000001
...,...,...,...,...,...,...,...,...,...
67,2.727091e-01,0.157073,0.082838,2,1000,_gcp-mean,linearRegression-_gcp-mean,2.484584e-08,0.000001
68,1.496000e+00,0.624071,0.016890,1,500,_gcp-last,linearRegression-_gcp-last,2.484584e-08,0.000001
69,5.308123e-01,0.438990,0.226884,1,1000,_gcp-last,linearRegression-_gcp-last,2.484584e-08,0.000001
70,5.280000e-01,0.596910,0.376823,2,500,_gcp-last,linearRegression-_gcp-last,2.484584e-08,0.000001
