# Datawarehouse-Datenbank erstellen


---

Datensatz: [Synthea Breast Cancer Dataset](https://github.com/Fuenfgeld/DMA2023TeamA/tree/main/Daten/Quelldaten)

Primär- und Fremdschlüsseldefinitionen: [Synthea GitHub Repository](https://github.com/synthetichealth/synthea/wiki/CSV-File-Data-Dictionary)

Projektgruppe GitHub Repository: [DMA2023TeamA](https://github.com/Fuenfgeld/DMA2023TeamA)

Source-DB: [GoogleDrive Ablage](https://drive.google.com/drive/folders/1k5cfjGXjNHmwQkydzjTdVHoBvCniBU_W), erstellt mit [Setup_and_fill_Database.ipynb](https://github.com/Fuenfgeld/DMA2023TeamA/blob/main/Code/Setup_and_fill_Database.ipynb)

Mithilfe dieses Colab-Books wird aus der Source-Datenbank (= Staging-DB) eine Datawarehouse-Datenbank (DWH-Datenbank) (= Reporting-DB) erstellt, welche als OLAP-Datenbank für Analysen dient. 



*Version*: 1.0

Version Date: 28/01/2023

In [None]:
# Vorsichtshalber: Löschen aller Variablen
%reset -f

# Laden der benötigten Libraries
from google.colab import drive
import sqlite3 as sq
from sqlite3 import Error
import pandas as pd

Mounten des Google Drives, überprüfen des Verzeichnisses, in welchem die Datenbanken liegen.
Außerdem setzen von Pfadangaben, die im späteren Code benötigt werden:


1.   Festlegen des Pfads zur Quelldatenbank (DB_SOURCE)
2.   Festlegen des Pfads zur Datawarehouse-Datenbank (DB_DWH)


In [None]:
# Google Drive mounten, force_remount auf True setzen, damit ein Remount erzwungen wird
drive.mount('/content/gdrive/', force_remount=True)

# Datenbankordner auf dem Shareddrive checken, es müssen source_breast_cancer.db und DWH_breast_cancer.db vorhanden sein
!ls "/content/gdrive/Shareddrives/DMA_Datenprojekt_TeamA/Daten/Datenbank"

# Patiententyp festlegen
patient_type = "breast_cancer"

# Pfad zur Quelldatenbank setzen
DB_SOURCE_PATH = "/content/gdrive/Shareddrives/DMA_Datenprojekt_TeamA/Daten/Datenbank/source_breast_cancer.db"

# Pfad zur DWH-Datenbank setzen
DB_DWH_PATH = "/content/gdrive/Shareddrives/DMA_Datenprojekt_TeamA/Daten/Datenbank/DWH_breast_cancer.db"

# Check
print("\n" + DB_SOURCE_PATH)

print("\n" + DB_DWH_PATH)

# Erstellung der DWH-Datenbank (Reporting-DB)

In [None]:
class DB(object):
  #__init__ Funktion, wird aufgerufen, wenn Klasse initiiert wird
  def __init__(self, db_file):
    # Verbindung zur Datenbank herstellen
    self.conn = sq.connect(db_file)
    # Cursor für Operationen auf die Datenbank herstellen
    self.cur = self.conn.cursor()
    self.__init_db()
  
  # Funktion für den Commit der Änderungen auf die Datenbank und das Schließen der Verbindung
  def __del__(self):
      self.conn.commit()
      self.conn.close()

  # Funktion für die Initialisierung der Datenbank
  def __init_db(self):
    # Im Folgenden werden die SQL-Queries angelegt, mithilfe welcher die Tabellen in der DWH-Datenbank erstellt werden
    # Tabellen mit Tabellen mit Präfix "F" = Faktentabellen, Präfix "D" = Dimensionstabellen


    #  sql query zum Erstellen der D_patients - Tabelle
    create_D_patients = """CREATE TABLE IF NOT EXISTS D_patients (
                           Id VARCHAR PRIMARY KEY,
                           BIRTHDATE DATE,
                           DEATHDATE DATE,
                           SSN VARCHAR,
                           DRIVERS VARCHAR,
                           PASSPORT VARCHAR,
                           PREFIX VARCHAR,
                           FIRST VARCHAR,
                           LAST VARCHAR,
                           SUFFIX VARCHAR,
                           MAIDEN VARCHAR,
                           MARITAL CHAR(1),
                           RACE VARCHAR,
                           ETHNICITY VARCHAR,
                           GENDER CHAR(1),
                           BIRTHPLACE VARCHAR,
                           ADDRESS VARCHAR,
                           CITY VARCHAR,
                           STATE VARCHAR,
                           COUNTRY VARCHAR,
                           ZIP VARCHAR,
                           LAT FLOAT,
                           LON FLOAT,
                           HEALTHCARE_EXPENSES FLOAT,
                           HEALTHCARE_COVERAGE FLOAT);""" 


    #  sql query zum Erstellen der D_conditions - Tabelle
    create_D_conditions = """CREATE TABLE IF NOT EXISTS D_conditions (
                           START DATE,
                           STOP DATE,
                           PATIENT VARCHAR,
                           ENCOUNTER VARCHAR,
                           CODE INTEGER,
                           DESCRIPTION VARCHAR,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients (Id) 
                           FOREIGN KEY (Encounter)
                              REFERENCES encounters (Id) 
                           );"""
    

    #  sql query zum Erstellen der D_procedures - Tabelle
    create_D_procedures = """CREATE TABLE IF NOT EXISTS D_procedures (
                           DATE DATETIME,
                           PATIENT VARCHAR,
                           ENCOUNTER VARCHAR,
                           CODE INTEGER,
                           DESCRIPTION VARCHAR,
                           BASE_COST FLOAT,
                           REASONCODE VARCHAR,
                           REASONDESCRIPTION VARCHAR,
                           FOREIGN KEY (PATIENT)
                             REFERENCES patients (Id) 
                           FOREIGN KEY (Encounter)
                             REFERENCES encounters (Id) 
                           );"""

    #  sql query zum Erstellen der D_medications - Tabelle
    create_D_medications = """CREATE TABLE IF NOT EXISTS D_medications (
                           START DATETIME,
                           STOP DATETIME,
                           PATIENT VARCHAR,
                           PAYER VARCHAR,
                           ENCOUNTER VARCHAR,
                           CODE VARCHAR,
                           DESCRIPTION VARCHAR,
                           BASE_COST FLOAT,
                           PAYER_COVERAGE FLOAT,
                           DISPENSES INTEGER,
                           TOTAL_COST FLOAT,
                           REASONCODE VARCHAR,
                           REASONDESCRIPTION VARCHAR,
                           FOREIGN KEY (PATIENT)
                             REFERENCES patients (Id) 
                           FOREIGN KEY (PAYER)
                             REFERENCES payers (Id)
                           FOREIGN KEY (ENCOUNTER)
                             REFERENCES encounters (Id) 
                           );"""

    #  sql query zum Erstellen der D_payers - Tabelle
    create_D_payers = """CREATE TABLE IF NOT EXISTS D_payers (
                      Id VARCHAR PRIMARY KEY,
                      NAME VARCHAR,
                      ADDRESS VARCHAR,
                      CITY VARCHAR,
                      STATE_HEADQUATERED VARCHAR,
                      ZIP VARCHAR,
                      PHONE VARCHAR,
                      AMOUNT_COVERED FLOAT,
                      ANOUNT_UNCOVERED FLOAT,
                      REVENUE INTEGER,
                      COVERED_ENCOUNTERS INTEGER,
                      UNCOVERED_ENCOUNTERS INTEGER,
                      COVERED_MEDICATIONS INTEGER,
                      UNCOVERED_MEDIACTIONS INTEGER,
                      COVERED_PROCEDURES INTEGER,
                      UNCOVERED_PROCEDURES INTEGER,
                      COVERED_IMMUNIZATIONS INTEGER,
                      UNCOVERED_IMMUNIZATIONS INTEGER,
                      UNIQUE_CUSTOMERS INTEGER,
                      QOLS_AVG FLOAT,
                      MEMBER_MONTHS INTEGER
                      );"""

    # sql query zum Erstellen der F_encounters - Tabelle
    create_F_encounters = """CREATE TABLE IF NOT EXISTS F_encounters (
                           Id Varchar PRIMARY KEY,
                           START DATETIME,
                           STOP DATETIME,
                           PATIENT VARCHAR,
                           ORGANIZATION VARCHAR,
                           PROVIDER VARCHAR,
                           PAYER VARCHAR,
                           ENCOUNTERCLASS VARCHAR,
                           CODE INTEGER,
                           DESCRIPTION VARCHAR,
                           BASE_ENCOUNTER_COST FLOAT,
                           TOTAL_CLAIM_COST FLOAT,
                           PAYER_COVERAGE FLOAT,
                           REASONCODE INTEGER,
                           REASONDESCRIPTION VARCHAR,
                           FOREIGN KEY (PATIENT)
                             REFERENCES patients (Id)
                           FOREIGN KEY (PAYER)
                             REFERENCES payers (Id)
                           );"""

    # CREATE-TABLE-Statements in eine Liste zusammenfassen
    create_tables = [create_D_patients, # Daten zu Patienten
                     create_D_payers, # Daten zu Versicherungen
                     create_D_procedures, # Daten zu Prozeduren
                     create_D_medications, # Daten zu Medikamentionen
                     create_F_encounters, # Daten zum Behandlungsfall (zentrale Faktentabelle)
                     create_D_conditions # Daten zu Diagnosen
                     ]
     

    if self.conn is not None:
      # self.cur.execute(f"drop table if exists medications_info")
      for query in create_tables:
          # Führ jedes SQL-Statement aus der create_tables Liste aus
          self.cur.execute(query)
    else:
      print('Connection to database failed')



#ETL-Strecke (Extract-Transform-Load) zur Befüllung der DWH-DB

In [None]:
# Klasse um SQL-Queries zu exportieren
class SqlQuery:
  #__init__ Funktion, wird aufgerufen, wenn Klasse initiiert wird

  """ sink_table = Zieltabelle in der DWH-Datenbank DWH_breast_cancer.db
      column_names = Spaltennamen
      source_table = Quelltabelle aus der Quelldatenbank source_breast_cancer.db
  """
  def __init__(self, source_table, column_names, sink_table):
    self.source_table = source_table
    # Anzahl der column_names für loop
    self.column_numbers = len(column_names)
    # Spaltennamen als Liste, kommasepariert
    self.column_names = ', '.join(column_names)
    self.sink_table = sink_table

  # Funktion extract_query gibt ein SQL-SELECT-Statement auf die Spalten der Quelltabelle source_breast_cancer.db zurück
  # Spaltennamen kommen aus der liste "column_names"
  def extract_query(self):
    return 'SELECT ' + self.column_names + ' FROM ' + self.source_table 

  # Funktion load_query gibt ein SQL INSERT-INTO-Statement zurück, welches die Werte aus den Tabellen der Quelldatenbank in die Tabellen der DWH-Datenbank überträgt
  def load_query(self):
    values_str = '?,' * self.column_numbers
    # print("*****", values_str, column_names, column_numbers)
    values_str = values_str[:-1]
    return 'INSERT OR REPLACE INTO ' + self.sink_table + ' VALUES (' + values_str + ')'

    # return 'INSERT INTO ' + self.sink_table + '(' + self.column_names + ') VALUES (' + values_str + ')'
  



In [None]:
# Funktion etl extrahiert die Daten aus der Quelldatenbank und lädt die Daten in die DWH-Datenbank
def etl(query, source_cnx, target_cnx):

  """source_cnx = Verbindung zur Quelldatenbank source_breast_cancer.db
     target_cnx = Verbindung zur DWH-Datenbank DWH_breast_cancer.db
     query = Objekt mit der Klasse "SqlQuery"
  """
  source_cursor = source_cnx.cursor()
  # extract_query() ist Methode aus der Klasse SqlQuery
  source_cursor.execute(query.extract_query())
  data = source_cursor.fetchall()
  source_cursor.close()

  # Wenn Daten in die Variable "data" über fetchall() gezogen wurden, wird die folgende Schleife ausgeführt 
  if data:
    target_cursor = target_cnx.cursor()
    # Multi-Query, .load_query() ist Methode aus der Klasse "SqlQuery"
    target_cursor.executemany(query.load_query(), data)
    print('Daten wurden erfolgreich in die DWH-Datenbank übertragen!') 
    # Änderungen commiten
    target_cnx.commit()
    # Verbindung zur DWH_DB schließen
    target_cursor.close()
  else:
    print('Es wurden keine Daten übertragen.')

# Funktion etl_process durchläuft die Liste der SQL-Queries und ruft jeweils die Funktion etl auf
def etl_process(queries, target_cnx, db_source):
  """ queries: query-Liste mit SQL
      target_cnx: Verbindung zur DWH-Datenbank DWH_breast_cancer.db
      db_source: Pfad zur Quelldatenbank source_breast_cancer.db (String)
  """  
  # Verbindung zur Quelldatenbank herstellen
  try:
    source_cnx = sq.connect(db_source)
  except Error as err:
    print(err)
  
  # Durch alle queries Loopen, die als Liste übergeben wurden
  for query in etl_queue:
    # Funktion etl() aufrufen, welche die Daten aus den Tabellen der Quelldatenbank extrahiert und in die DWH-Datenbank lädt
    etl(query, source_cnx, target_cnx)
    
  # Verbindung zur Quelldatenbank schließen
  source_cnx.close()

In [None]:
# Objekt der Klasse DB erzeugen, als Argument den Pfad zur DWH-Datenbank übergeben
dwh_db = DB(DB_DWH_PATH)

In [None]:
print('ETL-Prozess wird gestartet.')   
# Liste für die SqlQueries anlegen
etl_queue = []


# Im Folgenden werden Objekte mit Klasse SqlQuery erzeugt
""" Übergeben werden:
    Sql_Query(a, b, c)
    a = Quelltabelle aus der Quelldatenbank source_breast_cancer.db
    b = Spaltennamen, welche in das DWH übertragen werden sollen
    c = Zieltabelle in der DWH-Datenbank DWH_breast_cancer.db
"""

# Daten zu Patienten
patients_columns = ['Id', 'BIRTHDATE', 'DEATHDATE', 'SSN', 'DRIVERS', 'PASSPORT', 'PREFIX',
                    'FIRST', 'LAST', 'SUFFIX', 'MAIDEN', 'MARITAL', 'RACE', 'ETHNICITY',
                    'GENDER', 'BIRTHPLACE', 'ADDRESS', 'CITY', 'STATE', 'COUNTRY', 'ZIP',
                    'LAT', 'LON', 'HEALTHCARE_EXPENSES', 'HEALTHCARE_COVERAGE']

sql_query_patients = SqlQuery("patients", patients_columns, "D_patients")
etl_queue.append(sql_query_patients)

# Daten zu Diagnosen
conditions_columns = ['START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION']

sql_query_conditions = SqlQuery("conditions", conditions_columns, "D_conditions")
etl_queue.append(sql_query_conditions)

# Daten zu Prozeduren
procedures_columns = ['DATE', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION', 'BASE_COST','REASONCODE', 'REASONDESCRIPTION']

sql_query_procedures = SqlQuery("procedures", procedures_columns, "D_procedures")
etl_queue.append(sql_query_procedures)


# Daten zu Versicherungen
payers_columns = ['Id', 'NAME', 'ADDRESS', 'CITY', 'STATE_HEADQUATERED', 'ZIP', 'PHONE', 
                  'AMOUNT_COVERED', 'ANOUNT_UNCOVERED', 'REVENUE', 'COVERED_ENCOUNTERS',
                  'UNCOVERED_ENCOUNTERS', 'COVERED_MEDICATIONS', 'UNCOVERED_MEDIACTIONS',
                  'COVERED_PROCEDURES', 'UNCOVERED_PROCEDURES', 'COVERED_IMMUNIZATIONS',
                  'UNCOVERED_IMMUNIZATIONS', 'UNIQUE_CUSTOMERS', 'QOLS_AVG', 'MEMBER_MONTHS']

sql_query_payers = SqlQuery("payers", payers_columns, "D_payers")
etl_queue.append(sql_query_payers)

# Daten zu Medikamentationen
medications_columns = ['START', 'STOP', 'PATIENT', 'PAYER', 'ENCOUNTER', 'CODE', 'DESCRIPTION',
                       'BASE_COST', 'PAYER_COVERAGE', 'DISPENSES', 'TOTAL_COST', 'REASONCODE',
                       'REASONDESCRIPTION']

sql_query_medications = SqlQuery("medications", medications_columns, "D_medications")
etl_queue.append(sql_query_medications)

# Daten zu Behandlungsfällen
encounters_columns = ['Id', 'START', 'STOP', 'PATIENT', 'ORGANIZATION', 'PROVIDER', 'PAYER',
                      'ENCOUNTERCLASS', 'CODE', 'DESCRIPTION', 'BASE_ENCOUNTER_COST', 'TOTAL_CLAIM_COST',
                      'PAYER_COVERAGE', 'REASONCODE', 'REASONDESCRIPTION']

sql_query_encounters = SqlQuery("encounters", encounters_columns, "F_encounters")
etl_queue.append(sql_query_encounters)                     

# list for iteration
# etl_queue

In [None]:
# Verbindung zur DWH-Datenbank herstellen
target_cnx = dwh_db.conn
etl_process(etl_queue, target_cnx, DB_SOURCE_PATH)

In [None]:
# Änderungen Committen
target_cnx.commit()

In [None]:
# Erstellte Tabellen checken
dwh_cursor = target_cnx.cursor()
dwh_cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
print(dwh_cursor.fetchall())


In [None]:
# Spalten einer Tabelle stichprobenartig checken
dwh_cursor.execute('PRAGMA table_info(' + "F_encounters" + ');')
dwh_cursor.fetchall()

In [None]:
# Testen einer Selektion
dwh_cursor.execute("SELECT Id, BIRTHDATE, FIRST, LAST, MARITAL, GENDER from D_patients")
rows = dwh_cursor.fetchall()
for row in rows[:10]:
  print(row)

In [None]:
# Verbindung schließen
target_cnx.close()