# SETUP


In [1]:
# Install a headless Java 8 JDK (the JVM Spark runs on); -qq plus the redirect to /dev/null keeps apt quiet.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [2]:
!wget -q https://dlcdn.apache.org/spark/spark-3.1.2/spark-3.1.2-bin-hadoop3.2.tgz

In [3]:
# Unpack the Spark tarball into the current working directory
# (SPARK_HOME below assumes that directory is /content).
!tar xf /content/spark-3.1.2-bin-hadoop3.2.tgz

In [4]:
!pip install -q findspark

In [5]:
import os
# Tell findspark / pyspark where the JDK and the unpacked Spark distribution live.
os.environ.update({
    "JAVA_HOME": "/usr/lib/jvm/java-8-openjdk-amd64",
    "SPARK_HOME": "/content/spark-3.1.2-bin-hadoop3.2",
})

In [6]:
import findspark
# Make the pyspark package shipped with the Spark install importable in this kernel.
findspark.init()

In [7]:
# Sanity check: display the Spark installation path that findspark resolved.
findspark.find()

'/content/spark-3.1.2-bin-hadoop3.2'

# IMPORT

In [26]:
from pyspark.sql import SparkSession, Window
from pyspark import SparkFiles
import pyspark.sql.functions as F
#import os

# Create (or reuse) a local Spark session for this analysis; the Spark UI is pinned to port 4050.
spark = (
    SparkSession.builder
    .master("local")
    .appName("Covid Exploratory Data Analysis")
    .config('spark.ui.port', '4050')
    .getOrCreate()
)

# VACCINATIONS

In [10]:
# Vaccinations administered in Germany, published by the Robert Koch Institut (RKI).
VAC_URL = 'https://raw.githubusercontent.com/robert-koch-institut/COVID-19-Impfungen_in_Deutschland/master/Aktuell_Deutschland_Bundeslaender_COVID-19-Impfungen.csv'

# Let Spark fetch the file, then read the local copy it registered under the file's base name.
# Only header=True is set, so no schema is inferred here.
sc = spark.sparkContext
sc.addFile(VAC_URL)
df_vac = spark.read.csv(
    path=SparkFiles.get('Aktuell_Deutschland_Bundeslaender_COVID-19-Impfungen.csv'),
    header=True,
)

In [44]:
# To approximate the number of fully vaccinated people, keep only the doses that complete a
# series: second (or later) shots, plus every Janssen dose, since Janssen needs only one shot.
# NOTE(review): `>= 2` also lets through any rows with Impfserie 3+ (boosters), which would
# count the same person again -- confirm whether `== 2` is what is intended.
#
# The filtered frame is assigned first and displayed in a separate statement: show() only
# prints and returns None, so chaining it onto the assignment would bind df_vac_filtered
# to None and break every later cell that uses it.
df_vac_filtered = df_vac.filter((F.col('Impfserie') >= 2) | (F.col('Impfstoff') == 'Janssen'))
df_vac_filtered.show(10)

+----------+--------------------+---------+---------+------+
| Impfdatum|BundeslandId_Impfort|Impfstoff|Impfserie|Anzahl|
+----------+--------------------+---------+---------+------+
|2020-12-27|                  05|Comirnaty|        2|    11|
|2020-12-27|                  06|Comirnaty|        2|     2|
|2020-12-28|                  06|Comirnaty|        2|    36|
|2020-12-28|                  14|Comirnaty|        2|   125|
|2020-12-29|                  05|Comirnaty|        2|   578|
|2020-12-29|                  06|Comirnaty|        2|   135|
|2020-12-29|                  14|Comirnaty|        2|    28|
|2020-12-30|                  05|Comirnaty|        2|     2|
|2020-12-30|                  06|Comirnaty|        2|   114|
|2020-12-30|                  09|Comirnaty|        2|     1|
+----------+--------------------+---------+---------+------+
only showing top 10 rows



In [39]:
# Daily total of completed vaccinations; the date column is renamed to DATUM.
df_vaccinated = (
    df_vac_filtered
    .groupBy('Impfdatum')
    .agg(F.sum('Anzahl').alias('SUM_VACCINATIONS'))
    .withColumnRenamed('Impfdatum', 'DATUM')
)
# Preview the most recent days.
df_vaccinated.orderBy('DATUM', ascending=False).show(10)

+----------+----------------+
|     DATUM|SUM_VACCINATIONS|
+----------+----------------+
|2021-09-05|         35609.0|
|2021-09-04|         62080.0|
|2021-09-03|        141763.0|
|2021-09-02|        149874.0|
|2021-09-01|        173328.0|
|2021-08-31|        144099.0|
|2021-08-30|        119167.0|
|2021-08-29|         53244.0|
|2021-08-28|         79740.0|
|2021-08-27|        155920.0|
+----------+----------------+
only showing top 10 rows



# INFECTIONS

In [12]:
# Reported SARS-CoV-2 infections in Germany (Robert Koch Institut repository).
INF_URL = 'https://media.githubusercontent.com/media/robert-koch-institut/SARS-CoV-2_Infektionen_in_Deutschland/master/Aktuell_Deutschland_SarsCov2_Infektionen.csv'

# Let Spark fetch the file, then read the local copy it registered under the file's base name.
sc_inf = spark.sparkContext
sc_inf.addFile(INF_URL)
df_inf = spark.read.csv(
    path=SparkFiles.get('Aktuell_Deutschland_SarsCov2_Infektionen.csv'),
    header=True,
)


In [19]:
# Daily totals of reported cases and deaths, keyed by reporting date (renamed to DATUM), newest first.
df_infected = (
    df_inf
    .groupBy('Meldedatum')
    .agg(
        F.sum('AnzahlFall').alias('SUM_INFECTIONS'),
        F.sum('AnzahlTodesfall').alias('SUM_DEATHS'),
    )
    .withColumnRenamed('Meldedatum', 'DATUM')
    .orderBy('DATUM', ascending=False)
)
df_infected.show(10)

+----------+--------------+----------+
|     DATUM|SUM_INFECTIONS|SUM_DEATHS|
+----------+--------------+----------+
|2021-09-05|        2774.0|       0.0|
|2021-09-04|        8204.0|       1.0|
|2021-09-03|       11007.0|       0.0|
|2021-09-02|       12206.0|       3.0|
|2021-09-01|       14406.0|       4.0|
|2021-08-31|       14432.0|       6.0|
|2021-08-30|        6995.0|       6.0|
|2021-08-29|        3720.0|       5.0|
|2021-08-28|        8377.0|       9.0|
|2021-08-27|       10351.0|      19.0|
+----------+--------------+----------+
only showing top 10 rows



# Data Analysis

The goal is to find out whether the number of people getting vaccinated correlates with the number of COVID-19 infections and deaths.

In [40]:
# Combine both daily tables on DATUM, keeping only days present in both, newest first.
merged_table = (
    df_vaccinated
    .join(df_infected, on='DATUM', how='inner')
    .orderBy('DATUM', ascending=False)
)

In [41]:
# Preview the 20 most recent days of the joined vaccination / infection table.
merged_table.show(20)

+----------+----------------+--------------+----------+
|     DATUM|SUM_VACCINATIONS|SUM_INFECTIONS|SUM_DEATHS|
+----------+----------------+--------------+----------+
|2021-09-05|         35609.0|        2774.0|       0.0|
|2021-09-04|         62080.0|        8204.0|       1.0|
|2021-09-03|        141763.0|       11007.0|       0.0|
|2021-09-02|        149874.0|       12206.0|       3.0|
|2021-09-01|        173328.0|       14406.0|       4.0|
|2021-08-31|        144099.0|       14432.0|       6.0|
|2021-08-30|        119167.0|        6995.0|       6.0|
|2021-08-29|         53244.0|        3720.0|       5.0|
|2021-08-28|         79740.0|        8377.0|       9.0|
|2021-08-27|        155920.0|       10351.0|      19.0|
|2021-08-26|        184512.0|       11578.0|       8.0|
|2021-08-25|        218572.0|       12716.0|      15.0|
|2021-08-24|        191737.0|       12876.0|      17.0|
|2021-08-23|        154483.0|        6525.0|      18.0|
|2021-08-22|         59568.0|        3097.0|    

In [52]:
# SUM_VACCINATIONS is a per-day count. A running total over every day up to and including
# the current one (ordered by DATUM) gives ALL_VACCINATIONS: the number of people
# vaccinated as of each date.
windowval = Window.orderBy('DATUM').rangeBetween(Window.unboundedPreceding, 0)
final_table = merged_table.withColumn(
    'ALL_VACCINATIONS',
    F.sum('SUM_VACCINATIONS').over(windowval),
)
final_table.orderBy('DATUM', ascending=False).show()

+----------+----------------+--------------+----------+----------------+
|     DATUM|SUM_VACCINATIONS|SUM_INFECTIONS|SUM_DEATHS|ALL_VACCINATIONS|
+----------+----------------+--------------+----------+----------------+
|2021-09-05|         35609.0|        2774.0|       0.0|      5.102845E7|
|2021-09-04|         62080.0|        8204.0|       1.0|     5.0992841E7|
|2021-09-03|        141763.0|       11007.0|       0.0|     5.0930761E7|
|2021-09-02|        149874.0|       12206.0|       3.0|     5.0788998E7|
|2021-09-01|        173328.0|       14406.0|       4.0|     5.0639124E7|
|2021-08-31|        144099.0|       14432.0|       6.0|     5.0465796E7|
|2021-08-30|        119167.0|        6995.0|       6.0|     5.0321697E7|
|2021-08-29|         53244.0|        3720.0|       5.0|      5.020253E7|
|2021-08-28|         79740.0|        8377.0|       9.0|     5.0149286E7|
|2021-08-27|        155920.0|       10351.0|      19.0|     5.0069546E7|
|2021-08-26|        184512.0|       11578.0|       

In [43]:
# Correlation between cumulative vaccinations and daily COVID deaths.
# The result (about -0.60) is a fairly strong negative correlation: as the vaccinated
# total grows, daily deaths tend to be lower. Correlation alone does not prove causation.
final_table.corr(col1='ALL_VACCINATIONS', col2='SUM_DEATHS')

-0.6026857246230378

In [46]:
# Correlation between cumulative vaccinations and daily reported infections.
# Also negative (about -0.52), but weaker than the correlation with deaths, which is
# consistent with vaccination protecting better against death than against infection.
final_table.corr(col1='ALL_VACCINATIONS', col2='SUM_INFECTIONS')

-0.5217619676628977

In [51]:
# "Recovery rate" per day: the share of that day's reported cases that did not end in death,
# i.e. (infections - deaths) * 100 / infections.
# Computed as a Spark column expression instead of a Python lambda over the RDD: the lambda
# raised ZeroDivisionError / TypeError for a day with 0 or missing infections and shipped
# every row through Python. Such days now simply get a null rate.
infections = F.col('SUM_INFECTIONS')
deaths = F.col('SUM_DEATHS')
df2 = merged_table.select(
    'DATUM',
    F.when(infections > 0, (infections - deaths) * 100 / infections).alias('%_RECOVERY_RATE'),
)
df2.show(10)

+----------+-----------------+
|     DATUM|  %_RECOVERY_RATE|
+----------+-----------------+
|2021-09-05|            100.0|
|2021-09-04| 99.9878108239883|
|2021-09-03|            100.0|
|2021-09-02| 99.9754219236441|
|2021-09-01|99.97223379147577|
|2021-08-31|99.95842572062084|
|2021-08-30|99.91422444603288|
|2021-08-29|99.86559139784946|
|2021-08-28|  99.892562970037|
|2021-08-27|99.81644285576273|
+----------+-----------------+
only showing top 10 rows



In [57]:
# Overall case fatality rate: total deaths as a percentage of total infections
# across all days in the joined table.
fatality = merged_table.agg(
    F.sum("SUM_INFECTIONS").alias("TOTAL_INFECTED"),
    F.sum("SUM_DEATHS").alias("TOTAL_DEATH"),
)
fatality.withColumn(
    "%_FATALITY_RATE",
    F.col("TOTAL_DEATH") * 100 / F.col("TOTAL_INFECTED"),
).show()

+--------------+-----------+------------------+
|TOTAL_INFECTED|TOTAL_DEATH|   %_FATALITY_RATE|
+--------------+-----------+------------------+
|     2359822.0|    46157.0|1.9559526099849904|
+--------------+-----------+------------------+

