# PySpark Tutorial

## Part 1: Exploring the data

In [1]:
from pyspark.sql import SparkSession

Create a Spark session and import the data from the CSV file.

In [3]:
# Start a Spark session named 'Dataframe', or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName('Dataframe')
    .getOrCreate()
)
# A bare last expression lets the notebook render the session summary.
spark

In [4]:
# Reader configured entirely through .options(): the first line of the file is the
# header, and column types are inferred from the data instead of defaulting to string.
df = (
    spark.read
    .options(header='True', inferSchema=True)
    .csv('country_vaccinations.csv')
)
# Print the first rows of the loaded frame.
df.show()

                                                                                

+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+--------------------+
|    country|iso_code|               date|total_vaccinations|people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hundred|people_vaccinated_per_hundred|people_fully_vaccinated_per_hundred|daily_vaccinations_per_million|            vaccines|         source_name|      source_website|
+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+-----------

Check the schema of the DataFrame.

In [5]:
# Print the column names, inferred types and nullability as a tree.
df.printSchema()

root
 |-- country: string (nullable = true)
 |-- iso_code: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- total_vaccinations: double (nullable = true)
 |-- people_vaccinated: double (nullable = true)
 |-- people_fully_vaccinated: double (nullable = true)
 |-- daily_vaccinations_raw: double (nullable = true)
 |-- daily_vaccinations: double (nullable = true)
 |-- total_vaccinations_per_hundred: double (nullable = true)
 |-- people_vaccinated_per_hundred: double (nullable = true)
 |-- people_fully_vaccinated_per_hundred: double (nullable = true)
 |-- daily_vaccinations_per_million: double (nullable = true)
 |-- vaccines: string (nullable = true)
 |-- source_name: string (nullable = true)
 |-- source_website: string (nullable = true)



**Simpler way to import the data with header and infer the schema**

In [6]:
# Same load as before, with the reader settings passed directly to csv() as keywords.
df = spark.read.csv(
    'country_vaccinations.csv',
    header=True,
    inferSchema=True,
)
df.show()

+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+--------------------+
|    country|iso_code|               date|total_vaccinations|people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hundred|people_vaccinated_per_hundred|people_fully_vaccinated_per_hundred|daily_vaccinations_per_million|            vaccines|         source_name|      source_website|
+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+-----------

### Explore the dataset

In [7]:
# printSchema() writes the schema tree to stdout and returns None, so passing it to
# display() would render a stray "None". Call it on its own instead.
df.printSchema()
# The bare last expression shows the list of column names as the cell output.
df.columns

root
 |-- country: string (nullable = true)
 |-- iso_code: string (nullable = true)
 |-- date: timestamp (nullable = true)
 |-- total_vaccinations: double (nullable = true)
 |-- people_vaccinated: double (nullable = true)
 |-- people_fully_vaccinated: double (nullable = true)
 |-- daily_vaccinations_raw: double (nullable = true)
 |-- daily_vaccinations: double (nullable = true)
 |-- total_vaccinations_per_hundred: double (nullable = true)
 |-- people_vaccinated_per_hundred: double (nullable = true)
 |-- people_fully_vaccinated_per_hundred: double (nullable = true)
 |-- daily_vaccinations_per_million: double (nullable = true)
 |-- vaccines: string (nullable = true)
 |-- source_name: string (nullable = true)
 |-- source_website: string (nullable = true)



None

['country',
 'iso_code',
 'date',
 'total_vaccinations',
 'people_vaccinated',
 'people_fully_vaccinated',
 'daily_vaccinations_raw',
 'daily_vaccinations',
 'total_vaccinations_per_hundred',
 'people_vaccinated_per_hundred',
 'people_fully_vaccinated_per_hundred',
 'daily_vaccinations_per_million',
 'vaccines',
 'source_name',
 'source_website']

In [8]:
# Fetch the first 5 rows to the driver as a list of Row objects.
df.head(5)

[Row(country='Afghanistan', iso_code='AFG', date=datetime.datetime(2021, 2, 22, 0, 0), total_vaccinations=0.0, people_vaccinated=0.0, people_fully_vaccinated=None, daily_vaccinations_raw=None, daily_vaccinations=None, total_vaccinations_per_hundred=0.0, people_vaccinated_per_hundred=0.0, people_fully_vaccinated_per_hundred=None, daily_vaccinations_per_million=None, vaccines='Johnson&Johnson, Oxford/AstraZeneca, Pfizer/BioNTech, Sinopharm/Beijing', source_name='World Health Organization', source_website='https://covid19.who.int/'),
 Row(country='Afghanistan', iso_code='AFG', date=datetime.datetime(2021, 2, 23, 0, 0), total_vaccinations=None, people_vaccinated=None, people_fully_vaccinated=None, daily_vaccinations_raw=None, daily_vaccinations=1367.0, total_vaccinations_per_hundred=None, people_vaccinated_per_hundred=None, people_fully_vaccinated_per_hundred=None, daily_vaccinations_per_million=34.0, vaccines='Johnson&Johnson, Oxford/AstraZeneca, Pfizer/BioNTech, Sinopharm/Beijing', sourc

In [11]:
# Project the frame down to two columns and print the first rows.
df.select('country', 'total_vaccinations').show()

+-----------+------------------+
|    country|total_vaccinations|
+-----------+------------------+
|Afghanistan|               0.0|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|            8200.0|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|              null|
|Afghanistan|              null|
+-----------+------------------+
only showing top 20 rows



In [13]:
# List every column as a (name, type) pair.
df.dtypes

[('country', 'string'),
 ('iso_code', 'string'),
 ('date', 'timestamp'),
 ('total_vaccinations', 'double'),
 ('people_vaccinated', 'double'),
 ('people_fully_vaccinated', 'double'),
 ('daily_vaccinations_raw', 'double'),
 ('daily_vaccinations', 'double'),
 ('total_vaccinations_per_hundred', 'double'),
 ('people_vaccinated_per_hundred', 'double'),
 ('people_fully_vaccinated_per_hundred', 'double'),
 ('daily_vaccinations_per_million', 'double'),
 ('vaccines', 'string'),
 ('source_name', 'string'),
 ('source_website', 'string')]

In [16]:
# Summary statistics per column; one row per statistic, labelled in the `summary` column.
# Note that the timestamp column `date` does not appear in the summary output.
df.describe().show()



+-------+-----------+--------+--------------------+-------------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+--------------------+
|summary|    country|iso_code|  total_vaccinations|  people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hundred|people_vaccinated_per_hundred|people_fully_vaccinated_per_hundred|daily_vaccinations_per_million|            vaccines|         source_name|      source_website|
+-------+-----------+--------+--------------------+-------------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+--------------------+
|  count|    

                                                                                

### Modifying the schema

In [28]:
# withColumn returns a NEW DataFrame with the extra column; `df` itself is not modified.
# The new 'Test' column is total_vaccinations shifted by an arbitrary constant.
test = df.withColumn(
    'Test',
    df.total_vaccinations + 69,
)
test.columns

['country',
 'iso_code',
 'date',
 'total_vaccinations',
 'people_vaccinated',
 'people_fully_vaccinated',
 'daily_vaccinations_raw',
 'daily_vaccinations',
 'total_vaccinations_per_hundred',
 'people_vaccinated_per_hundred',
 'people_fully_vaccinated_per_hundred',
 'daily_vaccinations_per_million',
 'vaccines',
 'source_name',
 'source_website',
 'Test']

In [29]:
# Dropping a column
# drop() returns a new DataFrame, so the result is assigned back to `test`.
# NOTE(review): per the PySpark docs, dropping a column name that is not in the schema
# is a no-op, which is presumably why re-running this cell is safe — confirm.
test = test.drop('Test')
test.columns

['country',
 'iso_code',
 'date',
 'total_vaccinations',
 'people_vaccinated',
 'people_fully_vaccinated',
 'daily_vaccinations_raw',
 'daily_vaccinations',
 'total_vaccinations_per_hundred',
 'people_vaccinated_per_hundred',
 'people_fully_vaccinated_per_hundred',
 'daily_vaccinations_per_million',
 'vaccines',
 'source_name',
 'source_website']

In [30]:
# Rename 'iso_code' to 'ISO'. The renamed frame is not assigned to anything,
# so `test` keeps its original column name after this cell.
(
    test
    .withColumnRenamed('iso_code', 'ISO')
    .columns
)

['country',
 'ISO',
 'date',
 'total_vaccinations',
 'people_vaccinated',
 'people_fully_vaccinated',
 'daily_vaccinations_raw',
 'daily_vaccinations',
 'total_vaccinations_per_hundred',
 'people_vaccinated_per_hundred',
 'people_fully_vaccinated_per_hundred',
 'daily_vaccinations_per_million',
 'vaccines',
 'source_name',
 'source_website']

## Part 2: Dataframe NA operations

### Dropping rows

In [33]:
# Drop every row that contains at least one null value.
# how='any' is the default, spelled out here for contrast with the cells below.
df.na.drop(how='any').show()

+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+--------------------+
|    country|iso_code|               date|total_vaccinations|people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hundred|people_vaccinated_per_hundred|people_fully_vaccinated_per_hundred|daily_vaccinations_per_million|            vaccines|         source_name|      source_website|
+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+-----------

In [34]:
# Drop only the rows in which every value is null.
df.na.drop(how='all').show()

+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+--------------------+
|    country|iso_code|               date|total_vaccinations|people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hundred|people_vaccinated_per_hundred|people_fully_vaccinated_per_hundred|daily_vaccinations_per_million|            vaccines|         source_name|      source_website|
+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+-----------

In [37]:
# Keep only rows that have at least 4 non-null values; rows with fewer are dropped.
# `thresh` counts NON-null values (it is not a limit on the number of nulls), and it
# overrides `how`, so `how` is omitted here.
df.na.drop(thresh=4).show()

+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+--------------------+
|    country|iso_code|               date|total_vaccinations|people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hundred|people_vaccinated_per_hundred|people_fully_vaccinated_per_hundred|daily_vaccinations_per_million|            vaccines|         source_name|      source_website|
+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+-----------

In [40]:
# Drop the rows whose 'daily_vaccinations_raw' value is null; nulls in other
# columns are not considered.
df.na.drop(
    subset=['daily_vaccinations_raw'],
).show()

+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+--------------------+
|    country|iso_code|               date|total_vaccinations|people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hundred|people_vaccinated_per_hundred|people_fully_vaccinated_per_hundred|daily_vaccinations_per_million|            vaccines|         source_name|      source_website|
+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+-----------

### Replacing the values

In [46]:
# Replacing missing values with a specific value.
# NOTE(review): per the PySpark docs, a numeric fill value is only applied to columns
# of a matching (numeric) type, so null strings are presumably left untouched — confirm.
df.na.fill(9999999).show()

+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+--------------------+
|    country|iso_code|               date|total_vaccinations|people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hundred|people_vaccinated_per_hundred|people_fully_vaccinated_per_hundred|daily_vaccinations_per_million|            vaccines|         source_name|      source_website|
+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+-----------

In [50]:
# Fill nulls with a sentinel value, restricted to the listed columns only.
df.na.fill(
    9999999,
    subset=[
        'people_fully_vaccinated',
        'daily_vaccinations_raw',
        'daily_vaccinations',
    ],
).show()

+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+--------------------+
|    country|iso_code|               date|total_vaccinations|people_vaccinated|people_fully_vaccinated|daily_vaccinations_raw|daily_vaccinations|total_vaccinations_per_hundred|people_vaccinated_per_hundred|people_fully_vaccinated_per_hundred|daily_vaccinations_per_million|            vaccines|         source_name|      source_website|
+-----------+--------+-------------------+------------------+-----------------+-----------------------+----------------------+------------------+------------------------------+-----------------------------+-----------------------------------+------------------------------+--------------------+--------------------+-----------

Create an imputer that transforms the selected columns, replacing the NA values in each one with that column's mean.

In [57]:
# Build the imputer. Besides "mean", the strategy can also be "median" or "mode".

from pyspark.ml.feature import Imputer

# Columns whose nulls will be imputed.
inputCols = ['people_fully_vaccinated','daily_vaccinations_raw','daily_vaccinations']

# Each input column gets a sibling output column named '<input>_imputed'.
imputer = Imputer(
    strategy='mean',
    inputCols=inputCols,
    outputCols=[f'{name}_imputed' for name in inputCols],
)

In [58]:
# Fit the imputer on the DataFrame, apply it to add the imputed columns,
# then show only those new columns.
(
    imputer
    .fit(df)
    .transform(df)
    .select(
        'people_fully_vaccinated_imputed',
        'daily_vaccinations_raw_imputed',
        'daily_vaccinations_imputed',
    )
    .show()
)

+-------------------------------+------------------------------+--------------------------+
|people_fully_vaccinated_imputed|daily_vaccinations_raw_imputed|daily_vaccinations_imputed|
+-------------------------------+------------------------------+--------------------------+
|           1.4138299848152157E7|             270599.5782478367|        131305.48607518588|
|           1.4138299848152157E7|             270599.5782478367|                    1367.0|
|           1.4138299848152157E7|             270599.5782478367|                    1367.0|
|           1.4138299848152157E7|             270599.5782478367|                    1367.0|
|           1.4138299848152157E7|             270599.5782478367|                    1367.0|
|           1.4138299848152157E7|             270599.5782478367|                    1367.0|
|           1.4138299848152157E7|             270599.5782478367|                    1367.0|
|           1.4138299848152157E7|             270599.5782478367|                