<img src="https://raw.githubusercontent.com/andre-marcos-perez/ebac-course-utils/main/media/logo/newebac_logo_black_half.png" alt="ebac-logo">

---

# **Module** | Cloud Computing III
**Exercise** Notebook<br> 
Professor [André Perez](https://www.linkedin.com/in/andremarcosperez/)

---

# **Topics**

<ol type="1">
  <li>Introduction;</li>
  <li>Apache Spark;</li>
  <li>Data Wrangling with Spark.</li>
</ol>

---

# **Exercises**

## 1\. Apache Spark

Repeat the steps from items 2.1 and 2.2 to install and configure an Apache Spark cluster on the Google Colab virtual machine.

<li>Downloading and extracting Spark:

In [1]:
%%capture
# Download Spark 3.0.0:
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
# Extract the archive:
!tar xf spark-3.0.0-bin-hadoop2.7.tgz
# Delete the .tgz file:
!rm spark-3.0.0-bin-hadoop2.7.tgz

<li>Installing Java (version 8):

In [2]:
%%capture
!apt-get remove -y -qq "openjdk*"
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

<li>Installing PySpark (version 3.0.0):

In [3]:
!pip install -q pyspark==3.0.0



<li>Configuring the Colab machine to recognize Spark:

In [4]:
import os
# Point the system at the Java and Spark installations:
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

In [5]:
# The findspark package is used to connect Python to the Java and Spark installations configured above.
# Install findspark:
!pip install -q findspark==1.4.2

In [6]:
# Call findspark.init() to locate Spark through SPARK_HOME and make PySpark importable:
import findspark
findspark.init()
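
Before creating a session, it can help to confirm that both environment variables point at real directories. The sketch below is one possible check; `missing_spark_paths` is a hypothetical helper name and is not part of findspark or PySpark.

```python
import os

def missing_spark_paths(env=None, names=("JAVA_HOME", "SPARK_HOME")):
    """Return the variables in `names` that are unset or do not point at a directory."""
    env = os.environ if env is None else env
    return [name for name in names if not os.path.isdir(env.get(name, ""))]

# In Colab, an empty list suggests the installs above landed where expected.
print(missing_spark_paths())
```

If the list is not empty, the paths assigned to `os.environ` are the first thing to review, since the directory names depend on the Spark and Java versions that were installed.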

## 2\. Data Wrangling

The dataset available at this [link](https://www.kaggle.com/datasets/bank-of-england/a-millennium-of-macroeconomic-data) contains macroeconomic data on the United Kingdom since the 13th century.

**2.1\. Data**

Download the data to the Google Colab virtual machine with the code below.

In [7]:
!wget -q "https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/build/workspace/data/uk-macroeconomic-data.csv" -O "uk-macroeconomic-data.csv"
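
Because `wget -q` prints nothing, a quick look at the header row can confirm that the download produced the expected file. The following is a minimal sketch using only the standard library; `missing_columns` and `EXPECTED` are hypothetical names, and the three column names are the ones the exercise below relies on. It assumes the file is UTF-8 encoded.

```python
import csv

# Columns the wrangling exercise needs (taken from the exercise statement).
EXPECTED = ("Description", "Population (GB+NI)", "Unemployment rate")

def missing_columns(path, expected=EXPECTED):
    """Return the expected column names that are absent from the CSV header row."""
    with open(path, newline="", encoding="utf-8") as handle:
        header = next(csv.reader(handle), [])
    return [name for name in expected if name not in header]

# Usage in the notebook (commented out so the block stands on its own):
# print(missing_columns("uk-macroeconomic-data.csv"))
```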

**2.2. Wrangling**

Process the data so that the final dataset contains the unemployment rate (`Unemployment rate`) and population (`Population (GB+NI)`) values, sorted by year in descending order:

```csv
year,population,unemployment_rate
...,...,...
```

To do so, use:

 - Pandas

In [8]:
import pandas as pd
import numpy as np

In [9]:
# Load the macroeconomic data:
df_pandas = pd.read_csv('uk-macroeconomic-data.csv', sep=',')

In [10]:
# Preview the DataFrame:
df_pandas.head()

Unnamed: 0,Description,Real GDP of England at market prices,Real GDP of England at factor cost,"Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders","Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders","Index of real UK GDP at factor cost - based on changing political boundaries,",Composite estimate of English and (geographically-consistent) UK real GDP at factor cost,HP-filter of log of real composite estimate of English and UK real GDP at factor cost,"Real UK gross disposable national income at market prices, constant border estimate",Real consumption,...,UK Public sector debt.1,UK Public sector debt.2,Central Government Gross Debt,Central Government Gross Debt.1,Trade deficit,Trade deficit.1,Current account,Current account .1,Current account deficit including estimated non-monetary bullion flows,Current account deficit including estimated non-monetary bullion flows.1
0,Units,"£mn, Chained Volume measure, 2013 prices","£mn, Chained Volume measure, 2013 prices","£mn, Chained Volume measure, 2013 prices","£mn, Chained Volume measure, 2013 prices","GB before 1801, GB+Ireland 1801-1920, GB + Nor...",2013=100,approx. % difference from trend,"£mn, Chained Volume measure, 2013 prices. Nom...","£mn, Chained Volume measure, 2013 prices",...,as a % of nominal GDP: measure 1,as a % of nominal GDP: measure 2,"Financial year end, nominal par value £mn","Financial year end, market value £mn",£mn,as a % of nominal GDP,£mn,as a % of nominal GDP,£mn,as a % of nominal GDP
1,1209,,,,,,,,,,...,,,,,,,,,,
2,1210,,,,,,,,,,...,,,,,,,,,,
3,1211,,,,,,,,,,...,,,,,,,,,,
4,1212,,,,,,,,,,...,,,,,,,,,,


In [11]:
# Select the columns and rename them (rename returns a new DataFrame,
# so no in-place change is made to a slice of the original):
df_pandas = df_pandas[['Description', 'Population (GB+NI)', 'Unemployment rate']].rename(
    columns={'Description': 'year', 'Population (GB+NI)': 'population', 'Unemployment rate': 'unemployment_rate'}
)

In [12]:
# Drop the first row, which holds the units rather than data:
df_pandas.drop([0], axis=0, inplace=True)
# Alternatively, filter that row out with .query(), using a column name
# and the value found in the first row:
# >> df_pandas.query('year != "Units"')

# Drop rows with null values:
df_pandas.dropna(inplace=True)

In [13]:
# Check the DataFrame schema:
df_pandas.info()

<class 'pandas.core.frame.DataFrame'>
Int64Index: 162 entries, 647 to 808
Data columns (total 3 columns):
 #   Column             Non-Null Count  Dtype 
---  ------             --------------  ----- 
 0   year               162 non-null    object
 1   population         162 non-null    object
 2   unemployment_rate  162 non-null    object
dtypes: object(3)
memory usage: 5.1+ KB


In [14]:
# Adjust the column dtypes (note: errors='ignore' returns the original data unchanged if a conversion fails):
df_pandas = df_pandas.astype({'year':np.int32, 'population':np.int64, 'unemployment_rate':np.float32}, errors='ignore')
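
All three columns are read as `object` because the `Units` row shares each column with the numeric values. With `errors='ignore'`, `astype` hands back the original data when any value fails to convert, so a failed cast can go unnoticed. An alternative sketch, run here on a small hand-typed sample rather than the real file, uses `pd.to_numeric` with `errors='coerce'`, which turns unparseable values into `NaN` so they can be inspected or dropped:

```python
import pandas as pd

# Hand-typed sample that mimics the raw layout: a units row followed by values.
sample = pd.DataFrame({
    "year": ["Units", "1855", "1856"],
    "population": ["000s", "23241", "23466"],
    "unemployment_rate": ["%", "3.73", "3.52"],
})

# Unparseable strings such as "Units" become NaN instead of raising.
numeric = sample.apply(pd.to_numeric, errors="coerce")

# Dropping the NaN rows removes the units row; integer dtypes are then safe.
numeric = numeric.dropna().astype({"year": "int32", "population": "int64"})
print(numeric.dtypes)
```

The trade-off is that coercion also hides genuinely malformed values as `NaN`, so counting nulls before and after the conversion is a reasonable extra check.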

In [15]:
# Sort the DataFrame by the year column in descending order:
df_pandas.sort_values(by='year', ignore_index=True, ascending=False, inplace=True)

In [16]:
df_pandas.head()

Unnamed: 0,year,population,unemployment_rate
0,2016,65573,4.9
1,2015,65110,5.38
2,2014,64597,6.18
3,2013,64106,7.61
4,2012,63705,7.97
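
The result can be compared against the target layout from the exercise statement (`year,population,unemployment_rate`, newest year first). The sketch below is one way to express that check; `matches_target` and `TARGET_COLUMNS` are hypothetical names, and the sample reuses the first three rows of the output above.

```python
import pandas as pd

TARGET_COLUMNS = ["year", "population", "unemployment_rate"]

def matches_target(df):
    """True when the columns match the target layout and the years never increase."""
    if list(df.columns) != TARGET_COLUMNS:
        return False
    return bool(pd.to_numeric(df["year"]).is_monotonic_decreasing)

# First rows of the result shown above.
sample = pd.DataFrame({
    "year": [2016, 2015, 2014],
    "population": [65573, 65110, 64597],
    "unemployment_rate": [4.9, 5.38, 6.18],
})
print(matches_target(sample))  # True
```

In the notebook, the same function could be called as `matches_target(df_pandas)`.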


 - PySpark

In [17]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col
from pyspark.sql.types import IntegerType, DoubleType


In [18]:
# Initialize Spark:
spark = SparkSession.\
 builder.\
 master("local[*]").\
 appName("pyspark-notebook").\
 getOrCreate()

In [19]:
# Load the macroeconomic data:
df_spark = spark.read.format('csv').load('uk-macroeconomic-data.csv', header=True, sep=',', inferSchema=True)

In [20]:
df_spark.show(5)  # display the DataFrame


<li>Selecting the columns Description, Population (GB+NI), and Unemployment rate:


In [21]:
# Select the columns and rename them:
df_spark = df_spark.select(
    col('Description').alias('year'),
    col('Population (GB+NI)').alias('population'),
    col('Unemployment rate').alias('unemployment_rate')
)
df_spark.show(5)  # show the changes

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
| 1209|      null|             null|
| 1210|      null|             null|
| 1211|      null|             null|
| 1212|      null|             null|
+-----+----------+-----------------+
only showing top 5 rows



<li>Cleaning the DataFrame and removing the first row:

In [22]:
# Drop rows with null values from the DataFrame:
df_spark = df_spark.dropna()
df_spark.show(5)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
| 1855|     23241|             3.73|
| 1856|     23466|             3.52|
| 1857|     23689|             3.95|
| 1858|     23914|             5.23|
+-----+----------+-----------------+
only showing top 5 rows



In [23]:
# Remove the first row, which holds the units:
df_spark = df_spark.where(col('year') != "Units")


In [24]:
# Check the DataFrame schema:
df_spark.printSchema()

root
 |-- year: string (nullable = true)
 |-- population: string (nullable = true)
 |-- unemployment_rate: string (nullable = true)



In [25]:
# Cast each column to its appropriate type:
df_spark = df_spark.withColumn('year', col('year').cast('integer')).withColumn('population', col('population').cast('integer')).withColumn('unemployment_rate', col('unemployment_rate').cast('double'))

In [26]:
# Sort the DataFrame by year in descending order:
df_spark = df_spark.orderBy(col('year').desc())

In [27]:
# Display the DataFrame:
df_spark.show(5)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|2016|     65573|              4.9|
|2015|     65110|             5.38|
|2014|     64597|             6.18|
|2013|     64106|             7.61|
|2012|     63705|             7.97|
+----+----------+-----------------+
only showing top 5 rows

