<a href="https://colab.research.google.com/github/Fuenfgeld/DMA2023TeamB/blob/main/ETL_and_Implementing_DWH.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

Creating a Datawarehouse and transform data from source database to datawarehouse db

Importing required libraries and mounting Google Drive

In [2]:
import sqlite3
from sqlite3 import Error

from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


Defining File Paths 

DB_SOURCE_PATH --> where Staging database resides

DB_DWH_PATH --> where DWH will reside

In [3]:
# path of source database
DB_SOURCE_PATH = f"/content/drive/Shareddrives/DMA_2023_D/DMA2023TeamB/source_dbs/Source_covid19_Staging.db"

# path of datawarehouse
DB_DWH_PATH = f"/content/drive/Shareddrives/DMA_2023_D/DMA2023TeamB/source_dbs/DWH_covid19.db"

Creating Datawarehouse

In [4]:
class DB(object):
  def __init__(self, db_file):
    self.conn = sqlite3.connect(db_file)
    self.cur = self.conn.cursor()
    self.__init_db()
  
  def __del__(self):
      self.conn.commit()
      self.conn.close()

  def __init_db(self):
    # sql queries to create tables in Datawarehouse 

#  sql query to create d_conditions table
    create_d_conditions = """CREATE TABLE IF NOT EXISTS d_conditions (
                           CONDITIONS_CODE STRING PRIMARY KEY,
                           CONDITIONS_DESCRIPTION STRING
                       );"""

#  sql query to create d_devices table
    create_d_devices = """CREATE TABLE IF NOT EXISTS d_devices (
                            DEVICES_CODE STRING PRIMARY KEY,
                            DEVICES_DESCRIPTION STRING
                           );"""

#  sql query to create d_medications table
    create_d_medications = """CREATE TABLE IF NOT EXISTS d_medications (
                            MEDICATIONS_CODE STRING PRIMARY KEY,
                            MEDICATIONS_DESCRIPTION STRING
                           );"""

#  sql query to create d_procedures table
    create_d_procedures = """CREATE TABLE IF NOT EXISTS d_procedures (
                            PROCEDURES_CODE STRING PRIMARY KEY,
                            PROCEDURES_DESCRIPTION STRING
                           );"""

#  sql query to create d_patients table
    create_d_patients = """CREATE TABLE IF NOT EXISTS d_patients (
                            PATIENTS_ID STRING PRIMARY KEY,
                            PATIENTS_BIRTHDATE DATE,
                            PATIENTS_DEATHDATE DATE,
                            PATIENTS_GENDER STRING
                           );""" 

# sql query to create F_ANTICOVIS table
    create_F_ANTICOVIS = """CREATE TABLE IF NOT EXISTS F_ANTICOVIS (
                           PATIENTS_ID STRING,
                           CONDITIONS_CODE STRING,
                           PROCEDURES_CODE STRING,
                           DEVICES_CODE STRING,
                           MEDICATIONS_CODE STRING,
                           MEDICATION_GRADING INT DEFAULT '0', 
                           COURSE_OF_DESEASE INT DEDAULT '0',
                           FOREIGN KEY (PATIENTS_ID)
                              REFERENCES d_patients (PATIENTS_ID),
                           FOREIGN KEY (CONDITIONS_CODE)
                              REFERENCES d_conditions (CONDITIONS_CODE),
                           FOREIGN KEY (PROCEDURES_CODE)
                              REFERENCES d_procedures (PROCEDURES_CODE),
                           FOREIGN KEY (DEVICES_CODE)
                              REFERENCES d_devices(DEVICES_CODE),
                           FOREIGN KEY (MEDICATIONS_CODE)
                              REFERENCES d_medications (MEDICATIONS_CODE)
                       );"""
    create_tables =  [create_d_conditions,# conditions dimension
                      create_d_devices,# devices dimension
                      create_d_medications,# medications dimension
                      create_d_procedures, # procedures dimension
                      create_d_patients, #patients dimension
                      create_F_ANTICOVIS #Factstable
                      ]

    if self.conn is not None:
      for query in create_tables:
          self.cur.execute(query)
    else:
      print('Connection to database failed')

ETL/ELT (Extract, transform, load )

In [5]:
# exporting queries
class SqlQuery:
  def __init__(self, source_table, column_names, sink_table):
    self.source_table = source_table
    self.column_numbers = len(column_names)
    self.column_names = ', '.join(column_names)
    self.sink_table = sink_table

  def extract_query(self):
    return 'SELECT ' + self.column_names + ' FROM ' + self.source_table 

  def load_query(self):
    values_str = '?,' * self.column_numbers
    # print("*****", values_str, column_names, column_numbers)
    values_str = values_str[:-1]
    return 'INSERT OR REPLACE INTO ' + self.sink_table + ' VALUES (' + values_str + ')'

    # return 'INSERT INTO ' + self.sink_table + '(' + self.column_names + ') VALUES (' + values_str + ')'
  



In [6]:
def etl(query, source_cnx, target_cnx):
  # extract data from source db
  source_cursor = source_cnx.cursor()
  source_cursor.execute(query.extract_query())
  data = source_cursor.fetchall()
  source_cursor.close()

  # load data into warehouse db
  if data:
    target_cursor = target_cnx.cursor()
    target_cursor.executemany(query.load_query(), data)
    print('data loaded to warehouse db') 
    target_cnx.commit()
    target_cursor.close()
  else:
    print('data is empty')


def etl_process(queries, target_cnx, db_source):
  """
  queries: list
        a list of queries
  target_cnx: SQLite connection
  db_source: str
        path of source database      
  
  """  
  # establish source db connection
  try:
    source_cnx = sqlite3.connect(db_source)
  except Error as err:
    print(err)
  
  # loop through sql queries
  for query in etl_queue:
    etl(query, source_cnx, target_cnx)
    
  # close the source db connection
  source_cnx.close()

In [7]:
# create Datawarehouse
dwh_db = DB(DB_DWH_PATH)

In [8]:
# # check list of tables
target_cnx = dwh_db.conn
dwh_cursor = target_cnx.cursor()
dwh_cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
#dwh_cursor.execute('PRAGMA table_info(' + "F_ANTICOVIS" + ');')
print(dwh_cursor.fetchall())

[('d_conditions',), ('d_devices',), ('d_medications',), ('d_procedures',), ('d_patients',), ('F_ANTICOVIS',)]


In [9]:
print('starting etl')   
# list for iteration
etl_queue = []

# d_conditions table
conditions_columns = ['CONDITIONS_CODE', 'CONDITIONS_DESCRIPTION']
sql_query_conditions = SqlQuery("S_CONDITIONS", conditions_columns, "d_conditions")
etl_queue.append(sql_query_conditions)

# d_devices table
devices_columns = ['DEVICES_CODE', 'DEVICES_DESCRIPTION']
sql_query_devices = SqlQuery("S_DEVICES", devices_columns, "d_devices")
etl_queue.append(sql_query_devices)

# d_medications table
medications_columns = ['MEDICATIONS_CODE','MEDICATIONS_DESCRIPTION']
sql_query_medications = SqlQuery("S_MEDICATIONS", medications_columns, "d_medications")
etl_queue.append(sql_query_medications)

# d_procedures table
procedures_columns = ['PROCEDURES_CODE', 'PROCEDURES_DESCRIPTION']
sql_query_procedures = SqlQuery("S_PROCEDURES", procedures_columns, "d_procedures")
etl_queue.append(sql_query_procedures)


# d_patients table
patients_columns = ['PATIENTS_ID', 'PATIENTS_BIRTHDATE', 'PATIENTS_DEATHDATE', 'PATIENTS_GENDER']
sql_query_patients = SqlQuery("S_PATIENTS", patients_columns, "d_patients")
etl_queue.append(sql_query_patients)


# list for iteration
# etl_queue

starting etl


Load form staging Database to Datawarehouse

In [10]:
# establish connection for target database 
target_cnx = dwh_db.conn
etl_process(etl_queue, target_cnx, DB_SOURCE_PATH)

data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db


In [None]:
from pandas import DataFrame
import numpy as np

# selecting relevant data for Staging DB as a Facts_Table DataFrame
connctino_to_Staging_DB = sqlite3.connect(DB_SOURCE_PATH)
conn = connctino_to_Staging_DB
Staging_DB_Cursor = conn.cursor()

Staging_DB_Cursor.execute (""" SELECT SSS_patients.patients_ID,
                                      SSS_conditions.conditions_code, 
                                      SSS_procedures.procedures_code, 
                                      SSS_devices.devices_code, 
                                      SSS_medications.medications_code 
                               
                               FROM SSS_PATIENTS 
                                    LEFT JOIN SSS_CONDITIONS 
                                           ON SSS_patients.patients_ID = SSS_conditions.conditions_patient_ID
                                    LEFT JOIN SSS_PROCEDURES 
                                           ON SSS_patients.patients_ID = SSS_procedures.procedures_patient_ID
                                    LEFT JOIN SSS_DEVICES 
                                           ON SSS_patients.patients_ID = SSS_devices.devices_patient_ID
                                    LEFT JOIN SSS_MEDICATIONS
                                           ON SSS_patients.patients_ID = SSS_medications.medications_patient_ID
                                  """)
DF_of_Facts_Table = DataFrame(Staging_DB_Cursor.fetchall())

# renaming culums in Dataframe
DF_of_Facts_Table.columns =['patients_ID', 'conditions_code','procedures_code', 'devices_code', 'medications_code']
# remooving duplicates from Dataframe
DF_of_Facts_Table.drop_duplicates()
conn.close()

# creating new table columns for medication_grading and course_of_desease grading
DF_of_Facts_Table ['medication_grading'] = '0'
DF_of_Facts_Table ['course_of_Desease'] = '0'
# Replacing NaN
DF_of_Facts_Table = DF_of_Facts_Table.fillna(0)


# defininf and implemening course_of_desease classes

grades_mapping = {
386661006: 1,
25064002:	1,
36955009:	1,
267102003: 1,
261352009: 1,
43724002:	2,
49727002:	2,
267060006: 2,
84229001:	2,
57676002:	2,
68962001:	2,
68235000:	2,
422587007: 2,
249497008: 2,
234466008: 3,
132281000119108: 3,
267036007: 3,
233604007: 3,
56018004:	3,
371908008: 3,
706870000: 4,
66857006:	4,
389087006: 4,
86175003:	4,
40095003:	4,
271825005: 4,
112798008: 4,
433112001: 4,
431182000: 4,
67782005:	5,
65710008:	5,
84114007:	5,
770349000: 5,
76571007:	5,
36965003:	5,
449071006: 5,
26763009:	5,
180325003: 5,
302497006: 5,

}

# Loop through all rows
for i, row in DF_of_Facts_Table.iterrows():
    max_grade = []    
    # Check if any of the codes in the 'conditions_code' column is present in the grades_mapping
    if row['conditions_code'] in grades_mapping:
        max_grade.append(grades_mapping[row['conditions_code']])
    # Check if any of the codes in the 'procedures_code' column is present in the grades_mapping
    if row['procedures_code'] in grades_mapping:
        max_grade.append(grades_mapping[row['procedures_code']])
    # Check if any of the codes in the 'devices_code' column is present in the grades_mapping
    if row['devices_code'] in grades_mapping:
        max_grade.append(grades_mapping[row['devices_code']])
    if len(max_grade) != 0:
        DF_of_Facts_Table.at[i, 'course_of_Desease'] = max(max_grade)
    else:
        DF_of_Facts_Table.at[i, 'course_of_Desease'] = 0

DF_of_Facts_Table.head(30)



In [12]:
# defining and implementing medication_grading.

medication_grading = {
854235:1,
854252:1,
854228:1,
309362:2,
855332:3,
}

# Loop through all rows
for i, row in DF_of_Facts_Table.iterrows():   
    # Check if any of the codes in the 'conditions_code' column is present in the grades_mapping
    if row['medications_code'] in medication_grading:
        DF_of_Facts_Table.at[i, 'medication_grading'] = medication_grading[row['medications_code']]



In [None]:
DF_of_Facts_Table.head(30)

In [14]:
DF_of_Facts_Table
conn = sqlite3.connect(DB_DWH_PATH)

DF_of_Facts_Table.to_sql('F_ANTICOVIS',conn, if_exists = 'replace', index = False)
conn.commit()
conn.close()

In [16]:
conn = sqlite3.connect(DB_DWH_PATH)
cursor = conn.cursor()
cursor.execute("SELECT name FROM sqlite_master WHERE type='table';")
#cursor.execute("SELECT count(*) FROM F_ANTICOVIS")
#cursor.execute("""DELETE FROM F_ANTICOVIS""")
print(cursor.fetchall())

[('d_conditions',), ('d_devices',), ('d_medications',), ('d_procedures',), ('d_patients',), ('F_ANTICOVIS',)]
