# Installation and environment setup

In [1]:
! mkdir lab05; mkdir lab05/input

In [2]:
# Install a headless Java 8 JDK; -qq plus the redirect to /dev/null hide apt's output.
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
# Copy the Spark 3.1.1 (Hadoop 3.2 build) tarball into /content.
# NOTE(review): the source path presumably lives on a mounted Google Drive; the
# mount step is not shown in this notebook, so it must be done beforehand.
!cp /content/drive/MyDrive/MMDS/spark-3.1.1-bin-hadoop3.2.tgz /content
# Unpack the tarball by relative name.
# NOTE(review): assumes the working directory is /content, where the tarball was
# copied and where SPARK_HOME is later pointed — TODO confirm.
!tar xf spark-3.1.1-bin-hadoop3.2.tgz
# Install findspark quietly (version is not pinned).
!pip install -q findspark

In [3]:
import os
# Point this process (and the JVM that Spark launches from it) at the Java 8
# install and at the unpacked Spark distribution.
os.environ.update(
    JAVA_HOME="/usr/lib/jvm/java-8-openjdk-amd64",
    SPARK_HOME="/content/spark-3.1.1-bin-hadoop3.2",
)

In [10]:
# NOTE(review): disabled cell, kept for reference only. It looks like a manual
# way of appending Spark's Python sources and the bundled py4j zip to
# PYTHONPATH / PATH; the notebook actually relies on findspark.init() instead.
# It would not run as written if uncommented: the string concatenation is
# continued onto a new line without parentheses (syntax error), and
# os.environ["PYTHONPATH"] raises KeyError when that variable is not set.
# SPARK_HOME = os.environ["SPARK_HOME"]
# PYTHONPATH = os.environ["PYTHONPATH"]
# PATH = os.environ["PATH"]
# os.environ["PYTHONPATH"] = f"{PYTHONPATH}:{SPARK_HOME}/python:{SPARK_HOME}"+
#                               "/python/lib/py4j-0.10.9-src.zip"
# os.environ["PATH"] = f"{PATH}:{SPARK_HOME}/python"

In [4]:
import findspark
# Initialise findspark, then print the Spark home it resolved as a sanity
# check (expected to match the SPARK_HOME configured earlier).
findspark.init()
spark_home = findspark.find()
print(spark_home)

/content/spark-3.1.1-bin-hadoop3.2


In [11]:
from pyspark.sql import SparkSession
from pyspark.sql import functions as F
from pyspark.sql.window import Window
from pyspark.sql.types import *

In [12]:
# Create the Spark session for this notebook, or reuse the one already
# running in this kernel.
spark = (
    SparkSession.builder
    .appName("First Spark")
    .getOrCreate()
)

In [13]:
# Load the WHO COVID-19 snapshot: tab-separated, first row holds column names.
# No schema is given and inferSchema is not enabled, so every column is read
# as a string.
df = (
    spark.read
    .options(sep="\t", header=True)
    .csv("lab05/input/WHO-COVID-19-20210601-213841.tsv")
)

# Finding ASEAN Cases

In [16]:
keyRegion = "South-East Asia"

# The cumulative-case values are read as text; remove the commas (presumably
# thousands separators) before casting to a decimal so they can be summed,
# compared and ranked numerically.
cumulative_cases = (
    F.regexp_replace(df["Cases - cumulative total"], ",", "")
    .cast(DecimalType(15, 3))
    .alias("Cumulative Cases of Asian")
)

# Keep only the rows whose WHO Region equals keyRegion, projected down to the
# country name and its numeric cumulative case count.
asian = (
    df.where(df["WHO Region"] == keyRegion)
    .select("Name", cumulative_cases)
)

## Sum of cumulative total cases among ASEAN

In [35]:
# Add up the cumulative case counts of every country in the selected region
# and display the single-row result.
total_cases = F.sum("Cumulative Cases of Asian")
asian.agg(
    total_cases.alias("Sum of cumulative total cases among ASEAN")
).show()

+------------------------------------------+
|Sum of cumulative total cases among ASEAN |
+------------------------------------------+
|                              31923614.000|
+------------------------------------------+



## Maximum number of cumulative total cases among ASEAN

In [34]:
# Compute the maximum cumulative case count, then join that one-row result
# back onto `asian` (on the case-count column) to recover the name of the
# country holding the maximum. Every country tied for the maximum is shown.
max_cases = asian.agg(
    F.max("Cumulative Cases of Asian").alias("Cumulative Cases of Asian")
)
(
    max_cases
    .join(asian, "Cumulative Cases of Asian")
    .select(
        asian["Name"]
        .alias("Maximum number of cumulative total cases among ASEAN")
    )
    .show(truncate=False)
)

+----------------------------------------------------+
|Maximum number of cumulative total cases among ASEAN|
+----------------------------------------------------+
|India                                               |
+----------------------------------------------------+



## Top 3 countries with the lowest number of cumulative cases among ASEAN

In [37]:
# Rank all rows by ascending cumulative cases and keep ranks 1-3.
# rank() gives tied rows the same rank, so more than three rows can be shown
# when counts are equal.
window = Window.orderBy(F.col("Cumulative Cases of Asian").asc())
ranked_asian = asian.withColumn("rank", F.rank().over(window))
(
    ranked_asian
    .where(F.col("rank") <= 3)
    .show(truncate=False)
)

+-------------------------------------+-------------------------+----+
|Name                                 |Cumulative Cases of Asian|rank|
+-------------------------------------+-------------------------+----+
|Democratic People's Republic of Korea|0.000                    |1   |
|Bhutan                               |1620.000                 |2   |
|Timor-Leste                          |6994.000                 |3   |
+-------------------------------------+-------------------------+----+

