<img src="https://raw.githubusercontent.com/andre-marcos-perez/ebac-course-utils/main/media/logo/newebac_logo_black_half.png" alt="ebac-logo">

---

# **Módulo** | Computação em Nuvem III
Caderno de **Exercícios**<br>
Professor [André Perez](https://www.linkedin.com/in/andremarcosperez/)<br>
Aluno [Marcos Pereira](https://www.linkedin.com/in/marcos-pereira-b84160300/)<br>

---

# **Exercícios**


---

## 1\. Apache Spark

### **1.1. Instalação**

Spark é uma aplicação desenvolvida na linguagem de programação [Scala](https://www.scala-lang.org), que funciona em uma máquina virtual [Java](https://www.java.com/) (JVM). Por isso, é necessário fazer o *download* da aplicação e instalar o Java em todas as máquinas (nós) do *cluster*.

> **Nota**: Mesmo sendo uma aplicação Scala, o Spark disponibiliza APIs de integração em diversas linguagens de programação. O pacote Python [PySpark](https://pypi.org/project/pyspark/) é a API para Python. Com ele, é possível interagir com o Spark como se fosse uma aplicação Python nativa. A API é similar ao pacote Pandas e sua documentação pode ser encontrada neste [link](https://spark.apache.org/docs/latest/api/python/).

 - Download do Spark, versão 3.3.2.

In [2]:
%%capture
!wget -q https://archive.apache.org/dist/spark/spark-3.3.2/spark-3.3.2-bin-hadoop3.tgz
!tar xf spark-3.3.2-bin-hadoop3.tgz && rm spark-3.3.2-bin-hadoop3.tgz

In [3]:
%%capture
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [4]:
!pip install -q pyspark==3.3.2

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m281.4/281.4 MB[0m [31m3.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m199.7/199.7 kB[0m [31m10.6 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone
[31mERROR: pip's dependency resolver does not currently take into account all the packages that are installed. This behaviour is the source of the following dependency conflicts.
dataproc-spark-connect 0.7.5 requires pyspark[connect]~=3.5.1, but you have pyspark 3.3.2 which is incompatible.[0m[31m
[0m

### **1.2. Configuração**

Na etapa de configuração, é necessário configurar as máquinas (nós) do *cluster* para que tanto a aplicação do Spark quanto a instalação do Java possam ser encontrados pelo PySpark e, consequentemente, pelo Python. Para isso, basta preencher as variáveis de ambiente `JAVA_HOME` e `SPARK_HOME` com o seus respectivos caminhos de instalação.

In [5]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.3.2-bin-hadoop3"

Por fim, para conectar o PySpark (e o Python) ao Spark e ao Java, pode-se utilizar o pacote Python [FindSpark](https://pypi.org/project/findspark/).

In [6]:
!pip install -q findspark==1.4.2

O método `init()` injeta as variáveis de ambiente `JAVA_HOME` e `SPARK_HOME` no ambiente de execução Python, permitindo assim a correta conexão entre o pacote PySpark com aplicação Spark.

In [7]:
import findspark

findspark.init()

## 2\. Data Wrangling

### **2.1. Conexão**

Com o *cluster* devidamente configurado, vamos criar uma aplicação Spark. O objeto `SparkSession` do pacote PySpark (e seu atributo `builder`) auxiliam na criação da aplicação:

 - `master`: endereço (local ou remoto) do *cluster*;
 - `appName`: nome da aplicação;
 - `getOrCreate`: método que de fato cria os recursos e instância a aplicação.

In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("pyspark-notebook").getOrCreate()

### **2.2. Data**

A base de dados presente neste [link](https://www.kaggle.com/datasets/bank-of-england/a-millennium-of-macroeconomic-data) contem dados macroeconômicos sobre o Reino Unido desde o século 13.

Com o objeto `SparkSession` devidamente instanciado, podemos começar a interagir com os dados utilizando os recursos do *cluster* através de uma estrutura de dados que já conhecemos: `DataFrames`:
Faça o download dos dados utilizando a máquina virutal do Google Colab com o código abaixo.

In [9]:
!wget -q "https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/build/workspace/data/uk-macroeconomic-data.csv" -O "uk-macroeconomic-data.csv"

In [10]:
data = spark.read.csv(path="uk-macroeconomic-data.csv", sep=",", header=True)

In [11]:
data.show()

+-----------+------------------------------------+-----------------------------------+-------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------+------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------+--------------------+--------------------+--------------------------+-------------------------------------------------+--------------------+--------------------+---------------------------------------+-------------------------------+---------------------------------+------------------+--------------------+----------+-----------------+---------------------------+------------------------------

In [12]:
data.printSchema()

root
 |-- Description: string (nullable = true)
 |-- Real GDP of England at market prices: string (nullable = true)
 |-- Real GDP of England at factor cost : string (nullable = true)
 |-- Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Index of real UK GDP at factor cost - based on changing political boundaries, : string (nullable = true)
 |-- Composite estimate of English and (geographically-consistent) UK real GDP at factor cost: string (nullable = true)
 |-- HP-filter of log of real composite estimate of English and UK real GDP at factor cost: string (nullable = true)
 |-- Real UK gross disposable national income at market prices, constant border estimate: string (nullable = true)
 |-- Real consumption: string (nullable = true)
 |-- Real investment: string (nullable = true)
 |-- Stockbuildi

### **2.3. Wrangling**

Vamos utilizar a API Python do Spark, o pacote PySpark, para limpar os dados, processar os dados para que a base de dados final apresente os valores da taxa de desemprego (`Unemployment rate`) e população (`Population (GB+NI)`) estejam ordenados por ano decrescente:

> **Nota**: Atente-se sempre a natureza distribuída das operações.

```csv
year,population,unemployment_rate
...,...,...
```

In [13]:
data.count()

841

In [14]:
data.columns

['Description',
 'Real GDP of England at market prices',
 'Real GDP of England at factor cost ',
 'Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders',
 'Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders',
 'Index of real UK GDP at factor cost - based on changing political boundaries, ',
 'Composite estimate of English and (geographically-consistent) UK real GDP at factor cost',
 'HP-filter of log of real composite estimate of English and UK real GDP at factor cost',
 'Real UK gross disposable national income at market prices, constant border estimate',
 'Real consumption',
 'Real investment',
 'Stockbuilding contribution',
 'Real government consumption of goods and services',
 'Export volumes',
 'Import volumes',
 'Nominal GDP of England at market prices',
 'Nominal UK GDP at market prices',
 'Nominal UK GDP at market prices.1',
 'Population (GB+NI)',
 'Population (England)',
 'Employment',
 'Unemployment

In [15]:
len(data.columns)

77

Com método `select` seleciona colunas do `DataFrame`. Já o método `withColumnRenamed` renomeia colunas.

In [16]:
data = data.select(["Description", "Population (GB+NI)", "Unemployment rate"])

In [17]:
data = data.\
  withColumnRenamed("Description", 'year').\
  withColumnRenamed("Population (GB+NI)", "population").\
  withColumnRenamed("Unemployment rate", "unemployment_rate")

In [18]:
data.show(n=10)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
| 1209|      null|             null|
| 1210|      null|             null|
| 1211|      null|             null|
| 1212|      null|             null|
| 1213|      null|             null|
| 1214|      null|             null|
| 1215|      null|             null|
| 1216|      null|             null|
| 1217|      null|             null|
+-----+----------+-----------------+
only showing top 10 rows



Com método `filter` seleciona linhas do `DataFrame` baseado no conteúdo de uma coluna.

In [19]:
data_description = data.filter(data['year'] == 'Units')

In [20]:
data_description.show(n=10)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
+-----+----------+-----------------+



In [21]:
(data.count(), len(data.columns))

(841, 3)

In [22]:
(data_description.count(), len(data_description.columns))

(1, 3)

Com método `join` faz a junção distribuída de dois `DataFrames`. Já o método `broadcast` "marca" um `DataFrame` como "pequeno" e força o Spark a trafega-lo pela rede.

In [23]:
from pyspark.sql.functions import broadcast

In [24]:
data = data.join(other=broadcast(data_description), on=['year'], how='left_anti')

In [25]:
data.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1209|      null|             null|
|1210|      null|             null|
|1211|      null|             null|
|1212|      null|             null|
|1213|      null|             null|
|1214|      null|             null|
|1215|      null|             null|
|1216|      null|             null|
|1217|      null|             null|
|1218|      null|             null|
+----+----------+-----------------+
only showing top 10 rows



Com método `dropna` remove todas as linhas que apresentarem ao menos um valor nulo.

In [26]:
data = data.dropna()

In [27]:
data.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1855|     23241|             3.73|
|1856|     23466|             3.52|
|1857|     23689|             3.95|
|1858|     23914|             5.23|
|1859|     24138|             3.27|
|1860|     24360|             2.94|
|1861|     24585|             3.72|
|1862|     24862|             4.68|
|1863|     25142|             4.15|
|1864|     25425|             2.99|
+----+----------+-----------------+
only showing top 10 rows



Com método `withColumn` ajuda a criar novas colunas.

In [28]:
data = data.withColumn('century', 1 + (data['year']/100).cast('int'))

In [29]:
data.select(['century', 'year']).groupBy('century').agg({'year': 'count'}).show()

+-------+-----------+
|century|count(year)|
+-------+-----------+
|     20|        100|
|     19|         45|
|     21|         17|
+-------+-----------+



Com método `collect` é uma ação que coleta os resultados dos nós e retorna para o Python.

In [30]:
timing = data.select(['century', 'year']).groupBy('century').agg({'year': 'count'}).collect()

In [31]:
timing

[Row(century=20, count(year)=100),
 Row(century=19, count(year)=45),
 Row(century=21, count(year)=17)]

In [32]:
timing[0].asDict()

{'century': 20, 'count(year)': 100}

Com método `write.csv` persiste o `DataFrame` em formato `csv`. Já o método `repartition` controla o número de partições da escrita.

In [33]:
data.repartition('century').write.csv(path="uk-macroeconomic-data-clean", sep=",", header=True, mode="overwrite")