<a href="https://colab.research.google.com/github/Fuenfgeld/DatamanagementAndArchiving/blob/main/DataWarehousePrototype/ETLprototype.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Objective**: Create a data warehouse and transfer (extract, transform, load) the data from the source database into the data warehouse database



# Importing libraries

In [None]:
import sqlite3
from sqlite3 import Error

In [None]:
from google.colab import drive
# Mount Google Drive at /content/drive so the shared course folder holding the
# source and warehouse SQLite databases is reachable from this Colab runtime.
drive.mount("/content/drive")

Drive already mounted at /content/drive; to attempt to forcibly remount, call drive.mount("/content/drive", force_remount=True).


# Paths of input/output data


In [None]:
# Root folder of the shared course material on the mounted Google Drive.
material_path = (
    "/content/drive/Shareddrives/"
    "05_BIDS_Datenmanagement & Archivierung im Umfeld der Forschung/Material"
)

Uncomment exactly one patient type in the next cell. A data warehouse database is then created for that patient type, and the tables of the matching source database are transferred into it.

In [None]:
# Type of patients (cohort) to process: keep exactly ONE assignment uncommented.
# The chosen value is substituted into the source and warehouse database file names.
patient_type = "allergy"
# patient_type = "asthma"
# patient_type = "breast_cancer"
# patient_type = "metabolic_syndrome_disease"
# patient_type = "colorectal_cancer"
# patient_type = "covid19"
# patient_type = "dermatitis"
# patient_type = "lung_cancer"

In [None]:
# Source database for the selected patient type; the bare name on the last
# line makes the notebook display the resolved path.
DB_SOURCE_PATH = "{}/source_dbs/source_{}_test.db".format(material_path, patient_type)
DB_SOURCE_PATH

'/content/drive/Shareddrives/05_BIDS_Datenmanagement & Archivierung im Umfeld der Forschung/Material/source_dbs/source_allergy_test.db'

In [None]:
# Warehouse database file for the selected patient type.
DB_DWH_PATH = "{}/DWH_dbs/DWH_{}_test.db".format(material_path, patient_type)


# Create the data warehouse

In [None]:
class DB(object):
  """Own a connection to the data-warehouse SQLite file and create its tables.

  Constructing a DB opens ``db_file`` (SQLite creates the file if it does not
  exist) and runs ``CREATE TABLE IF NOT EXISTS`` for every warehouse table, so
  re-running the cell against an existing warehouse is harmless.

  Attributes:
    conn: the ``sqlite3.Connection`` to the warehouse; ``None`` after ``close()``.
    cur: cursor on ``conn`` that was used to run the DDL.
  """

  def __init__(self, db_file):
    """Open ``db_file`` and make sure all warehouse tables exist.

    Args:
      db_file: path of the warehouse SQLite file (``":memory:"`` also works).

    Raises:
      sqlite3.Error: if the database cannot be opened or a CREATE TABLE fails.
    """
    self.conn = sqlite3.connect(db_file)
    self.cur = self.conn.cursor()
    self.__init_db()

  def close(self):
    """Commit pending changes and close the connection. Safe to call more than once."""
    # getattr: __init__ may have raised before ``conn`` was assigned.
    conn = getattr(self, "conn", None)
    if conn is None:
      return
    self.conn = None
    try:
      conn.commit()
    except sqlite3.ProgrammingError:
      # The connection was already closed directly (e.g. target_cnx.close()),
      # so there is nothing left to commit.
      pass
    finally:
      conn.close()

  def __del__(self):
    # Best-effort flush on garbage collection; call close() to control the timing.
    self.close()

  def __init_db(self):
    """Create every warehouse table that does not exist yet.

    Text columns are declared TEXT rather than STRING: SQLite does not know a
    STRING type and gives such columns NUMERIC affinity, which silently turns
    digit-only text into numbers (e.g. ZIP code '02138' would be stored as 2138).
    """
    # demographic data
    create_patients_info = """CREATE TABLE IF NOT EXISTS patients_info (
                           Id TEXT PRIMARY KEY,
                           BIRTHDATE DATE,
                           DEATHDATE DATE,
                           SSN TEXT,
                           DRIVERS TEXT,
                           PASSPORT TEXT,
                           PREFIX TEXT,
                           FIRST TEXT,
                           LAST TEXT,
                           SUFFIX TEXT,
                           MAIDEN TEXT,
                           MARITAL TEXT,
                           RACE TEXT,
                           ETHNICITY TEXT,
                           GENDER TEXT,
                           BIRTHPLACE TEXT,
                           ADDRESS TEXT,
                           CITY TEXT,
                           STATE TEXT,
                           COUNTRY TEXT,
                           ZIP TEXT,
                           LAT INTEGER,
                           LON INTEGER,
                           HEALTHCARE_EXPENSES INTEGER,
                           HEALTHCARE_COVERAGE INTEGER
                       );"""

    # NOTE(review): the FOREIGN KEY (Encounter) clauses below reference an
    # `encounters` table that is not created in this warehouse. SQLite only
    # enforces foreign keys when `PRAGMA foreign_keys = ON`, which this class
    # does not enable; if it is ever enabled, inserts into these tables would
    # likely fail -- confirm before enabling.

    # diagnoses data
    create_disease_info = """CREATE TABLE IF NOT EXISTS disease_info (
                           START DATE,
                           STOP DATE,
                           PATIENT TEXT,
                           ENCOUNTER TEXT,
                           CODE TEXT,
                           DESCRIPTION TEXT,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id),
                           FOREIGN KEY (Encounter)
                              REFERENCES encounters (Id)
                       );"""

    create_conditions_info = """CREATE TABLE IF NOT EXISTS conditions_info (
                           START DATE,
                           STOP DATE,
                           PATIENT TEXT,
                           ENCOUNTER TEXT,
                           CODE TEXT,
                           DESCRIPTION TEXT,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id),
                           FOREIGN KEY (Encounter)
                              REFERENCES encounters (Id)
                       );"""

    # procedures data
    create_careplans_info = """CREATE TABLE IF NOT EXISTS careplans_info (
                           Id TEXT PRIMARY KEY,
                           START DATE,
                           STOP DATE,
                           PATIENT TEXT,
                           ENCOUNTER TEXT,
                           CODE TEXT,
                           DESCRIPTION TEXT,
                           REASONCODE TEXT,
                           REASONDESCRIPTION TEXT,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id),
                           FOREIGN KEY (Encounter)
                              REFERENCES encounters (Id)
                       );"""

    create_procedures_info = """CREATE TABLE IF NOT EXISTS procedures_info (
                           DATE DATE,
                           PATIENT TEXT,
                           ENCOUNTER TEXT,
                           CODE TEXT,
                           DESCRIPTION TEXT,
                           BASE_COST INTEGER,
                           REASONCODE TEXT,
                           REASONDESCRIPTION TEXT,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                       );"""

    create_medications_info = """CREATE TABLE IF NOT EXISTS medications_info (
                           START DATE,
                           STOP DATE,
                           PATIENT TEXT,
                           PAYER TEXT,
                           ENCOUNTER TEXT,
                           CODE TEXT,
                           DESCRIPTION TEXT,
                           BASE_COST INTEGER,
                           PAYER_COVERAGE INTEGER,
                           DISPENSES INTEGER,
                           TOTALCOST INTEGER,
                           REASONCODE TEXT,
                           REASONDESCRIPTION TEXT,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                       );"""

    # lab values data
    create_observations_info = """CREATE TABLE IF NOT EXISTS observations_info (
                           DATE DATE,
                           PATIENT TEXT,
                           ENCOUNTER TEXT,
                           CODE TEXT,
                           DESCRIPTION TEXT,
                           VALUE TEXT,
                           UNITS TEXT,
                           TYPE TEXT,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                       );"""

    create_devices_info = """CREATE TABLE IF NOT EXISTS devices_info (
                           START DATE,
                           STOP DATE,
                           PATIENT TEXT,
                           ENCOUNTER TEXT,
                           CODE TEXT,
                           DESCRIPTION TEXT,
                           UDI TEXT,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                       );"""

    create_imaging_studies_info = """CREATE TABLE IF NOT EXISTS imaging_studies_info (
                           Id TEXT PRIMARY KEY,
                           DATE DATE,
                           PATIENT TEXT,
                           ENCOUNTER TEXT,
                           BODYSITE_CODE TEXT,
                           BODYSITE_DESCRIPTION TEXT,
                           MODALITY_CODE TEXT,
                           MODALITY_DESCRIPTION TEXT,
                           SOP_CODE TEXT,
                           SOP_DESCRIPTION TEXT,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                       );"""

    create_tables = [create_patients_info,  # demographic data
                     create_disease_info, create_conditions_info,  # diagnoses data
                     create_careplans_info, create_procedures_info, create_medications_info,  # procedures data
                     create_observations_info, create_devices_info, create_imaging_studies_info  # lab values data
                     ]

    # sqlite3.connect() either returns a connection or raises, so no None check is needed.
    for query in create_tables:
      self.cur.execute(query)



# ETL/ELT (Extract, Transform, Load)

In [None]:
# SQL statements for copying one table from the source DB to the warehouse
class SqlQuery:
  """Describe one table copy: which columns to read from a source table and where to write them.

  Args:
    source_table: name of the table in the source database.
    column_names: ordered column names; used for both the SELECT and the INSERT,
      so the sink table must have columns with the same names.
    sink_table: name of the table in the warehouse database.

  Table and column names are concatenated into SQL text, so they must come from
  trusted code (as in this notebook), never from user input.
  """

  def __init__(self, source_table, column_names, sink_table):
    self.source_table = source_table
    self.column_numbers = len(column_names)
    self.column_names = ', '.join(column_names)
    self.sink_table = sink_table

  def extract_query(self):
    """Return the SELECT statement that reads the configured columns from the source table."""
    return 'SELECT ' + self.column_names + ' FROM ' + self.source_table

  def load_query(self):
    """Return a parameterized ``INSERT OR REPLACE`` statement for the sink table.

    The target columns are listed explicitly, so each extracted value lands in the
    column of the same name even if the sink table declares its columns in a
    different order. ``OR REPLACE`` only deduplicates on tables with a PRIMARY KEY
    or UNIQUE constraint; in the other tables a re-run appends duplicate rows.
    """
    placeholders = ', '.join('?' * self.column_numbers)
    return ('INSERT OR REPLACE INTO ' + self.sink_table
            + ' (' + self.column_names + ') VALUES (' + placeholders + ')')
  



In [None]:
def etl(query, source_cnx, target_cnx):
  """Copy the rows selected by ``query`` from the source DB into the warehouse DB.

  Args:
    query: object with ``extract_query()`` (a SELECT run on ``source_cnx``) and
      ``load_query()`` (a parameterized INSERT run on ``target_cnx``), e.g. a SqlQuery.
    source_cnx: sqlite3 connection to the source database.
    target_cnx: sqlite3 connection to the warehouse database.

  The load is a single transaction on ``target_cnx``. It is committed when every
  row was inserted, and rolled back (re-raising the error) if any insert fails, so
  a table is never left half-loaded. Changes already pending on ``target_cnx`` are
  committed or rolled back together with it. Prints a one-line status message.
  """
  from contextlib import closing  # sqlite3 cursors are not context managers themselves

  # extract data from source db
  with closing(source_cnx.cursor()) as source_cursor:
    source_cursor.execute(query.extract_query())
    data = source_cursor.fetchall()

  if not data:
    print('data is empty')
    return

  # load data into warehouse db; `with target_cnx` commits on success, rolls back on error
  with target_cnx, closing(target_cnx.cursor()) as target_cursor:
    target_cursor.executemany(query.load_query(), data)
  print('data loaded to warehouse db')


def etl_process(queries, target_cnx, db_source):
  """
  Copy every table described by ``queries`` from the source database into the warehouse.

  queries: list
        SqlQuery-like objects (``extract_query()`` / ``load_query()``), run in order
  target_cnx: SQLite connection
        open connection to the warehouse database; it is left open
  db_source: str
        path of source database

  Errors raised while opening the source database or by ``etl`` propagate to the
  caller; the source connection is closed in every case.

  NOTE: sqlite3.connect() creates an empty database when ``db_source`` does not
  exist, so a wrong path usually surfaces as a "no such table" error from the
  first query rather than from the connect call.
  """
  # establish source db connection
  try:
    source_cnx = sqlite3.connect(db_source)
  except Error as err:
    print(err)
    # re-raise: continuing would only fail later on the unbound `source_cnx`
    raise

  try:
    # loop through the queries passed in by the caller (not a global list)
    for query in queries:
      etl(query, source_cnx, target_cnx)
  finally:
    # close the source db connection even if a table fails to load
    source_cnx.close()

In [None]:
# Open (or create) the warehouse database file and create its tables if missing.
dwh_db = DB(db_file=DB_DWH_PATH)

In [None]:
print('starting etl')

# One SqlQuery per table to copy: (source table, ordered column list, warehouse table).
# The column lists must line up with the warehouse table definitions created by DB.

# demographic table
patients_columns = ['Id', 'BIRTHDATE', 'DEATHDATE', 'SSN', 'DRIVERS', 'PASSPORT', 'PREFIX',
                    'FIRST', 'LAST', 'SUFFIX', 'MAIDEN', 'MARITAL', 'RACE', 'ETHNICITY',
                    'GENDER', 'BIRTHPLACE', 'ADDRESS', 'CITY', 'STATE', 'COUNTRY', 'ZIP',
                    'LAT', 'LON', 'HEALTHCARE_EXPENSES', 'HEALTHCARE_COVERAGE']
sql_query_patients = SqlQuery("patients", patients_columns, "patients_info")

# diagnoses tables
disease_columns = ['START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION']
sql_query_disease = SqlQuery("disease", disease_columns, "disease_info")

conditions_columns = ['START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION']
sql_query_conditions = SqlQuery("conditions", conditions_columns, "conditions_info")

# procedures tables
procedures_columns = ['DATE', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION', 'BASE_COST',
                      'REASONCODE', 'REASONDESCRIPTION']
sql_query_procedures = SqlQuery("procedures", procedures_columns, "procedures_info")

careplans_columns = ['Id', 'START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION',
                     'REASONCODE', 'REASONDESCRIPTION']
sql_query_careplans = SqlQuery("careplans", careplans_columns, "careplans_info")

medications_columns = ['START', 'STOP', 'PATIENT', 'PAYER', 'ENCOUNTER', 'CODE', 'DESCRIPTION',
                       'BASE_COST', 'PAYER_COVERAGE', 'DISPENSES', 'TOTALCOST', 'REASONCODE',
                       'REASONDESCRIPTION']
sql_query_medications = SqlQuery("medications", medications_columns, "medications_info")

# lab values tables
observations_columns = ['DATE', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION', 'VALUE',
                        'UNITS', 'TYPE']
sql_query_observations = SqlQuery("observations", observations_columns, "observations_info")

devices_columns = ['START', 'STOP', 'PATIENT', 'ENCOUNTER', 'CODE', 'DESCRIPTION', 'UDI']
sql_query_devices = SqlQuery("devices", devices_columns, "devices_info")

imaging_studies_columns = ['Id', 'DATE', 'PATIENT', 'ENCOUNTER', 'BODYSITE_CODE',
                           'BODYSITE_DESCRIPTION', 'MODALITY_CODE', 'MODALITY_DESCRIPTION',
                           'SOP_CODE', 'SOP_DESCRIPTION']
sql_query_imaging_studies = SqlQuery("imaging_studies", imaging_studies_columns,
                                     "imaging_studies_info")

# Queue processed by etl_process, in load order.
etl_queue = [
    sql_query_patients,
    sql_query_disease,
    sql_query_conditions,
    sql_query_procedures,
    sql_query_careplans,
    sql_query_medications,
    sql_query_observations,
    sql_query_devices,
    sql_query_imaging_studies,
]

starting etl


In [None]:
# Reuse the SQLite connection opened by DB as the ETL load target.
target_cnx = dwh_db.conn
etl_process(queries=etl_queue, target_cnx=target_cnx, db_source=DB_SOURCE_PATH)

data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db


In [None]:
# Persist any changes still pending on the warehouse connection.
target_cnx.commit()

In [None]:
# target_cnx.close()

In [None]:
# Cursor on the warehouse connection, reused by the inspection cells below.
dwh_cursor = target_cnx.cursor()
# To list the tables in the warehouse, uncomment:
# dwh_cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
# print(dwh_cursor.fetchall())


In [None]:
# Column definitions of patients_info, one tuple per column:
# (cid, name, declared type, notnull, default value, pk)
dwh_cursor.execute("PRAGMA table_info(patients_info);")
dwh_cursor.fetchall()

[(0, 'Id', 'STRING', 0, None, 1),
 (1, 'BIRTHDATE', 'DATE', 0, None, 0),
 (2, 'DEATHDATE', 'DATE', 0, None, 0),
 (3, 'SSN', 'STRING', 0, None, 0),
 (4, 'DRIVERS', 'STRING', 0, None, 0),
 (5, 'PASSPORT', 'STRING', 0, None, 0),
 (6, 'PREFIX', 'STRING', 0, None, 0),
 (7, 'FIRST', 'STRING', 0, None, 0),
 (8, 'LAST', 'STRING', 0, None, 0),
 (9, 'SUFFIX', 'STRING', 0, None, 0),
 (10, 'MAIDEN', 'STRING', 0, None, 0),
 (11, 'MARITAL', 'STRING', 0, None, 0),
 (12, 'RACE', 'STRING', 0, None, 0),
 (13, 'ETHNICITY', 'STRING', 0, None, 0),
 (14, 'GENDER', 'STRING', 0, None, 0),
 (15, 'BIRTHPLACE', 'STRING', 0, None, 0),
 (16, 'ADDRESS', 'STRING', 0, None, 0),
 (17, 'CITY', 'STRING', 0, None, 0),
 (18, 'STATE', 'STRING', 0, None, 0),
 (19, 'COUNTRY', 'STRING', 0, None, 0),
 (20, 'ZIP', 'STRING', 0, None, 0),
 (21, 'LAT', 'INTEGER', 0, None, 0),
 (22, 'LON', 'INTEGER', 0, None, 0),
 (23, 'HEALTHCARE_EXPENSES', 'INTEGER', 0, None, 0),
 (24, 'HEALTHCARE_COVERAGE', 'INTEGER', 0, None, 0)]

In [None]:
# Show a few demographic columns for the first ten patients in the warehouse.
preview_sql = "SELECT Id, BIRTHDATE, FIRST, LAST, MARITAL, GENDER from patients_info"
dwh_cursor.execute(preview_sql)
rows = dwh_cursor.fetchall()
for row in rows[0:10]:
  print(row)

('27b0d72c-f2fb-7e25-38c0-7d5120ebbedf', '1945-04-17', 'Emmie273', 'Schoen8', 'M', 'F')
('17f0c6d9-8931-8839-66cb-3ca6fb066d3e', '1959-05-31', 'Karl184', 'Stroman228', 'M', 'M')
('aff157cc-b6d3-412b-ccbe-bfd5fac1c2d5', '1991-11-27', 'Monty345', 'Krajcik437', 'M', 'M')
('e92765c7-1c4b-9ee7-dbc8-5300fcb40a54', '2001-11-18', 'Hugo693', 'Valencia279', '', 'M')
('3575b903-dbd0-1d55-6146-9e8aa4ed52a5', '1941-12-22', 'Hayley136', 'Schamberger479', 'M', 'F')
('9bbbcada-7a45-92f0-6ae6-d197bcefc0d4', '1951-12-14', 'Jamison785', 'Marvin195', 'M', 'M')
('28124841-1cf3-2818-d4ee-8574fac23298', '1973-11-29', 'Adalberto916', 'Leffler128', 'M', 'M')
('bdf7af8e-c765-1e02-dc9d-24b49ad290a3', '1987-10-30', 'Rich940', 'Jakubowski832', 'M', 'M')
('03127a78-1c85-5b31-d4d5-14e941262148', '2017-10-27', 'Annamae625', 'Lindgren255', '', 'F')
('b202a7c5-b5b7-cf47-d5a6-d41e468d06c7', '1998-12-31', 'Lilliam592', 'Hickle134', '', 'F')


In [None]:
# Preview the first ten procedures: patient, procedure code, description and cost.
# Only column names belong in the SELECT list -- a type name written after a
# column (e.g. "DESCRIPTION STRING") is parsed as an alias and renames it.
dwh_cursor.execute("SELECT PATIENT, CODE, DESCRIPTION, BASE_COST from procedures_info")
rows = dwh_cursor.fetchall()
for row in rows[:10]:
  print(row)

('3575b903-dbd0-1d55-6146-9e8aa4ed52a5', 73761001, 'Colonoscopy', 9209.61)
('3575b903-dbd0-1d55-6146-9e8aa4ed52a5', 430193006, 'Medication Reconciliation (procedure)', 414.03)
('3575b903-dbd0-1d55-6146-9e8aa4ed52a5', 73761001, 'Colonoscopy', 13437.13)
('3575b903-dbd0-1d55-6146-9e8aa4ed52a5', 23426006, 'Measurement of respiratory function (procedure)', 334.63)
('3575b903-dbd0-1d55-6146-9e8aa4ed52a5', 430193006, 'Medication Reconciliation (procedure)', 626.56)
('3575b903-dbd0-1d55-6146-9e8aa4ed52a5', 261352009, 'Face mask (physical object)', 3.8)
('aff157cc-b6d3-412b-ccbe-bfd5fac1c2d5', 127783003, 'Spirometry (procedure)', 9285.07)
('aff157cc-b6d3-412b-ccbe-bfd5fac1c2d5', 430193006, 'Medication Reconciliation (procedure)', 548.9)
('17f0c6d9-8931-8839-66cb-3ca6fb066d3e', 430193006, 'Medication Reconciliation (procedure)', 518.09)
('17f0c6d9-8931-8839-66cb-3ca6fb066d3e', 430193006, 'Medication Reconciliation (procedure)', 496.22)


# Links:

https://pynative.com/python-sqlite/

https://www.vertabelo.com/blog/using-python-and-mysql-in-the-etl-process/

https://github.com/iamaziz/etl/blob/master/pipeline.py


https://medium.com/datadriveninvestor/complete-data-analytics-solution-using-etl-pipeline-in-python-edd6580de24b