# Data cleaning tutorial

## Set up the session

In [None]:
from databricks.sdk.core import Config
from databricks.sdk import WorkspaceClient

# Connection settings for the Databricks workspace; replace every '----' placeholder.
config = Config(
    profile      = 'access',  # arbitrary config profile name
    host         = '----',    # fill this in
    token        = '----',    # fill this in
    cluster_id   = '----',    # fill this in
    warehouse_id = '----',    # fill this in
)

w = WorkspaceClient(config=config)

In [None]:
from databricks.sdk import WorkspaceClient

# (Re)build the workspace client from the shared `config`, so this cell can be re-run on its own.
w = WorkspaceClient(
    config=config,
)

In [None]:
# Start the cluster and the SQL warehouse, then wait for both to be running.
w.clusters.start(cluster_id=config.cluster_id)
warehouse_state = w.warehouses.start_and_wait(id=config.warehouse_id).as_dict()['state']
cluster_state = w.clusters.wait_get_cluster_running(cluster_id=config.cluster_id).as_dict()['state']
# Only the last expression of a notebook cell is displayed, so show both states together.
warehouse_state, cluster_state

Import libraries

In [None]:
import pyspark.sql.functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import FloatType, IntegerType, StringType

import re

In [None]:
def getFirstNumber(x: str):
    """Parse the leading number of a lab-result string.

    Only the first space-separated token of *x* is inspected. A decimal comma is
    treated as a decimal point, and leading/trailing dots are stripped, e.g.
    ``'29,84 mL/p/1.73m2'`` -> ``29.84`` and ``'>-90 mL/p/1.73m2'`` -> ``-90.0``.

    Returns the sentinel ``-1111.0001`` when no number can be parsed (e.g.
    ``'lásd megj.'`` or ``'1..5'``) or when *x* is not a string (e.g. ``None``).
    """
    import re
    try:
        token = x.split(' ')[0].replace(',', '.').strip('.')
        return float(re.findall(r"[-+]?(?:\d*\.*\d+)", token)[0])
    except (AttributeError, TypeError, IndexError, ValueError):
        # AttributeError/TypeError: x is not a str (e.g. None);
        # IndexError: the token holds no number; ValueError: e.g. '1..5'.
        return float(-1111.0001)
    
def getNormRange(x: str):
    """Split a normal-range string into comparison modifiers and numeric bounds.

    Returns two parallel lists ``(modifiers, numbers)``. There is one entry for
    each token from which ``getFirstNumber`` could parse a number. The modifier
    is the token with its digits, '.', '-' and ',' removed (typically '<' or
    '>'). When exactly two numbers are found, missing or unknown modifiers
    default to '>' for the first (lower bound) and '<' for the second (upper
    bound). Returns ``(None, None)`` when *x* is None.
    """
    import re
    unparsed = -1111.0001  # sentinel returned by getFirstNumber
    if x is None:
        return None, None

    text = x.strip(' ')
    # '- 5' style (no lower bound given) becomes '<5'.
    if text.startswith('- '):
        text = text.replace('- ', '<')
    # '5-' / '5 -' style (no upper bound given) gets a leading '>'.
    if text.endswith('-'):
        text = '>' + text
    for separator in (', ', ' - ', '- ', ' -'):
        text = text.replace(separator, ' ')

    modifiers, numbers = [], []
    for token in text.split(' '):
        number = getFirstNumber(token)
        if number == unparsed:
            continue
        bare = token.replace('.', '').replace('-', '').replace(',', '')
        modifiers.append(re.sub(r'[0-9]', '', bare))
        numbers.append(number)

    if len(numbers) == 2:
        if modifiers[0] not in ['<', '>']:
            modifiers[0] = '>'
        if modifiers[1] not in ['<', '>']:
            modifiers[1] = '<'
    return modifiers, numbers

def getNormUpper(x: str):
    """Return the upper bound of a normal-range string, or NaN if it has none.

    The upper bound is the largest number whose modifier (from ``getNormRange``)
    is ``'<'``. NaN is returned when *x* is None or no ``'<'`` bound exists.
    """
    r_mod, r_num = getNormRange(x)
    if r_num is None:
        return float('nan')
    upper_bounds = [num for mod, num in zip(r_mod, r_num) if mod == '<']
    return max(upper_bounds) if upper_bounds else float('nan')

def getNormLower(x: str):
    """Return the lower bound of a normal-range string, or NaN if it has none.

    The lower bound is the smallest number whose modifier (from ``getNormRange``)
    is ``'>'``. NaN is returned when *x* is None or no ``'>'`` bound exists.
    """
    r_mod, r_num = getNormRange(x)
    if r_num is None:
        return float('nan')
    lower_bounds = [num for mod, num in zip(r_mod, r_num) if mod == '>']
    return min(lower_bounds) if lower_bounds else float('nan')

# Spark UDF wrappers around the parsing helpers defined in this cell.
rawToValue = F.udf(getFirstNumber, FloatType())
normUpper = F.udf(getNormUpper, FloatType())
normLower = F.udf(getNormLower, FloatType())
# Null-safe: a null input yields null instead of a TypeError that fails the Spark task.
removeNumbersFromString = F.udf(
    lambda x: None if x is None else re.sub(r'[0-9]', '', x),
    StringType(),
)

In [None]:
from pyspark.sql.connect.dataframe import DataFrame

class extendSparkDataFrameMixin(object):
    """Mixin adding an ``ext`` constructor that re-classes an existing object in place."""

    @classmethod
    def ext(cls, obj):
        """Switch *obj*'s class to *cls* (no copy is made) and return that same object."""
        setattr(obj, '__class__', cls)
        return obj

class DataFrameExt(DataFrame, extendSparkDataFrameMixin):
    """Spark Connect DataFrame with a few data-cleaning helpers.

    Wrap an existing DataFrame with ``DataFrameExt.ext(df)``. This re-classes
    *df* itself rather than copying it.
    """

    def check(self, n=20, **kwargs):
        """Print the row count, then show the first *n* rows."""
        print(self.count())
        self.show(n=n, **kwargs)

    def duplicateRecords(self, *cols):
        """Return every row whose key (the columns in *cols*) occurs more than once.

        *cols* may be a single column name, a single list of names, or several
        names passed as separate arguments.
        """
        if len(cols) == 1:
            cols = cols[0]
        else:
            # groupBy/join expect a list; a bare tuple is not a valid column spec.
            cols = list(cols)
        unique_keys = self.groupBy(cols).count().where('count = 1').drop('count')
        duplicates = self.join(unique_keys, on=cols, how='left_anti')
        return type(self).ext(duplicates)

    def toDate(self, col):
        """Convert string column *col* to a date; values not matching y-M-d become null."""
        return type(self).ext(
            self.withColumn(
                col,
                F.when(F.col(col).rlike(r'\d{4}-\d{1,2}-\d{1,2}'), F.to_date(col, 'yyyy-M-d')),
            )
        )


## Query data

In [None]:
from databricks.connect import DatabricksSession

# Open a Databricks Connect session that reuses the SDK configuration above.
spark = (
    DatabricksSession.builder
    .sdkConfig(config)
    .getOrCreate()
)

In [None]:
# Load the raw source tables (only GFR-requested rows from the labor table).
df_lab, df_med, df_pat, df_pre = (
    spark.sql(query)
    for query in (
        """ SELECT * FROM ds_msc_2024_spring.medication.labor WHERE requested_gfr = 1 """,
        """ SELECT * FROM ds_msc_2024_spring.medication.medication """,
        """ SELECT * FROM ds_msc_2024_spring.medication.patients """,
        """ SELECT * FROM ds_msc_2024_spring.medication.prescription """,
    )
)

## Data cleaning

### Patient table

In [None]:
# Schema, a sample of rows and the row count of the raw patients table.
spark.sql(""" DESCRIBE TABLE ds_msc_2024_spring.medication.patients """).show()
df_pat.show(5)
df_pat.count()

Look for duplicates and other anomalies in *'patient_id'*

In [None]:
# All patient rows whose patient_id occurs more than once.
duplicate_records = (
    DataFrameExt.ext(df_pat)
    .duplicateRecords(['patient_id'])
    .orderBy('patient_id')
)
duplicate_records.show(n=20)

Remove every record whose *'patient_id'* is duplicated (anti-join)

In [None]:
# left_anti keeps only the patients whose id is not among the duplicated ones.
df_pat_cleaned = (
    df_pat
    .join(duplicate_records, on='patient_id', how='left_anti')
    .orderBy('patient_id')
)
df_pat_cleaned.count()

### Medication table

In [None]:
# Schema, a sample of rows and the row count of the raw medication table.
spark.sql(""" DESCRIBE TABLE ds_msc_2024_spring.medication.medication """).show()
df_med.show(5)
df_med.count()

Look for duplicates

In [None]:
# Medications whose standardized name appears more than once.
duplicate_records = DataFrameExt.ext(df_med).duplicateRecords('standardized_name')
duplicate_records.show(n=20)

### Prescription table

In [None]:
# Schema, a sample of rows and the row count of the raw prescription table.
spark.sql(""" DESCRIBE TABLE ds_msc_2024_spring.medication.prescription """).show()
df_pre.show(5)
df_pre.count()

Remove records with *'patient_id'* not in the **Patients table**

In [None]:
# The inner join keeps only prescriptions of patients in the cleaned patients table;
# the patient columns added by the join are dropped again.
df_pre_cleaned = (
    df_pre
    .join(df_pat_cleaned, on='patient_id', how='inner')
    .drop('year_of_birth', 'sex')
)
df_pre_cleaned.count()

Look for duplicates, and keep only the earliest prescription date

In [None]:
# Candidate primary key of the prescription table.
PK = [
    'patient_id',
    'standardized_name',
]

In [None]:
# Prescriptions of the same drug to the same patient more than once.
duplicate_records = DataFrameExt.ext(df_pre_cleaned).duplicateRecords(PK)
duplicate_records.show(n=20)
duplicate_records.count()

In [None]:
PK = ['patient_id', 'standardized_name', 'from_date']
# Keep only the earliest prescription per (patient, drug). Spark does not
# guarantee that `orderBy(...).dropDuplicates(...)` keeps the first row in sort
# order, so rank the rows explicitly. Null dates are ranked last.
first_prescription_window = (
    Window
    .partitionBy('patient_id', 'standardized_name')
    .orderBy(F.col('from_date').asc_nulls_last())
)
df_pre_cleaned = (
    df_pre_cleaned
    .withColumn('_rn', F.row_number().over(first_prescription_window))
    .filter(F.col('_rn') == 1)
    .drop('_rn')
)
df_pre_cleaned.count()

### Labor table - GFR only

In [None]:
# Schema, a sample of rows and the row count of the GFR lab results.
spark.sql(""" DESCRIBE TABLE ds_msc_2024_spring.medication.labor """).show()
df_lab.show(5)
df_lab.count()

#### "Know your data"

Show some descriptive statistics

In [None]:
# Frequency table for each categorical column of the lab results.
for _col_name in ('department', 'mrkeyword', 'description', 'normal_range', 'unit'):
    df_lab.groupBy(_col_name).count().show()

#### Check data that cannot be explained trivially

Duplicates for different sets of keys

In [None]:
# Keys at two granularities: per patient and date, and additionally per department.
PK_examdate = ['patient_id', 'examination_date']
PK_dept = PK_examdate + ['department']

In [None]:
# Rows sharing (patient, date) with at least one other row; uses the same
# group-and-anti-join logic as DataFrameExt.duplicateRecords.
duplicate_examdate = DataFrameExt.ext(df_lab).duplicateRecords(PK_examdate).orderBy(PK_examdate)
duplicate_examdate.show(n=5)
print(duplicate_examdate.count())

# Rows sharing (patient, date, department) with at least one other row.
duplicate_dept = DataFrameExt.ext(df_lab).duplicateRecords(PK_dept).orderBy(PK_examdate)
duplicate_dept.show(n=5)
print(duplicate_dept.count())

# Same-date duplicates that come from different departments.
duplicate_diff_dept = duplicate_examdate.join(duplicate_dept, on=PK_dept, how='left_anti').orderBy(PK_dept)
duplicate_diff_dept.show(n=5)
duplicate_diff_dept.count()

Check anomalies

In [None]:
# Lab rows without a normal range.
df_lab.filter(F.col('normal_range').isNull()).show(n=25)

In [None]:
# Rows without a unit; then those rows that have no counterpart (same patient,
# date and department) among rows whose rawvalue is "Nem ért.".
missing_unit = df_lab.filter(df_lab['unit'].isNull())
missing_unit.show()
not_evaluated = df_lab.filter(df_lab['rawvalue'] == "Nem ért.")
missing_unit.join(not_evaluated, on=PK_dept, how='left_anti').show()

#### Drop records

- Where *'unit'* is null, no observation is available
- Drop records with *'patient_id'* not in **Patients table**

In [None]:
# Rows without a unit carry no observation; drop them.
df_lab_cleaned = df_lab.dropna(subset=['unit'])

In [None]:
# Keep only lab rows of known (cleaned) patients; drop the patient columns the join adds.
df_lab_cleaned = (
    df_lab_cleaned
    .join(df_pat_cleaned, on='patient_id', how='inner')
    .orderBy(PK_examdate)
    .drop('year_of_birth', 'sex')
)
df_lab_cleaned.count()

#### Transform unclean data


Extract observation from *'rawvalue'*

In [None]:
# Parse the numeric observation and drop rows that could not be parsed.
# `value` is a FloatType (32-bit) column while a Python float literal is a double,
# and float32(-1111.0001) != -1111.0001 once widened to double. Cast the sentinel
# to float as well so the comparison actually matches the unparsed rows.
df_lab_cleaned = (
    df_lab_cleaned
    .withColumn('value', rawToValue(F.col('rawvalue')))
    .filter(F.col('value') != F.lit(-1111.0001).cast(FloatType()))
)

Correct normal range and convert to numeric data

In [None]:
# Numeric lower/upper bounds parsed from the free-text normal range.
df_lab_cleaned = (
    df_lab_cleaned
    .withColumn('norm_minimum', normLower(F.col('normal_range')))
    .withColumn('norm_maximum', normUpper(F.col('normal_range')))
)

In [None]:
df_lab_cleaned.show(n=20)  # first 20 rows (the show() default)

In [None]:
# Cleaned rows that still have no normal range.
df_lab_cleaned.filter(F.col('normal_range').isNull()).show(n=25)

Add categories of numerical data

Stages of kidney disease, as provided by clinical experts:

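| Stádium (stage) | Jellemző (description) | GFR (ml/perc/1,73 m²) | Gyakoriság (frequency, %) |
|---|---|---|---|
| 1 | vesebetegség norm. v. magas GFR-rel (kidney disease with normal or high GFR) | >90 | 3,3 |
| 2 | enyhe vesebetegség csökkent GFR-rel (mild kidney disease with decreased GFR) | 60-89 | 3,0 |
| 3 | mérsékelt veseelégtelenség (moderate renal insufficiency) | 30-59 | 4,3 |
| 4 | súlyos veseelégtelenség (severe renal insufficiency) | 15-29 | 0,2 |
| 5 | végstádiumú veseelégtelenség (end-stage renal failure) | <15 v. dialízis (<15 or dialysis) | 0,1 |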


In [None]:
# Half-open GFR ranges [low, high) for each kidney-disease stage.
gfr_ranges = {'norm_high': [90.0, 100000.0],
              'mild': [60.0, 90.0],
              'moderate': [30.0, 60.0],
              'severe': [15.0, 30.0],
              'failure': [-1000000.0, 15.0]}


def _gfr_to_stage(x):
    """Return the name of the stage whose range [low, high) contains GFR value *x*.

    Returns None for null or NaN values and for values outside every range,
    instead of raising IndexError/TypeError inside the Spark task.
    """
    if x is None:
        return None
    return next((stage for stage, (low, high) in gfr_ranges.items() if low <= x < high), None)


gfrToStage = F.udf(_gfr_to_stage)

In [None]:
# Label each GFR value with its kidney-disease stage.
df_lab_cleaned = df_lab_cleaned.withColumn(
    'stage',
    gfrToStage(F.col('value')),
)

Drop duplicate measurements of the same patient on the same date that fall into the same kidney-disease stage

Note: other conditions may arise (e.g. keep the highest GFR value from each examination), which need prior consultation with field experts, or the customer.

In [None]:
# One row per (patient, date, stage). Which row of a duplicate group is kept is
# not specified by dropDuplicates.
PK_stage = ['patient_id', 'examination_date', 'stage']
df_lab_cleaned = df_lab_cleaned.dropDuplicates(
    subset=PK_stage,
)
#df_lab_cleaned.show(5)

Drop unnecessary columns

In [None]:
# Columns no longer needed once value, bounds and stage are derived.
_unused_lab_columns = ('description', 'observation', 'unit', 'normal_range', 'comment', 'rawvalue', 'requested_gfr')
df_lab_cleaned = df_lab_cleaned.drop(*_unused_lab_columns)
#df_lab_cleaned.show(5)

The *'examination_date'* column should be of 'date' type instead of string. Look for any anomalies.

In [None]:
# Remove digits from each date string and count the remaining delimiter
# patterns; anything other than the expected pattern points to malformed dates.
(
    df_lab_cleaned
    .withColumn('datedelim', removeNumbersFromString(df_lab_cleaned["examination_date"]))
    .groupBy('datedelim')
    .count()
    .show()
)

In [None]:
# Convert examination_date to a real date with the shared DataFrameExt.toDate helper:
# strings matching y-M-d become dates and anything else becomes null.
df_lab_cleaned = DataFrameExt.ext(df_lab_cleaned).toDate('examination_date')
df_lab_cleaned.printSchema()

#### Check final table

In [None]:
df_lab_cleaned.show(20)  # preview of the final cleaned lab table

In [None]:
# Count missing values per column. isnan is applied only to floating-point
# columns and the ''/'null' string checks only to string columns, so Spark never
# needs an implicit string<->number cast (such casts may fail under ANSI mode).
_lab_column_types = dict(df_lab_cleaned.dtypes)


def _is_missing(col_name):
    """Return a boolean Column that is true where *col_name* counts as missing."""
    col = F.col(col_name)
    missing = F.isnull(col)
    if _lab_column_types[col_name] in ('float', 'double'):
        missing = missing | F.isnan(col)
    elif _lab_column_types[col_name] == 'string':
        missing = missing | (col == "") | (F.lower(col) == "null")
    return missing


df_lab_cleaned.select([
    F.count(F.when(_is_missing(each_col), 1)).alias(each_col)
    for each_col in ['patient_id', 'department', 'mrkeyword', 'value']
]).show()

### Statistic table - create new table for statistical analysis

- Patient demography
- GFR value (minimum, i.e. the most severe GFR)
- Kidney disease stage
- Medications prescribed

Get GFR values and stages

Get prescriptions for patients

Join tables

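Export table to csv for later use

*(The code cells for this section have not been written yet: `df_stat`, which is used below, is never created.)*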

## Create clean database on Databricks

New database

In [None]:
# Target location (catalog.schema) of the cleaned tables.
catalog_name = 'ds_msc_2024_spring'
database_name = 'medication_acs'

In [None]:
# Create the schema that will hold the cleaned tables.
w.schemas.create(
    name=database_name,
    catalog_name=catalog_name,
    comment='First pass of data cleaning of ds_msc_2024_spring.medication database',
)

Register DataFrames as temporary views for SQL

In [None]:
# Expose the cleaned DataFrames to SQL. createOrReplaceTempView is the supported
# replacement for the deprecated registerTempTable.
df_pat_cleaned.createOrReplaceTempView('c1_patients')
df_med.createOrReplaceTempView('c1_medication')
df_pre_cleaned.createOrReplaceTempView('c1_prescr')
df_lab_cleaned.createOrReplaceTempView('c1_labor_gfr')
# No earlier cell creates df_stat (the statistics section has no code yet);
# register it only once it exists instead of failing with NameError.
if 'df_stat' in globals():
    df_stat.createOrReplaceTempView('c1_stat')

Upload tables to Databricks

In [None]:
# Persist each temp view as a table in catalog_name.database_name (configured in
# one place above) instead of repeating the hard-coded schema in every statement.
_target_tables = {
    'patients': 'c1_patients',
    'medication': 'c1_medication',
    'prescription': 'c1_prescr',
    'labor_gfr': 'c1_labor_gfr',
    'stat': 'c1_stat',
}
for _table, _view in _target_tables.items():
    if _view == 'c1_stat' and 'df_stat' not in globals():
        continue  # statistics table has not been built yet
    spark.sql(f""" CREATE TABLE {catalog_name}.{database_name}.{_table} AS SELECT * FROM {_view} """)