In [None]:
import sys
sys.path.insert(0, '../src')

import builtins
import json
import os
import re

import matplotlib.pyplot as plt
import numpy as np
import pandas as pd
import seaborn as sns

from carpediem import data as cd_data

pd.options.display.max_columns = 300
pd.options.display.max_rows = 200

sns.set_context('paper')


# Read in data

# Add EDW BAL results

In [None]:

# Load the BAL results table from the parquet file registered under
# cd_data.resources.edw['bal_results.pq'].
data = pd.read_parquet(cd_data.resources.edw['bal_results.pq'])

### Extract PCR and Culture results

**Pathogen information**
1. Simple aggregates: Bacteria, Virus, Bacteria+Virus, Resistance flag
2. Lossless pathogen information in a JSON field 

In [None]:
# NOTE(review): `to_collect` is not referenced again in the visible notebook.
to_collect = []
# Rows with a non-null index_type, restricted to the label-sliced column range
# "staphylococcus aureus" .. "SARS-COV-2" (inclusive). Copied so that the
# in-place edits in the following cells do not touch `data`.
pcr = data.loc[data.index_type.notna(), "staphylococcus aureus":"SARS-COV-2"].copy()

In [None]:
# Columns that hold no result at all for any selected row.
absent_pathogens = pcr.columns[pcr.isna().all(axis="rows")]

In [None]:
# Discard the never-tested assays identified above.
pcr = pcr.drop(columns=absent_pathogens)

In [None]:
# Turn each textual PCR result into a boolean: True when the text contains
# "Positive" or a "Detected" that is not immediately preceded by "Not ".
# Missing results stay missing (str.contains propagates NaN).
for c in pcr:
    pcr[c] = pcr[c].str.contains(r"Positive|(?<!Not )Detected", regex=True)

In [None]:
# Source column -> target column. In the next cell each source column is
# folded into its target (filling only rows where the target is missing)
# and then dropped.
PCR_MERGES = {
    'SARS-COV-2': 'covid_19',
    'adenovirus - rvpdmb': 'adenovirus',
    'respiratory syncytial-virus respan22': 'respiratory syncytial virus',
    'human_metapneumovirus_lower_resp': 'human metapneumovirus',
    'human_rhinovirus_enterovirus_lower_resp': 'human rhinovirus/enterovirus',
}

In [None]:
# Fold each duplicate assay column (source) into its canonical column (target).
# Either column may be missing from `pcr`: all-null columns were dropped above,
# and a source column is gone once this cell has already run. Handling both
# cases avoids a KeyError and makes the cell safe to re-run.
for s, t in PCR_MERGES.items():
    if s not in pcr.columns:
        # Nothing to merge from.
        continue
    if t not in pcr.columns:
        # Only the source exists: it simply becomes the canonical column.
        pcr = pcr.rename(columns={s: t})
        continue
    # Fill the target only where it has no result but the source does.
    idx = pcr[t].isna() & pcr[s].notna()
    pcr.loc[idx, t] = pcr.loc[idx, s]
    pcr = pcr.drop(columns=s)

In [None]:
# Grouping of PCR assay column names into organism families. The strings are
# compared against `pcr.columns` below, so they presumably mirror the column
# names of the source table verbatim (including spelling variants such as
# 'mycoplasma pneumonaie' / 'legionalla pneumophilia') -- do not "correct"
# them without checking the data.
PCR_STAPH = [
    'staphylococcus aureus', 
]
PCR_STREP = [
    'streptococcus agalactiae', 'streptococcus pneumoniae', 'streptococcus pyogenes',
]
PCR_KLEB = [
    'klebsiella aerogenes', 'klebsiella oxytoca',
    'klebsiella pneumoniae group'
]
PCR_OTHER_BAC = [
    'acinetobacter calcoaceticus-baumannii complex',
    'enterobacter cloacae complex', 'escherichia coli',
    'proteus spp', 'serratia marcescens',
    'pseudomonas aeruginosa', 'haemophilus influenzae', 'chlamydia_pneumoniae_lower_resp',
    'mycoplasma pneumonaie', 'moraxella catarrhalis',
    'legionalla pneumophilia', 'bordetella pertussis', 'bordetella parapertussis',
    'chlamydia pneumoniae', 'mycoplasma pneumoniae',

]

# NOTE(review): this list also contains the parainfluenza assays.
PCR_INFLUENZA = [
    'influenza a', 'influenza b',
    'parainfluenzae virus', 'influenza-a respan23', 'influenza a h3 (seasonal)',
    'influenza a h1 (seasonal)', 'influenza a h1-2009',
    'influenza-b respan21',
    'para influ 1', 'para influ 2', 'para influ 3', 'parainfluenza virus 4', 
]
PCR_CORONAVIRUS = [
    'coronavirus',  'coronavirus 229e', 'coronavirus hku1',
    'coronavirus nl63', 'coronavirus oc43',
]
PCR_ADENOVIRUS = [
    'adenovirus'
]
PCR_RSV = [
    'respiratory syncytial virus'
]
# These three names are the merge targets defined in PCR_MERGES.
PCR_OTHER_VIRUS = [
    'covid_19',  
    'human metapneumovirus',
    'human rhinovirus/enterovirus', 
]

# Every bacterial assay, regardless of family.
PCR_BACTERIA = PCR_STAPH + PCR_STREP + PCR_KLEB + PCR_OTHER_BAC

# Every viral assay, regardless of family.
PCR_VIRUS = PCR_INFLUENZA + PCR_CORONAVIRUS + PCR_ADENOVIRUS + PCR_RSV + PCR_OTHER_VIRUS

# Assays reported as resistance markers rather than as organisms.
PCR_RESISTANCE_MARKERS = [
    'meca/c and mrej', 'oxa-48-like', 'ctx-m', 'kpc', 'ndm', 'imp', 'vim',
]

PCR_ALL = PCR_BACTERIA + PCR_VIRUS + PCR_RESISTANCE_MARKERS

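Verify the categories above: the first cell should return an empty set (every remaining PCR column belongs to a category), and each duplicate count should be 0.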

In [None]:
# PCR columns that are not assigned to any of the three categories.
set(pcr.columns) - set(PCR_BACTERIA) - set(PCR_RESISTANCE_MARKERS) - set(PCR_VIRUS)

In [None]:
# Number of repeated entries in the bacteria list.
pd.Series(PCR_BACTERIA).duplicated().sum()

In [None]:
# Number of repeated entries in the resistance-marker list.
pd.Series(PCR_RESISTANCE_MARKERS).duplicated().sum()

In [None]:
# Number of repeated entries in the virus list.
pd.Series(PCR_VIRUS).duplicated().sum()

Convert organism quantity to numbers

In [None]:
# Normalise the six organism slots in place.
for i in range(1, 7):
    # Missing organism names become the literal sentinel "NA".
    data[f"organism_{i}_name"] = data[f"organism_{i}_name"].fillna("NA")
    # Skip columns that are already numeric (e.g. when this cell is re-run).
    if not pd.api.types.is_numeric_dtype(data[f"organism_{i}_quantity"]):
        # Whole-value "<10,000" is mapped to 100; otherwise ">" and thousands
        # separators are stripped before the float conversion.
        # NOTE(review): 100 is below the CFU >= 1000 cut-off used later, so
        # "<10,000" cultures never count as positive -- confirm this is intended.
        data[f"organism_{i}_quantity"] = data[f"organism_{i}_quantity"].replace(
            "<10,000", "100"
        ).str.replace(
            ">", ""
        ).str.replace(",", "").astype(float)

Process names and resistance indicators

In [None]:
def process_name(name):
    """Clean a culture organism label and extract its resistance tag.

    Trailing antibiotic text and report annotations are removed from the
    label. Labels that are not a plain two-word name must appear in one of
    the explicit tables below; anything else is echoed in square brackets
    so it can be reviewed, and returned as-is.

    Parameters
    ----------
    name : str
        Raw organism label.

    Returns
    -------
    tuple
        ``(clean_name, resistance)``, where ``resistance`` is ``False`` or
        a string tag such as ``'esbl'``, ``'vanc'``, ``'beta-l'``,
        ``'not-tested'``, ``'mrsa'`` or ``'esbl, kpc'``.
    """
    name = name.strip()
    resistance = False

    # Susceptibility text for these antibiotics trails the organism name:
    # keep only what precedes the first mention.
    for drug in ('Ceftolozane/tazobactam', 'Cefiderocol'):
        if drug in name:
            name = name[:name.index(drug)].strip()

    # (equal-length suffix spellings, resistance tag or None). Applied in this
    # order, so a later tag overrides an earlier one.
    trailing_annotations = (
        (('(ESBL Positive)', '(ESBL positive)'), 'esbl'),
        (('(Vancomycin Resistant)',), 'vanc'),
        (('Beta Lactamase Positive',), 'beta-l'),
        (('Beta Lactamase Negative',), None),
        (('Too fastidious for routine susceptibility testing.',), 'not-tested'),
        (('(presumptive)',), None),
        (('(Encapsulated Strain)',), None),
        (('#2', '#3'), None),
        (('Group', 'group'), None),
    )
    for spellings, tag in trailing_annotations:
        if name.endswith(spellings):
            name = name[:-len(spellings[0])].strip()
            if tag is not None:
                resistance = tag

    # A plain "Genus species" label needs no further handling.
    looks_binomial = name.count(' ') == 1 and ',' not in name and '(' not in name
    if looks_binomial:
        return name, resistance

    # Known multi-word labels that are kept exactly as reported.
    kept_verbatim = (
        'Yeast, Not Cryptococcus Species',
        'Staphylococcus coagulase negative',
        'Beta Hemolytic Streptococci, Group C',
        'Streptococcus agalactiae (Group B)',
        'Burkholderia cepacia complex',
        'Beta Hemolytic Streptococci, Group F',
        'Enterobacter cloacae complex',
        'Haemophilus species, not influenzae',
        'Beta Hemolytic Streptococci, Group G',
        'Beta Hemolytic Streptococci, not Group A, B, C, D, F, or G',
        'Streptococcus pyogenes (Group A)',
        'Streptococcus mitis oralis',
    )
    if name in kept_verbatim:
        return name, resistance

    # Known labels that are renamed but keep the resistance found so far.
    renamed = {
        'Enterococcus faecalis Preliminarily reported as Enterococcus faecium': 'Enterococcus faecalis',
        'Acinetobacter baumannii complex': 'Acinetobacter baumannii',
        'Klebsiella (Enterobacter) aerogenes': 'Klebsiella aerogenes',
    }
    if name in renamed:
        return renamed[name], resistance

    # Known labels whose name and resistance are both fixed.
    fixed_result = {
        'Methicillin-Resistant Staphylococcus aureus': ('Staphylococcus aureus', 'mrsa'),
        'Citrobacter freundii group (ESBL Positive) Note: This organism produces the KPC carbapenemase. Consultation with Infectious Disease Service is recommended.': ('Citrobacter freundii group', 'esbl, kpc'),
    }
    if name in fixed_result:
        return fixed_result[name]

    # Unrecognised label: surface it for manual review.
    print(f'[{name}]')
    return name, resistance

In [None]:
def compile_culture_result(row):
    """Serialise the culture findings of one BAL row to a JSON string.

    Parameters
    ----------
    row : mapping-like (e.g. a DataFrame row)
        Must provide ``organism_{1..6}_name``, ``organism_{1..6}_quantity``
        and ``gram_stain_report_organisms``.

    Returns
    -------
    str
        JSON object ``{"performed": bool, "organisms": [...]}``; each
        organism entry has ``name``, ``resistance`` and ``cfu``.
    """
    result = []
    for i in range(1, 7):
        name = row[f'organism_{i}_name']
        # Skip empty slots. Checking the type instead of `name in (..., np.nan)`
        # matters: tuple membership matches NaN only by object identity, so any
        # other NaN object (or None) would reach process_name and fail on
        # `.strip()`.
        if not isinstance(name, str) or name == 'NA':
            continue
        name, resistance = process_name(name)
        result.append(dict(
            name=name,
            resistance=resistance,
            cfu=row[f'organism_{i}_quantity'],
        ))
    # The culture counts as performed when a non-empty gram-stain report exists.
    gram_report = row.gram_stain_report_organisms
    performed = isinstance(gram_report, str) and len(gram_report) > 0
    return json.dumps(dict(performed=performed, organisms=result))
    
# One JSON string per row with a non-null index_type, built from the six
# organism name columns, the six quantity columns and the gram-stain report.
culture_results = data.loc[
    data.index_type.notna(),
    [
        f"organism_{slot}_{field}"
        for field in ("name", "quantity")
        for slot in range(1, 7)
    ] + ['gram_stain_report_organisms'],
].apply(compile_culture_result, axis="columns")

In [None]:
def compile_pcr_result(row):
    """Serialise the PCR findings of one row to a JSON string.

    ``row`` is one row of the ``pcr`` table, indexed by assay name; values are
    expected to be True / False / missing.

    The resulting JSON object has, in this order: ``resistance``, ``bacteria``
    and ``virus`` (names of positive assays in each category), ``performed``
    (whether any assay has a result) and ``tests`` (names of all assays with a
    result).
    """
    positives = row.index[row.fillna(False)]
    has_result = row.notna()

    def positives_among(category):
        # Positive assay names belonging to `category`, in column order.
        return positives[positives.isin(category)].tolist()

    summary = {
        'resistance': positives_among(PCR_RESISTANCE_MARKERS),
        'bacteria': positives_among(PCR_BACTERIA),
        'virus': positives_among(PCR_VIRUS),
        # Plain bool so that json can serialise it.
        'performed': bool(has_result.sum() > 0),
        'tests': row.index[has_result].tolist(),
    }
    return json.dumps(summary)
    
# One JSON string per row of the PCR table.
pcr_results = pcr.apply(compile_pcr_result, axis="columns")

In [None]:
# Genus/word prefixes accepted without review when followed by at most one
# more word and no parenthesis.
FUNGAL_PREFIXES = [
    'Candida', 'Yeast', 'Blastomyces', 'Aspergillus',
    'Penicillium', 'Saccharomyces'
]
def process_fungal_name(name):
    """Expand a fungal culture label and split off its quantity word.

    Parameters
    ----------
    name : str
        Raw label, optionally starting with "Rare", "Many", "Few" or
        "Moderate", and either a lab short code or a full name.

    Returns
    -------
    tuple
        ``(full_name, quantity)`` with quantity one of ``'rare'``, ``'many'``,
        ``'few'``, ``'moderate'`` or ``'NA'``. Labels that are not recognised
        are echoed in square brackets and returned as-is.
    """
    name = name.strip()
    quantity = 'NA'

    # Leading quantity words, checked one after another in this order.
    for word, label in (
        ('Rare', 'rare'),
        ('Many', 'many'),
        ('Few', 'few'),
        ('Moderate', 'moderate'),
    ):
        if name.startswith(word):
            name = name[len(word):].strip()
            quantity = label

    # Short codes whose expansion still goes through the prefix check below.
    expanded = {
        'CANGLA': 'Candida glabrata',
        'CANKRU': 'Candida krusei',
        'CANALB': 'Candida albicans',
        'CANPARAP': 'Candida parapsilosis',
        'CANDUB': 'Candida dubliniensis',
        'SACCER': 'Saccharomyces cerevisiae',
        'YEAST': 'Yeast',
        'CANGUI': 'Candida guilliermondii',
        'CANLUS': 'Candida lusitaniae',
        'ASPNF': 'Aspergillus species, not fumigatus',
        'ASPFUM': 'Aspergillus fumigatus',
        'YEANCA': 'Yeast NCA',
        'CANKEF': 'Candida kefyr',
        'PENEC': 'Penicillium species',
        'CANTRO': 'Candida tropicalis',
        'ASPER': 'Aspergillus species',
    }
    name = expanded.get(name, name)

    # Labels that are final as soon as they are recognised.
    final = {
        'CRYNEO': 'Cryptococcus neoformans',
        'BLADER': 'Blastomyces dermatitidis',
        'YNCRY': 'Yeast, Not Cryptococcus Species',
        'GEOTR': 'Geotrichum species',
        'SMNFS': 'Sterile Mold',
        'Yeast, Not Cryptococcus Species': 'Yeast, Not Cryptococcus Species',
    }
    if name in final:
        return final[name], quantity

    short_and_plain = name.count(' ') < 2 and '(' not in name
    if short_and_plain and any(name.startswith(prefix) for prefix in FUNGAL_PREFIXES):
        return name, quantity

    # Unrecognised label: surface it for manual review.
    print(f'[{name}]')
    return name, quantity

In [None]:
def compile_fungal_result(row):
    """Serialise the fungal culture findings of one BAL row to a JSON string.

    Parameters
    ----------
    row : mapping-like (e.g. a DataFrame row)
        Must provide ``culture_fungal_w_smear_bal_organism_id_{1..3}``.

    Returns
    -------
    str
        JSON list of ``{"name": ..., "quantity": ...}`` objects, one per
        reported organism (empty list when nothing was reported).
    """
    # Values that mean "no organism in this slot".
    no_organism = (
        'NA',
        'Negative for Growth of Fungus at 4 weeks.',
        'See comment',
    )
    result = []
    for i in range(1, 4):
        name = row[f'culture_fungal_w_smear_bal_organism_id_{i}']
        # Missing values are detected by type: `name in (..., np.nan)` matches
        # NaN only by object identity, so any other NaN object (or None) would
        # reach process_fungal_name and fail on `.strip()`.
        if not isinstance(name, str) or name in no_organism:
            continue
        name, quantity = process_fungal_name(name)
        result.append(dict(
            name=name,
            quantity=quantity,
        ))
    return json.dumps(result)
    
# One JSON string per row with a non-null index_type, built from the three
# fungal organism columns.
fungal_results = data.loc[
    data.index_type.notna(), 
    [f'culture_fungal_w_smear_bal_organism_id_{i}' for i in range(1, 4)]
].apply(
    compile_fungal_result, 
    axis='columns'
)

In [None]:
def merge_pcr_culture_fungal(row):
    """Combine the three per-test JSON strings of a row into one JSON string.

    ``row`` must expose ``Culture``, ``PCR`` and ``Fungal`` attributes holding
    JSON text. The result is a JSON object with keys ``pcr``, ``culture`` and
    ``fungal`` (in that order).
    """
    culture_part = json.loads(row.Culture)
    pcr_part = json.loads(row.PCR)
    fungal_part = json.loads(row.Fungal)
    combined = {'pcr': pcr_part, 'culture': culture_part, 'fungal': fungal_part}
    return json.dumps(combined)

# Align the three per-row JSON series on their index and combine them into a
# single JSON string per row. Each series is given an explicit column name up
# front, so the code no longer depends on the "0_x" / "0_y" / 0 names pandas
# generates when merging identically named columns.
pathogen_results = (
    pcr_results.to_frame("PCR")
    .merge(
        culture_results.to_frame("Culture"),
        left_index=True,
        right_index=True,
        how="left",
    )
    .merge(
        fungal_results.to_frame("Fungal"),
        left_index=True,
        right_index=True,
        how="left",
    )
    .apply(merge_pcr_culture_fungal, axis='columns')
)

## Add PCR, culture, and fungal results to the main dataframe

In [None]:
# Store the combined JSON on the matching rows; rows outside
# pathogen_results.index get no value (the any_* helpers below return NaN for them).
data.loc[pathogen_results.index, 'pathogen_results'] = pathogen_results

## Set pathogen flag columns from `pathogen_results`

* Any CFU thresholding?
* Bacteria from both PCR and Culture, united?
* **NB**: We trust PCR bacterial results without positive Culture

A series of binary flags:
1. Virus detected?
2. Bacteria detected?
3. Fungi detected?
4. Resistance detected?
5. Aspergillus detected?

And
+ names of the bugs with a separator

1. Any virus (from PCR)?

In [None]:
def any_virus(row):
    """Return True when the row's PCR result lists at least one virus.

    Returns NaN when ``row.pathogen_results`` is a missing (NaN) value.
    """
    payload = row.pathogen_results
    if not isinstance(payload, str) and np.isnan(payload):
        return np.nan
    viruses = json.loads(payload)['pcr']['virus']
    return len(viruses) > 0
# Row-wise virus flag (True / False, NaN where no pathogen_results exist).
data['pathogen_virus'] = data.apply(any_virus, axis=1)

2. Any bacteria (from PCR or from Culture with CFU >= 1000)

In [None]:
def any_bacteria(row):
    """Return True when bacteria were found by PCR or by culture.

    A PCR hit counts on its own; a culture organism counts only when its
    ``cfu`` is at least 1000. Returns NaN when ``row.pathogen_results`` is a
    missing (NaN) value.
    """
    payload = row.pathogen_results
    if not isinstance(payload, str) and np.isnan(payload):
        return np.nan
    findings = json.loads(payload)
    if len(findings['pcr']['bacteria']) > 0:
        return True
    return any(
        organism['cfu'] >= 1000
        for organism in findings['culture']['organisms']
    )
# Row-wise bacteria flag (True / False, NaN where no pathogen_results exist).
data['pathogen_bacteria'] = data.apply(any_bacteria, axis=1)

3. Any fungus (from Fungal)

In [None]:
def any_fungus(row):
    """Return True when the fungal culture lists at least one organism.

    Returns NaN when ``row.pathogen_results`` is a missing (NaN) value.
    """
    payload = row.pathogen_results
    if not isinstance(payload, str) and np.isnan(payload):
        return np.nan
    fungal_findings = json.loads(payload)['fungal']
    return len(fungal_findings) > 0
# Row-wise fungus flag (True / False, NaN where no pathogen_results exist).
data['pathogen_fungi'] = data.apply(any_fungus, axis=1)

4. Any resistance (from PCR markers or from Culture with CFU >= 1000)

In [None]:
def any_resistance(row):
    """Return True when resistance was found by PCR or by culture.

    Any PCR resistance marker counts. A culture organism counts when its
    ``cfu`` is at least 1000 and its resistance tag is set to something other
    than ``'not-tested'``. Returns NaN when ``row.pathogen_results`` is a
    missing (NaN) value.
    """
    payload = row.pathogen_results
    if not isinstance(payload, str) and np.isnan(payload):
        return np.nan
    findings = json.loads(payload)
    if len(findings['pcr']['resistance']) > 0:
        return True
    return any(
        organism['cfu'] >= 1000
        and organism['resistance']
        and organism['resistance'] != 'not-tested'
        for organism in findings['culture']['organisms']
    )
# Row-wise resistance flag (True / False, NaN where no pathogen_results exist).
data['pathogen_resistance'] = data.apply(any_resistance, axis=1)

5. Any aspergillus?

In [None]:
def any_aspergillus(row):
    """Return True when a fungal organism name starts with 'Aspergillus'.

    Returns NaN when ``row.pathogen_results`` is a missing (NaN) value.
    """
    payload = row.pathogen_results
    if not isinstance(payload, str) and np.isnan(payload):
        return np.nan
    fungal_findings = json.loads(payload)['fungal']
    return any(
        organism['name'].startswith('Aspergillus')
        for organism in fungal_findings
    )
# Row-wise Aspergillus flag (True / False, NaN where no pathogen_results exist).
data['pathogen_aspergillus'] = data.apply(any_aspergillus, axis=1)

### Process BAL comment field for intracellular and extracellular pathogens

In [None]:
def process_comments(comment_o):
    """Reduce a free-text BAL comment to a sorted, comma-joined list of labels.

    The comment is cut at the first call-back / cross-reference phrase, split
    on '.', ',', '/' and ' and ', and each fragment is mapped to the labels of
    the first matching rule. Fragments matching no rule are printed (with the
    original comment) for manual review, except for '"' and 'critical'.

    NOTE(review): rules are first-match-wins, so a fragment such as
    "intracellular bacteria" is labelled only 'bacteria' (likewise for
    'yeast'); the intra/extra-cellular flags derived from this output will
    not see it -- confirm that this precedence is intended.

    Parameters
    ----------
    comment_o : str or other
        Raw comment; non-string values yield NaN.

    Returns
    -------
    str or float
        Comma-joined sorted labels (possibly empty), or NaN.
    """
    if not isinstance(comment_o, str):
        return np.nan

    # Everything from these phrases onwards is dropped.
    cutoffs = ('results called by', 'result called by', 'see separate', 'spoke to')
    # Ordered rules: (any/all, substrings to look for, labels to emit).
    rules = (
        (any, ('other',), ('other',)),
        (any, ('smear',), ('smear',)),
        (any, ('alveolar', 'alveloar', 'aveolar', 'avleolar'), ('alveolar macs',)),
        (any, ('squamous', 'squamos'), ('squamous',)),
        (any, ('yeast',), ('yeast',)),
        (any, ('differential',), ('no result',)),
        (any, ('lining',), ('lining',)),
        (any, ('epithelial',), ('epithelial',)),
        (any, ('eosinophil',), ('eosinophil',)),
        (any, ('neutrophil',), ('neutrophil',)),
        (any, ('lymphocyte',), ('lymphocyte',)),
        (any, ('monocyte',), ('monocyte',)),
        (any, ('macrophage',), ('macrophage',)),
        (any, ('plasma cells',), ('plasma cells',)),
        (any, ('metamyelocytes',), ('metamyelocytes',)),
        (any, ('erythrophagocytosis', 'erythrophages'), ('erythrophagocytosis',)),
        (any, ('leukophagocytosis',), ('leukophagocytosis',)),
        (any, ('mucous clots',), ('mucous clots',)),
        (any, ('bacteria',), ('bacteria',)),
        (any, ('fungal', 'fungi'), ('fungi',)),
        # 'affect' also covers "affected"; 'accurate' also covers "inaccurate".
        (any, ('affect', 'accurate', 'affcted'), ('counts affected',)),
        (any, ('degeneration', 'degenration', 'degenerated'), ('degeneration',)),
        (all, ('extracellular', 'intracellular'), ('extracellular', 'intracellular')),
        (
            any,
            ('extracellular', 'axtracellular', 'extracelllular', 'extra cellular', 'external'),
            ('extracellular',),
        ),
        (
            any,
            ('intracellular', 'itracellular', 'intacellular', 'intra', 'internal'),
            ('intracellular',),
        ),
        (any, ('unidentified',), ('unidentified',)),
        (any, ('no cells',), ('no cells',)),
        (any, ('sample too viscous',), ('no result',)),
        (any, ('microorganisms',), ('bacteria',)),
    )

    text = comment_o
    for phrase in cutoffs:
        lowered = text.lower()
        if phrase in lowered:
            text = text[:lowered.index(phrase)]

    labels = []
    for fragment in re.split(r'\.|,|/|\sand\s', text):
        fragment = fragment.strip()
        if len(fragment) == 0:
            continue
        fragment = fragment.lower()
        for combine, needles, emitted in rules:
            if combine(needle in fragment for needle in needles):
                labels.extend(emitted)
                break
        else:
            # No rule matched: ignore known noise, report everything else.
            if fragment != '"' and fragment != 'critical':
                print(comment_o)
                print(f'[{fragment}]')
    return ','.join(sorted(labels))

    
# Label string per row derived from the free-text `comments` column
# (NaN where the comment is not a string).
data['pathogen_comments'] = data.comments.apply(process_comments)

In [None]:
# Flags for the 'intracellular' / 'extracellular' labels; rows whose
# pathogen_comments is NaN stay NaN.
data['pathogen_comments_intra'] = data.pathogen_comments.str.contains('intracellular')
data['pathogen_comments_extra'] = data.pathogen_comments.str.contains('extracellular')

In [None]:
# Tally of the intracellular flag (value_counts skips NaN by default).
data.pathogen_comments_intra.value_counts()

In [None]:
# Tally of the extracellular flag (value_counts skips NaN by default).
data.pathogen_comments_extra.value_counts()

## Set numeric BAL measurements

Galactomannan

In [None]:
# Source column holding the galactomannan result text.
# NOTE(review): the BAL differential loop further down also uses `col` as
# its loop variable, so this value is overwritten once that cell has run.
col = 'aspergillus galactomannan antigen(nmh/lfh)'

In [None]:
def pick_max(val):
    """Return the largest number in a whitespace-separated string.

    Parameters
    ----------
    val : str or other
        Text such as ``"0.3 1.5"``.

    Returns
    -------
    float
        The maximum of the parsed numbers. NaN when ``val`` is not a string
        or holds no tokens at all (e.g. the text was only a qualitative word
        that has already been stripped), instead of failing on ``max([])``.

    Raises
    ------
    ValueError
        If a token cannot be parsed as a float.
    """
    if not isinstance(val, str):
        return np.nan
    numbers = [float(token) for token in val.split()]
    if not numbers:
        return np.nan
    return max(numbers)
# Remove the words "Negative"/"Positive" and any ">" / ">=" markers, then
# keep the largest remaining number per row.
data['pathogen_galactomannan'] = data[col].str.replace(
    r'Negative|Positive|>=?', 
    '', 
    regex=True
).str.strip().apply(pick_max)

Amylase

In [None]:
# Numeric amylase derived from the raw 'amylase bf' text column.
data['amylase'] = data['amylase bf']
# Entries that only carry the lab's reference-range disclaimer hold no value.
data.loc[data.amylase.str.contains('The reference range and other method').fillna(False), 'amylase'] = np.nan
# "<10" is replaced by 5 and ">" is dropped before the float conversion.
# NOTE(review): the replacement is substring-based, so e.g. "<100" would
# become "50" -- confirm such values do not occur. Unlike the other
# conversions in this notebook, there is no guard for an already-numeric column.
data.amylase = data.amylase.str.replace('<10', '5').str.replace('>', '').astype(float)

BAL cell count differential

In [None]:
# Raw differential column -> name of the numeric column created in the next cell.
columns = {
    'neutrophils, body fluid': 'bal_pct_neutro',
    'macrophage bf': 'bal_pct_macro',
    'monocyte bf': 'bal_pct_mono',
    'lymph bf': 'bal_pct_lympho',
    'eosinophils, body fluid': 'bal_pct_eos',
    'plasma cell bf': 'bal_pct_plasma',
    'other cells, body fluid': 'bal_pct_other'
}

In [None]:
# Create one numeric column per raw differential column.
# The loop variable is deliberately not called `col`: that name holds the
# galactomannan source column, and overwriting it here would make a re-run of
# the galactomannan cell read the wrong column.
for src_col, new_name in columns.items():
    data[new_name] = data[src_col]
    # Already numeric: nothing to clean.
    if pd.api.types.is_numeric_dtype(data[new_name].dtype):
        continue
    # Free-text remarks about a viscous sample carry no value.
    data.loc[data[new_name].str.lower().str.contains('viscous').fillna(False), new_name] = np.nan
    # Known non-numeric placeholders become NaN; everything else must parse
    # as a float (astype raises otherwise).
    data[new_name] = (
        data[new_name]
            .str.strip()
            .replace('TVTC', np.nan)
            .replace('WBC too low to do differential. Smear on file.', np.nan)
            .replace('See Comment', np.nan)
            .astype(float)
    )

In [None]:
# Hand the enriched table to the project's export helper under the name
# '03_counting_with_BAL'. NOTE(review): output format and location are decided
# by cd_data.export, which is not visible here.
cd_data.export(
    data, 
    path=cd_data.materials.general['03_counting'].path, 
    name='03_counting_with_BAL'
)