<img src="https://raw.githubusercontent.com/andre-marcos-perez/ebac-course-utils/main/media/logo/newebac_logo_black_half.png" alt="ebac-logo">

---

# **Módulo** | Computação em Nuvem III
Caderno de **Exercícios**<br>
Professor [André Perez](https://www.linkedin.com/in/andremarcosperez/)

---

# **Tópicos**

<ol type="1">
  <li>Introdução;</li>
  <li>Apache Spark;</li>
  <li>Data Wrangling com Spark.</li>
</ol>

---

# **Exercícios**

## 1\. Apache Spark

Replique as atividades do item 2.1 e 2.2 para instalar e configurar um cluster Apache Spark na máquina virtual do Google Colab.

* Download do Spark, versão atualizada.

In [1]:
!rm -rf /content/spark-3.5.1*

In [2]:
%%capture
!wget -q https://archive.apache.org/dist/spark/spark-3.5.1/spark-3.5.1-bin-hadoop3.tgz
!tar -xzf spark-3.5.1-bin-hadoop3.tgz

* Instalação do Java 8, versão atualizada.

In [3]:
%%capture
!apt-get update
!apt-get install -y openjdk-8-jdk-headless


* Download e instalação PySpark. Sem necessidade de utilizar o FindSpark.

In [4]:
!pip install -q pyspark==3.5.1

* Configuração das variáveis de ambiente corretamente.

In [5]:
import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.5.1-bin-hadoop3"

Com o *cluster* devidamente configurado, vamos criar uma aplicação Spark. O objeto `SparkSession` do pacote PySpark (e seu atributo `builder`) auxiliam na criação da aplicação:

 - `master`: endereço (local ou remoto) do *cluster*;
 - `appName`: nome da aplicação;
 - `getOrCreate`: método que de fato cria os recursos e instância a aplicação.

A forma ideal para inicialização na versão mais recente do Spark.

In [6]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .appName("EBAC Spark 3.5.1") \
    .master("local[*]") \
    .getOrCreate()

Com o objeto SparkSession devidamente instanciado, podemos começar a interagir com os dados utilizando os recursos do cluster através de uma estrutura de dados que já conhecemos: DataFrames:

## 2\. Data Wrangling

A base de dados presente neste [link](https://www.kaggle.com/datasets/bank-of-england/a-millennium-of-macroeconomic-data) contem dados macroeconômicos sobre o Reino Unido desde o século 13.

**2.1\. Data**

Faça o download dos dados utilizando a máquina virutal do Google Colab com o código abaixo.

In [7]:
!wget -q "https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/build/workspace/data/uk-macroeconomic-data.csv" -O "uk-macroeconomic-data.csv"

In [14]:
data = spark.read.csv(path="uk-macroeconomic-data.csv", sep=",", header=True)

In [15]:
data.show()

+-----------+------------------------------------+-----------------------------------+-------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------+------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------+--------------------+--------------------+--------------------------+-------------------------------------------------+--------------------+--------------------+---------------------------------------+-------------------------------+---------------------------------+------------------+--------------------+----------+-----------------+---------------------------+------------------------------

In [16]:
data.printSchema()

root
 |-- Description: string (nullable = true)
 |-- Real GDP of England at market prices: string (nullable = true)
 |-- Real GDP of England at factor cost : string (nullable = true)
 |-- Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Index of real UK GDP at factor cost - based on changing political boundaries, : string (nullable = true)
 |-- Composite estimate of English and (geographically-consistent) UK real GDP at factor cost: string (nullable = true)
 |-- HP-filter of log of real composite estimate of English and UK real GDP at factor cost: string (nullable = true)
 |-- Real UK gross disposable national income at market prices, constant border estimate: string (nullable = true)
 |-- Real consumption: string (nullable = true)
 |-- Real investment: string (nullable = true)
 |-- Stockbuildi

**2.2. Wrangling**

Processe os dados para que a base de dados final apresente os valores da taxa de desemprego (`Unemployment rate`) e população (`Population (GB+NI)`) estejam ordenados por ano decrescente:

Para isso utilize:

 - PySpark

* Limpeza:

In [17]:
data = data.select(["Description", "Population (GB+NI)", "Unemployment rate"])

In [18]:
data = data.\
  withColumnRenamed("Description", 'year').\
  withColumnRenamed("Population (GB+NI)", "population").\
  withColumnRenamed("Unemployment rate", "unemployment_rate")

In [19]:
data.show(n=10)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
| 1209|      NULL|             NULL|
| 1210|      NULL|             NULL|
| 1211|      NULL|             NULL|
| 1212|      NULL|             NULL|
| 1213|      NULL|             NULL|
| 1214|      NULL|             NULL|
| 1215|      NULL|             NULL|
| 1216|      NULL|             NULL|
| 1217|      NULL|             NULL|
+-----+----------+-----------------+
only showing top 10 rows



In [20]:
data_description = data.filter(data['year'] == 'Units')

In [21]:
data_description.show(n=10)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
+-----+----------+-----------------+



* Junção:

In [22]:
(data.count(), len(data.columns))

(841, 3)

In [23]:
(data_description.count(), len(data_description.columns))

(1, 3)

O método `join` faz a junção distribuída de dois `DataFrames`. Já o método `broadcast` "marca" um `DataFrame` como "pequeno" e força o Spark a trafega-lo pela rede.

In [24]:
from pyspark.sql.functions import broadcast

In [25]:
data = data.join(other=broadcast(data_description), on=['year'], how='left_anti')

In [26]:
data.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1209|      NULL|             NULL|
|1210|      NULL|             NULL|
|1211|      NULL|             NULL|
|1212|      NULL|             NULL|
|1213|      NULL|             NULL|
|1214|      NULL|             NULL|
|1215|      NULL|             NULL|
|1216|      NULL|             NULL|
|1217|      NULL|             NULL|
|1218|      NULL|             NULL|
+----+----------+-----------------+
only showing top 10 rows



O método `dropna` remove todas as linhas que apresentarem ao menos um valor nulo.

In [27]:
data = data.dropna()

In [28]:
data.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1855|     23241|             3.73|
|1856|     23466|             3.52|
|1857|     23689|             3.95|
|1858|     23914|             5.23|
|1859|     24138|             3.27|
|1860|     24360|             2.94|
|1861|     24585|             3.72|
|1862|     24862|             4.68|
|1863|     25142|             4.15|
|1864|     25425|             2.99|
+----+----------+-----------------+
only showing top 10 rows



* Criação de novas colunas:

In [29]:
data = data.withColumn('century', 1 + (data['year']/100).cast('int'))

In [30]:
data.select(['century', 'year']).groupBy('century').agg({'year': 'count'}).show()

+-------+-----------+
|century|count(year)|
+-------+-----------+
|     20|        100|
|     19|         45|
|     21|         17|
+-------+-----------+



O método `collect` é uma ação que coleta os resultados dos nós e retorna para o Python.

In [31]:
timing = data.select(['century', 'year']).groupBy('century').agg({'year': 'count'}).collect()

In [32]:
timing

[Row(century=20, count(year)=100),
 Row(century=19, count(year)=45),
 Row(century=21, count(year)=17)]

In [33]:
timing[0].asDict()

{'century': 20, 'count(year)': 100}

* Escrita:

In [34]:
data.repartition('century').write.csv(path="uk-macroeconomic-data-clean", sep=",", header=True, mode="overwrite")

In [38]:
from pyspark.sql.functions import col

#Ordenando o DataFrame PySpark por 'year' decrescente.
data_final_pyspark = data.orderBy(col("year").desc())

print("DataFrame PySpark ordenado (primeiras 10 linhas):")
data_final_pyspark.show(n=10)

DataFrame PySpark ordenado (primeiras 10 linhas):
+----+----------+-----------------+-------+
|year|population|unemployment_rate|century|
+----+----------+-----------------+-------+
|2016|     65573|             4.90|     21|
|2015|     65110|             5.38|     21|
|2014|     64597|             6.18|     21|
|2013|     64106|             7.61|     21|
|2012|     63705|             7.97|     21|
|2011|     63285|             8.11|     21|
|2010|     62759|             7.87|     21|
|2009|     62260|             7.61|     21|
|2008|     61824|             5.69|     21|
|2007|     61319|             5.33|     21|
+----+----------+-----------------+-------+
only showing top 10 rows



Para isso, utilize:

 - Pandas

1. Importando as bibliotecas necessárias:



In [None]:
import pandas as pd
import numpy as np

2. Carregando os dados em um DataFrame Pandas:

In [39]:
df_pandas = pd.read_csv("uk-macroeconomic-data.csv")
print("DataFrame Pandas carregado.")

DataFrame Pandas carregado.


3. Selecionado as colunas originais e renomeando conforme fiz anteriormente.

In [40]:
df_pandas = df_pandas.select_dtypes(include=[np.object_,np.number]).copy()
df_pandas = df_pandas[[
    "Description",
    "Population (GB+NI)",
    "Unemployment rate"
]].rename(columns={
    "Description": "year",
    "Population (GB+NI)": "population",
    "Unemployment rate": "unemployment_rate"
})
print("Colunas selecionadas e renomeadas.")

Colunas selecionadas e renomeadas.


4. Removendo a linha de cabeçalho "Units" e valores nulos.

In [42]:
df_pandas = df_pandas[df_pandas['year'] != 'Units']
df_pandas = df_pandas.dropna()
df_pandas.head(10)

Unnamed: 0,year,population,unemployment_rate
647,1855,23241,3.73
648,1856,23466,3.52
649,1857,23689,3.95
650,1858,23914,5.23
651,1859,24138,3.27
652,1860,24360,2.94
653,1861,24585,3.72
654,1862,24862,4.68
655,1863,25142,4.15
656,1864,25425,2.99


5. Converter os tipos de dados para número. Evita erros de parsing e permite que operações matemáticas funcionem corretamente.

In [45]:
df_pandas['year'] = pd.to_numeric(df_pandas['year'])
df_pandas['population'] = pd.to_numeric(df_pandas['population'])
df_pandas['unemployment_rate'] = pd.to_numeric(df_pandas['unemployment_rate'])
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
Index: 162 entries, 647 to 808
Data columns (total 3 columns):
 #   Column             Non-Null Count  Dtype  
---  ------             --------------  -----  
 0   year               162 non-null    int64  
 1   population         162 non-null    int64  
 2   unemployment_rate  162 non-null    float64
dtypes: float64(1), int64(2)
memory usage: 9.1 KB


6. Ordenar os dados por 'year' decrescente:

In [None]:
df_final_pandas = df_pandas.sort_values(by="year", ascending=False)

7. Resultado final:

In [46]:
print(df_final_pandas.head(10))

     year  population  unemployment_rate  century
808  2016       65573               4.90       21
807  2015       65110               5.38       21
806  2014       64597               6.18       21
805  2013       64106               7.61       21
804  2012       63705               7.97       21
803  2011       63285               8.11       21
802  2010       62759               7.87       21
801  2009       62260               7.61       21
800  2008       61824               5.69       21
799  2007       61319               5.33       21


In [47]:
print(df_final_pandas.tail(10))

     year  population  unemployment_rate  century
656  1864       25425               2.99       19
655  1863       25142               4.15       19
654  1862       24862               4.68       19
653  1861       24585               3.72       19
652  1860       24360               2.94       19
651  1859       24138               3.27       19
650  1858       23914               5.23       19
649  1857       23689               3.95       19
648  1856       23466               3.52       19
647  1855       23241               3.73       19
