<a href="https://colab.research.google.com/github/MichaelKru92/Projekt-ML-Modelierung/blob/main/Schritt_6_SQL_DB_Faktentabelle_Datenupload.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Aufbau einer SQL-Datenbank**

In diesem Schritt bauen wir analog zum Schema unserer DataFrames und CSV-Rohdaten eine SQL-Datenbank namens synthea.db in sqlite3 auf.

Das Schema zur Datenbank ist im Wiki hinterlegt.

In [None]:
import sqlite3
import os

# Set up the SQLite database used by the rest of this notebook.
db_name = "synthea.db"

# The database file is addressed relative to the current working directory.
db_path = os.path.join(os.curdir, db_name)

# Open the connection to the database file and obtain a cursor on it
# for executing statements.
conn = sqlite3.connect(db_path)
cur = conn.cursor()

# Create the careplans table.
#
# The column types match the TEXT-based careplans definition that appears
# further down in this file. Because of IF NOT EXISTS, whichever definition
# runs first decides the schema, so both must agree. SQLite has no STRING
# type: a column declared STRING (or DATE) receives NUMERIC affinity, which
# would store numeric-looking codes as numbers instead of text.
#
# The referenced parent tables (patients, encounters) do not have to exist
# yet when this statement runs.
sql_table_careplans = """
CREATE TABLE IF NOT EXISTS careplans (
    Id TEXT PRIMARY KEY,
    START TEXT,
    STOP TEXT,
    PATIENT TEXT,
    ENCOUNTER TEXT,
    CODE TEXT,
    DESCRIPTION TEXT,
    REASONCODE TEXT,
    REASONDESCRIPTION TEXT,
    FOREIGN KEY (PATIENT)
       REFERENCES patients (Id),
    FOREIGN KEY (ENCOUNTER)
       REFERENCES encounters (Id)
);
"""
cur.execute(sql_table_careplans)


# DDL statements for all tables of synthea.db.
#
# Every statement uses CREATE TABLE IF NOT EXISTS, so re-running this cell
# leaves already existing tables untouched. The statements are executed in
# the order listed in `create_table_statements` below: parent tables
# (patients, organizations, providers, payers, encounters) come before the
# tables that reference them.

sql_table_patients = """
CREATE TABLE IF NOT EXISTS patients (
    Id TEXT PRIMARY KEY,
    BIRTHDATE TEXT,
    DEATHDATE TEXT,
    SSN TEXT,
    FIRST TEXT,
    LAST TEXT,
    SUFFIX TEXT,
    RACE TEXT,
    ETHNICITY TEXT,
    GENDER TEXT,
    BIRTHPLACE TEXT,
    ADDRESS TEXT,
    CITY TEXT,
    STATE TEXT,
    COUNTY TEXT,
    ZIP TEXT,
    LAT REAL,
    LON REAL,
    HEALTHCARE_EXPENSES REAL,
    HEALTHCARE_COVERAGE REAL
    );
    """

sql_table_organizations = """
CREATE TABLE IF NOT EXISTS organizations (
    Id TEXT PRIMARY KEY,
    NAME TEXT,
    ADDRESS TEXT,
    CITY TEXT,
    STATE TEXT,
    ZIP TEXT,
    LAT REAL,
    LON REAL,
    PHONE TEXT,
    REVENUE REAL,
    UTILIZATION INTEGER
    );
    """

sql_table_providers = """
CREATE TABLE IF NOT EXISTS providers (
    Id TEXT PRIMARY KEY,
    ORGANIZATION TEXT,
    NAME TEXT,
    GENDER TEXT,
    SPECIALITY TEXT,
    FOREIGN KEY (ORGANIZATION)
        REFERENCES organizations (Id)
    );
    """

sql_table_payers = """
CREATE TABLE IF NOT EXISTS payers (
    Id TEXT PRIMARY KEY,
    NAME TEXT,
    ADDRESS TEXT,
    CITY TEXT,
    STATE_HEADQUARTERED TEXT,
    ZIP TEXT,
    PHONE TEXT,
    AMOUNT_COVERED REAL,
    AMOUNT_UNCOVERED REAL,
    REVENUE INTEGER,
    COVERED_ENCOUNTERS INTEGER,
    UNCOVERED_ENCOUNTERS INTEGER,
    COVERED_MEDICATIONS INTEGER,
    UNCOVERED_MEDICATIONS INTEGER,
    COVERED_PROCEDURES INTEGER,
    UNCOVERED_PROCEDURES INTEGER,
    COVERED_IMMUNIZATIONS INTEGER,
    UNCOVERED_IMMUNIZATIONS INTEGER,
    UNIQUE_CUSTOMERS INTEGER,
    QOLS_AVG REAL,
    MEMBER_MONTHS INTEGER
    );"""

sql_table_encounters = """
CREATE TABLE IF NOT EXISTS encounters (
    Id TEXT PRIMARY KEY,
    START TEXT,
    STOP TEXT,
    PATIENT TEXT,
    ORGANIZATION TEXT,
    PROVIDER TEXT,
    PAYER TEXT,
    ENCOUNTERCLASS TEXT,
    CODE TEXT,
    DESCRIPTION TEXT,
    BASE_ENCOUNTER_COST REAL,
    TOTAL_CLAIM_COST REAL,
    PAYER_COVERAGE REAL,
    FOREIGN KEY (PATIENT)
        REFERENCES patients (Id),
    FOREIGN KEY (ORGANIZATION)
        REFERENCES organizations (Id),
    FOREIGN KEY (PROVIDER)
        REFERENCES providers (Id),
    FOREIGN KEY (PAYER)
        REFERENCES payers (Id)
    );
    """

sql_table_careplans = """
CREATE TABLE IF NOT EXISTS careplans (
    Id TEXT PRIMARY KEY,
    START TEXT,
    STOP TEXT,
    PATIENT TEXT,
    ENCOUNTER TEXT,
    CODE TEXT,
    DESCRIPTION TEXT,
    REASONCODE TEXT,
    REASONDESCRIPTION TEXT,
    FOREIGN KEY (PATIENT)
        REFERENCES patients (Id),
    FOREIGN KEY (ENCOUNTER)
        REFERENCES encounters (Id)
    );
    """

sql_table_conditions = """
CREATE TABLE IF NOT EXISTS conditions (
    START TEXT,
    STOP TEXT,
    PATIENT TEXT,
    ENCOUNTER TEXT,
    CODE TEXT,
    DESCRIPTION TEXT,
    FOREIGN KEY (PATIENT)
        REFERENCES patients (Id),
    FOREIGN KEY (ENCOUNTER)
        REFERENCES encounters (Id)
    );
    """

sql_table_medications = """
CREATE TABLE IF NOT EXISTS medications (
    START TEXT,
    STOP TEXT,
    PATIENT TEXT,
    PAYER TEXT,
    ENCOUNTER TEXT,
    CODE TEXT,
    DESCRIPTION TEXT,
    BASE_COST REAL,
    PAYER_COVERAGE REAL,
    DISPENSES REAL,
    TOTALCOST REAL,
    FOREIGN KEY (PATIENT)
        REFERENCES patients (Id),
    FOREIGN KEY (ENCOUNTER)
        REFERENCES encounters (Id)
    );
    """

sql_table_procedures = """
CREATE TABLE IF NOT EXISTS procedures (
    DATE DATE,
    PATIENT TEXT,
    ENCOUNTER TEXT,
    CODE TEXT,
    DESCRIPTION TEXT,
    BASE_COST REAL,
    REASONCODE TEXT,
    REASONDESCRIPTION TEXT,
    FOREIGN KEY (PATIENT)
        REFERENCES patients (Id),
    FOREIGN KEY (ENCOUNTER)
        REFERENCES encounters (Id)
    );
    """

sql_table_observations = """
CREATE TABLE IF NOT EXISTS observations (
    DATE DATE,
    PATIENT TEXT,
    ENCOUNTER TEXT,
    CODE TEXT,
    DESCRIPTION TEXT,
    VALUE TEXT,
    UNITS TEXT,
    TYPE TEXT,
    FOREIGN KEY (PATIENT)
        REFERENCES patients (Id),
    FOREIGN KEY (ENCOUNTER)
        REFERENCES encounters (Id)
    );
    """

sql_table_devices = """
CREATE TABLE IF NOT EXISTS devices (
    START TEXT,
    STOP TEXT,
    PATIENT TEXT,
    ENCOUNTER TEXT,
    CODE TEXT,
    DESCRIPTION TEXT,
    UDI TEXT,
    FOREIGN KEY (PATIENT)
        REFERENCES patients (Id),
    FOREIGN KEY (ENCOUNTER)
        REFERENCES encounters (Id)
    );
    """

sql_table_imaging = """
CREATE TABLE IF NOT EXISTS imaging_studies (
    Id TEXT PRIMARY KEY,
    DATE DATE,
    PATIENT TEXT,
    ENCOUNTER TEXT,
    BODYSITE_CODE TEXT,
    BODYSITE_DESCRIPTION TEXT,
    MODALITY_CODE TEXT,
    MODALITY_DESCRIPTION TEXT,
    SOP_CODE TEXT,
    SOP_DESCRIPTION TEXT,
    FOREIGN KEY (PATIENT)
        REFERENCES patients (Id),
    FOREIGN KEY (ENCOUNTER)
         REFERENCES encounters (Id)
    );
    """

sql_table_immunizations = """
CREATE TABLE IF NOT EXISTS immunizations (
    DATE DATE,
    PATIENT TEXT,
    ENCOUNTER TEXT,
    CODE TEXT,
    DESCRIPTION TEXT,
    BASE_COST REAL,
    FOREIGN KEY (PATIENT)
        REFERENCES patients (Id),
    FOREIGN KEY (ENCOUNTER)
        REFERENCES encounters (Id)
    );
    """

# The comma between the two FOREIGN KEY clauses was missing in the original
# statement; it is written out here as standard SQL expects.
sql_table_payer_transitions = """
CREATE TABLE IF NOT EXISTS payer_transitions (
    PATIENT TEXT,
    START_YEAR INTEGER,
    END_YEAR INTEGER,
    PAYER TEXT,
    OWNERSHIP TEXT,
    FOREIGN KEY (PATIENT)
        REFERENCES patients (Id),
    FOREIGN KEY (PAYER)
        REFERENCES payers (Id)
    );
    """

sql_table_disease = """
CREATE TABLE IF NOT EXISTS disease (
    START TEXT,
    STOP TEXT,
    PATIENT TEXT,
    ENCOUNTER TEXT,
    CODE TEXT,
    DESCRIPTION TEXT,
    FOREIGN KEY (PATIENT)
        REFERENCES patients (Id),
    FOREIGN KEY (ENCOUNTER)
        REFERENCES encounters (Id)
    );
    """

# The comma between the two FOREIGN KEY clauses was missing in the original
# statement; it is written out here as standard SQL expects.
sql_table_supplies = """
CREATE TABLE IF NOT EXISTS supplies (
    DATE DATE,
    PATIENT TEXT,
    ENCOUNTER TEXT,
    CODE TEXT,
    DESCRIPTION TEXT,
    QUANTITY INTEGER,
    FOREIGN KEY (PATIENT)
        REFERENCES patients (Id),
    FOREIGN KEY (ENCOUNTER)
        REFERENCES encounters (Id)
    );
    """

# Fact table: the encounters columns plus per-encounter aggregate columns.
# NOTE(review): AGE_AT_ENCOUNTER is declared without a type, exactly as in
# the original statement - confirm the intended type before adding one.
sql_table_encounters_final = """
CREATE TABLE IF NOT EXISTS encounters_final (
    Id TEXT PRIMARY KEY,
    START TEXT,
    STOP TEXT,
    PATIENT TEXT,
    ORGANIZATION TEXT,
    PROVIDER TEXT,
    PAYER TEXT,
    ENCOUNTERCLASS TEXT,
    CODE TEXT,
    DESCRIPTION TEXT,
    BASE_ENCOUNTER_COST REAL,
    TOTAL_CLAIM_COST REAL,
    PAYER_COVERAGE REAL,
    MEDICATION_COUNT INTEGER,
    VACCINATION_COUNT INTEGER,
    CONDITION_COUNT INTEGER,
    PROCEDURE_COUNT INTEGER,
    BMI INTEGER,
    AGE_AT_ENCOUNTER,
    GENDER TEXT,
    TOTAL_MED_COST REAL,
    FOREIGN KEY (PATIENT)
        REFERENCES patients (Id),
    FOREIGN KEY (ORGANIZATION)
        REFERENCES organizations (Id),
    FOREIGN KEY (PROVIDER)
        REFERENCES providers (Id),
    FOREIGN KEY (PAYER)
        REFERENCES payers (Id)
    );
    """

# Execution order of the CREATE TABLE statements (same order as before).
create_table_statements = (
    sql_table_patients,
    sql_table_organizations,
    sql_table_providers,
    sql_table_payers,
    sql_table_encounters,
    sql_table_careplans,
    sql_table_conditions,
    sql_table_medications,
    sql_table_procedures,
    sql_table_observations,
    sql_table_devices,
    sql_table_imaging,
    sql_table_immunizations,
    sql_table_payer_transitions,
    sql_table_disease,
    sql_table_supplies,
    sql_table_encounters_final,
)

try:
    cur = conn.cursor()
    for create_statement in create_table_statements:
        cur.execute(create_statement)
        # commit() is a method of the connection, not of the cursor. The
        # original code called cur.commit() after the encounters and the
        # encounters_final statements, which raised AttributeError and
        # aborted the creation of every following table.
        conn.commit()
except sqlite3.Error as e:
    # Only database errors are handled here, so that programming errors
    # surface instead of being reported as a generic failure.
    print(f"Ein Fehler ist aufgetreten: {e}")
    conn.rollback()
else:
    print("Alle Tabellen (3NF) wurden erfolgreich angelegt!")

**Load der Daten in synthea.db**

Abschließend werden die Inhalte der DataFrames über eine Funktion automatisiert in die synthea.db geladen.

Funktionell wichtig ist hierbei die Reihenfolge der Tabellen- bzw. DataFrame-Namen in den Dictionaries DF_DICT und SQL_TABLE_MAPPING, welche an die Schlüsselbedingungen der synthea.db angepasst ist.

Die ETL-Pipeline ist an dieser Stelle abgeschlossen und auf der synthea.db kann in den nächsten Schritten ein ML-Modell trainiert werden.

In [None]:
# Maps each DataFrame name ("df_<table>") to the name of its target table in
# synthea.db. Every key is the table name prefixed with "df_"; the insertion
# order of the entries is the order of the tuple below.
SQL_TABLE_MAPPING = {
    f"df_{table}": table
    for table in (
        "patients",
        "organizations",
        "providers",
        "payers",
        "encounters",
        "careplans",
        "conditions",
        "medications",
        "procedures",
        "observations",
        "devices",
        "imaging_studies",
        "immunizations",
        "payer_transitions",
        "disease",
        "supplies",
        "encounters_final",
    )
}

# DataFrames to load, keyed by the same names as SQL_TABLE_MAPPING and in the
# same order. The fact table (df_encounters_final) is the last entry.
DF_DICT = dict(
    df_patients=df_patients,
    df_organizations=df_organizations,
    df_providers=df_providers,
    df_payers=df_payers,
    df_encounters=df_encounters,
    df_careplans=df_careplans,
    df_conditions=df_conditions,
    df_medications=df_medications,
    df_procedures=df_procedures,
    df_observations=df_observations,
    df_devices=df_devices,
    df_imaging_studies=df_imaging_studies,
    df_immunizations=df_immunizations,
    df_payer_transitions=df_payer_transitions,
    df_disease=df_disease,
    df_supplies=df_supplies,
    df_encounters_final=df_encounters_final,
)

# Loads the DataFrames into synthea.db. CAUTION: the order of the entries in
# DF_DICT and SQL_TABLE_MAPPING determines the load order.
def load_df_to_sql_db(df_dict=DF_DICT, sql_table_mapping=SQL_TABLE_MAPPING, con=conn,
                      *, if_exists="replace", index=True):
    """Write every DataFrame in *df_dict* to its mapped table.

    The DataFrames are processed in the iteration order of *df_dict*. Each
    one is written and committed on its own; a failure is reported, rolled
    back, and the loop continues with the next DataFrame.

    Args:
        df_dict: Mapping of DataFrame name to DataFrame.
        sql_table_mapping: Mapping of DataFrame name to target table name.
            DataFrames whose name is missing here are skipped with a message.
        con: Open database connection that receives the data.
        if_exists: Passed through to ``DataFrame.to_sql``. With the default
            ``"replace"`` pandas drops an existing table before writing, so
            a previously created schema (keys, declared column types) is
            not retained; pass ``"append"`` to insert into the existing
            table instead.
        index: Passed through to ``DataFrame.to_sql``. With the default
            ``True`` the DataFrame index is written as an additional column.
    """
    for df_name, df in df_dict.items():
        if df_name not in sql_table_mapping:
            print(f"Es konnte keine SQL-Tabelle in synthea.db für den DataFrame '{df_name}' gefunden.")
            continue

        table_name = sql_table_mapping[df_name]
        try:
            # Write through the connection passed in as `con`; the original
            # code used the module-level `conn` here and so ignored the
            # `con` argument for the actual write.
            df.to_sql(table_name, con, if_exists=if_exists, index=index)
            con.commit()
        except Exception as e:
            # Best effort: report the failing DataFrame and keep going.
            print(f"Fehler beim laden des DataFrame '{df_name}': {e}")
            con.rollback()
        else:
            print(f"DataFrame '{df_name}' wurde in Tabelle '{table_name}' der synthea.db geladen.")


# Run the load with the default arguments. The connection is closed in a
# finally clause so that it is released even if the load raises.
try:
    load_df_to_sql_db()
finally:
    conn.close()