# <center><span style="font-size: 42px;color: darkgreen;">Projeto - Design De Um MapReduce com Spark e MRJob para os Gastos Totais por Cliente</center></span>

<br><br>

---

<br>

# Análise de Vendas em Grandes Empresas

<br><br>

## Introdução

Você já parou para pensar quantas vendas são realizadas por dia em grandes empresas como, por exemplo, **Amazon** ou **Walmart**?  
Empresas que faturam bilhões vendendo os mais variados produtos para um grande número de clientes.

E se você fosse contratado para um projeto em uma dessas empresas e seu primeiro trabalho fosse calcular o total de vendas por cliente?  
Tarefa aparentemente simples. Sua primeira abordagem talvez fosse buscar o banco de dados transacional com as informações de vendas, cruzar os dados com o cadastro de clientes e obter o valor total gasto por cliente.

<br><br>

---

<br>

## O Problema

Mas quantos clientes uma empresa como a Amazon possui?  
E se a solicitação fosse para gerar o total gasto por cliente nos últimos **5 anos**, de modo a criar uma campanha personalizada para os clientes que tiveram os maiores gastos ao longo dos anos?

Após alguma pesquisa, você poderia obter um dataset no seguinte formato:

<br>

| Código do cliente | Valor gasto em uma única compra |
|-------------------|---------------------------------|
| 1288              | 99.90                          |
| 1029              | 349.12                         |
| 1284              | 5.76                           |

<br>

Sua pesquisa identificou que todos os registros dos últimos **5 anos** geram um dataset com apenas duas colunas, mas **200 milhões de registros**. Definitivamente, esse não é um trabalho para um banco de dados relacional.

<br><br>

---

<br>

## A Solução

Você precisa de uma ferramenta que possa **rapidamente processar os dados e retornar apenas um valor total por cliente**.  
Você então decide criar um job de **MapReduce**. Com poucas linhas de código e usando a linguagem **Python**, você consegue gerar o resultado esperado.

Mas ainda tem um problema: **Como processar esse job da forma mais rápida possível?**

<br><br>

---

<br>

## Big Data: A Solução Ideal

**Spark**/**Hadoop** é a solução ideal. Esse é um exemplo claro de projeto de **Big Data**.  
Um grande volume de dados e tudo que você precisa é extrair uma simples informação, que poderá fazer toda a diferença na estratégia da empresa.

<br><br>

---

<br>

# Objetivo

<br>

Seu trabalho agora é **criar um Job de MapReduce que processe o grande volume de dados para calcular o total de gastos por cliente**. O objetivo principal é consolidar os dados de várias transações para cada cliente, somando os valores de todas as compras realizadas nos últimos 5 anos. Esse resultado permitirá identificar os clientes com os maiores gastos, possibilitando a criação de campanhas personalizadas e estratégias de retenção.

<br>

#### Explicação do Objetivo

<br>

1. **Entrada**: Um dataset com milhões de registros contendo o identificador do cliente e o valor gasto em cada transação.

<br>

2. **Processo**:

   - **Map**: Dividir os dados por cliente e preparar os valores para serem somados.
   - **Reduce**: Consolidar os valores de todas as transações por cliente, calculando o total gasto por cada um.

<br>

3. **Saída**: Um conjunto de dados com os identificadores de clientes e o total de gastos consolidado, no formato:

```bash
Id_Cliente, Gasto_Total
```

<br><br><br><br><br>

---

<br><br><br><br>

# <center><span style="font-size: 42px;color: darkgreen;">Iniciando o Projeto</center></span>


<br><br><br><br>


# Etapa 1. - Iniciando os Serviços

<br>

Ná **máquina virtual** executar os comandos abaixo:

<br>

- **1.1 Iniciar o HDFS (NameNode, DataNode, SecondaryNameNode)**:
   ```bash
   start-dfs.sh  |  stop-dfs.sh
   ```
- **1.2 Iniciar o YARN (ResourceManager, NodeManager)**:
   ```bash
   start-yarn.sh  |  stop-yarn.sh
   ```
- **1.3 Verificando serviços**:
   ```bash
   jps
   ```  
<br>

- **1.4 Verificar o Status do Safe Mode**:
   ```bash
   hdfs dfsadmin -safemode get
   ```
  - **1.4.1 Se o Safe Mode estiver ativado, forçar a saída**:
  ```bash
  hdfs dfsadmin -safemode leave
  ```

<br>

- **1.5 Criando diretório no HDFS**:
   ```bash
   hdfs dfs -mkdir /user/projetos
   hdfs dfs -mkdir /user/projetos/design_mapreduce_gastos_totais
   hdfs dfs -mkdir /user/projetos/design_mapreduce_gastos_totais/datasets
   ```
<br>

- **1.6 Define permissões amplas para evitar problemas de acesso**:
   ```bash
   hdfs dfs -chmod 777 /user/projetos/design_mapreduce_gastos_totais
   ```
   
<br>

- **1.7 Copiar o arquivo para o HDFS**:
    ```bash
    hdfs dfs -copyFromLocal /home/hadoop/Documents/Datasets/gastos-cliente.csv /user/projetos/design_mapreduce_gastos_totais/datasets/
    
    hdfs dfs -ls /user/projetos/design_mapreduce_gastos_totais/datasets
    ```
<br>

- **1.8 Visualizar as primeiras linhas**:
    ```bash
    hdfs dfs -cat /user/projetos/design_mapreduce_gastos_totais/datasets/gastos-cliente.csv | head -n 10
    ```
    
<br><br><br>

# Etapa 2. - Criando o MapReduce (Utilizando MRJob)

<br>

Criar o código para MapReduce utilizando a biblioteca **MRJob**, que permite executar jobs MapReduce no Hadoop.

<br>

#### Versão 1
```python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob

class MRGastos_Cliente(MRJob):
	def mapper(self, key, line):
		values = line.split(',')

		# Sabendo que o dataset contém 3 colunas sem cabeçalho representado por ID, ID_Cliente e Gasto_Total.
		# Vamos mapeas apenas ID_Cliente e Gasto_Total.

		id_cliente = values[1]
		gasto_total = float(values[2])  # Convertendo para somar no reducer

		# Emitir o par chave-valor
		yield id_cliente, gasto_total


	def reducer(self, id_cliente, gastos):

		# Somando todas as ocorrências por cliente
		yield id_cliente, sum(gastos)

if __name__ == '__main__':
	MRGastos_Cliente.run()
```

#### Versão 2
```python
# -*- coding: utf-8 -*-
from mrjob.job import MRJob
from mrjob.step import MRStep

class MRGastos_Cliente(MRJob):
    
    def mapper(self, _, line):
        values = line.split(',')

        # Sabendo que o dataset contém 3 colunas sem cabeçalho representado por ID, ID_Cliente e Gasto_Total.
        # Vamos mapear apenas ID_Cliente e Gasto_Total.

        id_cliente = values[1]
        gasto_total = float(values[2])  # Convertendo para somar no reducer

        # Emitir o par chave-valor
        yield id_cliente, gasto_total

    def reducer(self, id_cliente, gastos):
        # Somando todas as ocorrências por cliente
        total_gasto = sum(gastos)
        # Emitir o resultado (todos os clientes e seus gastos totais)
        yield None, (total_gasto, id_cliente)

    def reducer_final(self, _, id_cliente_gasto):
        # Ordenando os resultados por gasto e pegando os 15 maiores
        top_15 = sorted(id_cliente_gasto, reverse=True, key=lambda x: x[0])[:15]

        # Exibir no terminal somente os 15 maiores
        for gasto, id_cliente in top_15:
            print(f"Cliente {id_cliente} gastou {gasto}")

        # Salvar todos os resultados no HDFS
        #with open('/mnt/data/resultados_completos.txt', 'w') as f:
            #for gasto, id_cliente in id_cliente_gasto:
                #f.write(f"Cliente {id_cliente} gastou {gasto}\n")

    def steps(self):
        # Definir os passos para o job
        return [
            MRStep(mapper=self.mapper,
                   reducer=self.reducer),
            MRStep(reducer=self.reducer_final)
        ]

if __name__ == '__main__':
    MRGastos_Cliente.run()
```

<br><br><br>

# Etapa 3. - Executando o MapReduce (Utilizando MRJob)

<br>

Ir ao terminal e digitar o comando:

<br>

```bash
python MapReduceGastosCliente.py hdfs:////user/projetos/design_mapreduce_gastos_totais/datasets/gastos-cliente.csv  -r hadoop --python-bin ~/.conda/envs/py397/bin/python
```

<br><br><br><br><br><br>





# Etapa 4. - Criando o MapReduce (Utilizando Spark)

<br>

Agora vamos criar o código para MapReduce utilizando **PySpark**. Este código pode ser executado com o comando `spark-submit`.

<br>

#### Versão 1
```python
from pyspark import SparkConf, SparkContext

# Configuração do Spark Context, pois o job será executado via linha de comando com o spark-submit
conf = SparkConf().setMaster("local").setAppName("GastosPorCliente")
sc = SparkContext(conf=conf)

# Função de mapeamento que separa cada um dos campos no dataset
def MapCliente(line):
    campos = line.split(',')
    return (int(campos[1]), float(campos[2]))  # Retorna uma tupla (ID_Cliente, Gasto_Total)

# Leitura do dataset a partir do HDFS
input = sc.textFile("hdfs://localhost:9000/user/projetos/design_mapreduce_gastos_totais/datasets/gastos-cliente.csv")

mappedInput = input.map(MapCliente)

# Operação de redução por chave para calcular o total gasto por cliente
totalPorCliente = mappedInput.reduceByKey(lambda x, y: x + y)

# Imprime o resultado
resultados = totalPorCliente.collect()
for resultado in resultados:
    print(resultado)

# Salvar os resultados completos no HDFS
totalPorCliente.saveAsTextFile("hdfs://localhost:9000/user/projetos/design_mapreduce_gastos_totais/saida_resultados_completos")
```

### Explicação do código:

1. **SparkContext**: Configura o ambiente do Spark, definindo a aplicação como "GastosPorCliente". A opção `local` significa que o job será executado localmente (não em um cluster distribuído).

2. **MapCliente**: Função de mapeamento que transforma cada linha do arquivo CSV em uma tupla contendo o `ID_Cliente` (inteiro) e o `Gasto_Total` (float).

3. **Leitura do Dataset**: Utiliza o Spark para ler o arquivo CSV do HDFS utilizando `sc.textFile("hdfs://...")`.

4. **reduceByKey**: A operação de redução soma os gastos por cliente.

5. **Impressão e Salvamento**: Os resultados são coletados e impressos no terminal. Além disso, os resultados completos (todos os clientes) são salvos no HDFS.

<br><br><br>

# Etapa 5. - Executando o MapReduce (Utilizando Spark)

<br>

Ir ao terminal e digitar o comando:

<br>

```bash
spark-submit --master local --deploy-mode client --conf spark.executor.memory=2g --conf spark.driver.memory=2g MapReduceGastosClientes_Spark.py
```

<br><br><br><br><br><br>

# Resumo do Projeto: Design de um MapReduce com Spark para os Gastos Totais por Cliente

## Objetivo do Projeto
O objetivo principal deste projeto foi criar um job de MapReduce utilizando **PySpark** para calcular o total de gastos por cliente em um grande conjunto de dados. A empresa fictícia **Amazon** ou **Walmart** foi utilizada como exemplo para ilustrar a necessidade de processar milhões de registros de vendas para identificar os clientes que mais gastaram ao longo de um período de cinco anos.

O job foi projetado para:

- **Entrada**: Um dataset com milhões de registros, onde cada linha representa uma transação de compra contendo o **ID do Cliente** e o **Valor Gasto**.
- **Processo**: Utilizou-se a abordagem de MapReduce para:
  - **Map**: Separar os dados por cliente, atribuindo os valores de compras a cada cliente.
  - **Reduce**: Somar os valores de todas as compras feitas por um cliente, obtendo o total gasto por ele.
- **Saída**: Um conjunto de dados com o **ID do Cliente** e o **Gasto Total** consolidado.

---

## Passos do Projeto

### Preparação do Ambiente e Iniciação dos Serviços

- O ambiente foi configurado utilizando **Hadoop** e **Spark**. A máquina virtual foi configurada para rodar os serviços necessários (**HDFS**, **YARN**, **Spark**).
- Foram criados diretórios no **HDFS** para armazenar os dados e o resultado final. Os arquivos de dados foram copiados para o HDFS, e a segurança de acesso foi configurada para garantir que o job fosse executado corretamente.

### MapReduce com MRJob (Versões 1 e 2)

Inicialmente, a solução foi implementada usando a biblioteca **MRJob**, que permite a execução de jobs MapReduce no Hadoop. Duas versões do código foram criadas:

- **Versão 1**: Utilizando a função `mapper` para mapear o **ID do Cliente** e o **Gasto Total**, e a função `reducer` para somar os gastos por cliente.
- **Versão 2**: Incluindo um segundo estágio no job para selecionar e exibir os 15 clientes com maior gasto total, além de salvar todos os resultados no HDFS.

### MapReduce com Spark (PySpark)

Após a implementação inicial com MRJob, a solução foi otimizada utilizando **PySpark**, que oferece maior flexibilidade e desempenho para processamento distribuído.

- **PySpark** foi configurado e o código de MapReduce foi criado utilizando a função `map` para dividir os dados e `reduceByKey` para somar os valores.
- O job foi executado localmente com o comando `spark-submit` e os resultados foram salvos no **HDFS**, conforme especificado.

### Execução do Job

- O job foi executado com sucesso em ambas as abordagens (**MRJob** e **PySpark**). O resultado da execução foi a soma dos gastos por cliente, que foi coletada e salva em um diretório do **HDFS**.
- Durante a execução, a configuração do **HDFS** e do **Spark** foi ajustada para garantir que a quantidade de memória e a configuração de recursos fosse adequada para processar os dados grandes sem problemas de desempenho.

---

## Conclusão

O projeto foi concluído com sucesso, e o objetivo de calcular os gastos totais por cliente foi alcançado utilizando **MapReduce** com **PySpark**. A solução foi capaz de processar grandes volumes de dados, somando os valores de transações de forma eficiente, e foi escalável o suficiente para rodar em um cluster Hadoop se necessário.

O projeto demonstrou como **Big Data** e **MapReduce** são poderosas ferramentas para resolver problemas de análise de grandes volumes de dados, como o cálculo de gastos totais de clientes em empresas de grande porte.

