# 03-03 Fakten Aufgabe

## Hinweise zur Übung

Ziel dieser Übung ist die beispielhafte Erzeugung von Faktentabellen für das Data Warehouse-Szenario. Hierzu nutzen wir die Inhalte der Staging-Datenbanken als Datenquelle und transformieren daraus die Faktendaten und liefern sie in die Reporting-Datenbank aus.

Die Übung setzt auf den Ergebnissen der Extraktions (Übung 01) auf. Da in der Übung nur einzelne Tabellen in die Staging-Datenbanken übertragen wurden, setzen wir hier mit einem einheitlichen Stand aus vorbereiteten SQLite-Datenbanken auf, in die alle TSV-Dateien bereits importiert wurden.

Die Tabellenstruktur der zu erstellenden Faktentabellen inkl. der Spalten (nicht jedoch Datentypen) geht aus dem ER-Diagramm im `data`-Verzeichnis hervor. Die Datentypen wiederum können in den Dokumentationen der jeweiligen Qeulldatenbereiche (Bielefeld, Mannheim, Dimensionen) nachgeschlagen werden.

Der Block zur Konfiguration des Notebooks ist in dieser Übung etwas länger, da die zuvor in der Demo angelegten Tabellen hier in einheitlicher Form noch einmal "frisch" für das Notebook erzeugt werden, damit in den  weiteren Schritten darauf zugegriffen werden kann.


## Konfiguration des Notebooks

In [None]:
# Ggf. fehlende Pakete installieren
!pip install --quiet ipython-sql

In [None]:
import os
import sys
import urllib.request
%load_ext sql

In [None]:
# Konfiguration
base_url_quellen = "https://raw.githubusercontent.com/fau-lmi/lct-ehealth/main/07-ETL+DWH/data"
base_url_staging = "./"

In [None]:
# SQlite-Datenbanken aus Github auf den Jupyter-Server herunterladen
urllib.request.urlretrieve(base_url_quellen + "/datenbanken/stg_bielefeld.sqlite",  "stg_bielefeld.sqlite")
urllib.request.urlretrieve(base_url_quellen + "/datenbanken/stg_mannheim.sqlite",   "stg_mannheim.sqlite")
urllib.request.urlretrieve(base_url_quellen + "/datenbanken/stg_dimensions.sqlite", "stg_dimensions.sqlite")
urllib.request.urlretrieve(base_url_quellen + "/datenbanken/staging.sqlite",        "staging.sqlite")

In [None]:
# Datenbankverbindung als Pfad (für das ETL) & iPython SQL (für die Abfragen) herstellen
db_path_stg_bielefeld  = base_url_staging + "stg_bielefeld.sqlite"
db_path_stg_mannheim   = base_url_staging + "stg_mannheim.sqlite"
db_path_stg_dimensions = base_url_staging + "stg_dimensions.sqlite"
db_path_staging        = base_url_staging + "staging.sqlite"
db_path_reporting      = base_url_staging + "reporting.sqlite"

db_url_stg_bielefeld  = "sqlite:///" + db_path_stg_bielefeld
db_url_stg_mannheim   = "sqlite:///" + db_path_stg_mannheim
db_url_stg_dimensions = "sqlite:///" + db_path_stg_dimensions
db_url_staging        = "sqlite:///" + db_path_staging
db_url_reporting      = "sqlite:///" + db_path_reporting

%sql $db_url_stg_bielefeld
%sql $db_url_stg_mannheim
%sql $db_url_stg_dimensions
%sql $db_url_staging
%sql $db_url_reporting

In [None]:
%%sql $db_url_staging
ATTACH DATABASE :db_path_stg_bielefeld  AS bielefeld;
ATTACH DATABASE :db_path_stg_mannheim   AS mannheim;
ATTACH DATABASE :db_path_stg_dimensions AS dimensions;
ATTACH DATABASE :db_path_reporting      AS reporting;

In [None]:
# Dimension D_FALLART erzeugen (wird für die Generierung der Tabelle F_FAELLE benötigt)
%%sql $db_url_staging
DROP TABLE IF EXISTS reporting.d_fallart;

CREATE TABLE reporting.d_fallart (
  fallart_id INTEGER PRIMARY KEY AUTOINCREMENT,
  fallart_name VARCHAR(50)
);

INSERT INTO reporting.d_fallart (fallart_name)

SELECT DISTINCT
       fallart AS fallart_name
  FROM bielefeld.faelle

UNION

SELECT DISTINCT
       encounterclass AS fallart_name
  FROM mannheim.encounters;

In [None]:
# Tabelle F_FAELLE erzeugen (wird für die Generierung weiterer Faktentabellen benötigt)
%%sql $db_url_staging
DROP TABLE IF EXISTS reporting.f_faelle;

CREATE TABLE reporting.f_faelle (
  fall_id                 INTEGER,
  patient_id              INTEGER,
  einrichtung_id          INTEGER,
  aufnahme_datum          VARCHAR(10),
  aufnahme_datumzeit      VARCHAR(18),
  entlass_datum           VARCHAR(10),
  entlass_datumzeit       VARCHAR(18),
  fallart_id              INTEGER,
  aufnahmegrund_id        VARCHAR(20),
  hauptdiagnose_snomed_id VARCHAR(20),
  aufnahmealter_jahre     FLOAT,
  liegedauer_tage         FLOAT,
  erloes_fallpauschale    FLOAT
);

INSERT INTO reporting.f_faelle (fall_id, patient_id, einrichtung_id, aufnahme_datum, aufnahme_datumzeit, entlass_datum, entlass_datumzeit, fallart_id, aufnahmegrund_id,
                                hauptdiagnose_snomed_id, aufnahmealter_jahre, liegedauer_tage, erloes_fallpauschale)

SELECT 'B-' || fal.fall_id                 AS fall_id,
       'B-' || fal.patient_id              AS patient_id,
       'B-' || fal.einrichtung_id          AS einrichtung_id,
       SUBSTR(fal.aufnahmedatum, 1, 10)    AS aufnahme_datum,
       fal.aufnahmedatum                   AS aufnahme_datumzeit,
       SUBSTR(fal.entlassdatum, 1, 10)     AS entlass_datum,
       fal.entlassdatum                    AS entlass_datumzeit,
       frt.fallart_id,
       fal.aufnahmegrund                   AS aufnahmegrund_id,
       fal.hauptdiagnose                   AS hauptdiagnose_snomed_id,
       (JULIANDAY(fal.aufnahmedatum) - JULIANDAY(pat.geburtsdatum))/365 AS aufnahmealter_jahre,
       JULIANDAY(fal.entlassdatum)     - JULIANDAY(fal.aufnahmedatum)   AS liegedauer_tage,
       fal.erloes_fallpauschale
  FROM bielefeld.faelle    fal
  JOIN bielefeld.patienten pat ON fal.patient_id = pat.patient_id
  JOIN reporting.d_fallart frt ON fal.fallart    = frt.fallart_name

UNION

SELECT 'M-' || enc.id                      AS fall_id,
       'M-' || enc.patient_id              AS patient_id,
       'M-' || enc.organization_id         AS einrichtung_id,
       SUBSTR(enc.start, 1, 10)            AS aufnahme_datum,
       enc.start                           AS aufnahme_datumzeit,
       SUBSTR(enc.stop, 1, 10)             AS entlass_datum,
       enc.stop                            AS entlass_datumzeit,
       frt.fallart_id,
       enc.code                            AS aufnahmegrund_id,
       enc.reasoncode                      AS hauptdiagnose_snomed_id,
       (JULIANDAY(enc.start) - JULIANDAY(pat.birthdate))/365 AS aufnahmealter_jahre,
       JULIANDAY(stop) - JULIANDAY(start)  AS liegedauer_tage,
       enc.base_encounter_cost             AS erloes_fallpauschale
  FROM mannheim.encounters enc
  JOIN mannheim.patients   pat ON enc.patient_id     = pat.id
  JOIN reporting.d_fallart frt ON enc.encounterclass = frt.fallart_name
;

In [None]:
# Tabelle F_DIAGNOSEN anlegen (Nutzung für Abfragen)
%%sql $db_url_staging
DROP TABLE IF EXISTS reporting.f_diagnosen;

CREATE TABLE reporting.f_diagnosen (
  fall_id                 INTEGER,
  patient_id              INTEGER,
  einrichtung_id          INTEGER,
  aufnahme_datum          VARCHAR(10),
  aufnahme_datumzeit TEXT VARCHAR(18),
  entlass_datum           VARCHAR(10),
  entlass_datumzeit TEXT  VARCHAR(18),
  fallart_id              INTEGER,
  aufnahmegrund_id        VARCHAR(20),
  hauptdiagnose_snomed_id VARCHAR(20),
  diagnose_datum          VARCHAR(10),
  diagnose_snomed_id      VARCHAR(20),
  aufnahmealter_jahre     FLOAT,
  liegedauer_tage         FLOAT,
  erloes_fallpauschale    FLOAT
);

INSERT INTO reporting.f_diagnosen (fall_id, patient_id, einrichtung_id, aufnahme_datum, aufnahme_datumzeit, entlass_datum, entlass_datumzeit, fallart_id, aufnahmegrund_id,
                                   hauptdiagnose_snomed_id, diagnose_datum, diagnose_snomed_id, aufnahmealter_jahre, liegedauer_tage, erloes_fallpauschale)

SELECT fal.fall_id,
       fal.patient_id,
       fal.einrichtung_id,
       fal.aufnahme_datum,
       fal.aufnahme_datumzeit,
       fal.entlass_datum,
       fal.entlass_datumzeit,
       fal.fallart_id,
       fal.aufnahmegrund_id,
       fal.hauptdiagnose_snomed_id,
       dia.diagnose_datum,
       dia.diagnose_code           AS diagnose_snomed_id,
       fal.aufnahmealter_jahre,
       fal.liegedauer_tage,
       fal.erloes_fallpauschale
  FROM reporting.f_faelle fal
  JOIN bielefeld.diagnosen dia ON fal.fall_id = 'B-' || dia.fall_id

UNION

SELECT fal.fall_id,
       fal.patient_id,
       fal.einrichtung_id,
       fal.aufnahme_datum,
       fal.aufnahme_datumzeit,
       fal.entlass_datum,
       fal.entlass_datumzeit,
       fal.fallart_id,
       fal.aufnahmegrund_id,
       fal.hauptdiagnose_snomed_id,
       cnd.start                   AS diagnose_datum,
       cnd.code                    AS diagnose_snomed_id,
       fal.aufnahmealter_jahre,
       fal.liegedauer_tage,
       fal.erloes_fallpauschale
  FROM reporting.f_faelle  fal
  JOIN mannheim.conditions cnd ON fal.fall_id = 'M-' || cnd.encounter_id

In [None]:
# Indizes anlegen
%%sql $db_url_reporting
CREATE UNIQUE INDEX IF NOT EXISTS ix_fal_fall_id ON f_faelle (fall_id);
CREATE INDEX IF NOT EXISTS ix_fal_patient_id ON f_faelle (patient_id);
CREATE INDEX IF NOT EXISTS ix_fal_aufnahme_datum ON f_faelle (aufnahme_datum);
CREATE INDEX IF NOT EXISTS ix_fal_entlass_datum ON f_faelle (entlass_datum);
CREATE INDEX IF NOT EXISTS ix_fal_fallart_id ON f_faelle (fallart_id);
CREATE INDEX IF NOT EXISTS ix_fal_aufnahmegrund_id ON f_faelle (aufnahmegrund_id);
CREATE INDEX IF NOT EXISTS ix_fal_hauptdiagnose_snomed_id ON f_faelle (hauptdiagnose_snomed_id);
CREATE INDEX IF NOT EXISTS ix_dia_fall_id                 ON f_diagnosen (fall_id);
CREATE INDEX IF NOT EXISTS ix_dia_patient_id              ON f_diagnosen (patient_id);
CREATE INDEX IF NOT EXISTS ix_dia_aufnahme_datum          ON f_diagnosen (aufnahme_datum);
CREATE INDEX IF NOT EXISTS ix_dia_entlass_datum           ON f_diagnosen (entlass_datum);
CREATE INDEX IF NOT EXISTS ix_dia_fallart_id              ON f_diagnosen (fallart_id);
CREATE INDEX IF NOT EXISTS ix_dia_aufnahmegrund_id        ON f_diagnosen (aufnahmegrund_id);
CREATE INDEX IF NOT EXISTS ix_dia_hauptdiagnose_snomed_id ON f_diagnosen (hauptdiagnose_snomed_id);
CREATE INDEX IF NOT EXISTS ix_dia_diagnose_snomed_id      ON f_diagnosen (diagnose_snomed_id);
CREATE INDEX IF NOT EXISTS ix_dia_diagnose_datum          ON f_diagnosen (diagnose_datum);

## Aufgabe: Faktentabelle F_PROZEDUREN generieren

Prozeduren sind Maßnahmen, die im Rahmen eines Behandlungsfalls an den Patient:innen durchgeführt werden. Sie werden u.a. für die Abrechnung dokumentiert, wobei in Deutschland der *Operationen- und Prozedurenschlüssel (OPS)*, in den hier verwendeten Synthea-Daten aber die SNOMED CT-Nomenklatur eingesetzt wird. Neben der zu dokumentierenden Maßnahme können ggf. noch Zeitpunkte wie z.B. der Beginn & Ende einer Operation erfasst werden.

Wie zuvor ergänzen wir die unmittelbar zur Prozedur erfassten Merkmale (z.B. Prozedurencode, Zeitpunkt) mit den allgemeinen Angaben zum Behandlungsfall, um beide in einem Star Schema (mit nur einer Faktentabelle) abbilden zu können. Wir stellen dabei die *dimensionalen Spalten* an den Beginn der Tabelle und die *Kennzahlenspalten* ans Ende.

### 1. Zieltabelle anlegen

Hierzu legen wir zunächst mit einem CREATE-Table die Tabelle in der Reporting-Datenbank (leer) an.

### 2. Daten aus den Quelltabellen auslesen & in Zieltabelle schreiben

Auch hier sollen die fallbezogenen Daten wieder aus der bereits generierten Tabelle `F_FAELLE`übernommen werden. Beim JOIN mit den Prozeduren-Tabellen des Bielefelder bzw. Mannheimer Datensatzes muss entsprechend wieder auf das Qualifizieren der Fall-IDs mit dem B-/M-Präfix geachtet werden.

Um die Dauer von Prozeduren in Stunden auszurechnen, muss wie für die Falldauer in der `F_FAELLE`-Tabelle die `JULIANDAY()`-Funktion von SQLite verwendet werden. Sie konvertiert eine textuelle Datumsangabe in eine Form, die eine Subtraktion der Zeitstempel voneinander auf Basis von Tagen ermöglicht. Für die Umwandlung in Stunden muss das Ergebnis mit 24 multipliziert werden.

### 3. Indizes ergänzen

Auch für diese Tabelle sollten Indizes auf Spalten angelegt werden, die als Primär- oder Fremschlüssel für den Join mit anderen Tabellen bzw. als Selektionskriterien verwendet werden.





### 4. Ergebniskontrolle

Anschließend können wir mit einer `SELECT`-Abfrage überprüfen, ob die Tabelle korrekt erstellt wurde.
