<a href="https://colab.research.google.com/github/KarinaKatke/HiRiD/blob/main/getting_started.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [4]:
import glob
import math
import numpy as np
import os
import pandas as pd
import re
import shutil
import sys

In [5]:
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


In [12]:
data_path = '/content/drive/My Drive/HiRID/raw_stage'

In [15]:
import tarfile

file_path = '/content/drive/My Drive/HiRID/raw_stage/observation_tables_parquet.tar.gz'
extract_path = '/content/observation_tables'

with tarfile.open(file_path, 'r:gz') as tar:
    tar.extractall(path=extract_path)

In [23]:
import os

extracted_files = os.listdir(extract_path)
print(extracted_files)

['observation_tables']


In [28]:

all_parquet_files = []
for root, dirs, files in os.walk(observation_table_path):
    for file in files:
        if file.endswith('.parquet'):
            all_parquet_files.append(os.path.join(root, file))

print(all_parquet_files)

['/content/observation_tables/observation_tables/parquet/part-228.parquet', '/content/observation_tables/observation_tables/parquet/part-135.parquet', '/content/observation_tables/observation_tables/parquet/part-188.parquet', '/content/observation_tables/observation_tables/parquet/part-122.parquet', '/content/observation_tables/observation_tables/parquet/part-18.parquet', '/content/observation_tables/observation_tables/parquet/part-62.parquet', '/content/observation_tables/observation_tables/parquet/part-8.parquet', '/content/observation_tables/observation_tables/parquet/part-185.parquet', '/content/observation_tables/observation_tables/parquet/part-206.parquet', '/content/observation_tables/observation_tables/parquet/part-86.parquet', '/content/observation_tables/observation_tables/parquet/part-154.parquet', '/content/observation_tables/observation_tables/parquet/part-221.parquet', '/content/observation_tables/observation_tables/parquet/part-19.parquet', '/content/observation_tables/o

In [None]:
for parquet_file in all_parquet_files:
    df = pd.read_parquet(parquet_file)
    print(f"Reading file: {parquet_file}")
    print(df.columns)

## Reading using Pandas
Required python packages are `pandas` and `pyarrow`

### Reading a part

In [13]:
part = 4

In [14]:
df_part = pd.read_parquet(os.path.join('observation_tables', 'parquet', f'part-{part}.parquet'))
#df_part['value'].count()
print(df_part.columns)

FileNotFoundError: [Errno 2] No such file or directory: 'observation_tables/parquet/part-4.parquet'

### Reading a specific patient

In [None]:
patientid = 3

In [None]:
def load_patient_index(path):
    df_ind = pd.read_csv(path)
    return { pid : part for (pid, part) in zip(df_ind['patientid'], df_ind['part'])}

pat_index = load_patient_index(os.path.join(data_path, 'observation_tables', 'observation_tables_index.csv'))

In [None]:
def load_patient(pid, data_path, pat_index):
    df_part = pd.read_parquet(os.path.join(data_path, 'observation_tables', 'parquet', f"part-{pat_index[pid]}.parquet"))

    return df_part.query(f'patientid == {pid}')

print ("Patient {} in partition {}.".format(patientid, pat_index[patientid]))
load_patient(3, data_path, pat_index)

### Stats over columns

`pandas` can also read several parts at the same time. To save memory, only the needed columns can be selected.

In [None]:
df_ph = pd.read_parquet(os.path.join(data_path, 'pharma_records', 'parquet'), columns=['pharmaid', 'givendose'])

In [None]:
df_ph.info()

In [None]:
# stats over augmentin doses
df_ph.query('pharmaid == 1000274')['givendose'].describe()

## Using spark

Access with `pyspark` (no need for a cluster)

In [None]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

from pyspark.sql import functions as sf

In [None]:
def get_spark_session(cores, memory_per_executor):
    driver_mem = cores * memory_per_executor + 2000 # + some driver overhead

    cfg = (SparkConf().set("spark.driver.memory", "{}m".format(driver_mem)).
            set("spark.executor.memory", "{}m".format(memory_per_executor)).
            set("spark.master", "local[{}]".format(cores)).
            set("spark.sql.execution.arrow.enabled", True)
          )

    return (SparkSession.
             builder.
             config(conf=cfg).
             getOrCreate())

In [None]:
spark = get_spark_session(4, 1024)

### Stats over columns

In [None]:
df_obs = spark.read.parquet(os.path.join(data_path, 'observation_tables', 'parquet'))

In [None]:
# stats over weights (considering all parts)
df_obs.where('variableid == 10000400').select('value').summary().toPandas()