# MODS Debugging

In [5]:
from pathlib import Path
import sys
sys.path.insert(0, "/opt/scratchspace/KLAB_SAIL/MODSPhenotypes/mods/")
from src.config import *
from src.utils import *
site_name = 'grady'
sample_rate = 1

In [32]:
from tqdm.auto import tqdm

In [6]:
# import pandas as pd
import pyarrow as pa
import pyarrow.parquet as pq

In [8]:
from src.extract_mods import find_pickle_paths, extraction, get_super_df, get_static_df, get_times_df

## Debugging the grady extraction

In [9]:
run_id = '2023_07_29'

In [10]:
output_path = project_path / 'data' / str(run_id) / 'extraction' / site_name # / table_name  / year / file.parquet
output_path.mkdir(parents=True, exist_ok=True)

### static_df issue in grady

In [11]:
pickle_paths = find_pickle_paths(site_name, sample_rate)

100%|██████████| 7/7 [00:02<00:00,  3.08it/s]


In [27]:
pickle_path = pickle_paths[234]

In [28]:
encounter_pickle = load_pickle(pickle_path)

In [34]:
from multiprocessing import Pool, Value
counter = Value('i', 0)

def check_csn(pickle_path):
    """
    Unpickles the file at pickle_path and checks if the 'csn' key exists.
    Increments the global counter if 'csn' key is not found.
    """
    with open(pickle_path, 'rb') as f:
        data = pickle.load(f)

    # Check for the 'csn' key
    if 'csn' not in data:
        with counter.get_lock():  # Locking is necessary for safe updates
            counter.value += 1
    return None

In [35]:
with Pool(processes=num_cpus) as pool:
    max_ = len(pickle_paths)
    with tqdm(total=max_) as pbar:
        for _ in pool.imap_unordered(func=check_csn, iterable=pickle_paths):
            pbar.update()

  0%|          | 0/172561 [00:00<?, ?it/s]

In [29]:
print(f"Number of pickles without 'csn' key: {counter.value}")

{'csn': 1013356869,
 'pat_id': 'Z1989113',
 'cultures_PerCSN': Empty DataFrame
 Columns: [pat_id, proc_cat_id, proc_cat_name, proc_code, proc_desc, component_id, component, loinc_code, specimen_collect_time, order_time, order_id, result_id, lab_result_time, result_status, lab_result, culture_genus_c, culture_genus_description, culture_species_c, culture_species_description, culture_quantity_c, culture_quantity_description, organism_id, organism_description]
 Index: []
 
 [0 rows x 23 columns],
 'beds_PerCSN':               pat_id  bed_location_start    bed_location_end       bed_unit  \
 csn                                                                           
 1013356869  Z1989113 2015-02-25 15:19:00 2015-02-25 16:54:00  GHS EMERGENCY   
 1013356869  Z1989113 2015-02-25 16:54:00 2015-02-25 17:17:00  GHS EMERGENCY   
 1013356869  Z1989113 2015-02-25 17:17:00 2015-02-25 17:44:00  GHS EMERGENCY   
 1013356869  Z1989113 2015-02-25 17:44:00 2015-02-25 20:00:00  GHS EMERGENCY   
 10133

In [None]:
encounter_pickle['sirs_scores'].columns

In [None]:
empty_df = pd.DataFrame(index=encounter_pickle['sofa_scores'].index)

empty_df[['SIRS_resp', 'SIRS_cardio', 'SIRS_temp', 'SIRS_wbc', 'hourly_total', 'delta_24h']] = pd.NA

In [None]:
empty_df

In [None]:
### `static_df` Encounter level data
# def extract_static_df(encounter_pickle, pickle_path):
static_df = get_static_df(encounter_pickle)
times_df = get_times_df(encounter_pickle)

In [None]:
static_df = pd.concat([static_df, times_df], axis=1)

In [None]:
static_schema = {}
for key in static_keys:
    static_schema = static_schema | pandas_schema["static"][key]
for key in times_keys:
    static_schema = static_schema | pandas_schema["static"]["times"][key]
try:
    static_df = static_df.astype(static_schema)
except KeyError as e:
    print(f"KeyError in {str(pickle_path.stem)}: {e}")

In [None]:
static_df.rename(
    columns={
        "t_suspicion": "times_suspicion_sepsis3",
        "t_SOFA": "times_SOFA",
        "t_sepsis3": "times_sepsis3",
        "t_abx": "times_abx_order",
        "t_clt": "times_culture",
    },
    inplace=True,
)

# Re-casting `ed_wait_time` containing multiple data-types that cannot be natively used together
# TODO: Make this less ugly
if type(static_df["ed_wait_time"].iloc[0]) is pd.Timedelta:
    static_df["ed_wait_time"].iloc[0] = float(
        static_df["ed_wait_time"].iloc[0].seconds / 60
    )
if pd.isnull(static_df["ed_wait_time"][0]):
    static_df["ed_wait_time"].iloc[0] = 0.0
    static_df["ed_wait_time"].iloc[0] = float("nan")  # TODO: Try using pd.NA

static_table = pa.Table.from_pandas(static_df, preserve_index=False)

output_folder = output_path / "static_df" / str(pickle_path.parent.stem)
output_folder.mkdir(parents=True, exist_ok=True)

pq.write_table(
    static_table,
    output_folder / f"{pickle_path.stem}.parquet",
    # TODO: Get from config
    version="2.6",
    compression="snappy",
)

In [None]:
static_df = get_static_df(encounter_pickle)

In [None]:
static_df.dtypes

### Covid column issue in grady

In [8]:
pickle_paths = find_pickle_paths(site_name, sample_rate)

100%|██████████| 7/7 [00:01<00:00,  3.80it/s]


In [10]:
pickle_path = pickle_paths[100]

In [11]:
encounter_pickle = load_pickle(pickle_path)

In [12]:
super_df = encounter_pickle["super_table"]

In [13]:
super_schema = {}
for col in super_df.columns:
    try:
        super_schema[col] = pandas_schema["dynamic"]["super_table"][col]
    except KeyError as e:
        print(f"KeyError in {str(pickle_path.stem)}: {e}")

In [14]:
list(super_df.dtypes[super_df.dtypes == 'object'].index)

['icu', 'imc', 'ed', 'procedure']

In [30]:
for pickle_path in tqdm(pickle_paths[:10_000]):
    encounter_pickle = load_pickle(pickle_path)
    super_df = encounter_pickle["super_table"]
    for key in set(pandas_schema["dynamic"]["super_table"].keys()) - set(
    super_df.columns
):
        # TODO: Log these events
        super_df[key] = None
        super_df[key] = super_df[key].astype(
            pandas_schema["dynamic"]["super_table"][key]
        )
    # The covid column in the Grady pickles has mixed datatypes and needs to be cast
    super_df['covid'] = super_df['covid'].replace('Positive',1.0).replace('Negative',0.0).astype(float)
    
    # mtp column in grady is a string and needs to be converted to float
    super_df['mtp'] = super_df['mtp'].replace('COMPLETE',1.0).astype(float)
    
    super_df.astype(super_schema)

  1%|          | 85/10000 [00:03<07:39, 21.57it/s]


KeyboardInterrupt: 

In [25]:
list(super_df.dtypes[super_df.dtypes == 'object'].index)

['mtp', 'icu', 'imc', 'ed', 'procedure', 'csn', 'pat_id']

In [27]:
super_df['mtp'].unique()

array(['COMPLETE', nan], dtype=object)

In [24]:
for col in tqdm(list(super_df.dtypes[super_df.dtypes == 'object'].index)):
    super_df[col].astype(float)

  0%|          | 0/7 [00:00<?, ?it/s]


ValueError: could not convert string to float: 'COMPLETE'

In [None]:
def debug_super(fp, schema=arrow_static_schema):
    try:
        static_table = pq.read_table(fp,
            schema=schema)
    except pa.ArrowInvalid as e:
        print(f"ERROR RAISED :: {fp.stem}", str(e))
        return (fp.stem, str(e))

## Move all the malformed static parquets elsewhere for inspection

In [None]:
from functools import reduce

In [None]:
from tqdm.auto import tqdm

In [None]:
import shutil

In [None]:
input_path = Path('/opt/bmi-585r/KLAB_SAIL/MODSPhenotypes/data/2022_08_01/extraction/emory/')

In [None]:
years = set(range(2014, 2022)) - set([2015])

In [None]:
static_schema = (
    reduce(lambda a, b: {**a, **b}, [arrow_schema['static'][k] for k in static_keys])
    |
    reduce(lambda a, b: {**a, **b}, [arrow_schema['static']['times'][k] for k in times_keys])
    )

arrow_static_schema = make_arrow_schema(static_schema)

In [None]:
def __debug_static(fp, schema=arrow_static_schema):
    for i in range(len(schema)):
        # print(i, arrow_static_schema[i].name)
        try:
            static_table = pq.read_table(fp,
                schema=schema,
                columns=[schema[i].name]
            )
        except pa.ArrowInvalid as e:
            print(f"ERROR RAISED :: {fp.stem}", i, schema[i].name, str(e))
            return (fp.stem, e)
        
def debug_static(fp, schema=arrow_static_schema):
    try:
        static_table = pq.read_table(fp,
            schema=schema)
    except pa.ArrowInvalid as e:
        print(f"ERROR RAISED :: {fp.stem}", str(e))
        return (fp.stem, str(e))

---

In [None]:
static_fps = []
yr = 2014

for year in tqdm([yr]):
    static_fps += find_files(source_path=input_path/'static_df'/str(year), ext='.parquet')

In [None]:
results = [x for x in run_imap_multiprocessing(debug_static,
                                               static_fps,
                                               num_processes=180) if x is not None]

In [None]:
error_df = pd.DataFrame(results, columns=['csn','error'])
error_df['error'] = error_df.error.astype(str)
error_df.groupby('error').count()

In [None]:
bad_fps=[Path(f"/opt/bmi-585r/KLAB_SAIL/MODSPhenotypes/data/2022_08_01/extraction/emory/static_df/{str(year)}/{r[0]}.parquet") for r in results]

In [None]:
malform_path = Path(f"/opt/bmi-585r/KLAB_SAIL/MODSPhenotypes/data/2022_08_01/extraction/emory/static_df_MALFORMED/{str(year)}/")
malform_path.mkdir(parents=True, exist_ok=True)

In [None]:
for fp in bad_fps:
    try:
        target_fp = malform_path/fp.with_suffix('.MALFORMED').name
        shutil.move(fp, target_fp)
    except FileNotFoundError as e:
        print(fp.stem)

In [None]:
pq.read_table(f"/opt/bmi-585r/KLAB_SAIL/MODSPhenotypes/data/2022_08_01/extraction/emory/static_df/{str(year)}/", schema=arrow_static_schema)

## Debugging schema issues

In [None]:
for i in range(len(arrow_static_schema)):
    try:
        static_table = pq.read_table('/opt/bmi-585r/KLAB_SAIL/MODSPhenotypes/data/2022_07_30/extraction/emory/static_df/2014/',
                                 schema=arrow_static_schema,
                                 columns=[arrow_static_schema[i].name])
    except:
        print(i, arrow_static_schema[i].name)

pq.read_table('/opt/bmi-585r/KLAB_SAIL/MODSPhenotypes/data/2022_07_30/extraction/emory/static_df/2014/',
                                 schema=arrow_static_schema,
                                 columns=['times_suspicion_sepsis3'])


## Workbench

In [None]:
%%time
from collections import Counter
numz = []
typez = Counter()

def get_stuffz(pickle_path):
    encounter_pickle = load_pickle(pickle_path)
    return get_typez(encounter_pickle), get_numz(encounter_pickle)

def get_typez(encounter_pickle):
    try:
        return type(encounter_pickle['flags']['ed_wait_time'])
    except KeyError:
        return KeyError
    
def get_numz(encounter_pickle):
    try:
        return float(encounter_pickle['flags']['ed_wait_time'])
    except KeyError:
        return KeyError
    
with Pool(processes=num_cpus) as pool:
    for typ, num in tqdm(
        pool.imap(func=get_stuffz, iterable=pickle_paths), total=len(pickle_paths)
    ):
        typez[typ] += 1
        numz.append(num)

In [None]:
display(typez)

In [None]:
import math

In [None]:
sams = list(filter(lambda n: not math.isnan(n), list(filter(lambda n: (n != KeyError), numz))))

In [None]:
pd.Series(sams).describe()

In [None]:
numz=pd.Series(numz)

In [None]:
numz.replace(KeyError, np.nan,)

In [None]:
encounter_pickle = load_pickle(pickle_paths[9413])
ed_wait_time = encounter_pickle['flags']['ed_wait_time']
ed_wait_time

In [None]:
pd.Timedelta(569.633333333, unit='min').seconds/60

In [None]:
if type(x) == pd.Timedelta:
    return float(x.seconds/60)
if type(x) == pd.NaT:
    return float('nan')

---
---
---

## Old Code

not currently used
```python
def cast_pandas_schema(df:pd.DataFrame, pandas_schema):
    schema = {}
    for col in df.columns:
        try:
            schema[col] = pandas_schema[col]
        except KeyError as e:
            print(e)
    df = df.astype(schema)
    return df
```

---