<img src="https://raw.githubusercontent.com/andre-marcos-perez/ebac-course-utils/main/media/logo/newebac_logo_black_half.png" alt="ebac-logo">

---

# **Módulo** | Computação em Nuvem III

---

# **Tópicos**

<ol type="1">
  <li>Introdução;</li>
  <li>Apache Spark;</li>
  <li>Data Wrangling com Spark.</li>
</ol>

---

# **Exercícios**

## 1\. Apache Spark

Replique as atividades do item 2.1 e 2.2 para instalar e configurar um cluster Apache Spark na máquina virtual do Google Colab.

In [1]:
# Download do Spark

%%capture
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz && rm spark-3.0.0-bin-hadoop2.7.tgz

In [2]:
# Download e instalação do Java, versão 8.

%%capture
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

In [3]:
# instalando o pyspark

!pip install -q pyspark==3.0.0

[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m204.7/204.7 MB[0m [31m2.2 MB/s[0m eta [36m0:00:00[0m
[?25h  Preparing metadata (setup.py) ... [?25l[?25hdone
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m198.6/198.6 kB[0m [31m16.5 MB/s[0m eta [36m0:00:00[0m
[?25h  Building wheel for pyspark (setup.py) ... [?25l[?25hdone


In [4]:
# Configuração

import os

os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

In [5]:
# Conectar PySpark ao Spark e Java

!pip install -q findspark==1.4.2

In [6]:
import findspark

findspark.init()

## 2\. Data Wrangling

A base de dados presente neste [link](https://www.kaggle.com/datasets/bank-of-england/a-millennium-of-macroeconomic-data) contem dados macroeconômicos sobre o Reino Unido desde o século 13.

**2.1\. Data**

Faça o download dos dados utilizando a máquina virutal do Google Colab com o código abaixo.

In [7]:
!wget -q "https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/build/workspace/data/uk-macroeconomic-data.csv" -O "uk-macroeconomic-data.csv"

**2.2. Wrangling**

Processe os dados para que a base de dados final apresente os valores da taxa de desemprego (`Unemployment rate`) e população (`Population (GB+NI)`) estejam ordenados por ano decrescente:

```csv
year,population,unemployment_rate
...,...,...
```

Para isso, utilize:

* **PySpark**

In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.master("local[*]").appName("pyspark-notebook").getOrCreate()

* **Exploração**

In [9]:
dataframe = spark.read.csv(path='uk-macroeconomic-data.csv', sep=',', header=True)

In [10]:
dataframe.show(n=10)

+-----------+------------------------------------+-----------------------------------+-------------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------------+------------------------------------------------------------------------------+----------------------------------------------------------------------------------------+-------------------------------------------------------------------------------------+-----------------------------------------------------------------------------------+--------------------+--------------------+--------------------------+-------------------------------------------------+--------------------+--------------------+---------------------------------------+-------------------------------+---------------------------------+------------------+--------------------+----------+-----------------+---------------------------+------------------------------

In [12]:
dataframe.columns

['Description',
 'Real GDP of England at market prices',
 'Real GDP of England at factor cost ',
 'Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders',
 'Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders',
 'Index of real UK GDP at factor cost - based on changing political boundaries, ',
 'Composite estimate of English and (geographically-consistent) UK real GDP at factor cost',
 'HP-filter of log of real composite estimate of English and UK real GDP at factor cost',
 'Real UK gross disposable national income at market prices, constant border estimate',
 'Real consumption',
 'Real investment',
 'Stockbuilding contribution',
 'Real government consumption of goods and services',
 'Export volumes',
 'Import volumes',
 'Nominal GDP of England at market prices',
 'Nominal UK GDP at market prices',
 'Nominal UK GDP at market prices.1',
 'Population (GB+NI)',
 'Population (England)',
 'Employment',
 'Unemployment

In [13]:
len(dataframe.columns)

77

* **Limpeza:**

In [14]:
dataframe = dataframe.select(['Description', 'Population (GB+NI)', 'Unemployment rate'])

In [15]:
# Renomenado as colunas

data = dataframe.\
  withColumnRenamed('Description', 'year').\
  withColumnRenamed('Population (GB+NI)', 'population').\
  withColumnRenamed('Unemployment rate', 'unemployment_rate')

In [16]:
data.show(n=10)

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
| 1209|      null|             null|
| 1210|      null|             null|
| 1211|      null|             null|
| 1212|      null|             null|
| 1213|      null|             null|
| 1214|      null|             null|
| 1215|      null|             null|
| 1216|      null|             null|
| 1217|      null|             null|
+-----+----------+-----------------+
only showing top 10 rows



In [17]:
# Selecionando uma linha do dataframe baseado no conteudo da coluna 'year'

data_description = data.filter(data['year'] == 'Units')

In [18]:
data_description.show()

+-----+----------+-----------------+
| year|population|unemployment_rate|
+-----+----------+-----------------+
|Units|      000s|                %|
+-----+----------+-----------------+



In [19]:
(data.count(), len(data.columns))

(841, 3)

* **Junçao:**

In [20]:
from pyspark.sql.functions import broadcast

In [21]:
data = data.join(other=broadcast(data_description), on=['year'], how='left_anti')

In [22]:
data.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1209|      null|             null|
|1210|      null|             null|
|1211|      null|             null|
|1212|      null|             null|
|1213|      null|             null|
|1214|      null|             null|
|1215|      null|             null|
|1216|      null|             null|
|1217|      null|             null|
|1218|      null|             null|
+----+----------+-----------------+
only showing top 10 rows



In [23]:
# Remover todas as linhas que apresentarem ao menos um valor nulo

data = data.dropna()

In [24]:
data.show(n=10)

+----+----------+-----------------+
|year|population|unemployment_rate|
+----+----------+-----------------+
|1855|     23241|             3.73|
|1856|     23466|             3.52|
|1857|     23689|             3.95|
|1858|     23914|             5.23|
|1859|     24138|             3.27|
|1860|     24360|             2.94|
|1861|     24585|             3.72|
|1862|     24862|             4.68|
|1863|     25142|             4.15|
|1864|     25425|             2.99|
+----+----------+-----------------+
only showing top 10 rows



In [25]:
# Criar novas colunas

data = data.withColumn('century', 1 + (data['year']/100).cast('int'))

In [26]:
data.select(['century', 'year']).groupBy('century').agg({'year': 'count'}).show()

+-------+-----------+
|century|count(year)|
+-------+-----------+
|     20|        100|
|     19|         45|
|     21|         17|
+-------+-----------+



In [27]:
# Coletando os resultados dos nós e retorna para o Python

timing = data.select(['century', 'year']).groupBy('century').agg({'year': 'count'}).collect()

In [28]:
timing

[Row(century=20, count(year)=100),
 Row(century=19, count(year)=45),
 Row(century=21, count(year)=17)]

In [29]:
timing[0].asDict()

{'century': 20, 'count(year)': 100}

In [30]:
# Ordenando os dados por ordem decrescente

data = data.orderBy('year', ascending=False)

In [31]:
data.show()

+----+----------+-----------------+-------+
|year|population|unemployment_rate|century|
+----+----------+-----------------+-------+
|2016|     65573|             4.90|     21|
|2015|     65110|             5.38|     21|
|2014|     64597|             6.18|     21|
|2013|     64106|             7.61|     21|
|2012|     63705|             7.97|     21|
|2011|     63285|             8.11|     21|
|2010|     62759|             7.87|     21|
|2009|     62260|             7.61|     21|
|2008|     61824|             5.69|     21|
|2007|     61319|             5.33|     21|
|2006|     60827|             5.42|     21|
|2005|     60413|             4.83|     21|
|2004|     59950|             4.75|     21|
|2003|     59637|             5.01|     21|
|2002|     59366|             5.19|     21|
|2001|     59113|             5.10|     21|
|2000|     58886|             5.46|     21|
|1999|     58684|             5.98|     20|
|1998|     58475|             6.26|     20|
|1997|     58314|             6.