#Homework 2 - Spark

## Lettura dei dati

In [0]:
import requests

from pyspark.sql.functions import isnan, when, count, isnull

In [0]:
local_path_popolazione_regioni = "dbfs:/FileStore/shared_uploads/gu.dichiara@studenti.unina.it/popolazione_regioni.csv"
local_path_dati_covid_regioni = "dbfs:/FileStore/shared_uploads/gu.dichiara@studenti.unina.it/dati_covid_regioni.csv"
local_path_dati_vaccini_somministrazioni = "dbfs:/FileStore/shared_uploads/gu.dichiara@studenti.unina.it/dati_vaccini_somministrazioni.csv"
local_path_dati_vaccini_consegne = "dbfs:/FileStore/shared_uploads/gu.dichiara@studenti.unina.it/dati_vaccini_consegne.csv"

# local_path_popolazione_regioni = "dbfs:/FileStore/shared_uploads/fabio.dandrea@studenti.unina.it/popolazione_regioni.csv"
# local_path_dati_covid_regioni = "dbfs:/FileStore/shared_uploads/fabio.dandrea@studenti.unina.it/dati_covid_regioni.csv"
# local_path_dati_vaccini_somministrazioni = "dbfs:/FileStore/shared_uploads/fabio.dandrea@studenti.unina.it/dati_vaccini_somministrazioni.csv"
# local_path_dati_vaccini_consegne = "dbfs:/FileStore/shared_uploads/fabio.dandrea@studenti.unina.it/dati_vaccini_consegne.csv"

popolazione_regioni_URL = "https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-statistici-riferimento/popolazione-istat-regione-range.csv"
dati_covid_regioni_URL = "https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-regioni/dpc-covid19-ita-regioni.csv"
dati_vaccini_somministrazioni_URL = "https://raw.githubusercontent.com/italia/covid19-opendata-vaccini/master/dati/somministrazioni-vaccini-latest.csv"
dati_vaccini_consegne_URL = "https://raw.githubusercontent.com/italia/covid19-opendata-vaccini/master/dati/consegne-vaccini-latest.csv"

dbutils.fs.put(local_path_popolazione_regioni, requests.get(popolazione_regioni_URL).text, True)
dbutils.fs.put(local_path_dati_covid_regioni, requests.get(dati_covid_regioni_URL).text, True)
dbutils.fs.put(local_path_dati_vaccini_somministrazioni, requests.get(dati_vaccini_somministrazioni_URL).text, True)
dbutils.fs.put(local_path_dati_vaccini_consegne, requests.get(dati_vaccini_consegne_URL).text, True)

popolazione_regioni = spark.read.option("inferSchema", "true") \
                                .option("header", "true") \
                                .csv(local_path_popolazione_regioni)
dati_covid_regioni = spark.read.option("inferSchema", "true") \
                                .option("header", "true") \
                                .csv(local_path_dati_covid_regioni)
dati_vaccini_somministrazioni = spark.read.option("inferSchema", "true") \
                                          .option("header", "true") \
                                          .csv(local_path_dati_vaccini_somministrazioni)
dati_vaccini_consegne = spark.read.option("inferSchema", "true") \
                                  .option("header", "true") \
                                  .csv(local_path_dati_vaccini_consegne)

## Preprocessing

### Popolazione Regioni
Si eliminano le colonne che non sono di interesse.

In [0]:
popolazione_regioni = popolazione_regioni.drop('codice_regione', 'latitudine_regione', 'longitudine_regione')
popolazione_regioni.show()

# popolazione_regioni.groupBy('denominazione_regione').sum('totale_generale').orderBy('sum(totale_generale)', ascending=False).show()

Si verifica che il formato dei dati sia corretto.

In [0]:
popolazione_regioni.dtypes

Si verifica la correttezza delle colonne categoriche.

In [0]:
num_regioni = popolazione_regioni.groupBy('denominazione_regione') \
                .count() \
                .count()
print('Il numero di regioni è:', num_regioni)
print()
popolazione_regioni.groupBy('codice_nuts_1', 'descrizione_nuts_1', 'codice_nuts_2', 'denominazione_regione', 'sigla_regione') \
                    .count() \
                    .drop('count') \
                    .orderBy('descrizione_nuts_1') \
                    .show(num_regioni)

In [0]:
popolazione_regioni.groupBy('range_eta') \
                    .count() \
                    .orderBy('range_eta') \
                    .show()

Si verifica la correttezza delle colonne numeriche.

In [0]:
popolazione_regioni.describe(['totale_genere_maschile', 'totale_genere_femminile', 'totale_generale']) \
                    .show()

Si verifica la presenza di *missing values*.

In [0]:
popolazione_regioni.select([count(when(isnull(c), c)).alias(c) for c in popolazione_regioni.columns]).show()

### Dati Covid Regioni
Si eliminano le colonne che non sono di interesse.

In [0]:
dati_covid_regioni = dati_covid_regioni.drop('stato', 'codice_regione', 'lat', 'long', 'note_test', 'note_casi', 'note',
                                             'casi_da_sospetto_diagnostico', 'casi_da_screening',
                                             'casi_testati', 'ingressi_terapia_intensiva', 
                                             'totale_positivi_test_molecolare', 'totale_positivi_test_antigenico_rapido',
                                             'tamponi_test_molecolare', 'tamponi_test_antigenico_rapido')
dati_covid_regioni.show()

Si verifica che il formato dei dati sia corretto.

In [0]:
dati_covid_regioni.dtypes

Si verifica la correttezza delle colonne categoriche.

In [0]:
num_regioni = dati_covid_regioni.groupBy('denominazione_regione') \
                .count() \
                .count()
print('Il numero di regioni è:', num_regioni)
print()
dati_covid_regioni.groupBy('codice_nuts_1', 'codice_nuts_2', 'denominazione_regione') \
                    .count() \
                    .drop('count') \
                    .orderBy('codice_nuts_1') \
                    .show(50)

Si possono fare alcune osservazioni:
- per alcune entry mancano i codici nuts;
- per aclune entry i codici nuts sono sbagliati (problema dovuto alla lettura del file csv);
- alcune regioni (Bolzano e Trento) hanno una denominazione diversa rispetto alla tabella `popolazione_regioni`;

In [0]:
join1 = popolazione_regioni.join(dati_covid_regioni, on='denominazione_regione', how='anti') \
                    .groupBy('codice_nuts_2', 'denominazione_regione') \
                    .count() \
                    .drop('count') \
                    .withColumnRenamed('denominazione_regione', 'popolazione_regioni_denominazione_regione')

join2 = dati_covid_regioni.join(popolazione_regioni, on='denominazione_regione', how='anti') \
                    .groupBy('codice_nuts_2', 'denominazione_regione') \
                    .count() \
                    .drop('count') \
                    .withColumnRenamed('denominazione_regione', 'dati_covid_regioni_denominazione_regione')

join1.join(join2, on='codice_nuts_2').show()

dati_covid_regioni = dati_covid_regioni.replace('P.A. Trento', value='Trento', subset='denominazione_regione')
dati_covid_regioni = dati_covid_regioni.replace('P.A. Bolzano', value='Bolzano', subset='denominazione_regione')

In [0]:
dati_covid_regioni = dati_covid_regioni.drop('codice_nuts_1', 'codice_nuts_2')

data = popolazione_regioni.groupBy('codice_nuts_1', 'codice_nuts_2', 'denominazione_regione') \
                          .count() \
                          .drop('count')

dati_covid_regioni = dati_covid_regioni.join(data, on=['denominazione_regione'], how='left')

dati_covid_regioni.groupBy('codice_nuts_1', 'codice_nuts_2', 'denominazione_regione') \
                    .count() \
                    .drop('count') \
                    .orderBy('codice_nuts_1').show(num_regioni)

Si verifica la correttezza delle colonne numeriche.

In [0]:
dati_covid_regioni.describe(['ricoverati_con_sintomi', 'terapia_intensiva', 'totale_ospedalizzati', 'dimessi_guariti']) \
                  .show()

In [0]:
dati_covid_regioni.describe(['isolamento_domiciliare', 'totale_positivi', 'variazione_totale_positivi', 'nuovi_positivi', 'deceduti', 'totale_casi', 'tamponi']) \
                  .show()

In [0]:
dati_covid_regioni.filter(condition='nuovi_positivi < 0').show()

Si verifica la presenza di *missing values*.

In [0]:
dati_covid_regioni.select([count(when(isnull(c), c)).alias(c) for c in dati_covid_regioni.drop('data').columns]).show()

### Dati Somministrazioni Vaccini

Si elmininano le colonne che non sono di interesse

In [0]:
dati_vaccini_somministrazioni = dati_vaccini_somministrazioni.drop('area', 'codice_regione_ISTAT')

dati_vaccini_somministrazioni.show()

Si verifica che il formato dei dati sia corretto

In [0]:
dati_vaccini_somministrazioni.dtypes

Si verifica la correttezza delle colonne categoriche.

In [0]:
num_regioni = dati_vaccini_somministrazioni.groupBy('nome_area') \
                                           .count() \
                                           .count()
print('Il numero di regioni è:', num_regioni)
print()
dati_vaccini_somministrazioni.groupBy('codice_NUTS1', 'codice_NUTS2', 'nome_area') \
                             .count() \
                             .drop('count') \
                             .orderBy('codice_NUTS1') \
                             .show(50)

In [0]:
join1 = popolazione_regioni.join(dati_vaccini_somministrazioni, popolazione_regioni.denominazione_regione == dati_vaccini_somministrazioni.nome_area, how='anti') \
                    .groupBy('codice_nuts_2', 'denominazione_regione') \
                    .count() \
                    .drop('count') 

join2 = dati_vaccini_somministrazioni.join(popolazione_regioni, popolazione_regioni.denominazione_regione == dati_vaccini_somministrazioni.nome_area, how='anti') \
                    .groupBy('codice_NUTS2', 'nome_area') \
                    .count() \
                    .drop('count') 

join1.join(join2, popolazione_regioni.codice_nuts_2 == dati_vaccini_somministrazioni.codice_NUTS2).show()

dati_vaccini_somministrazioni = dati_vaccini_somministrazioni.replace('Provincia Autonoma Trento', value='Trento', subset='nome_area')
dati_vaccini_somministrazioni = dati_vaccini_somministrazioni.replace('Provincia Autonoma Bolzano / Bozen', value='Bolzano', subset='nome_area')
dati_vaccini_somministrazioni = dati_vaccini_somministrazioni.replace('Friuli-Venezia Giulia', value='Friuli Venezia Giulia', subset='nome_area')
dati_vaccini_somministrazioni = dati_vaccini_somministrazioni.replace('Valle d\'Aosta / Vallée d\'Aoste', value='Valle d\'Aosta', subset='nome_area')

In [0]:
dati_vaccini_somministrazioni.groupBy('codice_NUTS1', 'codice_NUTS2', 'nome_area') \
                             .count() \
                             .drop('count') \
                             .orderBy('codice_NUTS1') \
                             .show(num_regioni)

In [0]:
dati_vaccini_somministrazioni.groupBy('fascia_anagrafica') \
                    .count() \
                    .drop('count') \
                    .orderBy('fascia_anagrafica') \
                    .show()

Si osserva che rispetto alla tabella popolazione_regioni manca la `fascia_anagrafica` 0-15

Si verifica la correttezza delle colonne numeriche.

In [0]:
dati_vaccini_somministrazioni.describe(['sesso_maschile', 'sesso_femminile', 'prima_dose', 'seconda_dose']) \
                             .show()

In [0]:
dati_vaccini_somministrazioni.describe(['categoria_operatori_sanitari_sociosanitari', 'categoria_personale_non_sanitario', 'categoria_ospiti_rsa', 'categoria_60_69', 'categoria_70_79', 'categoria_over80', 'categoria_forze_armate', 'categoria_personale_scolastico', 'categoria_soggetti_fragili', 'categoria_altro']) \
                             .show()

Si verifica la presenza di *missing values*.

In [0]:
dati_vaccini_somministrazioni.select([count(when(isnull(c), c)).alias(c) for c in dati_vaccini_somministrazioni.drop('data').columns]).show()

## Analisi

### Percentuale popolazione vaccinata per regione
Si riporta la percentuale di popolazione vaccinata (prima e seconda dose) per regione.

In [0]:
dosi_somministrate_regione = dati_vaccini_somministrazioni.groupBy('nome_area') \
                                                          .sum('prima_dose', 'seconda_dose') \
                                                          .orderBy('nome_area') \
                                                          .withColumnRenamed('nome_area', 'regione') \
                                                          .withColumnRenamed('sum(prima_dose)', 'vaccinati_prima_dose') \
                                                          .withColumnRenamed('sum(seconda_dose)', 'vaccinati_seconda_dose')

popolazione = popolazione_regioni.groupBy('denominazione_regione').sum('totale_generale')

cond = [popolazione.denominazione_regione == dosi_somministrate_regione.regione]
dosi_somministrate_regione = dosi_somministrate_regione.join(popolazione, cond, how='left') \
                                                       .drop('denominazione_regione') \
                                                       .withColumnRenamed('sum(totale_generale)', 'popolazione') \
                                                       .orderBy('regione')

dosi_somministrate_regione = dosi_somministrate_regione.withColumn('percentuale_prima_dose', dosi_somministrate_regione.vaccinati_prima_dose / dosi_somministrate_regione.popolazione * 100)
dosi_somministrate_regione = dosi_somministrate_regione.withColumn('percentuale_seconda_dose', dosi_somministrate_regione.vaccinati_seconda_dose / dosi_somministrate_regione.popolazione * 100)

dosi_somministrate_regione.show(21)

### Percentuale popolazione vaccinata per fascia anagrafica
Si riporta la percentuale di popolazione vaccinata (prima e seconda dose) per fascia anagrafica.

In [0]:
dosi_somministrate_fascia = dati_vaccini_somministrazioni.groupBy('fascia_anagrafica') \
                                                          .sum('prima_dose', 'seconda_dose') \
                                                          .orderBy('fascia_anagrafica') \
                                                          .withColumnRenamed('sum(prima_dose)', 'vaccinati_prima_dose') \
                                                          .withColumnRenamed('sum(seconda_dose)', 'vaccinati_seconda_dose')

popolazione = popolazione_regioni.groupBy('range_eta').sum('totale_generale')

cond = [popolazione.range_eta == dosi_somministrate_fascia.fascia_anagrafica]
dosi_somministrate_fascia = dosi_somministrate_fascia.join(popolazione, cond, how='left') \
                                                       .withColumnRenamed('sum(totale_generale)', 'popolazione') \
                                                       .drop('range_eta') \
                                                       .orderBy('fascia_anagrafica')

dosi_somministrate_fascia = dosi_somministrate_fascia.withColumn('percentuale_prima_dose', dosi_somministrate_fascia.vaccinati_prima_dose / dosi_somministrate_fascia.popolazione * 100)
dosi_somministrate_fascia = dosi_somministrate_fascia.withColumn('percentuale_seconda_dose', dosi_somministrate_fascia.vaccinati_seconda_dose / dosi_somministrate_fascia.popolazione * 100)

dosi_somministrate_fascia.show(9)

### Percentuale popolazione vaccinata per regione e fascia anagrafica
Si riporta la percentuale di popolazione vaccinata (prima e seconda dose) per regione e fascia anagrafica.

In [0]:
dosi_somministrate = dati_vaccini_somministrazioni.groupBy('nome_area', 'fascia_anagrafica') \
                                                  .sum('prima_dose', 'seconda_dose') \
                                                  .orderBy('nome_area', 'fascia_anagrafica') \
                                                  .withColumnRenamed('nome_area', 'regione') \
                                                  .withColumnRenamed('sum(prima_dose)', 'vaccinati_prima_dose') \
                                                  .withColumnRenamed('sum(seconda_dose)', 'vaccinati_seconda_dose')

cond = [popolazione_regioni.denominazione_regione == dosi_somministrate.regione, popolazione_regioni.range_eta == dosi_somministrate.fascia_anagrafica]
dosi_somministrate = dosi_somministrate.join(popolazione_regioni.select('denominazione_regione', 'range_eta', 'totale_generale'), cond, how='left') \
                                       .drop('denominazione_regione', 'range_eta') \
                                       .withColumnRenamed('totale_generale', 'popolazione') \
                                       .orderBy('regione', 'fascia_anagrafica')

dosi_somministrate = dosi_somministrate.withColumn('percentuale_prima_dose', dosi_somministrate.vaccinati_prima_dose / dosi_somministrate.popolazione * 100)
dosi_somministrate = dosi_somministrate.withColumn('percentuale_seconda_dose', dosi_somministrate.vaccinati_seconda_dose / dosi_somministrate.popolazione * 100)

dosi_somministrate.show(189)