#### Processing the diagnoses/procedures/intensive care/blood tests data

In [None]:
### Import packages

import os
import polars as pl

In [None]:
# Folder that holds the parquet extracts; it becomes the working directory
# so that the relative file names used in later cells resolve against it
path = "/Users/jkv465/Desktop/Work_EWS/New_Data"
os.chdir(path)
# Echo the directory back so the switch can be verified
print("Current working directory: ", os.getcwd())

In [None]:
pl.scan_parquet("problemlist.parquet").collect_schema()

In [None]:
# Opening the dataset of diagnoses

diagnoses = pl.scan_parquet("diagnoses.parquet")

# Opening the dataset of basic demographics for individuals

bitabel = pl.scan_parquet("bitabel.parquet")

In [None]:
# Resolve the schema through collect_schema(), the same call already used for
# problemlist.parquet, instead of the LazyFrame.schema property
diagnoses.collect_schema() # The variables we are interested in

In [None]:
# Column names and dtypes of the demographics table
bitabel.collect_schema()

In [None]:
# Preview the first 100 rows; taking head() before collect() avoids
# materialising the whole table just to look at a few rows
bitabel.head(100).collect(streaming=True)

# We can see that PAT_ID is the EnterpriseID with first letter being replaced by "Z" and the number being +1

In [None]:
# Turn the literal text "NULL" into a real null in every string column

bitabel = bitabel.with_columns(pl.col(pl.String).replace(old="NULL", new=None))

In [None]:
# Parse the time of death from text, and rebuild the date of birth by formatting
# it to text and parsing it back with the same pattern (per the original note,
# the goal is a datetime without UTC included)

bitabel = bitabel.with_columns(
    pl.col("Dødstidspunkt").str.to_datetime("%Y-%m-%d %H:%M:%S %Z"),
    pl.col("fødselsdato")
    .dt.strftime("%Y-%m-%d %H:%M:%S %Z")
    .str.to_datetime("%Y-%m-%d %H:%M:%S %Z"),
)

In [None]:
bitabel.collect()

In [None]:
# Save bitabel

bitabel.sink_parquet("bitabel_newest.parquet")


#### Datetime modifications in the Diagnoses dataframe


In [None]:
# Turn the literal text "NULL" into a real null in every string column

diagnoses = diagnoses.with_columns(pl.col(pl.String).replace(old="NULL", new=None))

In [None]:
# Contact end is parsed from text; contact start is formatted to text and parsed
# back with the same pattern so both columns end up as the same kind of datetime

diagnoses = diagnoses.with_columns(
    pl.col("Kontakt slut").str.to_datetime("%Y-%m-%d %H:%M:%S %Z"),
    pl.col("Kontakt start")
    .dt.strftime("%Y-%m-%d %H:%M:%S %Z")
    .str.to_datetime("%Y-%m-%d %H:%M:%S %Z"),
)

In [None]:
# Implement PT_ID in diagnoses

# Define a function that takes the EnterpriseID column, replaces first letter of the string and adds 1 to the number remaining

def transform_column(column):
    """Build the PT_ID expression from an EnterpriseID string expression.

    The first character is kept, except that an "E" becomes "Z". The rest of
    the string is cast to Int64, incremented by one and converted back to text.
    The two parts are then concatenated.

    NOTE(review): the integer round-trip drops leading zeros in the numeric
    part, and the cast will not accept non-numeric text — confirm that the IDs
    never contain either.
    """
    prefix = column.str.slice(0, 1).str.replace("E", "Z", literal=True)
    incremented = column.str.slice(1).cast(pl.Int64) + 1
    return prefix + incremented.cast(pl.String)

diagnoses = diagnoses.with_columns(
    transform_column(pl.col("EnterpriseID")).alias("PT_ID")
)

In [None]:
# Preview the first 100 rows; head() runs before collect() so the full
# diagnoses table is not materialised just for display
diagnoses.head(100).collect(streaming=True)

In [None]:
# Relocate PT_ID to be right after EnterpriseID
# (column names are resolved with collect_schema(), as done for the blood tests)
diagnoses = diagnoses.select([
    "EnterpriseID", 
    "PT_ID", 
    *[col for col in diagnoses.collect_schema().names() if col not in ["EnterpriseID", "PT_ID"]]
])

In [None]:
# Check the shape of the diagnoses dataframe

diagnoses.collect(streaming = True).shape

# The data has 67,467,961 rows (about 67 million)

In [None]:
# Check if there are NULLs in the dataframe

# Count nulls per column inside the lazy query so only the one-row summary
# is materialised, not the whole table
diagnoses.null_count().collect(streaming = True)

# We have some NA values in the admission numbers, we will drop these

In [None]:
# Drop the observations with missing hospitalization numbers (CSNs)

diagnoses = diagnoses.drop_nulls(subset="CSN")

In [None]:
# Let's check the shape of the diagnoses dataframe now

diagnoses.collect(streaming = True).shape

In [None]:
# Count the diagnosis codes recorded under each CSN

csn_multiples = (
    diagnoses
    .group_by("CSN")
    .agg(pl.col("Aktionsdiagnose kode").count())
    .collect(streaming=True)
)

# We see that there is a 1to1 correspondence of CSN and Action codes

In [None]:
# Use plotnine for plotting

from plotnine import *

In [None]:
csn_multiples = csn_multiples.rename({"Aktionsdiagnose kode":"Diagnosis_Codes"})

In [None]:
csn_multiples.describe()

In [None]:
# Distribution of "number of diagnosis codes per CSN".
# The per-CSN aggregation runs lazily, so only the grouped result is
# materialised before tallying how often each per-CSN count occurs.
mult_csn = (
    diagnoses
    .group_by("CSN")
    .agg(pl.col("Aktionsdiagnose kode").count())
    .collect(streaming=True)["Aktionsdiagnose kode"]
    .value_counts()
    .sort(by="count", descending=True)
    .lazy()
)

In [None]:
mult_csn.sort(by = "Aktionsdiagnose kode").collect(streaming=True)

In [None]:
# Express each tally as a share of all CSNs.
# The share is written back under the name "count": the cells that filter and
# plot the result read it from that column.

mult_csn_perc = mult_csn.with_columns(
    (pl.col("count") / pl.col("count").sum()).alias("count")
)

In [None]:
mult_csn_perc.collect(streaming=True)

In [None]:
# Rename column to be more readable

mult_csn_perc = mult_csn_perc.sort(by = "Aktionsdiagnose kode").rename({"Aktionsdiagnose kode":"Diagnoses"})

In [None]:

# Bar chart of the share of CSNs per number of diagnoses; rows whose "count"
# value is below 0.003 are left out of the figure
bar_plot_multiples = (
    ggplot(
        mult_csn_perc.collect(streaming=True).filter(pl.col("count") >= 0.003),
        aes(x="Diagnoses", y="count", fill="factor(Diagnoses)"),
    )
    + geom_col()
    + scale_fill_hue()
    + labs(fill="Number of diagnoses per CSN", x="", y="Percentage")
    + theme(legend_position="top")
).draw(show=False)

In [None]:
bar_plot_multiples

In [None]:
# Saving the figure now
# bar_plot_multiples.savefig(fname="Diagnoses_Per_CSN.png",dpi=350)

In [None]:
# List the CSNs that carry exactly two diagnosis codes.
# The filter sits before collect() so it is part of the lazy query and only
# the matching groups are materialised.

result = (
    diagnoses
    .group_by("CSN")
    .agg(pl.col("Aktionsdiagnose kode").count().alias("diagnosis_count"))
    .filter(pl.col("diagnosis_count") == 2)
    .collect()
)

In [None]:
result

In [None]:
# Final modification

diagnoses = diagnoses.select(["EnterpriseID","PT_ID","CSN","Department ID", "Aktionsdiagnose kode", "Aktionsdiagnose"])

In [None]:
# Null counts for the retained columns, computed lazily so only the one-row
# summary is materialised
diagnoses.null_count().collect(streaming=True)

In [None]:
# Save diagnoses dataframe: diagnoses.sink_parquet("diagnoses_newest.parquet")

### Opening the procedures 

In [None]:
procedures = pl.scan_parquet("procedures.parquet")

In [None]:
# Column names and dtypes of the procedures table
procedures.collect_schema()

In [None]:
# Let's add the PT_ID in the procedures

procedures = procedures.with_columns(
    transform_column(pl.col("EnterpriseID")).alias("PT_ID")
)

In [None]:
# Quick glimpse

procedures.head(50).collect(streaming = True)

In [None]:
# Relocate PT_ID to be right after EnterpriseID
# (column names are resolved with collect_schema(), as done for the blood tests)

procedures = procedures.select([
    "EnterpriseID", 
    "PT_ID", 
    *[col for col in procedures.collect_schema().names() if col not in ["EnterpriseID", "PT_ID"]]
])

In [None]:
# Give the SKS code and procedure date columns English, underscore-style names

procedures = procedures.rename(
    {
        "SKS-Kode": "SKS_Code",
        "Udført procedurer dato": "Procedure_Date",
    }
)


In [None]:
# Rebuild the procedure date by formatting it to text and parsing it back with
# the same pattern (per the original note, the goal is a datetime without UTC)

procedures = procedures.with_columns(
    pl.col("Procedure_Date")
    .dt.strftime("%Y-%m-%d %H:%M:%S %Z")
    .str.to_datetime("%Y-%m-%d %H:%M:%S %Z")
)

In [None]:
procedures.head(50).collect(streaming = True)

In [None]:
# Turn the literal text "NULL" into a real null in every string column

procedures = procedures.with_columns(pl.col(pl.String).replace(old="NULL", new=None))

In [None]:
# Save it into a new parquet

procedures.sink_parquet("procedures_newest.parquet")

### Got access to the cookbook with the SKS_Codes

In [None]:
cookbook = pl.read_csv("Metadata Diagnostics V3.tsv", separator= '\t')

In [None]:
cookbook

In [None]:
# Turn the literal text "NULL" into a real null in every string column
cookbook = cookbook.with_columns(pl.col(pl.String).replace(old="NULL", new=None))

In [None]:
# Save cookbook

cookbook.write_parquet("SKS_Codes_Cookbook.parquet")

In [None]:
diagnoses.group_by(["Aktionsdiagnose", "Aktionsdiagnose kode"]).agg(pl.len().alias("n")).sort("n", descending=True).collect(streaming=True)

In [None]:
# Plotting the top-10 most frequently occurring diagnoses.
# Only the diagnosis text column is selected before collect(), so the other
# columns are never materialised for this count.

(
    ggplot(
        diagnoses.select("Aktionsdiagnose")
        .collect(streaming=True)["Aktionsdiagnose"]
        .value_counts(sort=True, name="n")
        .head(10)
        .with_columns(pl.col("Aktionsdiagnose").cast(pl.Categorical)),
        aes(x="Aktionsdiagnose", y="n", fill="Aktionsdiagnose"),
    )
    + geom_col()
    + theme(axis_text_x=element_text(rotation=45, hjust=1))
    + theme(legend_position="none")
    + labs(x="Most frequent diagnoses", y="")
)

### Intensive care unit data processing

In [None]:
ita = pl.scan_parquet("ita_respirator.parquet")

In [None]:
# Overview of the intensive care unit data (column names and dtypes)
ita.collect_schema()

In [None]:
# Let's add the PT_ID in the intensive care unit dataframe

ita = ita.with_columns(
    transform_column(pl.col("EnterpriseID")).alias("PT_ID")
)

In [None]:
# Relocate PT_ID to be right after EnterpriseID
# (column names are resolved with collect_schema(), as done for the blood tests)

ita = ita.select([
    "EnterpriseID", 
    "PT_ID", 
    *[col for col in ita.collect_schema().names() if col not in ["EnterpriseID", "PT_ID"]]
])

In [None]:
# Preview the first 50 rows without materialising the whole table
ita.head(50).collect(streaming = True)

In [None]:
# Turn the literal text "NULL" into a real null in every string column

ita = ita.with_columns(pl.col(pl.String).replace(old="NULL", new=None))

In [None]:
# Datetime handling for the intensive care dataframe: the two "slut" (end)
# columns are parsed from text, while the two "start" columns are formatted to
# text and parsed back with the same pattern

ita = ita.with_columns(
    pl.col("Respirator slut").str.to_datetime("%Y-%m-%d %H:%M:%S %Z"),
    pl.col("Respirator start")
    .dt.strftime("%Y-%m-%d %H:%M:%S %Z")
    .str.to_datetime("%Y-%m-%d %H:%M:%S %Z"),
    pl.col("ITA slut").str.to_datetime("%Y-%m-%d %H:%M:%S %Z"),
    pl.col("ITA start")
    .dt.strftime("%Y-%m-%d %H:%M:%S %Z")
    .str.to_datetime("%Y-%m-%d %H:%M:%S %Z"),
)

In [None]:
# Preview the first 50 rows after the datetime conversion, without
# materialising the whole table
ita.head(50).collect(streaming = True)

In [None]:
ita.sink_parquet("intensive_care.parquet")

### Opening the blood tests

In [None]:
tests = pl.scan_parquet("prøvesvar.parquet")

In [None]:
# Check the schema

tests.collect_schema()

In [None]:
# Preview the first 50 rows without materialising the whole blood-test table
tests.head(50).collect(streaming = True)

In [None]:
def transform_column(column):
    """Build the PT_ID expression from an EnterpriseID string expression.

    The first character is kept, except that an "E" becomes "Z". The rest of
    the string is cast to Int64, incremented by one and converted back to text.
    The two parts are then concatenated.

    NOTE(review): the integer round-trip drops leading zeros in the numeric
    part, and the cast will not accept non-numeric text — confirm that the IDs
    never contain either.
    """
    prefix = column.str.slice(0, 1).str.replace("E", "Z", literal=True)
    incremented = column.str.slice(1).cast(pl.Int64) + 1
    return prefix + incremented.cast(pl.String)


In [None]:
# Creation of PT_ID column

tests = tests.rename({"V1":"EnterpriseID", "V2":"Blood_Test_Code", "V3":"Blood_Test_Name", "V4":"Blood_Test_Value", "V5":"Blood_Test_Status", "V6":"Blood_Test_Start", "V7":"Blood_Test_End"})

tests = tests.with_columns(
    transform_column(pl.col("EnterpriseID")).alias("PT_ID")
)


In [None]:
# Move PT_ID so that it sits right after EnterpriseID

tests = tests.select(
    "EnterpriseID",
    "PT_ID",
    *(
        name
        for name in tests.collect_schema().names()
        if name not in ("EnterpriseID", "PT_ID")
    ),
)

In [None]:
# Rebuild both blood-test timestamps by formatting them to text and parsing
# them back with the same pattern

tests = tests.with_columns(
    pl.col("Blood_Test_Start")
    .dt.strftime("%Y-%m-%d %H:%M:%S %Z")
    .str.to_datetime("%Y-%m-%d %H:%M:%S %Z"),
    pl.col("Blood_Test_End")
    .dt.strftime("%Y-%m-%d %H:%M:%S %Z")
    .str.to_datetime("%Y-%m-%d %H:%M:%S %Z"),
)

In [None]:
# Check unique blood tests
# (only the name column is selected before collect(), so the rest of the table
# is not materialised)

tests.select("Blood_Test_Name").collect(streaming = True)["Blood_Test_Name"].unique()

# We have these unique blood test names


In [None]:
# Unique blood test codes; only the code column is selected before collect()
tests.select("Blood_Test_Code").collect(streaming = True)["Blood_Test_Code"].unique()

In [None]:
# Reshape to one column per blood test name; when several values share the
# same index combination the first one is kept. The result is made lazy again.
tests = (
    tests
    .collect(streaming=True)
    .pivot(
        on="Blood_Test_Name",
        index=["EnterpriseID", "PT_ID", "Blood_Test_Status", "Blood_Test_Start", "Blood_Test_End"],
        values="Blood_Test_Value",
        aggregate_function="first",
    )
    .lazy()
)

In [None]:
tests = tests.sort(["PT_ID","Blood_Test_Start"])

In [None]:
tests = tests.filter(pl.col("Blood_Test_Status") == "Endelig")

In [None]:
# Preview the first 50 rows of the sorted, filtered table
tests.head(50).collect(streaming = True)

In [None]:
# Check unusual values in the blood tests
# Takes the last 11 columns — presumably the pivoted blood-test columns; verify
# if the set of tests changes

columns_to_analyse = tests.collect_schema().names()[-11:]

In [None]:
results = {}

# Materialise the table once; collecting inside the loop repeated the same
# query for every column
tests_collected = tests.collect(streaming=True)

# For each blood-test column, tally the values that start with a letter
for column in columns_to_analyse:
    filtered = tests_collected[column].value_counts(sort = True).lazy().filter(pl.col(column).str.contains(r"^[A-Za-z]")).collect(streaming = True)

    results[column] = filtered


In [None]:
results["Alanintransaminase [ALAT];P"]

In [None]:
results["Albumin;P"]

In [None]:
results["C-reaktivt protein [CRP];P"]

In [None]:
results["Hæmoglobin;B"]

In [None]:
results["Kreatinin;P"]

In [None]:
results["Laktat;P(aB)"]

In [None]:
results["Laktat;P(vB)"]

In [None]:
results["Laktatdehydrogenase [LDH];P"]

In [None]:
results["Leukocytter;B"]

In [None]:
results["Trombocytter;B"]

In [None]:
results["Troponin T;P"]

In [None]:
results_v2 = {}

# Materialise the table once; collecting inside the loop repeated the same
# query for every column
tests_collected = tests.collect(streaming=True)

# For each blood-test column, tally the values that start with <, > or =
for column in columns_to_analyse:
    filtered_v2 = tests_collected[column].value_counts(sort = True).lazy().filter(pl.col(column).str.contains(r"^(<|>|=)")).collect(streaming = True)

    results_v2[column] = filtered_v2


In [None]:
results_v2["Alanintransaminase [ALAT];P"]

In [None]:
results_v2["Albumin;P"]

In [None]:
results_v2["C-reaktivt protein [CRP];P"]

In [None]:
results_v2["Hæmoglobin;B"]

In [None]:
results_v2["Kreatinin;P"]

In [None]:
results_v2["Laktat;P(aB)"]

In [None]:
results_v2["Laktat;P(vB)"]

In [None]:
results_v2["Laktatdehydrogenase [LDH];P"]

In [None]:
results_v2["Leukocytter;B"]

In [None]:
results_v2["Trombocytter;B"]

In [None]:
results_v2["Troponin T;P"]

In [None]:
# We need to set the blood test values that start with a letter to NULL
# Takes the last 11 columns — presumably the pivoted blood-test columns; verify
# if the set of tests changes

last_11_cols = tests.collect_schema().names()[-11:]

In [None]:
# Null out every value that starts with a letter, for all selected columns in
# a single with_columns call; other values are kept unchanged
tests = tests.with_columns(
    [
        pl.when(pl.col(name).str.contains(r"^[A-Za-z]"))
        .then(None)
        .otherwise(pl.col(name))
        .alias(name)
        for name in last_11_cols
    ]
)

In [None]:
# NOTE(review): `tests` appears to already be lazy here (it was made lazy right
# after the pivot and only lazy operations follow), so this call looks
# redundant — confirm before removing
tests = tests.lazy()

In [None]:
# Save the tests

tests.sink_parquet("blood_tests_newest.parquet")