In [1]:
import pyspark
from pyspark.sql import SparkSession

In [2]:
import os
import pandas

from pyspark.sql.types import *
from pyspark.sql.functions import avg, round

To connect to the Spark cluster, we must create a SparkSession object with the following params:

* **appName**: application name displayed at the Spark Master Web UI;
* **master**: Spark Master URL, same used by Spark Workers;
* **spark.executor.memory**: must be less than or equals to docker compose SPARK_WORKER_MEMORY config.

In [3]:
spark = SparkSession.builder.appName('spark-notebook-1').master('spark://spark-master:7077').config('spark.executor.memmory', '2G').getOrCreate()

2023-09-13 08:44:04,461 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


In [9]:
# The data from covid.csv will be loaded from a local file path
for path, subdirs, files in os.walk('./data/'):
    for name in files:
        if "covid" in name:
            csvName = name
            csvPath = os.path.join(path, name)
            print("Loading data from csv {}".format(csvPath))
            covidDfPandas = pandas.read_csv(csvPath)

Loading data from csv ./data/covid.csv


In [4]:
# Structure of data in the DataFrame
schema = StructType([
    StructField("dateRep", StringType(), True),
    StructField("day", IntegerType(), True),
    StructField("month", IntegerType(), True),
    StructField("year", IntegerType(), True),
    StructField("cases", IntegerType(), True),
    StructField("deaths", IntegerType(), True),
    StructField("countriesAndTerritories", StringType(), True),
    StructField("geoId", StringType(), True),
    StructField("countryterritoryCode", StringType(), True),
    StructField("popData2019", FloatType(), True),
    StructField("continentExp", StringType(), True),
    StructField("Cumulative_number_for_14_days_of_COVID-19_cases_per_100000", DoubleType(), True),
])

In [10]:
# Create PySpark DataFrame from Pandas
covidDfSpark = spark.createDataFrame(covidDfPandas, schema=schema)

In [7]:
# The sum of cases with Covid in October of 2020 in Belgium.

cases_Greece = covidDfSpark.filter((covidDfSpark['month'] == 10) & (covidDfSpark['year'] == 2020) & (covidDfSpark['countryterritoryCode'] == 'BEL'))

print('Sum of cases with Covid in October of 2020 in Belgium:')
cases_Greece.agg({'cases':'sum'}).show()

Sum of cases with Covid in October of 2020 in Belgium:


2023-09-11 13:00:32,267 WARN scheduler.TaskSetManager: Stage 0 contains a task of very large size (1650 KiB). The maximum recommended task size is 1000 KiB.
[Stage 0:>                                                          (0 + 2) / 2]

+----------+
|sum(cases)|
+----------+
|    320023|
+----------+



                                                                                

In [8]:
# The sum of cases and deaths for each country.

group_continents = covidDfSpark.groupBy('countryterritoryCode').agg({'cases':'sum', 'deaths':'sum'})

print('Sum of cases and deaths for each country:')
group_continents.show(10)

Sum of cases and deaths for each country:


2023-09-08 21:40:06,991 WARN scheduler.TaskSetManager: Stage 2 contains a task of very large size (1650 KiB). The maximum recommended task size is 1000 KiB.


+--------------------+----------+-----------+
|countryterritoryCode|sum(cases)|sum(deaths)|
+--------------------+----------+-----------+
|                 HTI|      9565|        234|
|                 PSE|    124657|       1079|
|                 LVA|     25675|        349|
|                 POL|   1135676|      22864|
|                 BRB|       292|          7|
|                 ZMB|     18274|        367|
|                 JAM|     11710|        273|
|                 BRA|   6901952|     181402|
|                 ARM|    148682|       2503|
|                 MOZ|     16954|        142|
+--------------------+----------+-----------+
only showing top 10 rows



In [9]:
# The average number of cases and deaths in the countries of Asia.

cases_Asia = covidDfSpark.filter(covidDfSpark['continentExp'] == 'Asia').withColumnRenamed('countriesAndTerritories', 'country')

result = cases_Asia.groupBy('country').agg(
    round(avg('cases'), 2).alias('Average of cases'),
    round(avg('deaths'), 2).alias('Average of deaths') 
).orderBy('country')

print('Average number of cases and deaths in the countries of Asia:')
result.show(10)

Average number of cases and deaths in the countries of Asia:


2023-09-08 21:40:08,140 WARN scheduler.TaskSetManager: Stage 8 contains a task of very large size (1650 KiB). The maximum recommended task size is 1000 KiB.

+-----------------+----------------+-----------------+
|          country|Average of cases|Average of deaths|
+-----------------+----------------+-----------------+
|      Afghanistan|          144.92|              5.8|
|          Bahrain|          255.42|              1.0|
|       Bangladesh|         1783.76|            25.64|
|           Bhutan|            1.59|              0.0|
|Brunei_Darussalam|            0.54|             0.01|
|         Cambodia|            1.05|              0.0|
|            China|          262.92|            13.54|
|            India|         28321.2|           410.76|
|        Indonesia|         1801.22|            54.87|
|             Iran|         3166.48|           149.13|
+-----------------+----------------+-----------------+
only showing top 10 rows



                                                                                

In [10]:
# The sum of deaths of Covid in Europe by date.

cases_Europe = covidDfSpark.filter(covidDfSpark['continentExp'] == 'Europe')

print('Sum of deaths of Covid in Europe by date:')
cases_Europe.groupBy('dateRep').agg({'deaths':'sum'}).sort('sum(deaths)', ascending=False).show(10)

Sum of deaths of Covid in Europe by date:


2023-09-08 21:40:09,981 WARN scheduler.TaskSetManager: Stage 10 contains a task of very large size (1650 KiB). The maximum recommended task size is 1000 KiB.

+----------+-----------+
|   dateRep|sum(deaths)|
+----------+-----------+
|25/11/2020|       6391|
|02/12/2020|       6195|
|09/12/2020|       6074|
|28/11/2020|       6001|
|21/11/2020|       5909|
|18/11/2020|       5829|
|05/12/2020|       5783|
|03/12/2020|       5728|
|26/11/2020|       5719|
|04/12/2020|       5716|
+----------+-----------+
only showing top 10 rows



                                                                                

In [11]:
import time

epochNow = int(time.time())

In [12]:
# Write Dataframe into HDFS as a parquet file

covidDfSpark.write.parquet("hdfs://namenode:8020/covid/{}_{}.parquet".format(csvName,epochNow))
print("Covid Dataframe stored in Hadoop.")

2023-09-08 21:40:25,566 WARN scheduler.TaskSetManager: Stage 12 contains a task of very large size (1650 KiB). The maximum recommended task size is 1000 KiB.
[Stage 12:>                                                         (0 + 2) / 2]

Covid Dataframe stored in Hadoop.


                                                                                

In [13]:
# Read from HDFS to confirm it was successfully stored

df_load = spark.read.parquet("hdfs://namenode:8020/covid/{}_{}.parquet".format(csvName,epochNow))
print("Covid Dataframe read from Hadoop : ")
df_load.show()

Covid Dataframe read from Hadoop : 
+----------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+------------+----------------------------------------------------------+
|   dateRep|day|month|year|cases|deaths|countriesAndTerritories|geoId|countryterritoryCode|popData2019|continentExp|Cumulative_number_for_14_days_of_COVID-19_cases_per_100000|
+----------+---+-----+----+-----+------+-----------------------+-----+--------------------+-----------+------------+----------------------------------------------------------+
|08/09/2020|  8|    9|2020|   80|     1|             Kazakhstan|   KZ|                 KAZ|1.8551428E7|        Asia|                                               36.33682539|
|07/09/2020|  7|    9|2020|  199|     1|             Kazakhstan|   KZ|                 KAZ|1.8551428E7|        Asia|                                               35.90559174|
|06/09/2020|  6|    9|2020|  896|     8|             Kazakhstan|   KZ|              

In [11]:
spark.stop()