# Advanced Data Analysis - week 6, more examples

In the advanced data analysis course, we assume basic knowledge of Python, as could be acquired by attending the Introduction to Programming bridging course.

This notebook includes the examples and exercises presented in Week 6. There is an additional notebook with the examples and exercises suggested for autonomous study during the week.

In week 6, we will focus on introducing Spark.


[//]: # (We will be using latex for fomulas)

<script type="text/javascript"
        src="https://cdnjs.cloudflare.com/ajax/libs/mathjax/2.7.0/MathJax.js?config=TeX-AMS_CHTML"></script>


# Install Spark

In [1]:
# RUN THIS CELL ONLY IF RUNNING IN COLAB

!apt-get install openjdk-11-jdk-headless
!pip install pyspark
!pip install gdown
!gdown --id 1Suzt37ohetSKLNP0kFUv0Ji1joiXumir
!unzip sbe_data_2223.zip

/bin/bash: apt-get: command not found
Collecting pyspark
  Downloading pyspark-3.4.1.tar.gz (310.8 MB)
[2K     [91m━━[0m[91m╸[0m[90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m21.5/310.8 MB[0m [31m611.6 kB/s[0m eta [36m0:07:54[0m
[?25h[31mERROR: Exception:
Traceback (most recent call last):
  File "/Users/hendrik/anaconda3/lib/python3.11/site-packages/pip/_vendor/urllib3/response.py", line 438, in _error_catcher
    yield
  File "/Users/hendrik/anaconda3/lib/python3.11/site-packages/pip/_vendor/urllib3/response.py", line 561, in read
    data = self._fp_read(amt) if not fp_closed else b""
           ^^^^^^^^^^^^^^^^^^
  File "/Users/hendrik/anaconda3/lib/python3.11/site-packages/pip/_vendor/urllib3/response.py", line 527, in _fp_read
    return self._fp.read(amt) if amt is not None else self._fp.read()
           ^^^^^^^^^^^^^^^^^^
  File "/Users/hendrik/anaconda3/lib/python3.11/site-packages/pip/_vendor/cachecontrol/filewrapper.py", line 90, in read
    data = self.__fp.

Installing collected packages: gdown
Successfully installed gdown-4.7.1
Downloading...
From: https://drive.google.com/uc?id=1Suzt37ohetSKLNP0kFUv0Ji1joiXumir
To: /Users/hendrik/Documents/M.Sc. BA - NOVA/Semester 1/Advanced Data Analytics/week 3/ada_week3_lecture2/sbe_data_2324.zip
100%|██████████████████████████████████████| 1.07M/1.07M [00:00<00:00, 4.03MB/s]
unzip:  cannot find or open sbe_data_2223.zip, sbe_data_2223.zip.zip or sbe_data_2223.zip.ZIP.


## Import Spark SQL

Import the Spark SQL packages.

In [None]:
import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *

import os
import pandas as pd
import matplotlib.pyplot as plt

plt.style.use('seaborn')


In [None]:
spark = SparkSession.builder \
    .master("local") \
    .appName("Simple test") \
    .config("spark.sql.execution.arrow.pyspark.enabled", "true") \
    .getOrCreate()

## Window operations

SQL allows to define operations over windows of values in a table.

This can be used for computing, for example, moving averages.

Consider the following dataset.

In [None]:
dataDF = spark.createDataFrame( [(1,10), (2,11), (3,13), (4,16), (5,20), \
                                 (6,25), (7,31), (8,38), (9,46), (10,55)],
                               ["day","value"])
dataDF.createOrReplaceTempView("data0")
dataDF.show()


In [None]:
spark.sql("""SELECT day, value, MEAN(value) OVER 
                            (ORDER BY day ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING)
                            AS centerMA
                        FROM data0""").show()

### Moving average

The syntax for using [windows in Spark SQL](https://spark.apache.org/docs/latest/sql-ref-syntax-qry-select-window.html) is the following (simplified):
```SELECT fun(col1) OVER (ORDER BY col2 ROWS BETWEEN offset AND offset FROM ...```, with offset ```UNBOUNDED PRECEDING | offset PRECEDING | CURRENT ROW | offset FOLLOWING | UNBOUNDED FOLLOWING```.

Computing an approximation of the moving averages can be done as follows - unlike moving averages, the computation is still performed if the windows is not complete. For this reason, the computed values include results for all rows.

In [None]:
spark.sql("""SELECT day, value, MEAN(value) OVER 
                            (ORDER BY day ROWS BETWEEN 3 PRECEDING AND 3 FOLLOWING)
                            AS centerMA
                        FROM data0""").createOrReplaceTempView("data1")

result = spark.sql("select * from data1")
result.show();


In [None]:
spark.sql("""SELECT day, value, centerMA, MEAN(value) OVER 
                            (ORDER BY day ROWS BETWEEN 6 PRECEDING AND CURRENT ROW)
                            AS simpleMA
                        FROM data1
                        ORDER BY day""").createOrReplaceTempView("data2")

result = spark.sql("SELECT * FROM data2")
result.show();



You should have seen the warning: **No Partition Defined for Window operation! Moving all data to a single partition, this can cause serious performance degradation.**. 

The problem with Window operations performed over distributed data is that it is not possible to partition data, as windows overlap in a rolling way.


### Cumulative sum

Other computations that can be performed easily using windows are the cummulative sum, mean, etc.

In [None]:
spark.sql("""SELECT day, value, centerMA, simpleMA, SUM(value) OVER 
                            (ORDER BY day ROWS BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW)
                            AS acum
                        FROM data2
                        ORDER BY day""").createOrReplaceTempView("data3")

result = spark.sql("select * from data3")
result.show();

### Operations over groups using windows

Windows can also be used to execute operation over groups, because the definition of a windows includes the possibility of partitioning the table and executing the defined operation over each partition independently.

We can use this for, in the example of the first part of lecture 2, get the row with the youngest persons that are a good and bad company.

Let's start by loading data.

In [None]:
# Let's create a PATH in a OS independent way
# File lec1-example.csv is in directory data
fileName = os.path.join( "data", "lec1-example.csv")

# Read a CSV file into a DataFrame
df = spark.read.option("header", True).option("inferSchema",True).csv(fileName)
df = df.withColumnRenamed("Educational level", "EducationalLevel")

df.createOrReplaceTempView("persons")

df.show(5)

The following code partitions data by the value of the Company column, and for each partition order the rows by age ```(PARTITION BY company ORDER BY age)```. For each group, it will get the position (rank) of the row in the order.


In [None]:
youngest = spark.sql("""SELECT *, RANK(age) OVER (PARTITION BY company ORDER BY age) AS rank
                            FROM persons
                            """)
youngest.show()

Now, it is just necessary to keep only the rows with rank equals to 1. The following code creates an additional table and then performs a selection on the new table.

In [None]:
spark.sql("""SELECT *, RANK(age) OVER (PARTITION BY company ORDER BY age) AS rank
                            FROM persons
                            """).createOrReplaceTempView("personsExt")
youngest = spark.sql("""SELECT * FROM personsExt WHERE rank = 1""")

youngest.show()

The following code performs the same computation by using a nested **SELECT**, where a select is performed on the result of other select statement.


In [None]:
youngest = spark.sql("""SELECT * FROM (SELECT *, 
                            RANK(age) OVER (PARTITION BY company ORDER BY age) AS rank
                        FROM persons)
                        WHERE rank = 1""")
youngest.show()

## Correlations

It is possible to use statistical function, such as correlation in Spark SQL.

Let's start by computing the movign averages of COVID cases and deaths in Portugal.



In [None]:
ptCovidFileName = os.path.join( "data", "PT-covid.csv")

# Read a CSV file into a DataFrame
ptCovidDF = spark.read.option("header", True).option("inferSchema",True).csv(ptCovidFileName)

# Converts the date into the datetime type
ptCovidDF = ptCovidDF.withColumn("date",to_date(col("date"), "yyyy-MM-dd"))

ptCovidDF.createOrReplaceTempView("covidPT0")

spark.sql("""SELECT *, MEAN(cases) OVER 
                            (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW)
                            AS casesMA,
                       MEAN(deaths) OVER 
                            (ORDER BY date ROWS BETWEEN 6 PRECEDING AND CURRENT ROW)
                            AS deathsMA
                        FROM covidPT0""").createOrReplaceTempView("covidPT")

ptCovidPdDF = spark.sql("SELECT date, casesMA, deathsMA FROM covidPT").toPandas()

ptCovidPdDF.plot(x="date",y=["casesMA","deathsMA"],secondary_y=["deathsMA"])


It is now possible to compute the correlation between two columns by calling the function ```CORR( col1, col2)```.


In [None]:
corrDF = spark.sql("SELECT CORR(casesMA, deathsMA) FROM covidPT")

corrDF.show()

It is possible to compute the correlation only for a subset of rows, by using a ```WHERE``` condition.


In [None]:
print("Before 2021-03-01")
corrDF = spark.sql("SELECT CORR(casesMA, deathsMA) FROM covidPT WHERE date < '2021-03-01'")
corrDF.show()


print()
print("After 2021-03-01")
corrDF = spark.sql("SELECT CORR(casesMA, deathsMA) FROM covidPT WHERE date >= '2021-03-01'")
corrDF.show()


## Functions over multiple tables

Often, data will be in multiple tables/Dataframes. To process data, it is necessary to execute operations over these tables. 

We now introduce some of the operations available on Spark SQL for combining multiple tables.


### Appending tables

Sometimes, we have data over which we want to perform a computation that is in two tables. 

The ```SELECT ... FROM table1 ... UNION SELECT ... FROM table2 ...``` allows to combine the selected rows from both tables - in general, ```UNION``` allows to combine the results from two (or more) select statements. Unlike Pandas ```append```function, results must have the same columns.

The following code show the example running.


In [None]:
population1DF = spark.createDataFrame( [("PT",10276617), ("ES",46937060), ("DE",83019213)],
                               ["country","population"])
population1DF.createOrReplaceTempView("population1")
population1DF.show()



In [None]:
population2DF = spark.createDataFrame( [("BR",211049519), 
                                        ("MX",127575529), 
                                        ("UY",3461731)],
                               ["country","population"])
population2DF.createOrReplaceTempView("population2")
population2DF.show()


In [None]:
population = spark.sql("SELECT * FROM population1 UNION select * FROM population2")
population.createOrReplaceTempView("population")

population.show()


### Joining tables

It is also possible to join two or more table. 

Consider thehat we have the following two tables. The first table has a list of countries and their population.

| country | population |
|---------|------------|
| PT | 10276617 |
| ES | 46937060 |
| DE | 83019213 |

The second table has the language spoken in each country.

| country | language |
|---------|----------|
| PT | Portuguese |
| ES | Spanish |
| MX | Spanish |
| AR | Spanish |
| DE | German |
| IT | Italian |
| BR | Portuguese |

If we want to compute the number of persons that speak each language, it would be interesting to have a single table with the country, population and language columns. To this end, we need to combine both of the previous tables (this can also be seen as extending the first table with the values of the second table).

What we want to achieve is the following table, with columns country, population and language: 

| country | population | language |
|---------|------------|----------|
| PT | 10276617 | Portuguese |
| ES | 46937060 | Spanish |
| DE | 83019213 | German |


The ```SELECT * FROM table1 [LEFT | INNER | RIGHT | OUTTER | nothing] JOIN table2 ON condition ....``` joins two tables using the given condition to specify how a row in table1 is joined with rows in table2. By default, join performs an INNER join.

In our example, we want to combine the rows with the same country.

This example is coded in the following cells.

In [None]:
languageDF = spark.createDataFrame( [("PT","Portuguese"), ("ES","Spanish"), ("MX","Spanish"), 
                                     ("AR","Spanish"), ("DE","German"), ("BR","Portuguese")],
                               ["country","language"])
languageDF.createOrReplaceTempView("language")
languageDF.show()



In [None]:
countries1 = spark.sql("""SELECT * FROM population1 
                                JOIN language ON population1.country = language.country""")

countries1.show()

Sometimes, it is convenient to use a shorter table name to refer to columns - this can be done by usins a ```table AS short_name``` or simply ```table short_name```. 

The following code uses this and gets a single entry for the country.

NOTE: you only need to prefix a column name with the name of the table when confusion may arise. In the following code, as population and language columns only occur in one of the table, there is no need to prefix references with the table name.

In [None]:
countries1 = spark.sql("""SELECT p.country, population, language FROM population1 p 
                                    JOIN language l ON p.country = l.country""")

countries1.show()

It is possible to execute any function over the values of joined table. The following code computes the number of persons that speak each language.

In [None]:
langStats = spark.sql("""SELECT language, SUM(population) AS population 
                                FROM population1 p 
                                JOIN language l ON p.country = l.country 
                                GROUP BY language""")
langStats.show()

### Join type : left

The way join works varies depending on the type of join.

In a left join, each row of *table1* is combined with all possible values of *table2*. If no row in the second table matches the joining column of the first, then the value for the columns will be **null**.

For better exemplifying, we start by extending our language table to include one other language for Spain : Catalan.

| country | language |
|---------|----------|
| PT | Portuguese |
| ES | Spanish |
| ES | Catalan |
| MX | Spanish |
| AR | Spanish |
| DE | German |
| IT | Italian |
| BR | Portuguese |


In [None]:
languageExtDF = spark.createDataFrame( [("PT","Portuguese"), ("ES","Spanish"), ("ES","Catalan"), ("MX","Spanish"), 
                                     ("AR","Spanish"), ("DE","German"), ("IT","Italian"), ("BR","Portuguese")],
                               ["country","language"])
languageExtDF.createOrReplaceTempView("languageExt")
languageExtDF.show()



In [None]:
countries = spark.sql("""SELECT * FROM population p LEFT JOIN language l
                                ON p.country = l.country""")

countries.show()

The line for Uruguay (UY) has **null** in the language column.


### Join type : right

In a right join, each row of *table* is combined with all possible values in the first *table*. If no row in the first table matches the joining column of the second, then the value for the columns will be **null**.



In [None]:
countries = spark.sql("""SELECT * FROM population p RIGHT JOIN language l
                                ON p.country = l.country""")

countries.show()


### Join type : inner (default)

In an inner join, each row of *table1* is combined with all possible values in the second *table*. If no row in the second table matches the joining column of the first, then the row will not be part of the result.



In [None]:
countries = spark.sql("""SELECT * FROM population p INNER JOIN language l
                                ON p.country = l.country""")
countries.show()

# Just check that INNER is the default mode
countries = spark.sql("""SELECT * FROM population p JOIN language l
                                ON p.country = l.country""")
countries.show()



### Join type : outer

In an outter join, each row of *table1* is combined with all possible values in the second *table*. If no row exists in any of the dataframes, both row will appear in the final result.



In [None]:
countries = spark.sql("""SELECT * FROM population p FULL OUTER JOIN language l
                                ON p.country = l.country""")
countries.show()

## Exercises

File **countries.csv** has information on countries, including the continent.

Using this file and compute and plot statistics on cases and deaths by 

In [None]:
countriesInfoFileName = os.path.join( "data", "countries.csv")

# Read a CSV file into a DataFrame
countriesInfoDF = spark.read.option("header", True).option("inferSchema",True).csv(countriesInfoFileName)

## Register Dataframe as a temporary view
countriesInfoDF.createOrReplaceTempView("countriesInfo")

countriesInfoDF.show(5)

### Exercise 1

Based on the data in the **ALL-covid.csv** and **countries.csv**, compute and plot information on COVID by continent.


In [None]:
allCovidFileName = os.path.join( "data", "ALL-covid.csv")

# Read a CSV file into a DataFrame
allCovidDF = spark.read.option("header", True).option("inferSchema",True).csv(allCovidFileName)

# Converts the date into the datetime type
allCovidDF = allCovidDF.withColumn("date",to_date(col("date"), "yyyy-MM-dd"))

allCovidDF.createOrReplaceTempView("covidALL")



### Exercise 2

Select at least four countries - find which pair of countries have shown a more direct correlation for the cases of COVID.

In [None]:
#Let's stop the spark session
spark.stop()