<a href="https://colab.research.google.com/github/arutraj/.githubcl/blob/main/Exploration_of_Covid_Dataset.ipynb" target="_parent"><img src="https://colab.research.google.com/assets/colab-badge.svg" alt="Open In Colab"/></a>

# Install pyspark and customize Colab configuration


The pyspark installation will persist until the runtime is recycled.

In [1]:
!sudo apt update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
#Check this site for the latest download link https://www.apache.org/dyn/closer.lua/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!wget -q https://dlcdn.apache.org/spark/spark-3.2.1/spark-3.2.1-bin-hadoop3.2.tgz
!tar xf spark-3.2.1-bin-hadoop3.2.tgz
!pip install -q findspark
!pip install pyspark
!pip install py4j

import os
import sys
# os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
# os.environ["SPARK_HOME"] = "/content/spark-3.2.1-bin-hadoop3.2"


import findspark
findspark.init()
findspark.find()

[33m0% [Working][0m            Get:1 https://cloud.r-project.org/bin/linux/ubuntu jammy-cran40/ InRelease [3,626 B]
Get:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  InRelease [1,581 B]
Get:3 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu2204/x86_64  Packages [1,032 kB]
Get:4 http://security.ubuntu.com/ubuntu jammy-security InRelease [129 kB]
Hit:5 http://archive.ubuntu.com/ubuntu jammy InRelease
Hit:6 https://ppa.launchpadcontent.net/deadsnakes/ppa/ubuntu jammy InRelease
Get:7 http://archive.ubuntu.com/ubuntu jammy-updates InRelease [128 kB]
Hit:8 https://ppa.launchpadcontent.net/graphics-drivers/ppa/ubuntu jammy InRelease
Ign:9 https://r2u.stat.illinois.edu/ubuntu jammy InRelease
Hit:10 https://ppa.launchpadcontent.net/ubuntugis/ppa/ubuntu jammy InRelease
Get:11 https://r2u.stat.illinois.edu/ubuntu jammy Release [5,713 B]
Get:12 https://r2u.stat.illinois.edu/ubuntu jammy Release.gpg [793 B]
Get:13 http://archive.ubuntu.com/ubuntu 

'/usr/local/lib/python3.10/dist-packages/pyspark'

# Initialize Spark

In [2]:
import pyspark
from pyspark import SparkContext

from pyspark.sql import DataFrame, SparkSession
from typing import List
import pyspark.sql.types as T
import pyspark.sql.functions as F

spark= SparkSession \
       .builder \
       .appName("Our First Spark Example") \
       .getOrCreate()

sc = spark.sparkContext
spark

# Load Covid Dataset

In [3]:
# Importing the requests library for handling HTTP requests
import requests

# Specifying the path to the raw CSV file on GitHub
path = "https://raw.githubusercontent.com/owid/covid-19-data/master/public/data/owid-covid-data.csv"

# Sending a GET request to the specified URL to retrieve the content of the CSV file
req = requests.get(path)

# Storing the content of the CSV file
url_content = req.content

# Specifying the name for the local CSV file to be saved
csv_file_name = 'owid-covid-data.csv'

# Opening the local CSV file in binary write mode
csv_file = open(csv_file_name, 'wb')

# Writing the content of the retrieved CSV file to the local file
csv_file.write(url_content)

# Closing the local CSV file
csv_file.close()

# Using Apache Spark to read the CSV file into a DataFrame
df = spark.read.csv('/content/'+csv_file_name, header=True, inferSchema=True)


In [6]:
df.count()

429435

In [4]:
df.show()

+--------+---------+-----------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------------------------------+------------------------------+-------------------------------

# Lazy Evaluation

In [7]:
# Filter all rows which have location India
df.filter("location='India'")

DataFrame[iso_code: string, continent: string, location: string, date: date, total_cases: int, new_cases: int, new_cases_smoothed: double, total_deaths: int, new_deaths: int, new_deaths_smoothed: double, total_cases_per_million: double, new_cases_per_million: double, new_cases_smoothed_per_million: double, total_deaths_per_million: double, new_deaths_per_million: double, new_deaths_smoothed_per_million: double, reproduction_rate: double, icu_patients: int, icu_patients_per_million: double, hosp_patients: int, hosp_patients_per_million: double, weekly_icu_admissions: int, weekly_icu_admissions_per_million: double, weekly_hosp_admissions: int, weekly_hosp_admissions_per_million: double, total_tests: bigint, new_tests: int, total_tests_per_thousand: double, new_tests_per_thousand: double, new_tests_smoothed: double, new_tests_smoothed_per_thousand: double, positive_rate: double, tests_per_case: double, tests_units: string, total_vaccinations: bigint, people_vaccinated: bigint, people_full

In [8]:
# Only once we call count, the expression is evaluated to give result
df.filter("location='India'").count()

1682

# Broadcast variable

In [9]:
## Initialize a dict of encoding as broadcast variable
broadcast_var = sc.broadcast({"Asia": 0, "Europe": 1, "Africa": 2, "Oceania": 3, "North America": 4, "South America": 5})

In [10]:
# select continent column and convert the dataframe into RDD
data_rdd = df.select('continent').filter("continent is not null").rdd

In [11]:
# Note there are 6 continents
data_rdd.distinct().collect()

[Row(continent='Africa'),
 Row(continent='Asia'),
 Row(continent='Europe'),
 Row(continent='Oceania'),
 Row(continent='South America'),
 Row(continent='North America')]

In [12]:
# Function for categorical encoding
def encode_continent(x):
  return broadcast_var.value[x[0]]

In [14]:
data_rdd.map(lambda x: encode_continent(x)).take(5)
# see the encoded rdd

[0, 0, 0, 0, 0]

# RDD Actions and Transformations

### collect

In [15]:
# Get all the data on driver in a list
data_rdd.collect()

[Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='

### take

In [16]:
# Fetch first 5 rows
data_rdd.take(5)

[Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia'),
 Row(continent='Asia')]

### count

In [17]:
# Count number of rows
data_rdd.count()

402910

In [18]:
# function passing
data_rdd.map(lambda x: encode_continent(x)).reduce(lambda x, y: x + y)

794170

### caching

In [19]:
# Cache the rdd for faster computation
data_rdd.cache()

MapPartitionsRDD[40] at javaToPython at NativeMethodAccessorImpl.java:0

In [20]:
# Count number of rows
data_rdd.count()

402910

In [21]:
# note the difference in run time
data_rdd.count()

402910

# Spark Dataframe

In [22]:
df.show()

+--------+---------+-----------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------------------------------+------------------------------+-------------------------------

### groupBy

In [23]:
df.groupBy("continent").count().show()

+-------------+-----+
|    continent|count|
+-------------+-----+
|       Europe|91031|
|       Africa|95419|
|         NULL|26525|
|North America|68638|
|South America|23440|
|      Oceania|40183|
|         Asia|84199|
+-------------+-----+



### orderBy

In [24]:
df.orderBy("continent", ascending=False).show()

+--------+-------------+----------------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+---------------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------------------------------+------------------------------+------------------

In [25]:
df.select("iso_code").drop_duplicates().orderBy("iso_code").show()

+--------+
|iso_code|
+--------+
|     ABW|
|     AFG|
|     AGO|
|     AIA|
|     ALB|
|     AND|
|     ARE|
|     ARG|
|     ARM|
|     ASM|
|     ATG|
|     AUS|
|     AUT|
|     AZE|
|     BDI|
|     BEL|
|     BEN|
|     BES|
|     BFA|
|     BGD|
+--------+
only showing top 20 rows



### rename column

In [26]:
df.withColumnRenamed("location", "country").show()

+--------+---------+-----------+----------+-----------+---------+------------------+------------+----------+-------------------+-----------------------+---------------------+------------------------------+------------------------+----------------------+-------------------------------+-----------------+------------+------------------------+-------------+-------------------------+---------------------+---------------------------------+----------------------+----------------------------------+-----------+---------+------------------------+----------------------+------------------+-------------------------------+-------------+--------------+-----------+------------------+-----------------+-----------------------+--------------+----------------+-------------------------+------------------------------+-----------------------------+-----------------------------------+--------------------------+-------------------------------------+------------------------------+-------------------------------

# Find number of null rows in total deaths

In [27]:
from pyspark.sql.functions import col, sum, avg

In [33]:
df.select(sum(col("total_deaths").isNull().cast("int")).alias("NullCount"))

DataFrame[NullCount: bigint]

In [34]:
# Method1
df.select(sum(col("total_deaths").isNull().cast("int")).alias("NullCount")).show()

+---------+
|NullCount|
+---------+
|    17631|
+---------+



In [29]:
# Method2
df.filter("total_deaths IS NULL").count()

17631

# Find average number of deaths per location

In [30]:
#Method1
df.groupBy("location").agg(sum("total_deaths").alias("total_deaths")).select(avg(col("total_deaths")).alias("AverageDeaths")).show()

+--------------------+
|       AverageDeaths|
+--------------------+
|1.3602852734146342E8|
+--------------------+



In [35]:
df.groupBy("location").agg(avg(col("total_deaths")).alias("AverageDeathsperLocation")).orderBy("AverageDeathsperLocation", ascending=False).show()

+--------------------+------------------------+
|            location|AverageDeathsperLocation|
+--------------------+------------------------+
|               World|      4784922.9301075265|
|High-income count...|       1973653.174432497|
|Upper-middle-inco...|      1950755.4677419355|
|              Europe|       1410457.609916368|
|       North America|      1123438.5167264037|
|                Asia|      1069536.9653524493|
|       South America|       983825.7897252091|
| European Union (27)|       852482.2956989247|
|Lower-middle-inco...|       827846.0722819594|
|       United States|       777909.9964157706|
|              Brazil|      505097.67144563916|
|               India|      380320.23178016726|
|              Russia|      256931.14994026284|
|              Mexico|       251657.1714456392|
|              Africa|      183366.67562724015|
|                Peru|      169138.48207885303|
|      United Kingdom|       165832.2616487455|
|               Italy|       135473.0310

In [31]:
#Method2
df.groupBy("location").agg(sum("total_deaths").alias("total_deaths")).selectExpr("avg(total_deaths) AS AverageDeaths").show()

+--------------------+
|       AverageDeaths|
+--------------------+
|1.3602852734146342E8|
+--------------------+



---

