In [None]:
import glob
import math
import numpy as np
import os
import pandas as pd
import re
import shutil
import sys

In [None]:
# Root directory of the HiRID data. Set the HIRID_DATA_PATH environment variable,
# or replace the placeholder fallback below with the complete path.
data_path = os.environ.get('HIRID_DATA_PATH', '..../hirid_data')

## Reading with pandas
The required Python packages are `pandas` and `pyarrow` (`pyarrow` serves as the parquet engine for `pd.read_parquet`).

### Reading a part

In [None]:
# Number of the observation-table partition to load (file `part-<part>.parquet`).
part = 4

In [None]:
# Load one whole partition and count the non-null entries of its `value` column.
part_file = os.path.join(data_path, 'observation_tables', 'parquet', f'part-{part}.parquet')
df_part = pd.read_parquet(part_file)
df_part['value'].count()

### Reading a specific patient

In [None]:
# Patient id used in the lookup cell below; it must be a key of `pat_index`.
patientid = 3

In [None]:
def load_patient_index(path):
    """Map each patient id to the number of the partition holding its rows.

    Parameters
    ----------
    path : str
        Path to the index CSV; it must contain the columns ``patientid``
        and ``part``.

    Returns
    -------
    dict
        ``{patientid: part}``. If a patient id appears more than once,
        the last occurrence wins.
    """
    index_df = pd.read_csv(path)
    return dict(zip(index_df['patientid'], index_df['part']))

index_file = os.path.join(data_path, 'observation_tables', 'observation_tables_index.csv')
pat_index = load_patient_index(index_file)

In [None]:
def load_patient(pid, data_path, pat_index):
    """Return all observation rows of one patient.

    Parameters
    ----------
    pid : int
        Patient id; must be a key of ``pat_index`` (otherwise ``KeyError``).
    data_path : str
        Root directory of the dataset.
    pat_index : dict
        Mapping patient id -> partition number, as built by ``load_patient_index``.

    Returns
    -------
    pandas.DataFrame
        Rows of ``part-<n>.parquet`` whose ``patientid`` equals ``pid``,
        keeping their original index labels.
    """
    # Only the single partition that holds this patient is read.
    part_file = os.path.join(
        data_path, 'observation_tables', 'parquet', f"part-{pat_index[pid]}.parquet"
    )
    partition = pd.read_parquet(part_file)
    return partition[partition['patientid'] == pid]

# Both the message and the lookup use `patientid`, so changing it above
# changes which patient is actually loaded and displayed.
print(f"Patient {patientid} in partition {pat_index[patientid]}.")
load_patient(patientid, data_path, pat_index)

### Stats over columns

`pandas` can also read all parts at once when given the directory instead of a single file. To save memory, select only the columns you need via `columns=`.

In [None]:
pharma_dir = os.path.join(data_path, 'pharma_records', 'parquet')
df_ph = pd.read_parquet(pharma_dir, columns=['pharmaid', 'givendose'])

In [None]:
# Overview of the loaded pharma columns: dtypes, non-null counts and memory usage.
df_ph.info()

In [None]:
# stats over augmentin doses (pharmaid 1000274)
augmentin_doses = df_ph.loc[df_ph['pharmaid'] == 1000274, 'givendose']
augmentin_doses.describe()

## Using Spark

The same data can also be accessed with `pyspark` running in local mode (no cluster is needed).

In [None]:
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf

from pyspark.sql import functions as sf

In [None]:
def get_spark_session(cores, memory_per_executor, driver_overhead_mb=2000):
    """Create (or reuse) a local-mode SparkSession.

    Parameters
    ----------
    cores : int
        Number of local worker threads (``spark.master = local[cores]``).
    memory_per_executor : int
        Memory in MB written to ``spark.executor.memory``. The driver is
        sized to hold ``cores`` slices of this size.
    driver_overhead_mb : int, optional
        Extra driver memory in MB added on top of
        ``cores * memory_per_executor`` (default 2000).

    Returns
    -------
    pyspark.sql.SparkSession

    Notes
    -----
    ``getOrCreate`` returns an already running session if one exists. In
    that case start-up settings such as the driver memory may not take effect.
    """
    driver_mem = cores * memory_per_executor + driver_overhead_mb

    cfg = (
        SparkConf()
        .set("spark.driver.memory", f"{driver_mem}m")
        .set("spark.executor.memory", f"{memory_per_executor}m")
        .set("spark.master", f"local[{cores}]")
        # Spark >= 3.0 reads the Arrow switch (used e.g. by toPandas) from this key ...
        .set("spark.sql.execution.arrow.pyspark.enabled", "true")
        # ... while Spark 2.x only understands the older key (deprecated since 3.0).
        .set("spark.sql.execution.arrow.enabled", "true")
    )

    return SparkSession.builder.config(conf=cfg).getOrCreate()

In [None]:
# 4 local cores with 1024 MB each; the driver gets 4 * 1024 MB plus overhead.
spark = get_spark_session(cores=4, memory_per_executor=1024)

### Stats over columns

In [None]:
obs_dir = os.path.join(data_path, 'observation_tables', 'parquet')
df_obs = spark.read.parquet(obs_dir)

In [None]:
# stats over weights (variableid 10000400), computed over all parts
weight_values = df_obs.filter('variableid == 10000400').select('value')
weight_values.summary().toPandas()