<a href="https://colab.research.google.com/github/Fuenfgeld/TeamDataScDatenmanagementUndArchivierung/blob/add-license-1/COVID19_Allergy_ETL.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# ETL-Strecke und Implementierung von Data Lake und Data Warehouse mit den Daten zu COVID-19 und Allergien

### Import der Bibliotheken

In [1]:
import pandas as pd
from functools import reduce
import sqlite3 as sq
from sqlite3 import Error
import hashlib as hl
import csv
import numpy as np
import seaborn as sns
import matplotlib.pyplot as plt
from pandas_profiling import ProfileReport
# Render matplotlib figures inline so they are shown and stored within the notebook.
%matplotlib inline
# Colab helper used to mount Google Drive.
from google.colab import drive
# Mount Drive at /content/drive (the CSV inputs and database files are addressed below that path).
drive.mount("/content/drive", force_remount=True)


Mounted at /content/drive


### Definition der Variablen für die CSV-Dateien und Datenbanken

In [2]:
# Studies
patient_allergy = "allergy"
patient_covid19 = "covid19"

# csv files
material_path_covid19 = "/content/drive/MyDrive/csv_data/"+patient_covid19+"/"
material_path_allergy = "/content/drive/MyDrive/csv_data/"+patient_allergy+"/"

# list elements in directory
# print("Allergy...")
# !ls {material_path_allergy}
# print("COVID-19...")
# !ls {material_path_covid19} 

# Data Warehouse
db_file_path_cov_alle = "/content/drive/MyDrive/db_files/cov_alle.db"
!rm {db_file_path_cov_alle} # delete file if exists

# Data Lake
db_file_path_lake = "/content/drive/MyDrive/db_files/cov_alle_lake.db"
!rm {db_file_path_lake} # delete file if exists

### Extraktion und Transformation

Benutzte Datasets
- `patients`
- `observations`
- `conditions`
- `procedures`

Methode
* Laden der CSV-Dateien in Data Frames
* Einfügen einer neuen Spalte mit der Studie
* Zusammenfügen der beiden Data Frames
* Löschen von redundanten und nicht notwendigen Variablen
* Erstellung eines Pseudonyms (nur für patients)
* Behandlung von fehlenden Werten

In [3]:
# Patients
#
# Load the patient master data of both studies, tag every row with its study,
# merge the two frames, drop columns that are not needed, derive a pseudonym
# and reduce birth/death dates to years so that an age can be computed.

# COVID-19 study
patient_cov = pd.read_csv(material_path_covid19 + "/patients.csv")
patient_cov["STUDY"] = 'COVID-19'  # new column with the study label

# Allergy study
patient_all = pd.read_csv(material_path_allergy + "/patients.csv")
patient_all["STUDY"] = 'Allergy'

# Union of both data frames; exact duplicate rows are removed
patient = pd.concat([patient_all, patient_cov]).drop_duplicates()

# Drop columns that are not needed downstream
patient = patient.drop(columns=['SSN', 'PREFIX', 'ZIP', 'DRIVERS', 'PASSPORT', 'FIRST',
                                'LAST', 'BIRTHPLACE', 'ADDRESS', 'STATE', 'COUNTY', 'MAIDEN', 'SUFFIX'])

# Pseudonymised patient id: MD5 hex digest of the original Id.
# NOTE(review): the hash is unsalted and the original `Id` column is kept next to it —
# confirm that this is sufficient for the intended pseudonymisation.
patient['PSPID'] = [hl.md5(val.encode('UTF-8')).hexdigest() for val in patient['Id']]

# Missing marital status is replaced by the most frequent value.
# The result is assigned back instead of calling fillna(inplace=True) on the
# selected column, which does not reliably modify `patient` in newer pandas versions.
patient['MARITAL'] = patient['MARITAL'].fillna(patient['MARITAL'].mode()[0])

# Parse the dates first, then treat a missing death date as "today"
# (this keeps the column a clean datetime column instead of mixing strings and timestamps).
# Only the year of birth and the year of death are kept.
patient["DEATHDATE"] = pd.to_datetime(patient["DEATHDATE"]).fillna(pd.to_datetime("today"))
patient["DEATHDATE"] = patient["DEATHDATE"].dt.year
patient["BIRTHDATE"] = pd.to_datetime(patient["BIRTHDATE"]).dt.year

# Age in years: year of death (or the current year when no death date is recorded) minus year of birth
patient["AGE"] = patient["DEATHDATE"] - patient["BIRTHDATE"]

# Show some patients
pd.concat([patient.head(3), patient.tail(3)])

Unnamed: 0,Id,BIRTHDATE,DEATHDATE,MARITAL,RACE,ETHNICITY,GENDER,CITY,LAT,LON,HEALTHCARE_EXPENSES,HEALTHCARE_COVERAGE,STUDY,PSPID,AGE
0,27b0d72c-f2fb-7e25-38c0-7d5120ebbedf,1945,2021,M,white,nonhispanic,F,Kingston,41.996106,-70.786205,63141.11,3176.78,Allergy,b39635c709eb0bb9fad25a234e38b38e,76
1,17f0c6d9-8931-8839-66cb-3ca6fb066d3e,1959,2021,M,white,nonhispanic,M,Sutton,42.12613,-71.749848,1333971.83,4339.05,Allergy,006ac4c59e4aeee8ea94f75f6080bce1,62
2,aff157cc-b6d3-412b-ccbe-bfd5fac1c2d5,1991,2021,M,white,nonhispanic,M,Haverhill,42.763237,-71.117704,735728.62,4015.4,Allergy,a0a9af4cf38d5c1796cfe65ae7f20f16,30
1047,4c523ab1-3255-911a-7eef-3bc3e48b7b05,1961,2021,M,white,nonhispanic,M,Marblehead,42.539997,-70.904396,1564112.49,4164.68,COVID-19,e05850a25837df82e321894d789f2e31,60
1048,8d49cb05-b1c3-b8fe-dc75-e43ad861f074,1920,2020,M,native,hispanic,F,Westfield,42.173612,-72.740437,1592240.0,7091.05,COVID-19,44e35b270b123de0b7709955081c0f4f,100
1049,fce5ed5c-e218-4839-a0c5-6ef906e5ae73,1920,2021,S,native,hispanic,F,Westfield,42.164697,-72.796595,1516470.0,13088.44,COVID-19,9fda66aeaa2d82203a86f87e47152f50,101


In [4]:
# Observations
#
# Read the observations of both studies, tag each row with a numeric study code
# (1 = allergy file, 2 = COVID-19 file), merge them and parse the timestamp.

observation_all = pd.read_csv(material_path_allergy + "/observations.csv")
observation_all["STUDY"] = 1

observation_cov = pd.read_csv(material_path_covid19 + "/observations.csv")
observation_cov["STUDY"] = 2

# Merge, remove exact duplicates and drop the columns that are not needed
observation = (
    pd.concat([observation_all, observation_cov])
    .drop_duplicates()
    .drop(columns=['ENCOUNTER', 'TYPE'])
)

observation["DATE"] = pd.to_datetime(observation["DATE"])

# First and last three rows as a quick check
pd.concat([observation.head(3), observation.tail(3)])

Unnamed: 0,DATE,PATIENT,CODE,DESCRIPTION,VALUE,UNITS,STUDY
0,2011-04-11 11:40:19+00:00,3575b903-dbd0-1d55-6146-9e8aa4ed52a5,8302-2,Body Height,152.6,cm,1
1,2011-04-11 11:40:19+00:00,3575b903-dbd0-1d55-6146-9e8aa4ed52a5,72514-3,Pain severity - 0-10 verbal numeric rating [Sc...,2.0,{score},1
2,2011-04-11 11:40:19+00:00,3575b903-dbd0-1d55-6146-9e8aa4ed52a5,29463-7,Body Weight,65.9,kg,1
119934,2018-02-27 06:00:19+00:00,fce5ed5c-e218-4839-a0c5-6ef906e5ae73,QOLS,QOLS,0.9,{score},2
119935,2019-02-27 06:00:19+00:00,fce5ed5c-e218-4839-a0c5-6ef906e5ae73,QOLS,QOLS,0.9,{score},2
119936,2020-02-27 06:00:19+00:00,fce5ed5c-e218-4839-a0c5-6ef906e5ae73,QOLS,QOLS,1.0,{score},2


In [5]:
# Conditions
#
# Read the conditions of both studies, tag each row with a numeric study code
# (1 = allergy file, 2 = COVID-19 file), merge them and parse start/stop dates.

condition_all = pd.read_csv(material_path_allergy + "/conditions.csv")
condition_all["STUDY"] = 1

condition_cov = pd.read_csv(material_path_covid19 + "/conditions.csv")
condition_cov["STUDY"] = 2

# Merge, remove exact duplicates and drop the column that is not needed
condition = (
    pd.concat([condition_all, condition_cov])
    .drop_duplicates()
    .drop(columns=['ENCOUNTER'])
)

condition["START"] = pd.to_datetime(condition["START"])

# A missing STOP is replaced by the current timestamp before the column is parsed
condition["STOP"] = pd.to_datetime(condition["STOP"].fillna(pd.to_datetime("today")))

# First and last three rows as a quick check
pd.concat([condition.head(3), condition.tail(3)])

Unnamed: 0,START,STOP,PATIENT,CODE,DESCRIPTION,STUDY
0,1946-01-11,2021-01-29 09:26:51.073537,3575b903-dbd0-1d55-6146-9e8aa4ed52a5,232353008,Perennial allergic rhinitis with seasonal vari...,1
1,1955-01-17,2021-01-29 09:26:51.073537,3575b903-dbd0-1d55-6146-9e8aa4ed52a5,162864005,Body mass index 30+ - obesity (finding),1
2,1999-06-06,2021-01-29 09:26:51.073537,17f0c6d9-8931-8839-66cb-3ca6fb066d3e,162864005,Body mass index 30+ - obesity (finding),1
6372,2020-03-25,2021-01-29 09:26:51.073537,8d49cb05-b1c3-b8fe-dc75-e43ad861f074,67782005,Acute respiratory distress syndrome (disorder),2
6373,1991-09-20,2021-01-29 09:26:51.073537,fce5ed5c-e218-4839-a0c5-6ef906e5ae73,230690007,Stroke,2
6374,2016-11-18,2021-01-29 09:26:51.073537,fce5ed5c-e218-4839-a0c5-6ef906e5ae73,49436004,Atrial Fibrillation,2


In [6]:
# Procedures
#
# Read the procedures of both studies, tag each row with a numeric study code
# (1 = allergy file, 2 = COVID-19 file), merge them and parse the timestamp.

procedure_cov = pd.read_csv(material_path_covid19 + "/procedures.csv")
procedure_cov["STUDY"] = 2

procedure_all = pd.read_csv(material_path_allergy + "/procedures.csv")
procedure_all["STUDY"] = 1

# Exact duplicate rows are removed, as is done for patients, observations and conditions
procedure = pd.concat([procedure_all, procedure_cov]).drop_duplicates()

# Drop the columns that are not needed downstream
procedure = procedure.drop(columns=['ENCOUNTER', 'REASONCODE', 'REASONDESCRIPTION'])

procedure["DATE"] = pd.to_datetime(procedure["DATE"])

# First and last three rows as a quick check
pd.concat([procedure.head(3), procedure.tail(3)])

Unnamed: 0,DATE,PATIENT,CODE,DESCRIPTION,BASE_COST,STUDY
0,2012-12-12 14:56:19+00:00,3575b903-dbd0-1d55-6146-9e8aa4ed52a5,73761001,Colonoscopy,9209.61,1
1,2015-05-04 11:40:19+00:00,3575b903-dbd0-1d55-6146-9e8aa4ed52a5,430193006,Medication Reconciliation (procedure),414.03,1
2,2017-12-11 15:25:19+00:00,3575b903-dbd0-1d55-6146-9e8aa4ed52a5,73761001,Colonoscopy,13437.13,1
4454,2017-11-24 06:00:19+00:00,fce5ed5c-e218-4839-a0c5-6ef906e5ae73,18286008,Catheter ablation of tissue of heart,18651.66,2
4455,2018-11-30 06:00:19+00:00,fce5ed5c-e218-4839-a0c5-6ef906e5ae73,180325003,Electrical cardioversion,35410.97,2
4456,2019-12-06 06:00:19+00:00,fce5ed5c-e218-4839-a0c5-6ef906e5ae73,180325003,Electrical cardioversion,29709.1,2


## Data Lake & Data Warehouse
### Vorbereitung der Dimensionen für das Data Warehouse mit Hilfe der Tabelle `patients`

* Spalten von Interesse auswählen
* Redundante Werte löschen
* Neuen Index erstellen
* Neue Spalte mit IDs einfügen

In [7]:
# Gender dimension: one row per distinct GENDER value in `patient`,
# re-indexed from 0 in order of first appearance.
gender = patient[['GENDER']].drop_duplicates().reset_index(drop=True)

# Surrogate key: 1-based row position
gender["ID"] = gender.index + 1

gender

Unnamed: 0,GENDER,ID
0,F,1
1,M,2


In [8]:
# Race dimension: one row per distinct RACE value in `patient`,
# re-indexed from 0 in order of first appearance.
race = patient[['RACE']].drop_duplicates().reset_index(drop=True)

# Surrogate key: 1-based row position
race['ID'] = race.index + 1

race

Unnamed: 0,RACE,ID
0,white,1
1,black,2
2,asian,3
3,native,4


In [9]:
# Marital-status dimension: one row per distinct MARITAL value in `patient`,
# re-indexed from 0 in order of first appearance.
marital = patient[['MARITAL']].drop_duplicates().reset_index(drop=True)

# Surrogate key: 1-based row position
marital['ID'] = marital.index + 1

marital

Unnamed: 0,MARITAL,ID
0,M,1
1,S,2


In [10]:
# Ethnicity dimension: one row per distinct ETHNICITY value in `patient`,
# re-indexed from 0 in order of first appearance.
ethnicity = patient[['ETHNICITY']].drop_duplicates().reset_index(drop=True)

# Surrogate key: 1-based row position
ethnicity['ID'] = ethnicity.index + 1

ethnicity

Unnamed: 0,ETHNICITY,ID
0,nonhispanic,1
1,hispanic,2


In [11]:
# Study dimension: one row per distinct STUDY label in `patient`,
# re-indexed from 0 in order of first appearance.
study = patient[['STUDY']].drop_duplicates().reset_index(drop=True)

# Surrogate key: 1-based row position.
# NOTE(review): the observation/condition/procedure loaders hard-code the study
# codes 1 and 2; they only match these IDs while the allergy rows come first in `patient`.
study['ID'] = study.index + 1

study

Unnamed: 0,STUDY,ID
0,Allergy,1
1,COVID-19,2


In [12]:
# City dimension: one row per distinct CITY value in `patient`,
# re-indexed from 0 in order of first appearance.
city = patient[['CITY']].drop_duplicates().reset_index(drop=True)

# Surrogate key: 1-based row position
city['ID'] = city.index + 1

city.head(3)

Unnamed: 0,CITY,ID
0,Kingston,1
1,Sutton,2
2,Haverhill,3


## Tabellen für Data Lake und Data Warehouse

In [13]:
# Registries mapping a table name to its CREATE TABLE statement.
sql_table_dwh = {} # Data Warehouse
sql_table_lake = {} # Data Lake

### Tabellen für "rohe" Daten in Data Lake

In [14]:
# Data Lake table for the merged and cleaned `patient` data frame.
# Columns follow the data frame: Id, BIRTHDATE, DEATHDATE, MARITAL, RACE, ETHNICITY, GENDER, CITY,
# LAT, LON, HEALTHCARE_EXPENSES, HEALTHCARE_COVERAGE, STUDY, PSPID, AGE.
# BIRTHDATE and DEATHDATE are INTEGER because the transformation step keeps only the year.
sql_table_lake['patient'] = """
  create table if not exists patient(
    ID VARCHAR,
    BIRTHDATE INTEGER,
    DEATHDATE INTEGER,
    MARITAL VARCHAR,
    RACE VARCHAR,
    ETHNICITY VARCHAR,
    GENDER VARCHAR,
    CITY VARCHAR,
    LAT DOUBLE,
    LON DOUBLE,
    HEALTHCARE_EXPENSES DOUBLE,
    HEALTHCARE_COVERAGE DOUBLE,
    STUDY VARCHAR,
    PSPID VARCHAR,
    AGE INTEGER
  );
"""


In [15]:
# Data Lake table for the `observation` data frame.
# Columns: DATE, PATIENT, CODE, DESCRIPTION, VALUE, UNITS, STUDY.
# NOTE(review): the loader fills STUDY with the integers 1/2 while the column is declared VARCHAR — confirm intended type.
sql_table_lake['observation'] = """
  create table if not exists observation(
    DATE DATE,
    PATIENT VARCHAR,
    CODE VARCHAR,
    DESCRIPTION VARCHAR,
    VALUE VARCHAR,
    UNITS VARCHAR,
    STUDY VARCHAR
  );
"""

In [16]:
# Data Lake table for the `condition` data frame.
# Columns: START, STOP, PATIENT, CODE, DESCRIPTION, STUDY.
sql_table_lake['condition'] = """
  create table if not exists condition(
    START DATE,
    STOP DATE,
    PATIENT VARCHAR,
    CODE VARCHAR,
    DESCRIPTION VARCHAR,
    STUDY VARCHAR
  );
"""

In [17]:
# Data Lake table for the `procedure` data frame.
# Columns: DATE, PATIENT, CODE, DESCRIPTION, BASE_COST, STUDY.
# Note: the registry key is 'procedures' (plural) while the table itself is named `procedure`.
# NOTE(review): BASE_COST is declared VARCHAR although the displayed values are numeric — confirm intended type.
sql_table_lake['procedures'] = """
  create table if not exists procedure(
    DATE DATE,
    PATIENT VARCHAR,
    CODE VARCHAR,
    DESCRIPTION VARCHAR,
    BASE_COST VARCHAR,
    STUDY VARCHAR
  );
"""

## Dimension Tables für Data Lake und Data Warehouse

In [18]:
# Patient dimension, keyed by the pseudonym PSPID.
# NOTE(review): the frame inserted into this table still contains the original `Id` column,
# so ID appears to hold the un-pseudonymised patient id — confirm this is intended.
sql_table_dwh['dimPatient'] = """
  create table if not exists dimPatient(
    ID VARCHAR,
    BIRTHDATE INTEGER,
    DEATHDATE INTEGER,
    LAT DOUBLE,
    LON DOUBLE,
    HEALTHCARE_EXPENSES DOUBLE,
    HEALTHCARE_COVERAGE DOUBLE,
    PSPID VARCHAR PRIMARY KEY,
    AGE INTEGER
  );
"""

In [19]:
# Gender dimension: integer surrogate key plus the unique gender value.
sql_table_dwh['dimGender'] = """
  create table if not exists dimGender(
    ID INTEGER PRIMARY KEY,
    GENDER VARCHAR UNIQUE NOT NULL
  );
"""

In [20]:
# Study dimension: integer surrogate key plus the unique study label.
sql_table_dwh['dimStudy'] = """
  create table if not exists dimStudy(
    ID INTEGER PRIMARY KEY,
    STUDY VARCHAR UNIQUE NOT NULL
  );
"""

In [21]:
# City dimension: integer surrogate key plus the unique city name.
sql_table_dwh['dimCity'] = """
  create table if not exists dimCity(
    ID INTEGER PRIMARY KEY,
    CITY VARCHAR UNIQUE NOT NULL
  );
"""

In [22]:
# Ethnicity dimension: integer surrogate key plus the unique ethnicity value.
sql_table_dwh['dimEthnicity'] = """
  create table if not exists dimEthnicity(
    ID INTEGER PRIMARY KEY,
    ETHNICITY VARCHAR UNIQUE NOT NULL
  );
"""

In [23]:
# Marital-status dimension: integer surrogate key plus the unique marital code.
sql_table_dwh['dimMarital'] = """
  create table if not exists dimMarital(
    ID INTEGER PRIMARY KEY,
    MARITAL VARCHAR UNIQUE NOT NULL
  );
"""

In [24]:
# Race dimension: integer surrogate key plus the unique race value.
sql_table_dwh['dimRace'] = """
  create table if not exists dimRace(
    ID INTEGER PRIMARY KEY,
    RACE VARCHAR UNIQUE NOT NULL
  );
"""

In [25]:
# SNOMED dimension: the code itself is the primary key.
# DESCRIPTION is UNIQUE, so two codes sharing one description text would be rejected on insert.
sql_table_dwh['dimSnomed'] = """
  create table if not exists dimSnomed(
    CODE VARCHAR PRIMARY KEY,
    DESCRIPTION VARCHAR UNIQUE NOT NULL
  );
"""

In [26]:
# LOINC dimension: the code itself is the primary key.
# DESCRIPTION is UNIQUE, so two codes sharing one description text would be rejected on insert.
sql_table_dwh['dimLoinc'] = """
  create table if not exists dimLoinc(
    CODE VARCHAR PRIMARY KEY,
    DESCRIPTION VARCHAR UNIQUE NOT NULL
  );
"""

In [27]:
# Show the names of the tables registered so far (Data Lake first, then Data Warehouse dimensions)
print(sql_table_lake.keys())
print(sql_table_dwh.keys())

dict_keys(['patient', 'observation', 'condition', 'procedures'])
dict_keys(['dimPatient', 'dimGender', 'dimStudy', 'dimCity', 'dimEthnicity', 'dimMarital', 'dimRace', 'dimSnomed', 'dimLoinc'])


### Funktion für die Verbindung mit einer SQLite-Datenbank

In [28]:
def connect_to_db(db_file):
    """Open a connection to the SQLite database at ``db_file``.

    Parameters
    ----------
    db_file : str
        Path of the database file, passed unchanged to ``sqlite3.connect``.

    Returns
    -------
    sqlite3.Connection or None
        The open connection, or ``None`` when connecting raised an SQLite
        error (the error is printed in that case).
    """
    try:
        return sq.connect(db_file)
    except Error as err:
        # Best effort: report the problem and let the caller test for None.
        # Nothing has to be closed here because no connection was created.
        print(err)
        return None

### Herstellung der Tabellen im Data Lake

In [29]:
# Open the Data Lake database and create all registered tables in it.
conn_lake = connect_to_db(db_file_path_lake)

if conn_lake is None:
    print('Connection to database failed')
else:
    cursor_lake = conn_lake.cursor()
    # Raw tables first, then the dimension tables (the Data Lake holds both).
    for table_definitions in (sql_table_lake, sql_table_dwh):
        for name, create_statement in table_definitions.items():
            print(name)
            cursor_lake.execute(create_statement)

patient
observation
condition
procedures
dimPatient
dimGender
dimStudy
dimCity
dimEthnicity
dimMarital
dimRace
dimSnomed
dimLoinc


### Einfügen der Informationen der Data Frames in den Data Lake

In [30]:
# Append every data frame to its Data Lake table (raw data first, then the dimensions).
lake_frames = {
    # raw data
    'patient': patient,
    'observation': observation,
    'condition': condition,
    'procedure': procedure,
    # dimensions
    'dimGender': gender,
    'dimStudy': study,
    'dimCity': city,
    'dimEthnicity': ethnicity,
    'dimMarital': marital,
    'dimRace': race,
}

for table_name, frame in lake_frames.items():
    frame.to_sql(name=table_name, con=conn_lake, if_exists='append', index=False)

### Erstellung der Dimensionstabellen im Data Warehouse

In [31]:
# Open the Data Warehouse database and create the registered dimension tables in it.
conn_dwh = connect_to_db(db_file_path_cov_alle)

if conn_dwh is None:
    print('Connection to database failed')
else:
    cursor_dwh = conn_dwh.cursor()
    for name, create_statement in sql_table_dwh.items():
        print(name)
        cursor_dwh.execute(create_statement)

dimPatient
dimGender
dimStudy
dimCity
dimEthnicity
dimMarital
dimRace
dimSnomed
dimLoinc


### Einfügen der Informationen der Data Frames in das Data Warehouse
Im Fall der Patient-Tabelle werden einige Spalten gelöscht und durch IDs ersetzt.

In [32]:
# Fill the Data Warehouse dimensions:
# dimPatient, dimGender, dimStudy, dimCity, dimEthnicity, dimMarital, dimRace

# The descriptive attributes live in their own dimension tables, so they are
# removed from the patient frame before it is stored as dimPatient.
# NOTE(review): the original `Id` column is not dropped here — confirm it should reach the warehouse.
patient_to_dim = patient.drop(columns=['MARITAL', 'RACE', 'GENDER', 'CITY', 'STUDY', 'ETHNICITY'])

dwh_dimension_frames = {
    'dimPatient': patient_to_dim,
    'dimGender': gender,
    'dimStudy': study,
    'dimCity': city,
    'dimEthnicity': ethnicity,
    'dimMarital': marital,
    'dimRace': race,
}

for table_name, frame in dwh_dimension_frames.items():
    frame.to_sql(name=table_name, con=conn_dwh, if_exists='append', index=False)


### Extraktion von SNOMED-CT und LOINC aus dem Data Lake für die Dimensionen im Data Warehouse

**SQL-Erklärung**: Selektiert die verschiedenen `code` und `description` der Tabellen `procedure` und `condition` für SNOMED und `observation` für LOINC, nimmt für jeden `code` nur die längste `description` (es gibt `code` mit verschiedenen `description`) und sortiert das Ergebnis nach `code`.

Das Ergebnis dieses SQL-Statements wird in einem Data Frame gespeichert und in die Dimensionstabellen im Data Lake und im Data Warehouse eingefügt.

In [33]:
# SNOMED-CT
# Collect the (code, description) pairs used by the `procedure` and `condition` tables of the
# Data Lake and reduce them to one row per code, ordered by code.
# NOTE(review): the stated intent is to keep the longest description per code. The HAVING clause
# itself only filters groups; picking the description of the max() row presumably relies on
# SQLite's treatment of bare columns next to max() — confirm before porting to another database.
snomed = pd.read_sql_query("""
select distinct code, description from(
  select distinct code, description FROM "procedure" p  
    union
  select distinct code, description FROM "condition" c
) as snomed 
group by code
having max(LENGTH(description))
order by code    
;""", conn_lake
  )

# The same rows are appended to the dimension table of both databases.
snomed.to_sql(name = 'dimSnomed', con=conn_dwh, if_exists='append', index=False)
snomed.to_sql(name = 'dimSnomed', con=conn_lake, if_exists='append', index=False)
snomed.head(3)

Unnamed: 0,code,description
0,10383002,Counseling for termination of pregnancy
1,104091002,Hemoglobin / Hematocrit / Platelet count
2,104326007,Measurement of Varicella-zoster virus antibody


In [34]:
# LOINC
# Collect the (code, description) pairs used by the `observation` table of the Data Lake
# and reduce them to one row per code, ordered by code.
# NOTE(review): the stated intent is to keep the longest description per code. The HAVING clause
# itself only filters groups; picking the description of the max() row presumably relies on
# SQLite's treatment of bare columns next to max() — confirm before porting to another database.
loinc = pd.read_sql_query("""
select distinct code, description from(
  select distinct code, description FROM observation 
) as loinc 
group by code
having max(LENGTH(description))
order by code
;""", conn_lake
  )

# The same rows are appended to the dimension table of both databases.
loinc.to_sql(name = 'dimLoinc', con=conn_dwh, if_exists='append', index=False)
loinc.to_sql(name = 'dimLoinc', con=conn_lake, if_exists='append', index=False)

loinc.head(3)

Unnamed: 0,code,description
0,10230-1,Left ventricular Ejection fraction
1,10834-0,Globulin [Mass/volume] in Serum by calculation
2,14804-9,Lactate dehydrogenase [Enzymatic activity/volu...


## Faktentabellen im Data Warehouse
* `factObservation`
* `factProcedure`
* `factCondition`

Jede Tabelle besitzt einen Index auf jeder Spalte mit IDs.

In [35]:
# Start fresh registries for the fact tables and their indices.
# Note: this rebinds `sql_table_dwh`, so the dimension table definitions registered earlier
# are no longer reachable through this name from here on.
sql_table_dwh = {} # tables
sql_index_dwh = {} # indices

In [36]:
# factObservation: one row per observation, carrying the patient pseudonym,
# the keys of all patient-related dimensions and the LOINC code of the observation.
# The *_ID columns are INTEGER so that they have the same type as the INTEGER
# primary keys (ID) of the dimension tables they reference.
sql_table_dwh['factObservation'] = """
  create table if not exists factObservation(
    PATIENT_PSPID VARCHAR REFERENCES dimPatient(PSPID),
    BIRTHYEAR INTEGER,
    DEATHYEAR INTEGER,
    MARITAL_ID INTEGER REFERENCES dimMarital(ID),
    RACE_ID INTEGER REFERENCES dimRace(ID),
    ETHNICITY_ID INTEGER REFERENCES dimEthnicity(ID),
    GENDER_ID INTEGER REFERENCES dimGender(ID),
    CITY_ID INTEGER REFERENCES dimCity(ID),
    STUDY_ID INTEGER REFERENCES dimStudy(ID),
    AGE INTEGER,
    DATE DATE,
    LOINC VARCHAR REFERENCES dimLoinc(CODE),
    VALUE VARCHAR,
    UNITS VARCHAR
  );
"""

# One index per key column of the fact table
sql_index_dwh["ix_factObservation_patient"] = """CREATE INDEX if not exists ix_factObservation_patient on factObservation(PATIENT_PSPID);"""
sql_index_dwh["ix_factObservation_marital"] = """CREATE INDEX if not exists ix_factObservation_marital on factObservation(MARITAL_ID);"""
sql_index_dwh["ix_factObservation_race"] = """CREATE INDEX if not exists ix_factObservation_race on factObservation(RACE_ID);"""
sql_index_dwh["ix_factObservation_ethnicity"] = """CREATE INDEX if not exists ix_factObservation_ethnicity on factObservation(ETHNICITY_ID);"""
sql_index_dwh["ix_factObservation_gender"] = """CREATE INDEX if not exists ix_factObservation_gender on factObservation(GENDER_ID);"""
sql_index_dwh["ix_factObservation_city"] = """CREATE INDEX if not exists ix_factObservation_city on factObservation(CITY_ID);"""
sql_index_dwh["ix_factObservation_study"] = """CREATE INDEX if not exists ix_factObservation_study on factObservation(STUDY_ID);"""
sql_index_dwh["ix_factObservation_loinc"] = """CREATE INDEX if not exists ix_factObservation_loinc on factObservation(LOINC);"""

In [37]:
# factProcedure: one row per procedure, carrying the patient pseudonym,
# the keys of all patient-related dimensions and the SNOMED code of the procedure.
# The *_ID columns are INTEGER so that they have the same type as the INTEGER
# primary keys (ID) of the dimension tables they reference.
sql_table_dwh['factProcedure'] = """
  create table if not exists factProcedure(
    PATIENT_PSPID VARCHAR REFERENCES dimPatient(PSPID),
    BIRTHYEAR INTEGER,
    DEATHYEAR INTEGER,
    MARITAL_ID INTEGER REFERENCES dimMarital(ID),
    RACE_ID INTEGER REFERENCES dimRace(ID),
    ETHNICITY_ID INTEGER REFERENCES dimEthnicity(ID),
    GENDER_ID INTEGER REFERENCES dimGender(ID),
    CITY_ID INTEGER REFERENCES dimCity(ID),
    STUDY_ID INTEGER REFERENCES dimStudy(ID),
    AGE INTEGER,
    DATE DATE,
    SNOMED VARCHAR REFERENCES dimSnomed(CODE)
  );
"""

# One index per key column of the fact table
sql_index_dwh["ix_factProcedure_patient"] = """CREATE INDEX if not exists ix_factProcedure_patient on factProcedure(PATIENT_PSPID);"""
sql_index_dwh["ix_factProcedure_marital"] = """CREATE INDEX if not exists ix_factProcedure_marital on factProcedure(MARITAL_ID);"""
sql_index_dwh["ix_factProcedure_race"] = """CREATE INDEX if not exists ix_factProcedure_race on factProcedure(RACE_ID);"""
sql_index_dwh["ix_factProcedure_ethnicity"] = """CREATE INDEX if not exists ix_factProcedure_ethnicity on factProcedure(ETHNICITY_ID);"""
sql_index_dwh["ix_factProcedure_gender"] = """CREATE INDEX if not exists ix_factProcedure_gender on factProcedure(GENDER_ID);"""
sql_index_dwh["ix_factProcedure_city"] = """CREATE INDEX if not exists ix_factProcedure_city on factProcedure(CITY_ID);"""
sql_index_dwh["ix_factProcedure_study"] = """CREATE INDEX if not exists ix_factProcedure_study on factProcedure(STUDY_ID);"""
sql_index_dwh["ix_factProcedure_snomed"] = """CREATE INDEX if not exists ix_factProcedure_snomed on factProcedure(SNOMED);"""

In [38]:
# factCondition: one row per condition, carrying the patient pseudonym,
# the keys of all patient-related dimensions and the SNOMED code of the condition.
# The *_ID columns are INTEGER so that they have the same type as the INTEGER
# primary keys (ID) of the dimension tables they reference.
sql_table_dwh['factCondition'] = """
  create table if not exists factCondition(
    PATIENT_PSPID VARCHAR REFERENCES dimPatient(PSPID),
    BIRTHYEAR INTEGER,
    DEATHYEAR INTEGER,
    MARITAL_ID INTEGER REFERENCES dimMarital(ID),
    RACE_ID INTEGER REFERENCES dimRace(ID),
    ETHNICITY_ID INTEGER REFERENCES dimEthnicity(ID),
    GENDER_ID INTEGER REFERENCES dimGender(ID),
    CITY_ID INTEGER REFERENCES dimCity(ID),
    STUDY_ID INTEGER REFERENCES dimStudy(ID),
    AGE INTEGER,
    START DATE,
    STOP DATE,
    SNOMED VARCHAR REFERENCES dimSnomed(CODE)
  );
"""

# One index per key column of the fact table
sql_index_dwh["ix_factCondition_patient"] = """CREATE INDEX if not exists ix_factCondition_patient on factCondition(PATIENT_PSPID);"""
sql_index_dwh["ix_factCondition_marital"] = """CREATE INDEX if not exists ix_factCondition_marital on factCondition(MARITAL_ID);"""
sql_index_dwh["ix_factCondition_race"] = """CREATE INDEX if not exists ix_factCondition_race on factCondition(RACE_ID);"""
sql_index_dwh["ix_factCondition_ethnicity"] = """CREATE INDEX if not exists ix_factCondition_ethnicity on factCondition(ETHNICITY_ID);"""
sql_index_dwh["ix_factCondition_gender"] = """CREATE INDEX if not exists ix_factCondition_gender on factCondition(GENDER_ID);"""
sql_index_dwh["ix_factCondition_city"] = """CREATE INDEX if not exists ix_factCondition_city on factCondition(CITY_ID);"""
sql_index_dwh["ix_factCondition_study"] = """CREATE INDEX if not exists ix_factCondition_study on factCondition(STUDY_ID);"""
sql_index_dwh["ix_factCondition_snomed"] = """CREATE INDEX if not exists ix_factCondition_snomed on factCondition(SNOMED);"""

In [39]:
# Show the names of the registered fact tables and of their indices
print(sql_table_dwh.keys()) # show tables
print(sql_index_dwh.keys()) # show indices

dict_keys(['factObservation', 'factProcedure', 'factCondition'])
dict_keys(['ix_factObservation_patient', 'ix_factObservation_marital', 'ix_factObservation_race', 'ix_factObservation_ethnicity', 'ix_factObservation_gender', 'ix_factObservation_city', 'ix_factObservation_study', 'ix_factObservation_loinc', 'ix_factProcedure_patient', 'ix_factProcedure_marital', 'ix_factProcedure_race', 'ix_factProcedure_ethnicity', 'ix_factProcedure_gender', 'ix_factProcedure_city', 'ix_factProcedure_study', 'ix_factProcedure_snomed', 'ix_factCondition_patient', 'ix_factCondition_marital', 'ix_factCondition_race', 'ix_factCondition_ethnicity', 'ix_factCondition_gender', 'ix_factCondition_city', 'ix_factCondition_study', 'ix_factCondition_snomed'])


### Erstellung der Faktentabellen und Indizes im Data Warehouse

In [40]:
# Create the fact tables and afterwards their indices in the Data Warehouse.
if conn_dwh is None:
    print('Connection to database failed')
else:
    # `cursor_dwh` is the cursor opened when the dimension tables were created.
    for statements in (sql_table_dwh, sql_index_dwh):
        for name, statement in statements.items():
            print(name)
            cursor_dwh.execute(statement)

factObservation
factProcedure
factCondition
ix_factObservation_patient
ix_factObservation_marital
ix_factObservation_race
ix_factObservation_ethnicity
ix_factObservation_gender
ix_factObservation_city
ix_factObservation_study
ix_factObservation_loinc
ix_factProcedure_patient
ix_factProcedure_marital
ix_factProcedure_race
ix_factProcedure_ethnicity
ix_factProcedure_gender
ix_factProcedure_city
ix_factProcedure_study
ix_factProcedure_snomed
ix_factCondition_patient
ix_factCondition_marital
ix_factCondition_race
ix_factCondition_ethnicity
ix_factCondition_gender
ix_factCondition_city
ix_factCondition_study
ix_factCondition_snomed


### Auswahl der Informationen im Data Lake und Einfügen in die Faktentabellen im Data Warehouse

- Auswahl der Informationen mit Hilfe eines Select-Statements und Speichern in einem Data Frame
- Einfügen des Data Frames in die Faktentabellen im Data Warehouse

In [41]:
# factObservation
# Build the fact rows in the Data Lake: every observation is joined to its patient, and the
# patient's descriptive attributes are replaced by the IDs of the matching dimension rows.
# DISTINCT collapses rows that are identical in all selected columns.
factObservation = pd.read_sql_query("""
select DISTINCT 
  PSPID PATIENT_PSPID, 
  BIRTHDATE BIRTHYEAR,
  DEATHDATE DEATHYEAR,
  dm.ID MARITAL_ID,
  dr.ID RACE_ID ,
  de.ID ETHNICITY_ID,
  dg.ID GENDER_ID,
  dc.ID CITY_ID,
  ds.ID STUDY_ID,
  AGE,
  o.date DATE,
  o.CODE LOINC,
  o.VALUE,
  o.UNITS 
from patient pat 
join dimMarital dm
  on dm.MARITAL = pat.MARITAL
join dimRace dr
  on dr.RACE = pat.RACE
join dimEthnicity de
  on de.ETHNICITY = pat.ETHNICITY 
join dimCity dc 
  on dc.CITY = pat.CITY
join dimGender dg
  on dg.GENDER = pat.GENDER
join dimStudy ds
  on ds.STUDY = pat.STUDY
join observation o
  on o.PATIENT = pat.Id
;"""
  , conn_lake)  

# Append the result to the fact table of the Data Warehouse
factObservation.to_sql(name='factObservation', con=conn_dwh, if_exists='append', index=False)

In [42]:
# factProcedure
# Build the fact rows in the Data Lake: every procedure is joined to its patient, and the
# patient's descriptive attributes are replaced by the IDs of the matching dimension rows.
# DISTINCT collapses rows that are identical in all selected columns.
factProcedure = pd.read_sql_query(""" 
  select DISTINCT 
  PSPID PATIENT_PSPID, 
  BIRTHDATE BIRTHYEAR,
  DEATHDATE DEATHYEAR,
  dm.ID MARITAL_ID,
  dr.ID RACE_ID ,
  de.ID ETHNICITY_ID,
  dg.ID GENDER_ID,
  dc.ID CITY_ID,
  ds.ID STUDY_ID,
  AGE,
  p.date DATE,
  p.CODE SNOMED
from patient pat 
join dimMarital dm
  on dm.MARITAL = pat.MARITAL
join dimRace dr
  on dr.RACE = pat.RACE
join dimEthnicity de
  on de.ETHNICITY = pat.ETHNICITY 
join dimCity dc 
  on dc.CITY = pat.CITY
join dimGender dg
  on dg.GENDER = pat.GENDER
join dimStudy ds
  on ds.STUDY = pat.STUDY
join "procedure" p
  on p.PATIENT = pat.Id
  ;""", conn_lake)

# Append the result to the fact table of the Data Warehouse
factProcedure.to_sql(name='factProcedure', con=conn_dwh, if_exists='append', index=False)

In [43]:
# factCondition
# Build the fact rows in the Data Lake: every condition is joined to its patient, and the
# patient's descriptive attributes are replaced by the IDs of the matching dimension rows.
# DISTINCT collapses rows that are identical in all selected columns.
factCondition = pd.read_sql_query("""
   select DISTINCT 
  PSPID PATIENT_PSPID, 
  BIRTHDATE BIRTHYEAR,
  DEATHDATE DEATHYEAR,
  dm.ID MARITAL_ID,
  dr.ID RACE_ID ,
  de.ID ETHNICITY_ID,
  dg.ID GENDER_ID,
  dc.ID CITY_ID,
  ds.ID STUDY_ID,
  AGE,
  c.START,
  c.STOP,
  c.CODE SNOMED
from patient pat 
join dimMarital dm
  on dm.MARITAL = pat.MARITAL
join dimRace dr
  on dr.RACE = pat.RACE
join dimEthnicity de
  on de.ETHNICITY = pat.ETHNICITY 
join dimCity dc 
  on dc.CITY = pat.CITY
join dimGender dg
  on dg.GENDER = pat.GENDER
join dimStudy ds
  on ds.STUDY = pat.STUDY
join "condition" c
  on c.PATIENT = pat.Id 
    ;""", conn_lake)


# Append the result to the fact table of the Data Warehouse
factCondition.to_sql(name='factCondition', con=conn_dwh, if_exists='append', index=False)

## Views in Data Warehouse
* `v_observations`
* `v_conditions`
* `v_procedures`

Diese Views dienen der Erleichterung der Datenanalyse.

In [44]:
# Create the analysis views in the Data Warehouse.
# Each view joins one fact table back to its dimensions so that the readable values
# (marital status, race, ..., code description, study) are available again.
# `if not exists` makes the cell safe to re-run, in line with the
# `create table/index if not exists` statements used for the tables and indices.
cursor_dwh.executescript(
    """
-- Observations

create view if not exists v_observations as
select
  PATIENT_PSPID PATIENT,
  BIRTHYEAR,
  DEATHYEAR,
  dm.MARITAL,
  dr.RACE,
  de.ETHNICITY,
  dg.GENDER,
  dc.CITY,
  AGE,
  DATE,
  LOINC,
  dl.description DESCRIPTION,
  VALUE,
  UNITS,
  ds.STUDY
from factObservation fo
join dimMarital dm
  on fo.MARITAL_ID = dm.ID
join dimRace dr
  on dr.ID = fo.RACE_ID
join dimEthnicity de
  on de.ID = fo.ETHNICITY_ID
join dimGender dg
  on dg.ID = fo.GENDER_ID
join dimCity dc
  on dc.ID = fo.CITY_ID
join dimLoinc dl
  on dl.code = fo.LOINC
join dimStudy ds
  on ds.ID = fo.STUDY_ID
;

-- Conditions

create view if not exists v_conditions as
select
  PATIENT_PSPID PATIENT,
  BIRTHYEAR,
  DEATHYEAR,
  dm.MARITAL,
  dr.RACE,
  de.ETHNICITY,
  dg.GENDER,
  dc.CITY,
  AGE,
  "START",
  STOP,
  SNOMED,
  dsn.description DESCRIPTION,
  ds.STUDY
from factCondition fc
join dimMarital dm
  on fc.MARITAL_ID = dm.ID
join dimRace dr
  on dr.ID = fc.RACE_ID
join dimEthnicity de
  on de.ID = fc.ETHNICITY_ID
join dimGender dg
  on dg.ID = fc.GENDER_ID
join dimCity dc
  on dc.ID = fc.CITY_ID
join dimSnomed dsn
  on dsn.code = fc.SNOMED
join dimStudy ds
  on ds.ID = fc.STUDY_ID
;

-- Procedures

create view if not exists v_procedures as
select
  PATIENT_PSPID PATIENT,
  BIRTHYEAR,
  DEATHYEAR,
  dm.MARITAL,
  dr.RACE,
  de.ETHNICITY,
  dg.GENDER,
  dc.CITY,
  AGE,
  DATE,
  SNOMED,
  dsn.description DESCRIPTION,
  ds.STUDY
from factProcedure fp
join dimMarital dm
  on fp.MARITAL_ID = dm.ID
join dimRace dr
  on dr.ID = fp.RACE_ID
join dimEthnicity de
  on de.ID = fp.ETHNICITY_ID
join dimGender dg
  on dg.ID = fp.GENDER_ID
join dimCity dc
  on dc.ID = fp.CITY_ID
join dimSnomed dsn
  on dsn.code = fp.SNOMED
join dimStudy ds
  on ds.ID = fp.STUDY_ID
;
  """
)


<sqlite3.Cursor at 0x7fa3e0df1490>

In [45]:
# Persist pending changes in both databases, then release the connections.
open_connections = (conn_dwh, conn_lake)

for connection in open_connections:
    connection.commit()

for connection in open_connections:
    connection.close()