# Der Höhepunkt: SQL, Spark-Tables, Reporting ...

* SQL Abfragen auf Dataframes anwenden
* Mit Spark Datenbanken & Tabellen arbeiten
* User-Defined Functions benutzen
* Datenmegen joinen
* Testdaten erzeugen, welche zwei Geschäftsprozesse aus der Versicherungswelt abbilden
* Die ersten (und wesentlichsten) Schritte für ein sinnvolles Monitoring dieser Prozesse gehen


## Wie immer eine Spark-Application registieren

In [1]:
import findspark
findspark.init()
findspark.find()

'/usr/local/lib/python3.10/dist-packages/pyspark'

In [2]:
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = (
    SparkSession
        .builder
        .appName("sql")
        .config("spark.dynamicAllocation.enabled",False)
        .config("spark.sql.adaptive.enabled",False)
        .master("local[4]")
        .getOrCreate()
)
spark

23/09/10 09:18:24 WARN Utils: Your hostname, keen-northcutt resolves to a loopback address: 127.0.1.1; using 116.203.107.225 instead (on interface eth0)
23/09/10 09:18:24 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/09/10 09:18:26 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
23/09/10 09:18:27 WARN Utils: Service 'SparkUI' could not bind on port 4040. Attempting port 4041.
23/09/10 09:18:27 WARN Utils: Service 'SparkUI' could not bind on port 4041. Attempting port 4042.
23/09/10 09:18:27 WARN Utils: Service 'SparkUI' could not bind on port 4042. Attempting port 4043.
23/09/10 09:18:27 WARN Utils: Service 'SparkUI' could not bind on port 4043. Attempting port 4044.


## Nun Testdaten erzeugen
Hierzu haben wir ein Beispiel aus der Versicherungswelt vorbereiten. Vielleicht ist es dem in der Signal sogar ein bisschen ähnlich ;-)


![](architecture-level-01.dio.svg)

In [3]:
import generate_test_data
generate_test_data.main(1000) # führe das nur _einmal_ aus!

Wir sehen nun auf im Dateibrowser eine Liste von Dateien, die überwiegend fachliche Events enthalten, die zwei Geschäftsprozesse abbilden:

### Ein Kunden bekommt einen neuen Vertrag

Die Systeme, die die Events schreiben liegen in der Verantwortung verschiedener Teams:

#### Team Antrag
 * **Antrag Erzeugt:** Der Kunde benutzt über das Web ein System, um einen Antrag auf einen Versicherungsvertrag zu stellen. Dieses System validert schon ein wenig, versucht einen schon bestehenden Kunden im Partner-Systeme zu finden und veröffentlicht im Anschluss dieses Business-Event.
 * **AB-Test:** Das Team Antrag ist stehst bemüht die User-Experience zu optimieren. So möchte es zum Beispiel die Durchlaufzeit auf seinem eigenen System zu verkürzen. Dazu variert es z.B. die Farbe eines Knopfes. Wenn bei einem Antrag die Standardfarbe des Knopfes abweicht, veröffentlicht es dieses technische Event
 * **Kunde hat Angebot aktzeptiert:** Nachdem das Vertragssystem sein grünes Licht gegeben hat, wurde der Vertrag vom Kunden auch akzeptiert.
 * **Kunde hat Angebot abgelehnt:** Wie vorhergehendes Event nur der Kunde will das Angebot nicht annehmen.
 
#### Team Vertrag
 * **Antrag abgelehnt**: Ein Antrag kann vom Vertragssystem abgelehnt werden. Dieses fragt z.B. noch bei Fraud an und validert auch sonst den Antrag viel "besser" als es das Antragssystem könnte.
 * **Kunde angelegt**: (eigentlich müsste ein anderes System dieses Event schreiben, aber in unserem Beispiel soll das mal OK sein) Wenn in einem Antrag nicht schon eine Referenz auf einen Kunden vorhanden ist, dann legt das Vertragssystem einen Kunden an und informiert Gott und die Welt hierüber, indem es dieses Event veröffentlicht.
 * **Vertrag angeboten**: Wenn ein Vertragsantrag aus Sicht des Vertragssystems gut aussieht, dann gibt es den Vertrag frei zur Annahme durch den Kunden. 
 * **Vertrag policiert**: Wenn der Kunde ein Vertragsangebot akzeptiert hat, policieren wir den Vertrag. Nun ist der Kunde versichert. Jetzt können bedenkenlos Schadensfälle eintreten.
 
#### Team Schaden
 * **Schaden reguliert**: wenn ein Schadensfall geprüft wurde und alles passt, dann überweisen wir Geld

#### Übrigens
**kunden.csv** ist nur ein Container mit allen Kunden ;-)



## Erste Blicke in unsere Testdaten

In [None]:
kunden_df = spark.read.option("header", True).option("inferSchema", True).csv("kunden.csv")
kunden_df.printSchema()
kunden_df.show(4, truncate=False)

root
 |-- id: string (nullable = true)
 |-- name: string (nullable = true)

+------------------------------------+--------------+
|id                                  |name          |
+------------------------------------+--------------+
|ce4cf720-4a1a-11ee-8349-7b31dfcc7252|gorzala       |
|d55a2ede-4a1a-11ee-8cce-b739d8c6e256|Olli          |
|1dbdbea0-814b-46ee-88a5-a92e30231a18|Relaxed Cray  |
|8b5d9978-c30a-4b32-85cb-8dd901c822bd|Pedantic Ellis|
+------------------------------------+--------------+
only showing top 4 rows



Soweit so bekannt. Wir könnten jetzt auf die Daten zugreifen in dem wir wie in den vorhergegangen Kapiteln filter anwenden, Null-Werte entfernen usw.
Wir möchten aber ab sofort ganz normal mit SQL auf unsere Daten zugreifen. Dazu müssen wir zu einem Dataframe in Spark einen _View_ anlegen. Diese Views werden dann ganz normal wie Datenbanktabellen in SQL-Datenbanken behandelt (read-only).

In [62]:
kunden_df.createOrReplaceTempView("kunden")

**Beachte:** die Funktion gibt nichts zurück. In der Spark-Application ist jetzt eine eine Tabelle unter dem Namen "kunden" verfügbar!

In [63]:
spark.sql("SELECT * FROM kunden")

DataFrame[id: string, name: string]

Wir sehen das wir ein Dataframe zurückbekommen. Auf diesem können wir dann wieder die bekannten Operationen ausführen. Beachte, ob Du die Dateframe-API oder die SQL-API nutzt, hat in der Regel **keinen** Impact auf die Performance!

**Übung** setzte hier mal ein paar SQLs gegen diese Tabelle ab. Filter nach bestimmten Namen... 

In [64]:
spark.sql('SELECT * FROM kunden').show()

+--------------------+--------------------+
|                  id|                name|
+--------------------+--------------------+
|ce4cf720-4a1a-11e...|             gorzala|
|d55a2ede-4a1a-11e...|                Olli|
|1dbdbea0-814b-46e...|        Relaxed Cray|
|8b5d9978-c30a-4b3...|      Pedantic Ellis|
|0232746d-2025-4dc...|  Mystifying Solomon|
|331c4f94-82ad-443...|          Elastic Wu|
|f783150d-95d6-425...|       Hardcore Cori|
|89b2f281-5743-42b...|   Inspiring Solomon|
|12c4de06-eb24-4b4...|Competent Montalcini|
|129d91bf-35b6-44b...|        Jovial Rubin|
|bc5967d0-2077-4d6...|            Zen Pike|
|4b9d040f-8432-429...|   Objective Goodall|
|14f2a015-c9e9-4bb...|   Distracted Bouman|
|23ac28d5-0247-4e9...|   Mystifying Agnesi|
|05a0c2f2-5701-432...|   Nostalgic Pasteur|
|cab57827-f0bf-497...|    Awesome Poincare|
|945cec5c-a952-4a1...|      Sleepy Hermann|
|da324d84-5b3d-400...|  Epic Proskuriakova|
|51249084-0897-4b6...|      Charming Gauss|
|d4a3bf2f-66aa-485...|      Unru

## Alle Testdaten importieren
und views anlegen

In [65]:
# AB-test
ab_test_schema = "AntragsId STRING"
ab_test_df = spark.read.option("header", True).schema(ab_test_schema).csv("ab_test.csv")
ab_test_df.createOrReplaceTempView("ab_test")

# Antrag Abgelehnt
antrag_abgelehnt_schema = "AntragsId STRING, KundenId STRING, TimeStamp TIMESTAMP, Grund STRING"
antrag_abgelehnt_df = spark.read.option("header", True).schema(antrag_abgelehnt_schema).csv("antrag_abgelehnt.csv")
antrag_abgelehnt_df.createOrReplaceTempView("antrag_abgelehnt")

# Antrag Erzeugt
antrag_erzeugt_schema = "AntragsId STRING, StartZeit TIMESTAMP, EndZeit TIMESTAMP, KundenId STRING"
antrag_erzeugt_df = spark.read.option("header", True).schema(antrag_erzeugt_schema).csv("antrag_erzeugt.csv")
antrag_erzeugt_df.createOrReplaceTempView("antrag_erzeugt")

# Kunde Angelegt
kunde_angelegt_schema = "KundenId STRING, TimeStamp TIMESTAMP"
kunde_angelegt_df = spark.read.option("header", True).schema(kunde_angelegt_schema).csv("kunde_angelegt.csv")
kunde_angelegt_df.createOrReplaceTempView("kunde_angelegt")

# Kunde hat Angebot Abgelehnt
kunde_hat_angebot_abgelehnt_schema = "VertragsId STRING, AntragsId STRING, Grund STRING, TimeStamp TIMESTAMP"
kunde_hat_angebot_abgelehnt_df = spark.read.option("header", True).schema(kunde_hat_angebot_abgelehnt_schema).csv("kunde_hat_angebot_abgelehnt.csv")
kunde_hat_angebot_abgelehnt_df .createOrReplaceTempView("kunde_hat_angebot_abgelehnt")

# Kunde hat Angebot Akzeptiert
kunde_hat_angebot_akzeptiert_schema = "VertragsId STRING, AntragsId STRING, KundenId STRING, TimeStamp TIMESTAMP"
kunde_hat_angebot_akzeptiert_df = spark.read.option("header", True).schema(kunde_hat_angebot_akzeptiert_schema).csv("kunde_hat_angebot_akzeptiert.csv")
kunde_hat_angebot_akzeptiert_df.createOrReplaceTempView("kunde_hat_angebot_akzeptiert")

# Schaden Gemeldet
schaden_gemeldet_schema = "VertragsId STRING, SchadensId STRING, Schadenshoehe INT, TimeStamp TIMESTAMP"
schaden_gemeldet_df = spark.read.option("header", True).schema(schaden_gemeldet_schema).csv("schaden_gemeldet.csv")
schaden_gemeldet_df.createOrReplaceTempView("schaden_gemeldet")

# Schaden Reguliert
schaden_reguliert_schema = "SchadensId STRING, TimeStamp TIMESTAMP"
schaden_reguliert_df = spark.read.option("header", True).schema(schaden_reguliert_schema).csv("schaden_reguliert.csv")
schaden_reguliert_df.createOrReplaceTempView("schaden_reguliert")

# Vertrag Angeboten
vertrag_angeboten_schema = "VertragsId STRING, AntragsId STRING, KundenId STRING, TimeStamp TIMESTAMP"
vertrag_angeboten_df = spark.read.option("header", True).schema(vertrag_angeboten_schema).csv("vertrag_angeboten.csv")
vertrag_angeboten_df.createOrReplaceTempView("vertrag_angeboten")

# Vertrag Policiert
vertrag_policiert_schema = "VertragsId STRING, TimeStamp TIMESTAMP"
vertrag_policiert_df = spark.read.option("header", True).schema(vertrag_policiert_schema).csv("vertrag_policiert.csv")
vertrag_policiert_df.createOrReplaceTempView("vertrag_policiert")

In [66]:
# alles da?
spark.sql("SHOW TABLES").show(truncate=False)

+---------+----------------------------+-----------+
|namespace|tableName                   |isTemporary|
+---------+----------------------------+-----------+
|default  |mytesttable2                |false      |
|         |ab_test                     |true       |
|         |antrag_abgelehnt            |true       |
|         |antrag_erzeugt              |true       |
|         |kunde_angelegt              |true       |
|         |kunde_hat_angebot_abgelehnt |true       |
|         |kunde_hat_angebot_akzeptiert|true       |
|         |kunden                      |true       |
|         |schaden_gemeldet            |true       |
|         |schaden_reguliert           |true       |
|         |vertrag_angeboten           |true       |
|         |vertrag_policiert           |true       |
+---------+----------------------------+-----------+



In [12]:
# Stichprobe bzgl. der Datentypen?
spark.sql("DESCRIBE TABLE EXTENDED schaden_gemeldet").show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|    VertagsId|   string|   null|
|   SchadensId|   string|   null|
|Schadenshoehe|      int|   null|
|    TimeStamp|timestamp|   null|
+-------------+---------+-------+



## Ein erstes Reporting

In [13]:
# Zeige alle Anträge an, bei den wir die Farbe des Knopfs verändert haben
# erstmal alle Anträge

spark.sql("SELECT count(1) FROM antrag_erzeugt").show()

# Und nun die wo wir den Knopf verändert haben
spark.sql("""
     SELECT count(1)
       FROM antrag_erzeugt
 INNER JOIN ab_test
         ON antrag_erzeugt.AntragsId = ab_test.AntragsId
""").show()

+--------+
|count(1)|
+--------+
|    1000|
+--------+

+--------+
|count(1)|
+--------+
|     119|
+--------+



**Aufgabe** Erzeuge eine Abfrage, die folgendes berechnet: Durchlaufzeit für den normalen Fall (Knopffarbe hat sich nicht geändert) und für den Fall Knopffarbe hat sich gerändert.

Wenn die das zu schwer ist, fange an mit einer Abfrage für den normalen Fall und eine für den Farbewechselfall.

## Spark Tables

Bisher haben wir gesehen, wie wir CSV Dateien laden und speichern können.
Dabei mussten wir allerdings immer ein Schema angeben.
Das ist auf Dauer lästig und mindestens aufwendig.


![](spark-catalog.dio.svg)

In [14]:
# beispiel mit dem Kunde angelegt Event
kunde_angelegt_df.printSchema()

root
 |-- KundenId: string (nullable = true)
 |-- TimeStamp: timestamp (nullable = true)



In [16]:
kunde_angelegt_df.write.mode("overwrite").saveAsTable("myTestTable2")

                                                                                

In [17]:
kunde_angelegt_df = spark.read.table("myTestTable2")

In [18]:
kunde_angelegt_df.printSchema()

root
 |-- KundenId: string (nullable = true)
 |-- TimeStamp: timestamp (nullable = true)



wir können aber auch direkt mit SQL aus einer Table lesen

In [22]:
spark.sql("SELECT * FROM myTestTable2")

DataFrame[KundenId: string, TimeStamp: timestamp]

### Typen von Catalogs


* **In-memory Catalog** Wenn eine Spark-Sitzung ended, wird auch der Catalog aufgeräumt
* **Persistent Catalog** Metadaten werden permanent gespeichert, Spark hat einen [Hive](https://hive.apache.org/)-basierten Catalog eingebaut.

### Typen von Tables

* **Managed Tables** Spark kümmert sich um den Lebenszyklus. Es werden Daten und Schemata verwaltet. Nützlich für staging Daten. Wenn eine Tabelle gelöscht wird, dann werden Daten und Schemata gelöscht.
* **Unmanged Tables/External Tables** Das Schema wird auch hier von Spark gemanaged, aber die Daten in einer frei wählbaren location. Löschen einer Tabelle löscht hier nur das Schema. Nützlich für das Ergebniss von ETL Pipelines

In [24]:
import findspark
findspark.init()

from IPython.display import *
display(HTML("<style>pre { white-space: pre !important; }</style>"))

from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

spark = (
    SparkSession
        .builder
        .appName("SparkTablesApp")
        .master("local[4]")
        .config("spark.dynamicAllocation.enabled", "false")
        .config("spark.sql.adaptive.enabled", "false")
        
        .enableHiveSupport() # <---------------------------------
    
        .getOrCreate()
)

sc = spark.sparkContext
spark

23/09/04 14:42:17 WARN SparkSession: Using an existing Spark session; only runtime SQL configurations will take effect.


### Nun eine Datenbank in Hive erstellen

In [25]:
spark.sql("""
  SHOW DATABASES
  """).show()

+---------+
|namespace|
+---------+
|  default|
+---------+



In [26]:
spark.sql("""
    CREATE DATABASE IF NOT EXISTS Foo
""").show()

++
||
++
++



In [27]:
spark.sql("""
    SHOW DATABASES
""").show()

+---------+
|namespace|
+---------+
|  default|
|      foo|
+---------+



In [28]:
kunde_angelegt_df.write.mode("overwrite").saveAsTable("foo.bar")
# wurde als managed table abgespeichert, da keine externe location angegeben wurde

In [30]:
spark.sql("""
    SHOW TABLES in default
""").show(truncate=False)

+---------+----------------------------+-----------+
|namespace|tableName                   |isTemporary|
+---------+----------------------------+-----------+
|default  |mytesttable2                |false      |
|         |ab_test                     |true       |
|         |antrag_abgelehnt            |true       |
|         |antrag_erzeugt              |true       |
|         |kunde_angelegt              |true       |
|         |kunde_hat_angebot_abgelehnt |true       |
|         |kunde_hat_angebot_akzeptiert|true       |
|         |kunden                      |true       |
|         |schaden_gemeldet            |true       |
|         |schaden_reguliert           |true       |
|         |vertrag_angeboten           |true       |
|         |vertrag_policiert           |true       |
+---------+----------------------------+-----------+



In [31]:
spark.sql("""
SELECT * from foo.bar
""").show()

+--------------------+-------------------+
|            KundenId|          TimeStamp|
+--------------------+-------------------+
|cab57827-f0bf-497...|2023-09-17 07:33:51|
|7962496d-8b1a-468...|2023-09-01 16:48:04|
|f75bc42b-cc5f-457...|2023-09-16 08:06:43|
|bcd595a1-7417-48a...|2023-09-02 03:55:48|
|631ec411-4379-4ac...|2023-09-16 11:48:36|
|4c94784d-27c8-4c3...|2023-09-11 19:40:10|
|e4a9b53a-6590-4d6...|2023-09-14 06:03:20|
|18666e62-c367-410...|2023-09-17 19:06:53|
|2b969628-a210-4df...|2023-09-07 07:07:48|
|c05e8ec3-ce7d-43f...|2023-09-12 04:17:26|
|2ecf7929-add0-47b...|2023-09-15 17:55:03|
|217502fb-7070-4eb...|2023-09-08 03:47:06|
|e21ac097-dff6-438...|2023-09-12 20:06:39|
|44b04de1-40ff-424...|2023-09-07 16:38:51|
|f357e4f0-9248-4fc...|2023-09-11 02:32:09|
|f1726fb3-37aa-443...|2023-09-11 16:47:04|
|67c689a1-151a-4fd...|2023-09-17 04:59:07|
|5a04500d-5ad1-495...|2023-09-05 11:23:34|
|fb363d72-6531-426...|2023-09-07 23:39:07|
|3aa73b67-1086-42d...|2023-09-19 20:04:50|
+----------

In [32]:
# oder auch die Tabelle direkt in ein Dataframe lesen

tmp = spark.read.table("foo.bar")
tmp.printSchema()
tmp.show(2)

root
 |-- KundenId: string (nullable = true)
 |-- TimeStamp: timestamp (nullable = true)

+--------------------+-------------------+
|            KundenId|          TimeStamp|
+--------------------+-------------------+
|cab57827-f0bf-497...|2023-09-17 07:33:51|
|7962496d-8b1a-468...|2023-09-01 16:48:04|
+--------------------+-------------------+
only showing top 2 rows



In [33]:
# Nun schauen wir uns mal die Details der Tabelle an
spark.sql("""
    DESCRIBE TABLE EXTENDED foo.bar
""").show(truncate=False)

+----------------------------+----------------------------------------------------------------------+-------+
|col_name                    |data_type                                                             |comment|
+----------------------------+----------------------------------------------------------------------+-------+
|KundenId                    |string                                                                |null   |
|TimeStamp                   |timestamp                                                             |null   |
|                            |                                                                      |       |
|# Detailed Table Information|                                                                      |       |
|Catalog                     |spark_catalog                                                         |       |
|Database                    |foo                                                                   |       |
|Table    

In [34]:
# Nun eine UNMANAGED Table anlegen
(
    kunden_df
        .write
        .mode("overwrite")
        .option("path", "/home/pupil/spark-course/course/03-SQL/spark-warehouse/persistent/kunden.parquet")
        #.option("format", "csv") # defaults to parquet
        .saveAsTable("foo.bar")
)

In [35]:
spark.sql("""
 DESCRIBE TABLE EXTENDED foo.bar
""").show(truncate=False)

+----------------------------+---------------------------------------------------------------------------------------+-------+
|col_name                    |data_type                                                                              |comment|
+----------------------------+---------------------------------------------------------------------------------------+-------+
|id                          |string                                                                                 |null   |
|name                        |string                                                                                 |null   |
|                            |                                                                                       |       |
|# Detailed Table Information|                                                                                       |       |
|Catalog                     |spark_catalog                                                                    

In [36]:
spark.sql("""
DROP TABLE foo.bar
""").show(truncate=False)

++
||
++
++



im datei browser anschauen, dass die Dateien nicht wirklich gelöscht wurden
Nun mit SQL die Tabelle wieder herstellen


In [37]:
# Nun eine Table direkt aus parquet herstellen (parquet hat Schema eingebaut ;-) )
spark.sql("""
    CREATE TABLE  foo.bar
    USING PARQUET
    LOCATION "/home/pupil/spark-course/course/03-SQL/spark-warehouse/persistent/kunden.parquet"
""").show(truncate=False)

++
||
++
++



In [38]:
spark.sql("""
    SELECT * FROM foo.bar
""").show()

+--------------------+--------------------+
|                  id|                name|
+--------------------+--------------------+
|ce4cf720-4a1a-11e...|             gorzala|
|d55a2ede-4a1a-11e...|                Olli|
|1dbdbea0-814b-46e...|        Relaxed Cray|
|8b5d9978-c30a-4b3...|      Pedantic Ellis|
|0232746d-2025-4dc...|  Mystifying Solomon|
|331c4f94-82ad-443...|          Elastic Wu|
|f783150d-95d6-425...|       Hardcore Cori|
|89b2f281-5743-42b...|   Inspiring Solomon|
|12c4de06-eb24-4b4...|Competent Montalcini|
|129d91bf-35b6-44b...|        Jovial Rubin|
|bc5967d0-2077-4d6...|            Zen Pike|
|4b9d040f-8432-429...|   Objective Goodall|
|14f2a015-c9e9-4bb...|   Distracted Bouman|
|23ac28d5-0247-4e9...|   Mystifying Agnesi|
|05a0c2f2-5701-432...|   Nostalgic Pasteur|
|cab57827-f0bf-497...|    Awesome Poincare|
|945cec5c-a952-4a1...|      Sleepy Hermann|
|da324d84-5b3d-400...|  Epic Proskuriakova|
|51249084-0897-4b6...|      Charming Gauss|
|d4a3bf2f-66aa-485...|      Unru

**Take Away** Spark Tables machen die Entwicklung schneller und einfacher

## Spark User Defined Functions

Spark stellt dir zwar eine Menge an Funktionen zur Datenmanipulation zur Verfügung, aber machnchmal fehlt doch etwas.
Aus diesem Grund gibt es User Defined Funktions.

In [39]:
spark.sql("""
  SELECT * FROM antrag_erzeugt
""").show(truncate=False)


+------------------------------------+-------------------+-------------------+------------------------------------+
|AntragsId                           |StartZeit          |EndZeit            |KundenId                            |
+------------------------------------+-------------------+-------------------+------------------------------------+
|ebd66d59-d3f9-484c-8e80-45ce16168b6c|2023-09-05 02:47:54|2023-09-05 02:53:54|1dbdbea0-814b-46ee-88a5-a92e30231a18|
|27b66f96-9c4c-4c34-997a-ccf09933f20c|2023-09-04 13:47:39|2023-09-04 13:56:39|8b5d9978-c30a-4b32-85cb-8dd901c822bd|
|f2c2e697-546d-4b0b-9716-771dbb92e832|2023-09-13 19:21:42|2023-09-13 19:28:42|0232746d-2025-4dc7-a9b6-94eaa90fb5d4|
|19f45c99-84ae-444c-a7c0-831e511aa974|2023-09-14 17:50:22|2023-09-14 17:57:22|331c4f94-82ad-4435-bb75-6e9875b1aa6a|
|7da30a98-d3db-42a4-8fb2-21b94cb58f65|2023-09-08 01:13:28|2023-09-08 01:19:28|f783150d-95d6-4257-ba00-04c34dca2cdb|
|043ef706-9b16-4930-8113-19596c2f62ed|2023-09-05 05:25:10|2023-09-05 05:

Wir möchten hier zum Beispiel die Zeitauer als eigene Spalte haben, die der Kunde auf der Antragsstrecke verbracht hat. Das würde zwar auch mit SQL gehen, aber soll hier als Beispiel dienen.

**Beachte**: Für Spark sind UDFs Blackboxen und Spark wendet deswegen keinerlei Optimierungen auf deren Anwendung an.

In [41]:
# meine erste UDF
def duration(first_ts, second_ts):
    delta = second_ts - first_ts
    return int(delta.total_seconds())

In [42]:
duration_udf = udf( lambda first, second: duration(first,second), IntegerType()) 

### Anwenden in Dataframe-Code

In [43]:
antrag_erzeugt_df.select(
    "*",
    duration_udf(col("StartZeit"), col("EndZeit")).alias("Duration")
).show(5, truncate=False)

[Stage 24:>                                                         (0 + 1) / 1]

+------------------------------------+-------------------+-------------------+------------------------------------+--------+
|AntragsId                           |StartZeit          |EndZeit            |KundenId                            |Duration|
+------------------------------------+-------------------+-------------------+------------------------------------+--------+
|ebd66d59-d3f9-484c-8e80-45ce16168b6c|2023-09-05 02:47:54|2023-09-05 02:53:54|1dbdbea0-814b-46ee-88a5-a92e30231a18|360     |
|27b66f96-9c4c-4c34-997a-ccf09933f20c|2023-09-04 13:47:39|2023-09-04 13:56:39|8b5d9978-c30a-4b32-85cb-8dd901c822bd|540     |
|f2c2e697-546d-4b0b-9716-771dbb92e832|2023-09-13 19:21:42|2023-09-13 19:28:42|0232746d-2025-4dc7-a9b6-94eaa90fb5d4|420     |
|19f45c99-84ae-444c-a7c0-831e511aa974|2023-09-14 17:50:22|2023-09-14 17:57:22|331c4f94-82ad-4435-bb75-6e9875b1aa6a|420     |
|7da30a98-d3db-42a4-8fb2-21b94cb58f65|2023-09-08 01:13:28|2023-09-08 01:19:28|f783150d-95d6-4257-ba00-04c34dca2cdb|360     |


                                                                                

### Anwenden in SQL

In [44]:
# für spark sql registrieren
spark.udf.register("duration", duration, IntegerType())

<function __main__.duration(first_ts, second_ts)>

In [45]:
spark.sql("""
    SELECT 
        *,
        duration(StartZeit, Endzeit) AS Foo
    FROM antrag_erzeugt
""").show(4, truncate=False)

+------------------------------------+-------------------+-------------------+------------------------------------+---+
|AntragsId                           |StartZeit          |EndZeit            |KundenId                            |Foo|
+------------------------------------+-------------------+-------------------+------------------------------------+---+
|ebd66d59-d3f9-484c-8e80-45ce16168b6c|2023-09-05 02:47:54|2023-09-05 02:53:54|1dbdbea0-814b-46ee-88a5-a92e30231a18|360|
|27b66f96-9c4c-4c34-997a-ccf09933f20c|2023-09-04 13:47:39|2023-09-04 13:56:39|8b5d9978-c30a-4b32-85cb-8dd901c822bd|540|
|f2c2e697-546d-4b0b-9716-771dbb92e832|2023-09-13 19:21:42|2023-09-13 19:28:42|0232746d-2025-4dc7-a9b6-94eaa90fb5d4|420|
|19f45c99-84ae-444c-a7c0-831e511aa974|2023-09-14 17:50:22|2023-09-14 17:57:22|331c4f94-82ad-4435-bb75-6e9875b1aa6a|420|
+------------------------------------+-------------------+-------------------+------------------------------------+---+
only showing top 4 rows



### Übung, schreibe eine UDF, die die Dauer auf einen Ampelwert abbildet

* alles unter 200 Sekunden "grün"
* zwischen 200 und 500 "gelb"
* der rest rot

## Joins

![](joins.dio.svg)

Spark unterstützt die gängisten Joins

* Inner Join
* Left Outer Join
* Right Outer Join
* Full Outer Join

Nur in Python und Scala verfügbar:

* Cross Join
* Semi Join
* Anti Join

### Inner Join

![](inner-join.dio.svg)


Alle Anträge die in der Tabelle Antrag erzeugt und AB-Test drin sind.


In [46]:
spark.sql("""
    SELECT *,
           unix_timestamp(EndZeit) - unix_timestamp(StartZeit) AS foo
      FROM antrag_erzeugt
INNER JOIN ab_test 
        ON antrag_erzeugt.AntragsId = ab_test.AntragsId
""").show(truncate=False)

+------------------------------------+-------------------+-------------------+------------------------------------+------------------------------------+---+
|AntragsId                           |StartZeit          |EndZeit            |KundenId                            |AntragsId                           |foo|
+------------------------------------+-------------------+-------------------+------------------------------------+------------------------------------+---+
|a61d6dfa-486c-4142-a46d-a0bcb6766a01|2023-09-14 12:06:47|2023-09-14 12:10:47|12c4de06-eb24-4b4d-994d-c76f443da9fc|a61d6dfa-486c-4142-a46d-a0bcb6766a01|240|
|530be6cd-7bcf-4d68-95ad-c8cbe4617349|2023-09-19 02:16:47|2023-09-19 02:19:47|129d91bf-35b6-44bc-aa5a-02a2bb0c7380|530be6cd-7bcf-4d68-95ad-c8cbe4617349|180|
|e31ae0e3-7e86-49fb-bbaf-77345aa66edb|2023-09-05 01:30:46|2023-09-05 01:31:46|4b9d040f-8432-429c-b4e7-8ab1b1370536|e31ae0e3-7e86-49fb-bbaf-77345aa66edb|60 |
|43186a36-8130-4ffc-8311-8eebc34c65a3|2023-09-06 09:25:37|

In [47]:
spark.sql("""
    SELECT count(1)
      FROM antrag_erzeugt
INNER JOIN ab_test
        ON antrag_erzeugt.AntragsId = ab_test.AntragsId
""").show(truncate=False)

spark.sql("""
    SELECT count(1)
      FROM antrag_erzeugt""").show(truncate=False)

+--------+
|count(1)|
+--------+
|119     |
+--------+

+--------+
|count(1)|
+--------+
|1000    |
+--------+



In [48]:
# geht auch mit Dataframe API

joined_df = (
    antrag_erzeugt_df
        .join(
            ab_test_df,
            antrag_erzeugt_df.AntragsId == ab_test_df.AntragsId,
            "inner" # left, leftouter, right, rightouter, full etc.
        )
)
joined_df.count()


119

### Set Operation

* Union All
* Union
* Intersect
* Except / Minus


## Aufgaben
### Identifziere Fälle in denen möglicherweise ein Event verloren geganen ist
Wenn der Kunde zum Beispiel ein Vertragsangebot akzeptiert, dann sollten wir mit dem policieren des Vertrags reagieren. Finde Fälle in denen das nicht passiert ist.
### Gebe Kunden mit auffallend hohen Schadenfällen aus
Überlege Dir was auffallend hoch heißen kann.
### Erzeuge eine Sicht pro Schadensfall mit folgenden Spalten

KundenId, VertragsId, AntragsEingangTS, PoliciertTS, SchadenFall, Reguliert(ja/nein) 


## Windows

In [56]:
spark.sql("""
  DESCRIBE schaden_gemeldet
""").show()

+-------------+---------+-------+
|     col_name|data_type|comment|
+-------------+---------+-------+
|   VertragsId|   string|   null|
|   SchadensId|   string|   null|
|Schadenshoehe|      int|   null|
|    TimeStamp|timestamp|   null|
+-------------+---------+-------+



In [71]:
spark.sql("""
    SELECT VertragsId, Count(1) AS count_cases 
      FROM schaden_gemeldet
  GROUP BY VertragsId
  ORDER BY count_cases DESC
""").show(truncate=False)




+------------------------------------+-----------+
|VertragsId                          |count_cases|
+------------------------------------+-----------+
|2f1927ab-65bb-4731-8880-07e38822caf6|7          |
|2c452600-d322-414b-a587-e326f105a296|7          |
|d2775ae1-95e1-49d5-9e62-6af51ed3968c|7          |
|43de383c-30ea-4d95-bcc6-b6f1245f9b63|6          |
|5ea252e0-392a-4e33-8a21-a40202fc3bbc|6          |
|7ca32486-0152-47cb-af7e-b40122041ee7|6          |
|4bd3d9ce-94ec-4d5e-81fc-9730784c27f2|6          |
|0ad6f771-30aa-4e16-8296-155d02b42b81|6          |
|f49d14b8-4fcb-4931-8ef7-20e71247cf53|6          |
|35930c72-1cf3-4860-a96c-486769019286|6          |
|ea4e424d-b5ed-4584-9417-1ea9e17c55d1|6          |
|41ab602b-069e-40e2-bb12-da5a976c4f06|6          |
|33e7093c-5992-4c76-ab94-1dffe576c35b|6          |
|3bbe040d-ccc6-44e3-8e07-b29cb90a9f34|6          |
|ad06fb71-c2c9-44e7-9b82-b68d43bbb043|6          |
|8a751f05-618d-49f5-b806-83ff1f278ee1|6          |
|45bd5814-13f2-42da-9434-d86319

                                                                                

In [None]:
spark.sql(
    """
    WITH total_stat as SELECT * FROM 
""")
