**Query** <br>
Find the regions that, in at least one month since the start of the pandemic, recorded at least two days
(not necessarily consecutive) with 50 or more new positive cases.

**Dataset** <br>
The dataset has the following schema: <br>
data | stato | codice_regione | denominazione_regione | lat | long | ricoverati_con_sintomi | terapia_intensiva | totale_ospedalizzati | isolamento_domiciliare | totale_positivi | variazione_totale_positivi | nuovi_positivi | dimessi_guariti | deceduti | totale_casi | tamponi | casi_testati | note_it | note_en

In [1]:
%%capture
!pip install pyspark

In [2]:
from datetime import datetime
from pyspark import SparkContext

In [3]:
# Download the CSV file
!wget https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-regioni/dpc-covid19-ita-regioni.csv

--2021-01-18 10:13:39--  https://raw.githubusercontent.com/pcm-dpc/COVID-19/master/dati-regioni/dpc-covid19-ita-regioni.csv
Resolving raw.githubusercontent.com (raw.githubusercontent.com)... 151.101.0.133, 151.101.64.133, 151.101.128.133, ...
Connecting to raw.githubusercontent.com (raw.githubusercontent.com)|151.101.0.133|:443... connected.
HTTP request sent, awaiting response... 200 OK
Length: 974734 (952K) [text/plain]
Saving to: ‘dpc-covid19-ita-regioni.csv’


2021-01-18 10:13:40 (15.2 MB/s) - ‘dpc-covid19-ita-regioni.csv’ saved [974734/974734]



In [4]:
sc = SparkContext("local[2]", "covid1")

In [5]:
# Load the dataset and drop the header row
rawData = sc.textFile("dpc-covid19-ita-regioni.csv")
header = rawData.first()
rawData = rawData.filter(lambda line: line != header)

In [6]:
rawData.take(5)

['2020-02-24T18:00:00,ITA,13,Abruzzo,42.35122196,13.39843823,0,0,0,0,0,0,0,0,0,,,0,5,,,,,',
 '2020-02-24T18:00:00,ITA,17,Basilicata,40.63947052,15.80514834,0,0,0,0,0,0,0,0,0,,,0,0,,,,,',
 '2020-02-24T18:00:00,ITA,18,Calabria,38.90597598,16.59440194,0,0,0,0,0,0,0,0,0,,,0,1,,,,,',
 '2020-02-24T18:00:00,ITA,15,Campania,40.83956555,14.25084984,0,0,0,0,0,0,0,0,0,,,0,10,,,,,',
 '2020-02-24T18:00:00,ITA,08,Emilia-Romagna,44.49436681,11.341720800000001,10,2,12,6,18,0,18,0,0,,,18,148,,,,,']
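
The rows above are raw comma-separated strings. As a minimal sketch (not part of the original notebook), the standard-library `csv` module can split one such line and pull out the three fields the query relies on. The positions 0, 3 and 12 follow the notebook's own indexing, and the helper name `parse_row` is hypothetical. Unlike a plain `split(',')`, `csv.reader` also tolerates quoted commas, which may matter in the free-text note columns.

```python
import csv
from datetime import datetime

# Sample line copied from the rawData.take(5) output above.
line = ('2020-02-24T18:00:00,ITA,08,Emilia-Romagna,44.49436681,'
        '11.341720800000001,10,2,12,6,18,0,18,0,0,,,18,148,,,,,')

def parse_row(raw):
    """Hypothetical helper: return (region, (year, month), new_positives)."""
    fields = next(csv.reader([raw]))          # split one CSV line, honoring quotes
    day = datetime.strptime(fields[0], '%Y-%m-%dT%H:%M:%S')
    return fields[3], (day.year, day.month), int(fields[12])

region, year_month, new_positives = parse_row(line)
print(region, year_month, new_positives)      # Emilia-Romagna (2020, 2) 18
```

Returning `(year, month)` rather than the month number alone is a design choice of this sketch: it would keep the same month of different years apart if the dataset grows beyond twelve months.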

In [7]:
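# Keep the days with >= 50 new positives (column 12, nuovi_positivi) and emit ((region, month), 1).
# Note: the key uses the month number only, so the same month of different years would be merged;
# this snapshot appears to span 2020-02 to 2021-01, where each month number occurs once.
data = (rawData.map(lambda line: line.split(','))
        .filter(lambda x: int(x[12]) >= 50)
        .map(lambda x: ((x[3], datetime.strptime(x[0], '%Y-%m-%dT%H:%M:%S').month), 1)))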

In [8]:
data.take(10)

[(('Lombardia', 2), 1),
 (('Lombardia', 2), 1),
 (('Emilia-Romagna', 2), 1),
 (('Lombardia', 2), 1),
 (('Lombardia', 2), 1),
 (('Emilia-Romagna', 2), 1),
 (('Lombardia', 2), 1),
 (('Emilia-Romagna', 3), 1),
 (('Lombardia', 3), 1),
 (('Veneto', 3), 1)]

In [9]:
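# Count the qualifying days per (region, month) and keep the pairs with at least 2 days,
# then count the qualifying months per region and sort by region name.
query = (data.reduceByKey(lambda a, b: a + b)
         .filter(lambda x: x[1] >= 2)
         .map(lambda x: (x[0][0], 1))
         .reduceByKey(lambda a, b: a + b)
         .sortByKey())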

In [10]:
query.take(100)

[('Abruzzo', 7),
 ('Basilicata', 4),
 ('Calabria', 5),
 ('Campania', 8),
 ('Emilia-Romagna', 11),
 ('Friuli Venezia Giulia', 7),
 ('Lazio', 9),
 ('Liguria', 9),
 ('Lombardia', 12),
 ('Marche', 6),
 ('Molise', 4),
 ('P.A. Bolzano', 7),
 ('P.A. Trento', 7),
 ('Piemonte', 9),
 ('Puglia', 8),
 ('Sardegna', 7),
 ('Sicilia', 8),
 ('Toscana', 9),
 ('Umbria', 5),
 ("Valle d'Aosta", 4),
 ('Veneto', 10)]
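
To see what the two `reduceByKey` passes compute, the same steps can be traced in plain Python on the ten pairs returned by `data.take(10)`. This is only an illustration that uses `collections.Counter` in place of Spark on a ten-row sample, so its counts differ from the full result above.

```python
from collections import Counter

# The ten ((region, month), 1) pairs shown by data.take(10).
pairs = [
    (('Lombardia', 2), 1), (('Lombardia', 2), 1), (('Emilia-Romagna', 2), 1),
    (('Lombardia', 2), 1), (('Lombardia', 2), 1), (('Emilia-Romagna', 2), 1),
    (('Lombardia', 2), 1), (('Emilia-Romagna', 3), 1), (('Lombardia', 3), 1),
    (('Veneto', 3), 1),
]

# First pass: number of days with >= 50 new positives per (region, month).
days_per_month = Counter()
for key, one in pairs:
    days_per_month[key] += one

# Keep the (region, month) pairs with at least two such days.
qualifying = [key for key, days in days_per_month.items() if days >= 2]

# Second pass: number of qualifying months per region, sorted by region name.
months_per_region = sorted(Counter(region for region, _ in qualifying).items())
print(months_per_region)  # [('Emilia-Romagna', 1), ('Lombardia', 1)]
```

A region appears in the final list as soon as one of its months qualifies, which is what the query asks for; the attached number is how many months qualified.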

In [11]:
query.coalesce(1).saveAsTextFile("output_covid1")
sc.stop()

In [13]:
!ls

dpc-covid19-ita-regioni.csv  output_covid1  sample_data


In [14]:
!cat output_covid1/part-00000

('Abruzzo', 7)
('Basilicata', 4)
('Calabria', 5)
('Campania', 8)
('Emilia-Romagna', 11)
('Friuli Venezia Giulia', 7)
('Lazio', 9)
('Liguria', 9)
('Lombardia', 12)
('Marche', 6)
('Molise', 4)
('P.A. Bolzano', 7)
('P.A. Trento', 7)
('Piemonte', 9)
('Puglia', 8)
('Sardegna', 7)
('Sicilia', 8)
('Toscana', 9)
('Umbria', 5)
("Valle d'Aosta", 4)
('Veneto', 10)
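
`saveAsTextFile` writes each tuple as its Python `repr`, one per line. A small sketch of reading such lines back into a dictionary with `ast.literal_eval`; the three sample lines are copied from the output above, and in practice they would be read from `output_covid1/part-00000`.

```python
import ast

# Sample lines copied from the saved output above.
saved_lines = [
    "('Abruzzo', 7)",
    "('Lombardia', 12)",
    "(\"Valle d'Aosta\", 4)",
]

# literal_eval safely parses each tuple literal without executing code.
results = dict(ast.literal_eval(text) for text in saved_lines)
print(results["Valle d'Aosta"])  # 4
```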


In [15]:
!rm -r output_covid1