<img src="https://raw.githubusercontent.com/andre-marcos-perez/ebac-course-utils/main/media/logo/newebac_logo_black_half.png" alt="ebac-logo">

---

# **Módulo** | Computação em Nuvem III
Caderno de **Exercícios**<br>
Professor [André Perez](https://www.linkedin.com/in/andremarcosperez/)

---

# **Tópicos**

<ol type="1">
  <li>Introdução;</li>
  <li>Apache Spark;</li>
  <li>Data Wrangling com Spark.</li>
</ol>

---

# **Exercícios**

## 1\. Apache Spark

Replique as atividades do item 2.1 e 2.2 para instalar e configurar um cluster Apache Spark na máquina virtual do Google Colab.

In [1]:
%%capture
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz && rm spark-3.0.0-bin-hadoop2.7.tgz

In [2]:
%%capture
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

## 2\. Data Wrangling

A base de dados presente neste [link](https://www.kaggle.com/datasets/bank-of-england/a-millennium-of-macroeconomic-data) contem dados macroeconômicos sobre o Reino Unido desde o século 13.

**2.1\. Data**

Faça o download dos dados utilizando a máquina virutal do Google Colab com o código abaixo.

In [3]:
!wget -q "https://raw.githubusercontent.com/cluster-apps-on-docker/spark-standalone-cluster-on-docker/master/build/workspace/data/uk-macroeconomic-data.csv" -O "uk-macroeconomic-data.csv"

**2.2. Wrangling**

Processe os dados para que a base de dados final apresente os valores da taxa de desemprego (`Unemployment rate`) e população (`Population (GB+NI)`) estejam ordenados por ano decrescente:

```csv
year,population,unemployment_rate
...,...,...
```

In [31]:
import pandas as pd

# Exemplo de dados iniciais
data = {
    "Year": [2020, 2019, 2018, 2017],
    "Unemployment rate": [4.5, 3.8, 4.2, 4.7],
    "Population (GB+NI)": [67500000, 67200000, 66900000, 66600000],
}

# Criação do DataFrame
df = pd.DataFrame(data)

# Ordenar por ano decrescente
df_sorted = df.sort_values(by="Year", ascending=False).reset_index(drop=True)

# Mostrar o DataFrame ordenado
print(df_sorted)


   Year  Unemployment rate  Population (GB+NI)
0  2020                4.5            67500000
1  2019                3.8            67200000
2  2018                4.2            66900000
3  2017                4.7            66600000


Para isso, utilize:

 - Pandas

In [32]:
import pandas as pd
import os

# Get the current working directory
current_directory = os.getcwd()
print(f"Current working directory: {current_directory}")

# Define the file path
file_path = os.path.join(current_directory, "/content/uk-macroeconomic-data.csv")  # Use os.path.join for platform compatibility
# or provide full path, e.g., file_path = "/path/to/your/file/dados_desemprego.csv"


# Check if the file exists
if os.path.exists(file_path):
    # Load the CSV file if it exists
    df = pd.read_csv(file_path)

    # Your data processing code here
    # ...
else:
    print(f"Error: File not found at path: {file_path}")





Current working directory: /content


In [33]:
import pandas as pd

# Carregar o arquivo CSV (substitua pelo seu arquivo real)
df = pd.read_csv("/content/uk-macroeconomic-data.csv")

# Exibir as colunas disponíveis no DataFrame
print("Colunas disponíveis no DataFrame:")
print(df.columns)

# Corrigir possíveis espaços ou caracteres indesejados nos nomes das colunas
df.columns = df.columns.str.strip()

# Exibir as primeiras linhas para verificar o conteúdo do DataFrame
print("\nPrimeiras linhas do DataFrame:")
print(df.head())

# Renomear as colunas para padronizar os nomes (ajuste conforme necessário)
df.rename(columns={
    'Ano': 'Year',  # Exemplo: traduzindo 'Ano' para 'Year'
    'Taxa de Desemprego': 'Unemployment rate',  # Ajuste conforme o nome correto
    'População (GB+NI)': 'Population (GB+NI)'  # Ajuste conforme o nome correto
}, inplace=True)

# Verificar novamente as colunas após renomeação
print("\nColunas após renomeação:")
print(df.columns)

# Ordenar pelos valores da coluna 'Year' de forma decrescente
try:
    df_sorted = df.sort_values(by="Year", ascending=False).reset_index(drop=True)
    print("\nDados ordenados por ano (decrescente):")
    print(df_sorted)
except KeyError as e:
    print(f"Erro: {e}. Verifique se a coluna 'Year' está presente e corretamente nomeada.")



Colunas disponíveis no DataFrame:
Index(['Description', 'Real GDP of England at market prices',
       'Real GDP of England at factor cost ',
       'Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders',
       'Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders',
       'Index of real UK GDP at factor cost - based on changing political boundaries, ',
       'Composite estimate of English and (geographically-consistent) UK real GDP at factor cost',
       'HP-filter of log of real composite estimate of English and UK real GDP at factor cost',
       'Real UK gross disposable national income at market prices, constant border estimate',
       'Real consumption', 'Real investment', 'Stockbuilding contribution',
       'Real government consumption of goods and services', 'Export volumes',
       'Import volumes', 'Nominal GDP of England at market prices',
       'Nominal UK GDP at market prices', 'Nominal UK GD

 - PySpark

In [35]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col

# Criar uma sessão do Spark
spark = SparkSession.builder \
    .appName("Processamento de Dados com PySpark") \
    .getOrCreate()

# Carregar o arquivo CSV em um DataFrame do PySpark (ajuste o caminho para o seu arquivo)
#file_path = "dados_desemprego.csv"
df = spark.read.csv("/content/uk-macroeconomic-data.csv", header=True, inferSchema=True)

# Mostrar as colunas disponíveis
print("Colunas disponíveis no DataFrame:")
df.printSchema()

# Renomear colunas para padronizar os nomes (ajuste conforme necessário)
df = df.withColumnRenamed("Ano", "Year") \
       .withColumnRenamed("Taxa de Desemprego", "Unemployment rate") \
       .withColumnRenamed("População (GB+NI)", "Population (GB+NI)")

# Verificar os dados após renomeação
print("\nPrimeiras linhas do DataFrame:")
df.show()

# Ordenar os dados pela coluna 'Year' em ordem decrescente
df_sorted = df.orderBy(col("year").desc())

# Exibir o DataFrame ordenado
print("\nDados ordenados por ano (decrescente):")
df_sorted.show()

# Salvar os dados ordenados em um novo arquivo CSV (opcional)
output_path = "dados_ordenados.csv"
df_sorted.write.csv(output_path, header=True)

# Encerrar a sessão do Spark
spark.stop()



Colunas disponíveis no DataFrame:
root
 |-- Description: string (nullable = true)
 |-- Real GDP of England at market prices: string (nullable = true)
 |-- Real GDP of England at factor cost : string (nullable = true)
 |-- Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Index of real UK GDP at factor cost - based on changing political boundaries, : string (nullable = true)
 |-- Composite estimate of English and (geographically-consistent) UK real GDP at factor cost: string (nullable = true)
 |-- HP-filter of log of real composite estimate of English and UK real GDP at factor cost: string (nullable = true)
 |-- Real UK gross disposable national income at market prices, constant border estimate: string (nullable = true)
 |-- Real consumption: string (nullable = true)
 |-- Real investment: string 

AnalysisException: [UNRESOLVED_COLUMN.WITH_SUGGESTION] A column or function parameter with name `year` cannot be resolved. Did you mean one of the following? [`M1`, `Credit `, `Real ERI`, `Bank Rate`, `Employment`].;
'Sort ['year DESC NULLS LAST], true
+- Relation [Description#4751,Real GDP of England at market prices#4752,Real GDP of England at factor cost #4753,Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders#4754,Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders#4755,Index of real UK GDP at factor cost - based on changing political boundaries, #4756,Composite estimate of English and (geographically-consistent) UK real GDP at factor cost#4757,HP-filter of log of real composite estimate of English and UK real GDP at factor cost#4758,Real UK gross disposable national income at market prices, constant border estimate#4759,Real consumption#4760,Real investment#4761,Stockbuilding contribution#4762,Real government consumption of goods and services#4763,Export volumes#4764,Import volumes#4765,Nominal GDP of England at market prices#4766,Nominal UK GDP at market prices#4767,Nominal UK GDP at market prices.1#4768,Population (GB+NI)#4769,Population (England)#4770,Employment#4771,Unemployment rate#4772,Average weekly hours worked#4773,Capital Services, whole economy#4774,... 53 more fields] csv


In [38]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, lit

# Criar uma sessão do Spark
spark = SparkSession.builder \
    .appName("Processamento de Dados com PySpark") \
    .getOrCreate()

# Carregar o arquivo CSV em um DataFrame do PySpark (ajuste o caminho para o seu arquivo)
file_path = "dados_desemprego.csv"
df = spark.read.csv("/content/uk-macroeconomic-data.csv", header=True, inferSchema=True)

# Mostrar as colunas disponíveis para diagnóstico
print("Estrutura inicial do DataFrame:")
df.printSchema()

# Exibir as primeiras linhas para verificar os dados
print("\nPrimeiras linhas do DataFrame:")
df.show(5)

# Criar uma nova coluna 'Year' com valores baseados na lógica do contexto
# Aqui vamos assumir que você possui dados ou uma lógica para adicionar anos (ajuste conforme necessário)
# Exemplo: Preenchendo a coluna com um ano fixo (substitua por valores reais ou lógica apropriada)
df = df.withColumn("Year", lit(2024))  # Substitua `2024` por lógica específica, se necessário

# Exibir o DataFrame atualizado
print("\nDataFrame com a coluna 'Year' adicionada:")
df.show(5)

# Ordenar os dados pela coluna 'Year' em ordem decrescente
df_sorted = df.orderBy(col("Year").desc())

# Exibir o DataFrame ordenado
print("\nDados ordenados por ano (decrescente):")
df_sorted.show()

# Salvar os dados ordenados em um novo arquivo CSV (opcional)
output_path = "dados_ordenados.csv"
df_sorted.write.csv(output_path, header=True)

# Encerrar a sessão do Spark
spark.stop()


Estrutura inicial do DataFrame:
root
 |-- Description: string (nullable = true)
 |-- Real GDP of England at market prices: string (nullable = true)
 |-- Real GDP of England at factor cost : string (nullable = true)
 |-- Real UK GDP at market prices, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Real UK GDP at factor cost, geographically-consistent estimate based on post-1922 borders: string (nullable = true)
 |-- Index of real UK GDP at factor cost - based on changing political boundaries, : string (nullable = true)
 |-- Composite estimate of English and (geographically-consistent) UK real GDP at factor cost: string (nullable = true)
 |-- HP-filter of log of real composite estimate of English and UK real GDP at factor cost: string (nullable = true)
 |-- Real UK gross disposable national income at market prices, constant border estimate: string (nullable = true)
 |-- Real consumption: string (nullable = true)
 |-- Real investment: string (n