<img src="https://raw.githubusercontent.com/andre-marcos-perez/ebac-course-utils/main/media/logo/newebac_logo_black_half.png" alt="ebac-logo">

---

# **Módulo** | Computação em Nuvem III
Caderno de **Exercícios**<br>
Professor [André Perez](https://www.linkedin.com/in/andremarcosperez/)

---

# **Tópicos**

<ol type="1">
  <li>Introdução;</li>
  <li>Apache Spark;</li>
  <li>Data Wrangling com Spark.</li>
</ol>

---

# **Exercícios**

## 1\. Apache Spark

Replique as atividades do item 2.1 e 2.2 para instalar e configurar um cluster Apache Spark na máquina virtual do Google Colab.

## 2\. Data Wrangling

A base de dados presente neste [link](https://www.kaggle.com/datasets/bank-of-england/a-millennium-of-macroeconomic-data) contem dados macroeconômicos sobre o Reino Unido desde o século 13.

**2.1\. Data**

Faça o download dos dados utilizando a máquina virutal do Google Colab com o código abaixo.

In [11]:
!wget -q "https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/build/workspace/data/uk-macroeconomic-data.csv" -O "uk-macroeconomic-data.csv"

**2.2. Wrangling**

Processe os dados para que a base de dados final apresente os valores da taxa de desemprego (`Unemployment rate`) e população (`Population (GB+NI)`) estejam ordenados por ano decrescente:

```csv
year,population,unemployment_rate
...,...,...
```

Para isso, utilize:

 - Pandas

In [177]:
# Leitura dos Dados
import pandas as pd

df = pd.read_csv("uk-macroeconomic-data.csv")

In [178]:
# Mudando os nomes das colunas
df = df.rename(columns={
    'Description': 'year',
    'Population (GB+NI)' : 'population',
    'Unemployment rate': 'unemployment_rate'
    })

In [179]:
# Selecionando somente as colunas que vamos trabalhar
df = df[["year", "population", "unemployment_rate"]]

In [180]:
# Para remoção dos valores nulos, preciso jogar os dados das colunas para int ou float
df.info()

<class 'pandas.core.frame.DataFrame'>
RangeIndex: 841 entries, 0 to 840
Data columns (total 3 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   year               809 non-null    object
 1   population         318 non-null    object
 2   unemployment_rate  163 non-null    object
dtypes: object(3)
memory usage: 19.8+ KB


In [181]:
# Separando a primeira a linha do data frame
df_description = df[df['year'] == 'Units']
print('Após df_description:')
print(df_description)

Após df_description:
    year population unemployment_rate
0  Units       000s                 %


 - PySpark

In [1]:
%%capture
!wget -q https://dlcdn.apache.org/spark/spark-3.5.3/spark-3.5.3-bin-hadoop3.tgz
!tar xf spark-3.5.3-bin-hadoop3.tgz && rm spark-3.5.3-bin-hadoop3.tgz


In [53]:
%%capture
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-21-jdk-headless -qq > /dev/null

In [3]:
!pip install -q pyspark==3.5.3

In [8]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-21-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.3-bin-hadoop3"

In [5]:
!pip install -q findspark

In [6]:
import findspark

findspark.init()

In [119]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("pyspark-notebook").getOrCreate()

In [182]:
data = spark.createDataFrame(df)
data_description = spark.createDataFrame(df_description)

print('Antes de apagar a primeira linha: {}'.format(data.count()))

Antes de apagar a primeira linha: 841


In [196]:
from pyspark.sql.functions import broadcast
from pyspark.sql.functions import col

In [184]:
data = data.join(other=broadcast(data_description), on=['year'], how='left_anti')

print('Após apagar a primeira linha: {}'.format(data.count()))

Após apagar a primeira linha: 840


In [158]:
# Os dados das colunas estão como string, sendo necessário mudar para remover os valores nulos
data.printSchema()

root
 |-- year: string (nullable = true)
 |-- population: string (nullable = true)
 |-- unemployment_rate: string (nullable = true)



In [185]:
data = data.withColumn('population', col('population').cast('float'))
data = data.withColumn('unemployment_rate', col('unemployment_rate').cast('float'))

In [186]:
data = data.dropna(subset=['year', 'population', 'unemployment_rate'])

In [194]:
data_sorted = data.orderBy(col('year').desc())

In [195]:
data_sorted.write.csv(path ='uk-macroeconomic-data-clean.csv', sep=',', header=True, mode='overwrite')