# **Objective**: Create a data warehouse and transfer data from the source database into the data warehouse database



# Importing libraries

In [1]:
import sqlite3
from sqlite3 import Error

In [6]:
# Mount Google Drive so that the database files under /content/drive/ are
# reachable from this notebook (only works inside the Google Colab runtime).
from google.colab import drive
drive.mount('/content/drive/')

Mounted at /content/drive/


# Path of input/output data


In [7]:
# Folder path on the mounted Google Drive.
# NOTE(review): no other visible code reads material_path - confirm it is still needed.
material_path = "/content/drive/MyDrive/WS2324_Kurs5"

Uncomment exactly one patient type below. A data warehouse database is created for that patient type, and the tables are transferred from the source database into it.

In [8]:
## type of patients
# Exactly one assignment should be active; the alternatives stay commented out.
#patient_type = "allergy"
#patient_type = "asthma"
patient_type = "breast_cancer"
#patient_type = "metabolic_syndrome_disease"
#patient_type = "colorectal_cancer"
#patient_type = "covid19"
#patient_type = "dermatitis"
#patient_type = "lung_cancer"

In [9]:
# Path of the source database. It is derived from the selected patient_type so
# that changing the selection actually switches the source file; for
# "breast_cancer" this yields the same path that was previously hard-coded.
DB_SOURCE_PATH = f"/content/drive/MyDrive/source_dbs/source_{patient_type}_test.db"
DB_SOURCE_PATH

'/content/drive/MyDrive/source_dbs/source_breast_cancer_test.db'

In [10]:
# Location of the data warehouse database file (plain literal, nothing to interpolate).
DB_DWH_PATH = "/content/drive/MyDrive/source_dbs/gruppeb_test.db"


# Create Datawarehouse

In [None]:

class DB(object):
  """SQLite data warehouse: opens the database file and creates its tables.

  Attributes:
    conn: sqlite3 connection to the warehouse database file.
    cur: cursor on ``conn`` used to run the CREATE TABLE statements.
  """

  def __init__(self, db_file):
    """Connect to ``db_file`` and create every warehouse table that is missing.

    db_file: str
          path of the warehouse database (``":memory:"`` also works)
    """
    self.conn = sqlite3.connect(db_file)
    self.cur = self.conn.cursor()
    self.__init_db()

  def __del__(self):
    """Commit pending work and close the connection when the object goes away."""
    # If sqlite3.connect() raised in __init__, self.conn was never assigned.
    conn = getattr(self, "conn", None)
    if conn is None:
      return
    try:
      conn.commit()
      conn.close()
    except sqlite3.ProgrammingError:
      # The connection was already closed explicitly; nothing left to do.
      pass

  def __init_db(self):
    """Run the CREATE TABLE IF NOT EXISTS statements for all warehouse tables."""
    # sql queries to create tables in Datawarehouse

    #  sql query to create patients_info table
    create_patients_info = """CREATE TABLE IF NOT EXISTS patients_info (
                           Id STRING PRIMARY KEY,
                           BIRTHDATE DATE,
                           DEATHDATE DATE,
                           SSN STRING,
                           DRIVERS STRING,
                           PASSPORT STRING,
                           PREFIX STRING,
                           FIRST STRING,
                           LAST STRING,
                           SUFFIX STRING,
                           MAIDEN STRING,
                           MARITAL STRING,
                           RACE STRING,
                           ETHNICITY STRING,
                           GENDER STRING,
                           BIRTHPLACE STRING,
                           ADDRESS STRING,
                           CITY STRING,
                           STATE STRING,
                           COUNTRY STRING,
                           ZIP STRING,
                           LAT INTEGER,
                           LON INTEGER,
                           HEALTHCARE_EXPENSES INTEGER,
                           HEALTHCARE_COVERAGE INTEGER
                       );"""

    #  sql query to create disease table
    create_disease_info = """CREATE TABLE IF NOT EXISTS disease_info (
                           START DATE,
                           STOP DATE,
                           PATIENT STRING,
                           ENCOUNTER STRING,
                           CODE STRING,
                           DESCRIPTION STRING,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                           FOREIGN KEY (Encounter)
                              REFERENCES encounters (Id)

                       );"""


    # sql query to create condition table
    create_conditions_info = """CREATE TABLE IF NOT EXISTS conditions_info (
                           START DATE,
                           STOP DATE,
                           PATIENT STRING,
                           ENCOUNTER STRING,
                           CODE STRING,
                           DESCRIPTION STRING,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                           FOREIGN KEY (Encounter)
                              REFERENCES encounters (Id)

                       );"""

    # sql query to create careplans table
    create_careplans_info = """CREATE TABLE IF NOT EXISTS careplans_info (
                           Id STRING PRIMARY KEY,
                           START DATE,
                           STOP DATE,
                           PATIENT STRING,
                           ENCOUNTER STRING,
                           CODE STRING,
                           DESCRIPTION STRING,
                           REASONCODE STRING,
                           REASONDESCRIPTION STRING,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                           FOREIGN KEY (Encounter)
                              REFERENCES encounters (Id)

                       );"""

    # sql query to create procedures table
    create_procedures_info = """CREATE TABLE IF NOT EXISTS procedures_info (
                           DATE DATE,
                           PATIENT STRING,
                           ENCOUNTER STRING,
                           CODE STRING,
                           DESCRIPTION STRING,
                           BASE_COST INTEGER,
                           REASONCODE STRING,
                           REASONDESCRIPTION STRING,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                       );"""

    # sql query to create medications_info table
    # (the table must be called medications_info because the ETL step loads
    # the medications data into a sink table of exactly that name)
    create_medications_info = """CREATE TABLE IF NOT EXISTS medications_info (
                           START DATE,
                           STOP DATE,
                           PATIENT STRING,
                           PAYER STRING,
                           ENCOUNTER STRING,
                           CODE STRING,
                           DESCRIPTION STRING,
                           BASE_COST INTEGER,
                           PAYER_COVERAGE INTEGER,
                           DISPENSES INTEGER,
                           TOTALCOST INTEGER,
                           REASONCODE STRING,
                           REASONDESCRIPTION STRING,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)

                       );"""

    # sql query to create observations table
    create_observations_info = """CREATE TABLE IF NOT EXISTS observations_info (
                           DATE DATE,
                           PATIENT STRING,
                           ENCOUNTER STRING,
                           CODE STRING,
                           DESCRIPTION STRING,
                           VALUE STRING,
                           UNITS STRING,
                           TYPE STRING,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                       );"""

    create_tables = [create_patients_info, # demographic data
                     create_disease_info, create_conditions_info, # diagnoses data
                     create_careplans_info, create_procedures_info, create_medications_info, # procedures data
                     create_observations_info # lab values data
                     ]

    # sqlite3.connect() either returns a connection or raises, so no
    # "connection is None" branch is needed here.
    for query in create_tables:
      self.cur.execute(query)



# ETL/ELT (extract, transform, load / extract, load, transform)

In [12]:
# exporting queries
# The SqlQuery class builds an object that exports selected columns from a source table into a target table.
class SqlQuery:
  """Builds the SELECT and INSERT statements that copy columns between two tables."""

  def __init__(self, source_table, column_names, sink_table):
    """Store the table names, the number of columns and the comma-joined column list.

    source_table: str
          table the data is read from
    column_names: list
          names of the columns to copy
    sink_table: str
          table the data is written to
    """
    self.source_table = source_table
    self.sink_table = sink_table
    self.column_numbers = len(column_names)
    self.column_names = ', '.join(column_names)

  def extract_query(self):
    """Return the SELECT statement that reads the configured columns from the source table."""
    return ' '.join(('SELECT', self.column_names, 'FROM', self.source_table))

  def load_query(self):
    """Return the parameterised INSERT OR REPLACE statement for the sink table.

    The statement contains one ``?`` placeholder per column, so it can be
    executed with each row produced by extract_query(). No column list is
    given, i.e. the values are matched to the sink table's columns by position.
    """
    placeholders = ','.join(['?'] * self.column_numbers)
    return ''.join(('INSERT OR REPLACE INTO ', self.sink_table,
                    ' VALUES (', placeholders, ')'))




In [None]:
# `query` must be an object of the SqlQuery class; only then can its extract_query and load_query methods be used.
def etl(query, source_cnx, target_cnx):
  """
  Copy the rows selected by one query from the source db into the warehouse db.

  query: object
        provides extract_query() (SELECT for the source db) and
        load_query() (parameterised INSERT for the warehouse db)
  source_cnx: SQLite connection
        connection to the source database
  target_cnx: SQLite connection
        connection to the warehouse database

  Raises sqlite3.Error if extracting or loading fails; a failed load is
  rolled back so that no partially inserted rows stay in the open transaction.
  """
  # extract data from source db
  source_cursor = source_cnx.cursor()
  try:
    source_cursor.execute(query.extract_query())
    data = source_cursor.fetchall()
  finally:
    # close the cursor even if the SELECT fails
    source_cursor.close()

  if not data:
    print('data is empty')
    return

  # load data into warehouse db
  target_cursor = target_cnx.cursor()
  try:
    target_cursor.executemany(query.load_query(), data)
    target_cnx.commit()
  except sqlite3.Error:
    # executemany may fail after some rows were inserted; discard them so a
    # later commit on this connection cannot persist a partial load
    target_cnx.rollback()
    raise
  finally:
    target_cursor.close()
  print('data loaded to warehouse db')


def etl_process(queries, target_cnx, db_source):
  """
  Run etl() for every query, reading from the source database at db_source.

  queries: list
        a list of queries
  target_cnx: SQLite connection
  db_source: str
        path of source database

  Raises sqlite3.Error if the source database cannot be opened (the error is
  printed first) or if one of the queries fails.
  """
  # establish source db connection
  try:
    source_cnx = sqlite3.connect(db_source)
  except Error as err:
    print(err)
    # without a source connection nothing below can work, so do not continue
    raise

  try:
    # loop through sql queries: iterate the `queries` argument, not a global
    # list that may not exist yet when this function is called
    for query in queries:
      etl(query, source_cnx, target_cnx)
  finally:
    # close the source db connection, also when a query failed
    source_cnx.close()

In [14]:
# create Datawarehouse
# Instantiate the DB class for the warehouse file. Its constructor runs the
# CREATE TABLE IF NOT EXISTS statements, so dwh_db afterwards holds a
# connection to a database that contains the warehouse tables.
dwh_db = DB(DB_DWH_PATH)

In [15]:
print('starting etl')

# Column lists: for every source table, the columns that SqlQuery should copy.

# demographic table
patients_columns = [
    'Id', 'BIRTHDATE', 'DEATHDATE', 'SSN', 'DRIVERS', 'PASSPORT', 'PREFIX',
    'FIRST', 'LAST', 'SUFFIX', 'MAIDEN', 'MARITAL', 'RACE', 'ETHNICITY',
    'GENDER', 'BIRTHPLACE', 'ADDRESS', 'CITY', 'STATE', 'COUNTRY', 'ZIP',
    'LAT', 'LON', 'HEALTHCARE_EXPENSES', 'HEALTHCARE_COVERAGE',
]

# diagnoses tables
disease_columns = ['START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION']
conditions_columns = ['START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION']

# procedures tables
procedures_columns = [
    'DATE', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION', 'BASE_COST',
    'REASONCODE', 'REASONDESCRIPTION',
]
careplans_columns = [
    'Id', 'START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION',
    'REASONCODE', 'REASONDESCRIPTION',
]
medications_columns = [
    'START', 'STOP', 'PATIENT', 'PAYER', 'ENCOUNTER', 'CODE', 'DESCRIPTION',
    'BASE_COST', 'PAYER_COVERAGE', 'DISPENSES', 'TOTALCOST', 'REASONCODE',
    'REASONDESCRIPTION',
]

# lab values tables
observations_columns = [
    'DATE', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION', 'VALUE', 'UNITS', 'TYPE',
]

# One SqlQuery per table, built as SqlQuery(source_table, column_names, sink_table).
sql_query_patients = SqlQuery("patients", patients_columns, "patients_info")
sql_query_disease = SqlQuery("disease", disease_columns, "disease_info")
sql_query_conditions = SqlQuery("conditions", conditions_columns, "conditions_info")
sql_query_procedures = SqlQuery("procedures", procedures_columns, "procedures_info")
sql_query_careplans = SqlQuery("careplans", careplans_columns, "careplans_info")
sql_query_medications = SqlQuery("medications", medications_columns, "medications_info")
sql_query_observations = SqlQuery("observations", observations_columns, "observations_info")

# list for iteration: every SqlQuery object created above, in processing order
etl_queue = [
    sql_query_patients,
    sql_query_disease,
    sql_query_conditions,
    sql_query_procedures,
    sql_query_careplans,
    sql_query_medications,
    sql_query_observations,
]

starting etl


In [16]:
# Use the warehouse's SQLite connection as the ETL target and run every queued
# query against the source database.
target_cnx = dwh_db.conn
etl_process(etl_queue, target_cnx, DB_SOURCE_PATH)

data loaded to warehouse db
data is empty
data loaded to warehouse db
data loaded to warehouse db
data is empty
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data is empty


In [17]:
# Commit any transaction that is still open on the warehouse connection.
target_cnx.commit()

In [None]:
# target_cnx.close()

In [18]:
# Cursor on the warehouse connection, used by the inspection queries.
dwh_cursor = target_cnx.cursor()
# Uncomment to list the tables that exist in the warehouse:
# dwh_cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
# print(dwh_cursor.fetchall())


In [19]:
# Show the column definitions of the patients_info table.
dwh_cursor.execute("PRAGMA table_info(patients_info);")
dwh_cursor.fetchall()

[(0, 'Id', 'STRING', 0, None, 1),
 (1, 'BIRTHDATE', 'DATE', 0, None, 0),
 (2, 'DEATHDATE', 'DATE', 0, None, 0),
 (3, 'SSN', 'STRING', 0, None, 0),
 (4, 'DRIVERS', 'STRING', 0, None, 0),
 (5, 'PASSPORT', 'STRING', 0, None, 0),
 (6, 'PREFIX', 'STRING', 0, None, 0),
 (7, 'FIRST', 'STRING', 0, None, 0),
 (8, 'LAST', 'STRING', 0, None, 0),
 (9, 'SUFFIX', 'STRING', 0, None, 0),
 (10, 'MAIDEN', 'STRING', 0, None, 0),
 (11, 'MARITAL', 'STRING', 0, None, 0),
 (12, 'RACE', 'STRING', 0, None, 0),
 (13, 'ETHNICITY', 'STRING', 0, None, 0),
 (14, 'GENDER', 'STRING', 0, None, 0),
 (15, 'BIRTHPLACE', 'STRING', 0, None, 0),
 (16, 'ADDRESS', 'STRING', 0, None, 0),
 (17, 'CITY', 'STRING', 0, None, 0),
 (18, 'STATE', 'STRING', 0, None, 0),
 (19, 'COUNTRY', 'STRING', 0, None, 0),
 (20, 'ZIP', 'STRING', 0, None, 0),
 (21, 'LAT', 'INTEGER', 0, None, 0),
 (22, 'LON', 'INTEGER', 0, None, 0),
 (23, 'HEALTHCARE_EXPENSES', 'INTEGER', 0, None, 0),
 (24, 'HEALTHCARE_COVERAGE', 'INTEGER', 0, None, 0)]

In [20]:
# Fetch a few identifying columns of all patients and print the first ten rows
# as a quick check that the load worked.
dwh_cursor.execute("SELECT Id, BIRTHDATE, FIRST, LAST, MARITAL, GENDER from patients_info")
rows = dwh_cursor.fetchall()
for row in rows[:10]:
  print(row)

('d2061cc7-bee0-0e6c-3ac4-15c197c474e0', '1956-06-22', 'Lucio648', 'Simonis280', 'M', 'M')
('073d8e80-ff90-1c8d-57e4-29bfca52c87f', '1964-08-28', 'Buffy238', 'Wolf938', 'M', 'F')
('e1ff7e68-4097-9faf-514d-e4cfcfdf252e', '1998-08-28', 'Debora709', 'Klocko335', '', 'F')
('a0f679cc-875f-dd72-ed13-9ca863ec6cf3', '2004-01-20', 'Walton167', 'Kessler503', '', 'M')
('e4166a9c-f7c5-bef7-ea43-b96281a2d586', '2006-03-23', 'Mariano761', 'Wiegand701', '', 'M')
('ee537126-7509-05de-2349-ab19604545d8', '1991-05-21', 'Britta584', 'Ziemann98', 'M', 'F')
('826d5ce3-2b23-8bd8-b352-a77fdd083d68', '1964-07-20', 'Arron144', 'Botsford977', 'M', 'M')
('0edfef0f-fa74-ab35-1122-4c537bb9765d', '1997-05-05', 'Edgardo196', 'Morar593', '', 'M')
('0fe7e161-0f29-2b70-8e3d-757b15026295', '2008-12-26', 'Fred155', 'Leuschke194', '', 'M')
('478e120c-67a4-4375-646f-c41e74f72478', '1971-01-02', 'Frances376', 'Zulauf375', 'S', 'M')


In [None]:
# Fetch patient, code, description and base cost of all procedures and print
# the first ten rows. The column list names DESCRIPTION only; a trailing
# "STRING" would be parsed as a column alias, not as a type.
dwh_cursor.execute("SELECT PATIENT, CODE, DESCRIPTION, BASE_COST from procedures_info")
rows = dwh_cursor.fetchall()
for row in rows[:10]:
  print(row)

('b4001499-a15a-980e-e3bb-e73b63045411', 180325003, 'Electrical cardioversion', 28061.2)
('b4001499-a15a-980e-e3bb-e73b63045411', 180325003, 'Electrical cardioversion', 27985.19)
('b4001499-a15a-980e-e3bb-e73b63045411', 180325003, 'Electrical cardioversion', 20897.09)
('b4001499-a15a-980e-e3bb-e73b63045411', 180325003, 'Electrical cardioversion', 33252.79)
('b4001499-a15a-980e-e3bb-e73b63045411', 180325003, 'Electrical cardioversion', 42511.55)
('b4001499-a15a-980e-e3bb-e73b63045411', 180325003, 'Electrical cardioversion', 25637.28)
('b4001499-a15a-980e-e3bb-e73b63045411', 18286008, 'Catheter ablation of tissue of heart', 9522.56)
('b4001499-a15a-980e-e3bb-e73b63045411', 180325003, 'Electrical cardioversion', 42511.75)
('b4001499-a15a-980e-e3bb-e73b63045411', 180325003, 'Electrical cardioversion', 22743.91)
('b4001499-a15a-980e-e3bb-e73b63045411', 180325003, 'Electrical cardioversion', 38693.95)


# Links:

https://pynative.com/python-sqlite/

https://www.vertabelo.com/blog/using-python-and-mysql-in-the-etl-process/

https://github.com/iamaziz/etl/blob/master/pipeline.py


https://medium.com/datadriveninvestor/complete-data-analytics-solution-using-etl-pipeline-in-python-edd6580de24b