# Transformación datos

### 7. Transformación de datos Covid-19

* Data on 14-day notification rate of new COVID-19 cases and deaths
* Data on COVID-19 vaccination in the EU/EEA

In [1]:
import pandas as pd
import eurostat
import plotly.express as px # Graphics
from pyspark.sql import SparkSession 
from pyspark.sql.types import StructField, StructType, StringType, IntegerType, FloatType 
from pyspark.sql import functions as func

In [2]:
# Local Spark session for this notebook; getOrCreate() reuses an active session if there is one.
spark = (
    SparkSession.builder
    .appName("TrOnCovid")
    .master("local")
    .getOrCreate()
)

In [3]:
# Load the two weekly parquet inputs from HDFS: the weekly cases file and the weekly vaccine file.
cases14day = spark.read.format("parquet").load("hdfs://localhost:9000/TFM_CEE/row/Weeklycases.parquet")
VaccinationQ = spark.read.format("parquet").load("hdfs://localhost:9000/TFM_CEE/row/WeeklyVaccineData.parquet")

#### Objetivo 1:
- Convertir los datos semanales (variables YearWeekISO / year_week) en datos trimestrales en una nueva variable DateQ
        - Extraer el trimestre (quarter) y el año a partir de date
        - Concatenar en una columna
        - Agrupar datos por trimestre

In [4]:
# Keep only the columns of the weekly cases data that the quarterly roll-up needs.
cases14day = cases14day.select(
    'country_code', 'country', 'population', 'indicator', 'year_week',
    'date', 'weekly_count', 'rate_14_day', 'cumulative_count',
)


def _add_quarter_columns(frame):
    """Return `frame` with three extra columns derived from its `date` column.

    - quarter:   quarter number extracted from `date`
    - Real_year: calendar year extracted from `date`. The year is taken from
                 `date` instead of the existing year variable because the last
                 week of 2020 is actually the first week of 2021 in ISO mode.
    - DateQ:     Real_year and quarter joined with "Q" (e.g. 2020Q1), the same
                 format used by the other DataFrames.

    Columns are resolved by name on `frame` itself, so the same steps apply to
    both the cases data and the vaccination data.
    """
    return (
        frame
        .withColumn('quarter', func.quarter(func.col('date')))
        .withColumn('Real_year', func.year(func.col('date')))
        .withColumn('DateQ', func.concat_ws("Q", func.col('Real_year'), func.col('quarter')))
    )


casesQ = _add_quarter_columns(cases14day)      # Covid
VaccinationQ = _add_quarter_columns(VaccinationQ)  # Vaccination

casesQ.show()

+------------+-------+----------+---------+---------+----------+------------+-----------------+----------------+-------+---------+------+
|country_code|country|population|indicator|year_week|      date|weekly_count|      rate_14_day|cumulative_count|quarter|Real_year| DateQ|
+------------+-------+----------+---------+---------+----------+------------+-----------------+----------------+-------+---------+------+
|         AUT|Austria|   8901064|    cases|  2020-01|2020-01-06|           0|              0.0|               0|      1|     2020|2020Q1|
|         AUT|Austria|   8901064|    cases|  2020-02|2020-01-13|           0|              0.0|               0|      1|     2020|2020Q1|
|         AUT|Austria|   8901064|    cases|  2020-03|2020-01-20|           0|              0.0|               0|      1|     2020|2020Q1|
|         AUT|Austria|   8901064|    cases|  2020-04|2020-01-27|           0|              0.0|               0|      1|     2020|2020Q1|
|         AUT|Austria|   8901064| 

In [5]:
# Quick look at the three derived columns.
casesQ.select(['quarter', 'Real_year', 'DateQ']).show(n=2)

+-------+---------+------+
|quarter|Real_year| DateQ|
+-------+---------+------+
|      1|     2020|2020Q1|
|      1|     2020|2020Q1|
+-------+---------+------+
only showing top 2 rows



In [6]:
# With a quarter assigned to every date, roll the weekly rows up to one row per
# country, indicator and quarter (DateQ):
#   - Quarter_count:    sum of weekly_count within the quarter
#   - cumulative_count: highest cumulative_count within the quarter
#   - rate_Quarter:     mean of rate_14_day over the quarter
#
# The aggregate functions are reached through the `func` alias imported at the
# top of the notebook, so this cell does not rebind the built-ins sum/max/min.
CovidCasesQ = (
    casesQ
    .groupBy('country_code', 'country', 'DateQ', 'population', 'indicator')
    .agg(
        func.sum('weekly_count').alias('Quarter_count'),
        func.max('cumulative_count').alias('cumulative_count'),
        func.avg('rate_14_day').alias('rate_Quarter'),
    )
    .sort('country', 'indicator', 'DateQ')
)

In [7]:
# Preview the first quarterly rows for Spain.
CovidCasesQ.where(func.col('country') == 'Spain').show(3)

+------------+-------+------+----------+---------+-------------+----------------+------------------+
|country_code|country| DateQ|population|indicator|Quarter_count|cumulative_count|      rate_Quarter|
+------------+-------+------+----------+---------+-------------+----------------+------------------+
|         ESP|  Spain|2020Q1|  47332614|    cases|       157679|          157679| 44.09124427723027|
|         ESP|  Spain|2020Q2|  47332614|    cases|       100538|          258217|39.433535356663576|
|         ESP|  Spain|2020Q3|  47332614|    cases|       543499|          801716|165.47884200619376|
+------------+-------+------+----------+---------+-------------+----------------+------------------+
only showing top 3 rows



In [8]:
# With a quarter assigned to every date, group the vaccination data by DateQ and
# get, for each dose column, the total number of vaccines applied per quarter.

# NOTE: this import rebinds the names sum/max/min to the Spark column functions
# for the rest of the session; code further down that calls a bare sum(...) may rely on it.
from pyspark.sql.functions import sum, avg, max, min, mean, count

_dose_totals = [sum(dose).alias(dose) for dose in ('FirstDose', 'SecondDose', 'UnknownDose')]

VaccinationQ = (
    VaccinationQ
    .groupBy('ReportingCountry', 'DateQ', 'Vaccine', 'Denominator')
    .agg(*_dose_totals)
    .sort('ReportingCountry', 'Vaccine', 'DateQ')
)

In [9]:
# Quarterly vaccination DataFrame: preview the rows whose ReportingCountry is 'IT'.
VaccinationQ.where(func.col('ReportingCountry') == 'IT').show(3)

+----------------+------+-------+-----------+---------+----------+-----------+
|ReportingCountry| DateQ|Vaccine|Denominator|FirstDose|SecondDose|UnknownDose|
+----------------+------+-------+-----------+---------+----------+-----------+
|              IT|2021Q1|     AZ|   50208329|  2206753|       973|          0|
|              IT|2021Q2|     AZ|   50208329|  4195214|   2719313|          0|
|              IT|2021Q3|     AZ|   50208329|    27517|   2817909|          0|
+----------------+------+-------+-----------+---------+----------+-----------+
only showing top 3 rows



#### Objetivo 2 Covid Data:
- Porcentaje de población casos positivos y decesos
        - Nueva columna con porcentaje de población diagnosticada como caso positivo
        - Nueva columna con porcentaje de población Fallecida por Covid-19

In [10]:
# Express the cumulative count and the per-quarter count as a percentage of population.
CovidPercQ = (
    CovidCasesQ
    .withColumn('cumulative_perc', func.col('cumulative_count') * 100 / func.col('population'))
    .withColumn('Quarter_perc', func.col('Quarter_count') * 100 / func.col('population'))
)


In [11]:
from pyspark.sql.functions import col, expr, when

# Round the three computed measures to 3 decimals under their final names and
# keep only the final column set, in the same order as before.
CovidPercQ = CovidPercQ.select(
    "country_code", "country", "DateQ", "population", "indicator",
    "Quarter_count", "cumulative_count",
    func.round(col('rate_Quarter'), 3).alias('rate_Quarter_'),
    func.round(col('Quarter_perc'), 3).alias('Quarter_percentage'),
    func.round(col('cumulative_perc'), 3).alias('cumul_percentage'),
)

CovidPercQ.show()

+------------+-------+------+----------+---------+-------------+----------------+-------------+------------------+----------------+
|country_code|country| DateQ|population|indicator|Quarter_count|cumulative_count|rate_Quarter_|Quarter_percentage|cumul_percentage|
+------------+-------+------+----------+---------+-------------+----------------+-------------+------------------+----------------+
|         AUT|Austria|2020Q1|   8901064|    cases|         9196|            9196|       11.463|             0.103|           0.103|
|         AUT|Austria|2020Q2|   8901064|    cases|         8439|           17635|       18.728|             0.095|           0.198|
|         AUT|Austria|2020Q3|   8901064|    cases|        25851|           43486|       40.947|              0.29|           0.489|
|         AUT|Austria|2020Q4|   8901064|    cases|       307401|          350887|      523.808|             3.454|           3.942|
|         AUT|Austria|2021Q1|   8901064|    cases|       203338|          55

In [12]:
# Save the DataFrame with the percentage results to HDFS under /TFM_CEE/output (the write below is currently commented out)
#CovidPercQ.write.mode('overwrite').parquet("hdfs://localhost:9000/TFM_CEE/output/casesporcenpobl.parquet")

#### Objetivo 3 Vaccine Data:
- Ratio de población denominator con dosis completa de vacunación
        - Hallar datos para cantidad de dosis completa vacunación
        - Janssen = FirstDose + UnknownDose
        - No Janssen = SecondDose
        - Resultados anteriores en nueva columna Fully_vaccine
        - Sacar acumulado 
        - Hallar porcentaje de población vacunada en cada trimestre todas las vacunas
        - Hallar porcentaje acumulado

In [13]:
# Note: this approach was designed before booster doses existed.
VaccinationQ.where(VaccinationQ['Vaccine'] == 'JANSS').show(10)

+----------------+------+-------+-----------+---------+----------+-----------+
|ReportingCountry| DateQ|Vaccine|Denominator|FirstDose|SecondDose|UnknownDose|
+----------------+------+-------+-----------+---------+----------+-----------+
|              AT|2021Q1|  JANSS|    7358443|       59|         0|          0|
|              AT|2021Q2|  JANSS|    7358443|   128937|        13|          0|
|              AT|2021Q3|  JANSS|    7358443|   183302|       225|          0|
|              AT|2021Q4|  JANSS|    7358443|    19127|        71|          0|
|              BE|2021Q1|  JANSS|    9202196|        0|         0|          0|
|              BE|2021Q2|  JANSS|    9202196|   268348|         0|          0|
|              BE|2021Q3|  JANSS|    9202196|   110684|         0|          0|
|              BE|2021Q4|  JANSS|    9202196|    14717|         0|          0|
|              BG|2021Q2|  JANSS|    5761802|    39994|         0|          0|
|              BG|2021Q3|  JANSS|    5761802|   1557

In [14]:
# Preview the rows for every vaccine other than 'JANSS'.
VaccinationQ.where(VaccinationQ['Vaccine'] != 'JANSS').show(10)

+----------------+------+-------+-----------+---------+----------+-----------+
|ReportingCountry| DateQ|Vaccine|Denominator|FirstDose|SecondDose|UnknownDose|
+----------------+------+-------+-----------+---------+----------+-----------+
|              AT|2021Q1|     AZ|    7358443|   421739|       154|          0|
|              AT|2021Q2|     AZ|    7358443|   379998|    578781|          0|
|              AT|2021Q3|     AZ|    7358443|     6429|    185758|          0|
|              AT|2021Q4|     AZ|    7358443|      241|       723|          0|
|              AT|2021Q1|    COM|    7358443|   784947|    492275|          0|
|              AT|2021Q2|    COM|    7358443|  2629100|   1872751|          0|
|              AT|2021Q3|    COM|    7358443|   476355|   1314452|          0|
|              AT|2021Q4|    COM|    7358443|   170220|    130103|          0|
|              AT|2021Q1|    MOD|    7358443|   116782|     23929|          0|
|              AT|2021Q2|    MOD|    7358443|   3670

In [15]:
# Column expression for the number of completed vaccination courses per row:
#   - 'JANSS' rows:      FirstDose + UnknownDose
#   - every other vaccine: SecondDose
from pyspark.sql.functions import col, expr, when

_janssen_doses = VaccinationQ.FirstDose + VaccinationQ.UnknownDose
Fully_vaccine = (
    when(col("Vaccine") == 'JANSS', _janssen_doses)
    .when(col("Vaccine") != 'JANSS', VaccinationQ.SecondDose)
)

In [16]:
# Attach the completed-course expression as a new Fully_vaccine column.
VaccinationQF = VaccinationQ.withColumn(colName="Fully_vaccine", col=Fully_vaccine)

In [17]:
# Preview the new Fully_vaccine column.
VaccinationQF.show(n=10)

+----------------+------+-------+-----------+---------+----------+-----------+-------------+
|ReportingCountry| DateQ|Vaccine|Denominator|FirstDose|SecondDose|UnknownDose|Fully_vaccine|
+----------------+------+-------+-----------+---------+----------+-----------+-------------+
|              AT|2021Q1|     AZ|    7358443|   421739|       154|          0|          154|
|              AT|2021Q2|     AZ|    7358443|   379998|    578781|          0|       578781|
|              AT|2021Q3|     AZ|    7358443|     6429|    185758|          0|       185758|
|              AT|2021Q4|     AZ|    7358443|      241|       723|          0|          723|
|              AT|2021Q1|    COM|    7358443|   784947|    492275|          0|       492275|
|              AT|2021Q2|    COM|    7358443|  2629100|   1872751|          0|      1872751|
|              AT|2021Q3|    COM|    7358443|   476355|   1314452|          0|      1314452|
|              AT|2021Q4|    COM|    7358443|   170220|    130103|    

In [18]:
# Preview: Fully_vaccine summed over all vaccines, per country and quarter.
(
    VaccinationQF
    .groupBy('ReportingCountry', 'DateQ', 'Denominator')
    .sum('Fully_vaccine')
    .sort('ReportingCountry', 'DateQ')
    .show()
)

+----------------+------+-----------+------------------+
|ReportingCountry| DateQ|Denominator|sum(Fully_vaccine)|
+----------------+------+-----------+------------------+
|              AT|2021Q1|    7358443|            516528|
|              AT|2021Q2|    7358443|           2877588|
|              AT|2021Q3|    7358443|           1865563|
|              AT|2021Q4|    7358443|            156344|
|              BE|2021Q1|    9202196|            607894|
|              BE|2021Q2|    9202196|           3608357|
|              BE|2021Q3|    9202196|           3644522|
|              BE|2021Q4|    9202196|             90022|
|              BG|2020Q4|    5761802|                 0|
|              BG|2021Q1|    5761802|            100880|
|              BG|2021Q2|    5761802|            759696|
|              BG|2021Q3|    5761802|            468464|
|              BG|2021Q4|    5761802|            225712|
|              CY|2020Q4|     717452|                 0|
|              CY|2021Q1|     7

In [19]:
# Collapse the per-vaccine rows into one row per country and quarter:
# Full_vaccine is the sum of Fully_vaccine over all vaccines.
# func.sum is used explicitly so this cell does not depend on an earlier cell
# having rebound the built-in `sum` to the Spark aggregate.
VaccinationQF = (
    VaccinationQF
    .withColumnRenamed('ReportingCountry', 'CountryCode')
    .groupBy('CountryCode', 'DateQ', 'Denominator')
    .agg(func.sum('Fully_vaccine').alias('Full_vaccine'))
    .sort('CountryCode', 'DateQ')
)

In [20]:
# Show up to 200 rows of the per-country, per-quarter totals.
VaccinationQF.show(n=200)

+-----------+------+-----------+------------+
|CountryCode| DateQ|Denominator|Full_vaccine|
+-----------+------+-----------+------------+
|         AT|2021Q1|    7358443|      516528|
|         AT|2021Q2|    7358443|     2877588|
|         AT|2021Q3|    7358443|     1865563|
|         AT|2021Q4|    7358443|      156344|
|         BE|2021Q1|    9202196|      607894|
|         BE|2021Q2|    9202196|     3608357|
|         BE|2021Q3|    9202196|     3644522|
|         BE|2021Q4|    9202196|       90022|
|         BG|2020Q4|    5761802|           0|
|         BG|2021Q1|    5761802|      100880|
|         BG|2021Q2|    5761802|      759696|
|         BG|2021Q3|    5761802|      468464|
|         BG|2021Q4|    5761802|      225712|
|         CY|2020Q4|     717452|           0|
|         CY|2021Q1|     717452|       46697|
|         CY|2021Q2|     717452|      338651|
|         CY|2021Q3|     717452|      173689|
|         CY|2021Q4|     717452|       13700|
|         CZ|2020Q4|    8694474|  

In [21]:
# Running (cumulative) total of Full_vaccine per country across quarters.
from pyspark.sql import Window

# CountryCode is already the partition key, so ordering by DateQ alone is enough.
# DateQ values are built as <year>Q<quarter> (e.g. 2020Q4, 2021Q1), so plain
# string ordering matches chronological order.
# The frame runs from the start of the partition up to the current row.
windowval = (
    Window.partitionBy('CountryCode')
    .orderBy('DateQ')
    .rangeBetween(Window.unboundedPreceding, Window.currentRow)
)
VaccineFull = VaccinationQF.withColumn('Full_vac_acumulado', func.sum('Full_vaccine').over(windowval))
VaccineFull.show()

+-----------+------+-----------+------------+------------------+
|CountryCode| DateQ|Denominator|Full_vaccine|Full_vac_acumulado|
+-----------+------+-----------+------------+------------------+
|         LT|2020Q4|    2295269|           0|                 0|
|         LT|2021Q1|    2295269|      176354|            176354|
|         LT|2021Q2|    2295269|      896086|           1072440|
|         LT|2021Q3|    2295269|      518153|           1590593|
|         LT|2021Q4|    2295269|       67487|           1658080|
|         FI|2020Q4|    4476235|           0|                 0|
|         FI|2021Q1|    4476235|       92618|             92618|
|         FI|2021Q2|    4476235|     1067984|           1160602|
|         FI|2021Q3|    4476235|     2276257|           3436859|
|         FI|2021Q4|    4476235|      266169|           3703028|
|         RO|2021Q1|   15684219|     1198317|           1198317|
|         RO|2021Q2|   15684219|     3349305|           4547622|
|         RO|2021Q3|   15

In [22]:
# Show up to 200 rows of the cumulative totals.
VaccineFull.show(n=200)

+-----------+------+-----------+------------+------------------+
|CountryCode| DateQ|Denominator|Full_vaccine|Full_vac_acumulado|
+-----------+------+-----------+------------+------------------+
|         LT|2020Q4|    2295269|           0|                 0|
|         LT|2021Q1|    2295269|      176354|            176354|
|         LT|2021Q2|    2295269|      896086|           1072440|
|         LT|2021Q3|    2295269|      518153|           1590593|
|         LT|2021Q4|    2295269|       67487|           1658080|
|         FI|2020Q4|    4476235|           0|                 0|
|         FI|2021Q1|    4476235|       92618|             92618|
|         FI|2021Q2|    4476235|     1067984|           1160602|
|         FI|2021Q3|    4476235|     2276257|           3436859|
|         FI|2021Q4|    4476235|      266169|           3703028|
|         RO|2021Q1|   15684219|     1198317|           1198317|
|         RO|2021Q2|   15684219|     3349305|           4547622|
|         RO|2021Q3|   15

In [23]:
# Express the cumulative and the per-quarter totals as a percentage of Denominator.
VaccineFull = (
    VaccineFull
    .withColumn('Vac_perc_acum', func.col('Full_vac_acumulado') * 100 / func.col('Denominator'))
    .withColumn('Vac_perc', func.col('Full_vaccine') * 100 / func.col('Denominator'))
)
                

In [24]:
# Preview the unrounded percentage columns.
VaccineFull.show(20)

+-----------+------+-----------+------------+------------------+------------------+------------------+
|CountryCode| DateQ|Denominator|Full_vaccine|Full_vac_acumulado|     Vac_perc_acum|          Vac_perc|
+-----------+------+-----------+------------+------------------+------------------+------------------+
|         LT|2020Q4|    2295269|           0|                 0|               0.0|               0.0|
|         LT|2021Q1|    2295269|      176354|            176354|   7.6833695745466|   7.6833695745466|
|         LT|2021Q2|    2295269|      896086|           1072440| 46.72393519016725| 39.04056561562065|
|         LT|2021Q3|    2295269|      518153|           1590593| 69.29876193160801|22.574826741440763|
|         LT|2021Q4|    2295269|       67487|           1658080| 72.23902732098068| 2.940265389372662|
|         FI|2020Q4|    4476235|           0|                 0|               0.0|               0.0|
|         FI|2021Q1|    4476235|       92618|             92618|2.0691049

In [25]:
# Round both percentages to 2 decimals under their final names and keep only
# the final column set, in the same order as before.
VaccineFull = VaccineFull.select(
    'CountryCode', 'DateQ', 'Denominator', 'Full_vaccine', 'Full_vac_acumulado',
    func.round(col('Vac_perc'), 2).alias('percentage'),
    func.round(col('Vac_perc_acum'), 2).alias('perc_acum'),
)
VaccineFull.show()

+-----------+------+-----------+------------+------------------+----------+---------+
|CountryCode| DateQ|Denominator|Full_vaccine|Full_vac_acumulado|percentage|perc_acum|
+-----------+------+-----------+------------+------------------+----------+---------+
|         LT|2020Q4|    2295269|           0|                 0|       0.0|      0.0|
|         LT|2021Q1|    2295269|      176354|            176354|      7.68|     7.68|
|         LT|2021Q2|    2295269|      896086|           1072440|     39.04|    46.72|
|         LT|2021Q3|    2295269|      518153|           1590593|     22.57|     69.3|
|         LT|2021Q4|    2295269|       67487|           1658080|      2.94|    72.24|
|         FI|2020Q4|    4476235|           0|                 0|       0.0|      0.0|
|         FI|2021Q1|    4476235|       92618|             92618|      2.07|     2.07|
|         FI|2021Q2|    4476235|     1067984|           1160602|     23.86|    25.93|
|         FI|2021Q3|    4476235|     2276257|         

In [26]:
# Keep only the quarters listed in validDates.
validDates = ['2021Q1', '2021Q2', '2021Q3']
completeVaccine = VaccineFull.where(func.col('DateQ').isin(validDates))


In [27]:
# Build a Spark DataFrame from whatever the project helper EUcountries() returns.
from MyFunctions import EUcountries

ps_EU = spark.createDataFrame(data=EUcountries())

In [28]:
# Preview the country lookup table.
ps_EU.show(20)

+----+----------+--------------+
|Code|   Country|Accession_year|
+----+----------+--------------+
|  AT|   Austria|          1995|
|  BE|   Belgium|          1958|
|  BG|  Bulgaria|          2007|
|  CY|    Cyprus|          2004|
|  CZ|   Czechia|          2004|
|  DE|   Germany|          1958|
|  DK|   Denmark|          1973|
|  EE|   Estonia|          2004|
|  ES|     Spain|          1986|
|  FI|   Finland|          1995|
|  FR|    France|          1958|
|  EL|    Greece|          1981|
|  HR|   Croatia|          2013|
|  HU|   Hungary|          2004|
|  IE|   Ireland|          1973|
|  IT|     Italy|          1958|
|  LT| Lithuania|          2004|
|  LU|Luxembourg|          1958|
|  LV|    Latvia|          2004|
|  MT|     Malta|          2004|
+----+----------+--------------+
only showing top 20 rows



In [29]:
# Preview the full outer join of the vaccination results with the country table.
_code_match = completeVaccine.CountryCode == ps_EU.Code
completeVaccine.join(ps_EU, on=_code_match, how='outer').show()

+-----------+------+-----------+------------+------------------+----------+---------+----+-----------+--------------+
|CountryCode| DateQ|Denominator|Full_vaccine|Full_vac_acumulado|percentage|perc_acum|Code|    Country|Accession_year|
+-----------+------+-----------+------------+------------------+----------+---------+----+-----------+--------------+
|         LT|2021Q1|    2295269|      176354|            176354|      7.68|     7.68|  LT|  Lithuania|          2004|
|         LT|2021Q2|    2295269|      896086|           1072440|     39.04|    46.72|  LT|  Lithuania|          2004|
|         LT|2021Q3|    2295269|      518153|           1590593|     22.57|     69.3|  LT|  Lithuania|          2004|
|         FI|2021Q1|    4476235|       92618|             92618|      2.07|     2.07|  FI|    Finland|          1995|
|         FI|2021Q2|    4476235|     1067984|           1160602|     23.86|    25.93|  FI|    Finland|          1995|
|         FI|2021Q3|    4476235|     2276257|           

In [30]:
# Full outer join of the vaccination results with the country table, keeping
# the country code, country name and the vaccination measures.
#
# In an outer join, vaccination rows whose CountryCode has no match in ps_EU
# get a null ps_EU.Code. Selecting only that column would leave those rows
# without any country identifier, so Code falls back to CountryCode.
# NOTE(review): if only countries present in ps_EU are wanted, an inner join
# may be what was intended - confirm.
_joined = completeVaccine.join(ps_EU, completeVaccine.CountryCode == ps_EU.Code, 'outer')
completeVaccineF = _joined.select(
    func.coalesce(func.col('Code'), func.col('CountryCode')).alias('Code'),
    'Country', 'DateQ', 'Denominator', 'Full_vaccine', 'Full_vac_acumulado',
    'percentage', 'perc_acum',
)
completeVaccineF.show()

+----+-----------+------+-----------+------------+------------------+----------+---------+
|Code|    Country| DateQ|Denominator|Full_vaccine|Full_vac_acumulado|percentage|perc_acum|
+----+-----------+------+-----------+------------+------------------+----------+---------+
|  LT|  Lithuania|2021Q1|    2295269|      176354|            176354|      7.68|     7.68|
|  LT|  Lithuania|2021Q2|    2295269|      896086|           1072440|     39.04|    46.72|
|  LT|  Lithuania|2021Q3|    2295269|      518153|           1590593|     22.57|     69.3|
|  FI|    Finland|2021Q1|    4476235|       92618|             92618|      2.07|     2.07|
|  FI|    Finland|2021Q2|    4476235|     1067984|           1160602|     23.86|    25.93|
|  FI|    Finland|2021Q3|    4476235|     2276257|           3436859|     50.85|    76.78|
|  RO|    Romania|2021Q1|   15684219|     1198317|           1198317|      7.64|     7.64|
|  RO|    Romania|2021Q2|   15684219|     3349305|           4547622|     21.35|    28.99|

In [31]:
# Save the DataFrame with the percentage results to HDFS under /TFM_CEE/output,
# replacing any previous output.
# NOTE(review): this writes completeVaccine, not the joined completeVaccineF that
# carries the country name - confirm which one is meant to be persisted.
completeVaccine.write.parquet(
    "hdfs://localhost:9000/TFM_CEE/output/completeVaccine.parquet",
    mode='overwrite',
)