<a href="https://colab.research.google.com/github/chrisjonesfaa/lumsa1/blob/main/Vaccini.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

In [None]:
# Install pyspark
!pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.1.tar.gz (281.4 MB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m5.3 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 KB[0m [31m23.0 MB/s[0m eta [36m0:00:00[0m
[?25hBuilding wheels for collected packages: pyspark
  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
  Created wheel for pyspark: filename=pyspark-3.3.1-py2.py3-none-any.whl size=281845512 sha256=139901df7863146b0b2f0faf36adfbdfa80d13b9aee4f5785f5bbf8574660ac9
  Stored in directory: /root/.cache/pip/wheels/43/dc/11/ec201cd671da62fa9c5cc77078235e40722170ceba231d7598
Successfully built pyspark
Installing collected packages: py4j, pyspa

In [None]:
# Import SparkSession
from pyspark.sql import SparkSession

In [None]:
# Create a Spark Session
spark = SparkSession.builder.master("local[*]").getOrCreate()

In [32]:
from pyspark.sql import functions as F
from pyspark.sql import Window #Funzioni per operare meglio sulle righe

def primary_vaccine_count(df):
    """
    Per ogni tipologia di vaccino, contare quante nazioni lo hanno somministrato come vaccino principale
    """
    # Creiamo una "window" per trovare il numero massimo di vaccinazioni per locazione
    window = Window.partitionBy(df['location']).orderBy(df['total_vaccinations'].desc())

    # Aggiungiamo una colonna con il max totale di vaccinazioni per paese
    df = df.withColumn('max_vaccination', F.max(df['total_vaccinations']).over(window))

    # Filtriamo le righe che hanno il total_vaccinations uguale al max_vaccination
    primary_vaccines = df.where(df['total_vaccinations'] == df['max_vaccination']).select(df['location'], df['vaccine'])

    # Rimuoviamo i duplicati con "distinct"
    primary_vaccines = primary_vaccines.distinct()

    # Contiamo il numero di nazioni per vaccino
    primary_vaccines_count = primary_vaccines.groupby('vaccine').agg(F.countDistinct('location').alias('count'))
    
    return primary_vaccines_count

df = spark.read.csv("vaccini.csv", inferSchema=True, header=True)
result = primary_vaccine_count(df)
result.show()

+-----------------+-----+
|          vaccine|count|
+-----------------+-----+
|          Sinovac|    3|
|          Moderna|    2|
|  Pfizer/BioNTech|   36|
|Sinopharm/Beijing|    2|
+-----------------+-----+



In [33]:
from pyspark.sql import functions as F
from pyspark.sql import Window #Funzioni per operare meglio sulle righe

def highest_vaccination_date(df):
    """
    Per ogni nazione identificare la data in cui sono stati somministrati più vaccini (a prescindere dal tipo)
    """
    # Funzione Window per trovare il valore massimo di vaccini somministrati per paese
    window = Window.partitionBy(df['location']).orderBy(df['total_vaccinations'].desc())

    # Aggiungo una colonna con il valore massimo di vaccini per paese
    df = df.withColumn('max_vaccination', F.max(df['total_vaccinations']).over(window))

    # Selezioniamo solo le righe dove total_vaccinations e' uguale a max_vaccinations con la propria data
    max_vaccination_date = df.where(df['total_vaccinations'] == df['max_vaccination']).select(df['location'], df['date'])
    
    # Filtriamo i duplicati nel caso ci fossero
    max_vaccination_date = max_vaccination_date.distinct()
    
    return max_vaccination_date

df = spark.read.csv("vaccini.csv", inferSchema=True, header=True) #carico il csv vaccini
result = highest_vaccination_date(df)
# Il risultato finale mostra 2+ date in alcuni casi perche combaciano il numero totale di vaccini somministrati
result.show()


+--------------+-------------------+
|      location|               date|
+--------------+-------------------+
|     Argentina|2022-03-28 00:00:00|
|     Argentina|2022-03-29 00:00:00|
|       Austria|2022-03-25 00:00:00|
|       Belgium|2022-03-18 00:00:00|
|       Belgium|2022-03-25 00:00:00|
|      Bulgaria|2022-03-18 00:00:00|
|         Chile|2022-03-22 00:00:00|
|       Croatia|2022-03-25 00:00:00|
|        Cyprus|2022-03-18 00:00:00|
|       Czechia|2022-03-29 00:00:00|
|       Denmark|2022-03-25 00:00:00|
|       Ecuador|2022-01-28 00:00:00|
|       Estonia|2022-03-18 00:00:00|
|European Union|2022-03-29 00:00:00|
|       Finland|2022-03-25 00:00:00|
|        France|2022-03-28 00:00:00|
|       Germany|2022-03-29 00:00:00|
|     Hong Kong|2022-03-27 00:00:00|
|       Hungary|2022-03-18 00:00:00|
|       Iceland|2022-03-28 00:00:00|
+--------------+-------------------+
only showing top 20 rows

