In [1]:
import pandas as pd
import numpy as np

# Width (in years) of the age bands: used below as the step of the age bins and
# embedded in the *_age_group_* column names and in the exported file names.
age_groups = 10

In [2]:
# Load the MixIT 2022_2023 participant and contact tables
pc, pe, cc, ce = map(pd.read_csv, (
    'data_out/MixIT/participant_common_MixIT_2022_2023.csv',
    'data_out/MixIT/participant_extra_MixIT_2022_2023.csv',
    'data_out/MixIT/contact_common_MixIT_2022_2023.csv',
    'data_out/MixIT/contact_extra_MixIT_2022_2023.csv',
))

# Join the "common" and "extra" tables on their id columns
ri = pd.merge(pc, pe, on='part_id')
cs = pd.merge(cc, ce, on='cont_id')

# Number of direct contacts per respondent, overall and restricted to indoor ones
n_direct = cs.groupby('part_id')['cont_id'].count().rename('direct_contacts').reset_index()
indoor_only = cs[cs['setting'] == 'indoor']
n_indoor = indoor_only.groupby('part_id')['cont_id'].count().rename('indoor_direct_contacts').reset_index()
ri = (
    ri.merge(n_direct, on='part_id', how='left')
      .rename(columns={'part_age_exact': 'part_age'})
      .merge(n_indoor, on='part_id', how='left')
)
# Respondents without any matching contact row get NaN from the left merges: count them as 0
count_cols = ['direct_contacts', 'indoor_direct_contacts']
ri[count_cols] = ri[count_cols].fillna(0).astype(int)

# Add respondent information to contact data (looked up through part_id)
ri_by_id = ri.set_index('part_id')
for cs_col, ri_col in (
    ('part_age', 'part_age'),
    ('part_occupation', 'occupation'),
    ('primary_vaccination_cycle', 'primary_vaccination_cycle'),
):
    cs[cs_col] = cs['part_id'].map(ri_by_id[ri_col].to_dict())

# Cut ages into bands of `age_groups` years, closed on the left ([lo, hi)), plus an
# open-ended top band starting at 70 (upper edge 150 acts as "no limit").
# Edges and labels are derived from the same list of lower edges so that they stay
# aligned for any band width, including widths that do not divide 70
# (the last finite band is then simply shorter, e.g. '60-69' for a width of 20).
open_ended_from = 70
lower_edges = list(range(0, open_ended_from, age_groups))
age_bins = lower_edges + [open_ended_from, 150]
age_labels = [f'{lo}-{hi-1}' for lo, hi in zip(lower_edges, age_bins[1:])] + [f'{open_ended_from}+']

ri[f'part_age_group_{age_groups}'] = pd.cut(ri['part_age'], bins=age_bins, labels=age_labels, right=False, include_lowest=True)
cs[f'part_age_group_{age_groups}'] = pd.cut(cs['part_age'], bins=age_bins, labels=age_labels, right=False, include_lowest=True)
cs[f'cnt_age_group_{age_groups}'] = pd.cut(cs['cnt_age_exact'], bins=age_bins, labels=age_labels, right=False, include_lowest=True)


In [3]:
# Prepare single dominant setting augmentation
# adding respondent occupation for all respondents (children are mapped as students if they either go to kindergarten or to school)
ri['dominant_location'] = ri['occupation'].map({"Student":"School","Employed":"Work"})
# Students / employed respondents who were not there in person lose their School / Work assignment
absent_student = (ri['occupation']=='Student') & (ri['presence_school']!='In-person attendance')
absent_worker = (ri['occupation']=='Employed') & (ri['presence_work']!='In-person work')
ri.loc[absent_student | absent_worker, 'dominant_location'] = np.nan

# adding non-student and non-workers main location type:
# for each respondent, the non-cohabitant / non-school / non-work location with the most reported contacts
# (sort by count, then keep the first row per respondent; ties are resolved by the resulting sort order)
non_occupated_map = (cs[~cs['location_fine_multi'].isin(['Cohabitant','School','Work'])]
    .groupby(['part_id','location_fine_multi'],as_index=False)['cont_id']
    .count()
    .sort_values('cont_id', ascending=False)
    .drop_duplicates(['part_id'], keep='first')
    .set_index('part_id')['location_fine_multi']
    .to_dict()
)
needs_fallback = ri['dominant_location'].isna()
ri.loc[needs_fallback,'dominant_location'] = ri.loc[needs_fallback,'part_id'].map(non_occupated_map)
# Note: Those that only had contacts with cohabitant will see their "indirect" contacts set based on general pop location

# Drop a 'dominant_location' column left over from a previous run of this cell before merging;
# otherwise the merge would produce 'dominant_location_x' / '_y' and later lookups would fail.
cs = (cs.drop(columns='dominant_location', errors='ignore')
        .merge(ri[['part_id','dominant_location']], on='part_id'))

In [4]:
"""
    The contact pd.DataFrame resulting from the prolonged-contact explosion contains three additional columns:
        'is_personal' : 0/1, 1 if the contact information is sampled from the respondent's own (personal) contacts, 0 if from collective contacts
        'is_soft'     : 0 if sampling new contacts, 1 if the sampled contact could already have been reported as an indoor contact in the diary
                        (simple count matching/subtraction based on the number of indoor contacts reported within the diary contacts)
        'is_age_match': 0/1, 1 if sampling is based on exact matching of the respondent's age
                        (whether with his/her own age (trivial if at least one contact is present) or with all other respondents)
"""
def sample_contacts(s_cs, respondent, cols=None):
    """
        Sample (with replacement) the prolonged "indirect" indoor contacts of one respondent.

        s_cs: DataFrame containing setting specific contacts for all respondents (cohabitants are excluded).
              Columns read here: 'setting', 'location_multi', 'part_id', 'part_age',
              'primary_vaccination_cycle' and f'cnt_age_group_{age_groups}' (age-group fallback only).
        respondent: single respondent info (pd.Series). Fields read here: 'part_id', 'part_age',
              'indirect_contacts', 'indoor_direct_contacts', 'presence_school', 'presence_work',
              'primary_vaccination_cycle' and f'part_age_group_{age_groups}' (age-group fallback only).
        cols: list of columns to sample as contact properties (by Default select an entire contact row,
              i.e. every column of s_cs that is not also a respondent field)

        return: a pd.DataFrame with respondent['indirect_contacts'] rows made of the sampled `cols`,
                the flags 'is_personal', 'is_soft', 'is_age_match', and every column shared between
                s_cs and the respondent filled with the respondent's own value;
                None when there is nothing to sample (no indirect contacts, or no candidate contact).

        Relies on the module-level `age_groups` for the age-group column names.
    """
    import warnings

    # Keep the column order of s_cs (a set difference/intersection would shuffle it between runs)
    if cols is None:
        cols = [c for c in s_cs.columns if c not in respondent.index]
    r_cols = [c for c in s_cs.columns if c in respondent.index]

    soft_n = respondent.loc['indirect_contacts']
    if pd.isna(soft_n) or soft_n <= 0:
        return None
    # Counts may arrive as floats (e.g. columns that contained NaN); list repetition needs ints
    soft_n = int(soft_n)
    indoor_n = respondent.loc['indoor_direct_contacts']
    indoor_n = 0 if pd.isna(indoor_n) else max(0, int(indoor_n))
    # Sampled contacts that may coincide with indoor contacts already reported in the diary
    already_reported = min(soft_n, indoor_n)
    hard_n = soft_n - already_reported

    # Boolean indexing returns new frames, so s_cs itself is never modified (no copy needed)
    scs = s_cs
    # School / Work contacts are only eligible for respondents attending in person.
    # Both the numeric code (2) and the text labels used when building 'dominant_location' are accepted.
    if respondent['presence_school'] not in (2, 'In-person attendance'):
        scs = scs[scs['location_multi'] != 'School']
    if respondent['presence_work'] not in (2, 'In-person work'):
        scs = scs[scs['location_multi'] != 'Work']
    ## Specifies "vaccine-specific" contact patterns (to preserve assortativity)
    ## -> requires respondent vaccination status and contact vaccinal status.
    vaccination = respondent['primary_vaccination_cycle']
    if vaccination in ('Yes', 'No'):
        scs = scs[scs['primary_vaccination_cycle'] == vaccination]

    indoor = scs[scs['setting'] == 'indoor']
    # whether to sample from personal contacts (if any available)
    on_personal = indoor[indoor['part_id'] == respondent.loc['part_id']]
    # or from collective: collective sampling is conditioned only on age...but we could try to add more here...
    on_collective = indoor[indoor['part_age'] == respondent.loc['part_age']]
    age_match = True
    if on_collective.empty:
        # NOTE(review): this fallback filters on the *contact's* age group (cnt_age_group_*), while the
        # rule above filters on the reporting participant's age; confirm part_age_group_* was not intended.
        on_collective = indoor[indoor[f'cnt_age_group_{age_groups}'] == respondent.loc[f'part_age_group_{age_groups}']]
        age_match = False

    from_personal = not on_personal.empty
    pool = on_personal if from_personal else on_collective
    if pool.empty:
        # DataFrame.sample raises on an empty frame; skip this respondent but make it visible
        warnings.warn(
            f"No candidate indoor contacts to sample for part_id={respondent.loc['part_id']!r}; "
            f"{soft_n} indirect contact(s) skipped"
        )
        return None

    rsample = pool[cols].sample(soft_n, replace=True).assign(
        is_personal=from_personal,
        is_soft=[False]*hard_n + [True]*already_reported,
        is_age_match=age_match,
    )
    # Respondent-side columns carry the sampling respondent's values, not those of the donor row
    for c in r_cols:
        rsample[c] = respondent[c]
    return rsample
def single_setting_completion(cs,ri):
    """
        Sample prolonged contacts for every respondent in `ri`, drawing only from the
        contacts of `cs` whose 'location_fine_multi' equals the 'dominant_location' on the same row.

        cs: contact dataframe
        ri: respondent information dataframe
        returns: the sampled contacts only (one sample_contacts result per respondent, concatenated)
    """
    in_dominant_setting = cs['dominant_location']==cs['location_fine_multi']
    pool = cs[in_dominant_setting]
    per_respondent = ri.apply(lambda row: sample_contacts(pool, row), axis=1)
    return pd.concat(list(per_respondent))
def proportional_setting_completion(cs,ri):
    """
        Sample prolonged contacts for every respondent in `ri`, drawing from all the
        contacts of `cs` except those whose 'location_fine_multi' is 'Cohabitant'.

        cs: contact dataframe
        ri: respondent information dataframe
        returns: the sampled contacts only (one sample_contacts result per respondent, concatenated)
    """
    not_cohabitant = cs['location_fine_multi']!='Cohabitant'
    pool = cs[not_cohabitant]
    per_respondent = ri.apply(lambda row: sample_contacts(pool, row), axis=1)
    return pd.concat(list(per_respondent))
### Sampling prolonged contact based on requested method
def completing_contacts_data(cs,ri,method=None):
    """
        Return the contact data, optionally extended with sampled prolonged contacts.

        cs: contact dataframe
        ri: respondent information dataframe
        method: None                   -> no sampling, a copy of `cs` is returned
                'single_setting'       -> prolonged contacts sampled via single_setting_completion
                'proportional_setting' -> prolonged contacts sampled via proportional_setting_completion
        returns: contact dataframe concatenated with the prolonged contacts (fresh 0..n-1 index);
                 the three sampling flags are stored as floats
        raises: Exception for any other `method` value
    """
    if method is None:
        return cs.copy()

    if method == 'single_setting':
        sampled = single_setting_completion(cs,ri)
    elif method == 'proportional_setting':
        sampled = proportional_setting_completion(cs,ri)
    else:
        raise Exception('Invalid METHOD option!')

    # Flags are cast to float before stacking the sampled rows under the diary contacts
    flag_dtypes = {'is_personal':float, 'is_soft':float, 'is_age_match':float}
    return pd.concat([cs, sampled.astype(flag_dtypes)], ignore_index=True)


In [5]:
# The prolonged contacts are drawn with DataFrame.sample without an explicit random_state,
# i.e. from NumPy's global random state. Seed it before each run so that both exported
# tables are reproducible, independently of whether the other one was generated first.
SAMPLING_SEED = 42

np.random.seed(SAMPLING_SEED)
cs_single = completing_contacts_data(cs,ri,method='single_setting')
# 1-based unique id over diary + sampled contacts (the index is 0..n-1 after concatenation)
cs_single['cont_unique_id'] = cs_single.index + 1

np.random.seed(SAMPLING_SEED)
cs_proportional = completing_contacts_data(cs,ri,method='proportional_setting')
cs_proportional['cont_unique_id'] = cs_proportional.index + 1

In [6]:
# Export the two augmented contact tables and the enriched respondent table
# (file names carry the age-band width; the index is not written).
_exports = (
    (cs_single, f"data_out/contacts_prol_sing_sett_{age_groups}y_agegroups.csv"),
    (cs_proportional, f"data_out/contacts_prol_prop_{age_groups}y_agegroups.csv"),
    (ri, f"data_out/participants_rich_{age_groups}y_agegroups.csv"),
)
for _frame, _path in _exports:
    _frame.to_csv(_path, index=False)