In [2]:
from polars_utils import *
from folder import StandardFolder
from pathlib import Path
import polars as pl
from datetime import datetime

# Cohort definition: take the first column of the cohort CSV as a plain Python
# list. It is later matched against the ENC_HN column of every source file.
hn = (
    pl.read_csv('D:/Prut/Warehouses/output/Dec23/n/Stroke/stroke_n_updated_05032024.csv')
    .to_series()
    .to_list()
)

# Sanity check: number of identifiers loaded.
print(len(hn))

28810


In [3]:
class StrokeWarehouse(StandardFolder):
    def __init__(self, hn_list: list[str], folder: str, streaming: bool = True) -> None:
        super().__init__(folder)
        self.hn_list = hn_list
        self.streaming = streaming
        self.lab_conversion = pl.read_csv('../std/lab_conversion.csv')
        self.meds_to_select = pl.read_csv('../std/med_groups_prut.csv')
        self.export_folder = Path('../output/Dec23/wh/complete')
        self.ran_all = False

    def get_labs(self, selected_labs: list[str] = ['Glucose', 'Creatinine', 'LDL', 'HDL', 'Triglyceride']):
        folder_path = self.lab
        to_concat = []
        for path in list(folder_path.iterdir()):
            file = (
                scan_file(path)
                .select(pl.col(['ENC_HN', 'REPORT_DATE', 'SHORT_TEST', 'UNIT', 'RESULT_VAL']))
                # Select HN
                .filter(pl.col('ENC_HN').is_in(self.hn_list))
                # Select wanted labs (on SHORT_TEST)
                .pipe(identify_in_list, col_name='SHORT_TEST', criteria=selected_labs)
                # Parse dates
                .pipe(parse_dates, 'REPORT_DATE')
                # Create a new column which is name+units
                .with_columns(pl.concat_str('SHORT_TEST', 'UNIT', separator=',').alias('name_with_units'))
                .filter(pl.col('name_with_units').is_in(self.lab_conversion.to_series()))
                .join(self.lab_conversion.lazy(), how='left', on='name_with_units')
                .rename({'new_name': 'Lab'})
                # Clean lab values
                .with_columns(pl.col('RESULT_VAL').str.extract(r'^\d+\.\d+|\d+$', 0).cast(pl.Float32))
                .with_columns(pl.col('RESULT_VAL').str.replace(r'\.{2,}', r'\.'))
                .with_columns(pl.col('RESULT_VAL').str.strip_chars('<>.,()/\\\'"'))
                .filter(pl.col('RESULT_VAL').str.contains('^[0-9.]+$'))
                # Multiply to standardise due to different units
                .with_columns(pl.col('RESULT_VAL').mul(pl.col('mul_factor')))
            )

            # collect
            file = file.collect(streaming=self.streaming)

            # Pivot labs
            file = file.pivot(values='RESULT_VAL', index=['ENC_HN', 'REPORT_DATE'], columns='Lab', aggregate_function='max')

            # Rename a bit
            file = file.rename({'REPORT_DATE': 'D001KEY'})

            # Append
            to_concat.append(file)

        self.lab_df = pl.concat(to_concat, how='diagonal_relaxed').unique()
        
    def get_visits(self):
        folder_path = self.visit
        to_concat = []
        for path in folder_path.iterdir():
            file = (
                scan_file(path)
                .filter(pl.col('ENC_HN').is_in(self.hn_list))
                .select(pl.col(['ENC_HN', 'D001KEY', 'D108KEY']))
                .pipe(parse_dates, 'D001KEY')
            )
            to_concat.append(file.collect(streaming=self.streaming))
        self.visit_df = pl.concat(to_concat).unique()

    def get_deaths(self):
        folder_path = self.deaths
        to_concat = []
        for path in folder_path.iterdir():
            file = (
                scan_file(path)
                .filter(pl.col('ENC_HN').is_in(self.hn_list))
                .select(pl.col('ENC_HN', 'D001KEY')).pipe(parse_dates, 'D001KEY').rename({'D001KEY': 'Death_date'})
            )
            to_concat.append(file.collect(streaming=self.streaming))
        self.deaths_df = pl.concat(to_concat).unique()

    def get_dx(self, select: list = None):
        folder_path = self.dx
        to_concat = []
        for path in folder_path.iterdir():
            file = (
                scan_file(path)
                .filter(pl.col('ENC_HN').is_in(self.hn_list))
                .select(pl.col(['ENC_HN', 'D001KEY', 'D035KEY']))
                .pipe(parse_dates, 'D001KEY')
            )
            if select is not None:
                file = file.filter(pl.col('D035KEY').is_in(select))
            file = file.group_by(pl.col(['ENC_HN', 'D001KEY'])).agg(pl.col('D035KEY')).with_columns(pl.col('D035KEY').list.unique().list.sort().list.join(', '))
            to_concat.append(file.collect(streaming=self.streaming))
        self.dx_df = pl.concat(to_concat).unique()

    def get_meds(self):
        folder_path = self.bill
        select = self.meds_to_select.to_series(1).to_list()
        self.meds_to_select = self.meds_to_select.lazy()
        to_concat = []
        for path in folder_path.iterdir():
            file = (
                scan_file(path)
                .filter(pl.col('ENC_HN').is_in(self.hn_list)))
            if {'PER_DATE_2', 'SERVICE_ID', 'CAL_SER_AMT'}.issubset(file.columns):
                file = file.rename({'PER_DATE_2': 'D001KEY', 'SERVICE_ID': 'D033KEY', 'CAL_SER_AMT': 'M1022'})
            file = (
                file
                .select(pl.col(['ENC_HN', 'D001KEY', 'D033KEY', 'M1022']))
                .pipe(parse_dates, 'D001KEY')
            )

            if select is not None:
                file = (
                    file
                    .filter(pl.col('D033KEY').is_in(select))
                    .join(self.meds_to_select.select(pl.col(['Drug_code', 'Class'])), left_on='D033KEY', right_on='Drug_code', how='left')
                )

            to_concat.append(file.collect(streaming=self.streaming))
        
        # pivot
        self.meds_df = pl.concat(to_concat)
        self.meds_df = self.meds_df.pivot(index=['ENC_HN', 'D001KEY'], values='M1022', columns='Class', aggregate_function='max').unique().with_columns(pl.col(['antihypertensive','antidiabetic','antidyslipidemic','anticlotting']).is_not_null())



    def get_vs(self):
        folder_path = self.vs
        rename_long = {'RECORD': 'D001KEY', 'DIA': 'DBP', 'SYS': 'SBP', 'HRBP': 'HR'}
        rename_wide = {'RECORDDATE': 'D001KEY', 'HIGH': 'HEIGHT', 'BW': 'WEIGHT'}
        select = ['ENC_HN', 'D001KEY', 'SBP', 'DBP', 'HR', 'WEIGHT', 'HEIGHT', 'BMI']

        to_concat = []
        for path in folder_path.iterdir():
            file = (
                scan_file(path)
                # .filter(pl.col('ENC_HN').is_in(self.hn_list))
                )

            # Must collect before pivot
            file = file.collect(streaming=self.streaming)
            
            # There are two formats, wide and long
            if len(file.columns) < 6:
                file = file.pivot(values = 'TEST_VALUE', index = ['ENC_HN', 'RECORD'], columns='TEST_NAME')
                file = file.rename(rename_long)
            else:
                file = file.rename(rename_wide)

            file = file.pipe(parse_dates, 'D001KEY').select(select).filter(pl.col('ENC_HN').is_in(self.hn_list))
            to_concat.append(file)

        
        self.vs_df = pl.concat(to_concat).unique()


    def get_demo(self):
        folder_path = self.demo
        cols = ['ENC_HN', 'D020AT3', 'H2L1KEY', 'H6L1KEY', 'H6L1DES']
        new_col_names = ['ENC_HN', 'DOB', 'Sex', 'Province_ID', 'Province_Thai']
        to_concat = []
        for path in folder_path.iterdir():
            file = scan_file(path)
            if set(cols).issubset(set(file.columns)):
                file = file.select(cols).collect(streaming=self.streaming).pipe(parse_dates, 'D020AT3') # New bug: only works in dataframes, so must collect first
                to_concat.append(file)
        self.demo_df = pl.concat(to_concat).unique()
        self.demo_df = self.demo_df.rename(dict(zip(cols, new_col_names)))


    def run_all(self):
        print('start')
        self.get_meds()
        print('meds')
        self.get_vs()
        print('vs')
        self.get_labs()
        print('labs')
        self.get_visits()
        print('visit')
        self.get_deaths()
        print('deaths')
        self.get_dx()
        print('dx')
        self.get_demo()
        print('demo')
        
        self.ran_all = True

    def merge(self):
        if not self.ran_all:
            raise Exception('Please run all first.')
        
        self.merged_df = (
            self.visit_df
            .join(self.vs_df, on=['ENC_HN', 'D001KEY'], how='outer_coalesce')
            .join(self.dx_df, on=['ENC_HN', 'D001KEY'], how='outer_coalesce')
            .join(self.meds_df, on=['ENC_HN', 'D001KEY'], how='outer_coalesce')
            .join(self.lab_df, on=['ENC_HN', 'D001KEY'], how='outer_coalesce')
            .join(self.deaths_df, on=['ENC_HN'], how='outer_coalesce')
            .join(self.demo_df, on=['ENC_HN'], how='left')
            .unique()
        )

        # Final Column Rename

        self.merged_df = self.merged_df.rename({'D001KEY': 'Date', 'D035KEY': 'ICD10', 'D108KEY': 'Site'})
    
    def export(self) -> None:
        if not self.ran_all:
            raise Exception('Please run all first.')
        self.file_name = self.export_folder / f'Stroke_warehouse_exported_{datetime.now().strftime('%d-%m-%Y')}'
        print(f'Exporting {self.file_name}')
        print(f'n = {self.merged_df['ENC_HN'].n_unique()}')
        self.merged_df.write_parquet(self.file_name.with_suffix('.parquet.gzip'), compression='gzip')
        print('Exported parquet')
        self.merged_df.write_csv(self.file_name.with_suffix('.csv'))
        print('Exported csv')
        print('Complete')


In [4]:
# Build the warehouse for the cohort loaded above.
s = StrokeWarehouse(
    hn_list=hn,
    folder='D:/Datalake/Data/20231231_fu_nc',
)

# Order matters: merge() and export() both raise unless run_all() has completed.
s.run_all()
s.merge()
s.export()

readme not included.
start
meds
vs
labs
visit
deaths
dx
demo
Exporting ..\output\Dec23\wh\complete\Stroke_warehouse_exported_05-03-2024
n = 28810
Exported parquet
Exported csv
Complete
