# <span style="color:blue">MBA em Ciência de Dados</span>
# <span style="color:blue">Análise de Dados com Base em Processamento Massivo em Paralelo</span>

## <span style="color:blue">Aula 06: Processamento Paralelo e Distribuído</span>
## <span style="color:blue">Respostas dos Exercícios</span>

**Material Produzido por:**<br>
>**Profa. Dra. Cristina Dutra de Aguiar Ciferri**<br>

**CEMEAI - ICMC/USP São Carlos**

Esta lista de exercícios contém 9 exercícios, os quais estão espalhados ao longo do texto. Por favor, procurem por EXERCÍCIO para encontrar a especificação dos exercícios e as suas respectivas respostas. Também é possível localizar os exercícios utilizando o menu de navegação. Por completude, o notebook possui todas as descrições apresentadas na parte prática da Aula 06. Recomenda-se fortemente que a lista de exercícios seja respondida antes de se consultar as respostas dos exercícios Recomenda-se fortemente que a lista de exercícios seja respondida antes de se consultar as respostas dos exercícios.

# 1 Introdução

A aplicação de *data warehousing* da BI Solutions utiliza como base uma contelação de fatos que une dois esquemas estrela, conforme descrito a seguir.

**Tabelas de dimensão**

- data (dataPK, dataCompleta, dataDia, dataMes, dataBimestre, dataTrimestre, dataSemestre, dataAno)
- funcionario (funcPK, funcMatricula, funcNome, funcSexo, funcDataNascimento, funcDiaNascimento, funcMesNascimento, funcAnoNascimento, funcCidade, funcEstadoNome, funcEstadoSigla, funcRegiaoNome, funcRegiaoSigla, funcPaisNome, funcPaisSigla)
- equipe (equipePK, equipeNome, filialNome, filialCidade, filialEstadoNome, filialEstadoSigla, filialRegiaoNome, filialRegiaoSigla, filialPaisNome, filialPaisSigla)
- cargo (cargoPK, cargoNome, cargoRegimeTrabalho, cargoEscolaridadeMinima, cargoNivel)
- cliente (clientePK, clienteNomeFantasia, clienteSetor, clienteCidade, clienteEstadoNome, clienteEstadoSigla, clienteRegiaoNome, clienteRegiaoSigla, clientePaisNome, clientePaisSigla)

**Tabelas de fatos**
- pagamento (dataPK, funcPK, equipePK, cargoPK, salario, quantidadeLancamento)
- negociacao (dataPK, equipePK, clientePK, receita, quantidadeNegociacao)

Primeiramente, são definidos `paths`, sendo que cada `path` se refere a uma tabela de fatos ou uma tabela de dimensão. 

In [1]:
# Tabelas de dimensão
pathData = 'dados/data.csv'
pathFuncionario = 'dados/funcionario.csv'
pathEquipe = 'dados/equipe.csv'
pathCargo = 'dados/cargo.csv'
pathCliente = 'dados/cliente.csv'

# Tabelas de fato
pathPagamento = 'dados/pagamento.csv'
pathNegociacao = 'dados/negociacao.csv'

Na sequência,  todos os arquivos referentes às tabelas de fatos e às tabelas de dimensão são baixados, sendo armazenados na pasta `dados`.

In [2]:
%%capture
!git clone https://github.com/GuiMuzziUSP/Data_Mart_BI_Solutions.git dados

# 2 Apache Spark Cluster

### 2.1 Instalação

Neste *notebook* é criado um *cluster* Spark composto apenas por um **nó mestre**. Ou seja, o *cluster* não possui um ou mais **nós de trabalho** e o **gerenciador de cluster**. Nessa configuração, as tarefas (*tasks*) são realizadas no próprio *driver* localizado no **nó mestre**.

In [3]:
#instalando Java Runtime Environment (JRE) versão 8
%%capture
!apt-get remove openjdk*
!apt-get update --fix-missing
!apt-get install openjdk-8-jdk-headless -qq > /dev/null

Na sequência, é feito o *download* do Apache Spark versão 3.0.0.

In [4]:
#baixando Apache Spark versão 3.0.0
%%capture
!wget -q https://archive.apache.org/dist/spark/spark-3.0.0/spark-3.0.0-bin-hadoop2.7.tgz
!tar xf spark-3.0.0-bin-hadoop2.7.tgz && rm spark-3.0.0-bin-hadoop2.7.tgz

Na sequência, são configuradas as variáveis de ambiente JAVA_HOME e SPARK_HOME. Isto permite que tanto o Java quanto o Spark possam ser encontrados.

In [5]:
import os
#configurando a variável de ambiente JAVA_HOME
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
#configurando a variável de ambiente SPARK_HOME
os.environ["SPARK_HOME"] = "/content/spark-3.0.0-bin-hadoop2.7"

Por fim, são instalados dois pacotes da linguagem de programação Python, cujas funcionalidades são descritas a seguir.

Pacote findspark: Usado para ler a variável de ambiente SPARK_HOME e armazenar seu valor na variável dinâmica de ambiente PYTHONPATH. Como resultado, Python pode encontrar a instalação do Spark.

Pacote pyspark: PySpark é a API do Python para Spark. Ela possibilita o uso de Python, considerando que o framework Apache Spark encontra-se desenvolvido na linguagem de programação Scala.

In [6]:
%%capture
#instalando o pacote findspark
!pip install -q findspark==1.4.2
#instalando o pacote pyspark
!pip install -q pyspark==3.0.0

### 2.4 Conexão

PySpark não é adicionado ao *sys.path* por padrão. Isso significa que não é possível importá-lo, pois o interpretador da linguagem Python não sabe onde encontrá-lo. 

Para resolver esse aspecto, é necessário instalar o módulo `findspark`. Esse módulo mostra onde PySpark está localizado. Os comandos a seguir têm essa finalidade.

In [7]:
#importando o módulo findspark
import findspark
#carregando a variávels SPARK_HOME na variável dinâmica PYTHONPATH
findspark.init()

Depois de configurados os pacotes e módulos e inicializadas as variáveis de ambiente, é possível criar o objeto SparkContext. No comando de criação a seguir, é definido que é utilizado o próprio sistema operacional deste notebook como nó mestre por meio do parâmetro local do método setMaster. O complemento do parametro [*] indica que são alocados todos os núcleos de processamento disponíveis para o objeto driver criado.

In [8]:
from pyspark import SparkConf, SparkContext

conf = SparkConf().setMaster("local[*]")
spark = SparkContext(conf=conf)

# 3 Carregamento dos Dados da Aplicação da BI Solutions

### 3.1 Carregamento da tabela de dimensão **data**

O comando a seguir utiliza o método `textFile()` para armazenar no RDD chamado `data` os registros do arquivo de texto `"data.csv"`, os quais possuem os dados da tabela de dimensão `data`.

In [9]:
data_rdd = spark.textFile(pathData)

Os comandos a seguir realizam alterações no RDD `data` de forma que seus elementos representem linhas (ou tuplas) da tabela.

In [10]:
#imprimindo as 3 primeiras linhas de "data" e verificando que a primeira linha contém metadados (ou seja, o esquema referente aos dados)
data_rdd.take(3)

['dataPK,dataCompleta,dataDia,dataMes,dataBimestre,dataTrimestre,dataSemestre,dataAno',
 '1,1/1/2016,1,1,1,1,1,2016',
 '2,2/1/2016,2,1,1,1,1,2016']

In [11]:
#removendo a primeira linha de "data", desde que ela se refere a metadados
#capturando o cabeçalho
firstRow = data_rdd.first()
#removendo o cabeçalho
data_rdd = data_rdd.filter(lambda line: line != firstRow)

In [12]:
#imprimindo as 3 primeiras linhas de "data" e verificando que elas contêm apenas dados
data_rdd.take(3)

['1,1/1/2016,1,1,1,1,1,2016',
 '2,2/1/2016,2,1,1,1,1,2016',
 '3,3/1/2016,3,1,1,1,1,2016']

In [13]:
#imprimindo o cabeçalho de "data" e verificando os metadados 
data_header = firstRow[:].split(",")
display(data_header)

['dataPK',
 'dataCompleta',
 'dataDia',
 'dataMes',
 'dataBimestre',
 'dataTrimestre',
 'dataSemestre',
 'dataAno']

Desde que o arquivo lido encontra-se no formato `.csv`, utiliza-se o método `()` para transformar os elementos do RDD `data` em uma lista de valores divididos por `","`.

In [14]:
#mapeando os valores do RDD "data" utilizando o separador vírgula
data_rdd = data_rdd.map(lambda line: tuple(line.split(",")))
#imprimindo as 3 primeiras linhas de "data"
data_rdd.take(3)

[('1', '1/1/2016', '1', '1', '1', '1', '1', '2016'),
 ('2', '2/1/2016', '2', '1', '1', '1', '1', '2016'),
 ('3', '3/1/2016', '3', '1', '1', '1', '1', '2016')]

### 3.2 Carregamento das demais tabelas

#### **EXERCÍCIO 1**

Realize o carregamento dos demais arquivos referentes à constelação de fatos da BI solution, a saber: (i) tabelas de dimensão `funcionario`, `equipe`, `cargo` e `cliente`; e (ii) tabelas de fato `pagamento` e `negociação`. 

**Dica 1**: Crie uma função para executar este procedimento repetidas vezes. Utilize o esqueleto a seguir como base.

**Dica 2**: Se tiver dificuldades em criar a função, replique os comandos descritos para o carregamento da tabela de dimensão data para todas as tabelas restantes.

```python
def processaRdd(spark, path):
  ...
  return header, rddCsv
```

In [15]:
# Resposta do exercício

#criando a função 
def processaRdd(spark, path):
  rddCsv = spark.textFile(path)
  #capturando o cabeçalho
  firstRow = rddCsv.first()
  rddCsv = rddCsv.filter(lambda line: line != firstRow)
  header = firstRow[:].split(",")

  #processando o rddCsv
  rddCsv = rddCsv.map(lambda x: tuple(x.split(",")))

  return header, rddCsv

In [16]:
#resposta do exercício 1

#realizando o carregamento das tabelas de dimensão
funcionario_header, funcionario_rdd = processaRdd(spark, pathFuncionario)
equipe_header, equipe_rdd = processaRdd(spark, pathEquipe)
cargo_header, cargo_rdd = processaRdd(spark, pathCargo)
cliente_header, cliente_rdd = processaRdd(spark, pathCliente)

#realizando o carregamento das tabelas de fatos
pagamento_header, pagamento_rdd = processaRdd(spark, pathPagamento)
negociacao_header, negociacao_rdd = processaRdd(spark, pathNegociacao)

In [17]:
#resposta do exercício 1

#exibindo os metadados e as 3 primeiras linhas de funcionario 
display(funcionario_header)
funcionario_rdd.take(3)

['funcPK',
 'funcMatricula',
 'funcNome',
 'funcSexo',
 'funcDataNascimento',
 'funcDiaNascimento',
 'funcMesNascimento',
 'funcAnoNascimento',
 'funcCidade',
 'funcEstadoNome',
 'funcEstadoSigla',
 'funcRegiaoNome',
 'funcRegiaoSigla',
 'funcPaisNome',
 'funcPaisSigla']

[('1',
  'M-1',
  'ALINE ALMEIDA',
  'F',
  '1/1/1990',
  '1',
  '1',
  '1990',
  'SAO PAULO',
  'SAO PAULO',
  'SP',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('2',
  'M-2',
  'ARAO ALVES',
  'M',
  '2/2/1990',
  '2',
  '2',
  '1990',
  'CAMPINAS',
  'SAO PAULO',
  'SP',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('3',
  'M-3',
  'ARON ANDRADE',
  'M',
  '3/3/1990',
  '3',
  '3',
  '1990',
  'SANTOS',
  'SAO PAULO',
  'SP',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR')]

In [18]:
#resposta do exercício 1

#exibindo os metadados e as 3 primeiras linhas de equipe 
display(equipe_header)
equipe_rdd.take(3)

['equipePK',
 'equipeNome',
 'filialNome',
 'filialCidade',
 'filialEstadoNome',
 'filialEstadoSigla',
 'filialRegiaoNome',
 'filialRegiaoSigla',
 'filialPaisNome',
 'filialPaisSigla']

[('1',
  'APP - DESKTOP',
  'SAO PAULO - AV. PAULISTA',
  'SAO PAULO',
  'SAO PAULO',
  'SP',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('2',
  'APP - DESKTOP',
  'RIO DE JANEIRO - BARRA DA TIJUCA',
  'RIO DE JANEIRO',
  'RIO DE JANEIRO',
  'RJ',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('3',
  'WEB',
  'SAO PAULO - AV. PAULISTA',
  'SAO PAULO',
  'SAO PAULO',
  'SP',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR')]

In [19]:
#resposta do exercício 1

#exibindo os metadados e as 3 primeiras linhas de cargo 
display(cargo_header)
cargo_rdd.take(3)

['cargoPK',
 'cargoNome',
 'cargoRegimeTrabalho',
 'cargoJornadaTrabalho',
 'cargoEscolaridadeMinima',
 'cargoNivel']

[('1',
  'PROGRAMADOR DE SISTEMAS DE INFORMACAO',
  'TEMPORARIO',
  '20H',
  'MEDIO',
  'JUNIOR'),
 ('2',
  'PROGRAMADOR DE SISTEMAS DE INFORMACAO',
  'TEMPORARIO',
  '20H',
  'SUPERIOR',
  'PLENO'),
 ('3',
  'PROGRAMADOR DE SISTEMAS DE INFORMACAO',
  'TEMPORARIO',
  '20H',
  'POS',
  'SENIOR')]

In [20]:
#resposta do exercício 1

#exibindo os metadados e as 3 primeiras linhas de cliente 
display(cliente_header)
cliente_rdd.take(3)

['clientePK',
 'clienteNomeFantasia',
 'clienteSetor',
 'clienteCidade',
 'clienteEstadoNome',
 'clienteEstadoSigla',
 'clienteRegiaoNome',
 'clienteRegiaoSigla',
 'clientePaisNome',
 'clientePaisSigla']

[('1',
  'VIA FOOD',
  'BEBIDAS E ALIMENTOS',
  'SAO PAULO',
  'SAO PAULO',
  'SP',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('2',
  'VIA PIZZA',
  'BEBIDAS E ALIMENTOS',
  'SAO PAULO',
  'SAO PAULO',
  'SP',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('3',
  'VIA JAPA',
  'BEBIDAS E ALIMENTOS',
  'SAO PAULO',
  'SAO PAULO',
  'SP',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR')]

In [21]:
#resposta do exercício 1

#exibindo os metadados e as 3 primeiras linhas de pagamento 
display(pagamento_header)
pagamento_rdd.take(3)

['funcPK', 'equipePK', 'dataPK', 'cargoPK', 'salario', 'quantidadeLancamentos']

[('147', '2', '5', '64', '1559.94', '1'),
 ('124', '2', '5', '329', '8102.77', '1'),
 ('175', '1', '5', '328', '2532.51', '1')]

In [22]:
#resposta do exercício 1

#exibindo os metadados e as 3 primeiras linhas de negociacao 
display(negociacao_header)
negociacao_rdd.take(3)

['equipePK', 'clientePK', 'dataPK', 'receita', 'quantidadeNegociacoes']

[('2', '9', '22', '11564.75', '1'),
 ('2', '24', '11', '17990.5', '1'),
 ('2', '28', '21', '16335.9', '1')]

# 4 Uso dos Métodos para Propósitos Específicos

## 4.1 Método map()

O método `map()` pode ser utilizado para selecionar colunas.

In [23]:
#selecionando as seguintes colunas do RDD "funcionario":  segunda, terceira, décima
funcionario_rdd \
  .map(lambda x: (x[1], x[2], x[9])) \
  .take(5)

[('M-1', 'ALINE ALMEIDA', 'SAO PAULO'),
 ('M-2', 'ARAO ALVES', 'SAO PAULO'),
 ('M-3', 'ARON ANDRADE', 'SAO PAULO'),
 ('M-4', 'ADA BARBOSA', 'SAO PAULO'),
 ('M-5', 'ABADE BATISTA', 'SAO PAULO')]

### **EXERCÍCIO 2**

Selecione as seguintes colunas do RDD `cliente`: primeira, segunda, terceira e sexta. Exiba as 5 primeiras linhas resultantes. Consulte `cliente_header` caso necessário. 

In [24]:
# Resposta do exercício

#exibindo quais são as colunas do RDD "cliente"
display(cliente_header)

['clientePK',
 'clienteNomeFantasia',
 'clienteSetor',
 'clienteCidade',
 'clienteEstadoNome',
 'clienteEstadoSigla',
 'clienteRegiaoNome',
 'clienteRegiaoSigla',
 'clientePaisNome',
 'clientePaisSigla']

In [25]:
# Resposta do exercício

#selecionando as seguintes colunas do RDD "cliente":  primeira, segunda, terceira e sexta
#exibindo as 5 primeiras linhas
cliente_rdd \
  .map(lambda x: (x[0], x[1], x[2], x[5])) \
  .take(5)

[('1', 'VIA FOOD', 'BEBIDAS E ALIMENTOS', 'SP'),
 ('2', 'VIA PIZZA', 'BEBIDAS E ALIMENTOS', 'SP'),
 ('3', 'VIA JAPA', 'BEBIDAS E ALIMENTOS', 'SP'),
 ('4', 'VIA VEG', 'BEBIDAS E ALIMENTOS', 'SP'),
 ('5', 'VIA DRINK', 'BEBIDAS E ALIMENTOS', 'SP')]

## 4.2 Método filter()

O método `filter()` pode ser utilizado para filtrar valores de acordo com  critérios de seleção.

No exemplo a seguir, o método `filter()` é utilizado para filtrar os funcionários que **não** são do estado de `'SAO PAULO'`.

In [26]:
#exibindo os 5 primeiros funcionarios
#verificando que esses funcionarios são do estado de SAO PAULO.
funcionario_rdd.take(5)

[('1',
  'M-1',
  'ALINE ALMEIDA',
  'F',
  '1/1/1990',
  '1',
  '1',
  '1990',
  'SAO PAULO',
  'SAO PAULO',
  'SP',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('2',
  'M-2',
  'ARAO ALVES',
  'M',
  '2/2/1990',
  '2',
  '2',
  '1990',
  'CAMPINAS',
  'SAO PAULO',
  'SP',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('3',
  'M-3',
  'ARON ANDRADE',
  'M',
  '3/3/1990',
  '3',
  '3',
  '1990',
  'SANTOS',
  'SAO PAULO',
  'SP',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('4',
  'M-4',
  'ADA BARBOSA',
  'F',
  '4/4/1990',
  '4',
  '4',
  '1990',
  'SANTO ANDRE',
  'SAO PAULO',
  'SP',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR'),
 ('5',
  'M-5',
  'ABADE BATISTA',
  'M',
  '5/5/1990',
  '5',
  '5',
  '1990',
  'PIRACICABA',
  'SAO PAULO',
  'SP',
  'SUDESTE',
  'SE',
  'BRASIL',
  'BR')]

In [27]:
#aplicando o método filter() para recuperar apenas os funcionarios que não são do estado de SAO PAULO
#mostrando os 5 primeiros funcionarios que atende ao critério de seleção
def filterFuncionarioEstado(estado):
  return True if estado[9] != 'SAO PAULO' else False

funcionario_rdd \
  .filter(filterFuncionarioEstado) \
  .map(lambda x: (x[1], x[2], x[9])) \
  .take(5)

[('M-13', 'ABDIEL DIAS', 'RIO DE JANEIRO'),
 ('M-14', 'ABDALA DUARTE', 'RIO DE JANEIRO'),
 ('M-15', 'ABDALLA FREITAS', 'RIO DE JANEIRO'),
 ('M-16', 'ABDALLA FERNANDES', 'RIO DE JANEIRO'),
 ('M-17', 'ABDAO FERREIRA', 'MINAS GERAIS')]

### **EXERCÍCIO 3**

Recupere os clientes que moram no estado de Minas Gerais.

In [28]:
# Resposta do exercício

#identificando qual a coluna a ser considerada na filtragem dos dados
display(cliente_header)

['clientePK',
 'clienteNomeFantasia',
 'clienteSetor',
 'clienteCidade',
 'clienteEstadoNome',
 'clienteEstadoSigla',
 'clienteRegiaoNome',
 'clienteRegiaoSigla',
 'clientePaisNome',
 'clientePaisSigla']

In [29]:
#resposta do exercício 3

#aplicando o método filter() para recuperar apenas os clientes que são do estado de MINAS GERAIS
#mostrando os 5 primeiros clientes que atende ao critério de seleção
def filterClienteEstado(estado):
  return True if estado[4] == 'MINAS GERAIS' else False

cliente_rdd \
  .filter(filterClienteEstado) \
  .map(lambda x: (x[0], x[1], x[2], x[4], x[5])) \
  .take(5)

[('10', 'VIA LIFE', 'SAUDE', 'MINAS GERAIS', 'MG'),
 ('11', 'VIA MED', 'SAUDE', 'MINAS GERAIS', 'MG'),
 ('30', 'SR. HAPPY HOUR', 'BEBIDAS E ALIMENTOS', 'MINAS GERAIS', 'MG'),
 ('31', 'SR. LIFE', 'SAUDE', 'MINAS GERAIS', 'MG'),
 ('50', 'SR. FRIENDS', 'BEBIDAS E ALIMENTOS', 'MINAS GERAIS', 'MG')]

## 4.3 Método join()

O método `join()` pode ser utilizado para juntar duas tabelas de acordo com a integridade referencial, ou seja, de acordo com a chave primária presente em uma primeira tabela e a chave secundária presente em uma segunda tabela.

No exemplo a seguir, o método join() é utilizado para juntar dados da tabela de dimensão funcionario com dados da tabela de dimensão pagamento, utilizando a junção estrela definida em termos de funcionario.funcPK = pagamento.funcPK.

In [30]:
#listando os metadados de funcionário
display(funcionario_header)

#criando um RDD temporário para funcionário, contendo apenas algumas colunas 
funcionario_temp = funcionario_rdd \
  .map(lambda x: (x[0], x[2], x[9]))

#listando os 5 primeiros elementos de "funcionario_temp"
funcionario_temp \
  .take(5)

['funcPK',
 'funcMatricula',
 'funcNome',
 'funcSexo',
 'funcDataNascimento',
 'funcDiaNascimento',
 'funcMesNascimento',
 'funcAnoNascimento',
 'funcCidade',
 'funcEstadoNome',
 'funcEstadoSigla',
 'funcRegiaoNome',
 'funcRegiaoSigla',
 'funcPaisNome',
 'funcPaisSigla']

[('1', 'ALINE ALMEIDA', 'SAO PAULO'),
 ('2', 'ARAO ALVES', 'SAO PAULO'),
 ('3', 'ARON ANDRADE', 'SAO PAULO'),
 ('4', 'ADA BARBOSA', 'SAO PAULO'),
 ('5', 'ABADE BATISTA', 'SAO PAULO')]

In [31]:
#listando os metadados de pagamento
display(pagamento_header)

#criando um RDD temporário para pagamento, contendo apenas as colunas referentes à funcPK e às medidas numéricas
pagamento_temp = pagamento_rdd\
  .map(lambda x: (x[0], x[4], x[1]))

#listando os 5 primeiros elementos de "pagamento_temp" para o funcionário com funcPK = 5
def filterPagamentoFuncionario(func):
  return True if func[0] == '4' else False
pagamento_temp.filter(filterPagamentoFuncionario).take(5)

['funcPK', 'equipePK', 'dataPK', 'cargoPK', 'salario', 'quantidadeLancamentos']

[('4', '10498.14', '5'),
 ('4', '10498.14', '5'),
 ('4', '10498.14', '5'),
 ('4', '10498.14', '5'),
 ('4', '10498.14', '5')]

In [32]:
#realizando a junção de funcionario_temp com pagamento_temp, para o funcionário com funcPK igual a 5
#note que a juncao é feita considerando a igualdade na primeira coluna
#note também que somente os valores da segunda coluna de funcionario e de pagamento são retornados 
funcionario_temp \
  .join(pagamento_temp) \
  .take(5)

[('4', ('ADA BARBOSA', '10498.14')),
 ('4', ('ADA BARBOSA', '10498.14')),
 ('4', ('ADA BARBOSA', '10498.14')),
 ('4', ('ADA BARBOSA', '10498.14')),
 ('4', ('ADA BARBOSA', '10498.14'))]

### **EXERCÍCIO 4**

Realize a junção da tabela cliente com a tabela negociacao, considerando a integridade referencial definida em termos de clientePK (ou seja, cliente.clientePK = negociacao.clientePK). 

In [33]:
# Resposta do exercício

display(cliente_header)
display(negociacao_header)

['clientePK',
 'clienteNomeFantasia',
 'clienteSetor',
 'clienteCidade',
 'clienteEstadoNome',
 'clienteEstadoSigla',
 'clienteRegiaoNome',
 'clienteRegiaoSigla',
 'clientePaisNome',
 'clientePaisSigla']

['equipePK', 'clientePK', 'dataPK', 'receita', 'quantidadeNegociacoes']

In [34]:
# Resposta do exercício

cliente_rdd.join(negociacao_rdd).take(5)

[('4', ('VIA VEG', '4')),
 ('4', ('VIA VEG', '18')),
 ('4', ('VIA VEG', '36')),
 ('4', ('VIA VEG', '135')),
 ('4', ('VIA VEG', '142'))]

## 4.4 Método reduceByKey()

O método `reduceByKey()` pode ser utilizado para calcular valores agregados para cada valor de chave presente em uma coluna. 

No exemplo a seguir, é listada a quantidade de funcionários por estado.

Para responder a essa consulta, é necessário utilizar os dados de `funcionario_rdd`, usando a coluna `funcEstadoNome` e contando quantas vezes o mesmo nome de estado aparece. São executados os seguintes passos:

- Utilização do método `map()` para selecionar a coluna desejada `funcEstadoNome`, que é a décima coluna de `funcionario_rdd`, e para criar pares chave-valor da seguinte forma: a chave corresponde à coluna `funcEstadoNome` e o valor corresponde a 1.
- Utilização do método `reduceByKey()` para calcular, para cada chave a quantidade de vezes que ela aparece.
- Utilização do método `collect()` para exibir os pares chave-valor obtidos.

In [35]:
#verificando o esquema de funcionario_rdd
display(funcionario_header)

['funcPK',
 'funcMatricula',
 'funcNome',
 'funcSexo',
 'funcDataNascimento',
 'funcDiaNascimento',
 'funcMesNascimento',
 'funcAnoNascimento',
 'funcCidade',
 'funcEstadoNome',
 'funcEstadoSigla',
 'funcRegiaoNome',
 'funcRegiaoSigla',
 'funcPaisNome',
 'funcPaisSigla']

In [36]:
#executando os passos definidos anteriormente
funcionario_rdd \
  .map(lambda x: (x[9], 1)) \
  .reduceByKey(lambda x, y: x + y) \
  .collect()

[('MINAS GERAIS', 28),
 ('PARANA', 28),
 ('PERNAMBUCO', 21),
 ('SAO PAULO', 95),
 ('RIO DE JANEIRO', 28)]

### **EXERCÍCIO 5** 

Liste a quantidade de clientes por região. Ordene o resultado pelo nome da região em ordem crescente.

Para responder a essa consulta, é necessário utilizar os dados de `cliente_rdd`, usando a coluna `clienteRegiaoNome` e contando quantas vezes o mesmo nome de região aparece. São executados os seguintes passos:

- Utilização do método `map()` para selecionar a coluna desejada `clienteRegiaoNome`, que é a sétima coluna de `funcionario_rdd`, e para criar pares chave-valor da seguinte forma: a chave corresponde à coluna `clienteRegiaoNome` e o valor corresponde a 1.
- Utilização do método `reduceByKey()` para calcular, para cada chave a quantidade de vezes que ela aparece.
- Utilização do método `sortBy()` para ordenar o resultado pelo nome da região em ordem crescente.
- Utilização do método `collect()` para exibir os pares chave-valor obtidos.

In [37]:
# Resposta do exercício

#verificando o esquema de cliente_rdd
display(cliente_header)

['clientePK',
 'clienteNomeFantasia',
 'clienteSetor',
 'clienteCidade',
 'clienteEstadoNome',
 'clienteEstadoSigla',
 'clienteRegiaoNome',
 'clienteRegiaoSigla',
 'clientePaisNome',
 'clientePaisSigla']

In [38]:
# Resposta do exercício

#executando os passos definidos anteriormente
cliente_rdd \
  .map(lambda x: (x[6], 1)) \
  .reduceByKey(lambda x, y: x + y) \
  .sortBy(lambda element: element[0]) \
  .collect()

[('CENTRO-OESTE', 10),
 ('NORDESTE', 30),
 ('NORTE', 10),
 ('SUDESTE', 130),
 ('SUL', 30)]

# 5 Exercícios Adicionais

### **EXERCÍCIO 6** 

Qual o menor salário pago?



Para responder a essa consulta, é necessário utilizar os dados de `pagamento_rdd`, usando a coluna `salario` e aplicando o método `min()`, considerando a seguinte sequência de passos.

- Utilização do método `map()` para transformar o tipo de dados da coluna salario em um número de ponto flutuante (ou seja, float)

- Utilização do método `min()` para descobrir o menor salário.

In [39]:
# Resposta do exercício

#exibindo os metadados de "pagamento_rdd"
display(pagamento_header)

['funcPK', 'equipePK', 'dataPK', 'cargoPK', 'salario', 'quantidadeLancamentos']

In [40]:
# Resposta do exercício

#calculando o menor salário
pagamento_rdd \
  .map(lambda x: float(x[4])) \
  .min()

1501.57

### **EXERCÍCIO 7** 

Qual foi o maior salário pago em 2019?

Para responder a essa consulta, é necessário utilizar os dados de `pagamento_rdd` e de `data_rdd`, desde que `salário` encontra-se em `pagamento_rdd` e o `ano` encontra-se em `data_rdd`. 

A seguinte sequência de passos deve ser realizada:

- Utilização do método `filter()` para identificar as linhas de `data_rdd` que possuem valor igual a `2019`.
- Utilização do método `map()` para transformar o resultado obtido em pares chave-valor da seguinte forma: chave identificada por `dataPK` e valor identificado como o ano desejado (ou seja, 2019). 
- Utilização do método `map()` para selecionar as colunas desejadas de `pagamento_rdd` (`dataPK` e `salario`), bem como para transformar o tipo de dados da coluna `salario` em um número de ponto flutuante (ou seja, float).
- Utilização do método `join()` para juntar os elementos do primeiro RDD calculado e do segundo RDD calculado, usando como base a igualdade da coluna referente à `dataPK`.
- Utilização do método `mapValues()` para mapear os valores referentes ao salário e do método `map()` para remover as chaves referentes à dataPK.
- Utilização do método `max()` para calcular o maior salário.

In [41]:
# Resposta do exercício

#exibindo os metadados de "pagamento_rdd" e de "data_rdd"
display(pagamento_header)
display(data_header)

['funcPK', 'equipePK', 'dataPK', 'cargoPK', 'salario', 'quantidadeLancamentos']

['dataPK',
 'dataCompleta',
 'dataDia',
 'dataMes',
 'dataBimestre',
 'dataTrimestre',
 'dataSemestre',
 'dataAno']

In [42]:
# Resposta do exercício

#identificando as linhas de data_rdd referentes ao ano de 2019 
#transformando o resultado em pares chave-valor
def filterDataAno(ano):
  return True if ano[7] == '2019' else False

data_temp = data_rdd \
  .filter(filterDataAno) \
  .map(lambda x: (x[0], x[7]))

data_temp \
  .take(5)

[('1097', '2019'),
 ('1098', '2019'),
 ('1099', '2019'),
 ('1100', '2019'),
 ('1101', '2019')]

In [43]:
# Resposta do exercício

#identificando as colunas de pagamento_rdd desejadas
#transformando o resultado em pares chave-valor
pagamento_temp = pagamento_rdd \
  .map(lambda x: (x[2], float(x[4])))

pagamento_temp \
  .take(5)

[('5', 1559.94), ('5', 8102.77), ('5', 2532.51), ('5', 7882.7), ('5', 4404.59)]

In [44]:
# Resposta do exercício

#realizando a junção, mapeando o valor dos salários e removendo a chave dataPK
#calculando o maior salário
pagamento_temp \
  .join(data_temp) \
  .mapValues(lambda x: x[0]) \
  .map(lambda x: x[1]) \
  .max()

47140.17

### **EXERCÍCIO 8** 

Qual a idade média dos funcionários? Para fazer este exercício, não precisa considerar a idade exata, ou seja, não é necessário considerar o dia no qual o funcionário nasceu. Considere apenas o ano de nascimento do funcionário.  

**EXERCÍCIO 8a** Resolva o exercício utilizando o método mean().

A seguinte sequência de passos deve ser realizada:

- Utilização do método `map()` para obter os anos de nascimento dos funcionários.
- Utilização do método `map()` para calcular a idade do funcionário, considerando o ano atual de 2020 e o ano no qual o funcionário nasceu. 
- Utilização do método `mean()` para gerar o resultado final.

In [45]:
# Resposta do exercício

#criando uma função que calcula o ano a partir de idade
def idadeApartirDeAno(ano):
  anoAtual = 2020
  return anoAtual - int(ano)

#calculando a idade média dos funcionários usando o método mean()
funcionario_rdd \
  .map(lambda x: (x[7])) \
  .map(lambda x: idadeApartirDeAno(x)) \
  .mean()

37.595

**EXERCÍCIO 8b** Resolva o exercício sem usar o método mean().

A seguinte sequência de passos deve ser realizada:

- Utilização do método `map()` para obter os anos de nascimento dos funcionários.
- Utilização do método `map()` para calcular a idade do funcionário, considerando o ano atual de 2020 e o ano no qual o funcionário nasceu. 
- Utilização do método `map()` para gerar pares chave-valor, de forma que a chave seja a idade do funcionário e o valor seja 1.
- Utilização do método `reduceByKey()` reduzindo pelas chaves e somando as idades e as quantidades de linhas.
- Utilização do método `mapValues()` para mapear os valores e para dividir as somas das idades pelas quantidades de linhas.
- Utilização do método `map()` para mapear a idade média dos funcionários.

In [46]:
# Resposta do exercício

#criando uma função que calcula o ano a partir de idade
def idadeApartirDeAno(ano):
  anoAtual = 2020
  return anoAtual - int(ano)

#calculando a idade média dos funcionários usando o método mean()
funcionario_rdd \
  .map(lambda x: (x[7])) \
  .map(lambda x: idadeApartirDeAno(x)) \
  .map(lambda x: (1, (x, 1))) \
  .reduceByKey(lambda x, y: (x[0]+y[0], x[1]+y[1])) \
  .mapValues(lambda x: x[0]/x[1]) \
  .map(lambda x: x[1]) \
  .collect()


[37.595]

### EXERCÍCIO 9

Liste a quantidade de vezes que cada palavra individual aparece nos nomes de cidades que os funcionários moram. Por exemplo, na cidade de SAO JOSE DO RIO PRETO, existem 5 palavras individuais. Ordene o resultado de forma que as palavras individuais que mais se repetem sejam mostradas primeiro. 


In [47]:
# Resposta do exercício
display(funcionario_header)

['funcPK',
 'funcMatricula',
 'funcNome',
 'funcSexo',
 'funcDataNascimento',
 'funcDiaNascimento',
 'funcMesNascimento',
 'funcAnoNascimento',
 'funcCidade',
 'funcEstadoNome',
 'funcEstadoSigla',
 'funcRegiaoNome',
 'funcRegiaoSigla',
 'funcPaisNome',
 'funcPaisSigla']

In [48]:
# Resposta do exercício
funcionario_rdd.map(lambda x: x[8]). \
    flatMap(lambda line: line.split(" ")). \
    map(lambda word: (word, 1)). \
    reduceByKey(lambda x, y: x + y). \
    sortBy(lambda word_count: word_count[1], ascending=False). \
    collect()

[('SAO', 24),
 ('PRETO', 23),
 ('RECIFE', 21),
 ('RIO', 15),
 ('CAMPINAS', 8),
 ('RIBEIRAO', 8),
 ('ILHA', 8),
 ('JOSE', 8),
 ('PAULO', 8),
 ('SANTOS', 8),
 ('SANTO', 8),
 ('ANDRE', 8),
 ('PIRACICABA', 8),
 ('CARLOS', 8),
 ('BELA', 8),
 ('DO', 8),
 ('OSASCO', 8),
 ('ARARAQUARA', 8),
 ('BARUERI', 7),
 ('DE', 7),
 ('REDONDA', 7),
 ('ANGRA', 7),
 ('DOS', 7),
 ('PRAIA', 7),
 ('SECA', 7),
 ('HORIZONTE', 7),
 ('ARAGUARI', 7),
 ('MONTE', 7),
 ('CURITIBA', 7),
 ('GUARATUBA', 7),
 ('MORRETES', 7),
 ('JANEIRO', 7),
 ('VOLTA', 7),
 ('REIS', 7),
 ('BELO', 7),
 ('OURO', 7),
 ('VERDE', 7),
 ('LONDRINA', 7)]