# CS5540, Group 1 - Apache Spark Assignment

This is the submission document for our programming assignment on Apache Spark.

The submission was written as a Jupyter notebook but will be exported to a PDF for submission. We can provide the GitHub repo or the original Jupyter notebook if requested.

In [13]:
%pip install pyspark

Note: you may need to restart the kernel to use updated packages.


In [14]:
from pyspark.sql import SparkSession
from pyspark import SparkContext
from pyspark.sql.functions import col, split, explode, lower, trim, avg
import re

In [15]:
spark = SparkSession.builder\
    .appName("my-spark-app")\
    .config("spark.sql.catalogImplementation", "hive")\
    .getOrCreate()
sc = SparkContext.getOrCreate()

## Table of Contents

1. [Team Members](#team-members)
2. [Question 1](#question-1)
    - [1.1](#question-11)
    - [1.2](#question-12)
    - [1.3](#question-13)
    - [1.4](#question-14)
    - [1.5](#question-15)
3. [Question 2](#question-2)
4. [Question 3](#question-3)
    - [3.1](#question-31)
    - [3.2](#question-32)

## Team Members

This assignment was completed by the following team members (Group 1):

- Odai Athamneh
- Scott Brunton
- Ayushman (Jeet) Das
- Koti Paruchuri
- Varshith Thota

## Question 1

Question 1 is as follows: 

> Given file (`/data/shakespeare-1.txt`) contains the scenes from Shakespeare’s plays. You may use this file as an input dataset to identify the following notes for a student of Classical Drama.

In [16]:
# tokenize input text file and clean the word column
df = spark.read.text("data/shakespeare-1.txt")

df = df.select(explode(split(col("value"), " ")).alias("word"))
df = df.select(lower(trim(col("word"))).alias("word"))

df.show(5, truncate=False)

+-----+
|word |
+-----+
|this |
|is   |
|the  |
|100th|
|etext|
+-----+
only showing top 5 rows
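
One limitation of the tokenization above is worth noting: the text is split on a single space and `trim` removes only surrounding whitespace, so a token such as `france,` keeps its comma and will not equal `france`, and a multi-word name such as `american samoa` can never match a single token. The following is a minimal plain-Python sketch of a stricter normalization, independent of Spark. The helper `normalize_tokens` and the sample sentence are hypothetical and are not part of our submission.

```python
import re


def normalize_tokens(line):
    """Lowercase a line and return its word tokens without punctuation."""
    # Hypothetical helper: keep only runs of letters and apostrophes.
    return re.findall(r"[a-z']+", line.lower())


sample = "To France, to FRANCE; and then to England!"

# Splitting on a single space leaves punctuation attached to the tokens.
naive_tokens = sample.lower().split(" ")

# The regex-based version returns clean tokens.
tokens = normalize_tokens(sample)

print(naive_tokens)
print(tokens)
```

In Spark, a similar effect could probably be achieved by passing a regular expression to `split`, since its second argument is a pattern. The counts reported in the rest of this document were produced with the simpler tokenization and would likely change under a stricter one.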



### Question 1.1

The question reads as follows:

> How many different countries are mentioned in the whole file? (Regardless of how many times a single country is mentioned, this country only contributes as a single entry). 

To address this question, we need a dataset of country names. We are using the `country-list.csv` file provided by the professor. The file contains 211 entries. 

The caveat to this approach is that the dataset may not contain all countries, such as: 
- Countries that no longer exist
- Countries that are misspelled in the original Shakespearean text
- Countries where the name or spelling has changed over time

Addressing this issue is beyond the scope of this assignment and would likely require some degree of manual curation.

In [17]:
# load countries dataframe and clean the country column
countries = spark.read.csv("data/country-list.csv", header=False)
countries = countries.select(lower(trim("_c0")).alias("country"))

countries.show(5, truncate=False)
countries.count()

+--------------+
|country       |
+--------------+
|afghanistan   |
|albania       |
|algeria       |
|american samoa|
|andorra       |
+--------------+
only showing top 5 rows



211

Now that we have our list of countries, we can use a simple `.join()` to find the countries mentioned in the Shakespearean text. We use the `country` column as the key and perform an inner join against the `word` column of the tokenized text, which returns only the rows that have a match in both DataFrames. Because a country can match many times, we select the `country` column and apply `.distinct()` before calling `.count()` to get the number of different countries.

In [18]:
# perform join after converting both columns to lowercase and trimming the country column
unique_countries = df.join(countries, df.word == countries.country, "inner").select("country").distinct()
unique_countries.show(5, truncate=False)

print("Number of unique countries in the text file: {}".format(unique_countries.count()))

+-------+
|country|
+-------+
|greece |
|poland |
|austria|
|guinea |
|france |
+-------+
only showing top 5 rows

Number of unique countries in the text file: 22


### Question 1.2

The question reads as follows:

> Compute the total number of times any country is mentioned. (This is different from Question 1.1, since in this calculation, if a country is mentioned three times, then it contributes three times).



In [19]:
country_mentions = df.join(countries, df.word == countries.country, "inner").select("country").groupBy("country").count().orderBy("count", ascending=False)

# show the sum of the count column
country_mentions.agg({"count": "sum"}).show()

+----------+
|sum(count)|
+----------+
|       392|
+----------+
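
The difference between Questions 1.1 and 1.2 comes down to counting distinct matches versus counting all matches. The sketch below shows the same logic in plain Python on a tiny hypothetical sample; the `words` and `countries` values are made up for illustration and are not taken from the dataset.

```python
from collections import Counter

# Hypothetical sample standing in for the tokenized text and the country list.
words = ["france", "king", "england", "france", "the", "france"]
countries = {"france", "england", "spain"}

# Inner-join equivalent: keep only the words that are country names.
matches = [w for w in words if w in countries]

distinct_countries = len(set(matches))  # Question 1.1 style count
total_mentions = len(matches)           # Question 1.2 style count
per_country = Counter(matches)          # groupBy("country").count() equivalent

print(distinct_countries, total_mentions, per_country.most_common())
```

Here `spain` is in the country list but never appears in the text, so it contributes to neither count, which mirrors the behavior of the inner join.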



### Question 1.3

The question reads as follows:

> Determine the most popular countries. (It can be done by finding the three countries mentioned the most).

This is fairly straightforward, and we can reuse the `country_mentions` variable from the last question.

In [20]:
country_mentions.show(3)

+--------+-----+
| country|count|
+--------+-----+
|  france|  149|
| england|  128|
|scotland|   24|
+--------+-----+
only showing top 3 rows



### Question 1.4

The question reads as follows:

> After exploring the dataset, now calculate how many times specific countries are mentioned. (For example, how many times was France mentioned?)

The code below reuses the `country_mentions` variable again. Note that `.show()` prints only the first 20 rows of a DataFrame by default, so we call `.show(1000)` to ensure all rows are listed.

In [21]:
country_mentions.show(1000)

+---------+-----+
|  country|count|
+---------+-----+
|   france|  149|
|  england|  128|
| scotland|   24|
|    egypt|   15|
|    wales|   15|
|    italy|   12|
|   cyprus|   10|
|  denmark|   10|
|   greece|    6|
|     oman|    4|
|   norway|    3|
|  austria|    3|
|    syria|    3|
|    spain|    2|
|   poland|    1|
|   guinea|    1|
|  iceland|    1|
|  germany|    1|
|palestine|    1|
|   turkey|    1|
|   russia|    1|
|  armenia|    1|
+---------+-----+
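
To answer the question for one specific country, the table can be treated as a lookup. The sketch below copies the counts from the table above into a plain Python dictionary; the helper `mentions` is hypothetical. It also sorts by count and then by name, because ordering by count alone is not guaranteed to list tied countries (for example `egypt` and `wales`, both at 15) in the same order on every run.

```python
# Counts copied from the table above.
country_counts = {
    "france": 149, "england": 128, "scotland": 24, "egypt": 15,
    "wales": 15, "italy": 12, "cyprus": 10, "denmark": 10,
    "greece": 6, "oman": 4, "norway": 3, "austria": 3,
    "syria": 3, "spain": 2, "poland": 1, "guinea": 1,
    "iceland": 1, "germany": 1, "palestine": 1, "turkey": 1,
    "russia": 1, "armenia": 1,
}


def mentions(country):
    """Return how often a country is mentioned, or 0 if it never appears."""
    return country_counts.get(country.strip().lower(), 0)


# Sort by count (descending), then by name, so ties have a fixed order.
ranked = sorted(country_counts.items(), key=lambda kv: (-kv[1], kv[0]))
top_three = ranked[:3]
total = sum(country_counts.values())

print(mentions("France"))
print(top_three)
print(total)
```

The total of the dictionary values agrees with the 392 mentions reported for Question 1.2, and the number of keys agrees with the 22 countries reported for Question 1.1.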



### Question 1.5

The question reads as follows:

> Finally, what is the average number of times a country is mentioned? 

In [22]:
num_mentioned = unique_countries.count()
total_mentions = country_mentions.agg({"count": "sum"}).collect()[0][0]

# average number of mentions per country that appears at least once in the text
avg_mentions = total_mentions / num_mentioned
# share of all tokens in the text that are country names
pct_words = (total_mentions / df.count()) * 100

print("Average number of mentions per mentioned country: {}".format(round(avg_mentions, 2)))
print("Percentage of words in the text file that are countries: {}%".format(round(pct_words, 2)))

Average number of mentions per mentioned country: 17.82
Percentage of words in the text file that are countries: 0.03%
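
An average number of mentions depends on which denominator is chosen. The sketch below works through the arithmetic with the figures reported earlier in this document (392 total mentions, 22 distinct countries found in the text, 211 countries in the list). Which denominator the question intends is an assumption on our part.

```python
# Figures reported earlier in this document.
total_mentions = 392       # Question 1.2
mentioned_countries = 22   # Question 1.1
listed_countries = 211     # size of country-list.csv

# Average over countries that appear at least once in the text.
avg_per_mentioned = total_mentions / mentioned_countries

# Average over every country in the list, including those never mentioned.
avg_per_listed = total_mentions / listed_countries

print(round(avg_per_mentioned, 2))  # 17.82
print(round(avg_per_listed, 2))     # 1.86
```

The first figure describes how often a country is mentioned given that it appears at all. The second is pulled down by the 189 listed countries that never occur in the text.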


## Question 3

In [23]:
df = spark.read.csv('data/daily_weather-2.csv', header=True)
df.show(5)

+------+-------------+-----------+-------------------+---------------+-------------------+---------------+------------------+
|number|air_pressure.|  air_temp.|avg_wind_direction.|avg_wind_speed.|max_wind_direction.|max_wind_speed.|relative_humidity.|
+------+-------------+-----------+-------------------+---------------+-------------------+---------------+------------------+
|     0|       918.06|     74.822|              271.1|      2.0803542|              295.4|      2.8632832|             42.42|
|     1|  917.3476881|71.40384263|        101.9351794|    2.443009216|        140.4715485|    3.533323602|       24.32869729|
|     2|       923.04|     60.638|                 51|     17.0678522|               63.7|     22.1009672|               8.9|
|     3|  920.5027512|70.13889487|        198.8321327|    4.337363056|        211.2033412|     5.19004536|       12.18910187|
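|     4|       921.16|     44.294|              277.8|      1.8566602|              136.5|      2.8632832|             92.41|
+------+-------------+-----------+-------------------+---------------+-------------------+---------------+------------------+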

In [24]:
new_cols=(column.replace('.', '') for column in df.columns)
df = df.toDF(*new_cols)
df = df.drop("number")
df.show(5)

+------------+-----------+------------------+--------------+------------------+--------------+-----------------+
|air_pressure|   air_temp|avg_wind_direction|avg_wind_speed|max_wind_direction|max_wind_speed|relative_humidity|
+------------+-----------+------------------+--------------+------------------+--------------+-----------------+
|      918.06|     74.822|             271.1|     2.0803542|             295.4|     2.8632832|            42.42|
| 917.3476881|71.40384263|       101.9351794|   2.443009216|       140.4715485|   3.533323602|      24.32869729|
|      923.04|     60.638|                51|    17.0678522|              63.7|    22.1009672|              8.9|
| 920.5027512|70.13889487|       198.8321327|   4.337363056|       211.2033412|    5.19004536|      12.18910187|
|      921.16|     44.294|             277.8|     1.8566602|             136.5|     2.8632832|            92.41|
+------------+-----------+------------------+--------------+------------------+--------------+-----------------+

### Question 3.1

The question reads as follows:

> Count the number of days where all parameters have difference of ± 2

The column headers are informally pivoted into a `typeMeasurement` column so that each measurement can later be related back to the data table. For each column we collect the name, the average, and the lower and upper bounds (average ± 2) into a new DataFrame.

In [25]:
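# compute the mean of every measurement column in a single pass
dfAvg = df.select(avg('air_pressure'), avg('air_temp'), avg('avg_wind_direction'), avg('avg_wind_speed'),
                  avg('max_wind_direction'), avg('max_wind_speed'), avg('relative_humidity'))
avgRow = dfAvg.collect()[0]  # collect once instead of once per column

# placeholder row (index 0) used to initialize the column types; removed in the next cell
data = [(0, "ab", 0.0, 0.0, 0.0)]

# index starts at 1 to leave room for the placeholder row
# (the loop variable is not named `col`, which would shadow pyspark.sql.functions.col)
for x, colName in enumerate(df.columns, start=1):
  avrg = avgRow[x - 1]
  upprRng = avrg + 2
  lowrRng = avrg - 2

  data.append((x, colName, avrg, upprRng, lowrRng))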
  
dfAvgSummary = spark.createDataFrame(schema = ['index','typeMeasurement','avg','upprRng','lowrRng'], data = data)
dfAvgSummary.show()

+-----+------------------+------------------+------------------+------------------+
|index|   typeMeasurement|               avg|           upprRng|           lowrRng|
+-----+------------------+------------------+------------------+------------------+
|    0|                ab|               0.0|               0.0|               0.0|
|    1|      air_pressure| 918.8825513141026| 920.8825513141026| 916.8825513141026|
|    2|          air_temp| 64.93300141293575| 66.93300141293575| 62.93300141293575|
|    3|avg_wind_direction|142.23551070020164|144.23551070020164|140.23551070020164|
|    4|    avg_wind_speed| 5.508284242259157| 7.508284242259157|3.5082842422591574|
|    5|max_wind_direction|148.95351796495402|150.95351796495402|146.95351796495402|
|    6|    max_wind_speed| 7.019513529173236| 9.019513529173235| 5.019513529173236|
|    7| relative_humidity|34.241402059256586|36.241402059256586|32.241402059256586|
+-----+------------------+------------------+------------------+------------------+

This cell removes the record that was only used to initialize the column data types.

In [26]:
# clean the DataFrame by removing the row that was used to initialize the column types
dfAvgSummaryC = dfAvgSummary.filter(dfAvgSummary.index != 0)
dfAvgSummaryC.show()

+-----+------------------+------------------+------------------+------------------+
|index|   typeMeasurement|               avg|           upprRng|           lowrRng|
+-----+------------------+------------------+------------------+------------------+
|    1|      air_pressure| 918.8825513141026| 920.8825513141026| 916.8825513141026|
|    2|          air_temp| 64.93300141293575| 66.93300141293575| 62.93300141293575|
|    3|avg_wind_direction|142.23551070020164|144.23551070020164|140.23551070020164|
|    4|    avg_wind_speed| 5.508284242259157| 7.508284242259157|3.5082842422591574|
|    5|max_wind_direction|148.95351796495402|150.95351796495402|146.95351796495402|
|    6|    max_wind_speed| 7.019513529173236| 9.019513529173235| 5.019513529173236|
|    7| relative_humidity|34.241402059256586|36.241402059256586|32.241402059256586|
+-----+------------------+------------------+------------------+------------------+
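
The placeholder row and its later removal could be avoided by building the summary rows directly from the averages. The following is a minimal plain-Python sketch of that idea, using three of the averages from the table above. The helper `build_summary` and the constant `TOLERANCE` are hypothetical names introduced for illustration.

```python
TOLERANCE = 2.0

# Column means copied from the summary table above (three shown for brevity).
averages = {
    "air_pressure": 918.8825513141026,
    "air_temp": 64.93300141293575,
    "avg_wind_speed": 5.508284242259157,
}


def build_summary(avgs, tol=TOLERANCE):
    """Return (index, name, avg, upper, lower) tuples, one per column."""
    return [
        (i, name, mean, mean + tol, mean - tol)
        for i, (name, mean) in enumerate(avgs.items(), start=1)
    ]


summary = build_summary(averages)
for row in summary:
    print(row)
```

Since every tuple already has an integer, a string, and three floats, a list like this should be usable as the `data` argument of `spark.createDataFrame` together with the same list of column names, without a seed row. We have not run that variant as part of this submission.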



A for loop iterates over each column in the data table. For each column, the corresponding upper and lower bounds are retrieved from the summary table. The bounds come back as `Row` objects, so they are converted to strings and a regular expression strips everything except the numeric value. The column name and both bounds are then inserted into a SQL query that displays the number of records outside the ±2 range for that column.

In [27]:
df.createOrReplaceTempView('dataTable')
dfAvgSummaryC.createOrReplaceTempView('avgTable')

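# strips everything except digits and the decimal point from a stringified Row
# (this would also drop a minus sign, so it only works for positive bounds)
exp = r"[^0-9.]"

dataTableCol = df.columns

for dCol in dataTableCol:
  # look up the upper and lower bound for this column in the summary table
  upprRng = dfAvgSummaryC.select(dfAvgSummaryC.upprRng).filter(dfAvgSummaryC.typeMeasurement.contains(dCol)).collect()
  lowrRng = dfAvgSummaryC.select(dfAvgSummaryC.lowrRng).filter(dfAvgSummaryC.typeMeasurement.contains(dCol)).collect()

  # collect() returns a list of Row objects; convert to text and keep only the number
  x = ''.join(map(str, upprRng))
  upr = re.sub(exp, '', x)
  y = ''.join(map(str, lowrRng))
  lowr = re.sub(exp, '', y)

  # count the records that fall outside the +-2 band around the column mean
  spark.sql("select count(dt." + dCol + ") from dataTable as dt" +
                      " where dt." + dCol + " > " + upr +
                      " or dt." + dCol + " < " + lowr ).show()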

+-------------------+
|count(air_pressure)|
+-------------------+
|                606|
+-------------------+

+---------------+
|count(air_temp)|
+---------------+
|            951|
+---------------+

+-------------------------+
|count(avg_wind_direction)|
+-------------------------+
|                     1084|
+-------------------------+

+---------------------+
|count(avg_wind_speed)|
+---------------------+
|                  754|
+---------------------+

+-------------------------+
|count(max_wind_direction)|
+-------------------------+
|                     1082|
+-------------------------+

+---------------------+
|count(max_wind_speed)|
+---------------------+
|                  826|
+---------------------+

+------------------------+
|count(relative_humidity)|
+------------------------+
|                    1041|
+------------------------+
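
The counts above are reported per column and cover the records that fall outside the band. Another reading of the question is the number of days on which every parameter lies within ± 2 of its column mean. The plain-Python sketch below contrasts the two on a small hypothetical sample; the `days` and `means` values are made up for illustration and are not the assignment data.

```python
TOLERANCE = 2.0

# Hypothetical three-day sample with two parameters (not the assignment data).
days = [
    {"air_temp": 64.0, "max_wind_speed": 7.5},
    {"air_temp": 70.0, "max_wind_speed": 7.0},
    {"air_temp": 65.5, "max_wind_speed": 12.0},
]

# Assumed column means for the sample above.
means = {"air_temp": 65.0, "max_wind_speed": 7.0}


def within_band(day, means, tol=TOLERANCE):
    """True when every parameter of the day is within tol of its column mean."""
    return all(abs(day[name] - mean) <= tol for name, mean in means.items())


# Days on which all parameters stay inside the band.
days_all_within = sum(1 for day in days if within_band(day, means))

# Per-column count of values outside the band, as in the queries above.
outside_per_column = {
    name: sum(1 for day in days if abs(day[name] - mean) > TOLERANCE)
    for name, mean in means.items()
}

print(days_all_within, outside_per_column)
```

In this sample only the first day has both parameters inside the band, while each column on its own has exactly one value outside it. The two ways of counting answer different questions and cannot be derived from one another.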



### Question 3.2

The question reads as follows:

> Count number of days where max_wind_speed and avg_wind_speed has difference more than 5.

In [28]:
df = spark.read.csv('data/daily_weather-2.csv', header=True)
df.show(5)

+------+-------------+-----------+-------------------+---------------+-------------------+---------------+------------------+
|number|air_pressure.|  air_temp.|avg_wind_direction.|avg_wind_speed.|max_wind_direction.|max_wind_speed.|relative_humidity.|
+------+-------------+-----------+-------------------+---------------+-------------------+---------------+------------------+
|     0|       918.06|     74.822|              271.1|      2.0803542|              295.4|      2.8632832|             42.42|
|     1|  917.3476881|71.40384263|        101.9351794|    2.443009216|        140.4715485|    3.533323602|       24.32869729|
|     2|       923.04|     60.638|                 51|     17.0678522|               63.7|     22.1009672|               8.9|
|     3|  920.5027512|70.13889487|        198.8321327|    4.337363056|        211.2033412|     5.19004536|       12.18910187|
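|     4|       921.16|     44.294|              277.8|      1.8566602|              136.5|      2.8632832|             92.41|
+------+-------------+-----------+-------------------+---------------+-------------------+---------------+------------------+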

In [29]:
new_cols=(column.replace('.', '') for column in df.columns)
df = df.toDF(*new_cols)

df.createOrReplaceTempView("dataT")

spark.sql("select  count(*) as 5DaysGreater " +      
          " from dataT dt " +
          " where dt.max_wind_speed - dt.avg_wind_speed  > 5").show()


+------------+
|5DaysGreater|
+------------+
|          20|
+------------+
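
The logic of the query can be checked by hand against the five preview rows shown above. The sketch below does this in plain Python; the helper `gust_gap` is a hypothetical name. Because the CSV was loaded without a schema, the values are strings, so the sketch converts them to floats before subtracting.

```python
# (avg_wind_speed, max_wind_speed) pairs copied from the five preview rows above.
preview_rows = [
    ("2.0803542", "2.8632832"),
    ("2.443009216", "3.533323602"),
    ("17.0678522", "22.1009672"),
    ("4.337363056", "5.19004536"),
    ("1.8566602", "2.8632832"),
]

THRESHOLD = 5.0


def gust_gap(avg_speed, max_speed):
    """Return max_wind_speed minus avg_wind_speed as a float."""
    return float(max_speed) - float(avg_speed)


# Same condition as the SQL query: strictly greater than the threshold.
days_over = sum(1 for a, m in preview_rows if gust_gap(a, m) > THRESHOLD)

print(days_over)
```

Among the five preview rows, only the row with an average of 17.0678522 and a maximum of 22.1009672 exceeds the threshold, with a gap of roughly 5.03. Like the query, the sketch uses the signed difference and a strict comparison, so a day with a gap of exactly 5 would not be counted.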

