# Data Preperation

In [1]:
from pyspark.sql import DataFrameReader
from pyspark.sql import SparkSession
from pyspark.ml.feature import IndexToString, Normalizer, StringIndexer, VectorAssembler, VectorIndexer
from pyspark.ml.classification import DecisionTreeClassifier
from helpers.helper_functions import translate_to_file_string
from pyspark.sql.functions import col,lit,to_date
from pyspark.sql.functions import expr,when
from pyspark.ml.feature import Imputer
from pyspark.ml.feature import StringIndexer
from sklearn.impute import KNNImputer
from pyspark.sql.functions import rand, desc
from pyspark.ml.feature import StringIndexer
from pyspark.ml import Pipeline
from pyspark.ml.feature import MinHashLSH
from pyspark.ml.feature import BucketedRandomProjectionLSH
from pyspark.sql import Row
from pyspark.ml.linalg import Vectors
from pyspark.ml.feature import MinMaxScaler
from IPython.display import display, clear_output
import random
import time


# for pretty printing
def printDf(sprkDF): 
    newdf = sprkDF.toPandas()
    from IPython.display import display, HTML
    return HTML(newdf.to_html())

inputFile = translate_to_file_string("./data/RKI_COVID19_20210529.csv")

## Create Spark Session

In [2]:
#create a SparkSession
spark = (SparkSession
       .builder
       .appName("RKICOVID19PREPARATION")
       .getOrCreate())
# create a DataFrame using an ifered Schema 
df = spark.read.option("header", "true") \
       .option("inferSchema", "true") \
       .option("delimiter", ",") \
       .csv(inputFile)   
print(df.printSchema())

root
 |-- ObjectId: integer (nullable = true)
 |-- IdBundesland: integer (nullable = true)
 |-- Bundesland: string (nullable = true)
 |-- Landkreis: string (nullable = true)
 |-- Altersgruppe: string (nullable = true)
 |-- Geschlecht: string (nullable = true)
 |-- AnzahlFall: integer (nullable = true)
 |-- AnzahlTodesfall: integer (nullable = true)
 |-- Meldedatum: string (nullable = true)
 |-- IdLandkreis: integer (nullable = true)
 |-- Datenstand: string (nullable = true)
 |-- NeuerFall: integer (nullable = true)
 |-- NeuerTodesfall: integer (nullable = true)
 |-- Refdatum: string (nullable = true)
 |-- NeuGenesen: integer (nullable = true)
 |-- AnzahlGenesen: integer (nullable = true)
 |-- IstErkrankungsbeginn: integer (nullable = true)
 |-- Altersgruppe2: string (nullable = true)

None


### Überprüfung
Zur Überprüfung wird die Gesamtzahl der eingelesenen Instanzen ermittelt.

In [3]:
df.count()

2003106

## Datenreinigung
Nimmt das Feld NeuerFall den Wert -1 an, so ist er laut RKI "[...] nur in der Publikation des Vortags enthalten". Es handelt sich also um eine Korrektur der Puplikation des Vortages, die in der aktuellen Datenauswertung aus dem Dataframe herausgefiltert werden muss.

In [4]:
df = df.filter(df.NeuerFall > -1)

In [5]:
df.count()

2002544

## Datentransformation
In der Spalte AnzahlFall steht jeweils die Summe der Fälle. Um nun die verschiedenen Modelle trainieren zu können, muss nun die Aggration rückgängig gemacht werden. Das heißt für jeden Fall muss nun ein Record im DataFrame aufgenommen werden. Die neue Anzahl der Records muss der Anzahl der gemeldeten Fälle entsprechen. Darüber hinaus ist eine neue Spalte anzufügen. Die neue Spalte gibt an ob die Person genesen, gestorben oder keins von beiden ist. Die Spalten AnzahlFall, AnzahlTodesfall und AnzahlGenesen können anschließend wegfallen. Dies trifft ebenso auf die Felder NeuerFall, NeuerTodesfall und NeuGenesen zu.

### Vereinzelung
Für die Vereinzelung werden die Funktionen Explode und Array-Repeat verwendet. Nach der Anwendung der beiden Funktionen sollte die Anzahl der Datensätze, der Anzahl der gemeldeten Corona-Fälle entsprechen.

In [6]:
df = df.withColumn("AnzahlFall", expr("explode(array_repeat(AnzahlFall,int(AnzahlFall)))"))
df.count()

3675296

### Neue Spalte
Wie oben beschrieben wird in Folgendem eine neue Spalte 'FallStatus' angefügt. Diese dient zukünftig als Label.

In [7]:
df = df.withColumn("FallStatus", when(df.AnzahlGenesen > 0, "GENESEN")
                                 .when(df.AnzahlTodesfall > 0, "GESTORBEN")
                                 .otherwise("NICHTEINGETRETEN"))

In [8]:
df.limit(10).show()

+--------+------------+------------------+------------+------------+----------+----------+---------------+--------------------+-----------+--------------------+---------+--------------+--------------------+----------+-------------+--------------------+-----------------+----------+
|ObjectId|IdBundesland|        Bundesland|   Landkreis|Altersgruppe|Geschlecht|AnzahlFall|AnzahlTodesfall|          Meldedatum|IdLandkreis|          Datenstand|NeuerFall|NeuerTodesfall|            Refdatum|NeuGenesen|AnzahlGenesen|IstErkrankungsbeginn|    Altersgruppe2|FallStatus|
+--------+------------+------------------+------------+------------+----------+----------+---------------+--------------------+-----------+--------------------+---------+--------------+--------------------+----------+-------------+--------------------+-----------------+----------+
|       1|           1|Schleswig-Holstein|SK Flensburg|     A15-A34|         M|         3|              0|2021/03/19 00:00:...|       1001|29.05.2021, 00:

### Datumsformat
Aktuell ist im DataFrame das Datum als StringType deklariert. Um mit dem Wert besser arbeiten zu können, wird er in einen DateType umgewandelt.

In [9]:
# DateFormart
df = df.withColumn("Falldatum",to_date(col("Refdatum"), "yyyy/MM/dd HH:mm:ssx")).drop("Refdatum")

In [10]:
# Überprüfung
df.limit(10).show()

+--------+------------+------------------+------------+------------+----------+----------+---------------+--------------------+-----------+--------------------+---------+--------------+----------+-------------+--------------------+-----------------+----------+----------+
|ObjectId|IdBundesland|        Bundesland|   Landkreis|Altersgruppe|Geschlecht|AnzahlFall|AnzahlTodesfall|          Meldedatum|IdLandkreis|          Datenstand|NeuerFall|NeuerTodesfall|NeuGenesen|AnzahlGenesen|IstErkrankungsbeginn|    Altersgruppe2|FallStatus| Falldatum|
+--------+------------+------------------+------------+------------+----------+----------+---------------+--------------------+-----------+--------------------+---------+--------------+----------+-------------+--------------------+-----------------+----------+----------+
|       1|           1|Schleswig-Holstein|SK Flensburg|     A15-A34|         M|         3|              0|2021/03/19 00:00:...|       1001|29.05.2021, 00:00...|        0|            -9

## Datenreduktion
In dem folgenden Schritt werden alle Spalten gelöscht die nicht notwendig sind. Spalte Altersgruppe2 ist nicht mit konkreten Werten befüllt und kann daher entfernt werden. Die Informationen aus AnzahlTodesfall und AnzahlGenesen bzw. NeuerTodefall und NeuGenesen werden nun von 'FallStatus' abgebildet und können auch entfernt werden. Die Felder AnzahlFall und NeuerFall sind durch die Vereinzelung und das Herausfiltern von Korrekturwerten überflüssig geworden. Die ObjectId besitzt keine Aussagekraft, da nicht mehrere Puplikationen verglichen werden, sondern jeweils nur die Aktuelle Puplikation betrachtet wird. Auch das Feld "IstErkrankungsbeginn" ist nicht notwendig. Das Feld 'Datenstand' enthält für alle Records den selben Datumswert. Würde der RKI-Datensatz am heutigen Tag heruntergeladen, so enthielte es das aktuelle Datum. Daher kann es in der weiteren Verarbeitung verworfen werden. Die gesamte Datenreduktion ist im Zuge der Feature Selection erfolgt. 

In [11]:
# Definition der zu löschenden Spalten
columnsToDelete = ("Altersgruppe2", "AnzahlFall", "NeuerFall", "AnzahlTodesfall", "NeuerTodesfall", "AnzahlGenesen", "NeuGenesen", "IstErkrankungsbeginn", "Datenstand", "ObjectId", "Meldedatum")
df = df.drop(*columnsToDelete)

In [12]:
#Zeige die ersten Zehn Einträge zur Kontrolle
df.limit(10).show()

+------------+------------------+------------+------------+----------+-----------+----------+----------+
|IdBundesland|        Bundesland|   Landkreis|Altersgruppe|Geschlecht|IdLandkreis|FallStatus| Falldatum|
+------------+------------------+------------+------------+----------+-----------+----------+----------+
|           1|Schleswig-Holstein|SK Flensburg|     A15-A34|         M|       1001|   GENESEN|2021-03-16|
|           1|Schleswig-Holstein|SK Flensburg|     A15-A34|         M|       1001|   GENESEN|2021-03-16|
|           1|Schleswig-Holstein|SK Flensburg|     A15-A34|         M|       1001|   GENESEN|2021-03-16|
|           1|Schleswig-Holstein|SK Flensburg|     A15-A34|         M|       1001|   GENESEN|2021-03-19|
|           1|Schleswig-Holstein|SK Flensburg|     A15-A34|         M|       1001|   GENESEN|2021-03-19|
|           1|Schleswig-Holstein|SK Flensburg|     A15-A34|         M|       1001|   GENESEN|2021-03-19|
|           1|Schleswig-Holstein|SK Flensburg|     A15-

## Imputation fehlender Werte
Ein Teil der Datenreinigung und Aufbereitung wird durch die Impautation fehlender Werte dargestellt. Generell ist es keine Seltenheit, dass ein Datensatz fehlende Werte aufweist. Dies kann unter anderem zu Problemen während des Modelings führen, weshalb es verschiedene Möglichkeiten gibt damit umzugehen. Das einfache Löschen von Datensätzen mit fehlenden Werten könnte zu einer Verzerrung führen. Daher werden oftmals Machine Learing Techniken angewandt um plausible Werte für die einzelen Features zu finden. (García et al. 2016, 4) In dem vorliegenden Datensatz des RKIs ist in bestimmten Fällen das Geschlecht bzw. das Alter unbekannt.
### Imputation des Geschlechts
Da der der Imputer von PySpark nicht für kategorische Werte geignet ist (Apache Spark 2021), wurde das Verhältnis zwischen dem Männlichen sowie dem Weiblichen Geschlecht ermittelt. Anschließend wurden die unbekannten Werte durch ein gewichtetes Zufallsverfahren mit geschätzten Werten befüllt.

In [13]:
countWoman = df.filter(df.Geschlecht == "W").count()
countMan = df.filter(df.Geschlecht == "M").count()
countAll = countWoman + countMan
print("Anzahl Frauen: ", countWoman)
print("Anzahl Männer: ", countMan)
print("Gesamtzahl aller Datensätze mit vergebenem Geschlecht: ", countAll)

Anzahl Frauen:  1880476
Anzahl Männer:  1769786
Gesamtzahl aller Datensätze mit vergebenem Geschlecht:  3650262


In [14]:
df = df.withColumn("random", (rand() * countAll))
df = df.withColumn("randomGender", when(df.random > countWoman, "M").otherwise("W"))
df = df.withColumn("Geschlecht", when(df.Geschlecht == "unbekannt", df.randomGender).otherwise(df.Geschlecht)).drop("random","randomGender")

In [15]:
#Prüfung ob der Datensatz nun keine unbekannten Geschlächter enthält
df.filter(df.Geschlecht == "unbekannt").count()

0

## Umwandlung in numerische Werte
Da viele Modelle nur mit numerischen Werten arbeiten können, müssen nicht numerische Features mittels eines StringIndexers in numerische Features umgewandelt werden. Dies erfolgt mithilfe des StringIndexers jeweils für die Altersgruppe und das Geschlecht. Um die beiden Indexer miteinder zu verketten, wurde an dieser Stelle eine Pipeline verwendet.

In [16]:
altersgruppeIndexer = StringIndexer(inputCol="Altersgruppe", outputCol="AltersgruppeIndex")
geschlechtsIndexer = StringIndexer(inputCol="Geschlecht", outputCol="GeschlechtIndex")
fallstatusIndexer = StringIndexer(inputCol="FallStatus", outputCol="FallStatusIndex")
pipeline = Pipeline(stages=[altersgruppeIndexer, geschlechtsIndexer,fallstatusIndexer])
df = pipeline.fit(df).transform(df)

In [17]:
#Zeige die ersten zehn Einträge
df.limit(10).show()

+------------+------------------+------------+------------+----------+-----------+----------+----------+-----------------+---------------+---------------+
|IdBundesland|        Bundesland|   Landkreis|Altersgruppe|Geschlecht|IdLandkreis|FallStatus| Falldatum|AltersgruppeIndex|GeschlechtIndex|FallStatusIndex|
+------------+------------------+------------+------------+----------+-----------+----------+----------+-----------------+---------------+---------------+
|           1|Schleswig-Holstein|SK Flensburg|     A15-A34|         M|       1001|   GENESEN|2021-03-16|              1.0|            1.0|            0.0|
|           1|Schleswig-Holstein|SK Flensburg|     A15-A34|         M|       1001|   GENESEN|2021-03-16|              1.0|            1.0|            0.0|
|           1|Schleswig-Holstein|SK Flensburg|     A15-A34|         M|       1001|   GENESEN|2021-03-16|              1.0|            1.0|            0.0|
|           1|Schleswig-Holstein|SK Flensburg|     A15-A34|         M|

## Imputation der Altersgruppe
Für die Imputation von kategorischen Werten bietet PySpark keinen StandardImputer. Aus diesem Grund wurde hierfür das K-Nearest-Neighbour-Verfahren gewählt. Konkret fand die Klasse BucketedRandomProjectionLSH Anwendung, die auf dem Prinzip des Locality Sensitive Hashing basiert und als Entfernungsmaß die Euklidische Distanz verwendet.

### Aufbau des Feature-Vektors

In [18]:
assembler =  VectorAssembler(outputCol="features", inputCols=["FallStatusIndex", "GeschlechtIndex","IdLandkreis"])
featureVector = assembler.transform(df)


### Aufteilung des Vektors in Trainings- und Erkennungsdaten 

In [19]:
trainingFeatureVector = featureVector.filter(df.Altersgruppe != "unbekannt");
targetFeatureVector = featureVector.filter(df.Altersgruppe == "unbekannt");

### Aufteilung des Trainingvektors
Da sich im Zuge der vorliegenden Arbeit herausgestellt hat, dass das Finden des nächsten Nachbarwertes sehr Zeit- und Rechenintensiv ist, wurden die einzelnen Instanzen nach dem Feature-Vektor und der Altersgruppe gruppiert.

In [20]:
# Anzahl der Instanzen vor der Gruppierung
trainingFeatureVector.count()

3671974

In [21]:
# Gruppierung nach Feature-Vektor und Altersgruppe
trainingFeatureVectorGrouped = trainingFeatureVector.groupBy("features","Altersgruppe").count().orderBy(desc("features"))
# Kontrollausgabe
trainingFeatureVectorGrouped.limit(10).show()

+-----------------+------------+-----+
|         features|Altersgruppe|count|
+-----------------+------------+-----+
|[2.0,1.0,16077.0]|     A35-A59|    6|
|[2.0,1.0,16077.0]|     A60-A79|   48|
|[2.0,1.0,16077.0]|        A80+|   86|
|[2.0,1.0,16077.0]|     A15-A34|    1|
|[2.0,1.0,16076.0]|     A35-A59|    3|
|[2.0,1.0,16076.0]|        A80+|   69|
|[2.0,1.0,16076.0]|     A60-A79|   42|
|[2.0,1.0,16075.0]|     A60-A79|   34|
|[2.0,1.0,16075.0]|        A80+|   55|
|[2.0,1.0,16075.0]|     A35-A59|    7|
+-----------------+------------+-----+



In [22]:
# Anzahl der Instanzen nach der Gruppierung
trainingFeatureVectorGrouped.count()

12121

### Training des KNN-Modells
Während der berechnungen stellte sich heraus, dass die Ermettlung eines NN mit dem Paramter numHashTables=1 am schnellsten erfolgt. Dies liegt daran, dass höhere Werte zu einer erhöhten Modellkomplexität führen. Zum erhöhen des Wertes kann jedoch auch die False-Negativ-Rate gesenkt werden. (Bucket Random Projection LSH 2021)

In [23]:
mhLSH = BucketedRandomProjectionLSH(inputCol="features", outputCol="hashes", bucketLength=2.0,numHashTables=1)
model = mhLSH.fit(trainingFeatureVectorGrouped)
transformedTrainingFeatureVector = model.transform(trainingFeatureVectorGrouped)

In [24]:
#Kontroll-Ausgabe. Hier zeigt sich auch die Auswirkung des Parameters numHashTables
transformedTrainingFeatureVector.limit(10).show()

+-----------------+------------+-----+----------+
|         features|Altersgruppe|count|    hashes|
+-----------------+------------+-----+----------+
|[2.0,1.0,16077.0]|     A35-A59|    6|[[-331.0]]|
|[2.0,1.0,16077.0]|     A60-A79|   48|[[-331.0]]|
|[2.0,1.0,16077.0]|        A80+|   86|[[-331.0]]|
|[2.0,1.0,16077.0]|     A15-A34|    1|[[-331.0]]|
|[2.0,1.0,16076.0]|     A35-A59|    3|[[-331.0]]|
|[2.0,1.0,16076.0]|        A80+|   69|[[-331.0]]|
|[2.0,1.0,16076.0]|     A60-A79|   42|[[-331.0]]|
|[2.0,1.0,16075.0]|     A60-A79|   34|[[-331.0]]|
|[2.0,1.0,16075.0]|        A80+|   55|[[-331.0]]|
|[2.0,1.0,16075.0]|     A35-A59|    7|[[-331.0]]|
+-----------------+------------+-----+----------+



### Gruppieren der Verarbeitungsdaten
Da die Ermittlung von Nachbarn bis zu 20 Sekunden pro Funktionsaufruf dauert (s.u.), wurden an dieser Stelle auch Instanzen mit unbekanntem Alter gruppiert, um so weniger Erkennungen ausführen zu müssen.

In [25]:
targetFeatureVectorGrouped = targetFeatureVector.groupBy("features").count()
# Ausgabe der ersten Zehn Gruppen zur Kontrolle
targetFeatureVectorGrouped.show(10, False)

+-----------------+-----+
|features         |count|
+-----------------+-----+
|[0.0,1.0,9277.0] |1    |
|[2.0,1.0,7111.0] |1    |
|[2.0,1.0,11005.0]|1    |
|[1.0,0.0,5382.0] |1    |
|[0.0,1.0,8125.0] |5    |
|[0.0,0.0,7235.0] |1    |
|[0.0,0.0,7335.0] |3    |
|[0.0,0.0,11003.0]|9    |
|[0.0,0.0,6435.0] |2    |
|[0.0,0.0,7134.0] |4    |
+-----------------+-----+
only showing top 10 rows



In [26]:
# Anzahl der nun vorliegenen Instanzen
targetFeatureVectorGrouped.count()

444

### Durchführen der Erkennung
Nachfolgend werden für jede Instanz aus den gruppierten Erkennungsdaten die sechs nächsten Nachbarn (6 verschiedene Altersgruppen) ermittelt. Diese dienen als Basis für die zufällige Generierung der Altersgruppe. Je häufiger eine Altersgruppe in den ermittelten Nachbarn repräsentiert ist (count der gruppierten Trainingsdaten), desto wahrscheinlicher wird diese Altersgruppe vergeben. Die Generierung der Altersgruppe erfolgt nach der Methode getRandomAgeByFraction. 
Damit das Modell mit den Daten Arbeiten kann, müssen die zu erkennden Instanzen zuerst in den Speicher geladen werden. Die Verarbeitung erfolgt anschließend über eine Schleife. Die Ergebnisse werden in einem Array gespeichert und nachfolgend einem DateFrame zugeführt. Die Anzahl der Bereits verarbeiteten Instanzen und der Zeitdifferenz pro Durchlauf werden entsprechend ausgegeben.

In [27]:
# Definiton der Altersgruppen-Generierungsfunktion
def getRandomAgeByFraction(recordList):
    sum = 0
    dictList = []
    for record in recordList :
        dict = record.asDict()
        sum = sum + dict["count"]
        dictList.append(dict)

    percentSum = 0    
    for dict in dictList :
        dict["startPercent"] = percentSum
        dict["percent"] = dict["count"] / sum
        percentSum = percentSum + dict["percent"]
        dict["endPercent"] = percentSum


    randNr = random.random()
    Altergruppe = ""
    for dict in dictList :
        Altergruppe = dict["Altersgruppe"] if ((randNr >= dict["startPercent"]) & (randNr <= dict["endPercent"])) else Altergruppe
    
    return Altergruppe

In [28]:
# Durchführung der Erkennung und der zufälligen Vergabe der Altersgruppe mit Ausgabe des aktuellem Stands und der Zeitdifferenz pro Durchlauf
resultList = []

records = targetFeatureVectorGrouped.localCheckpoint(True).collect()
iterrations = 0
startIterTime = time.time()

for record in records :
    iterrations = iterrations + 1
    if (iterrations % 1) == 0 :
        clear_output(wait=True)
        currentTime = time.time()
        timeDiff = currentTime - startIterTime
        startIterTime = currentTime
        display('Iteration '+str(iterrations)+" - TimeDiff: " +str(timeDiff))
    groupedRecords = model.approxNearestNeighbors(transformedTrainingFeatureVector, record.features, 6).orderBy(desc("count")).drop("features","hashes").collect()
    randAge = getRandomAgeByFraction(groupedRecords)    
    newRecord = Row(features=record.features, Altersgruppe=randAge)
    resultList.append(newRecord)

'Iteration 444 - TimeDiff: 16.09078884124756'

In [29]:
# Erstellen eines neuen DataFrames
columns = ["features","AltersgruppeRandom"]
resultListDF = spark.createDataFrame(data=resultList, schema = columns)

In [30]:
# Kontrollausgabe des Dateframes
resultListDF.limit(10).show()

+-----------------+------------------+
|         features|AltersgruppeRandom|
+-----------------+------------------+
| [0.0,1.0,9277.0]|           A35-A59|
| [2.0,1.0,7111.0]|              A80+|
|[2.0,1.0,11005.0]|              A80+|
| [1.0,0.0,5382.0]|           A15-A34|
| [0.0,1.0,8125.0]|           A35-A59|
| [0.0,0.0,7235.0]|           A60-A79|
| [0.0,0.0,7335.0]|           A15-A34|
|[0.0,0.0,11003.0]|           A05-A14|
| [0.0,0.0,6435.0]|           A35-A59|
| [0.0,0.0,7134.0]|           A35-A59|
+-----------------+------------------+



### Zusammenführen der beiden DataFrames
Nachfolgend wird der initiale Dataframe mit dem neuen Dataframe zusammengeführt. In diesem Zuge erfolgt die Imputation der unbekannten Altersgruppen.

In [31]:
joinedDF = featureVector.join(resultListDF, on='features', how='left')
joinedDF.show()

+-----------------+------------+----------+---------+------------+----------+-----------+----------+----------+-----------------+---------------+---------------+------------------+
|         features|IdBundesland|Bundesland|Landkreis|Altersgruppe|Geschlecht|IdLandkreis|FallStatus| Falldatum|AltersgruppeIndex|GeschlechtIndex|FallStatusIndex|AltersgruppeRandom|
+-----------------+------------+----------+---------+------------+----------+-----------+----------+----------+-----------------+---------------+---------------+------------------+
|[0.0,0.0,16067.0]|          16| Thüringen| LK Gotha|     A00-A04|         W|      16067|   GENESEN|2020-04-21|              5.0|            0.0|            0.0|              null|
|[0.0,0.0,16067.0]|          16| Thüringen| LK Gotha|     A00-A04|         W|      16067|   GENESEN|2020-09-12|              5.0|            0.0|            0.0|              null|
|[0.0,0.0,16067.0]|          16| Thüringen| LK Gotha|     A00-A04|         W|      16067|   GEN

In [32]:
# Kontrollausgabe
joinedDF.filter(joinedDF.AltersgruppeRandom.isNotNull() & (joinedDF.Altersgruppe == "unbekannt")).show()

+-----------------+------------+-------------------+-------------------+------------+----------+-----------+----------------+----------+-----------------+---------------+---------------+------------------+
|         features|IdBundesland|         Bundesland|          Landkreis|Altersgruppe|Geschlecht|IdLandkreis|      FallStatus| Falldatum|AltersgruppeIndex|GeschlechtIndex|FallStatusIndex|AltersgruppeRandom|
+-----------------+------------+-------------------+-------------------+------------+----------+-----------+----------------+----------+-----------------+---------------+---------------+------------------+
| [0.0,1.0,9277.0]|           9|             Bayern|      LK Rottal-Inn|   unbekannt|         M|       9277|         GENESEN|2021-05-03|              6.0|            1.0|            0.0|           A35-A59|
| [2.0,1.0,7111.0]|           7|    Rheinland-Pfalz|         SK Koblenz|   unbekannt|         M|       7111|       GESTORBEN|2021-01-26|              6.0|            1.0|      

In [33]:
# Hier werden die unbekannten Werte der Altersgruppe durch die zufällig generierten Werte ersetzt.
dfAltersgruppeImputed = joinedDF.withColumn("Altersgruppe", when((joinedDF.Altersgruppe == "unbekannt") & (joinedDF.AltersgruppeRandom.isNotNull()), joinedDF.AltersgruppeRandom).otherwise(joinedDF.Altersgruppe)).drop("AltersgruppeRandom","features")

In [34]:
# Überprüfung, dass die Altersgruppe nun für alle Instanzen befüllt ist
dfAltersgruppeImputed.filter((dfAltersgruppeImputed.Altersgruppe == "unbekannt")).count()

0

## AltersguppeIndex
Da die Altersgruppe verändert wurde, muss hierfür der AltersgruppeIndex aktualisiert werden.

In [35]:
altersgruppeIndexer = StringIndexer(inputCol="Altersgruppe", outputCol="AltersgruppeIndexNeu")
altersgruppeModel = altersgruppeIndexer.fit(dfAltersgruppeImputed)
dfWithAgeIndexNew = altersgruppeModel.transform(dfAltersgruppeImputed)
dfWithAgeIndex = dfWithAgeIndexNew.withColumn("AltersgruppeIndex", dfWithAgeIndexNew.AltersgruppeIndexNeu).drop("AltersgruppeIndexNeu")
dfWithAgeIndex.show()


+------------+----------+---------+------------+----------+-----------+----------+----------+-----------------+---------------+---------------+
|IdBundesland|Bundesland|Landkreis|Altersgruppe|Geschlecht|IdLandkreis|FallStatus| Falldatum|AltersgruppeIndex|GeschlechtIndex|FallStatusIndex|
+------------+----------+---------+------------+----------+-----------+----------+----------+-----------------+---------------+---------------+
|          16| Thüringen| LK Gotha|     A00-A04|         W|      16067|   GENESEN|2020-04-21|              5.0|            0.0|            0.0|
|          16| Thüringen| LK Gotha|     A00-A04|         W|      16067|   GENESEN|2020-09-12|              5.0|            0.0|            0.0|
|          16| Thüringen| LK Gotha|     A00-A04|         W|      16067|   GENESEN|2020-10-23|              5.0|            0.0|            0.0|
|          16| Thüringen| LK Gotha|     A00-A04|         W|      16067|   GENESEN|2020-11-18|              5.0|            0.0|         

## Speichern des Verarbeitungsergebnisses
Da die Verarbeitung sehr zeitintensiv ist macht es Sinn, das Ergebnis der Data-Preperation als neues CSV-File zu speichern, um so für die kommenden Phase nur noch die verarbeitete Datei einzulesen. Sie wird im Ordner data-preperation-result hinterlegt. Der Dateiname ist hierbei nicht beinflussbar, weshalb er im Nachhinein anzupassen ist.

In [36]:
finalDF = dfWithAgeIndex.selectExpr("Bundesland", "IdBundesland as BundeslandIndex","Landkreis", "IdLandkreis as LandkreisIndex","Altersgruppe", "AltersgruppeIndex", "Geschlecht", "GeschlechtIndex", "FallStatus", "FallStatusIndex","Falldatum")

In [37]:
finalDF.repartition(1).write.format('csv').option('header',True).mode('overwrite').option('sep',';').save(translate_to_file_string("./data/data-preperation-result"))