<a href="https://colab.research.google.com/github/Fuenfgeld/DMA2024TeamA/blob/main/ETL_GrpA.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Objective**: Create a data warehouse and transfer the data from the source database into the data warehouse database



# Importing libraries

In [1]:
import sqlite3
from sqlite3 import Error

In [2]:
# Mount Google Drive into the Colab filesystem at /content/drive.
from google.colab import drive
drive.mount('/content/drive')

Mounted at /content/drive


# Path of input/output data


In [3]:
# Base directory on the mounted Google Drive; the source and warehouse
# database paths are built by appending a file name to it.
material_path = "/content/drive/MyDrive/M3_DM_GrpA"

Uncomment the one patient type for which you would like to create the data warehouse database and transfer the tables from the source database into it.

In [4]:
## type of patients
# Leave exactly one assignment uncommented; the commented lines list the other
# available choices. The value is intended to identify the patient cohort whose
# source database is processed in this run.
# patient_type = "allergy"
# patient_type = "asthma"
# patient_type = "breast_cancer"
# patient_type = "metabolic_syndrome_disease"
# patient_type = "colorectal_cancer"
patient_type = "covid19"
# patient_type = "dermatitis"
# patient_type = "lung_cancer"

In [5]:
# Path of the source database for the selected patient type.
# The file name is derived from patient_type so that changing the selection
# changes which source database is read.
# NOTE(review): assumes every cohort's file follows the source_<patient_type>.db
# naming seen for covid19 — confirm against the files in material_path.
DB_SOURCE_PATH = f"{material_path}/source_{patient_type}.db"
DB_SOURCE_PATH

'/content/drive/MyDrive/M3_DM_GrpA/source_covid19.db'

In [6]:
# Location of the data warehouse database file (fixed file name inside material_path).
DB_DWH_PATH = f"{material_path}/DWH_GrpA.db"


# Create Datawarehouse

In [12]:
class DB(object):
  """SQLite-backed data warehouse.

  Creating an instance connects to ``db_file`` and creates every warehouse
  table that does not exist yet (``CREATE TABLE IF NOT EXISTS``), so it is
  safe to instantiate repeatedly against the same file.

  Attributes:
    conn: the ``sqlite3`` connection to ``db_file``.
    cur: a cursor on ``conn``; used here to run the schema statements.
  """

  def __init__(self, db_file):
    """Connect to ``db_file`` and make sure the warehouse schema exists.

    Args:
      db_file: path of the SQLite database file (``':memory:'`` also works).

    Raises:
      sqlite3.Error: if connecting or a schema statement fails.
    """
    self.conn = sqlite3.connect(db_file)
    self.cur = self.conn.cursor()
    self.__init_db()

  def __del__(self):
    """Best-effort commit and close when the object is garbage-collected."""
    # 'conn' is missing when sqlite3.connect() raised inside __init__.
    conn = getattr(self, 'conn', None)
    if conn is None:
      return
    try:
      conn.commit()
      conn.close()
    except sqlite3.Error:
      # Typically the caller already closed the connection (commit() on a
      # closed connection raises ProgrammingError). An exception cannot
      # usefully propagate out of __del__, so there is nothing more to do.
      pass

  def __init_db(self):
    """Create all warehouse tables that do not exist yet.

    Notes:
      * SQLite enforces FOREIGN KEY clauses only after
        ``PRAGMA foreign_keys = ON``; this class does not enable it, so the
        clauses below are declarative. They point at the ``*_info`` tables
        that this method actually creates.
      * NOTE(review): SQLite gives the declared type ``STRING`` NUMERIC
        affinity, so numeric-looking text (e.g. a ZIP code with a leading
        zero) is stored as a number; ``TEXT`` would preserve it. Left as-is
        because changing it alters the stored value types — confirm with
        the consumers of the warehouse before switching.
    """
    # demographic data: patients
    create_patients_info = """CREATE TABLE IF NOT EXISTS patients_info (
                           Id STRING PRIMARY KEY,
                           BIRTHDATE DATE,
                           DEATHDATE DATE,
                           SSN STRING,
                           DRIVERS STRING,
                           PASSPORT STRING,
                           PREFIX STRING,
                           FIRST STRING,
                           LAST STRING,
                           SUFFIX STRING,
                           MAIDEN STRING,
                           MARITAL STRING,
                           RACE STRING,
                           ETHNICITY STRING,
                           GENDER STRING,
                           BIRTHPLACE STRING,
                           ADDRESS STRING,
                           CITY STRING,
                           STATE STRING,
                           COUNTY STRING,
                           ZIP STRING,
                           LAT INTEGER,
                           LON INTEGER,
                           HEALTHCARE_EXPENSES INTEGER,
                           HEALTHCARE_COVERAGE INTEGER,
                           FOREIGN KEY (COUNTY)
                              REFERENCES county_info (COUNTY)
                       );"""

    # procedures data: encounters
    create_encounters_info = """CREATE TABLE IF NOT EXISTS encounters_info (
                           Id STRING PRIMARY KEY,
                           START DATE,
                           STOP DATE,
                           PATIENT STRING,
                           ORGANIZATIONS STRING,
                           PROVIDER STRING,
                           PAYER STRING,
                           ENCOUNTERCLASS STRING,
                           CODE STRING,
                           DESCRIPTION STRING,
                           BASE_ENCOUNTER_COST INTEGER,
                           TOTAL_CLAIM_COST INTEGER,
                           PAYER_COVERAGE INTEGER,
                           REASONCODE STRING,
                           REASONDESCRIPTION STRING,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                       );"""

    # diagnoses data: conditions
    create_conditions_info = """CREATE TABLE IF NOT EXISTS conditions_info (
                           START DATE,
                           STOP DATE,
                           PATIENT STRING,
                           ENCOUNTER STRING,
                           CODE STRING,
                           DESCRIPTION STRING,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id),
                           FOREIGN KEY (ENCOUNTER)
                              REFERENCES encounters_info (Id)
                       );"""

    # procedures data: careplans
    create_careplans_info = """CREATE TABLE IF NOT EXISTS careplans_info (
                           Id STRING PRIMARY KEY,
                           START DATE,
                           STOP DATE,
                           PATIENT STRING,
                           ENCOUNTER STRING,
                           CODE STRING,
                           DESCRIPTION STRING,
                           REASONCODE STRING,
                           REASONDESCRIPTION STRING,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id),
                           FOREIGN KEY (ENCOUNTER)
                              REFERENCES encounters_info (Id)
                       );"""

    # procedures data: procedures
    create_procedures_info = """CREATE TABLE IF NOT EXISTS procedures_info (
                           DATE DATE,
                           PATIENT STRING,
                           ENCOUNTER STRING,
                           CODE STRING,
                           DESCRIPTION STRING,
                           BASE_COST INTEGER,
                           REASONCODE STRING,
                           REASONDESCRIPTION STRING,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                       );"""

    # procedures data: medications
    create_medications_info = """CREATE TABLE IF NOT EXISTS medications_info (
                           START DATE,
                           STOP DATE,
                           PATIENT STRING,
                           PAYER STRING,
                           ENCOUNTER STRING,
                           CODE STRING,
                           DESCRIPTION STRING,
                           BASE_COST INTEGER,
                           PAYER_COVERAGE INTEGER,
                           DISPENSES INTEGER,
                           TOTALCOST INTEGER,
                           REASONCODE STRING,
                           REASONDESCRIPTION STRING,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                       );"""

    # lab values data: observations
    create_observations_info = """CREATE TABLE IF NOT EXISTS observations_info (
                           DATE DATE,
                           PATIENT STRING,
                           ENCOUNTER STRING,
                           CODE STRING,
                           DESCRIPTION STRING,
                           VALUE STRING,
                           UNITS STRING,
                           TYPE STRING,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                       );"""

    # lab values data: devices
    create_devices_info = """CREATE TABLE IF NOT EXISTS devices_info (
                           START DATE,
                           STOP DATE,
                           PATIENT STRING,
                           ENCOUNTER STRING,
                           CODE STRING,
                           DESCRIPTION STRING,
                           UDI STRING,
                           FOREIGN KEY (PATIENT)
                              REFERENCES patients_info (Id)
                       );"""

    # demographic data: county
    create_county_info = """CREATE TABLE IF NOT EXISTS county_info(
                    STATE STRING,
                    COUNTY STRING PRIMARY KEY,
                    POPULATION INTEGER,
                    AREA_sqm INTEGER,
                    DENSITY_per_sqm INTEGER,
                    SEASIDE INTEGER
                  );"""

    create_tables = [create_patients_info, create_county_info, # demographic data
                     create_conditions_info, # diagnoses data
                     create_encounters_info, create_careplans_info, create_procedures_info, create_medications_info, # procedures data
                     create_observations_info, create_devices_info # lab values data
                     ]

    # sqlite3.connect() either returns a connection or raises, so no
    # "connection is None" case has to be handled here.
    for query in create_tables:
      self.cur.execute(query)
    self.conn.commit()



# ETL/ELT (Extract, Transform, Load)

In [8]:
# exporting queries
class SqlQuery:
  """SQL statements that copy a set of columns from a source table to a sink table.

  The same column names are used on both sides: ``extract_query`` selects
  them from ``source_table`` and ``load_query`` inserts them, by name, into
  ``sink_table``.

  Table and column names are interpolated into the SQL text (identifiers
  cannot be bound as parameters), so they must come from trusted code, not
  from user input.

  Attributes:
    source_table: name of the table to read from.
    column_numbers: number of columns being transferred.
    column_names: the column names joined with ``', '``.
    sink_table: name of the table to write to.
  """

  def __init__(self, source_table, column_names, sink_table):
    """Store the table names and prepare the column list.

    Args:
      source_table: name of the table in the source database.
      column_names: sequence of column names to transfer, in order.
      sink_table: name of the table in the target database.

    Raises:
      ValueError: if ``column_names`` is empty (no valid SQL can be built).
    """
    column_names = list(column_names)
    if not column_names:
      raise ValueError('column_names must contain at least one column')
    self.source_table = source_table
    self.column_numbers = len(column_names)
    self.column_names = ', '.join(column_names)
    self.sink_table = sink_table

  def extract_query(self):
    """Return the SELECT statement that reads the columns from the source table."""
    return 'SELECT ' + self.column_names + ' FROM ' + self.source_table

  def load_query(self):
    """Return the parameterized INSERT OR REPLACE statement for the sink table.

    The target columns are listed explicitly so the statement does not depend
    on the physical column order (or column count) of the sink table. One
    ``?`` placeholder is emitted per column, in the same order as
    ``extract_query`` selects them.
    """
    placeholders = ','.join('?' * self.column_numbers)
    return ('INSERT OR REPLACE INTO ' + self.sink_table
            + ' (' + self.column_names + ')'
            + ' VALUES (' + placeholders + ')')




In [9]:
def etl(query, source_cnx, target_cnx):
  """Copy the rows described by ``query`` from the source to the target database.

  Args:
    query: object providing ``extract_query()`` (SELECT for the source) and
      ``load_query()`` (parameterized INSERT for the target).
    source_cnx: open sqlite3 connection to read from.
    target_cnx: open sqlite3 connection to write to.

  Raises:
    sqlite3.Error: if extracting or loading fails. A failed load is rolled
      back first, so no partially loaded rows stay pending on ``target_cnx``.
  """
  # extract data from source db; the cursor is closed even if the query fails
  source_cursor = source_cnx.cursor()
  try:
    source_cursor.execute(query.extract_query())
    data = source_cursor.fetchall()
  finally:
    source_cursor.close()

  if not data:
    print('data is empty')
    return

  # load data into warehouse db
  target_cursor = target_cnx.cursor()
  try:
    target_cursor.executemany(query.load_query(), data)
    target_cnx.commit()
  except sqlite3.Error:
    # Rows inserted before the failure are still pending in the open
    # transaction; discard them so a later commit cannot persist a partial load.
    target_cnx.rollback()
    raise
  finally:
    target_cursor.close()
  # reported only after the commit succeeded
  print('data loaded to warehouse db')


def etl_process(queries, target_cnx, db_source):
  """Run every query in ``queries`` from the source database into the target.

  Args:
    queries: list
          the queries to run, in order; each is passed to ``etl``
    target_cnx: SQLite connection
          open connection to the target (warehouse) database
    db_source: str
          path of source database

  Raises:
    sqlite3.Error: if the source database cannot be opened (the error is
      printed first) or if any query fails.

  NOTE(review): ``sqlite3.connect`` creates an empty database file when
  ``db_source`` does not exist, so a wrong path shows up later as a
  "no such table" error from the first query rather than as a connect error.
  """
  # establish source db connection
  try:
    source_cnx = sqlite3.connect(db_source)
  except Error as err:
    print(err)
    # without a connection nothing below can work; surface the real error
    raise

  try:
    # loop through the queries that were passed in (not a module-level list)
    for query in queries:
      etl(query, source_cnx, target_cnx)
  finally:
    # close the source db connection even if a query failed
    source_cnx.close()

In [13]:
# create Datawarehouse
# Instantiating DB connects to the file at DB_DWH_PATH and runs the
# CREATE TABLE IF NOT EXISTS statements for all warehouse tables.
dwh_db = DB(DB_DWH_PATH)

In [17]:
print('starting etl')

# One SqlQuery per table: it reads the listed columns from the source table
# and writes them to the matching *_info table of the warehouse.

# --- demographic tables ---
patients_columns = [
    'Id', 'BIRTHDATE', 'DEATHDATE', 'SSN', 'DRIVERS',
    'PASSPORT', 'PREFIX', 'FIRST', 'LAST', 'SUFFIX',
    'MAIDEN', 'MARITAL', 'RACE', 'ETHNICITY', 'GENDER',
    'BIRTHPLACE', 'ADDRESS', 'CITY', 'STATE', 'COUNTY',
    'ZIP', 'LAT', 'LON', 'HEALTHCARE_EXPENSES', 'HEALTHCARE_COVERAGE',
]
sql_query_patients = SqlQuery(source_table="patients", column_names=patients_columns, sink_table="patients_info")

county_columns = [
    'STATE', 'COUNTY', 'POPULATION',
    'AREA_sqm', 'DENSITY_per_sqm', 'SEASIDE',
]
sql_query_county = SqlQuery(source_table="county", column_names=county_columns, sink_table="county_info")

# --- diagnoses tables ---
conditions_columns = [
    'START', 'STOP', 'PATIENT',
    'ENCOUNTER', 'CODE', 'DESCRIPTION',
]
sql_query_conditions = SqlQuery(source_table="conditions", column_names=conditions_columns, sink_table="conditions_info")

# --- procedures tables ---
encounters_columns = [
    'Id', 'START', 'STOP', 'PATIENT', 'ORGANIZATIONS',
    'PROVIDER', 'PAYER', 'ENCOUNTERCLASS', 'CODE', 'DESCRIPTION',
    'BASE_ENCOUNTER_COST', 'TOTAL_CLAIM_COST', 'PAYER_COVERAGE',
    'REASONCODE', 'REASONDESCRIPTION',
]
# (the variable name says "disease", but this query transfers the encounters table)
sql_query_disease = SqlQuery(source_table="encounters", column_names=encounters_columns, sink_table="encounters_info")

procedures_columns = [
    'DATE', 'PATIENT', 'ENCOUNTER', 'CODE',
    'DESCRIPTION', 'BASE_COST', 'REASONCODE', 'REASONDESCRIPTION',
]
sql_query_procedures = SqlQuery(source_table="procedures", column_names=procedures_columns, sink_table="procedures_info")

careplans_columns = [
    'Id', 'START', 'STOP', 'PATIENT', 'ENCOUNTER',
    'CODE', 'DESCRIPTION', 'REASONCODE', 'REASONDESCRIPTION',
]
sql_query_careplans = SqlQuery(source_table="careplans", column_names=careplans_columns, sink_table="careplans_info")

medications_columns = [
    'START', 'STOP', 'PATIENT', 'PAYER', 'ENCOUNTER',
    'CODE', 'DESCRIPTION', 'BASE_COST', 'PAYER_COVERAGE', 'DISPENSES',
    'TOTALCOST', 'REASONCODE', 'REASONDESCRIPTION',
]
sql_query_medications = SqlQuery(source_table="medications", column_names=medications_columns, sink_table="medications_info")

# --- lab values tables ---
observations_columns = [
    'DATE', 'PATIENT', 'ENCOUNTER', 'CODE',
    'DESCRIPTION', 'VALUE', 'UNITS', 'TYPE',
]
sql_query_observations = SqlQuery(source_table="observations", column_names=observations_columns, sink_table="observations_info")

devices_columns = [
    'START', 'STOP', 'PATIENT', 'ENCOUNTER',
    'CODE', 'DESCRIPTION', 'UDI',
]
sql_query_devices = SqlQuery(source_table="devices", column_names=devices_columns, sink_table="devices_info")

# list for iteration, in the order the tables are transferred
etl_queue = [
    sql_query_patients,
    sql_query_county,
    sql_query_conditions,
    sql_query_disease,
    sql_query_procedures,
    sql_query_careplans,
    sql_query_medications,
    sql_query_observations,
    sql_query_devices,
]

starting etl


In [18]:
# Use the warehouse's existing SQLite connection (opened by DB) as the load
# target, then transfer the queued tables from the source database at
# DB_SOURCE_PATH into it.
target_cnx = dwh_db.conn
etl_process(etl_queue, target_cnx, DB_SOURCE_PATH)

data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db
data loaded to warehouse db


In [19]:
# Commit any changes still pending on the warehouse connection.
target_cnx.commit()

In [20]:
# Close the warehouse connection. This is the same object as dwh_db.conn, so
# dwh_db must not be used for queries afterwards; DB.__del__ will still call
# commit()/close() on it when dwh_db is garbage-collected.
target_cnx.close()