In [1]:
from pyspark import SparkFiles
from pyspark.sql import SparkSession
from pyspark.sql.functions import ceil, when

# One SparkSession for the whole notebook (getOrCreate reuses an existing one).
spark = (
    SparkSession.builder
    .appName("Python Spark SQL basic example")
    .getOrCreate()
)

# Register the ECDC daily case-distribution CSV with Spark; it is retrieved
# later through SparkFiles under its file name ("csv", the last URL segment).
spark.sparkContext.addFile("https://opendata.ecdc.europa.eu/covid19/casedistribution/csv")

In [2]:
# Report date to extract; meant to be updated for each daily run (see the TODO cell at the end).
# NOTE(review): the saved outputs below show dateRep 09/12/2020, so they predate this value.
# Restart & Run All to refresh them.
day, month, year = 10, 12, 2020

In [3]:
# Load the downloaded CSV with a header row and inferred column types
# (the filters below compare day/month/year against integer parameters).
# NOTE(review): "file://" + a driver-local path works in local mode. Confirm the path
# exists on executors when running on a multi-node cluster.
df = spark.read.csv(f"file://{SparkFiles.get('csv')}", header=True, inferSchema=True)

In [4]:
# Keep only the rows reported for the selected day/month/year.
is_selected_date = (df['day'] == day) & (df['month'] == month) & (df['year'] == year)
today_df = df.filter(is_selected_date)
today_df.show()

+----------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+------------+----------------------------------------------------------+
|   dateRep|day|month|year|cases|deaths|countriesAndTerritories|geoId|countryterritoryCode|popData2019|continentExp|Cumulative_number_for_14_days_of_COVID-19_cases_per_100000|
+----------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+------------+----------------------------------------------------------+
|09/12/2020|  9|   12|2020|  135|    13|            Afghanistan|   AF|                 AFG|   38041757|        Asia|                                                6.96340077|
|09/12/2020|  9|   12|2020|  753|    14|                Albania|   AL|                 ALB|    2862427|      Europe|                                              354.10510032|
|09/12/2020|  9|   12|2020|  591|    12|                Algeria|   DZ|                 DZA|   43053054|      Africa|    

In [5]:
# Drop the split date parts (dateRep still carries the date) and the two country-code columns.
columns_to_remove = [
    "day",
    "month",
    "year",
    "geoId",
    "countryterritoryCode",
]
today_df = today_df.drop(*columns_to_remove)

In [6]:
# Preview the trimmed frame (first 20 rows, long cell values truncated, i.e. Spark's defaults).
today_df.show(20)

+----------+-----+------+-----------------------+-----------+------------+----------------------------------------------------------+
|   dateRep|cases|deaths|countriesAndTerritories|popData2019|continentExp|Cumulative_number_for_14_days_of_COVID-19_cases_per_100000|
+----------+-----+------+-----------------------+-----------+------------+----------------------------------------------------------+
|09/12/2020|  135|    13|            Afghanistan|   38041757|        Asia|                                                6.96340077|
|09/12/2020|  753|    14|                Albania|    2862427|      Europe|                                              354.10510032|
|09/12/2020|  591|    12|                Algeria|   43053054|      Africa|                                               28.83883685|
|09/12/2020|   43|     0|                Andorra|      76177|      Europe|                                             1018.68017906|
|09/12/2020|   81|     1|                 Angola|   31825299| 

In [8]:
# Derived indicators for the selected day.
# - death_per_million: the day's deaths per million inhabitants (2019 population).
#   ceil rounds UP, so any non-zero value below 1 per million is reported as 1.
# - death_rate: the day's deaths divided by the same day's new cases. This is a
#   same-day ratio, not a cumulative case fatality rate.
# Each denominator is guarded with `when`, so a zero (or null) denominator yields null
# instead of failing the job. With ANSI mode enabled, Spark raises DIVIDE_BY_ZERO on x / 0,
# and days with 0 new cases are common.
today_df = today_df.withColumn(
    'death_per_million',
    when(today_df.popData2019 != 0,
         ceil((today_df.deaths / today_df.popData2019) * 1000000)),
)
today_df = today_df.withColumn(
    'death_rate',
    when(today_df.cases != 0, today_df.deaths / today_df.cases),
)

In [9]:
# Preview the frame with the two derived columns appended.
today_df.show(n=20)

+----------+-----+------+-----------------------+-----------+------------+----------------------------------------------------------+-----------------+--------------------+
|   dateRep|cases|deaths|countriesAndTerritories|popData2019|continentExp|Cumulative_number_for_14_days_of_COVID-19_cases_per_100000|death_per_million|          death_rate|
+----------+-----+------+-----------------------+-----------+------------+----------------------------------------------------------+-----------------+--------------------+
|09/12/2020|  135|    13|            Afghanistan|   38041757|        Asia|                                                6.96340077|                1|  0.0962962962962963|
|09/12/2020|  753|    14|                Albania|    2862427|      Europe|                                              354.10510032|                5| 0.01859229747675963|
|09/12/2020|  591|    12|                Algeria|   43053054|      Africa|                                               28.83883685|  

In [11]:
# Show a single country's row for the selected day.
# The predicate uses today_df's own column. The original borrowed `df.countriesAndTerritories`
# from the raw frame, which only resolves through shared lineage and breaks once
# today_df is rebuilt from a different source.
today_df.filter(today_df["countriesAndTerritories"] == "France").show()

+----------+-----+------+-----------------------+-----------+------------+----------------------------------------------------------+-----------------+--------------------+
|   dateRep|cases|deaths|countriesAndTerritories|popData2019|continentExp|Cumulative_number_for_14_days_of_COVID-19_cases_per_100000|death_per_million|          death_rate|
+----------+-----+------+-----------------------+-----------+------------+----------------------------------------------------------+-----------------+--------------------+
|09/12/2020|13713|   831|                 France|   67012883|      Europe|                                              232.50156242|               13|0.060599431196674686|
+----------+-----+------+-----------------------+-----------+------------+----------------------------------------------------------+-----------------+--------------------+



In [None]:
# À faire : lancer ce programme avec Airflow, tous les jours à 23h59, avec les paramètres day, month et year adéquats
# Le programme récupère les données Open Data de l'ECDC pour le jour courant et effectue le pré-processing nécessaire
# /!\ À faire : poster le dataset today_df via l'API ELK afin de pouvoir l'indexer là-bas
# À faire : créer un dashboard de reporting sur Kibana présentant la situation du jour

# Piste : ajouter d'autres étapes de pré-processing dans Spark

# (OpenData => Spark dockerisé => ELK), ordonnancé par Airflow