# Installation

<font size="20">**⚠**</font> <font size="4"> The following chunk only needs to be run in **Google Colab**. It clones the project repository into the working directory so that the additional scripts and CSV files can be accessed. </font>

In [None]:
# !git clone https://github.com/Fuenfgeld/DMA2022DataProjectC.git
# %cd DMA2022DataProjectC/src

# Table of Contents

The following [ETL process](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/ETL-Prozess-Verzeichniss) is described in more detail in the project wiki:

* [Loading the raw data and checking them for changes](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Datenvorverarbeitung#datenpr%C3%BCfung)
* Connecting to and testing the database
* [Anonymizing patient-related data](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Datenschutzfolgeabsch%C3%A4tzung#personenbezogene-daten)
* [Measuring data errors](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Datenqualit%C3%A4t)
* [Filtering to the relevant data](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Mappingtabellen)
* [Transformation into the star schema](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Datenschema)

![image](../images/filtered_data.png)

In [33]:
# reset the IPython namespace; the magic line itself must not carry an inline comment
%reset -f



# Environment

Here, the versions of 'system' (the Python interpreter), 'pandas' and 'numpy' in use are recorded in a logger object. The cell also starts the time measurement for the full ETL process.

In [11]:
import pandas as pd
from logger import Logger
from test_executer import TestExecutor
import extract
import sys
import numpy as np
import hashlib

logger = Logger()
testExecutor = TestExecutor(logger)

dependencies = [
    ('system', sys.version_info),
    ('pandas', pd.__version__),
    ('numpy', np.__version__),
]
for dependency in dependencies:
    logger.log(f"{dependency[0]} is installed with version {dependency[1]}") 

logger.startTimeMeasurement('etl-process', 'Full etl process')

{"type": "info", "time": 1658325470771, "message": "system is installed with version sys.version_info(major=3, minor=8, micro=10, releaselevel='final', serial=0)", "params": null}
{"type": "info", "time": 1658325470771, "message": "pandas is installed with version 1.4.3", "params": null}
{"type": "info", "time": 1658325470771, "message": "numpy is installed with version 1.23.0", "params": null}


# Data Setup

First, we define which data are needed (tables, data directories and the MD5 hashes of the original files) and initialize the Python objects used; the download itself happens in the next cell.

In [12]:
tables = [
    "conditions",
    "observations",
    "patients",
]

files = [
    "data/others/",
    "data/asthma/",
    "data/gallstones/",
    "data/hypertension/",
]

md5Hashes = {
    "data/others/conditions.csv": "ce0034e9ed9185b7d4c408ee9916de18",
    "data/others/observations.csv": "b9e3bf1b033dc4af7f7ade78a48a50a4",
    "data/others/patients.csv": "530570c8e30b77a822b37e927d1486b2",
    "data/asthma/conditions.csv": "e7965095ec41ef88498540341c79c49e",
    "data/asthma/observations.csv": "1b8583de62d4d9e80c224005d74dd736",
    "data/asthma/patients.csv": "b139ef00c850308c3d3f8e7fa0f97724",
    "data/gallstones/conditions.csv": "8a19bf13191cf074c64534c2fa01f15c",
    "data/gallstones/observations.csv": "9d3807dc05cd7b4ccc3f0ee7b4f7b55e",
    "data/gallstones/patients.csv": "3766f46941ee2155e0d1ed6e749e8ba7",
    "data/hypertension/conditions.csv": "8310cdc07924b48e07aa841f9075b488",
    "data/hypertension/observations.csv": "f7564c732eebe9ace17a46e50b3cc857",
    "data/hypertension/patients.csv": "2ebdf6b168e9c968ffa949463cd074e7",
}

Here, we check whether the source data have already been downloaded; if not, the download is started.<br>
We also check whether the [data have changed compared with the original raw data](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Datenvorverarbeitung#datenpr%C3%BCfung) by comparing the MD5 hash of each file with its reference value in 'md5Hashes'.<br>
The results are stored in the logger object.
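A minimal sketch of the same integrity check, assuming the reference hashes are taken over the raw file bytes: the hypothetical helpers `md5_of_file` and `find_changed_files` hash each file in binary chunks, so large CSV files do not have to be loaded into memory at once, and return the paths whose digest differs. Because the notebook hashes the decoded text instead, digests from this sketch may differ from the values in 'md5Hashes' (for example if a file uses Windows line endings).

```python
import hashlib
import os
import tempfile


def md5_of_file(path, chunk_size=65536):
    """Hypothetical helper: hex MD5 digest of a file, read in binary chunks."""
    digest = hashlib.md5()
    with open(path, "rb") as handle:
        # iter() with a sentinel keeps reading until read() returns b"" (end of file)
        for chunk in iter(lambda: handle.read(chunk_size), b""):
            digest.update(chunk)
    return digest.hexdigest()


def find_changed_files(expected_hashes, base_dir="."):
    """Hypothetical helper: relative paths whose current MD5 differs from the reference."""
    return [
        relative_path
        for relative_path, expected in expected_hashes.items()
        if md5_of_file(os.path.join(base_dir, relative_path)) != expected
    ]


# usage with a throw-away file instead of the project's CSV files
with tempfile.TemporaryDirectory() as demo_dir:
    with open(os.path.join(demo_dir, "demo.csv"), "wb") as demo_file:
        demo_file.write(b"abc")
    demo_digest = md5_of_file(os.path.join(demo_dir, "demo.csv"))
    # a deliberately wrong reference hash, so the file is reported as changed
    demo_changed = find_changed_files({"demo.csv": "0" * 32}, demo_dir)
```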

In [13]:
from urllib.request import urlopen
import os

def ensure_file_has_been_downloaded(filename):
    full_filename = "../" + filename

    url = "https://raw.githubusercontent.com/Fuenfgeld/DMA2022DataProjectC/main/" + filename
    if os.path.isfile(full_filename):
        logger.log("File {} already exists, skipping download".format(filename))
    else:
        logger.log("Downloading {}".format(filename))
        download_file(url, full_filename)

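def download_file(url, filename):
    # create the target directory (e.g. ../data/asthma/) if it does not exist yet
    directory = os.path.dirname(filename)
    if directory:
        os.makedirs(directory, exist_ok=True)
    with urlopen(url) as response, open(filename, 'wb') as out_file:
        out_file.write(response.read())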

if not os.path.isfile("extract.py"):
    download_file(
        "https://raw.githubusercontent.com/Fuenfgeld/DMA2022DataProjectC/main/src/extract.py",
        "extract.py"
    )

dataChanged = False
for file in files:
    for table in tables:
        filename = file + table + ".csv"
        ensure_file_has_been_downloaded(filename)

        # read the file as text; its encoded content is hashed below
        # (the with-block closes the file, no explicit close() needed)
        with open("../" + filename) as fileHandle:
            fileContent = fileHandle.read()

        md5Hash = hashlib.md5(fileContent.encode()).hexdigest()
        if md5Hashes[filename] != md5Hash:
            logger.log(f"MD5 mismatch for {filename}")
            dataChanged = True

if dataChanged:
    logger.log("❌ Data set changed")
else:
    logger.log("✅ Using original data set")

{"type": "info", "time": 1658325470854, "message": "File data/others/conditions.csv already exists, skipping download", "params": null}
{"type": "info", "time": 1658325470855, "message": "File data/others/observations.csv already exists, skipping download", "params": null}
{"type": "info", "time": 1658325470881, "message": "File data/others/patients.csv already exists, skipping download", "params": null}
{"type": "info", "time": 1658325470882, "message": "File data/asthma/conditions.csv already exists, skipping download", "params": null}
{"type": "info", "time": 1658325470885, "message": "File data/asthma/observations.csv already exists, skipping download", "params": null}
{"type": "info", "time": 1658325470964, "message": "File data/asthma/patients.csv already exists, skipping download", "params": null}
{"type": "info", "time": 1658325470965, "message": "File data/gallstones/conditions.csv already exists, skipping download", "params": null}
{"type": "info", "time": 1658325470968, "mes

# Connecting to the Database

Calling the function 'connect_to_db', which is defined in the file 'extract.py', initializes the source tables in the database.

In [14]:
databaseFile = "data.sqlite"

logger.startTimeMeasurement('open-db', 'Connected to db and created tables')
connection = extract.connect_to_db(logger, databaseFile)  # create table patients, observations, conditions
logger.endTimeMeasurement('open-db')

#### Testing the Database Connection

In [15]:
def test_sqliteConnection(_logger):
    cursor = connection.cursor()
    cursor.execute("SELECT name FROM sqlite_master WHERE type='table'")
    tablesInDb = list(map(lambda tableResult: tableResult[0], cursor.fetchall()))
    tablesInDb.sort()

    for table in tables:
        if not(table in tablesInDb):
            raise Exception('Table not found:', table)

testExecutor.execute('Test connection to database', test_sqliteConnection)

{"type": "info", "time": 1658325471179, "message": "✅ Test ran successfully: Test connection to database", "params": null}


# Loading Data into the Database

The data used are loaded into the following database tables:

-   conditions
-   observations
-   patients
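The source of `extract.insert_values_to_table` is not part of this notebook. As an illustration only, the following sketch shows one way to append CSV rows to an SQLite table with pandas; the helper name `append_csv_to_table` is hypothetical, and it assumes the CSV header matches the table's columns. Passing `index=False` keeps pandas from writing its row index as an extra column.

```python
import io
import sqlite3

import pandas as pd


def append_csv_to_table(conn, table, csv_source):
    """Hypothetical helper: append all rows of a CSV (path or file-like) to an SQLite table.

    Assumes the CSV header matches the table's column names; pandas creates the
    table on the first call if it does not exist yet.
    """
    frame = pd.read_csv(csv_source)
    frame.to_sql(table, conn, if_exists="append", index=False)
    return len(frame)


# usage: two made-up CSV fragments loaded into an in-memory database
demo_conn = sqlite3.connect(":memory:")
append_csv_to_table(demo_conn, "patients", io.StringIO("Id,BIRTHDATE\na,2000-01-01\nb,1990-05-17\n"))
append_csv_to_table(demo_conn, "patients", io.StringIO("Id,BIRTHDATE\nc,1985-12-31\n"))
demo_count = demo_conn.execute("SELECT COUNT(*) FROM patients").fetchone()[0]
```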

In [16]:
logger.startTimeMeasurement('load-data', 'Loading data into db')
for file in files:
    for table in tables:
        extract.insert_values_to_table(logger, connection.cursor(), table, "../"+ file + table + ".csv")  # TODO: insert ALL values in the right tables 
        connection.commit()

logger.endTimeMeasurement('load-data')

{"type": "info", "time": 1658325471204, "message": "🏗 Extracting data from ../data/others/conditions.csv", "params": null}
{"type": "info", "time": 1658325471223, "message": "🏗 Extracting data from ../data/others/observations.csv", "params": null}
{"type": "info", "time": 1658325471512, "message": "🏗 Extracting data from ../data/others/patients.csv", "params": null}
{"type": "info", "time": 1658325471518, "message": "🏗 Extracting data from ../data/asthma/conditions.csv", "params": null}
{"type": "info", "time": 1658325471620, "message": "🏗 Extracting data from ../data/asthma/observations.csv", "params": null}
{"type": "info", "time": 1658325472654, "message": "🏗 Extracting data from ../data/asthma/patients.csv", "params": null}
{"type": "info", "time": 1658325472661, "message": "🏗 Extracting data from ../data/gallstones/conditions.csv", "params": null}
{"type": "info", "time": 1658325472696, "message": "🏗 Extracting data from ../data/gallstones/observations.csv", "params": null}
{"type

# Anonymization

To comply with the [data protection impact assessment](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Datenschutzfolgeabsch%C3%A4tzung), patient-related data must be [anonymized](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Datenvorverarbeitung#anonymisierung) so that no conclusions about individual persons can be drawn in subsequent analyses.
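The idea behind the anonymization cell can be sketched with made-up data: each distinct patient id is mapped to an uppercase SHA-256 digest of the id plus a random salt, and the same mapping is applied to every table so that joins on the patient id keep working. Unlike the notebook, which salts with `random.random()`, this sketch swaps in `secrets.token_hex` from the standard library, which is intended for security-sensitive randomness. `build_pseudonyms` is a hypothetical name. Because the salts are discarded, the mapping cannot be recomputed later.

```python
import hashlib
import secrets

import pandas as pd


def build_pseudonyms(ids):
    """Hypothetical helper: map each distinct id to SHA-256(id + random salt), uppercased."""
    return {
        original: hashlib.sha256(f"{original}={secrets.token_hex(16)}".encode()).hexdigest().upper()
        for original in set(ids)
    }


# made-up example tables sharing patient ids
demo_patients = pd.DataFrame({"Id": ["p1", "p2"]})
demo_conditions = pd.DataFrame({"PATIENT": ["p1", "p1", "p2"]})

pseudonyms = build_pseudonyms([*demo_patients.Id, *demo_conditions.PATIENT])
# Series.map returns NaN for ids missing from the mapping, which would be easy to spot
demo_patients["Id"] = demo_patients["Id"].map(pseudonyms)
demo_conditions["PATIENT"] = demo_conditions["PATIENT"].map(pseudonyms)
```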

In [17]:
import pandas as pd
import random
import hashlib

logger.startTimeMeasurement('anonymization', 'Anonymizing all data sets')

# create data frames
patientDf = pd.read_sql_query('SELECT * FROM patients;', connection)
conditionsDf = pd.read_sql_query('SELECT * FROM conditions;', connection)
observationsDf = pd.read_sql_query('SELECT * FROM observations;', connection)

# concatenate all existent patient ids
patientIds = [*patientDf.Id, *conditionsDf.PATIENT, *observationsDf.PATIENT]

# sanity check whether all patient ids were concatenated
def test_sanityCheckCombiningIds(_logger): 
    expectedLen = len(patientDf) + len(conditionsDf) + len(observationsDf)
    actualLen = len(patientIds)
    if actualLen != expectedLen:
        raise Exception('Not all patient ids were concatenated')

testExecutor.execute('Sanity check: extracting all ids worked', test_sanityCheckCombiningIds)

# converts list to a set with only unique values
uniqueIds = set(patientIds)
logger.log(f"{len(uniqueIds)} unique patient ids found")
# more distinct ids than patient rows means some conditions/observations reference unknown patients
if len(uniqueIds) > len(patientDf.Id):
    logger.log(f"⚠️ The dataset contains {len(uniqueIds)} unique patientIds but only {len(patientDf.Id)} patients.")

anonymizedIds = {}
# anonymization
for id in uniqueIds:
    # use uppercase here so it is easy to see if anonymized ids are used
    anonymizedIds[id] = hashlib.sha256(f"{id}={random.random()}".encode()).hexdigest().upper()

# convert data frames in sql tables
logger.startTimeMeasurement('anonymizedPatients', 'Writing anonymized patients')
patientDf = patientDf.replace({"Id": anonymizedIds}).drop(columns=[
    'SSN', 'DRIVERS', 'PASSPORT', 'PREFIX', 'FIRST', 'LAST', 'MAIDEN', 'BIRTHPLACE',
    'ADDRESS', 'ZIP', 'LAT', 'LON'
])
patientDf.to_sql(name="anonymized_patients", con=connection, if_exists='replace')
logger.endTimeMeasurement('anonymizedPatients')

logger.startTimeMeasurement('anonymizedConditions', 'Writing anonymized conditions')
conditionsDf = conditionsDf.replace({"PATIENT": anonymizedIds})
conditionsDf.to_sql(name="anonymized_conditions", con=connection, if_exists='replace')
logger.endTimeMeasurement('anonymizedConditions')

logger.startTimeMeasurement('anonymizedObservations', 'Writing anonymized Observations')
observationsDf = observationsDf.replace({"PATIENT": anonymizedIds})
observationsDf.to_sql(name="anonymized_observations", con=connection, if_exists='replace')
logger.endTimeMeasurement('anonymizedObservations')

# sanity check: no original patient id may remain after the replacement
def test_sanityEnsureAllIdsAreAnonymized(_logger):
    remainingIds = uniqueIds.intersection([*patientDf.Id, *conditionsDf.PATIENT, *observationsDf.PATIENT])
    if remainingIds:
        raise Exception(f'{len(remainingIds)} original ids still exist after anonymization')

testExecutor.execute('Sanity check: no origin ids exist anymore', test_sanityEnsureAllIdsAreAnonymized)

# remove the raw tables that still contain the original ids
connection.execute('DROP TABLE patients;')
connection.execute('DROP TABLE observations;')
connection.execute('DROP TABLE conditions;')

connection.commit()
logger.endTimeMeasurement('anonymization')

tables = ['anonymized_patients', 'anonymized_conditions', 'anonymized_observations']

{"type": "info", "time": 1658325476118, "message": "✅ Test ran successfully: Sanity check: extracting all ids worked", "params": null}
{"type": "info", "time": 1658325476153, "message": "1330 unique patient ids found", "params": null}
{"type": "info", "time": 1658325476153, "message": "⚠️ The dataset contains 1330 unique patientIds but only 1326 patients.", "params": null}
{"type": "info", "time": 1658325476155, "message": "✅ Test ran successfully: Sanity check: no origin ids exist anymore", "params": null}


For a better overview, the following command lists the tables contained in the database:

In [18]:
print(pd.read_sql_query('''SELECT name FROM sqlite_master 
WHERE type IN ('table','view') 
AND name NOT LIKE 'sqlite_%'
ORDER BY 1''', connection))

                      name
0    anonymized_conditions
1  anonymized_observations
2      anonymized_patients


# Measuring Data Errors

[Data quality](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Datenqualit%C3%A4t) is of central importance: it determines whether the data can be used for further evaluation at all. For our [research question](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki), for example, only records with a measured BMI are relevant. If the BMI was not measured or not recorded, the data cannot be used to answer the [research question](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki).

### Number of NULL Values

The raw data are first [checked for the number of NULL values](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Datenqualit%C3%A4t#prozentualer-anteil-von-null-werten-im-jedem-merkmal).<br>
If more than **one third of the data** contain gaps in their coding, an error in data collection is assumed and the data must be checked manually.
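The threshold check can be sketched as a small function (the name `null_share` is hypothetical): it counts missing cells with `DataFrame.isna`, divides by the total number of cells (`DataFrame.size`) across all tables and compares the result with one third.

```python
import pandas as pd


def null_share(frames, threshold=1 / 3):
    """Hypothetical helper: share of NULL cells over all frames and whether it exceeds threshold."""
    null_cells = sum(int(frame.isna().sum().sum()) for frame in frames)
    total_cells = sum(frame.size for frame in frames)
    share = null_cells / total_cells if total_cells else 0.0
    return share, share > threshold


# made-up frames: 1 of 4 cells missing, and 2 of 4 cells missing
demo_a = pd.DataFrame({"x": [1.0, None], "y": [2.0, 3.0]})
demo_b = pd.DataFrame({"x": [None, None], "y": [1.0, 2.0]})
print(null_share([demo_a]))          # (0.25, False): 1 of 4 cells missing
print(null_share([demo_a, demo_b]))  # (0.375, True): 3 of 8 cells missing
```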

In [19]:
logger.startTimeMeasurement('null-check', 'Checking for NULL values')

null_counter = 0
num_of_elements = 0
for table in tables:
    query = f"SELECT * FROM {table};"
    df = pd.read_sql_query(query, connection)
    result_string = str(df.isna().sum()).replace("\n", " NULL-Values in Column ")
    logger.log(f"Found {result_string} null-values in {table}.")
    null_counter = df.isna().sum().sum() + null_counter
    num_of_elements = num_of_elements + df.size
# share of NULL cells across all tables (0.0 to 1.0)
perc_null_val = round(null_counter / num_of_elements, 3)

# more than one third of missing values indicates a problem in the raw data
if perc_null_val > 0.33:
    logger.log(f"Found {perc_null_val} null-values.", type='Warning')
else:
    logger.log(f"Found {perc_null_val} null-values.")

logger.endTimeMeasurement('null-check')

{"type": "info", "time": 1658325528782, "message": "Found index                  0 NULL-Values in Column Id                     0 NULL-Values in Column BIRTHDATE              0 NULL-Values in Column DEATHDATE              0 NULL-Values in Column SUFFIX                 0 NULL-Values in Column MARITAL                0 NULL-Values in Column RACE                   0 NULL-Values in Column ETHNICITY              0 NULL-Values in Column GENDER                 0 NULL-Values in Column CITY                   0 NULL-Values in Column STATE                  0 NULL-Values in Column COUNTRY                0 NULL-Values in Column HEALTHCARE_EXPENSES    0 NULL-Values in Column HEALTHCARE_COVERAGE    0 NULL-Values in Column dtype: int64 null-values in anonymized_patients.", "params": null}
{"type": "info", "time": 1658325528870, "message": "Found index          0 NULL-Values in Column START          0 NULL-Values in Column STOP           0 NULL-Values in Column PATIENT        0 NULL-Values in Column ENC

### Checking for Duplicates

Duplicates distort the results of the analysis because identical values inflate the population. The data must therefore be [checked for duplicates within the individual files](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Datenqualit%C3%A4t#pr%C3%BCfung-auf-duplikate) so that identical measurements are found and, if necessary, removed in the ETL process.


In [20]:
logger.startTimeMeasurement('duplicate-check', 'Checking for duplicate values')

num_of_duplicates = 0
num_of_rows = 0
for table in tables:
    query = f"SELECT * FROM {table};"
    # drop the 'index' column written by DataFrame.to_sql: it is unique per row
    # and would otherwise hide every duplicate
    df = pd.read_sql_query(query, connection).drop(columns=['index'], errors='ignore')
    # rows that repeat an earlier row (the first occurrence is not counted)
    curr_num_duplicate = df.duplicated().sum()
    num_of_duplicates = num_of_duplicates + curr_num_duplicate
    logger.log(f"Found {curr_num_duplicate} duplicate-values in {table}.")
    num_of_rows = num_of_rows + len(df)
# share of duplicated rows across all tables
perc_duplicates = round(num_of_duplicates / num_of_rows, 3)
logger.log(f"Found {perc_duplicates} duplicate-values.")

logger.endTimeMeasurement('duplicate-check')

{"type": "info", "time": 1658325531781, "message": "Found 0 duplicate-values in anonymized_patients.", "params": null}
{"type": "info", "time": 1658325531911, "message": "Found 0 duplicate-values in anonymized_conditions.", "params": null}
{"type": "info", "time": 1658325534404, "message": "Found 0 duplicate-values in anonymized_observations.", "params": null}
{"type": "info", "time": 1658325534404, "message": "Found 0.0 duplicate-values.", "params": null}


### Percentage of Patients with Weight and BMI Values

For our [research question](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki), [BMI values are relevant](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Datenqualit%C3%A4t#prozentualer-anteil-an-gewichts--und-bmi-werten-im-datensatz) and must have been recorded at least once per patient. To answer the [research question](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki) with the available data, at least **one third of the patients** should also have a recorded BMI.
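The ratio in the next cell can be expressed as a parameterized function, sketched here against a toy in-memory database (hypothetical helper `share_of_patients_with_code`, simplified table names, made-up rows). Passing the LOINC code as a query parameter makes it easy to run the check for different codes: the cell below counts code '59576-9' (BMI percentile per age and sex), whereas the filter step and the fact table use '39156-5' (BMI ratio).

```python
import sqlite3


def share_of_patients_with_code(conn, loinc_code):
    """Hypothetical helper: fraction of patients with at least one observation of loinc_code."""
    total = conn.execute("SELECT COUNT(*) FROM patients").fetchone()[0]
    with_code = conn.execute(
        """
        SELECT COUNT(DISTINCT p.id)
        FROM patients p
        JOIN observations o ON p.id = o.patient
        WHERE o.code = ?
        """,
        (loinc_code,),
    ).fetchone()[0]
    return with_code / total if total else 0.0


demo_conn = sqlite3.connect(":memory:")
demo_conn.executescript("""
    CREATE TABLE patients (id TEXT);
    CREATE TABLE observations (patient TEXT, code TEXT);
    INSERT INTO patients VALUES ('a'), ('b'), ('c');
    -- patient a has two BMI values but is counted once
    INSERT INTO observations VALUES ('a', '39156-5'), ('a', '39156-5'), ('b', '39156-5'), ('c', '8302-2');
""")
demo_ratio = share_of_patients_with_code(demo_conn, "39156-5")
```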


In [21]:
all_patients_query = """
SELECT COUNT(id) FROM anonymized_patients;"""

# note: LOINC 59576-9 is "Body mass index (BMI) [Percentile] Per age and sex";
# the filter step and the fact table use 39156-5, "Body mass index (BMI) [Ratio]"
count_bmi_query = """
SELECT
    COUNT(distinct id)
FROM anonymized_patients patients
JOIN anonymized_observations observations
    ON patients.id == observations.patient
WHERE observations.Code = '59576-9'
"""

count_all_bmi_query = """
SELECT COUNT(patient) FROM anonymized_observations WHERE code = '59576-9'"""

patient_all_count = connection.execute(all_patients_query).fetchall()[0][0]
patient_bmi_count = connection.execute(count_bmi_query).fetchall()[0][0]
bmi_count = connection.execute(count_all_bmi_query).fetchall()[0][0]
ratio = round(patient_bmi_count/patient_all_count, 3)

logger.log(f"Total num of patients {patient_all_count}.")

message = f"Found {patient_bmi_count} patients ({round(ratio*100,3)}%) with {bmi_count} BMI-values."
# fewer than one third of patients with a BMI value is reported as a warning
if ratio > 0.33:
    logger.log(message)
else:
    logger.log(message, type='Warning')


{"type": "info", "time": 1658325534540, "message": "Total num of patients 1326.", "params": null}
{"type": "info", "time": 1658325534540, "message": "Found 450 patients (33.9%) with 3539 BMI-values.", "params": null}


# [Pre-filtering the Data Sets](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Datenvorverarbeitung#vergr%C3%B6berung-der-daten)

To simplify further processing, we remove all table columns and records that we do not need for the analysis.
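For readers who prefer pandas, the observation filter of the next cell can be sketched as follows. The LOINC codes are the ones in the notebook's `WHERE CODE in (...)` clause; the mapping to readable feature names follows the names used later for the intermediate tables, and the helper `filter_observations` is hypothetical.

```python
import pandas as pd

# LOINC codes kept by the notebook's filter step, with the feature names used later
OBSERVATION_CODES = {
    "8302-2": "height",
    "29463-7": "weight",
    "39156-5": "bmi",
    "8462-4": "diastolic_blood_pressure",
    "8480-6": "systolic_blood_pressure",
    "8867-4": "heart_rate",
}


def filter_observations(observations):
    """Hypothetical helper: keep only the listed codes and rename columns to snake_case."""
    kept = observations[observations["CODE"].isin(list(OBSERVATION_CODES))].copy()
    kept["feature"] = kept["CODE"].map(OBSERVATION_CODES)
    return kept.rename(columns={"PATIENT": "patient_id", "DATE": "date", "CODE": "code", "VALUE": "value"})


# made-up rows; the second code is not in the list and is dropped
demo_obs = pd.DataFrame({
    "PATIENT": ["p1", "p1", "p2"],
    "DATE": ["2020-01-01", "2020-01-01", "2020-02-01"],
    "CODE": ["39156-5", "72514-3", "8302-2"],
    "VALUE": ["24.1", "3", "180.2"],
})
demo_filtered = filter_observations(demo_obs)
```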

In [22]:
logger.startTimeMeasurement('filter-data', 'Remove unnecessary data for etl process')

connection.execute('DROP TABLE IF EXISTS  filtered_patients;')
connection.execute('''
    CREATE TABLE IF NOT EXISTS filtered_patients(
        patient_id STRING PRIMARY KEY UNIQUE,
        birth_date STRING,
        death_date STRING
    );
''');
connection.execute('''
    INSERT INTO filtered_patients (patient_id, birth_date, death_date)
        SELECT Id, BIRTHDATE, DEATHDATE FROM anonymized_patients;
''')

connection.execute('DROP TABLE IF EXISTS  filtered_conditions;')
connection.execute('''
    CREATE TABLE IF NOT EXISTS filtered_conditions(
        patient_id STRING,
        code STRING,
        description STRING,
        start_date STRING,
        end_date STRING
    );
''')
connection.execute('''
    INSERT INTO filtered_conditions (patient_id, code, description, start_date, end_date)
        SELECT PATIENT, CODE, DESCRIPTION, START, STOP FROM anonymized_conditions
        WHERE DESCRIPTION NOT LIKE '%finding%';
''')

connection.execute('DROP TABLE IF EXISTS  filtered_observations;')
connection.execute('''
    CREATE TABLE IF NOT EXISTS filtered_observations(
        patient_id STRING,
        date STRING,
        code STRING,
        description STRING,
        value STRING,
        units STRING,
        type STRING
    );
''')
connection.execute('''
    INSERT INTO filtered_observations (patient_id, date, code, description, value, units, type)
        SELECT PATIENT, DATE, CODE, DESCRIPTION, VALUE, UNITS, TYPE FROM anonymized_observations
        WHERE CODE in ('8302-2', '29463-7', '39156-5', '8462-4', '8480-6', '8867-4');
''')

connection.commit()
logger.endTimeMeasurement('filter-data')

# Star Schema

In the following, the pre-processed tables are transformed into the star schema, which allows queries and [analyses](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Analyse) of the data to be carried out efficiently.

![](../images/star_schema.png)
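As an illustration of why the star schema simplifies analyses, the sketch below joins the fact table with both dimension tables in a single query. Table and column names follow the cells below, but the rows, the disease code 'D1' and the helper `average_bmi_and_age_per_disease` are made up, and column types are simplified.

```python
import sqlite3


def average_bmi_and_age_per_disease(conn):
    """Hypothetical helper: mean BMI and mean age per disease, via the two dimension tables."""
    return conn.execute("""
        SELECT d.description, AVG(f.bmi), AVG(p.AGE)
        FROM fact_table f
        JOIN diseaseDimension d ON d.code = f.disease_code
        JOIN patientDimension p ON p.ID = f.patient_id
        GROUP BY d.description
    """).fetchall()


demo_conn = sqlite3.connect(":memory:")
demo_conn.executescript("""
    CREATE TABLE patientDimension (ID TEXT PRIMARY KEY, AGE INTEGER);
    CREATE TABLE diseaseDimension (code TEXT, description TEXT);
    CREATE TABLE fact_table (patient_id TEXT, disease_code TEXT, bmi REAL);
    INSERT INTO patientDimension VALUES ('A', 40), ('B', 62);
    INSERT INTO diseaseDimension VALUES ('D1', 'Disease X');
    INSERT INTO fact_table VALUES ('A', 'D1', 31.0), ('B', 'D1', 27.0);
""")
demo_result = average_bmi_and_age_per_disease(demo_conn)
```

Because 'diseaseDimension' is filled with `SELECT DISTINCT code, description`, a code that appears with two different descriptions would produce two dimension rows and duplicate the joined facts.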

In [23]:
cursor = connection.cursor()

#### Dimension Table _patientDimension_

In [24]:
# table patients_
cursor.execute('''DROP TABLE IF EXISTS patientDimension;''')
cursor.execute('''
        CREATE TABLE patientDimension ( 
        ID STRING PRIMARY KEY UNIQUE,
        AGE INT64
        );''')


The following section calculates the age of each patient from the birth date and the date of death or, if the patient is still alive, from the birth date and today's date.
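The cell below subtracts calendar years only, so a patient whose birthday has not yet been reached in the end year is counted one year too old. A sketch of an age in completed years, using the same pandas datetime accessors (the helper name `age_in_completed_years` is hypothetical):

```python
import pandas as pd


def age_in_completed_years(birthdate, reference):
    """Hypothetical helper: whole years between two datetime Series, counting only full years."""
    years = reference.dt.year - birthdate.dt.year
    # subtract one year if the birthday has not been reached yet in the reference year
    before_birthday = (reference.dt.month < birthdate.dt.month) | (
        (reference.dt.month == birthdate.dt.month) & (reference.dt.day < birthdate.dt.day)
    )
    return years - before_birthday.astype(int)


demo_birth = pd.to_datetime(pd.Series(["2000-06-15", "2000-06-15", "1950-12-31"]))
demo_reference = pd.to_datetime(pd.Series(["2020-06-14", "2020-06-15", "2001-01-01"]))
demo_ages = age_in_completed_years(demo_birth, demo_reference)
```

In the notebook, it could be called as `age_in_completed_years(df_patients.birthdate, df_patients.deathdate)` after the missing death dates have been filled.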

In [25]:
logger.startTimeMeasurement('fill-patient-dimension', 'Fill patient dimension table')

# create df patients
cursor.execute('''SELECT id, birthdate, deathdate FROM anonymized_patients;''')
df_patients = pd.DataFrame(cursor.fetchall(), columns=['id', 'birthdate', 'deathdate'])

# convert to date
df_patients["deathdate"] = pd.to_datetime(df_patients["deathdate"])
df_patients["birthdate"] = pd.to_datetime(df_patients["birthdate"])
# fill missing death dates (patients still alive) with today's date
df_patients['deathdate'] = df_patients.deathdate.fillna(pd.to_datetime("today"))
# calculate age
df_patients["age"] = df_patients.deathdate.dt.year - df_patients.birthdate.dt.year
# drop unnecessary variables
df_patients = df_patients.drop(['birthdate', 'deathdate'], axis=1)

logger.log(f"Number of Duplicated Rows: {df_patients.duplicated(df_patients.columns).sum()}")

df_patients.to_sql('df_patients', connection, if_exists='replace', index=False)
cursor.execute('INSERT INTO patientDimension (id, age) SELECT id, age FROM df_patients;')
cursor.execute('''DROP TABLE IF EXISTS df_patients;''')

logger.endTimeMeasurement('fill-patient-dimension')

{"type": "info", "time": 1658325534805, "message": "Number of Duplicated Rows: 0", "params": null}


#### Dimension Table _diseaseDimension_

In [26]:
# table conditions
cursor.execute('''DROP TABLE IF EXISTS diseaseDimension;''')
cursor.execute('''
    CREATE TABLE diseaseDimension ( 
    code STRING,
    description STRING
);''')

logger.startTimeMeasurement('fill-disease-dimension', 'Fill disease dimension table')
cursor.execute('''
INSERT INTO diseaseDimension
    SELECT DISTINCT code, description FROM filtered_conditions;
''')
cursor.close()
connection.commit()

logger.endTimeMeasurement('fill-disease-dimension')

### Fact Table

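For each feature
* BMI
* height
* weight
* diastolic blood pressure
* systolic blood pressure
* heart rate

an intermediate table is created that serves as an aid for filling the fact table. For every condition, it stores the patient's observation of that feature whose date is closest to the condition's start date ('date_diff', in days), or NULL if the patient has no such observation.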
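Selecting the closest observation relies on an SQLite-specific rule: in a query with a single `MIN()` or `MAX()` aggregate, bare columns take their values from the row that holds the minimum or maximum. The toy example below (made-up data, simplified table names) shows that the observation nearest to the condition's start date is selected.

```python
import sqlite3

demo_conn = sqlite3.connect(":memory:")
demo_conn.executescript("""
    CREATE TABLE conditions (patient_id TEXT, start_date TEXT);
    CREATE TABLE observations (patient_id TEXT, date TEXT, value REAL);
    INSERT INTO conditions VALUES ('p1', '2020-01-10');
    INSERT INTO observations VALUES
        ('p1', '2019-01-01', 24.0),
        ('p1', '2020-01-05', 25.5),
        ('p1', '2021-06-01', 27.0);
""")

# 'value' is a bare column: SQLite takes it from the row with the smallest diff
demo_rows = demo_conn.execute("""
    SELECT patient_id, value, MIN(diff)
    FROM (
        SELECT c.rowid AS condition_id,
               c.patient_id AS patient_id,
               o.value AS value,
               ABS(JULIANDAY(c.start_date) - JULIANDAY(o.date)) AS diff
        FROM conditions c
        LEFT JOIN observations o ON c.patient_id = o.patient_id
    )
    GROUP BY condition_id
""").fetchall()
```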

In [27]:
def createValueTableForCode(code, name):
    connection.execute(f'DROP TABLE IF EXISTS patient_condition_{name};')
    connection.execute(f'''
        CREATE TABLE patient_condition_{name} (
            condition_row_id INT64,
            patient_id STRING,
            disease_code STRING,
            {name} STRING,
            date_diff STRING
        );
    ''')
    connection.execute(f'''
        INSERT INTO patient_condition_{name} (
            condition_row_id,
            patient_id,
            disease_code,
            {name},
            date_diff
        )
        SELECT
            rowid,
            patient_id,
            code,
            {name},
            MIN({name}_date_diff)
        FROM (
            SELECT
                condition.rowid as rowid,
                condition.patient_id as patient_id,
                condition.code as code,
                {name}_observation.value as {name},
                ABS(JULIANDAY(condition.start_date) - JULIANDAY({name}_observation.date)) as {name}_date_diff
            FROM filtered_conditions condition
            LEFT JOIN filtered_observations {name}_observation ON
                condition.patient_id = {name}_observation.patient_id
                AND
                {name}_observation.code = '{code}'
        )
        GROUP BY rowid;
    ''')

    connection.commit()

logger.startTimeMeasurement('transform-facts', 'Transform facts for fact table')
createValueTableForCode('39156-5', 'bmi')
createValueTableForCode('8302-2', 'height')
createValueTableForCode('29463-7', 'weight')
createValueTableForCode('8462-4', 'diastolic_blood_pressure')
createValueTableForCode('8480-6', 'systolic_blood_pressure')
createValueTableForCode('8867-4', 'heart_rate')
logger.endTimeMeasurement('transform-facts')

Initializing and filling the fact table.

In [28]:
connection.execute('''DROP TABLE IF EXISTS fact_table;''')
connection.execute('''
        CREATE TABLE fact_table ( 
            patient_id STRING,
            disease_code STRING,
            bmi STRING,
            height STRING,
            weight STRING,
            heart_rate STRING,
            diastolic_blood_pressure STRING,
            systolic_blood_pressure STRING
        );''')

logger.startTimeMeasurement('fill-fact-table', 'Fill fact table')
connection.execute('''
    INSERT INTO fact_table (
        patient_id,
        disease_code,
        bmi,
        height,
        weight,
        heart_rate,
        diastolic_blood_pressure,
        systolic_blood_pressure
    ) SELECT 
        bmi.patient_id,
        bmi.disease_code,
        bmi.bmi,
        height.height,
        weight.weight,
        heart_rate.heart_rate,
        diastolic_blood_pressure.diastolic_blood_pressure,
        systolic_blood_pressure.systolic_blood_pressure
    FROM patient_condition_bmi bmi
    JOIN patient_condition_height height
        ON height.condition_row_id = bmi.condition_row_id
    JOIN patient_condition_weight weight
        ON weight.condition_row_id = bmi.condition_row_id
    JOIN patient_condition_heart_rate heart_rate
        ON heart_rate.condition_row_id = bmi.condition_row_id
    JOIN patient_condition_diastolic_blood_pressure diastolic_blood_pressure
        ON diastolic_blood_pressure.condition_row_id = bmi.condition_row_id
    JOIN patient_condition_systolic_blood_pressure systolic_blood_pressure
        ON systolic_blood_pressure.condition_row_id = bmi.condition_row_id;
''')

connection.execute('''
    DELETE FROM fact_table
    WHERE
        bmi IS NULL
        AND height IS NULL
        AND weight IS NULL;
''')

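# derive missing BMI values: BMI = weight [kg] / (height [m])^2;
# assumes height is stored in cm and weight in kg (see the units column of filtered_observations)
connection.execute('''
    UPDATE fact_table
        SET bmi = weight / ((height / 100.0) * (height / 100.0))
    WHERE
        bmi IS NULL
        AND weight IS NOT NULL
        AND height IS NOT NULL;
''')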

connection.commit()
logger.endTimeMeasurement('fill-fact-table')
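The `UPDATE` statement above derives missing BMI values from weight and height. BMI is defined as body weight in kilograms divided by the square of body height in metres. Assuming the observations report weight in 'kg' and height in 'cm' (this should be checked in the 'units' column of 'filtered_observations'), the height must be converted to metres first; the hypothetical helper below makes that conversion explicit.

```python
def bmi_from_kg_cm(weight_kg, height_cm):
    """Hypothetical helper: BMI in kg/m^2 from weight in kg and height in cm."""
    height_m = height_cm / 100.0
    return weight_kg / (height_m * height_m)


demo_bmi = bmi_from_kg_cm(70.0, 175.0)  # 70 / 1.75^2, about 22.86
# without the conversion the result is about 0.0023, far below any plausible BMI
demo_unconverted = 70.0 / (175.0 * 175.0)
```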

## Cleanup & Saving Logs

Finally, the log file is archived for traceability.

In [29]:
logger.endTimeMeasurement('etl-process')
connection.close()
logger.logTimings()
logger.writeToFile("../artefacts-for-release/etl-log.json")

{"type": "info", "time": 1658325535915, "message": "⏳ Full etl process in 507122ms", "params": {"timingInMilliseconds": 507122}}
{"type": "info", "time": 1658325535915, "message": "⏳ Connected to db and created tables in 442026ms", "params": {"timingInMilliseconds": 442026}}
{"type": "info", "time": 1658325535915, "message": "⏳ Loading data into db in 445599ms", "params": {"timingInMilliseconds": 445599}}
{"type": "info", "time": 1658325535915, "message": "⏳ Anonymizing all data sets in 496011ms", "params": {"timingInMilliseconds": 496011}}
{"type": "info", "time": 1658325535915, "message": "⏳ Writing anonymized patients in 442355ms", "params": {"timingInMilliseconds": 442355}}
{"type": "info", "time": 1658325535915, "message": "⏳ Writing anonymized conditions in 443570ms", "params": {"timingInMilliseconds": 443570}}
{"type": "info", "time": 1658325535915, "message": "⏳ Writing anonymized Observations in 492235ms", "params": {"timingInMilliseconds": 492235}}
{"type": "info", "time": 16

# Analysis

The analysis can then be performed on the data. The corresponding notebook is available [here](https://github.com/Fuenfgeld/DMA2022DataProjectC/blob/main/src/analysis.ipynb), and a written report [here](https://github.com/Fuenfgeld/DMA2022DataProjectC/wiki/Analyse).