<a href="https://colab.research.google.com/github/Fuenfgeld/DMA2023TeamC/blob/main/Datenbank/ETL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# **Objective**: Create a data warehouse and load the data from the source database into it



# Importing libraries

In [None]:
# IPython magic: delete all user-defined names from the namespace without asking
# for confirmation (-f), so the notebook starts from a clean state
%reset -f

In [None]:
import sqlite3
from sqlite3 import Error
import random
import string
import csv

In [None]:
# Mount google Drive to access the data
# (Colab only) the drive is mounted at /content/drive; the data paths used in
# this notebook point below this mount point.

from google.colab import drive
drive.mount("/content/drive")

# Show the current working directory ('!pwd' is an IPython shell escape)
print(f'The current working directory is:')
!pwd

# Path of input/output data 


In [None]:
# Define the variables to store the paths to csv files and to the data folder.
# All paths are absolute and derived from the team folder on the Drive mounted
# at /content/drive, so they do not depend on the current working directory.

disease = 'metabolic_syndrome_disease'
path_teamc = "/content/drive/Shareddrives/TeamC"
path_csv_files = f"{path_teamc}/Material/csv_data/{disease}"

# path of the source database
DB_SOURCE_PATH = f"{path_teamc}/teamc_db.db"

# path of the data warehouse
DB_DWH_PATH = f"{path_teamc}/teamc_dwh.db"

In [None]:
# Cohort handled by this notebook; the same name is used for the csv sub-folder
patient_type = 'metabolic_syndrome_disease'


# Create the data warehouse

In [None]:
class DB(object):
  """Create/open the data-warehouse SQLite file and rebuild its empty schema.

  Instantiating the class drops and recreates the warehouse tables
  (patients_info, conditions_info, medications_info, med_codes,
  conditions_codes), so any data previously stored in them is discarded.

  Attributes:
    conn: sqlite3 connection to ``db_file``.
    cur: cursor on ``conn`` used to run the schema statements.
  """

  def __init__(self, db_file):
    # sqlite3.connect creates the file if it does not exist and raises
    # sqlite3.Error on failure (it never returns None).
    self.conn = sqlite3.connect(db_file)
    self.cur = self.conn.cursor()
    self.__init_db()

  def __del__(self):
    """Commit pending changes and close the connection if it is still open."""
    # conn is missing if __init__ failed before the connection was made.
    conn = getattr(self, 'conn', None)
    if conn is None:
      return
    try:
      conn.commit()
      conn.close()
    except sqlite3.ProgrammingError:
      # The connection was already closed elsewhere (e.g. target_cnx.close()
      # on this same connection object); there is nothing left to do.
      pass

  def __init_db(self):
    """Drop the warehouse tables if they exist, then create them empty."""

    # Column types use TEXT: SQLite assigns a declared type of "STRING" NUMERIC
    # affinity, which silently converts numeric-looking codes/IDs into numbers
    # (e.g. '007' becomes 7). TEXT stores them verbatim.
    #
    # NOTE(review): med_codes.CODE and conditions_codes.CODE are not declared
    # UNIQUE, so the FOREIGN KEY clauses referencing them would fail with
    # "foreign key mismatch" if PRAGMA foreign_keys were switched on.

    # Drop tables that hold foreign keys before the tables they reference
    # (in case the code has been run already).
    drop_tables = [
      """DROP TABLE IF EXISTS conditions_info""",
      """DROP TABLE IF EXISTS medications_info""",
      """DROP TABLE IF EXISTS patients_info""",
      """DROP TABLE IF EXISTS med_codes""",
      """DROP TABLE IF EXISTS conditions_codes""",
    ]

    # demographic data
    create_patients_info = """CREATE TABLE IF NOT EXISTS patients_info (
                            Id TEXT PRIMARY KEY,
                            BIRTHDATE DATE,
                            DEATHDATE DATE,
                            RACE TEXT,
                            ETHNICITY TEXT
                            );"""

    # diagnoses data
    create_conditions_info = """CREATE TABLE IF NOT EXISTS conditions_info (
                                START DATE,
                                STOP DATE,
                                PATIENT TEXT,
                                CODE TEXT,
                                FOREIGN KEY (PATIENT) REFERENCES patients_info (Id),
                                FOREIGN KEY (CODE) REFERENCES conditions_codes (CODE)
                                );"""

    # medication data; patients live in patients_info (there is no "patients"
    # table in the warehouse)
    create_medications_info = """CREATE TABLE IF NOT EXISTS medications_info (
                                  START DATE,
                                  STOP DATE,
                                  PATIENT TEXT,
                                  CODE TEXT,
                                  FOREIGN KEY (PATIENT) REFERENCES patients_info (Id),
                                  FOREIGN KEY (CODE) REFERENCES med_codes (CODE)
                                  );"""

    # medication codes and their description
    create_med_codes = """CREATE TABLE IF NOT EXISTS med_codes (
                          CODE TEXT,
                          DESCRIPTION TEXT);"""

    # condition codes and their description
    create_conditions_codes = """CREATE TABLE IF NOT EXISTS conditions_codes (
                              CODE TEXT,
                              DESCRIPTION TEXT);"""

    create_tables = [create_patients_info,
                     create_conditions_info,
                     create_medications_info,
                     create_med_codes,
                     create_conditions_codes]

    for query in drop_tables + create_tables:
      self.cur.execute(query)



# ETL/ELT (Extract, Transform, Load)

In [None]:
# SqlQuery: describes how one table is copied from the source db to the warehouse

class SqlQuery:
  """Describe the copy of one table from the source DB into the warehouse.

  Stores the source table, the columns to copy and the destination table, and
  renders the matching SELECT (extract) and parameterised INSERT (load)
  statements. Values are inserted positionally into the sink table.
  """

  def __init__(self, source_table, column_names, sink_table):
    self.source_table = source_table
    self.sink_table = sink_table
    # one '?' placeholder per column is needed in the INSERT statement
    self.column_numbers = len(column_names)
    # kept as a ready-to-use, comma-separated column list for the SELECT
    self.column_names = ', '.join(column_names)

  def extract_query(self):
    """Return the SELECT statement reading the configured columns from the source."""
    return f'SELECT {self.column_names} FROM {self.source_table}'

  def load_query(self):
    """Return an INSERT OR REPLACE into the sink with one '?' per column.

    The placeholders are filled later, e.g. by ``cursor.executemany``.
    """
    placeholders = ','.join(['?'] * self.column_numbers)
    return f'INSERT OR REPLACE INTO {self.sink_table} VALUES ({placeholders})'

  



In [None]:
# Copy the data of one table from the source db into the target db
# source_cnx - connection to the source db
# target_cnx - connection to the target db

def etl(query, source_cnx, target_cnx):
  """Copy the rows described by ``query`` from the source DB into the warehouse.

  Args:
    query: a SqlQuery-like object with ``extract_query()`` (SELECT run on the
      source) and ``load_query()`` (parameterised INSERT run on the target).
    source_cnx: open sqlite3 connection to the source database.
    target_cnx: open sqlite3 connection to the warehouse database.

  If the SELECT returns rows, they are inserted with a single ``executemany``,
  'data loaded to warehouse db' is printed, and the target is committed.
  Otherwise, 'data is empty' is printed. sqlite3 errors propagate to the
  caller. Both cursors are always closed, even when a statement fails.
  """
  from contextlib import closing

  # extract data from source db
  with closing(source_cnx.cursor()) as source_cursor:
    source_cursor.execute(query.extract_query())
    data = source_cursor.fetchall()

  if not data:
    print('data is empty')
    return

  # load data into warehouse db
  with closing(target_cnx.cursor()) as target_cursor:
    target_cursor.executemany(query.load_query(), data)
    print('data loaded to warehouse db')
    target_cnx.commit()


# Define a function to process multiple queries, so that the whole db can be copied

def etl_process(queries, target_cnx, db_source):
  """Run ``etl`` for every query in ``queries``, copying tables from ``db_source``.

  Args:
    queries: iterable of SqlQuery objects, processed in order.
    target_cnx: open sqlite3 connection to the target (warehouse) DB.
    db_source: path to the source SQLite database file.

  Raises:
    sqlite3.Error: if the source database cannot be opened (the error is
      printed first), or if an extract/load statement fails.
  """
  # establish source db connection; nothing can be copied without it, so
  # report the problem and re-raise instead of continuing
  try:
    source_cnx = sqlite3.connect(db_source)
  except Error as err:
    print(err)
    raise

  try:
    # loop over the queries passed in by the caller, using the 'etl' function
    for query in queries:
      etl(query, source_cnx, target_cnx)
  finally:
    # close the source db connection even if one of the copies fails
    source_cnx.close()

In [None]:
## create Datawarehouse
# Build the (empty) warehouse schema in the file at DB_DWH_PATH and keep the
# DB object, and therefore its open connection, in dwh_db.
dwh_db = DB(db_file=DB_DWH_PATH)

In [None]:
## Create sql queries to populate the tables
# Each SqlQuery takes (source_table, column_names, sink_table): it selects the
# listed columns from the source table and inserts them, in that order, into
# the warehouse table.

patients_columns = ['Id', 'BIRTHDATE', 'DEATHDATE', 'RACE', 'ETHNICITY']
conditions_columns = ['START', 'STOP', 'PATIENT', 'CODE']
medications_columns = ['START', 'STOP', 'PATIENT', 'CODE']

sql_query_patients = SqlQuery("patients", patients_columns, "patients_info")
sql_query_conditions = SqlQuery("conditions", conditions_columns, "conditions_info")
sql_query_medications = SqlQuery("medications", medications_columns, "medications_info")

# Queue processed in order: patients, then conditions, then medications
etl_queue = [sql_query_patients, sql_query_conditions, sql_query_medications]

In [None]:
# The warehouse connection opened by the DB object is the load target
target_cnx = dwh_db.conn

# Copy every table described in etl_queue from the source DB into the
# warehouse; one status message is printed per table
etl_process(queries=etl_queue, target_cnx=target_cnx, db_source=DB_SOURCE_PATH)

In [None]:
# Persist any pending changes on the warehouse connection
target_cnx.commit()

In [None]:
# List the tables in the warehouse. The DB schema creates five of them:
# patients_info, conditions_info, medications_info, med_codes, conditions_codes
dwh_cursor = target_cnx.cursor()
print(dwh_cursor.execute("SELECT name FROM sqlite_master WHERE type='table';").fetchall())


In [None]:
# now, populate the med_codes and conditions_codes tables with one row per
# distinct CODE and a DESCRIPTION, read from the source database.
# NOTE(review): with GROUP BY CODE, SQLite takes DESCRIPTION from an arbitrary
# row of each group, so a code with several descriptions keeps only one of them.

target_cnx = dwh_db.conn
source_cnx = sqlite3.connect(DB_SOURCE_PATH)

dwh_cursor = target_cnx.cursor()
src_cursor = source_cnx.cursor()

try:
  # table med_codes (emptied first so that re-running this cell does not
  # duplicate rows)
  dwh_cursor.execute('DELETE FROM med_codes')
  src_cursor.execute('''SELECT CODE, DESCRIPTION FROM medications GROUP BY CODE''')
  data = src_cursor.fetchall()
  dwh_cursor.executemany('''INSERT INTO med_codes (CODE, DESCRIPTION) VALUES (?,?)''', data)

  # table conditions_codes (emptied first for the same reason)
  dwh_cursor.execute('DELETE FROM conditions_codes')
  src_cursor.execute('''SELECT CODE, DESCRIPTION FROM conditions GROUP BY CODE''')
  data = src_cursor.fetchall()
  dwh_cursor.executemany('''INSERT INTO conditions_codes (CODE, DESCRIPTION) VALUES (?,?)''', data)
finally:
  # the source DB is only read in this cell; release its cursor and connection
  src_cursor.close()
  source_cnx.close()

# check if tables are properly populated
for n in dwh_cursor.execute('SELECT * FROM conditions_codes').fetchall():
  print(n)

for n in dwh_cursor.execute('SELECT * FROM med_codes').fetchall():
  print(n)

In [None]:
# Persist the inserted code tables
target_cnx.commit()

In [None]:
# Show the column definitions of patients_info (one row per column, as returned
# by PRAGMA table_info); the list is displayed as the cell output
dwh_cursor.execute('PRAGMA table_info(patients_info);').fetchall()

In [None]:
# Sanity check: print the first 5 rows loaded into patients_info
rows = dwh_cursor.execute("SELECT * from patients_info limit 5").fetchall()
for record in rows:
  print(record)

In [None]:
# Sanity check: print the first 5 rows loaded into conditions_info
rows = dwh_cursor.execute("SELECT * from conditions_info LIMIT 5").fetchall()
for record in rows:
  print(record)

In [None]:
# Sanity check: print the first 5 rows loaded into medications_info
rows = dwh_cursor.execute("SELECT * from medications_info LIMIT 5").fetchall()
for record in rows:
  print(record)

## Pseudonymize the patients' Ids

### Random string
Define a function to generate random strings of a specified length in order to generate new Ids

In [None]:
def ranstr(length):
    """Return a random string of ``length`` ASCII letters and digits.

    Used to generate pseudonymous patient IDs. Characters are drawn with the
    ``secrets`` module (a cryptographically secure generator) instead of
    ``random``, whose Mersenne Twister output is predictable and documented as
    unsuitable for security purposes. A ``length`` of 0 or less returns ''.
    """
    import secrets

    alphabet = string.ascii_letters + string.digits
    return ''.join(secrets.choice(alphabet) for _ in range(length))

### Fake Ids
1. Generate the new Ids and stash them in a dictionary
2. Create a new table that will be used to store the pseudonymized and original IDs

In [None]:
# Fetch the original IDs from the patients_info table. These are, by definition,
# all the IDs that need to be pseudonymized (Id is the PRIMARY KEY of
# patients_info, so its non-NULL values are unique).
dwh_cursor.execute('''SELECT Id FROM patients_info;''')
Id_true = [row[0] for row in dwh_cursor.fetchall()]


def _fresh_pseudonyms(count, length=20):
    """Return ``count`` distinct random strings of ``length`` characters.

    Strings come from ranstr and are collected in a set, so a duplicate draw is
    rejected in O(1) and drawing simply continues until ``count`` distinct
    values exist.
    """
    pseudonyms = set()
    while len(pseudonyms) < count:
        pseudonyms.add(ranstr(length))
    return list(pseudonyms)


# Map every original ID to its own pseudonym. This dictionary is the only result
# kept; later cells use it to export the mapping and rewrite the tables.
pseudo_dict = dict(zip(Id_true, _fresh_pseudonyms(len(Id_true))))

# free the temporary variable
del Id_true


In [None]:
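# CREATE TABLE WITH PSEUDO IDS
# NOTE: this cell is disabled. pseudo_data no longer exists at this point; to
# re-enable it, insert list(pseudo_dict.items()) instead of pseudo_data.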

# First, drop the table with the pseudonymized and original IDs, if exists
# dwh_cursor.execute('''DROP TABLE IF EXISTS pseudo;''')

# Create the table with the pseudonymized and original IDs
# dwh_cursor.execute('''CREATE TABLE pseudo (Id, new_Id);''')

# populate the 'pseudo' table with the original IDs and their corresponding pseudonymized counterparts
# dwh_cursor.executemany('''INSERT OR REPLACE INTO pseudo (Id, new_Id) VALUES (?,?);''', pseudo_data)

### Export the dictionary
Save the dictionary to a csv file, so that, if need be, the patients' identity can be recovered.

In [None]:
# CSV file path
pseudo_csv_file = "/content/drive/Shareddrives/TeamC/pseudo_data.csv"

# The file holds exactly two rows: the first lists every original patient ID,
# and the second lists the pseudonym at the same position.
# NOTE(review): this file re-identifies patients; consider restricting access.
with open(pseudo_csv_file, mode = 'w', newline = '') as csv_handle:
  csv.writer(csv_handle).writerows([pseudo_dict.keys(), pseudo_dict.values()])

### Pseudonymize function
Define a function that accepts the names of the table and the column to be pseudonymized, together with a dictionary that maps each original ID to its pseudonym.

In [None]:
def pseudonymize(table, column, fake_dict, cursor=None):
  """Replace each value of ``table.column`` with its pseudonym from ``fake_dict``.

  Args:
    table: name of the table to update. It is interpolated into the SQL text,
      so it must be a trusted identifier (here: hard-coded table names).
    column: name of the ID column in ``table``; same caveat as ``table``.
    fake_dict: mapping from original ID to pseudonymized ID.
    cursor: sqlite3 cursor to run the statements on; defaults to the
      notebook's global ``dwh_cursor``.

  Values are matched exactly, not as substrings, so an ID that is contained in
  another ID cannot corrupt it. NULL values and values without an entry in
  ``fake_dict`` are left unchanged. The number of unmapped values is printed
  as a warning, because those rows stay identifiable. sqlite3 errors are
  printed, not raised. Nothing is committed here.
  """
  cur = dwh_cursor if cursor is None else cursor
  try:
    # Load the mapping into a temporary lookup table so that the whole column
    # is rewritten by one UPDATE instead of one full-table scan per ID.
    cur.execute('DROP TABLE IF EXISTS temp.pseudo_map')
    cur.execute('CREATE TEMP TABLE pseudo_map (orig_id PRIMARY KEY, fake_id)')
    cur.executemany('INSERT INTO temp.pseudo_map (orig_id, fake_id) VALUES (?, ?)',
                    ((orig, fake) for orig, fake in fake_dict.items() if orig is not None))

    # Count values without a pseudonym. This must happen before the UPDATE,
    # because afterwards pseudonyms and unmapped originals look alike.
    cur.execute(f'''SELECT COUNT(*) FROM {table}
                    WHERE {column} IS NOT NULL
                      AND {column} NOT IN (SELECT orig_id FROM temp.pseudo_map)''')
    unmapped = cur.fetchone()[0]

    # Exact-match replacement; rows whose value is not in the map are untouched.
    cur.execute(f'''UPDATE {table}
                    SET {column} = (SELECT fake_id FROM temp.pseudo_map
                                    WHERE orig_id = {table}.{column})
                    WHERE {column} IN (SELECT orig_id FROM temp.pseudo_map)''')

    if unmapped:
      print(f'Warning: {unmapped} value(s) in {table}.{column} have no pseudonym and were left unchanged')
  except Error as err:
    print(err)
  finally:
    cur.execute('DROP TABLE IF EXISTS temp.pseudo_map')

### Run the pseudonymize function over the tables that need to be pseudonymized

In [None]:
# Pseudonymize the patient-ID column of each table that holds one, using the
# same mapping everywhere so rows still join on the new IDs
for table_name, id_column in (('patients_info', 'Id'),
                              ('conditions_info', 'PATIENT'),
                              ('medications_info', 'PATIENT')):
  pseudonymize(table_name, id_column, pseudo_dict)

## Commit and close the connection

In [None]:
# Persist the pseudonymized data, then close the warehouse connection
# (target_cnx is the same connection object as dwh_db.conn)
target_cnx.commit()
target_cnx.close()