# <span style="color:blue">MBA em Ciência de Dados</span>
# <span style="color:blue">Análise de Dados com Base em Processamento Massivo em Paralelo</span>

## <span style="color:blue">Avaliação Final</span>

**Material Produzido por:**<br>
>**Profa. Dra. Cristina Dutra de Aguiar**<br>


**CEMEAI - ICMC/USP São Carlos**

As questões desta avaliação final estão espalhadas ao longo do texto. Por favor, procurem por Questão para encontrar a especificação das questões e o local no qual cada questão deve ser respondida. Também é possível localizar as questões utilizando o menu de navegação. O *notebook* contém a constelação de fatos da BI Solutions que deve ser utilizada para responder às questões e também todas as bibliotecas, bases de dados, inicializações, instalações, importações, geração de dataFrames, geração de visões temporárias e conversão dos tipos de dados necessárias para a realização da questão. Portanto, o notebook está preparado para ser executado usando Pandas, o método spark.sql() e os métodos do módulo pyspark.sql.

O uso do *framework* Spark requer diversas configurações no ambiente de desenvolvimento para executar o *notebook*. Dado que tal complexidade foge do escopo de nossa disciplina, recomenda-se que o *notebook* seja executado na plataforma de desenvolvimento COLAB. O uso do COLAB  proporciona um ambiente de desenvolvimento pré-configurado e remove a complexidade de instalação e configuração de pacotes e *frameworks* que são utilizados na disciplina.

**IMPORTANTE**

**Antes de fazer a avaliação, leia atentamente a seção 5, que detalha instruções importantes sobre a avaliação e o critério de correção.**  

**INSTRUÇÕES DE ENTREGA**

**O que deve ser entregue:**
- **O notebook com as respostas no formato .ipynb**
- **O notebook com as respostas no formato .pdf**

**Ambos arquivos devem ser nomeados usando o primeiro nome e o último sobrenome do aluno. Por exemplo: CristinaAguiar.ipynb e CristinaAguiar.pdf.**

Boa avaliação!

#1 Constelação de Fatos da BI Solutions

A aplicação de *data warehousing* da BI Solutions utiliza como base uma constelação de fatos, conforme descrita a seguir.

**Tabelas de dimensão**

- data (dataPK, dataCompleta, dataDia, dataMes, dataBimestre, dataTrimestre, dataSemestre, dataAno)
- funcionario (funcPK, funcMatricula, funcNome, funcSexo, funcDataNascimento, funcDiaNascimento, funcMesNascimento, funcAnoNascimento, funcCidade, funcEstadoNome, funcEstadoSigla, funcRegiaoNome, funcRegiaoSigla, funcPaisNome, funcPaisSigla)
- equipe (equipePK, equipeNome, filialNome, filialCidade, filialEstadoNome, filialEstadoSigla, filialRegiaoNome, filialRegiaoSigla, filialPaisNome, filialPaisSigla)
- cargo (cargoPK, cargoNome, cargoRegimeTrabalho, cargoEscolaridadeMinima, cargoNivel)
- cliente (clientePK, clienteNomeFantasia, clienteSetor, clienteCidade, clienteEstadoNome, clienteEstadoSigla, clienteRegiaoNome, clienteRegiaoSigla, clientePaisNome, clientePaisSigla)

**Tabelas de fatos**
- pagamento (dataPK, funcPK, equipePK, cargoPK, salario, quantidadeLancamentos)
- negociacao (dataPK, equipePK, clientePK, receita, quantidadeNegociacoes)


#2 Obtenção dos Dados da BI Solutions


## 2.1 Baixando o Módulo wget

Para baixar os dados referentes ao esquema relacional da constelação de fatos da BI Solutions, é utilizado o módulo  **wget**. O comando a seguir realiza a instalação desse módulo. <br>

In [1]:
#instalando o módulo wget
%%capture
!pip install -q wget
!mkdir data

## 2.2 Obtenção dos Dados das Tabelas de Dimensão

Os comandos a seguir baixam os dados que povoam as tabelas de dimensão. 

In [2]:
#baixando os dados das tabelas de dimensão
import wget

url = "https://raw.githubusercontent.com/GuiMuzziUSP/Data_Mart_BI_Solutions/main/data.csv"
wget.download(url, "data/data.csv")

url = "https://raw.githubusercontent.com/GuiMuzziUSP/Data_Mart_BI_Solutions/main/funcionario.csv"
wget.download(url, "data/funcionario.csv")

url = "https://raw.githubusercontent.com/GuiMuzziUSP/Data_Mart_BI_Solutions/main/equipe.csv"
wget.download(url, "data/equipe.csv")

url = "https://raw.githubusercontent.com/GuiMuzziUSP/Data_Mart_BI_Solutions/main/cargo.csv"
wget.download(url, "data/cargo.csv")

url = "https://raw.githubusercontent.com/GuiMuzziUSP/Data_Mart_BI_Solutions/main/cliente.csv"
wget.download(url, "data/cliente.csv")

'data/cliente.csv'

## 2.3 Obtenção dos Dados Tabelas de Fatos

Os comandos a seguir baixam os dados que povoam as tabelas de fatos. 

In [3]:
#baixando os dados das tabelas de fatos
url = "https://raw.githubusercontent.com/GuiMuzziUSP/Data_Mart_BI_Solutions/main/pagamento.csv"
wget.download(url, "data/pagamento.csv")

url = "https://raw.githubusercontent.com/GuiMuzziUSP/Data_Mart_BI_Solutions/main/negociacao.csv"
wget.download(url, "data/negociacao.csv")

'data/negociacao.csv'

# 3 Apache Spark Cluster

## 3.1 Instalação

Neste *notebook* é criado um *cluster* Spark composto apenas por um **nó mestre**. Ou seja, o *cluster* não possui um ou mais **nós de trabalho** e o **gerenciador de cluster**. Nessa configuração, as tarefas (*tasks*) são realizadas no próprio *driver* localizado no **nó mestre**.

Para que o cluster possa ser criado, primeiramente é instalado o Java Runtime Environment (JRE) versão 8. 

In [53]:
#instalando Java Runtime Environment (JRE) versão 8
%%capture
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Na sequência, é feito o *download* do Apache Spark versão 3.0.0.

In [4]:
#baixando Apache Spark versão 3.0.0
%%capture
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz && rm spark-3.0.0-bin-hadoop2.7.tgz

Na sequência, são configuradas as variáveis de ambiente JAVA_HOME e SPARK_HOME. Isto permite que tanto o Java quanto o Spark possam ser encontrados.

In [5]:
import os
#configurando a variável de ambiente JAVA_HOME
os.environ["MYSQL_PWD"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#configurando a variável de ambiente SPARK_HOME
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

Por fim, são instalados dois pacotes da linguagem de programação Python, cujas funcionalidades são descritas a seguir.

> **Pacote findspark:** Usado para ler a variável de ambiente SPARK_HOME e armazenar seu valor na variável dinâmica de ambiente PYTHONPATH. Como resultado, Python pode encontrar a instalação do Spark. 

> **Pacote pyspark:** PySpark é a API do Python para Spark. Ela possibilita o uso de Python, considerando que o *framework* Apache Spark encontra-se desenvolvido na linguagem de programação Scala. 

In [6]:
%%capture
#instalando o pacote findspark
!pip install -q findspark==1.4.2
#instalando o pacote pyspark
!pip install -q pyspark==3.0.0

## 3.2 Conexão

PySpark não é adicionado ao *sys.path* por padrão. Isso significa que não é possível importá-lo, pois o interpretador da linguagem Python não sabe onde encontrá-lo. 

Para resolver esse aspecto, é necessário instalar o módulo `findspark`. Esse módulo mostra onde PySpark está localizado. Os comandos a seguir têm essa finalidade.


In [7]:
#importando o módulo findspark
import findspark
#carregando a variávels SPARK_HOME na variável dinâmica PYTHONPATH
findspark.init()

Depois de configurados os pacotes e módulos e inicializadas as variáveis de ambiente, é possível iniciar o uso do Spark na aplicação de `data warehousing`. Para tanto, é necessário importar o comando `SparkSession` do módulo `pyspark.sql`. São utilizados os seguintes conceitos: <br>

- `SparkSession`: permite a criação de `DataFrames`. Como resultado, as tabelas relacionais podem ser manipuladas por meio de `DataFrames` e é possível realizar consultas OLAP por meio de comandos SQL. <br>
- `builder`: cria uma instância de SparkSession. <br>
- `appName`: define um nome para a aplicação, o qual pode ser visto na interface de usuário web do Spark. <br> 
- `master`: define onde está o nó mestre do *cluster*. Como a aplicação é executada localmente e não em um *cluster*, indica-se isso pela *string* `local` seguida do parâmetro `[*]`. Ou seja, define-se que apenas núcleos locais são utilizados. 
- `getOrCreate`: cria uma SparkSession. Caso ela já exista, retorna a instância existente. 


**Observação**: A lista completa de todos os parâmetros que podem ser utilizados na inicialização do *cluster* pode ser encontrada neste [link](https://spark.apache.org/docs/latest/spark-standalone.html#cluster-launch-scripts).

In [8]:
from pyspark.sql import SparkSession

spark = SparkSession.builder.appName("pyspark-notebook").master("local[*]").getOrCreate()

# 4 Geração dos DataFrames em Pandas da BI Solutions

Nesta seção são gerados os DataFrames em Pandas. Atenção aos nomes desses DataFrames. 


In [9]:
import pandas as pd
pd.set_option('display.float_format', lambda x: '%.2f' % x)

In [10]:
cargoPandas = pd.read_csv('https://raw.githubusercontent.com/GuiMuzziUSP/Data_Mart_BI_Solutions/main/cargo.csv')
clientePandas = pd.read_csv('https://raw.githubusercontent.com/GuiMuzziUSP/Data_Mart_BI_Solutions/main/cliente.csv')
dataPandas = pd.read_csv('https://raw.githubusercontent.com/GuiMuzziUSP/Data_Mart_BI_Solutions/main/data.csv')
equipePandas = pd.read_csv('https://raw.githubusercontent.com/GuiMuzziUSP/Data_Mart_BI_Solutions/main/equipe.csv')
funcionarioPandas = pd.read_csv('https://raw.githubusercontent.com/GuiMuzziUSP/Data_Mart_BI_Solutions/main/funcionario.csv')
negociacaoPandas = pd.read_csv('https://raw.githubusercontent.com/GuiMuzziUSP/Data_Mart_BI_Solutions/main/negociacao.csv')
pagamentoPandas = pd.read_csv('https://raw.githubusercontent.com/GuiMuzziUSP/Data_Mart_BI_Solutions/main/pagamento.csv')

# 5 Geração dos DataFrames em Spark da BI Solutions

Nesta seção são gerados dos DataFrames em Spark. Atenção aos nomes desses DataFrames. 


## 5.1 Criação dos DataFrames

In [11]:
#criando os DataFrames em Spark 
cargo = spark.read.csv(path="data/cargo.csv", header=True, sep=",")
cliente = spark.read.csv(path="data/cliente.csv", header=True, sep=",")
data = spark.read.csv(path="data/data.csv", header=True, sep=",")
equipe = spark.read.csv(path="data/equipe.csv", header=True, sep=",")
funcionario = spark.read.csv(path="data/funcionario.csv", header=True, sep=",")
negociacao = spark.read.csv(path="data/negociacao.csv", header=True, sep=",")
pagamento = spark.read.csv(path="data/pagamento.csv", header=True, sep=",")

## 5.2 Atualização dos Tipos de Dados 

Nos comandos a seguir, primeiro são identificados quais colunas de quais `DataFrames` devem ser do tipo de dado inteiro. Na sequência, ocorre a conversão. Por fim, são exibidos os esquemas dos `DataFrames`, possibilitando visualizar a mudança de tipo de dados das colunas especificadas.

In [12]:
# identificando quais colunas de quais DataFrames devem ser do tipo de dado inteiro
colunas_cargo = ["cargoPK"]
colunas_cliente = ["clientePK"]
colunas_data = ["dataPK", "dataDia", "dataMes", "dataBimestre", "dataTrimestre", "dataSemestre", "dataAno"]
colunas_equipe = ["equipePK"]
colunas_funcionario = ["funcPK", "funcDiaNascimento", "funcMesNascimento", "funcAnoNascimento"]
colunas_negociacao = ["equipePK", "clientePK", "dataPK", "quantidadeNegociacoes"]
colunas_pagamento = ["funcPK", "equipePK", "dataPK", "cargoPK", "quantidadeLancamentos"]

In [13]:
# importando o tipo de dado desejado
from pyspark.sql.types import IntegerType


# atualizando o tipo de dado das colunas especificadas 
# substituindo as colunas já existentes 

for coluna in colunas_cargo:
  cargo = cargo.withColumn(coluna, cargo[coluna].cast(IntegerType()))

for coluna in colunas_cliente:
  cliente = cliente.withColumn(coluna, cliente[coluna].cast(IntegerType()))

for coluna in colunas_data:
  data = data.withColumn(coluna, data[coluna].cast(IntegerType()))

for coluna in colunas_equipe:
  equipe = equipe.withColumn(coluna, equipe[coluna].cast(IntegerType()))

for coluna in colunas_funcionario:
  funcionario = funcionario.withColumn(coluna, funcionario[coluna].cast(IntegerType()))

for coluna in colunas_negociacao:
  negociacao = negociacao.withColumn(coluna, negociacao[coluna].cast(IntegerType()))

for coluna in colunas_pagamento:
  pagamento = pagamento.withColumn(coluna, pagamento[coluna].cast(IntegerType()))

Nos comandos a seguir, primeiro são identificados quais colunas de quais `DataFrames` devem ser do tipo de dado número de ponto flutuante. Na sequência, ocorre a conversão. Por fim, são exibidos os esquemas dos `DataFrames`, possibilitando visualizar a mudança de tipo de dados das colunas especificadas.

In [14]:
# identificando quais colunas de quais DataFrames devem ser do tipo de dado número de ponto flutuante
colunas_negociacao = ["receita"]
colunas_pagamento = ["salario"]

In [15]:
# importando o tipo de dado desejado
from pyspark.sql.types import FloatType


# atualizando o tipo de dado das colunas especificadas 
# substituindo as colunas já existentes 

for coluna in colunas_negociacao:
  negociacao = negociacao.withColumn(coluna, negociacao[coluna].cast(FloatType()))

for coluna in colunas_pagamento:
  pagamento = pagamento.withColumn(coluna, pagamento[coluna].cast(FloatType()))

In [34]:
# importando funções adicionais 
from pyspark.sql.functions import round, desc

## 5.3 Criação de Visões Temporárias

In [16]:
#criando as visões temporárias 
cargo.createOrReplaceTempView("cargo")
cliente.createOrReplaceTempView("cliente")
data.createOrReplaceTempView("data")
equipe.createOrReplaceTempView("equipe")
funcionario.createOrReplaceTempView("funcionario")
negociacao.createOrReplaceTempView("negociacao")
pagamento.createOrReplaceTempView("pagamento")

# 6 Instruções Importantes sobre a Avaliação

## 6.1 Especificação das Consultas OLAP

As consultas OLAP devem ser respondidas de acordo com o solicitado em cada questão. As seguintes solicitações podem ser feitas:

- Resolva a questão especificando a consulta OLAP usando **Pandas**. Neste caso, a consulta deve ser respondida usando os conceitos apresentados na Aula 05 da disciplina. Ou seja, a consulta deve ser respondida usando os métodos disponíveis na biblioteca Pandas para uso em Python. Não é possível usar o método `spark.sql()` para especificar a consulta. Também não é possível usar os demais métodos do módulo `pyspark.sql` para especificar a consulta.

- Resolva a questão especificando a consulta OLAP na **linguagem SQL**. Neste caso, a consulta deve ser respondida usando os conceitos apresentados na Aula 07 da disciplina. Ou seja, a consulta deve ser respondida usando a linguagem SQL textual e o método `spark.sql()`. Não é possível usar os métodos disponíveis na biblioteca Pandas para especificar a consulta. Também não é possível usar os demais métodos do módulo `pyspark.sql` para especificar a consulta, com exceção do método `show()` para listar o resultado da consulta. 

- Resolva a questão especificando a consulta OLAP usando os **métodos de pyspark.sql**. Neste caso, a consulta deve ser respondida usando os conceitos apresentados na Aula 08 da disciplina. Ou seja, a consulta deve ser respondida usando os demais métodos do módulo `pyspark.sql`. Não é possível usar os métodos disponíveis na biblioteca Pandas para especificar a consulta. Também não é possível usar o método `spark.sql()` para especificar a consulta.

**AVISO: Caso a consulta seja especificada de forma diferente do que foi solicitado, a resposta não será considerada, mesmo que ela esteja correta.**

## 6.2 Ordem das Colunas e das Linhas

A resolução das questões deve seguir estritamente as especificações definidas em cada consulta. Isto significa que:

- As **colunas** solicitadas devem ser exibidas exatamente na mesma ordem que a definida na questão. Note que todas as colunas a serem exibidas como resposta da consulta, bem como a ordem na qual elas devem aparecer são sempre definidas na questão. 

- As **linhas** retornadas como respostas devem ser exibidas exatamente na mesma ordem que a definida na questão. Note que a ordem na qual as linhas devem aparecer são sempre definidas na questão. 

- Os **nomes das colunas** renomeadas devem seguir estritamente os nomes definidos na questão. Para evitar possíveis erros, os nomes das colunas renomeadas não possuem acentos e espaços em branco, além de serem escritos utilizando apenas letras maiúsculas. Note que os nomes das colunas renomeadas são sempre definidos na questão.

**AVISO: Essas orientações devem ser seguidas uma vez que a correção da avaliação será realizada de forma automática. Caso a consulta retorne resultados de forma diferente do que foi solicitado, a resposta não será considerada, mesmo que ela esteja correta.**  

## 6.3 Listagem das Respostas das Consultas

A resposta de cada consulta deve ser listada usando o método `show()`. Nenhum outro método pode ser utilizado com essa finalidade.  

Devem ser listadas apenas as `25` primeiras linhas de resposta de cada consulta. Adicionalmente, devem ser listadas *strings* com tamanho maior do que 20 caracteres, ou seja, o parâmetro `truncate` do método `show()` deve ser inicializado como `false`.

Portanto, a listagem das respostas deve ser feita utilizando o método `show()` como especificado a seguir. 

- Quando a consulta OLAP for especificada usando **Pandas**. Utilize o comando `df.head(25)` para exibir o resultado da consulta.

- Quando a consulta OLAP for especificada usando a **linguagem SQL**. Utilize o comando `spark.sql(consultaSQL).show(25,truncate=False)` para exibir o resultado da consulta. 

- Quando a consulta OLAP for especificada usando os demais **métodos de pyspark.sql**. Utilize o comando `nomeDoDataFrame.show(25,truncate=False)` para exibir o resultado da consulta.

## 6.4 Arredondamento dos Dados

Deve ser realizado o arredondamento dos dados todas as vezes que uma função de agregação for aplicada às medidas numéricas `salario` da tabela de dimensão `pagamento` e `receita` da tabela de dimensão `negociacao`. 

O arredondamento deve ser realizado usando a função `round()` na linguagem SQL e o método `round()` em `pyspark.sql` e deve arredondar os dados até duas casas decimais. Por exemplo, podem ser produzidos resultados da forma `112233.4` e `112233.44`. 

Portanto, o arredondamento dos dados deve ser feito como especificado a seguir.

- Quando a consulta OLAP for especificada usando **Pandas**. Utilize o comando `df.round(2)` para arredondar os dados até duas casas decimais.

- Quando a consulta OLAP for especificada usando a **linguagem SQL**. Utilize a função `ROUND(funçãoDeAgregação,2)` para arredondar o dado até duas casas decimais.

- Quando a consulta OLAP for especificada usando os demais **métodos de pyspark.sql**. Utilize o método `round(funçãoDeAgregação,2)` para arredondar o dado até duas casas decimais.

## 6.5 Respostas das Questões

As respostas das questões devem ser fornecidas de duas formas diferentes:

- Exibidas na saída padrão.

- Armazenadas em um arquivo no formato csv.

A seguir são detalhadas as instruções em **Pandas**, **spark.sql()** e **pyspark()** para que as respostas sejam mostradas de forma apropriada para as correções. 


Em **Pandas**

```python
# Resolve a questão usando a variável de nome questao
QuestaoX = consulta.round(2).head(25)
onde X é o número da questão, por exemplo Questao1, Questao2, ...

# Exibe a resposta da questão na saída padrão
display(QuestaoX)
onde X é o número da questão, por exemplo Questao1, Questao2, ...

# Gera o arquivo no formato csv com a resposta da questão
QuestaoX.to_csv("questaoX.csv", index=False, header=True)
onde X é o número da questão, por exemplo Questao1, Questao2, ...
```


**Em spark.sql()**

```python
# Resolve a questão usando a variável de nome questao
QuestaoX = spark.sql(query)
onde X é o número da questão, por exemplo Questao1, Questao2, ... 

# Exibe a resposta da questão na saída padrão
QuestaoX.show(25, truncate=False)
onde X é o número da questão, por exemplo Questao1, Questao2, ...

# Gera o arquivo no formato csv com a resposta da questão
QuestaoX\
  .coalesce(1).limit(25) \
  .toPandas().to_csv("questaoX.csv", index=False, header=True)
onde X é o número da questão, por exemplo Questao1, Questao2, ...
```

**Em pyspark()**

```python
# Resolve a questão usando a variável de nome questao
QuestaoX = consulta 
onde X é o número da questão, por exemplo Questao1, Questao2, ...

# Exibe a resposta da questão na saída padrão
QuestaoX.show(25, truncate=False)
onde X é o número da questão, por exemplo Questao1, Questao2, ...

# Gera o arquivo no formato csv com a resposta da questão
QuestaoX\
  .coalesce(1).limit(25) \
  .toPandas().to_csv("QuestaoX.csv", index=False, header=True)
onde X é o número da questão, por exemplo Questao1, Questao2, ...
```

## 6.6 Comentários Explicativos

Devem ser colocados comentários no código que expliquem o passo a passo da resolução da questão. Os comentários explicativos devem ser realizados como especificado a seguir. 

- Quando a consulta OLAP for especificada usando **Pandas**. Utilize `#` para colocar comentário. Por exemplo:

```
# para a solução desta consulta OLAP, primeiramente é aplicado o método ... para ..... 
# Na sequência, é aplicado o método ... para ...
```

- Quando a consulta OLAP for especificada usando a **linguagem SQL**. Utilize `#` para colocar comentários gerais (conforme explicado para os demais métodos de `pyspark.sql`) ou utilize `--` para colocar comentários no comando SQL. Por exemplo:

```
# neste comentário são descritas as características de cada cláusula da consulta SQL. 
# A funcionalidade da cláusula SELECT nesta consulta é ... 
# A funcionalidade da cláusula FROM nesta consulta é ... 
# A funcionalidade da cláusula WHERE nesta consulta é ...
```

```
-- A funcionalidade da cláusula SELECT nesta consulta é ...
SELECT funcNome
-- A funcionalidade da cláusula FROM nesta consulta é ...
FROM funcionario
-- A funcionalidade da cláusula WHERE nesta consulta é ...
WHERE funcPK = 1
```

- Quando a consulta OLAP for especificada usando os demais **métodos de pyspark.sql**. Utilize `#` para colocar comentário. Por exemplo:

```
# para a solução desta consulta OLAP, primeiramente é aplicado o método ... para ..... 
# Na sequência, é aplicado o método ... para ...
```

## 6.7 Indentação e Organização

As consultas e os comandos que respondem às questões dessa avaliação devem ser escritos de forma indentada. Em caso de dúvida, observem os *notebooks* da Aula 05, da Aula 07 e da Aula 08 e verifiquem como as consultas e os comandos foram indentados.

**AVISO: Com relação à organização, é necessário que as respostas às questões sejam localizadas aonde especificado no *notebook*. Por favor, procurem por "Resposta da Questão" para encontrar o local no qual as respostas devem ser especificadas. Também é possível localizar o local das respostas utilizando o menu de navegação.**

## 6.8 Critério de Avaliação

Na correção da avaliação, serão ponderados os seguintes aspectos:

- Corretude da execução das consultas OLAP.

- Atendimento às especificações definidas nas seções 6.1, 6.2, 6.3, 6.4, 6.5.

- Atendimento às especificações da sintaxe das cláusulas e dos métodos utilizados para resolver cada questão.

- Qualidade da documentação entregue, de acordo com as especificações definidas nas seções 6.6 e 6.7. 


# 7 Consultas OLAP

O objetivo das consultas OLAP é realizar diferentes investigações sobre aspectos específicos no que tange às atividades realizadas pela BI Solutions. Os resultados obtidos nas investigações poderão ser posteriormente utilizados para a definição de estratégias que a empresa deve executar para prover melhorias. 


## 7.1 Análises Relacionadas aos Cargos

Foi identificado que, nos últimos anos, o cargo de nome "ADMINISTRADOR EM SEGURANCA DA INFORMACAO" teve um aumento expressivo no que tange aos gastos com salários. O objetivo das análises desta seção é obter uma visão relacionada a esse aspecto, por meio da investigação dos gastos em salários considerando diferentes fatores. 

Podem ser realizadas diferentes análises, sendo que três delas são solicitadas a seguir.

### **Questão 1 (valor 1,0)**

Liste, para cada ano, a soma dos salários para o cargo de nome "ADMINISTRADOR EM SEGURANCA DA INFORMACAO". Arredonde a soma dos salários para até duas casas decimais. Devem ser exibidas as colunas na ordem e com os nomes especificados a seguir: "ANO", "TOTALDESPESA". Ordene as linhas exibidas primeiro pelo total de despesa em ordem descendente e depois pelo ano em ordem descendente. Liste as primeiras 25 linhas da resposta, sem truncamento das *strings*. 

**Resolva a questão especificando a consulta OLAP usando Pandas**.


### Resposta da Questão 1

In [17]:
# Resposta da Questão 1 
# Não se esqueça de finalizar a consulta mostrando seu resultado com .head()
# Não se esqueça de exibir a resposta na saída padrão
# Não se esqueça de gerar o arquivo no formato csv com a resposta da questão

# Realizar join de datas e cargos
# Filtrar pelo cargo de nome "ADMINISTRADOR EM SEGURANCA DA INFORMACAO" 
consulta = pagamentoPandas.merge(dataPandas, on = 'dataPK')\
.merge(cargoPandas, on = 'cargoPK').query('cargoNome == "ADMINISTRADOR EM SEGURANCA DA INFORMACAO"')

# Renomear colunas para o padrão definido e realizar a soma dos salarios agrupando por ano
consulta = consulta.rename(mapper={"dataAno": "ANO", "salario": "TOTALDESPESA"}, axis=1)

# Agrupando por ANO e TOTALDESPESA e somando o resultado
consulta = consulta.groupby(['ANO'])['TOTALDESPESA'].sum().reset_index()

# Aplicando ordenação descendente para as colunas 'ANO', 'TOTALDESPESA'
consulta = consulta.sort_values(by = ['ANO', 'TOTALDESPESA'], ascending=[False,False])

Questao1 = consulta.round(2).head(25)

# Exibe a resposta na saída padrão
display(Questao1)

# Gera o arquivo no formato csv com a resposta
Questao1.to_csv("questao1.csv", index=False, header=True)

Unnamed: 0,ANO,TOTALDESPESA
4,2020,1883273.28
3,2019,1883273.28
2,2018,1239394.8
1,2017,943759.32
0,2016,475625.52


### **Questão 2 (valor: 1,0)**

Liste, para cada nome da região da filial, a soma dos salários para o cargo de nome "ADMINISTRADOR EM SEGURANCA DA INFORMACAO". Arredonde a soma dos salários para até duas casas decimais. Devem ser exibidas as colunas na ordem e com os nomes especificados a seguir: "REGIAO", "TOTALDESPESA". Ordene as linhas exibidas primeiro pelo total de despesa em ordem descendente e depois pelo nome da região em ordem descendente. Liste as primeiras 25 linhas da resposta, sem truncamento das *strings*. 

**Resolva a questão especificando a consulta OLAP na linguagem SQL**.

### Resposta da Questão 2

In [18]:
# Resposta da Questão 2
# Não se esqueça de finalizar a consulta mostrando seu resultado com .show()
# Não se esqueça de exibir a resposta na saída padrão
# Não se esqueça de gerar o arquivo no formato csv com a resposta da questão


query = """
-- Seleção de regiao e soma dos salarios com rename de coluna para padrão desejado
SELECT filialRegiaoNome as REGIAO, ROUND(SUM(salario),2) AS TOTALDESPESA
FROM pagamento 
-- Join com as duas tabelas alvos (data para agrupamento e equipe para filtro)
JOIN data USING (dataPK)
JOIN cargo USING (cargoPK)
JOIN equipe USING (equipePK)
WHERE 
-- Filtro por cargo
cargo.cargoNome == "ADMINISTRADOR EM SEGURANCA DA INFORMACAO"
-- Agrupamento por REGIAO
GROUP BY REGIAO
-- Ordenação descendente por ano e totaldespesa 
ORDER BY REGIAO DESC, TOTALDESPESA DESC
"""

# Enviando a query SQL para execução
Questao2  = spark.sql(query)

# Exibe a resposta da questão na saída padrão
Questao2.show(25, truncate=False)

# Gera o arquivo no formato csv com a resposta da questão
Questao2\
  .coalesce(1).limit(25) \
  .toPandas().to_csv("questao2.csv", index=False, header=True)


+------------+------------+
|REGIAO      |TOTALDESPESA|
+------------+------------+
|SUDESTE     |3923904.22  |
|NORDESTE    |932295.13   |
|CENTRO-OESTE|1569126.83  |
+------------+------------+



### **Questão 3 (valor: 1,0)**

Liste, por sexo, a soma dos salários para o cargo de nome "ADMINISTRADOR EM SEGURANCA DA INFORMACAO". Arredonde a soma dos salários para até duas casas decimais. Devem ser exibidas as colunas na ordem e com os nomes especificados a seguir: "SEXO", "TOTALDESPESA". Ordene as linhas exibidas primeiro pelo total de despesa em ordem descendente e depois pelo sexo em ordem descendente. Liste as primeiras 25 linhas da resposta, sem truncamento das *strings*. 

**Resolva a questão especificando a consulta OLAP usando os métodos de pyspark.sql**

### Resposta da Questão 3

In [41]:
# Resposta da Questão 3
# Não se esqueça de finalizar a consulta mostrando seu resultado com .show()
# Não se esqueça de exibir a resposta na saída padrão
# Não se esqueça de gerar o arquivo no formato csv com a resposta da questão

# Realizar join de datas e cargos
# Filtrar pelo cargo de nome "ADMINISTRADOR EM SEGURANCA DA INFORMACAO" 

# Join da tabela pagamento com data usando a PK dataPK
query = pagamento.join(data, 'dataPK', 'inner')
# Join da tabela funcionario com pagamento usando a PK funcPK
query = query.join(funcionario, 'funcPK', 'inner')
# Join com a tabela cargo com pagamento usando PK cargoPK e filtro de cargo por nome
query = query.join(cargo, (cargo.cargoPK ==  pagamento.cargoPK) & (cargo.cargoNome == "ADMINISTRADOR EM SEGURANCA DA INFORMACAO"),'inner')

#Selecionando campos alvos da consulta
query = query.select('funcSexo','salario','cargoNome')

#Agrupamento por sexo
query = query.groupBy('funcSexo')

#Soma dos salários
query = query.sum("salario")\

#Arredondamento da soma de salário para duas casa decimais
query = query.withColumn("sum(salario)", round("sum(salario)",2))

#Rename de colunas
query = query.withColumnRenamed("sum(salario)", "TOTALDESPESA")
query = query.withColumnRenamed("funcSexo", "SEXO")

##Ordenação por TOTALDESPESA e SEXO em ordem descrescente
Questao3 = query.orderBy(desc("TOTALDESPESA"), desc('SEXO'))

# Exibe a resposta da questão na saída padrão
Questao3.show(25, truncate=False)

# Gera o arquivo no formato csv com a resposta da questão
Questao3\
  .coalesce(1).limit(25) \
  .toPandas().to_csv("Questao3.csv", index=False, header=True)

+----+------------+
|SEXO|TOTALDESPESA|
+----+------------+
|M   |4380094.08  |
|F   |2045232.1   |
+----+------------+



## 7.2 Análises Relacionadas às Regiões

Foi identificada a necessidade de se investigar despesas e receitas no que tange às regiões. O objetivo das análises desta seção é obter uma visão relacionada a esse aspecto. 

Podem ser realizadas diferentes análises, sendo que três delas são solicitadas a seguir.

### **Questão 4 (valor: 1,0)** 

Liste, para cada nome do estado da filial, a soma das receitas por ano considerando apenas o trimestre 1 e os clientes cuja região na qual eles moram é a mesma região na qual a filial está localizada. Arredonde a soma das receitas para até duas casas decimais. Devem ser exibidas as colunas na ordem e com os nomes especificados a seguir: "ESTADO", "ANO", "TOTALRECEITA". Ordene as linhas exibidas primeiro pelo total de receitas em ordem descendente, depois por estado em ordem descendente, depois pelo ano em ordem descendente. Liste as primeiras 25 linhas da resposta, sem truncamento das *strings*.

**Resolva a questão especificando a consulta OLAP usando Pandas**.

### Resposta da Questão 4

In [19]:
# Resposta da Questão 4 
# Não se esqueça de finalizar a consulta mostrando seu resultado com .head()
# Não se esqueça de exibir a resposta na saída padrão
# Não se esqueça de gerar o arquivo no formato csv com a resposta da questão


# Realizando merge da tabela de negociação com a tabela de clientes através da chave primária clientePK
consulta  = negociacaoPandas.merge(clientePandas, on = 'clientePK')\

# Realizando merge com a tabela data através da chave primária dataPK (negociacao.dataPK == data.dataPK)
consulta = consulta.merge(dataPandas, on = 'dataPK')

# Filtrar somente os registros cujo a data.dataTrimestre == 1
consulta = consulta.query('dataTrimestre == 1')

# Renomear colunas para o padrão definido 
consulta = consulta.rename(mapper={"dataAno": "ANO", "clienteEstadoNome": "ESTADO", 'receita':'TOTALRECEITA'}, axis=1)

# Agrupando por ANO e ESTADO somando o TOTALRECEITA
consulta = consulta.groupby(['ANO','ESTADO'])['TOTALRECEITA'].sum().reset_index()

# Aplicando ordenação descendente para as colunas 'TOTALRECEITA','ESTADO','ANO' em roder descendente
consulta = consulta.sort_values(by = ['TOTALRECEITA','ESTADO','ANO'], ascending=[False,False,False])

Questao4 = consulta.round(2).head(25)

# Exibe a resposta na saída padrão
display(Questao4)

# Gera o arquivo no formato csv com a resposta
Questao4.to_csv("questao4.csv", index=False, header=True)

Unnamed: 0,ANO,ESTADO,TOTALRECEITA
39,2019,SAO PAULO,3901649.25
49,2020,SAO PAULO,2848293.65
36,2019,RIO DE JANEIRO,1706011.15
46,2020,RIO DE JANEIRO,1482059.4
29,2018,SAO PAULO,1073581.65
33,2019,MINAS GERAIS,827838.95
43,2020,MINAS GERAIS,776380.25
31,2019,CEARA,598676.3
35,2019,PERNAMBUCO,539018.5
44,2020,PARANA,528529.6


### **Questão 5 (valor: 1,0)** 

Liste, para cada nome da região da filial, a soma dos salários e a soma das receitas, considerando apenas o ano de 2017. Arredonde a soma dos salários e a soma das receitas para até duas casas decimais. Devem ser exibidas as colunas na ordem e com os nomes especificados a seguir: "REGIAO", "TOTALRECEITAEQUIPE", "TOTALDESPESAEQUIPE". Ordene as linhas exibidas primeiro pelo total de receitas em ordem descendente, depois pelo total de despesas ordem descendente. Liste as primeiras 25 linhas da resposta, sem truncamento das *strings*. 

**Resolva a questão especificando a consulta OLAP na linguagem SQL**.

### Resposta da Questão 5

In [79]:
# Resposta da Questão 5
# Não se esqueça de finalizar a consulta mostrando seu resultado com .show()
# Não se esqueça de exibir a resposta na saída padrão
# Não se esqueça de gerar o arquivo no formato csv com a resposta da questão

query = """
SELECT 

-- selecionar a região da filial
filialRegiaoNome as REGIAO, 

-- Soma das despesas com arredondamento em duas casas
ROUND(SUM(salario),2) AS TOTALDESPESAEQUIPE,

-- Soma das receitas com arredondamento em duas casas
ROUND(SUM(receita),2) AS TOTALRECEITAEQUIPE

-- Seleção das tabelas equipe (usado no campo filialRegiaoNome) e data (para filtro match com pagamento e negociação)
FROM equipe, data

-- join  de pagamentos da mesma equipe e data
INNER JOIN pagamento ON pagamento.equipePK = equipe.equipePK AND pagamento.dataPK = data.dataPK

-- join  de necocição da mesma equipe e data
INNER JOIN negociacao ON negociacao.equipePK = equipe.equipePK AND negociacao.dataPK = data.dataPK

-- Filtrando apenas pelo ano de 2017
WHERE  data.dataAno == 2017

-- Agrupamento por REGIAO
GROUP BY REGIAO
-- Ordenação descendente por TOTALDESPESAEQUIPE e TOTALRECEITAEQUIPE 
ORDER BY TOTALDESPESAEQUIPE DESC, TOTALRECEITAEQUIPE DESC
"""

# Enviando a query SQL para execução
Questao5  = spark.sql(query)

# Exibe a resposta da questão na saída padrão
Questao5.show(25, truncate=False)

# Gera o arquivo no formato csv com a resposta da questão
Questao5\
  .coalesce(1).limit(25) \
  .toPandas().to_csv("questao5.csv", index=False, header=True)

+------------+------------------+------------------+
|REGIAO      |TOTALDESPESAEQUIPE|TOTALRECEITAEQUIPE|
+------------+------------------+------------------+
|SUDESTE     |2359394.58        |6703714.63        |
|CENTRO-OESTE|606071.4          |754395.4          |
+------------+------------------+------------------+



### **Questão 6 (valor: 1,5)** 

Liste todas as agregações que podem ser geradas para a partir da soma dos salários por nome do estado da filial e por ano, considerando apenas o trimestre 1 e os funcionários cuja região na qual eles moram é a mesma região na qual a filial está localizada. Arredonde a soma dos salários para até duas casas decimais. Devem ser exibidas as colunas na ordem e com os nomes especificados a seguir: "ESTADO", "ANO", "TOTALRECEITA". Ordene as linhas exibidas primeiro pelo total de receita em ordem descendente, depois por estado em ordem descendente, depois pelo ano em ordem descendente. Liste as primeiras 25 linhas da resposta, sem truncamento das *strings*. 

**Resolva a questão especificando a consulta OLAP usando os métodos de pyspark.sql.**

### Resposta da Questão 6

In [36]:
# Resposta da Questão 6
# Não se esqueça de finalizar a consulta mostrando seu resultado com .show()
# Não se esqueça de exibir a resposta na saída padrão
# Não se esqueça de gerar o arquivo no formato csv com a resposta da questão


#Selecionar tabela de fatos pagamento
query = pagamento
# Join com data através da PK dataPK e filtro por semestre 1
query = query.join(data, (pagamento.dataPK == data.dataPK) & (data.dataTrimestre==1), 'inner')
# Join com tabela de equipe através da PK equipePK para acesso ao campo filialEstadoNome e filtro pela filialRegiaoNome
query = query.join(equipe, 'equipePK', 'inner')
# Join com a tabela de funcionário usando como junção o pagamento e cuja a região na qual o funcionario reside é a mesma região na qual a filial está localizada
query = query.join(funcionario, (funcionario.funcPK == pagamento.funcPK ) & (funcionario.funcRegiaoNome == equipe.filialRegiaoNome), 'inner')

# Selecão dos campos para uso
query = query.select('dataAno','filialEstadoNome','salario')

#Dimensões das agregações para todas as combinações de valores nas colunas selecionadas
query = query.cube('dataAno','filialEstadoNome')
#soma de salário para cada conjunto de agregação
query = query.sum("salario")
# Arrendondamento para duas casas decimais
query = query.withColumn("sum(salario)", round("sum(salario)",2))
#Rename de colunas
query = query.withColumnRenamed("sum(salario)", "TOTALRECEITA")
query = query.withColumnRenamed("dataAno", "ANO")
query = query.withColumnRenamed("filialEstadoNome", "ESTADO")

#Ordenação
Questao6 = query.orderBy(desc("TOTALRECEITA"), desc('ANO'))

# Exibe a resposta da questão na saída padrão
Questao6.show(25, truncate=False)

# Gera o arquivo no formato csv com a resposta da questão
Questao6\
  .coalesce(1).limit(25) \
  .toPandas().to_csv("Questao6.csv", index=False, header=True)

+----+--------------+------------+
|ANO |ESTADO        |TOTALRECEITA|
+----+--------------+------------+
|null|null          |9342133.85  |
|null|SAO PAULO     |5657776.45  |
|null|RIO DE JANEIRO|3549567.87  |
|2020|null          |2450982.1   |
|2019|null          |2450982.1   |
|2018|null          |2065272.34  |
|2017|null          |1550525.2   |
|2020|SAO PAULO     |1514279.37  |
|2019|SAO PAULO     |1514279.37  |
|2018|SAO PAULO     |1195964.37  |
|2017|SAO PAULO     |957678.16   |
|2020|RIO DE JANEIRO|869307.97   |
|2019|RIO DE JANEIRO|869307.97   |
|2018|RIO DE JANEIRO|869307.97   |
|2016|null          |824372.11   |
|2017|RIO DE JANEIRO|592847.04   |
|2016|SAO PAULO     |475575.18   |
|2016|RIO DE JANEIRO|348796.92   |
|null|PERNAMBUCO    |134789.52   |
|2020|PERNAMBUCO    |67394.76    |
|2019|PERNAMBUCO    |67394.76    |
+----+--------------+------------+



## 7.3 Análise Relacionada a Totais

O objetivo da análise desta seção é obter uma tabela de totais.

### **Questão 7 (valor: 1,5)** 

Liste, para cada nome da região da filial, o número total de funcionários diferentes, o número total de clientes diferentes e o número total de equipes diferentes. Devem ser exibidas as colunas na ordem e com os nomes especificados a seguir: "REGIAO", "TOTALFUNCIONARIOS", "TOTALCLIENTES", "TOTALEQUIPES". Ordene as linhas exibidas primeiro pela região em ordem descendente, depois pelo total de funcionários em ordem descendente, depois pelo total de clientes em ordem descendente, depois pelo total de equipes em ordem descendente. Liste as primeiras 25 linhas da resposta, sem truncamento das *strings*.

**Resolva a questão especificando a consulta OLAP na linguagem SQL**.

### Resposta da Questão 7

In [21]:
# Resposta da Questão 7
# Não se esqueça de finalizar a consulta mostrando seu resultado com .show()
# Não se esqueça de exibir a resposta na saída padrão
# Não se esqueça de gerar o arquivo no formato csv com a resposta da questão

query = """
SELECT
-- selecionar a região da filial
equipe.filialRegiaoNome as REGIAO, 

/* nas proximas 3 linhas foram utilizados a função COUNT e DISTINCT 
 para contabilizar as ocorrências únicas usando os indices de cada dimensão 
 
 indice funcPK para funcionarios
 indice clientePK para clientes
 indice equipePK para equipes   */

count(distinct(pagamento.funcPK)) AS TOTALFUNCIONARIOS,
count(distinct(negociacao.clientePK)) AS TOTALCLIENTES,
count(distinct(equipe.equipePK)) AS TOTALEQUIPES

-- A tabela equipe possui informação de região da filial
FROM equipe

/****** Totalização de funcionários únicos por região TOTALFUNCIONARIOS ******
   Segundo o modelo proposto, o relacionamento entre funcionário e equipe se dá atraves da tabela de fatos pagamento */
INNER JOIN pagamento ON pagamento.equipePK = equipe.equipePk

/* Por questão de otimização não será necessário realizar o join com a tabela funcionario
uma vez que já temos o identificador exclusivo funcPK na dimensão pagamento.
INNER JOIN funcionario ON pagamento.funcPK = funcionario.funcPK */

/******* Totalização de clientes únicos por região TOTALCLIENTES **********
   Segundo o modelo proposto, o relacionamento entre cliente e equipe se dá atraves da tabela de fatos negociacao */
LEFT JOIN negociacao ON negociacao.equipePK = equipe.equipePK 

/* Por questão de otimização não será necessário realizar o join com a tabela cliente
uma vez que já temos o identificador exclusivo clientePK na dimensão negociacao.
INNER JOIN cliente ON negociacao.clientePK = cliente.clientePK  */


-- Agrupamento por região
GROUP BY equipe.filialRegiaoNome 
ORDER BY REGIAO DESC, TOTALFUNCIONARIOS DESC, TOTALCLIENTES DESC, TOTALEQUIPES DESC
"""

# Enviando a query SQL para execução
Questao7  = spark.sql(query)

# Exibe a resposta da questão na saída padrão
Questao7.show(25, truncate=False)

# Gera o arquivo no formato csv com a resposta da questão
Questao7\
  .coalesce(1).limit(25) \
  .toPandas().to_csv("questao7.csv", index=False, header=True)

+------------+-----------------+-------------+------------+
|REGIAO      |TOTALFUNCIONARIOS|TOTALCLIENTES|TOTALEQUIPES|
+------------+-----------------+-------------+------------+
|SUDESTE     |134              |138          |7           |
|NORDESTE    |20               |55           |1           |
|CENTRO-OESTE|46               |86           |2           |
+------------+-----------------+-------------+------------+



# 8 Estendendo a Aplicação da BI Solutions

A aplicação da BI Solutions está sendo estendida de forma a analisar um novo assunto de interesse: os gastos realizados na compra de equipamentos. Para tanto, é necessário criar uma nova tabela de dimensão chamada Equipamento, a qual tem como objetivo armazenar dados de equipamentos, os quais devem ser obtidos a partir de 3 fontes de dados heterogêneas.




### 8.1 Detalhamento das Fontes  

Considere que o processo de integração de dados já tenha sido realizado. Como resultado, as 3 fontes de dados (Fonte1, Fonte2, Fonte3) possuem os mesmos  atributos, com os mesmos nomes. Esses atributos encontram-se listados a seguir, sendo seus nomes semânticos.

- equipamentoPK

- equipamentoNome

- equipamentoCor

- equipamentoTipo

- equipamentoMoeda

- equipamentoPreco 


In [22]:
import pandas as pd

# Obtenção dos dados da Fonte1 e armazenamento desses dados no Dataframe chamado Fonte1
Fonte1 = pd.read_csv('https://raw.githubusercontent.com/CristinaAguiar/QuestaoIntegra2021/main/Fonte1.csv')

In [23]:
# Obtenção dos dados da Fonte2 e armazenamento desses dados no Dataframe chamado Fonte2
Fonte2 = pd.read_csv('https://raw.githubusercontent.com/CristinaAguiar/QuestaoIntegra2021/main/Fonte2.csv')

In [24]:
# Obtenção dos dados da Fonte3 e armazenamento desses dados no Dataframe chamado Fonte3
Fonte3 = pd.read_csv('https://raw.githubusercontent.com/CristinaAguiar/QuestaoIntegra2021/main/Fonte3.csv')

### 8.2 Detalhamento da tabela de dimensão Equipamento  

A tabela de dimensão Equipamento da BI Solutions deve possuir os seguintes atributos:

- **equipamentoPK**, correspondente aos atributos de mesmo nome nas fontes de dados.

- **equipamentoNome**, correspondente aos atributos de mesmo nome nas fontes de dados.

- **equipamentoDescricao**, correspondente aos atributos de mesmo nome nas fontes de dados.

- **equipamentoCor**, correspondente aos atributos de mesmo nome nas fontes de dados.

- **equipamentoTipo**, correspondente aos atributos de mesmo nome nas fontes de dados.

- **equipamentoPreco**, correspondente aos atributos equipamentoMoeda e equipamentoPreco nas fontes de dados. 


### 8.3 Regras de Negócio do Processo de Integração de Instâncias

No processo de integração de instâncias, devem ser consideradas as seguintes regras de negócio:

- A integração deve ser feita pelo atributo equipamentoPK. Equipamentos que possuam o mesmo valor desse atributo referem-se ao mesmo equipamento.

- Todas as *strings* devem ser escritas em letras maiúsculas, sem acento e sem o uso de caracteres especiais.

- Os valores das *strings* não devem ser truncados.

- Os preços dos equipamentos devem ser armazenados somente em reais. Portanto, para se calcular os valores da coluna equipamentoPreco da tabela de dimensão Equipamento, deve ser feito o cálculo desse valor em reais, utilizando os atributos equipamentoMoeda e equipamentoPreco presentes nas fontes de dados originais. Considere, para isso, as seguintes conversões: (i) 1 dolar USD = 5 reais; e (ii) 1 euro EUR = 6 reais.

- Os preços dos equipamentos devem ter duas casas decimais e não devem incluir a sigla "R$". 

- As cores dos equipamentos devem ser armazenadas por meio de números, da seguinte forma:

  - 1: correspondente à cor PRETO nas fontes de dados
  - 2: correspondente à cor AZUL nas fontes de dados
  - 3: correspondente à cor BRANCO nas fontes de dados
  - 4: correspondente à cor PRATA nas fontes de dados
  - 5: correspondente à cor VERMELHO nas fontes de dados
  - 6: correspondente à cor AMARELO nas fontes de dados

- Para resolver inconsistências nos valores de cada atributo que aparecem nas diferentes fontes, desconsidere os valores nulos e considere que: 

  - (i) quando em uma coluna o valor for igual nas três fontes, esse valor deve ser armazenado na tabela de dimensão Equipamento na coluna equivalente. Por exemplo, se o nome do equipamento de PK = 1 for caneta nas três fontes de dados, então o valor a ser armazenado é CANETA.

  - (ii) quando em uma coluna o valor for igual em duas fontes e diferente na terceira fonte, o  valor a ser armazenado na tabela de dimensão Equipamento na coluna equivalente é o valor que aparece nas duas fontes. Por exemplo, se o nome do equipamento de PK = 1 for caneta em duas fontes de dados e borracha na terceira fonte de dados, então o valor a ser armazenado é CANETA.

  - (iii) quando em uma coluna quando o valor for diferente nas três fontes, escolhe-se por armazenar o valor da Fonte 1 na tabela de dimensão Equipamento na coluna equivalente. Caso o valor da Fonte 1 seja nulo (inexistente), escolhe-se por armazenar o valor da Fonte 2. Caso o valor da Fonte 2 também seja nulo (inexistente), escolhe-se por armazenar o valor da Fonte 3. Isso significa que Fonte1 é mais confiável do que Fonte2, a qual é mais confiável do que Fonte3. 



### **Questão 8 (valor: 2,0)** 

Realize a geração da tabela de dimensão Equipamento, considerando os detalhamentos dos atributos das seções 8.1 e 8.2 e as regras de negócio do processo de integração de instâncias definido na seção 8.3. A tabela de dimensão Equipamento deve possuir as colunas na ordem e com os nomes especificados a seguir: equipamentoPK, equipamentoNome, equipamentoDescricao, equipamentoCor, equipamentoTipo, equipamentoPreco. Ordene as linhas exibidas pelo atributo equipamentoPK em ordem **ascendente**. Liste **todas** as linhas da resposta, sem truncamento das *strings*.

**Resolva a questão usando Pandas**. Coloque comentários detalhados explicando a sua resposta.

### Resposta da Questão 8

In [26]:
%%capture
#Para realizar a remoção de acentuação precisamos instalar o pacote unidecode
# que será utilizado na funcão normalizar_dataframe()
!pip install Unidecode

In [33]:
import numpy as np
import unidecode
import re
from pandas.api.types import is_string_dtype


#Definição de taxas de câmbio com seguintes conversões: (i) 1 dolar USD = 5 reais; e (ii) 1 euro EUR = 6 reais. 
# será utilizado na função converter_moeda_para_real()
TAXA_CONVERSAO = {'USD': 5, 'EUR': 6}

# Vamos fazer uma copia do dataframe para não sujar os dados originais
# Iremos trabalhar com a cópia
A = Fonte1.copy()
B = Fonte2.copy()
C = Fonte3.copy()

def processar_conflitos(fonte_A, fonte_B, fonte_C):
  """
  Processa os conflitos das fontes com base na confiabilidade das mesmas,
  Sendo a fonte_A a mais confiável.
  """
  # Vamos percorrer cada indice de fonte_A e verifica se existem pares em fonte_B e fonte_C
  for index in fonte_A.index:
    row_A = fonte_A.loc[index]

    exist_B = False #Flag para indentificar se indice foi encontrado em fonte_B
    if index in fonte_B.index:
      row_B = fonte_B.loc[index]
      exist_B = True
    exist_B = False #Flag para indentificar se indice foi encontrado em fonte_C
    if index in fonte_C.index:
      row_C = fonte_C.loc[index]
      exist_C = True

    # Caso exista valores correspondende em B e C
    if exist_B and exist_C:    
      #Iremos percorreer cada coluna da FonteA e realizar atualização de dados conforme a regra
      for column in fonte_A.columns:
        #Definição 8.3 (iii) quando valor de B e C forem iguais e A diferente, usamos o valor de B
        if not pd.isna(row_B[column]) and row_A[column] != row_B[column] and row_B[column] == row_C[column]:
          fonte_A.at[index, column] = row_B[column]
  return fonte_A

def converter_cores_para_numeros(dataframe):
  """
  Converte cores de string para inteiro conforme o valor correspondentes abaixo:
    1: correspondente à cor PRETO nas fontes de dados
    2: correspondente à cor AZUL nas fontes de dados
    3: correspondente à cor BRANCO nas fontes de dados
    4: correspondente à cor PRATA nas fontes de dados
    5: correspondente à cor VERMELHO nas fontes de dados
    6: correspondente à cor AMARELO nas fontes de dados
  """
  dataframe['equipamentoCor'].replace(to_replace=['PRETO', 'AZUL', 'BRANCO','PRATA','VERMELHO','AMARELO'], value=[1, 2, 3, 4, 5 ,6], inplace=True)
  return dataframe

def converter_moeda_para_real(dataframe):
  """
  Realiza a conversão de 'equipamentoPreco' com taxas pré-definidas
  Atualiza o valor 'equipamentoMoeda' para a moeda 'BRL'
  """
  def converte_valor(valor,moeda):
    return valor*TAXA_CONVERSAO[moeda]

  # Para cada moeda encontrado no dataframe iremos aplicar a taxa de conversão
  for moeda in TAXA_CONVERSAO:
    dataframe.loc[dataframe['equipamentoMoeda'] == moeda,'equipamentoPreco'] = dataframe[dataframe['equipamentoMoeda'] == moeda]['equipamentoPreco'].apply(converte_valor,moeda=moeda)
    dataframe.loc[dataframe['equipamentoMoeda'] == moeda,'equipamentoMoeda'] = dataframe[dataframe['equipamentoMoeda'] == moeda]['equipamentoMoeda'] = 'BRL'

  return dataframe


def normalizar_dataframe(dataframe):
  """
   Função para normalizar as strings: todas as strings devem ser escritas em letras maiúsculas, sem acento e sem o uso de caracteres especiais.
  """
  def normalizar_texto(valor):  
    #Primeiro vamos remover qualquer acento dos caracteres, ex.: Á vira A
    remove_acentos = unidecode.unidecode(valor)
    #Segundo vamos remover todos o caracteres especiais emanter somente caractere alphanuméricos e espaços
    remove_especiais = re.sub(r'\W\s+', '', remove_acentos)
    #Por úlitmo vamos converter a string para letras maiúsculas
    return remove_especiais.upper()

  #Iremos percorrer cada coluna do dataframe para aplicar a normalização
  for nome_coluna in dataframe.columns:
    #A normalização de string só pode ocorrer em colunas do tipo String, a função is_string_dtype do pandas faz essa checagem :)
    if is_string_dtype(dataframe[nome_coluna]):
      dataframe[nome_coluna] = dataframe[[nome_coluna]].applymap(normalizar_texto)
  return dataframe


"""def validar_dados(dataframe):
  for nome_coluna in df1.columns.values[1:]: """
    

def preprocessamento_dataframe(df):
  """
  Clona o DataFrame, e altera o indice para funcMatricula
  """
  ndf = df.copy(deep=True)
  return ndf.set_index('equipamentoPK')


def merge_null_todas_colunas(df1,df2):

  # Iremos fazer um full merge do dataframe 1 com o 2
  # Especificando how=outer para unir todas as linhas do dataframe esquerdo e todas as linhas do dataframe direito.
  novo = pd.merge(df1,df2,on='equipamentoPK', how='outer')

  # Vamos percorrer cada coluna do dataframe, exceto a primeira que é PK
  for nome_coluna in df1.columns.values[1:]:  

    #Vamos obter o indice de localização da coluna (ordem) para recriarmos no menos lugar
    idx_coluna = df1.columns.get_loc(nome_coluna)

    #A operação de merge renomeou as colunas para o padrão coluna_x e coluna_y, vamos recriar a original no mesmo local inicial com valor NaN
    novo.insert(idx_coluna,nome_coluna,np.nan)

    #Vamos preenchar a coluna recriada com o o valor da coluna_x (primeiro dataframe), caso esse valor seja NULO iremos usar o valor da coluna_y(segundo dataframe)
    # Essa regra atende ao quesito Caso o valor da Fonte 1 seja nulo (inexistente), escolhe-se por armazenar o valor da Fonte 2.
    novo[nome_coluna] = novo.apply(lambda row: row[f"{nome_coluna}_y"] if pd.isna(row[f"{nome_coluna}_x"]) else row[f"{nome_coluna}_x"], axis=1)
    novo.drop([f"{nome_coluna}_y",f"{nome_coluna}_x"], axis='columns', inplace=True)
  return novo


#Primeiro passo é obter um dataframe com merge de FonteA,FonteB e FonteC
#Vamos inicar o merge com FonteA e FonteA
df_processado = merge_null_todas_colunas(A,B)

#Em sequencia vamos pegar o resultado e fazer merge com FonteC
df_processado = merge_null_todas_colunas(df_processado,C)

#Agora iremos indexar o dataframe com a PK equipamentoPK
df_processado_indexado = preprocessamento_dataframe(df_processado)

#Agora iremos indexar a FonteB com a PK equipamentoPK
FonteB_indexada = preprocessamento_dataframe(B)
#Agora iremos indexar a FonteC com a PK equipamentoPK
FonteC_indexada = preprocessamento_dataframe(C)

# Comandos para resolver indices do conflito de df_processado_indexado(primário), FonteB_indexada e FonteC_indexada (secundários)
df_processado = processar_conflitos(df_processado_indexado, FonteB_indexada, FonteC_indexada)

# Aplicar a funçõa para normalizar as STRINGS
# Todas as strings devem ser escritas em letras maiúsculas, sem acento e sem o uso de caracteres especiais.
df_processado = normalizar_dataframe(df_processado)

# Aplicar função para realizar a conversão de moedas estrangeiras para Real
df_processado = converter_moeda_para_real(df_processado)

# Aplicar função para realizar a conversão de cores
Questao8 = converter_cores_para_numeros(df_processado)

#Aplicar ordenação de indice (equipamentoPK)
Questao8 = Questao8.sort_index()

# Exibe a resposta da questão na saída padrão
display(Questao8)

# Gera o arquivo no formato csv com a resposta da questão
Questao8.to_csv("questao8.csv", index=False, header=True)

Unnamed: 0_level_0,equipamentoNome,equipamentoDescricao,equipamentoCor,equipamentoTipo,equipamentoMoeda,equipamentoPreco
equipamentoPK,Unnamed: 1_level_1,Unnamed: 2_level_1,Unnamed: 3_level_1,Unnamed: 4_level_1,Unnamed: 5_level_1,Unnamed: 6_level_1
1,MONITOR,MONITOR LCD LG 15 POLEGADAS,1,EQUIP INFORMATICA,BRL,234.0
2,MONITOR,MONITOR LCD ITAUTEC 19 POLEGADAS,1,EQUIP INFORMATICA,BRL,180.2
3,MONITOR,MONITOR LCD AOC 17 POLEGADAS,1,EQUIP INFORMATICA,BRL,167.41
4,COMPUTADOR,LENOVO THINKCENTRE MT-M WINTEL CORE 2 DUO 2.33...,1,EQUIP INFORMATICA,BRL,2399.41
5,COMPUTADOR,LENOVO THINKCENTRE MT-M WINTEL CORE 2 DUO 2.33...,1,EQUIP INFORMATICA,BRL,2399.41
6,COMPUTADOR,ITAUTEC INFOWAY SM 3322 WAMD PHENOM X2 8GB 320...,4,EQUIP INFORMATICA,BRL,1877.84
7,COMPUTADOR,POSITIVO PLUS R70 WMOBO ITAUTEC SM3322 AMD PHE...,1,EQUIP INFORMATICA,BRL,3500.47
8,COMPUTADOR,ITAUTEC INFOWAY ST 1430 WMB ITAUTEC SM3322 AMD...,4,EQUIP INFORMATICA,BRL,1474.87
9,COMPUTADOR,HP COMPAQ DC5850 WAMD PHENOM X4 4GB RAM 250GB HD,4,EQUIP INFORMATICA,BRL,1500.0
10,COMPUTADOR,HP COMPAQ DC5850 WAMD PHENOM X4 4GB RAM 250GB HD,4,EQUIP INFORMATICA,BRL,1583.74
