In [None]:
import os
from pathling import PathlingContext

# Root of the MIMIC-IV FHIR dataset, read below with `pc.read.parquet`.
# Set the MIMIC_FHIR_PATH environment variable to point at another copy without
# editing the notebook, e.g. "../_data/mimic4-demo-ptl" (presumably the
# MIMIC-IV demo subset — confirm).
MIMIC_FHIR_PATH = os.environ.get(
    "MIMIC_FHIR_PATH", "/Users/szu004/datasets/work/mimic-iv/mimic4-ptl"
)

In [None]:
# Point Spark at the project's config directory; set before
# PathlingContext.create() so the session it builds can pick it up.
os.environ['SPARK_CONF_DIR'] = os.path.abspath('../env/spark-conf')
pc = PathlingContext.create()
spark = pc.spark

# Echo the SQL-semantics settings the session ended up with.
for conf_key in ("spark.sql.ansi.enabled", "spark.sql.timestampType"):
    print(conf_key, spark.conf.get(conf_key))

# FHIR data source that the SQL-on-FHIR views are evaluated against.
data = pc.read.parquet(MIMIC_FHIR_PATH)

In [None]:
import glob
import json
import re

from typing import NamedTuple

class ViewCtx(NamedTuple):
    """Handles a view needs in order to run, passed as one immutable bundle.

    Fields may be given positionally, ``ViewCtx(spark, ds)``, or by name.
    """

    spark: "SparkSession"  # session used to execute SQL via ``spark.sql``
    ds: "DataSource"  # source whose ``view(json=...)`` evaluates SQL-on-FHIR views


# Shared context handed to every view's run/select call below.
VIEW_CTX = ViewCtx(spark=spark, ds=data)

class SOFView:
    """A SQL-on-FHIR view definition that is registered as a Spark temp view.

    ``view_def`` is the parsed JSON view definition. ``run`` re-serialises it
    and evaluates it against the context's data source.
    """

    def __init__(self, view_def, name=None):
        self._view_def = view_def
        # An explicit (truthy) name wins; otherwise use the definition's "name".
        self._name = name or view_def['name']

    def run(self, view_ctx):
        """Evaluate the view and register the result, marked for caching, as a temp view."""
        print(f"Creating SOF view {self._name}")
        view = view_ctx.ds.view(json=json.dumps(self._view_def))
        view.cache().createOrReplaceTempView(self._name)

    def select(self, view_ctx):
        """Return a DataFrame over the registered temp view (``run`` must have been called)."""
        return view_ctx.spark.sql(f"SELECT * FROM {self._name}")

    @classmethod
    def from_file(cls, file_path):
        """Load a view definition from a JSON file.

        The file is decoded as UTF-8 regardless of the platform locale.

        Raises:
            KeyError: if the definition has no "name" entry.
        """
        with open(file_path, encoding='utf-8') as f:
            view_def = json.load(f)
        return cls(view_def)

# Register every SQL-on-FHIR view definition as a Spark temp view.
# glob() returns files in arbitrary order; sort so creation order and log
# output are deterministic across machines and runs.
for view_file in sorted(glob.glob('../src/sof/*.json')):
    sof_view = SOFView.from_file(view_file)
    sof_view.run(VIEW_CTX)


class SQLView:
    """A Spark SQL view or table defined by a ``CREATE ... AS`` statement.

    ``run`` executes the statement, which (re)creates the object under
    ``name``. ``select``, ``cache`` and ``to_csv`` then refer to it by that name.
    """

    # Matches e.g. "CREATE OR REPLACE VIEW x AS", "CREATE OR REPLACE TEMP VIEW x AS"
    # and "CREATE OR REPLACE GLOBAL TEMPORARY VIEW x AS", in any case, with any
    # whitespace between tokens. The lazy repeat picks the first VIEW keyword.
    _VIEW_NAME_RE = re.compile(
        r'CREATE\s+OR\s+REPLACE\s+(?:\w+\s+)*?VIEW\s+(\w+)\s+AS', re.IGNORECASE
    )
    # Matches e.g. "CREATE TABLE x AS" and "CREATE OR REPLACE TABLE x AS".
    _TABLE_NAME_RE = re.compile(
        r'CREATE\s+(?:OR\s+REPLACE\s+)?TABLE\s+(\w+)\s+AS', re.IGNORECASE
    )

    def __init__(self, sql, name, depends_on=None):
        self._sql = sql
        self._name = name
        # Views this one reads from. Stored for callers; nothing in this class
        # uses it to order execution.
        self._depends_on = depends_on

    def run(self, view_ctx):
        """Execute the defining SQL statement in the context's Spark session."""
        print(f"Creating SQL view {self._name}")
        view_ctx.spark.sql(self._sql)

    def select(self, view_ctx):
        """Return a DataFrame over the view (``run`` must have been called)."""
        return view_ctx.spark.sql(f"SELECT * FROM {self._name}")

    def cache(self, view_ctx):
        """Mark the view for lazy caching (``CACHE LAZY TABLE``)."""
        return view_ctx.spark.sql(f"CACHE LAZY TABLE {self._name};")

    def to_csv(self, view_ctx, file_path):
        """Collect the whole view to pandas and write it to ``file_path`` without an index.

        The parent directory is created if it does not exist yet.
        """
        out_dir = os.path.dirname(file_path)
        if out_dir:
            os.makedirs(out_dir, exist_ok=True)
        self.select(view_ctx).toPandas().to_csv(file_path, index=False)

    @classmethod
    def from_file(cls, file_path):
        """Build an SQLView from a UTF-8 ``.sql`` file, taking the name from its CREATE statement.

        Raises:
            ValueError: if no ``CREATE ... VIEW <name> AS`` or
                ``CREATE TABLE <name> AS`` statement is found.
        """
        with open(file_path, encoding='utf-8') as f:
            sql = f.read()
        match = cls._VIEW_NAME_RE.search(sql) or cls._TABLE_NAME_RE.search(sql)
        if not match:
            raise ValueError(f"Cannot find view name in {file_path}")
        return cls(sql, match.group(1))

# Create the SQL views in file-name order (presumably so that files which read
# other views sort after them — confirm the naming convention).
for view_file in sorted(glob.glob('../src/sql/*.sql')):
    sql_view = SQLView.from_file(view_file)
    sql_view.run(VIEW_CTX)

# Mark these two views for lazy caching (CACHE LAZY TABLE).
SQLView.from_file('../src/sql/md_icustay_detail.sql').cache(VIEW_CTX)
SQLView.from_file('../src/sql/md_oxygen_delivery.sql').cache(VIEW_CTX)

# Debug helpers: uncomment to preview a view.
#SQLView.from_file('../src/sql/md_icustay_detail.sql').select(VIEW_CTX).show()
#SQLView.from_file('../src/sql/dv_o2_delivery_device.sql').select(VIEW_CTX).show()

# Export the cohort extracts; make sure the output directory exists first so
# pandas' to_csv does not fail on a fresh checkout.
OUTPUT_DIR = '../data/fhir_mimic-2.2'
os.makedirs(OUTPUT_DIR, exist_ok=True)
SQLView.from_file('../src/sql/xcoh_subject.sql').to_csv(
    VIEW_CTX, os.path.join(OUTPUT_DIR, 'subject.csv')
)
SQLView.from_file('../src/sql/xcoh_x_reading_o2_flow.sql').to_csv(
    VIEW_CTX, os.path.join(OUTPUT_DIR, 'reading_o2_flow.csv')
)