<img src="https://raw.githubusercontent.com/andre-marcos-perez/ebac-course-utils/main/media/logo/newebac_logo_black_half.png" alt="ebac-logo">

---

# **Módulo** | Computação em Nuvem III
Caderno de **Exercícios**<br>
Professor [André Perez](https://www.linkedin.com/in/andremarcosperez/)

---

# **Tópicos**

<ol type="1">
  <li>Introdução;</li>
  <li>Apache Spark;</li>
  <li>Data Wrangling com Spark.</li>
</ol>

---

# **Exercícios**

## 1\. Apache Spark

Replique as atividades do item 2.1 e 2.2 para instalar e configurar um cluster Apache Spark na máquina virtual do Google Colab.

* Download do Spark, versão 3.0.0.

In [1]:
%%capture
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz && rm spark-3.0.0-bin-hadoop2.7.tgz

 - Download e instalação do Java, versão 8.

In [2]:
%%capture
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

> **Nota**: a versão do PySpark deve ser a mesma que a versão da aplicação Spark.

In [3]:
!pip install -q pyspark==3.0.0

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.7/204.7 MB[0m [31m6.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K   [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m12.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


### **Configuração**

Na etapa de configuração, é necessário configurar as máquinas (nós) do cluster para que tanto a aplicação do Spark quanto a instalação do Java possam ser encontrados pelo PySpark e, consequentemente, pelo Python. Para isso, basta preencher as variáveis de ambiente JAVA_HOME e SPARK_HOME com o seus respectivos caminhos de instalação.

In [4]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

Por fim, para conectar o PySpark (e o Python) ao Spark e ao Java, pode-se utilizar o pacote Python [FindSpark](https://pypi.org/project/findspark/).

In [5]:
!pip install -q findspark==1.4.2

O método `init()` injeta as variáveis de ambiente `JAVA_HOME` e `SPARK_HOME` no ambiente de execução Python, permitindo assim a correta conexão entre o pacote PySpark com aplicação Spark.

In [6]:
import findspark

findspark.init()

## 2\. Data Wrangling

A base de dados presente neste [link](https://www.kaggle.com/datasets/bank-of-england/a-millennium-of-macroeconomic-data) contem dados macroeconômicos sobre o Reino Unido desde o século 13.

**2.1\. Data**

Faça o download dos dados utilizando a máquina virutal do Google Colab com o código abaixo.

In [7]:
!wget -q "https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/build/workspace/data/uk-macroeconomic-data.csv" -O "uk-macroeconomic-data.csv"

**2.2. Wrangling**

Processe os dados para que a base de dados final apresente os valores da taxa de desemprego (`Unemployment rate`) e população (`Population (GB+NI)`) estejam ordenados por ano decrescente:

```csv
year,population,unemployment_rate
...,...,...
```

Para isso, utilize:

 - Pandas

In [8]:
#Importando pacote
import pandas as pd


#Criando o DataFrame
pandas_data = pd.read_csv("uk-macroeconomic-data.csv")

#Selecionando as colunas
pandas_data = pandas_data.loc[:,('Description',
                            'Population (GB+NI)',
                            'Unemployment rate')]

#Renomeando as colunas
pandas_data.rename(columns= { 'Description':'Year',
                              'Population (GB+NI)':'Population',
                              'Unemployment rate':'Unemployment'},
                  inplace=True)

#Limpando e ordenando os dados
pandas_data = pandas_data.dropna().drop(0).sort_values(by='Year', ascending=False, ignore_index=True)

#Visualizando os dados
pandas_data.head(5)

Unnamed: 0,Year,Population,Unemployment
0,2016,65573,4.9
1,2015,65110,5.38
2,2014,64597,6.18
3,2013,64106,7.61
4,2012,63705,7.97


 - PySpark

In [10]:
#Importando pacotes
from pyspark.sql import SparkSession # import SparkSession

#Criando a SparkSession
spark = SparkSession.builder.appName("UKMacroData").getOrCreate() # create SparkSession object

#Criando o DataFrame
pyspark_data = spark.read.csv(path="uk-macroeconomic-data.csv", sep=",", header=True)

#Selecionando as colunas
pyspark_data = pyspark_data.select(['Description',
                            'Population (GB+NI)',
                            'Unemployment rate'])

#Renomeando as colunas
pyspark_data = pyspark_data.withColumnRenamed("Description", "Year")\
                           .withColumnRenamed("Population (GB+NI)", "Population")\
                           .withColumnRenamed("Unemployment rate", "Unemployment")

#Limpando e ordenando os dados
pyspark_data = pyspark_data.dropna().filter(pyspark_data['Year'] != 'Units').orderBy(pyspark_data['Year'].desc())

#Visualizando os dados
pyspark_data.show(5)

+----+----------+------------+
|Year|Population|Unemployment|
+----+----------+------------+
|2016|     65573|        4.90|
|2015|     65110|        5.38|
|2014|     64597|        6.18|
|2013|     64106|        7.61|
|2012|     63705|        7.97|
+----+----------+------------+
only showing top 5 rows

