<img src="https://raw.githubusercontent.com/andre-marcos-perez/ebac-course-utils/main/media/logo/newebac_logo_black_half.png" alt="ebac-logo">

---

# **Module** | Cloud Computing III
**Exercise** Notebook<br>
Professor [André Perez](https://www.linkedin.com/in/andremarcosperez/)

---

# **Topics**

<ol type="1">
  <li>Introduction;</li>
  <li>Apache Spark;</li>
  <li>Data Wrangling with Spark.</li>
</ol>

---

# **Exercises**

## 1\. Apache Spark

Replicate the steps from items 2.1 and 2.2 to install and configure an Apache Spark cluster on the Google Colab virtual machine.

In [1]:
%%capture
# Download Spark 3.5.0, a stable release with no version conflicts
# (the %%capture cell magic must be the first line of the cell)
!wget -q https://archive.apache.org/dist/spark/spark-3.5.0/spark-3.5.0-bin-hadoop3.tgz
!tar xf spark-3.5.0-bin-hadoop3.tgz && rm spark-3.5.0-bin-hadoop3.tgz

In [2]:
%%capture
# Download and install Java 8
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
# Install PySpark at the same version as the Spark download above (3.5.0)

!pip install pyspark==3.5.0



In [4]:
import os

os.environ["SPARK_HOME"] = "/content/spark-3.5.0-bin-hadoop3"
os.environ["PYSPARK_PYTHON"] = "python3"

In [5]:
!pip install -q findspark==1.4.2

In [6]:
import findspark

findspark.init()

In [9]:
from pyspark.sql import SparkSession

spark = SparkSession.builder \
    .master("local[*]") \
    .appName("pyspark-notebook") \
    .getOrCreate()

print("Spark is running!")

Spark is running!


## 2\. Data Wrangling

The dataset at this [link](https://www.kaggle.com/datasets/bank-of-england/a-millennium-of-macroeconomic-data) contains macroeconomic data on the United Kingdom since the 13th century.

**2.1\. Data**

Download the data to the Google Colab virtual machine with the code below.

In [10]:
!wget -q "https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/build/workspace/data/uk-macroeconomic-data.csv" -O "uk-macroeconomic-data.csv"

**2.2. Wrangling**

Process the data so that the final dataset contains only the year, the population (`Population (GB+NI)`), and the unemployment rate (`Unemployment rate`), sorted by year in descending order:

```csv
year,population,unemployment_rate
...,...,...
```

In [11]:
data = spark.read.csv(path="uk-macroeconomic-data.csv", sep=",", header=True)

In [12]:
data.show()

In [13]:
data.count()  # number of rows

841

In [14]:
data.columns  # Python list with all column names

['Description',
 'Real GDP of England at market prices',
 'Real GDP of England at factor cost ',
 'Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders',
 'Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders',
 'Index of real UK GDP at factor cost - based on changing political boundaries, ',
 'Composite estimate of English and (geographically-consistent) UK real GDP at factor cost',
 'HP-filter of log of real composite estimate of English and UK real GDP at factor cost',
 'Real UK gross disposable national income at market prices, constant border estimate',
 'Real consumption',
 'Real investment',
 'Stockbuilding contribution',
 'Real government consumption of goods and services',
 'Export volumes',
 'Import volumes',
 'Nominal GDP of England at market prices',
 'Nominal UK GDP at market prices',
 'Nominal UK GDP at market prices.1',
 'Population (GB+NI)',
 'Population (England)',
 'Employment',
 'Unemployment

In [15]:
len(data.columns)  # number of columns

77

In [16]:
data = data.select(["Description", "Population (GB+NI)", "Unemployment rate"])

In [17]:
# rename the columns

data = data.\
  withColumnRenamed("Description", 'year').\
  withColumnRenamed("Population (GB+NI)", "population").\
  withColumnRenamed("Unemployment rate", "unemployment_rate")

In [18]:
data.show(n=10)  # prints the first 10 rows

# the first row holds units, not data, so we discard it to avoid inconsistencies

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
| 1209|      NULL|             NULL|
| 1210|      NULL|             NULL|
| 1211|      NULL|             NULL|
| 1212|      NULL|             NULL|
| 1213|      NULL|             NULL|
| 1214|      NULL|             NULL|
| 1215|      NULL|             NULL|
| 1216|      NULL|             NULL|
| 1217|      NULL|             NULL|
+-----+----------+-----------------+
only showing top 10 rows



In [19]:
data_description = data.filter(data['year'] == 'Units')

In [20]:
data_description.show(n=10)

# the units row is now isolated in its own DataFrame

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
+-----+----------+-----------------+



In [21]:
(data.count(), len(data.columns))  # original DataFrame

(841, 3)

In [22]:
(data_description.count(), len(data_description.columns))  # DataFrame holding only the inconsistent row

(1, 3)

 - PySpark

In [23]:
from pyspark.sql.functions import broadcast

In [24]:
# left anti join: keep the rows of data whose year has no match in data_description,
# which removes the units row from the original DataFrame
data = data.join(other=broadcast(data_description), on=['year'], how='left_anti')
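
To make the `left_anti` semantics concrete, here is a minimal sketch in plain Python (no Spark involved). The helper name `left_anti` and the three sample rows are assumptions chosen for illustration; the sketch only mimics what the join does for a single key column.

```python
# Sample rows mirroring the three selected columns (None stands for NULL).
left = [
    {"year": "Units", "population": "000s", "unemployment_rate": "%"},
    {"year": "1209", "population": None, "unemployment_rate": None},
    {"year": "2016", "population": "65573", "unemployment_rate": "4.90"},
]
right = [{"year": "Units", "population": "000s", "unemployment_rate": "%"}]


def left_anti(left_rows, right_rows, key):
    """Return the left rows whose key value does not appear in the right rows."""
    right_keys = {row[key] for row in right_rows}
    return [row for row in left_rows if row[key] not in right_keys]


result = left_anti(left, right, "year")
print([row["year"] for row in result])
```

Only columns from the left side survive an anti join, which is why the schema of `data` is unchanged after the step above.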

In [25]:
data.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1209|      NULL|             NULL|
|1210|      NULL|             NULL|
|1211|      NULL|             NULL|
|1212|      NULL|             NULL|
|1213|      NULL|             NULL|
|1214|      NULL|             NULL|
|1215|      NULL|             NULL|
|1216|      NULL|             NULL|
|1217|      NULL|             NULL|
|1218|      NULL|             NULL|
+----+----------+-----------------+
only showing top 10 rows



In [26]:
data = data.dropna()
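
Called without arguments, `dropna` removes every row that has a null in any column. The plain-Python sketch below mimics that behavior; the helper `drop_any_null` and the partial `1800` row are hypothetical and exist only to illustrate the rule.

```python
# Rows as (year, population, unemployment_rate); None stands for NULL.
rows = [
    ("1209", None, None),        # both values missing
    ("1800", "10000", None),     # hypothetical partial row, for illustration only
    ("2016", "65573", "4.90"),   # complete row
]


def drop_any_null(records):
    """Keep only the records in which every field is present."""
    return [
        record
        for record in records
        if all(field is not None for field in record)
    ]


kept = drop_any_null(rows)
print(kept)
```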

In [27]:
data = data.withColumn('century', 1 + (data['year']/100).cast('int'))
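
The expression above truncates `year / 100` and adds one, so a year ending in `00` is assigned to the following century (1900 maps to 20). A minimal plain-Python sketch of the same arithmetic, next to the stricter calendar convention in which 1900 still belongs to the 19th century, is shown below. Both function names are hypothetical.

```python
def century_notebook(year: int) -> int:
    """Same arithmetic as the Spark expression: 1 + int(year / 100)."""
    return 1 + int(year / 100)


def century_conventional(year: int) -> int:
    """Calendar convention: century N covers years 100*(N-1)+1 through 100*N."""
    return (year - 1) // 100 + 1


for year in (1899, 1900, 1901, 2000, 2016):
    print(year, century_notebook(year), century_conventional(year))
```

The two definitions differ only for years that are exact multiples of 100.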

In [28]:
# count how many years fall in each century
data.select(['century', 'year']).groupBy('century').agg({'year': 'count'}).show()

+-------+-----------+
|century|count(year)|
+-------+-----------+
|     20|        100|
|     19|         45|
|     21|         17|
+-------+-----------+



In [29]:
# collect() brings the aggregated rows from the cluster into the Python driver
timing = data.select(['century', 'year']).groupBy('century').agg({'year': 'count'}).collect()

In [30]:
timing

[Row(century=20, count(year)=100),
 Row(century=19, count(year)=45),
 Row(century=21, count(year)=17)]

In [31]:
# repartition the data by century, then write the result as CSV
data.repartition('century').write.csv(path="uk-macroeconomic-data-clean", sep=",", header=True, mode="overwrite")

In [32]:
# Sort the data by year in descending order

ordered_data = data.orderBy(data['year'].desc())
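
Because the CSV was read without schema inference, `year` is a string column and the sort compares text. That gives the right order here, since every remaining year has four digits, but it would break for years of differing lengths. The plain-Python sketch below shows the difference on a hypothetical list of years; in PySpark, casting first (for example `data['year'].cast('int')`) should give the numeric order.

```python
# Hypothetical year labels of differing lengths, stored as text.
years = ["2016", "999", "1855", "1209"]

# Text comparison: "999" sorts above "2016" because "9" > "2".
as_text = sorted(years, reverse=True)

# Numeric comparison: convert to int for the sort key only.
as_number = sorted(years, key=int, reverse=True)

print(as_text)
print(as_number)
```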

In [33]:
# Select the columns of interest and show the results

ordered_data.select(['year', 'population', 'unemployment_rate']).show()

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|2016|     65573|             4.90|
|2015|     65110|             5.38|
|2014|     64597|             6.18|
|2013|     64106|             7.61|
|2012|     63705|             7.97|
|2011|     63285|             8.11|
|2010|     62759|             7.87|
|2009|     62260|             7.61|
|2008|     61824|             5.69|
|2007|     61319|             5.33|
|2006|     60827|             5.42|
|2005|     60413|             4.83|
|2004|     59950|             4.75|
|2003|     59637|             5.01|
|2002|     59366|             5.19|
|2001|     59113|             5.10|
|2000|     58886|             5.46|
|1999|     58684|             5.98|
|1998|     58475|             6.26|
|1997|     58314|             6.97|
+----+----------+-----------------+
only showing top 20 rows



In [34]:
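# Save the result as CSV, keeping only the three requested columns
# (without the select, the helper column century would be written too)

ordered_data.select("year", "population", "unemployment_rate").write.csv(path="uk-macroeconomic-data-ordered", sep=",", header=True, mode="overwrite")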
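
Spark writes CSV output as a directory of `part-*` files rather than a single file. A minimal sketch for locating those files from Python is shown below. The helper `list_part_files` is hypothetical, and the sketch assumes the output directory sits in the current working directory.

```python
from pathlib import Path


def list_part_files(output_dir: str) -> list:
    """Return the CSV part files inside a Spark output directory, sorted by name."""
    return sorted(Path(output_dir).glob("part-*.csv"))


for part in list_part_files("uk-macroeconomic-data-ordered"):
    print(part.name)
```

If a single file is preferred, calling `coalesce(1)` on the DataFrame before `write` is a common option for small datasets like this one.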