Here is my notebook for making and stratifying models.

Let's first create a basic SEIRHD model. This is going to be the base-case for all models going forward.

In [None]:
import sympy
import itertools

from mira.metamodel import *
from mira.modeling import Model
from mira.modeling.askenet.petrinet import AskeNetPetriNetModel

person_units = lambda: Unit(expression=sympy.Symbol('person'))
virus_units = lambda: Unit(expression=sympy.Symbol('virus'))
virus_per_gram_units = lambda: Unit(expression=sympy.Symbol('virus')/sympy.Symbol('gram'))
day_units = lambda: Unit(expression=sympy.Symbol('day'))
per_day_units = lambda: Unit(expression=1/sympy.Symbol('day'))
dimensionless_units = lambda: Unit(expression=sympy.Integer('1'))
gram_units = lambda: Unit(expression=sympy.Symbol('gram'))
per_day_per_person_units = lambda: Unit(expression=1/(sympy.Symbol('day')*sympy.Symbol('person')))

# See Table 1 of the paper
c = {
    'S': Concept(name='S', units=person_units(), identifiers={'ido': '0000514'}),
    'E': Concept(name='E', units=person_units(), identifiers={'apollosv': '0000154'}),
    'I': Concept(name='I', units=person_units(), identifiers={'ido': '0000511'}),
    'V': Concept(name='V', units=person_units(), identifiers={'vido': '0001331'}),
}


parameters = {
    'gamma': Parameter(name='gamma', value=0.08, units=per_day_units()),
    'delta': Parameter(name='delta', value=1/8, units=per_day_units()),
    'alpha': Parameter(name='alpha', value=500, units=gram_units(),
                       distribution=Distribution(type='Uniform1',
                                                 parameters={
                                                     'minimum': 51,
                                                     'maximum': 796
                                                 })),
    'lambda': Parameter(name='lambda', value=9.66e-8, units=per_day_per_person_units()),
    'beta': Parameter(name='beta', value=4.49e7, units=virus_per_gram_units()),
    'k': Parameter(name='k', value=1/3, units=per_day_units()),
}

initials = {
    'S': Initial(concept=Concept(name='S'), value=2_300_000),
    'E': Initial(concept=Concept(name='E'), value=1000),
    'I': Initial(concept=Concept(name='I'), value=0),
    'V': Initial(concept=Concept(name='V'), value=0),
}

S, E, I, V, gamma, delta, alpha, lmbd, beta, k = \
    sympy.symbols('S E I V gamma delta alpha lambda beta k')

t1 = ControlledConversion(subject=c['S'],
                          outcome=c['E'],
                          controller=c['I'],
                          rate_law=S*I*lmbd)
t2 = NaturalConversion(subject=c['E'],
                       outcome=c['I'],
                       rate_law=k*E)
t3 = NaturalDegradation(subject=c['I'],
                        rate_law=delta*I)
t4 = ControlledProduction(outcome=c['V'],
                          controller=c['I'],
                          rate_law=alpha*beta*(1-gamma)*I)
templates = [t1, t2, t3, t4]
observables = {}
tm = TemplateModel(
    templates=templates,
    parameters=parameters,
    initials=initials,
    time=Time(name='t', units=day_units()),
    observables=observables,
    annotations=Annotations(name='Scenario 3 base model'))

In [None]:
# Add uncertainty
parameters = {
    'gamma': Parameter(name='gamma', value=0.08, units=per_day_units(),
                       distribution=Distribution(type='Uniform1',
                                                 parameters={
                                                     'minimum': 0.06,
                                                     'maximum': 0.09
                                                 })),
    'delta': Parameter(name='delta', value=1/8, units=per_day_units()),
    'alpha': Parameter(name='alpha', value=500, units=gram_units(),
                       distribution=Distribution(type='Uniform1',
                                                 parameters={
                                                     'minimum': 51,
                                                     'maximum': 796
                                                 })),
    'lambda': Parameter(name='lambda', value=9.66e-8, units=per_day_per_person_units(),
                       distribution=Distribution(type='Uniform1',
                                                 parameters={
                                                     'minimum': 6.66e-8,
                                                     'maximum': 12.66e-8
                                                 })),
    'beta': Parameter(name='beta', value=4.49e7, units=virus_per_gram_units()),
    'k': Parameter(name='k', value=1/3, units=per_day_units()),
}

# E here is E(0) -> (10, 100, 1000, 5000)
initials = {
    'S': Initial(concept=Concept(name='S'), value=2_300_000),
    'E': Initial(concept=Concept(name='E'), value=1000),
    'I': Initial(concept=Concept(name='I'), value=0),
    'V': Initial(concept=Concept(name='V'), value=0),
}

tm = TemplateModel(
    templates=templates,
    parameters=parameters,
    initials=initials,
    time=Time(name='t', units=day_units()),
    observables=observables,
    annotations=Annotations(name='Scenario 3 base model'))

In [None]:
AskeNetPetriNetModel(Model(tm)).to_json_file('my_model.json')

In [None]:
STATUSES = [
    "unvaccinated",
    "vaccinated",
]
AGES = [
    "0-19",
    "20-49",
    "50-64",
    "65",
]
VARIANTS = [
    "wild",
    "delta",
    "omicron",
]

In [None]:
person_units = lambda: Unit(expression=sympy.Symbol("person"))
day_units = lambda: Unit(expression=sympy.Symbol("day"))
per_day_units = lambda: Unit(expression=1 / sympy.Symbol("day"))
dimensionless_units = lambda: Unit(expression=sympy.Integer("1"))
per_day_per_person_units = lambda: Unit(
    expression=1 / (sympy.Symbol("day") * sympy.Symbol("person"))
)

BASE_CONCEPTS = {
    "S": Concept(name="S", units=person_units(), identifiers={"ido": "0000514"}),
    "E": Concept(name="E", units=person_units(), identifiers={"apollosv": "0000154"}),
    "I": Concept(name="I", units=person_units(), identifiers={"ido": "0000511"}),
    "R": Concept(name="R", units=person_units(), identifiers={"ido": "0000592"}),
    "H": Concept(
        name="H",
        units=person_units(),
        identifiers={"ido": "0000511"},
        context={"property": "ncit:C25179"},
    ),
    "D": Concept(name="D", units=person_units(), identifiers={"ncit": "C28554"}),
}


N_val = 19_340_000
E_val = 1
I_val = 4
R_0 = 2.6
gamma_val = 1/5

BASE_PARAMETERS = {
    "gamma":  Parameter(name="gamma", value=gamma_val, units=per_day_units()),
    "eta": Parameter(name="eta", value=0.1, units=dimensionless_units()),
    "mu":  Parameter(name="mu", value=0.003, units=dimensionless_units()),
    "lr":  Parameter(name="lr", value=5, units=day_units()), # average time to recovery (duration of hospital stay if they recover)
    "ld":  Parameter(name="ld", value=9.25, units=day_units()), # average time to recovery (duration of hospital stay if they die)
    "rho": Parameter(name="rho", value=1/2, units=per_day_units()),
    "q": Parameter(name="q", value=R_0 * gamma_val, units=dimensionless_units()), # transmission probability
    "d": Parameter(name="d", value=1/N_val, units=per_day_per_person_units()), # scaled contact rate
    "phi": Parameter(name="phi", value=1.0, units=dimensionless_units()), # host susceptibility
    "chi": Parameter(name="chi", value=1.0, units=dimensionless_units()), # relative transmissibility of variant

}

BASE_INITIALS = {
    "S": Initial(concept=Concept(name="S"), value=N_val - (E_val + I_val)),
    "E": Initial(concept=Concept(name="E"), value=E_val),
    "I": Initial(concept=Concept(name="I"), value=I_val),
    "R": Initial(concept=Concept(name="R"), value=0),
    "H": Initial(concept=Concept(name="H"), value=0),
    "D": Initial(concept=Concept(name="D"), value=0),
}

observables = {}

In [None]:
# make function to do this automatically
(
    S,
    E,
    I,
    R,
    D,
    H,
    q,
    d,  # d eats the N
    rho, # r_E_to_I,
    gamma,
    eta,
    mu,
    lr,
    ld,
    chi,
    phi,
) = sympy.symbols(
    "S E I R D H q d rho gamma eta mu lr ld chi phi"
)

In [None]:
ORDER = ["age", "variant", "status"]
STRATA = {
    "age": AGES, 
    "variant": VARIANTS,
    "status": STATUSES,
}

stratification_config = {
    "S": ["age", "status"],
    "E": ["age", "variant", "status"],
    "I": ["age", "variant", "status"],
    "R": ["age", "variant", "status"],
    "H": ["age", "variant", "status"],
    "D": ["age", "variant", "status"],
}

param_stratification_config = {
    "d": ["age", "age"],
    "eta": ["age", "variant", "status"],
    "mu": ["age", "variant", "status"],
    "ld": ["age"],
    "lr": ["age"],
    "phi": ["status"],
    "chi": ["variant"],
}


templates = []

In [None]:
stratification_config["S"]

In [None]:
# Index all p
concepts = {}

for concept, labels in stratification_config.items():
    for keys in itt.product(*(
        zip(itt.repeat(label), enumerate(STRATA[label]))
        for label in labels
    )):
        d = {
            key: str(idx)
            for key, (idx, label) in keys
        }
        idx = tuple(d[k] for k in ORDER if k in d) 
        concept_copy = _d(BASE_CONCEPTS[concept]).with_context(**d, do_rename=False)
        concept_copy.name = f"{concept_copy.name}_" + "_".join(idx)
        concepts[(concept, *idx)] = concept_copy
        
list(concepts.items())[30:35]   

In [None]:
initials = {}
for concept in concepts.values():
    orig_key = concept.name.split("_")[0]
    initials[concept.name] =  Initial(
        concept=concept, value=BASE_INITIALS[orig_key].value
    )

In [None]:
concept_to_strata = defaultdict(list)
for idx, concept in concepts.items():
    concept_to_strata[idx[0]].append(concept)
    
    
def concept_strata_prod(*variables: str):
    yield from itt.product(*(
        concept_to_strata[variable]
        for variable in variables
    ))

In [None]:
# Index all possible parameters with the name as the 
parameters = {}
for parameter, labels in param_stratification_config.items():
    for keys in itt.product(*(
        zip(itt.repeat(label), enumerate(STRATA[label]))
        for label in labels
    )):
        d = defaultdict(list)
        for key, (idx, label) in keys:
            d[key].append(str(idx))
        d = {k:sorted(v) for k,v in d.items()}
        idx = tuple(itt.chain.from_iterable(d[k] for k in ORDER if k in d))
        p = _d(BASE_PARAMETERS[parameter])
        p.name = f"{p.name}_" + "_".join(idx)
        parameters[(parameter, *idx)] = p
        
parameter_to_strata = defaultdict(list)
for idx, parameter in parameters.items():
    parameter_to_strata[idx[0]].append(parameter)
        
list(parameters.items())[:5]

In [None]:
def force_of_infection(a, v, i):
    d = IndexedBase('d')
    I = IndexedBase('I')
    exprs = [
        d[a, age_index] * I[age_index, v , status_index]
        for age_index, _ in enumerate(AGES)
        for status_index, _ in enumerate(STATUSES)
    ]
    return (
        q 
        * Indexed("chi", v)
        * Indexed("psi", i)
        * sum(exprs)
    )
    
force_of_infection(a='0-19', v='omicron', i='vaccinated')

In [None]:
def context_idx(concept):
    return tuple(
        concept.context[part]
        for part in stratification_config[concept.name]
    )
context_idx(concepts['S', '0', '0'])

In [None]:
def c_symbol(concept):
    return sympy.Symbol(concept.name + "_" + "_".join(idx))

c_symbol(concepts['S', '0', '0'])

In [None]:
def force_of_infection_component(e, i):
    d = IndexedBase('d')
    I = IndexedBase('I')
    return (
        q 
        * sympy.Symbol("chi_" + e.context["status"])
        * sympy.Symbol("psi_" + e.context["variant"])
        * sympy.Symbol("d_" + e.context["age"] + "_" + i.context["age"])
        * c_symbol(i)
    )