<a href="https://colab.research.google.com/github/Fuenfgeld/DMA2024TeamC/blob/main/Code/ETL2Datawarehouse.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

**Kursarbeit Datenmanagement und -Archivierung WS 23/24**

Master-Code Name-CALICO:MA


**Erstellung der Datenbank**


*   Datenbankinitialisierung:



> Laden Libraries

In [1]:
import pandas as pd
from functools import reduce
import sqlite3
from sqlite3 import Error
import csv
import requests

> Verbindung zu Googledrive herstellen (Ablageort der CSV-Dateien)

In [2]:
from google.colab import drive
drive.mount("/content/drive", force_remount=True)

Mounted at /content/drive




> Verbindung zu SQLite herstellen und Datebankinitialisierung Local




In [3]:
def create_connection_local(local_path):
  conn = None;
  try:
    #Establishing the connection
    conn = sqlite3.connect(local_path+'/datawarehouse_new.db')
    return conn
    print(sqlite3.version)
  except Error as e:
    print("Error while connecting to sqlite", e)
conn = create_connection_local('/content/drive/MyDrive/Datenmanagement_und_Archivierung_im_Umfeld_der_Forschung/CALICO_MA')
# Creating a cursor object using the cursor() method
cur = conn.cursor()
print("Successfully Connected to SQLite Public Data Warehouse")


Successfully Connected to SQLite Public Data Warehouse


> Quelldatenimport

In [4]:
# Load cancer CSVs
procedures = pd.read_csv('/content/drive/MyDrive/Datenmanagement_und_Archivierung_im_Umfeld_der_Forschung/CALICO_MA/Data_source/procedures.csv', sep=",")
encounters = pd.read_csv('/content/drive/MyDrive/Datenmanagement_und_Archivierung_im_Umfeld_der_Forschung/CALICO_MA/Data_source/encounters.csv', sep=",")
immunizations = pd.read_csv('/content/drive/MyDrive/Datenmanagement_und_Archivierung_im_Umfeld_der_Forschung/CALICO_MA/Data_source/immunizations.csv', sep=",")
medications = pd.read_csv('/content/drive/MyDrive/Datenmanagement_und_Archivierung_im_Umfeld_der_Forschung/CALICO_MA/Data_source/medications.csv', sep=",")
observations = pd.read_csv('/content/drive/MyDrive/Datenmanagement_und_Archivierung_im_Umfeld_der_Forschung/CALICO_MA/Data_source/observations.csv', sep=",")
patients = pd.read_csv('/content/drive/MyDrive/Datenmanagement_und_Archivierung_im_Umfeld_der_Forschung/CALICO_MA/Data_source/patients.csv', sep=",")


#procedures = pd.read_csv('/content/drive/MyDrive/csv_data/breast_cancer/procedures.csv', sep=",")
#encounters = pd.read_csv('/content/drive/MyDrive/csv_data/breast_cancer/encounters.csv', sep=",")
#immunizations = pd.read_csv('/content/drive/MyDrive/csv_data/breast_cancer/immunizations.csv', sep=",")
#medications = pd.read_csv('/content/drive/MyDrive/csv_data/breast_cancer/medications.csv', sep=",")
#observations = pd.read_csv('/content/drive/MyDrive/csv_data/breast_cancer/observations.csv', sep=",")
#patients = pd.read_csv('/content/drive/MyDrive/csv_data/breast_cancer/patients.csv', sep=",")

In [None]:
#print(pd.read_sql_query("PRAGMA table_info('patients')", conn))

In [5]:
#Delete Tables in case they exist
cur.execute("DROP TABLE IF EXISTS procedures")
cur.execute("DROP TABLE IF EXISTS encounters")
cur.execute("DROP TABLE IF EXISTS immunizations")
cur.execute("DROP TABLE IF EXISTS medications")
cur.execute("DROP TABLE IF EXISTS observations")
cur.execute("DROP TABLE IF EXISTS patients")
cur.execute("DROP TABLE IF EXISTS facts_table")

<sqlite3.Cursor at 0x7d06e47b5340>

In [6]:
# Create tables including facts_table
sql_create_source_data = requests.get('https://raw.githubusercontent.com/Fuenfgeld/DMA2024TeamC/main/Code/create_statements_db.sql').text
cur.executescript(sql_create_source_data)
conn.commit()
print("Successfully created tables in the database")

Successfully created tables in the database


In [7]:
#Insert data into tables

procedures.to_sql('procedures', conn, if_exists='append', index=False)
encounters.to_sql('encounters', conn, if_exists='append', index=False)
immunizations.to_sql('immunizations', conn, if_exists='append', index=False)
medications.to_sql('medications', conn, if_exists='append', index=False)
observations.to_sql('observations', conn, if_exists='append', index=False)
patients.to_sql('patients', conn, if_exists='append', index=False)

conn.commit()

### Die Daten werden aus den verschiedenen Tabellen extrahiert und ggf. transformiert
 >Die Daten aus **Patients** werden erstmal in die Faktentabelle übertragen. Es handelt sich um die Krebsart, die Patienten ID, die geographische Koordinaten, die Healthcarekosten, die vom Patient getragen werden und diejenigen, die von der Krankenversicherung bezahlt werden, und zum Ende das Geburtsdatum.
Damit die Faktentabelle keine Redundanzen aufweist, werden die Encounter ID nicht mehr einzeln gespeichert. Stattdessen werden in der Faktentabelle für jeden Patienten die gesamten Kosten gespeichert, die in jeder Kategorie (Encounters, Observations, Immunizations, Procedures umd Medications) verursacht wurden.  
Die Transaktion in der Faktentabelle der Datenbank wird mit commit beendet.



In [None]:
# TEST how many patients undergo Procedures
#cur.execute('''SELECT COUNT(*) from procedures GROUP BY procedures.PATIENT''')
#records = cur.fetchall()
#i=1
#for row in records:
#  print(f'patient', i, 'undergoes ', row,' procedures')
#  i=i+1

In [8]:
# Datenextraktion aus der Patientstabelle
cur.execute('''INSERT INTO facts_table
                    ( cancer_type, patient_ID, encounter_ID, patient_LAT, patient_LON, patient_HEALTHCARE_EXPENSES , patient_HEALTHCARE_COVERAGE, patient_BIRTHDATE)
                    SELECT
                        p.cancer_type AS cancer_type,
                        p.Id AS patient_ID,
                        NULL AS encounter_ID,  -- Replace with the actual encounter_ID or set to NULL if not applicable
                        p.LAT AS patient_LAT,
                        p.LON AS patient_LON,
                        p.HEALTHCARE_EXPENSES AS patient_HEALTHCARE_EXPENSES,
                        p.HEALTHCARE_COVERAGE AS patient_HEALTHCARE_COVERAGE,
                        p.BIRTHDATE AS patient_BIRTHDATE
                    FROM patients p
                    ;''')

# Ende der Transaktion
conn.commit()

# Schnelle Überprüfung der Datenübertragung ins die Faktentabelle
cur.execute('''SELECT patient_ID, patient_LON, patient_LAT, patient_HEALTHCARE_EXPENSES, patient_HEALTHCARE_COVERAGE, patient_BIRTHDATE FROM facts_table''')
records = cur.fetchall()
# show 10 rows
for row in records[:10]:
  print(row)

('d2061cc7-bee0-0e6c-3ac4-15c197c474e0', '-71,113260249', '42,359925869', 1475230, '4244,64', '22/06/1956')
('073d8e80-ff90-1c8d-57e4-29bfca52c87f', '-72,026316805', '42,257845470', 1489125, '4016,36', '28/08/1964')
('e1ff7e68-4097-9faf-514d-e4cfcfdf252e', '-71,818448850', '42,204289674', '57142,44', '2103,2', '28/08/1998')
('a0f679cc-875f-dd72-ed13-9ca863ec6cf3', '-70,904084060', '42,150269736', 408520, 2709, '20/01/2004')
('e4166a9c-f7c5-bef7-ea43-b96281a2d586', '-71,784873172', '42,292081923', 320400, '2970,68', '23/03/2006')
('ee537126-7509-05de-2349-ab19604545d8', '-71,041350243', '42,347570821', 784465, '2738,96', '21/05/1991')
('826d5ce3-2b23-8bd8-b352-a77fdd083d68', '-71,147479855', '42,522342970', 1425900, '3841,36', '20/07/1964')
('0edfef0f-fa74-ab35-1122-4c537bb9765d', '-71,003576538', '42,003848630', 512475, '2470,64', '05/05/1997')
('0fe7e161-0f29-2b70-8e3d-757b15026295', '-71,684116431', '42,585372624', 302400, '2583,2', '26/12/2008')
('478e120c-67a4-4375-646f-c41e74f7247

Die Daten werden aus **Observations** eintragen:
nur die Tuples mit dem neuesten Datum werden berücksichtigt. Gespeichert werden die Art der Observation (QALY, DALY oder QoLS), den neuesten Wert und das entsprechende Datum. Ausserdem wird das Alter am Tag der letzten Observation berechnet und eingetragen. Die Transaktion wird mit commit beendet. Das Speichern der neuen Einträge wird überprüft.

In [9]:
# Datenextraktion aus der Observationstabelle
cur.execute('''UPDATE
                  facts_table
               SET
                  observations_CODE = obs.CODE,
                  observations_VALUE = obs.maxVALUE,
                  observations_DATE = obs.maxDATE,
                  observations_AGE = substring(obs.maxDATE,7,4) - substring(patient_BIRTHDATE,7,4)
               FROM
                    (SELECT PATIENT, CODE, MAX(VALUE) as maxVALUE, MAX(observations.DATE) as maxDATE
                    FROM observations
                    GROUP BY PATIENT
                    ) AS obs
                WHERE
                  facts_table.patient_ID = obs.PATIENT
                    ;''')

# Ende der Transaktion
conn.commit()

# Schnelle Überprüfung der Datenübertragung ins die Faktentabelle
cur.execute('''SELECT patient_ID, observations_CODE, observations_VALUE, patient_BIRTHDATE, observations_DATE, observations_AGE  FROM facts_table''')
records = cur.fetchall()
# show 10 rows
for row in records[:10]:
  print(row)

('d2061cc7-bee0-0e6c-3ac4-15c197c474e0', 'QALY', 63, '22/06/1956', '22/06/2020 10:47:46', 64)
('073d8e80-ff90-1c8d-57e4-29bfca52c87f', 'QALY', 55, '28/08/1964', '28/08/2020 16:45:45', 56)
('e1ff7e68-4097-9faf-514d-e4cfcfdf252e', 'QALY', 21, '28/08/1998', '28/08/2020 07:43:32', 22)
('a0f679cc-875f-dd72-ed13-9ca863ec6cf3', 'QALY', 15, '20/01/2004', '20/01/2020 15:27:37', 16)
('e4166a9c-f7c5-bef7-ea43-b96281a2d586', 'QALY', 13, '23/03/2006', '23/03/2020 22:58:03', 14)
('ee537126-7509-05de-2349-ab19604545d8', 'QALY', 28, '21/05/1991', '21/05/2020 01:23:21', 29)
('826d5ce3-2b23-8bd8-b352-a77fdd083d68', 'QALY', 55, '20/07/1964', '20/07/2020 05:08:46', 56)
('0edfef0f-fa74-ab35-1122-4c537bb9765d', 'QALY', 22, '05/05/1997', '04/05/2020 22:31:18', 23)
('0fe7e161-0f29-2b70-8e3d-757b15026295', 'QALY', 10, '26/12/2008', '26/12/2019 13:49:03', 11)
('478e120c-67a4-4375-646f-c41e74f72478', 'QALY', 49, '02/01/1971', '02/01/2020 11:38:08', 49)


Die Daten werden aus **Encounters** eintragen. Es handelt sich um die Summe aller Encounter Kosten für jeden einzelnen Patienten.
Falls dieser Wert nicht existiert, wird an ihrer Stelle 0 eingetragen.
Jede Transaktion wird mit commt beendet. Das Speichern der neuen Einträge in die Faktentabelle wird überprüft.


In [10]:
# Datenextraktion aus der Encounterstabelle und Berechnung der Summe aller Encounterkosten für jeden Patienten
cur.execute('''UPDATE
                  facts_table
               SET
                  encounter_Base_Encounter_Cost = enc.TOTALCOST
               FROM
                    (SELECT PATIENT, SUM(BASE_ENCOUNTER_COST) as TOTALCOST
                    FROM encounters, facts_table
                    WHERE facts_table.patient_ID = encounters.PATIENT
                    GROUP BY PATIENT
                    ) AS enc
                WHERE
                  facts_table.patient_ID = enc.PATIENT
                    ;''')

# Ende der Transaktion
conn.commit()

# Falls es keine Encounterkosten gibt, wird der Wert 0 stattdessen eingetragen
cur.execute('''UPDATE
                  facts_table
               SET
                  encounter_Base_Encounter_Cost = 0
                WHERE encounter_Base_Encounter_Cost IS NULL
                    ;''')

# Ende der Transaktion
conn.commit()

# Kurze Überprüfung
cur.execute('''SELECT patient_ID, encounter_BASE_ENCOUNTER_COST FROM facts_table''')
records = cur.fetchall()
# show 10 rows
for row in records[:10]:
  print(row)



('d2061cc7-bee0-0e6c-3ac4-15c197c474e0', 1290)
('073d8e80-ff90-1c8d-57e4-29bfca52c87f', 1032)
('e1ff7e68-4097-9faf-514d-e4cfcfdf252e', 1032)
('a0f679cc-875f-dd72-ed13-9ca863ec6cf3', 1290)
('e4166a9c-f7c5-bef7-ea43-b96281a2d586', 1290)
('ee537126-7509-05de-2349-ab19604545d8', 387)
('826d5ce3-2b23-8bd8-b352-a77fdd083d68', 1032)
('0edfef0f-fa74-ab35-1122-4c537bb9765d', 903)
('0fe7e161-0f29-2b70-8e3d-757b15026295', 1419)
('478e120c-67a4-4375-646f-c41e74f72478', 645)


In [11]:
cur.execute('''SELECT patient_ID, encounter_BASE_ENCOUNTER_COST, SUM(BASE_ENCOUNTER_COST)
                FROM facts_table, encounters WHERE facts_table.patient_ID = encounters.PATIENT
                GROUP BY patient_ID''')
records = cur.fetchall()
# show 10 rows
for row in records[:10]:
  print(row)

('001c3804-81e8-740e-dd91-973b584a2dd9', 387, 387.0)
('002a707d-ba35-3d1a-46c0-ad8a98a22137', 774, 774.0)
('003c0d44-9dfc-ca91-6121-373a8e265bd4', 1444, 1444.0)
('008b26ab-5c12-7a5b-cda4-f54484d5f749', 1290, 1290.0)
('009121bf-a672-8942-443e-85e18a33f766', 1161, 1161.0)
('00e798f6-7d02-8485-c5fd-4b6bfb114c69', 516, 516.0)
('00edcb46-afd4-4687-c16a-f4e20dfc3e92', 1186, 1186.0)
('0103a559-910a-03df-6117-ec429eeb4ac9', 774, 774.0)
('01518fe2-bd76-d1e9-a656-1e7eae35dd17', 1444, 1444.0)
('01581211-2e47-027b-71a0-dcbf0f4e6622', 1315, 1315.0)


Die Daten werden aus **Medications** eintragen. Es handelt sich um die Summe aller MedikationsKosten für jeden einzelnen Patienten.
Falls dieser Wert nicht existiert, wird an ihrer Stelle 0 eingetragen.
Jede Transaktion wird mit commt beendet. Das Speichern der neuen Einträge in die Faktentabelle wird überprüft.

In [12]:
cur.execute('''     UPDATE facts_table SET medications_TotalCost = med.TOTALCOST
                    FROM
                    (SELECT PATIENT, SUM(Totalcost) as TOTALCOST
                    FROM medications, facts_table
                    WHERE facts_table.patient_ID = medications.PATIENT
                    GROUP BY PATIENT) as med
                    WHERE facts_table.patient_ID = med.PATIENT
                    ;''')
conn.commit()

In [13]:
cur.execute('''UPDATE
                  facts_table
               SET
                  medications_TOTALCOST = 0
                WHERE medications_TOTALCOST IS NULL
                    ;''')
conn.commit()

In [14]:
cur.execute('''SELECT patient_ID, medications_TOTALCOST, SUM(TOTALCOST)
                FROM facts_table, medications WHERE facts_table.patient_ID = medications.PATIENT
                GROUP BY patient_ID''')
records = cur.fetchall()
# show 10 rows
for row in records[:10]:
  print(row)

('003c0d44-9dfc-ca91-6121-373a8e265bd4', 30327, 30327.0)
('01d6dc15-6313-5327-8491-49be72f40d59', 113859, 113859.0)
('023c4bb2-6dc3-972e-379a-af19b8a2392d', 117953, 117953.0)
('02db7f22-8617-0cf7-fa10-d820d596a81a', 101165, 101165.0)
('030287cd-dbd1-d7e1-6959-cfdfa0e271bd', 229, 229.0)
('037404e1-0c87-534d-0fe2-e21ef20640f6', 70050, 70050.0)
('038eb669-b744-c2bc-18c6-6e37068c722f', 49596, 49596.0)
('05023597-744e-9369-c390-e4090447a247', 63035, 63035.0)
('05a3a33e-e9b1-0aaa-66f4-f0e2b57dce04', 3216, 3216.0)
('072229fb-e8b1-a9ff-61b1-0cdab70c3c4c', 6614, 6614.0)


Die Daten werden aus **Procedures** eintragen. Es handelt sich um die Summe aller Prozedurenkosten für jeden einzelnen Patienten. Falls dieser Wert nicht existiert, wird an ihrer Stelle 0 eingetragen. Jede Transaktion wird mit commt beendet. Das Speichern der neuen Einträge in die Faktentabelle wird überprüft

In [15]:
cur.execute('''     UPDATE facts_table SET procedures_BASE_COST = proc.TOTALCOST
                    FROM
                    (SELECT PATIENT, SUM(BASE_COST) as TOTALCOST
                    FROM procedures, facts_table
                    WHERE facts_table.patient_ID = procedures.PATIENT
                    GROUP BY PATIENT) as proc
                    WHERE facts_table.patient_ID = proc.PATIENT
                    ;''')
conn.commit()

In [16]:
cur.execute('''UPDATE
                  facts_table
               SET
                  procedures_BASE_COST = 0
                WHERE procedures_BASE_COST IS NULL
                    ;''')
conn.commit()

In [17]:
cur.execute('''SELECT patient_ID, procedures_Base_COST, SUM(Base_COST)
                FROM facts_table, procedures WHERE facts_table.patient_ID = procedures.PATIENT
                GROUP BY patient_ID''')
records = cur.fetchall()
# show 10 rows
for row in records[:10]:
  print(row)

('003c0d44-9dfc-ca91-6121-373a8e265bd4', 297579, 297579.0)
('00edcb46-afd4-4687-c16a-f4e20dfc3e92', 23882, 23882.0)
('01518fe2-bd76-d1e9-a656-1e7eae35dd17', 22513, 22513.0)
('01581211-2e47-027b-71a0-dcbf0f4e6622', 34044, 34044.0)
('01d6dc15-6313-5327-8491-49be72f40d59', 405516, 405516.0)
('023c4bb2-6dc3-972e-379a-af19b8a2392d', 56168, 56168.0)
('030287cd-dbd1-d7e1-6959-cfdfa0e271bd', 34777, 34777.0)
('037404e1-0c87-534d-0fe2-e21ef20640f6', 28799, 28799.0)
('038eb669-b744-c2bc-18c6-6e37068c722f', 225337, 225337.0)
('05398088-a847-099e-3c61-c6153d83e024', 35181, 35181.0)


Die Daten werden aus **Immunizations** eintragen. Es handelt sich um die Summe aller Immunizationskosten für jeden einzelnen Patienten. Falls dieser Wert nicht existiert, wird an ihrer Stelle 0 eingetragen. Jede Transaktion wird mit commt beendet. Das Speichern der neuen Einträge in die Faktentabelle wird überprüft

In [18]:
cur.execute('''     UPDATE facts_table SET immunizations_Base_Cost = im.TOTALCOST
                    FROM
                    (SELECT PATIENT, SUM(Base_Cost) as TOTALCOST
                    FROM immunizations, facts_table
                    WHERE facts_table.patient_ID = immunizations.PATIENT
                    GROUP BY PATIENT) as im
                    WHERE facts_table.patient_ID = im.PATIENT
                    ;''')
conn.commit()

In [19]:
cur.execute('''UPDATE
                  facts_table
               SET
                  immunizations_BASE_COST = 0
                WHERE immunizations_BASE_COST IS NULL
                    ;''')
conn.commit()

In [20]:
cur.execute(''' SELECT patient_ID, immunizations_BASE_COST, SUM(BASE_COST) FROM facts_table, immunizations
                WHERE facts_table.patient_ID = immunizations.PATIENT
                GROUP BY patient_ID''' )
records = cur.fetchall()
# show 10 rows
for row in records[:10]:
  print(row)

('001c3804-81e8-740e-dd91-973b584a2dd9', 560, 560.0)
('002a707d-ba35-3d1a-46c0-ad8a98a22137', 1260, 1260.0)
('003c0d44-9dfc-ca91-6121-373a8e265bd4', 1400, 1400.0)
('008b26ab-5c12-7a5b-cda4-f54484d5f749', 1540, 1540.0)
('009121bf-a672-8942-443e-85e18a33f766', 1540, 1540.0)
('00e798f6-7d02-8485-c5fd-4b6bfb114c69', 980, 980.0)
('00edcb46-afd4-4687-c16a-f4e20dfc3e92', 1540, 1540.0)
('0103a559-910a-03df-6117-ec429eeb4ac9', 1260, 1260.0)
('01518fe2-bd76-d1e9-a656-1e7eae35dd17', 1820, 1820.0)
('01581211-2e47-027b-71a0-dcbf0f4e6622', 1680, 1680.0)


##Das Datawarehouse ist fast fertig ausgefüllt.
Hier werden die ersten Tupeln der Faktentabelle angezeigt.
Der "None"-Eintrag entspricht der Spalte für Encounters. Diese Spalte wird nicht benutzt, deshalb wurde sie nicht ausgefüllt.

In [21]:
cur.execute('''SELECT * FROM facts_table
''')
records = cur.fetchall()
# show 4 rows
for row in records[:30]:
  print(row)

('breast_cancer', 'd2061cc7-bee0-0e6c-3ac4-15c197c474e0', None, '42,359925869', '-71,113260249', 1475230, '4244,64', '22/06/1956', 'QALY', 63, '22/06/2020 10:47:46', 64, 0, 4230, 1540, 1290)
('breast_cancer', '073d8e80-ff90-1c8d-57e4-29bfca52c87f', None, '42,257845470', '-72,026316805', 1489125, '4016,36', '28/08/1964', 'QALY', 55, '28/08/2020 16:45:45', 56, 0, 0, 1540, 1032)
('breast_cancer', 'e1ff7e68-4097-9faf-514d-e4cfcfdf252e', None, '42,204289674', '-71,818448850', '57142,44', '2103,2', '28/08/1998', 'QALY', 21, '28/08/2020 07:43:32', 22, 0, 0, 1820, 1032)
('breast_cancer', 'a0f679cc-875f-dd72-ed13-9ca863ec6cf3', None, '42,150269736', '-70,904084060', 408520, 2709, '20/01/2004', 'QALY', 15, '20/01/2020 15:27:37', 16, 0, 0, 2240, 1290)
('breast_cancer', 'e4166a9c-f7c5-bef7-ea43-b96281a2d586', None, '42,292081923', '-71,784873172', 320400, '2970,68', '23/03/2006', 'QALY', 13, '23/03/2020 22:58:03', 14, 0, 0, 2660, 1290)
('breast_cancer', 'ee537126-7509-05de-2349-ab19604545d8', None

In [None]:
print(pd.read_sql_query("PRAGMA table_info('facts_table')", conn))

    cid                           name           type  notnull dflt_value  pk
0     0                    cancer_type         STRING        0       None   0
1     1                     patient_ID         STRING        0       None   0
2     2                   encounter_ID         STRING        0       None   0
3     3                    patient_LAT   DECIMAL(6,2)        0       None   0
4     4                    patient_LON   DECIMAL(6,2)        0       None   0
5     5    patient_HEALTHCARE_EXPENSES   DECIMAL(9,2)        0       None   0
6     6    patient_HEALTHCARE_COVERAGE   DECIMAL(9,2)        0       None   0
7     7              patient_BIRTHDATE           DATE        0       None   0
8     8              observations_CODE         STRING        0       None   0
9     9             observations_VALUE         STRING        0       None   0
10   10              observations_DATE           DATE        0       None   0
11   11               observations_AGE   DECIMAL(6,2)        0  