# <center>Labs Trabalhando com o MapReduce</center>

### IMPORTANTE

- Para estes laboratórios foram fornecidos os arquivos python como `AvaliaFilme.py` , `AnalisaFilme.py` e `AmigosIdade.py` entre outros contendo **Jobs de MapReduce** utilizando a linguagem Python e também os arquivos `ml-100k.zip` contendo o **dataset** a ser usado no *Laboratório 1* assim como outros datasets como `amigos_facebook.csv` entre outros.

<br> <b>
    
---
    
    
<br> <b>

# O que é MapReduce?

O MapReduce é um modelo de programação distribuído utilizado principalmente em frameworks como o Hadoop para processar grandes volumes de dados. Ele permite que problemas complexos sejam divididos em pequenas partes que podem ser processadas paralelamente em um cluster de servidores. Esse modelo é particularmente útil para cenários de Big Data, onde o volume de dados é grande demais para ser processado de maneira eficiente em uma única máquina.

### Como Funciona o Modelo MapReduce

O modelo MapReduce é composto por duas fases principais: **Map** (Mapeamento) e **Reduce** (Redução), além de uma fase intermediária chamada **Shuffle**.

- **1. Fase de Mapeamento (Map)**: O processo começa com o cientista de dados analisando o problema e definindo como os dados de entrada serão representados em pares de chave e valor. Por exemplo, em uma contagem de palavras em um conjunto de textos, as palavras individuais podem ser as chaves e a contagem (geralmente 1) seria o valor associado.

O cientista de dados desenvolve um programa de mapeamento (Mapper), que aplica essas regras de negócios, "quebrando" os dados de entrada em pequenos pares de chave/valor. Cada bloco de dados de entrada é então processado pelo Mapper, que gera os pares conforme definido no código.

**Exemplo**: Se os dados de entrada forem "Deer, Bear, River, Car", o Mapper pode gerar os seguintes pares: ("Deer", 1), ("Bear", 1), ("River", 1), ("Car", 1).

<br>

- **2. Fase Intermediária** - Shuffle: Após a fase de mapeamento, ocorre a fase de Shuffle. O Shuffle é executado automaticamente pelo framework (neste caso, o Hadoop), sem que o cientista de dados precise intervir diretamente. A função dessa fase é agrupar todos os pares de chave/valor gerados pela etapa de mapeamento.

**Exemplo**: Se em várias partes do texto foram encontradas várias ocorrências da palavra "Car", o Shuffle agrupará todas essas ocorrências em um único lugar para que possam ser processadas na fase seguinte.

<br>

- **3. Fase de Redução (Reduce)**: Na fase de Reduce, os pares agrupados pelo Shuffle são processados para gerar o resultado final. O cientista de dados define como será a redução dos dados. No caso da contagem de palavras, a redução pode ser simplesmente somar os valores para cada chave.

**Exemplo**: Se os pares de chave/valor após o Shuffle são ("Car", [1, 1, 1]), a fase de redução somará esses valores, resultando em ("Car", 3).

Essa fase de redução retorna a informação processada e consolidada que o cientista de dados precisa para resolver o problema em questão. No exemplo, a saída seria uma lista das palavras e suas respectivas contagens, como ("Car", 3), ("Deer", 2), ("Bear", 2).

<br><br>

---

<br><br><br>

# <center>Laboratório 1 (Avalia Filme)</center>

<br><br>


# Objetivo do Laboratório 1

<br>

Utilizando um **dataset de sistema de recomendação de filmes**, o objetivo é **executar um programa** que conte a quantidade de avaliações recebidas para cada tipo de nota atribuída aos filmes.

Em outras palavras, vamos verificar **quantas vezes cada filme recebeu determinadas avaliações**.

Por exemplo, se um filme recebeu a nota <i>**4 estrelas**</i> uma vez ou <i>**5 estrelas**</i> sete vezes, isso será contabilizado e apresentado de forma estruturada.


<br>

---

# Pergunta de Negócio

<br>

> **Quantas avaliações de cada tipo um filme recebeu?**

Esta questão nos ajudará a entender o comportamento das avaliações, identificando quais tipos de notas são mais comuns e qual é a distribuição das avaliações para diferentes filmes.

<br>

---

# Sobre o Dataset

<br>

Para este laboratório usaremos o dataset **MovieLens 100K** que é um dos conjuntos de dados mais utilizados para sistemas de recomendação e foi criado pelo GroupLens Research. Ele é frequentemente empregado para avaliar o desempenho de algoritmos colaborativos e análise preditiva na recomendação de filmes.

---

### Descrição Geral do Dataset MovieLens 100K

- **Lançamento**: Abril de 1998
- **Tamanho**: 100.000 avaliações
- **Número de usuários**: 1.000
  - Cada usuário avaliou pelo menos 20 filmes.
- **Número de filmes**: 1.700
  - Filmes lançados até 1998, com múltiplos gêneros.
- **Formato das avaliações**: Escala de 1 a 5 estrelas.

---


### Estrutura dos Arquivos

- **`u.data`** (**será usado neste laboratório**):
  - Contém as avaliações no formato (usuário, filme, nota, timestamp).
  - **Exemplo**: 196 242 3 881250949 (Usuário 196 deu nota 3 para o filme 242 no timestamp correspondente).

- **`u.user`**: 
  - Informações sobre os usuários, como (ID, idade, sexo, ocupação, CEP).
  - **Exemplo**: 1|24|M|technician|85711.

- **`u.item`**:
  - Detalhes dos filmes, como (ID, título, data de lançamento, gêneros).
  - **Exemplo**: 1|Toy Story (1995)|01-Jan-1995|Animation|Children's|Comedy.

- **`u.genre`**:
  - Lista os gêneros disponíveis, como Action, Comedy, Drama, etc.

- **`u.occupation`**:
  - Lista as ocupações dos usuários.
  
<br>

- **Arquivos de Treino e Teste** (`*.base` e `*.test`):

 - O dataset é dividido em conjuntos para **treinamento** e **teste**, usados para validar algoritmos de recomendação.
  
<br>

- **README.txt**:
  - Contém instruções sobre a utilização do dataset, incluindo os termos de licença.

---

### Aplicações e Casos de Uso

- **Teste de Sistemas de Recomendação**: Avaliação de algoritmos colaborativos, como Filtragem Colaborativa e Modelos de Fatoração de Matrizes.
- **Análise de Comportamento do Usuário**: Identificação de padrões nas avaliações e preferências.
- **Treinamento e Validação de Modelos de Machine Learning**.

<br><br>

---

<br><br><br>

# <center><u>Iniciando o Laboratório 1</u></center>

<br><br><br>

## 1. Iniciando os Serviços

<br>

1.1 **Iniciar o HDFS (NameNode, DataNode, SecondaryNameNode)**:
   ```bash
   start-dfs.sh  |  stop-dfs.sh
   ```
1.2 **Iniciar o YARN (ResourceManager, NodeManager)**:
   ```bash
   start-yarn.sh  |  stop-yarn.sh
   ```
1.3 **Verificando serviços**:
   ```bash
   jps
   ```
<br> <br> 

---

<br> 

## 2. Criando Pasta/Diretório com o nome de `mapred` para o Laboratório no HDFS

<br>

```bash
    hdfs dfs -mkdir /mapred
```

<br>

2.1 **Lista os arquivos e diretórios no HDFS raiz**.

<br>

```bash
   hdfs dfs -ls /
```

<br> <br> 

---

<br> 

## 3. Copiando o Dataset do `Sistema Operacional Local` para dentro do `HDFS`:

<br>

**Para este laboratório precisaremos do arquivo `u.data` que está dentro da pasta extraída do dataset**.

Abrir pasta onde está o arquivo a ser copiado, abrir um terminal e digitar o comando abaixo:

<br>

```bash
    hdfs dfs -put u.data /mapred
```

<br> <br> 

---

<br> 

## 4. Criando Arquivo Python com o Job para o MapReduce

<br>

#### Código do Arquivo `AvaliaFilme.py`

---

```python
from mrjob.job import MRJob

class MRAvaliaFilme(MRJob):
    def mapper(self, key, line):
        # Dividimos cada linha por tabulação e extraímos os valores
        (userID, movieID, rating, timestamp) = line.split('\t')
        # Emitimos a nota como chave e 1 como valor
        yield rating, 1

    def reducer(self, rating, occurences):
        # Somamos todas as ocorrências da mesma avaliação
        yield rating, sum(occurences)

if __name__ == '__main__':
    # Executa o job MapReduce
    MRAvaliaFilme.run()
```

---

<br>

### 4.1 Entendendo as 3 Fases do Processo de <i>MapReduce</i> considerando o Script `AvaliaFilme.py`

<br><br>

#### Fase 1 – Mapeamento (Mapper)

![Example Image](Analytics/image1.png)

Na fase de mapeamento, o objetivo é **contar quantas vezes cada filme foi avaliado com uma determinada nota**. Utilizamos a palavra reservada `yield` para definir a **chave** que será usada no processo. No nosso caso, a **coluna** `rating` é escolhida como chave, pois queremos saber o **número total de filmes avaliados para cada tipo de nota**, variando de **1 a 5 estrelas**.

Durante esta fase, cada **rating** é **mapeado** e associado ao valor **1**, que representa uma única ocorrência dessa nota para um filme. Este passo é crucial para contabilizar a frequência de cada avaliação.

O **código** responsável por essa fase é escrito pelo **Cientista de Dados**, que define a lógica de como processar as linhas de entrada para gerar os pares de **chave-valor**.

#### Explicando o trecho do código:

- A função `mapper` recebe como entrada cada linha do arquivo de dados.
- A linha é dividida em **quatro partes**: `userID`, `movieID`, `rating` e `timestamp`, com os valores separados por tabulações (`\t`).
- A chave escolhida é a **nota (rating)** dada ao filme, enquanto o valor é o número **1**. Isso indica que para cada avaliação feita, estamos registrando **uma ocorrência** da nota.
- O `yield` é a palavra-chave que emite cada par **chave-valor** para a próxima fase do processo.

<br>

Esse processo de mapeamento é fundamental para preparar os dados para a próxima fase, onde as avaliações serão agrupadas e somadas.

<br><br>

#### Fase 2 – Shuffle e Sort

Essa fase é **processada automaticamente pelo framework MapReduce**. Não há um código explícito para essa fase no script, pois o próprio framework cuida de:

- Agrupar todos os pares de **chave-valor** emitidos pela função `mapper`.
- Organizar os dados com base nas **chaves iguais** (neste caso, os ratings de 1 a 5).
- Após o agrupamento, os pares chave-valor para as notas são semelhantes ao seguinte exemplo:

**Exemplo de Agrupamento:**

```makefile
    1: [1, 1]
    2: [1, 1, 1]
    3: [1, 1]
    4: [1]
```

<br><br>

#### Fase 3 – Redução (Reducer)

![Example Image](Analytics/image2.png)

Na fase de **redução**, o código aplica um cálculo matemático para somar as ocorrências de cada nota (rating). Utilizamos a função `sum()` para agregar todas as ocorrências de uma nota específica, resultando na **quantidade total de filmes avaliados** com aquela nota.

Essa fase é fundamental, pois transforma os dados mapeados em informações úteis: quantos filmes receberam cada tipo de avaliação.

O **Cientista de Dados** é responsável por definir como essa redução será aplicada, usando a lógica apropriada.

#### Explicando o trecho do código:

- A função `reducer` recebe como entrada a **nota (rating)** como chave, e uma lista de **ocorrências** (que são os números 1 gerados na fase de mapeamento).
- A função `sum()` é aplicada a essa lista, somando todas as ocorrências de cada nota.
- O `yield` emite o **rating (chave)** e o total de ocorrências (quantidade de filmes que receberam aquela nota).

<br>

**Exemplo de Resultado Final**:

```makefile
    1: 2
    2: 3
    3: 2
    4: 1
```

**Isso indica que**: 2 filmes receberam nota 1, 3 filmes receberam nota 2, 2 filmes receberam nota 3 e 1 filme recebeu nota 4

<br> <br>

### Resumo das 3 Fases com Código Explicado

- **Mapper (Fase 1)**: Divide as linhas do arquivo e emite pares de chave-valor, onde a **chave é a nota** (`rating`) e o **valor é 1** para cada ocorrência da nota.
- **Shuffle e Sort (Fase 2)**: O framework organiza automaticamente as notas iguais e agrupa as ocorrências.
- **Reducer (Fase 3)**: Soma as ocorrências de cada nota, retornando o total de filmes para cada tipo de nota.

<br>

Esse processo permite **contabilizar o número de filmes** que receberam cada tipo de avaliação (de 1 a 5 estrelas) no dataset.

<br> <br>

---

<br> 

## 5. Executando Arquivo Python com o Job para o MapReduce

<br>

#### Como Executar um Job MapReduce ?

<br>

> Para executar um **Job MapReduce**, precisamos rodar um script Python que utiliza o framework Hadoop para processar os dados. Aqui está uma **explicação sobre como realizar essa execução**.

<br><br><br>

#### 5.1 Executando o Script Python:

<br>

Normalmente, para executar um script Python, bastaria abrir um terminal no diretório onde o arquivo se encontra e digitar:

<br>

```bash
    python AvaliaFilme.py
```

<br><br>

#### 5.2 Passando o Conjunto de Dados como Entrada:

<br>

No contexto do nosso laboratório, o arquivo Python `AvaliaFilme.py` **precisa receber como entrada um conjunto de dados que está armazenado no HDFS (sistema de arquivos distribuído do Hadoop)**.

Para isso, precisamos indicar corretamente o caminho do arquivo de dados no **HDFS** durante a execução do script, da seguinte forma:

<br>

```bash
    python AvaliaFilme.py hdfs:///mapred/u.data -r hadoop
```

<br>

- `hdfs:///mapred/u.data` é o caminho do arquivo de dados no HDFS.
- `-r hadoop` indica que o job será executado usando o Hadoop como backend.

<br><br>

#### 5.3 Explicando a Saída no Terminal (Indicadores de Execução Bem-Sucedida)

<br>

<i>**Job Completado com Sucesso**</i>: Quando o job é finalizado sem erros, você verá uma linha de confirmação, indicando que o processamento foi concluído com sucesso:

<br>

```bash
    Job job_1728940457090_0001 completed successfully
```
<br>

<i>**Map e Reduce Concluídos**</i>: O processo de MapReduce envolve duas etapas: **mapeamento (map)** e **redução (reduce)**. O status de 100% para ambas as fases confirma que o processamento foi concluído corretamente:

```bash
    map 100% reduce 100%
```

<br>

<i>**Resultado Final Exibido**</i>: Após a execução bem-sucedida do job, o resultado é extraído do **HDFS** e apresentado. No nosso exemplo, vemos a quantidade de filmes que receberam cada tipo de nota:

```bash
    "1"  6110
    "2"  11370
    "3"  27145
    "4"  34174
    "5"  21201
```

<br>

**Isso significa que**: 6.110 filmes receberam nota 1, 11.370 filmes receberam nota 2, 27.145 filmes receberam nota 3, 34.174 filmes receberam nota 4 e 21.201 filmes receberam nota 5.

<br>

<i>**Limpeza dos Diretórios Temporários**</i>: Após a conclusão do job, os diretórios temporários no **HDFS** são removidos automaticamente para manter o sistema organizado. Isso garante que nenhum arquivo desnecessário permaneça no sistema após o processamento:

```bash
    Removing HDFS temp directory hdfs:///user/hadoop/tmp/mrjob/AvaliaFilme.hadoop.20241016.205700.452291...
```

<br> <br><br><br><br>

---

<br> <br> <br> <br> <br> 

# <center>Laboratório 2 (Gerar Média de Amigos)</center>

<br><br>


# Objetivo do Laboratório 2

<br>

Utilizando um **dataset que contém dados sobre o número de amigos no Facebook e idades dos usuários**, o objetivo é **executar um programa MapReduce** para calcular a **média de amigos no Facebook por idade**.

Em outras palavras, vamos **calcular a média do número de amigos para cada faixa etária**. Isso nos permitirá entender melhor o comportamento de amizade no Facebook, observando qual faixa etária possui mais conexões na plataforma.

**Por exemplo**, se usuários de 30 anos têm, em média, **200 amigos** e usuários de 40 anos têm, em média, **450 amigos**, esses resultados serão computados e apresentados.


<br>

---

# Pergunta de Negócio

<br>

> **Qual é a média de amigos no Facebook por faixa etária?**

Esta questão nos ajudará a entender o comportamento social dos usuários do Facebook, verificando qual faixa etária tende a ter mais ou menos amigos.

<br>

---

# Sobre o Dataset

<br>

Para este laboratório, vamos utilizar um **dataset de usuários do Facebook**, que contém informações sobre **nomes**, **idades**, e o **número de amigos** que cada usuário possui. O objetivo será calcular a **média de amigos no Facebook por faixa etária**, analisando como o número de amigos varia conforme a idade dos usuários.

#### Descrição Geral do Novo Dataset

- **Número de registros (usuários)**: 500
- **Colunas**: 4
  - **ID do usuário**: Número de identificação único de cada usuário.
  - **Nome**: Nome do usuário.
  - **Idade**: Idade do usuário.
  - **Número de Amigos**: Quantidade de amigos que o usuário possui no Facebook.
  
<br><br>

---

<br><br><br>

# <center><u>Iniciando o Laboratório 2</u></center>

<br><br><br>

## 1. Iniciando os Serviços

<br>

1.1 **Iniciar o HDFS (NameNode, DataNode, SecondaryNameNode)**:
   ```bash
   start-dfs.sh  |  stop-dfs.sh
   ```
1.2 **Iniciar o YARN (ResourceManager, NodeManager)**:
   ```bash
   start-yarn.sh  |  stop-yarn.sh
   ```
1.3 **Verificando serviços**:
   ```bash
   jps
   ```
<br> <br> 

---

<br> 

## 2. Criando Pasta/Diretório com o nome de `media_facebook` para o Laboratório no HDFS

<br>

```bash
    hdfs dfs -mkdir /media_facebook
```

<br>

2.1 **Lista os arquivos e diretórios no HDFS raiz**.

<br>

```bash
   hdfs dfs -ls /
```

<br> <br> 

---

<br> 

## 3. Copiando o Dataset do `Sistema Operacional Local` para dentro do `HDFS`:

<br>

**Para este laboratório precisaremos do arquivo `amigos_facebook.csv`**.

Abrir pasta onde está o arquivo a ser copiado, abrir um terminal e digitar o comando abaixo:

<br>

```bash
    hdfs dfs -put amigos_facebook.csv /media_facebook
```

<br> <br> 

---

<br> 

## 4. Criando Arquivo Python com o Job para o MapReduce

<br>

#### Código do Arquivo `AmigosIdade.py`

---

```python
from mrjob.job import MRJob

class MRAmigosPorIdade(MRJob):

    def mapper(self, _, line):
        (ID, nome, idade, numAmigos) = line.split(',')
        yield idade, float(numAmigos)

    def reducer(self, idade, numAmigos):
        total = 0
        numElementos = 0
        for x in numAmigos:
            total += x
            numElementos += 1
            
        yield idade, total / numElementos


if __name__ == '__main__':
    MRAmigosPorIdade.run()
```

---

<br>

### 4.1 Explicando Código do Job MapReduce (`AmigosIdade.py`)

<br>

O código **AmigosIdade.py** implementa um **job MapReduce** para calcular a média de amigos no Facebook por faixa etária. Ele é composto de duas partes principais: **mapper** e **reducer**. Aqui está uma explicação detalhada de cada componente:

<br>

#### 1. Mapper:

---

```python
def mapper(self, _, line):
    (ID, nome, idade, numAmigos) = line.split(',')
    yield idade, float(numAmigos)
```

---

A função `mapper` recebe cada linha do dataset, que contém informações separadas por vírgula (ID, nome, idade e número de amigos).

A função então:

- **Divide a linha** em suas respectivas colunas: `ID`, `nome`, `idade` e `numAmigos`.
- **Emite a idade como chave** e o número de amigos como valor para o próximo estágio do processo (reduce).

<br>

#### 2. Reducer:

---

```python
def reducer(self, idade, numAmigos):
    total = 0
    numElementos = 0
    for x in numAmigos:
        total += x
        numElementos += 1
        
    yield idade, total / numElementos

```

---

A função `reducer` recebe todas as entradas agrupadas pela mesma chave (idade), ou seja, para cada faixa etária, ela obtém uma lista de valores (número de amigos). A função então:

- **Soma todos os valores** (número de amigos) e conta o número de elementos.
- **Calcula a média** dividindo o total pelo número de entradas.
- **Emite a idade** e a média de amigos como o resultado final.

<br>

### 4.2 Resumo

- O **mapper** separa os dados por linha e emite a idade como chave e o número de amigos como valor.
- O **reducer** calcula a média de amigos por idade, somando os valores e dividindo pelo número de elementos.
- O resultado final é a **média de amigos para cada idade** no dataset.



<br> <br>

---

<br> 

## 5. Aplicando o job MapReduce

<br>

No contexto do nosso laboratório, usaremos o arquivo Python `AmigosIdade.py` para aplicar o **job MapReduce**. Para isso basta ir ao diretório do arquivo AmigosIdade.py e no terminal digitar:

<br>

---

```bash
    python AmigosIdade.py hdfs:///media_facebook/amigos_facebook.csv -r hadoop
```

---

#### 5.1 Explicando a Saída no Terminal (Indicadores de Execução Bem-Sucedida)

<br>

<i>**Job Completed Successfully**</i>: Quando o job é finalizado sem erros, você verá uma linha de confirmação, indicando que o processamento foi concluído com sucesso.

<br>

<i>**Resultado Final Exibido**</i>: Após a execução bem-sucedida do job, o resultado é extraído do **HDFS** e apresentado. No nosso exemplo, vemos a quantidade de filmes que receberam cada tipo de nota:

<br>

```bash
"18"	343.375
"19"	213.27272727272728
"20"	165.0
...

```

<br>

**Isso significa que**: Usuários de 18 anos têm, em média, 343.375 amigos no Facebook, usuários de 19 anos têm, em média, 213.27 amigos no Facebook, usuários de 20 anos têm, em média, 165 amigos no Facebook.

E assim por diante para cada idade, conforme o resultado exibido. O job calculou com sucesso a média de amigos por idade usando o algoritmo MapReduce.


<br><br><br><br><br>

---

<br><br><br><br><br> 

# <center>Laboratório 3 (Data Mining com MapReduce em Dados Não Estruturados)</center>


### O que é Data Mining ?

**Data Mining (ou mineração de dados)** é o processo de explorar grandes conjuntos de dados para descobrir padrões ocultos, tendências e informações úteis. Ele envolve o uso de técnicas de análise de dados, como estatísticas, aprendizado de máquina e visualização de dados, para extrair conhecimento e tomar decisões baseadas em dados.

<br><br>

---

<br>

# Objetivo do Laboratório 3

<br>

Realizar **Data Mining** em um **dataset de texto não estruturado**, especificamente o livro **Orgulho e Preconceito** de **Jane Austen**. Usaremos **MapReduce** para realizar **três diferentes jobs** de análise no texto, cada um com suas regras, buscando explorar e extrair informações a partir desse material.

O **objetivo** deste laboratório é entender como aplicar técnicas de mineração de dados em dados não estruturados (texto) e explorar informações relevantes, como a frequência de palavras, identificação de palavras únicas e análise de frases.

<br>

---

# Perguntas de Negócio

<br>

> **Quais são as palavras mais frequentes no livro "Orgulho e Preconceito" de Jane Austen e quantas vezes cada uma aparece?**

> **Quantas vezes cada palavra aparece no texto, considerando regras de tokenização mais precisas?**

> **Quais são as palavras mais frequentes no texto, organizadas em ordem decrescente de frequência?**

<br>

Estas questões

<br>

---

# Sobre o Dataset

<br>

Para este laboratório, utilizaremos o livro **Orgulho e Preconceito** de **Jane Austen**. O dataset é um arquivo de texto simples que contém o conteúdo completo do livro. O objetivo é realizar mineração de dados sobre o conteúdo textual utilizando jobs de MapReduce.
  
<br><br>

---

<br><br><br>

# <center><u>Iniciando o Laboratório 3</u></center>

<br><br><br>

## 1. Iniciando os Serviços

<br>

1.1 **Iniciar o HDFS (NameNode, DataNode, SecondaryNameNode)**:
   ```bash
   start-dfs.sh  |  stop-dfs.sh
   ```
1.2 **Iniciar o YARN (ResourceManager, NodeManager)**:
   ```bash
   start-yarn.sh  |  stop-yarn.sh
   ```
1.3 **Verificando serviços**:
   ```bash
   jps
   ```
<br> <br> 

---

<br> 

## 2. Criando Pasta/Diretório com o nome de `data_mining` para o Laboratório no HDFS

<br>

```bash
    hdfs dfs -mkdir /data_mining
```

<br>

2.1 **Lista os arquivos e diretórios no HDFS raiz**.

<br>

```bash
   hdfs dfs -ls /
```

<br> <br> 

---

<br> 

## 3. Copiando o Dataset do `Sistema Operacional Local` para dentro do `HDFS`:

<br>

**Para este laboratório precisaremos do arquivo `OrgulhoePreconceito.txt`**.

Abrir pasta onde está o arquivo a ser copiado, abrir um terminal e digitar o comando abaixo:

<br>

```bash
    hdfs dfs -put OrgulhoePreconceito.txt /data_mining
```

<br> <br> 

---

<br> <br> <br> <br> 

## 4. Criando Arquivo Python com o Job para o MapReduce 1 (`MR-DataMining-1.py`)

<br>

#### Código do Arquivo `MR-DataMining-1.py`

---

```python
from mrjob.job import MRJob


class MRDataMining(MRJob):

    def mapper(self, _, line):
        palavras = line.split()
        for palavra in palavras:
            yield palavra.lower(), 1

    def reducer(self, key, values):
        yield key, sum(values)


if __name__ == '__main__':
    MRDataMining.run()
```

---

<br>

### 4.1 Explicando Código do Job MapReduce 1

<br>

O código **MR-DataMining-1.py** implementa um **job MapReduce** para contar a frequência de cada palavra em um texto

<br>

#### 1. Mapper:

---

```python
def mapper(self, _, line):
        palavras = line.split()
        for palavra in palavras:
            yield palavra.lower(), 1
```

---

A função `mapper`:

- A função `mapper` lê o arquivo de entrada linha por linha.
- Cada linha é dividida em palavras usando o método `split()`.
- Em seguida, o código transforma cada palavra em minúsculas (`lower()`) para garantir que "Palavra" e "palavra" sejam tratadas como a mesma palavra.
- Para cada palavra encontrada, o `mapper` emite um par chave-valor onde a **palavra é a chave e o número **1** é o valor, indicando que a palavra apareceu uma vez.

- 

<br>

#### 2. Reducer:

---

```python
    def reducer(self, key, values):
        yield key, sum(values)
```

---

A função `reducer`:

- A função `reducer` recebe todas as ocorrências da mesma palavra (agrupadas pela chave, que é a palavra).
- Em seguida, a função soma todos os valores (o número de ocorrências da palavra) e emite a palavra junto com a soma total.

- 

<br>

### 4.2 Resumo

- Este **job MapReduce** lê um texto, divide-o em palavras, conta quantas vezes cada palavra aparece, e retorna a contagem total para cada palavra. O uso de `lower()` garante que as contagens não diferenciem maiúsculas de minúsculas, tratando "Palavra" e "palavra" como a mesma.



<br> <br>

---

<br> 

## 5. Aplicando o job MapReduce 1

<br>

No contexto do nosso laboratório, usaremos o arquivo Python `MR-DataMining-1.py` para aplicar o **job MapReduce**. Para isso basta ir ao diretório do arquivo e no terminal digitar:

<br>

---

```bash
    python MR-DataMining-1.py hdfs:///data_mining/OrgulhoePreconceito.txt -r hadoop
```

---

#### 5.1 Explicando a Saída no Terminal (Indicadores de Execução Bem-Sucedida)

<br>

<i>**Job Completed Successfully**</i>: Quando o job é finalizado sem erros, você verá uma linha de confirmação, indicando que o processamento foi concluído com sucesso.

<br>

<i>**Resultado Final Exibido**</i>: Após a execução bem-sucedida do job, o resultado é extraído do **HDFS** e apresentado.

<br>

```bash
"your"	426
"yours"	2
"yours,"	4
"yours,\""	2
"yours."	3
"yours.\""	3
"yourself"	28
"yourself,"	9
```

<br>

**Isso significa que**: Nosso job MapReduce conseguiu detectar a quantidade de palavras de forma bem simples, **isso signfica** que ele **não consegue** diferencias `yours` de `yours,` que possuí `,` por exemplo. Precisaremos cuidar disso. É o que faremos no passo 6.



<br><br><br><br>

---



## 6. Criando Arquivo Python com o Job para o MapReduce 2 (`MR-DataMining-2.py`)

<br>

#### Código do Arquivo `MR-DataMining-2.py`

---

```python
from mrjob.job import MRJob
import re

REGEXP_PALAVRA = re.compile(r"[\w']+")

class MRDataMining(MRJob):

    def mapper(self, _, line):
        palavras = REGEXP_PALAVRA.findall(line)
        for palavra in palavras:
            yield palavra.lower(), 1

    def reducer(self, key, values):
        yield key, sum(values)

if __name__ == '__main__':
    MRDataMining.run()
```

---

<br>

### 6.1 Explicando Código do Job MapReduce 2

<br>

O código **MR-DataMining-2.py** é uma versão aprimorada do primeiro job, onde aplicamos uma **tokenização mais precisa** (filtrando `pontuações` ao lado de palavras) para identificar as palavras de forma correta no texto, removendo pontuações como vírgulas e pontos finais.
<br>

#### 1. Mapper:

---

```python
def mapper(self, _, line):
        palavras = REGEXP_PALAVRA.findall(line)
        for palavra in palavras:
            yield palavra.lower(), 1

```

---

A função `mapper`:

- A função `mapper` lê o arquivo de entrada linha por linha, mas, em vez de usar `split()` como no job anterior, aqui usamos uma **expressão regular** (definida pela variável `REGEXP_PALAVRA`) para extrair apenas palavras, ignorando pontuações, como vírgulas, pontos e outros caracteres especiais.
- Cada palavra extraída é convertida para minúscula com `lower()` e emitida com o valor `1`, indicando que a palavra apareceu uma vez.

<br>

#### 2. Reducer:

---

```python
def reducer(self, key, values):
        yield key, sum(values)
```

---

A função `reducer`:

- A função `reducer` recebe todas as ocorrências de cada palavra (agrupadas pela chave, que é a palavra) e soma os valores para determinar o número total de vezes que a palavra apareceu no texto.
- Emite a palavra e sua respectiva contagem total.

<br>

### 6.2 Resumo

- Este **job MapReduce** melhora a tokenização ao usar expressões regulares para **limpar o texto**, removendo pontuações e outros caracteres especiais.
- O `mapper` divide o texto em palavras de forma mais precisa, enquanto o `reducer` conta o número total de vezes que cada palavra aparece.
- Esta abordagem corrige o problema observado no **Job 1**, onde palavras como `"yours"` e `"yours,"` eram tratadas como diferentes devido à pontuaçã



<br> <br>

---

<br> 

## 7. Aplicando o job MapReduce 2

<br>

No contexto do nosso laboratório, usaremos o arquivo Python `MR-DataMining-2.py` para aplicar o **job MapReduce**. Para isso basta ir ao diretório do arquivo e no terminal digitar:

<br>

---

```bash
    python MR-DataMining-2.py hdfs:///data_mining/OrgulhoePreconceito.txt -r hadoop
```

---

#### 7.1 Explicando a Saída no Terminal (Indicadores de Execução Bem-Sucedida)

<br>

<i>**Job Completed Successfully**</i>: Quando o job é finalizado sem erros, você verá uma linha de confirmação, indicando que o processamento foi concluído com sucesso.

<br>

<i>**Resultado Final Exibido**</i>: Após a execução bem-sucedida do job, o resultado é extraído do **HDFS** e apresentado.

<br>

```bash
"youngest"	13
"your"	448
"yours"	21
"yourself"	50

```

<br>

**Isso significa que**: Palavras e a quantidade de vezes que foram usadas no livro.


<br><br><br><br> 

---



## 8. Criando Arquivo Python com o Job para o MapReduce 3 (`MR-DataMining-3.py`)

<br>

#### Código do Arquivo `MR-DataMining-3.py`

---

```python
from mrjob.job import MRJob
from mrjob.step import MRStep
import re

REGEXP_PALAVRA = re.compile(r"[\w']+")

class MRDataMining(MRJob):

    def steps(self):
        return [
            MRStep(mapper = self.mapper_get_words, reducer = self.reducer_count_words),
            MRStep(mapper = self.mapper_make_counts_key, reducer = self.reducer_output_words)
        ]

    def mapper_get_words(self, _, line):
        palavras = REGEXP_PALAVRA.findall(line)
        for palavra in palavras:
            yield palavra.lower(), 1

    def reducer_count_words(self, palavra, values):
        yield palavra, sum(values)

    def mapper_make_counts_key(self, palavra, count):
        yield '%04d'%int(count), palavra

    def reducer_output_words(self, count, palavras):
        for palavra in palavras:
            yield count, palavra


if __name__ == '__main__':
    MRDataMining.run()
```

---

<br>

### 8.1 Explicando Código do Job MapReduce 3

<br>

O código **MR-DataMining-3.py** implementa um **job MapReduce** *mais complexo*, que envolve dois passos (steps). Primeiro, ele conta o número de ocorrências de cada palavra, e, em seguida, organiza essas palavras em **ordem decrescente de frequência**. Este job utiliza **duas fases de mapeamento e redução**.

<br>

#### 0. Steps: 

---

```python
def steps(self):
    return [
        MRStep(mapper=self.mapper_get_words, reducer=self.reducer_count_words),
        MRStep(mapper=self.mapper_make_counts_key, reducer=self.reducer_output_words)
    ]
```

---

- A função `steps` no código **MR-DataMining-3.py** é usada para **definir múltiplos passos (ou fases) no job de MapReduce**. Ela organiza a sequência de mapeamento e redução que o job irá executar, permitindo que o código tenha mais de uma fase de **MapReduce**.

<br>

#### 1. Mapper1: `mapper_get_words`

---

```python
def mapper_get_words(self, _, line):
        palavras = REGEXP_PALAVRA.findall(line)
        for palavra in palavras:
            yield palavra.lower(), 1
```

---

A função `mapper_get_words`:

- Funciona de maneira similar ao **Job 2**, utilizando a expressão regular para identificar palavras no texto, removendo pontuações.
- Para cada palavra encontrada, emite a palavra em minúsculas com o valor 1, indicando que a palavra apareceu uma vez.

<br>

#### 2. Reducer1: `reducer_count_words`

---

```python
def reducer_count_words(self, palavra, values):
        yield palavra, sum(values)

```

---

A função `reducer_count_words`: 

- Soma todas as ocorrências de cada palavra (agrupadas pela palavra).
- Emite a palavra e o total de vezes que ela apareceu no texto.

<br>

#### 3. Maper2: `mapper_make_counts_key`

---

```python
def mapper_make_counts_key(self, palavra, count):
        yield '%04d' % int(count), palavra
```

---

A função `mapper_make_counts_key`: 

- Recebe as palavras e suas contagens do primeiro mapeamento e inverte a relação chave-valor.
- Aqui, a **contagem** passa a ser a chave, formatada como uma string de quatro dígitos (`'%04d' % int(count)`), enquanto a **palavra** se torna o valor.
- O objetivo é permitir que a segunda fase de redução ordene as palavras com base em suas contagens.

<br>

#### 4. Reducer2: `reducer_output_words`

---

```python
def reducer_output_words(self, count, palavras):
        for palavra in palavras:
            yield count, palavra
```

---

A função `reducer_output_words`: 

- Recebe as contagens e as palavras agrupadas por essas contagens.
- Emite a contagem e as palavras associadas a ela, garantindo que o resultado final esteja ordenado pela frequência das palavras.


<br>

### 8.2 Resumo

- Este **job MapReduce** realiza **dois passos**:
  - **1. Contagem das palavras** no texto, da mesma forma que o Job 2.
  - **2. Organização das palavras por frequência**, colocando as palavras mais frequentes no topo.
- O uso de duas fases de mapeamento e redução permite contar as palavras no primeiro passo e ordenar as contagens no segundo.
- Esse job retorna as palavras organizadas em ordem decrescente de frequência, permitindo uma visão clara das palavras mais comuns no texto analisado.



<br> <br>

---

<br> 

## 9. Aplicando o job MapReduce 3

<br>

No contexto do nosso laboratório, usaremos o arquivo Python `MR-DataMining-3.py` para aplicar o **job MapReduce**. Para isso basta ir ao diretório do arquivo e no terminal digitar:

<br>

---

```bash
    python MR-DataMining-3.py hdfs:///data_mining/OrgulhoePreconceito.txt -r hadoop
```

---

#### 9.1 Explicando a Saída no Terminal (Indicadores de Execução Bem-Sucedida)

<br>

<i>**Job Completed Successfully**</i>: Quando o job é finalizado sem erros, você verá uma linha de confirmação, indicando que o processamento foi concluído com sucesso.

<br>

<i>**Resultado Final Exibido**</i>: Após a execução bem-sucedida do job, o resultado é extraído do **HDFS** e apresentado.

<br>

```bash
"2203"	"her"
"3658"	"and"
"3729"	"of"
"4242"	"to"
"4507"	"the"
```

<br>

**Isso significa que**: Neste exemplo, temos as palavras mais comuns no texto, como: "the" foi utilizada 4507 vezes, "to" foi utilizada 4242 vezes, "of" foi utilizada 3729 vezes.


<br><br><br><br><br>

---

<br><br><br><br><br> 

# <center>Laboratório 4 (Analisando Logs de Servidores Web)</center>

<br><br>


# Objetivo do Laboratório 4

<br>

O objetivo deste laboratório é analisar os **logs de servidores web** para entender o comportamento dos usuários que acessam o site. Mais especificamente, vamos contar quantas conexões foram feitas por cada **endereço IP**, permitindo-nos identificar quais IPs estão gerando mais tráfego para o servidor.

**Esta análise é útil para** detectar padrões de acesso, monitorar a atividade do servidor e identificar possíveis comportamentos anômalos, como acessos excessivos de determinados IPs, que podem indicar ataques ou outros problemas de desempenho.

**IMPORTANTE** -> Para este laboratório iremos responder duas perguntas de negócio usando **dois métodos diferentes**:

- `MRJob`
- `Usando apenas o Hadoop Streaming`

<br>

---

# Pergunta de Negócio

<br>

> **Quais foram os 20 principais IPs que acessam o servidor?** (Esta pergunta será respondida usando o *MRJob*)

> **Quantas conexões foram feitas por cada endereço IP?** (Esta pergunta será respondida *usando apenas o Hadoop Streaming*)

Estas questões nos ajudarão a identificar os principais IPs que acessam o servidor e a quantidade de vezes que esses IPs realizaram conexões, possibilitando insights sobre o tráfego do servidor e eventuais ações necessárias.

<br>

---

# Sobre o Dataset

<br>

Para este laboratório, vamos utilizar um **conjunto de dados de logs de servidor web**, que contém informações sobre cada requisição feita ao servidor, incluindo o endereço IP do cliente, data e hora do acesso, o tipo de requisição (por exemplo, `GET` ou `POST`), o status da resposta do servidor, e o tamanho da resposta em bytes.

### Informações sobre o Dataset:

- **Tamanho do arquivo**: 504.941.532 bytes
- **Quantidade de linhas**: 4.477.843
- **Formato dos logs**: O formato é típico de logs de servidor web, contendo campos como endereço IP, data e hora do acesso, método HTTP, URI requisitado, código de resposta e tamanho da resposta.
  
<br><br>

---

<br><br><br>

# <center><u>Iniciando o Laboratório 4</u></center>

<br><br><br>

## 1. Iniciando os Serviços

<br>

1.1 **Iniciar o HDFS (NameNode, DataNode, SecondaryNameNode)**:
   ```bash
   start-dfs.sh  |  stop-dfs.sh
   ```
1.2 **Iniciar o YARN (ResourceManager, NodeManager)**:
   ```bash
   start-yarn.sh  |  stop-yarn.sh
   ```
1.3 **Verificando serviços**:
   ```bash
   jps
   ```
<br> <br> 

---

<br> 

## 2. Criando Pasta/Diretório com o nome de `logs` para o Laboratório no HDFS

<br>

```bash
    hdfs dfs -mkdir /logs
```

<br>

2.1 **Lista os arquivos e diretórios no HDFS raiz**.

<br>

```bash
   hdfs dfs -ls /
```

<br> <br> 

---

<br> 

## 3. Copiando o Dataset do `Sistema Operacional Local` para dentro do `HDFS`:

<br>

**Para este laboratório precisaremos do arquivo `web_server.log`**.

Abrir pasta onde está o arquivo a ser copiado, abrir um terminal e digitar o comando abaixo:

<br>

```bash
    hdfs dfs -put web_server.log /logs
```

<br> <br> 

---

<br> 

## 4. Criando Arquivo Python com o Job para o MapReduce 1 (Com MRJob)

<br>

#### Código do Arquivo `LogsIp.py`

---

```python
from mrjob.job import MRJob
import re                     # Importa a biblioteca regex para trabalhar com expressões regulares
from heapq import nlargest    # Importa nlargest para selecionar os 20 maiores valores

class MRAvaliaFilme(MRJob):
    
    def mapper(self, key, line):
        # Regex para extrair as partes do log
        # Cada parte do padrão regex é configurada para capturar as diferentes informações da linha de log
        log_pattern = re.compile(
            # Captura o endereço IP: \S+ significa "qualquer sequência de caracteres que não seja espaço"
            r'^(?P<ip>\S+) '           
            # Captura o Identd do cliente: também é uma sequência sem espaços (geralmente '-')
            r'(?P<identd>\S+) ' 
            # Captura o nome do usuário autenticado (geralmente '-' se não autenticado)
            r'(?P<user>\S+) '          
            # Captura o timestamp (data e hora): .*? captura qualquer coisa entre colchetes
            r'\[(?P<timestamp>.*?)\] ' 
            # Captura a requisição HTTP: qualquer coisa entre aspas (exemplo: "GET /index.html HTTP/1.1")
            r'"(?P<request>.*?)" '     
            # Captura o código de status HTTP: \d{3} significa "exatamente três dígitos" (ex: 200, 404)
            r'(?P<status>\d{3}) '    
            # Captura o tamanho do objeto retornado: \S+ captura qualquer sequência que não seja espaço (ex: 5120 bytes ou '-')
            r'(?P<size>\S+)'           
        )

        # Aplicar regex à linha
        match = log_pattern.match(line)  # 'match' tenta aplicar o padrão à linha do log e retorna um objeto match se for bem-sucedido
        
        if match:
            # Extrair o IP do objeto match
            ip = match.group('ip')  # 'group' retorna a parte capturada pelo nome do grupo (neste caso, 'ip' é o endereço IP)
            
            # Emitir o IP com valor 1, indicando uma conexão
            yield ip, 1  # O Mapper emite o endereço IP e o valor 1, que será somado no reducer

    def reducer(self, ip, occurrences):
        # Armazenar as contagens de IPs em uma lista
        self.ip_counts = getattr(self, 'ip_counts', [])
        self.ip_counts.append((ip, sum(occurrences)))

    def reducer_final(self):
        # Selecionar os 20 IPs com mais conexões
        top_20_ips = nlargest(20, self.ip_counts, key=lambda x: x[1])
        
        # Emitir os 20 maiores IPs
        for ip, total in top_20_ips:
            yield ip, total

if __name__ == '__main__':
    MRAvaliaFilme.run()  # Executa o job de MapReduce
```

---

<br>

### 4.1 Explicando Código do Job MapReduce 1 (`LogsIp.py`)

<br>

O código **LogsIp.py** implementa um **job MapReduce** que processa arquivos de log de servidor da web, com o objetivo de contar quantas vezes cada endereço IP aparece no log, o que representa o número de conexões feitas por esse IP. O job está dividido em duas partes principais: o **Mapper** e o **Reducer**, além de uma etapa de **ordenação no reducer_final para exibir apenas os 20 maiores IPs com mais conexões**.

<br>

#### 1. Mapper:

A função `mapper(self, key, line)` é responsável por ler as linhas do arquivo de log, extrair o endereço IP de cada linha usando uma expressão regular (regex) e emitir esse IP com o valor **1**, indicando que uma conexão foi feita por esse IP.

---

```python
def mapper(self, key, line):
        # Regex para extrair as partes do log
        # Cada parte do padrão regex é configurada para capturar as diferentes informações da linha de log
        log_pattern = re.compile(
            # Captura o endereço IP: \S+ significa "qualquer sequência de caracteres que não seja espaço"
            r'^(?P<ip>\S+) '           
            # Captura o Identd do cliente: também é uma sequência sem espaços (geralmente '-')
            r'(?P<identd>\S+) ' 
            # Captura o nome do usuário autenticado (geralmente '-' se não autenticado)
            r'(?P<user>\S+) '          
            # Captura o timestamp (data e hora): .*? captura qualquer coisa entre colchetes
            r'\[(?P<timestamp>.*?)\] ' 
            # Captura a requisição HTTP: qualquer coisa entre aspas (exemplo: "GET /index.html HTTP/1.1")
            r'"(?P<request>.*?)" '     
            # Captura o código de status HTTP: \d{3} significa "exatamente três dígitos" (ex: 200, 404)
            r'(?P<status>\d{3}) '    
            # Captura o tamanho do objeto retornado: \S+ captura qualquer sequência que não seja espaço (ex: 5120 bytes ou '-')
            r'(?P<size>\S+)'           
        )

        # Aplicar regex à linha
        match = log_pattern.match(line)  # 'match' tenta aplicar o padrão à linha do log e retorna um objeto match se for bem-sucedido
        
        if match:
            # Extrair o IP do objeto match
            ip = match.group('ip')  # 'group' retorna a parte capturada pelo nome do grupo (neste caso, 'ip' é o endereço IP)
            
            # Emitir o IP com valor 1, indicando uma conexão
            yield ip, 1  # O Mapper emite o endereço IP e o valor 1, que será somado no reducer
```

---

A função `mapper`:

- **Recebe**: A função `mapper` recebe como parâmetros uma chave (`key`) e uma linha de log (`line`).
- **Processa**: Ela aplica uma **expressão regular** (regex) à linha de log para identificar e capturar partes específicas da linha, como o endereço IP, o timestamp, o status HTTP, entre outros.
- **Emite**: Quando a linha é válida (ou seja, corresponde ao padrão da regex), o `mapper` emite o **endereço IP** como chave e o número **1** como valor, indicando que uma conexão foi feita por aquele IP.

Isso é o que chamamos de **mapeamento** no processo de MapReduce: associar cada IP com o número 1 para que no próximo passo, o reducer possa contar quantas vezes cada IP apareceu no log.

<br>

#### 2. Reducer:

A função `reducer(self, ip, occurrences)` acumula as contagens de IPs e, em um segundo estágio, a função `reducer_final(self)` seleciona e emite os 20 IPs com o maior número de conexões.

---

```python
    def reducer(self, ip, occurrences):
        # Armazenar as contagens de IPs em uma lista
        self.ip_counts = getattr(self, 'ip_counts', [])
        self.ip_counts.append((ip, sum(occurrences)))

    def reducer_final(self):
        # Selecionar os 20 IPs com mais conexões
        top_20_ips = nlargest(20, self.ip_counts, key=lambda x: x[1])
        
        # Emitir os 20 maiores IPs
        for ip, total in top_20_ips:
            yield ip, total
```

---

A função `reducer`:

- **Recebe**: O `reducer` recebe o **IP** (como chave) e uma lista de valores **1** (representando o número de vezes que o IP foi emitido pelo mapper).
- **Acumula**: Em vez de emitir o resultado imediatamente, a função `reducer` acumula as contagens de IPs em uma lista `self.ip_counts` para processamento posterior.

Isso é o que chamamos de **redução** no MapReduce: a consolidação de todas as ocorrências de um determinado IP em uma única soma.

A função `reducer_final`:

- **Processa**: No final da fase de redução, o `reducer_final` seleciona os **20 IPs** com mais conexões usando a função `nlargest` da biblioteca `heapq`, que é eficiente para encontrar os maiores valores.
- **Emite**: Por fim, o `reducer_final` emite os **20 IPs com o maior número de conexões**.

Isso é o que chamamos de **redução** e **seleção final** no MapReduce: a consolidação de todas as ocorrências de um determinado IP e a seleção dos 20 maiores IPs com base no número total de conexões.

<br>

### 4.2 Resumo

O código `LogsIp.py` implementa um job de **MapReduce** para contar o número de conexões feitas por cada **endereço IP** em um arquivo de log de servidor da web. O **mapper** extrai os IPs de cada linha do log e os associa ao valor `1`, enquanto o **reducer** acumula esses valores para calcular o número total de conexões feitas por cada IP. No final, o `reducer_final` seleciona e emite apenas os 20 maiores IPs com mais conexões.

- **Mapper**: O mapper usa uma **expressão regular (regex)** para capturar o IP de cada linha de log e emite esse IP com o valor `1` (indicando uma conexão).
- **Reduce**r: O reducer acumula todas as ocorrências de cada IP e, em uma fase final, seleciona os **20 IPs** com o maior número de conexões e os emite.

Este job pode ser *útil para analisar o tráfego de um servidor*, identificando quais IPs acessaram o servidor com mais frequência e fornecendo uma visão sobre os maiores contribuintes de tráfego.


<br> <br>

---

<br> 

## 5. Aplicando o job MapReduce 1

<br>

No contexto do nosso laboratório, usaremos o arquivo Python `LogsIp.py` para aplicar o **job MapReduce**. Para isso basta ir ao diretório do arquivo e no terminal digitar:

<br>

---

```bash
    python LogsIp.py hdfs:///logs/web_server.log -r hadoop
```

---

#### 5.1 Explicando a Saída no Terminal (Indicadores de Execução Bem-Sucedida)

<br>

<i>**Job Completed Successfully**</i>: Quando o job é finalizado sem erros, você verá uma linha de confirmação, indicando que o processamento foi concluído com sucesso.

<br>

<i>**Resultado Final Exibido**</i>: Após a execução bem-sucedida do job, o resultado é extraído do **HDFS** e apresentado. No nosso exemplo, vemos a quantidade de filmes que receberam cada tipo de nota:

<br>

```bash
"10.216.113.172"	158613
"10.220.112.1"	51942
"10.173.141.213"	47503
"10.240.144.183"	43592
"10.41.69.177"	37554
"10.169.128.121"	22516
"10.211.47.159"	20866
...
```

<br>

**Isso significa que**: *"10.216.113.172" fez 158.613 conexões* ao servidor durante o período em que o log foi capturado, sendo o IP com mais conexões. A saída **exibe os 20 IPs com o maior número de conexões**, em ordem decrescente, de acordo com o que foi especificado no nosso `reducer_final`.

<br><br><br> 



## 6. Criando Arquivos Python com o Job para o MapReduce 2 (Usando apenas o Hadoop Streaming)

<br>

Nesta seção, criamos os scripts `LogsIpMapper.py` e `LogsIpReducer.py` para realizar o processamento de logs usando o Hadoop Streaming, que é uma maneira flexível de implementar jobs MapReduce com qualquer linguagem de programação. Aqui, usaremos Python para implementar o Mapper e o Reducer para contar quantas conexões foram feitas por cada endereço IP.

<br>

#### Código do Arquivo `LogsIpMapper.py`

---

```python
#!/usr/bin/env python

import sys
import re

# Regex para extrair o endereço IP
log_pattern = re.compile(
    r'^(?P<ip>\S+) '           # Captura o endereço IP
    r'(?P<identd>\S+) '        # Ignora o Identd
    r'(?P<user>\S+) '          # Ignora o usuário
    r'\[(?P<timestamp>.*?)\] ' # Ignora o timestamp
    r'"(?P<request>.*?)" '     # Ignora a requisição
    r'(?P<status>\d{3}) '      # Ignora o status
    r'(?P<size>\S+)'           # Ignora o tamanho
)

for line in sys.stdin:
    # Aplicar regex à linha
    match = log_pattern.match(line)
    if match:
        ip = match.group('ip')
        print(ip)
```

---

<br>

#### Código do Arquivo `LogsIpReducer.py`

---

```python
#!/usr/bin/env python

import sys

current_ip_address = None
current_ip_address_count = 0

# Itera sobre cada linha recebida do Mapper
for line in sys.stdin:
    line = line.strip()
    
    # Verifica se a linha está vazia (ignora linhas vazias)
    if not line:
        continue
    
    # O Mapper emite apenas o IP, por isso espera-se uma única coluna
    try:
        new_ip_address = line
    except ValueError:
        # Se houver algum erro inesperado, continue
        continue

    # Se o IP atual mudar, imprime o IP anterior e sua contagem
    if current_ip_address and current_ip_address != new_ip_address:
        print("{0}\t{1}".format(current_ip_address, current_ip_address_count))
        current_ip_address_count = 0

    # Atualiza o IP atual e incrementa sua contagem
    current_ip_address = new_ip_address
    current_ip_address_count += 1

# Após iterar, imprime o último IP e sua contagem
if current_ip_address:
    print("{0}\t{1}".format(current_ip_address, current_ip_address_count))
```

---

<br>

### 6.1 Explicando Códigos do Job MapReduce 2 (`LogsIpMapper.py` e `LogsIpReducer`)


<br>

#### 1. LogsIpMapper:

O código `LogsIpMapper.py` implementa o **Mapper** do job MapReduce usando **Hadoop Streaming**. Ele é responsável por **extrair o endereço IP** de cada linha de log e emitir esse endereço para o Reducer.

**Explicação:**
- **Leitura de Entrada**: O Mapper lê cada linha de log via `sys.stdin` (a entrada padrão).
- **Regex para Processamento**: Utiliza uma expressão regular (`log_pattern`) para identificar e extrair o endereço IP de cada linha de log. O regex captura o primeiro campo (IP) e ignora os outros campos como `identd`, `usuário`, `timestamp`, `requisição`, `status`, e `tamanho`.
- **Emissão do IP**: Se a linha estiver no formato correto, o endereço IP é extraído e impresso, que é a saída do Mapper. Essa saída é usada como entrada no Reducer. O `print(ip)` emite o IP para o Hadoop Streaming, que será processado pelo Reducer.

<br>

#### 2. LogsIpReducer:

O código `LogsIpReducer.py` implementa o **Reducer**, responsável por **contar a frequência** com que cada IP aparece nos logs.

**Explicação:**
- **Leitura da Entrada**: O Reducer lê os endereços IP gerados pelo Mapper, via `sys.stdin`.
- **Contagem de IPs**: Ele conta quantas vezes o mesmo IP aparece consecutivamente. Cada vez que o IP muda, ele imprime o IP anterior junto com sua contagem.
- **Emissão de Resultados**: No final, o último IP e sua contagem são emitidos.

<br>

### 6.2 Resumo

Os scripts `LogsIpMapper.py` e `LogsIpReducer.py` implementam um **job MapReduce simples** utilizando o **Hadoop Streaming**. O **Mapper** lê os logs do servidor, extrai e emite os endereços IP. O **Reducer** conta o número de vezes que cada IP aparece e exibe o resultado final, mostrando quantas conexões foram feitas por cada endereço IP. Esse método é uma maneira eficiente de processar grandes arquivos de log de servidores e identificar padrões de acesso.

Esse segundo método, utilizando apenas o **Hadoop Streaming**, nos permite responder à pergunta de **quantas conexões foram feitas por cada endereço IP** de forma eficiente e escalável.


<br> <br>

---

<br> 

## 7. Aplicando o job MapReduce 2

<br>

No contexto do nosso laboratório, usaremos o arquivos Python `LogsIpMapper.py` e `LogsIpReducer` para aplicar o **job MapReduce** usando diretamente o **Hadoop Streaming**. 

O processo agora será **diferente** de quando usamos o *MRJob*. Para isso basta ir ao diretório do arquivo e no terminal digitar:

<br>

---

```bash
  hadoop jar /opt/hadoop/share/hadoop/tools/lib/hadoop-streaming-3.2.0.jar \
  -input /logs/web_server.log \
  -output /logs/output_ip_count \
  -mapper LogsIpMapper.py \
  -reducer LogsIpReducer.py \
  -file LogsIpMapper.py \
  -file LogsIpReducer.py

```

---

Esse comando faz o seguinte:

- **Hadoop Streaming Jar**: Chama o jar do Hadoop Streaming.
- **Input/Output**: Especifica o arquivo de log (`/logs/web_server.log`) como entrada e o diretório de saída (`/logs/output_ip_count`) para os resultados.
- **Mapper/Reducer**: Define o `LogsIpMapper.py` como Mapper e o `LogsIpReducer.py` como Reducer.
- **Arquivos**: Especifica os arquivos Python que serão usados no processo (`-file LogsIpMapper.py -file LogsIpReducer.py`).

<br>

#### 7.2 Verificando o Resultado

<br>

Após a execução bem-sucedida do job MapReduce, você pode verificar o resultado da contagem de IPs. Para isso, siga os passos abaixo:

<br>

1. **Listar Arquivos de Saída**: Verifique o diretório de saída no HDFS para garantir que o arquivo foi gerado corretamente:

<br>

```bash
hdfs dfs -ls /logs/output_ip_count
```

Isso deve mostrar um ou mais arquivos `part-00000` que contêm os resultados do MapReduce.

<br>

2. **Exibir o Conteúdo da Saída**: Para visualizar o conteúdo do arquivo de saída, use o comando cat no HDFS:

<br>

```bash
hdfs dfs -cat /logs/output_ip_count/part-00000
```

Esse comando irá exibir a lista de IPs e o número de vezes que cada IP apareceu nos logs. O formato da saída será algo como:

```bash
192.168.0.1   15
10.0.0.2      5
172.16.0.3    20
```

<br>

**Isso significa que**: O primeiro valor é o endereço IP e o segundo valor é o número de vezes que o IP apareceu.

<br><br><br><br><br>

---

<br><br><br><br><br> 

# <center>Laboratório 5 (Análise de Tickets de Estacionamento)</center>

<br><br>


# Objetivo do Laboratório 5

<br>

O objetivo deste laboratório é utilizar um **dataset de multas de estacionamento da cidade de Nova York** para realizar uma **análise abrangente dos padrões de infração**. A análise ajudará a **identificar as infrações mais comuns**, **os horários mais propensos à emissão de multas**, os **tipos de veículos mais infratores**, e outras métricas relevantes. Com base nos insights obtidos, será possível fazer recomendações para melhorar a regulamentação de estacionamento e reduzir as infrações.

<br>

---

# Pergunta de Negócio

<br>

> **Quais são as infrações de estacionamento mais comuns em diferentes bairros?**

- Identificar os bairros com maior número de infrações e as respectivas violações mais frequentes.

<br>

> **Em quais horários ocorrem a maioria das infrações de estacionamento?**

- Analisar os horários mais propensos à emissão de multas e sugerir mudanças nos horários de fiscalização.

<br>

> **Quais tipos de veículos (carro, SUV, caminhonete) estão mais propensos a receber multas?**

- Examinar o tipo de veículo e a frequência das infrações para entender quais tipos estão mais sujeitos às penalidades.

<br>

> **Quais são as ruas mais propensas a receber multas por estacionamento irregular?**

- Analisar as ruas com maior incidência de multas e recomendar melhorias na sinalização ou nos regulamentos.

<br>

> **Quais são códigos de rua mais propensos a receber multas por estacionamento irregular?**

- Analisar códigos de ruas com maior incidência de multas e recomendar melhorias na sinalização ou nos regulamentos.

<br>

> **Quais são as marcas de veículos mais propensas a receber multas por estacionamento irregular?**

- Examinar a marca do veículo e a frequência das infrações para entender quais tipos estão mais sujeitos às penalidades.


<br><br>

---

# Sobre o Dataset

<br>

Para este laboratório, utilizaremos o **dataset NYC Parking Tickets**, que contém dados detalhados sobre multas de estacionamento emitidas em Nova York. O dataset inclui informações sobre a localização, o tipo de infração, o tipo de veículo e outras características relevantes. A análise será focada em identificar padrões e fornecer insights práticos para a gestão de estacionamento na cidade.

#### Descrição Geral do Novo Dataset

- **Número de registros**: Milhões de registros (número exato depende da amostra utilizada).
- **Colunas**: `51 colunas` detalhando as infrações de estacionamento, tais como:
  - **Summons Number**: Número da multa.
  - **Plate ID**: Número da placa do veículo.
  - **Registration State**: Estado onde o veículo foi registrado.
  - **Issue Date**: Data da infração.
  - **Violation Code**: Código da violação cometida.
  - **Vehicle Body Type**: Tipo de carroceria do veículo.
  - **Vehicle Make**: Marca do veículo.
  - **Violation Location**: Localização da violação (bairro, rua).
  - **Violation Time**: Hora em que a infração foi registrada.
  - **Vehicle Color**, **Year**, **Meter Number**, **Latitude**, **Longitude**, entre outros.

<br>

Essa estrutura permitirá análises detalhadas sobre o comportamento das infrações de estacionamento em Nova York, bem como a possibilidade de identificar tendências relevantes para o gerenciamento de trânsito.
  
<br><br>

---

<br><br>

# <center><u>Iniciando o Laboratório 5</u></center>

<br><br><br>

## 1. Iniciando os Serviços

<br>

1.1 **Iniciar o HDFS (NameNode, DataNode, SecondaryNameNode)**:
   ```bash
   start-dfs.sh  |  stop-dfs.sh
   ```
1.2 **Iniciar o YARN (ResourceManager, NodeManager)**:
   ```bash
   start-yarn.sh  |  stop-yarn.sh
   ```
1.3 **Verificando serviços**:
   ```bash
   jps
   ```
<br> <br> 

---

<br> 

## 2. Criando Pasta/Diretório com o nome de `tickets` para o Laboratório no HDFS

<br>

```bash
    hdfs dfs -mkdir /tickets
```

<br>

2.1 **Lista os arquivos e diretórios no HDFS raiz**.

<br>

```bash
   hdfs dfs -ls /
```

<br> <br> 

---

<br> 

## 3. Copiando o Dataset do `Sistema Operacional Local` para dentro do `HDFS`:

<br>

**Para este laboratório precisaremos do arquivo `PV_2016.csv`**.

Abrir pasta onde está o arquivo a ser copiado, abrir um terminal e digitar o comando abaixo:

<br>

```bash
    hdfs dfs -put PV_2016.csv /tickets
```

<br> <br> 

---

<br> <br> <br>

## 4. <u>Criando Arquivo Python com o Job para o MapReduce 1</u>

<br>

> Quais são as infrações de estacionamento mais comuns em diferentes bairros?

<br>

#### Código do Arquivo `CodeTicket1.py`

---

```python
from mrjob.job import MRJob
from mrjob.step import MRStep

class InfracoesPorBairro(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_bairro_infracao,
                   reducer=self.reducer_count_infracoes),
            MRStep(reducer=self.reducer_find_max_infracoes)
        ]

    # Mapper: Extrai o bairro e o código da infração de cada linha
    def mapper_get_bairro_infracao(self, _, line):
        # Usando vírgula como delimitador (formato CSV)
        fields = line.split(',')
        
        # Ignorando o cabeçalho (garantindo que os campos são numéricos)
        if fields[0].isdigit():  
            try:
                # Extrair o bairro (Violation Location) e o código da infração (Violation Code)
                bairro = fields[14].strip()  # Posição 14 para Violation Location
                codigo_infracao = fields[5].strip()  # Posição 5 para Violation Code
                
                # Verificando se o bairro está vazio
                if not bairro:
                    return
                
                # Se o código de infração estiver ausente, defina como 'unknown'
                if not codigo_infracao:
                    codigo_infracao = 'unknown'
                
                # Emitir o bairro e o código da infração como chave, e o valor 1 para contar
                yield (bairro, codigo_infracao), 1
                
            except IndexError:
                # Caso ocorra erro de índice, a linha pode estar malformada e será ignorada
                return

    # Reducer: Soma as ocorrências de cada (bairro, tipo de infração)
    def reducer_count_infracoes(self, key, values):
        yield key[0], (sum(values), key[1])

    # Reducer final: Determina a infração mais comum por bairro
    def reducer_find_max_infracoes(self, bairro, infracoes_contadas):
        yield bairro, max(infracoes_contadas)  # max() retorna a infração mais frequente

if __name__ == '__main__':
    InfracoesPorBairro.run()

```

---

<br>

### 4.1 Explicando Código do Job MapReduce 1 (`CodeTicket1.py`)

<br>

O script `CodeTicket1` implementa um job MapReduce que busca encontrar as infrações de estacionamento mais comuns em diferentes bairros da cidade de Nova York. Ele realiza isso em dois passos principais: primeiro, ele mapeia os dados para extrair a localização do bairro e o código de infração; em seguida, agrega e reduz os dados para identificar qual infração foi mais frequente em cada bairro.

<br>

#### 1. Mapper:

---

```python
    # Mapper: Extrai o bairro e o código da infração de cada linha
    def mapper_get_bairro_infracao(self, _, line):
        # Usando vírgula como delimitador (formato CSV)
        fields = line.split(',')
        
        # Ignorando o cabeçalho (garantindo que os campos são numéricos)
        if fields[0].isdigit():  
            try:
                # Extrair o bairro (Violation Location) e o código da infração (Violation Code)
                bairro = fields[14].strip()  # Posição 14 para Violation Location
                codigo_infracao = fields[5].strip()  # Posição 5 para Violation Code
                
                # Verificando se o bairro está vazio
                if not bairro:
                    return
                
                # Se o código de infração estiver ausente, defina como 'unknown'
                if not codigo_infracao:
                    codigo_infracao = 'unknown'
                
                # Emitir o bairro e o código da infração como chave, e o valor 1 para contar
                yield (bairro, codigo_infracao), 1
                
            except IndexError:
                # Caso ocorra erro de índice, a linha pode estar malformada e será ignorada
                return
```

---

A função `mapper_get_bairro_infracao` é responsável por:

- **Dividir a linha de entrada**: A função começa dividindo cada linha de entrada usando a tabulação (`'\t'`) como delimitador para separar os campos.
    - **Ignorar o cabeçalho**: O `mapper` utiliza a verificação `if fields[0].isdigit()` para garantir que está processando apenas linhas de dados (onde o campo Summons Number é numérico) e não o cabeçalho, que deve ser ignorado.
- **Extrair os campos de interesse**: Em seguida, tenta extrair os campos `'Violation Location'` (bairro) (bairro), que está na posição `14` da linha e `'Violation Code'` (código da infração) que está na posição `5` da linha.. Se esses campos não forem encontrados ou estiverem ausentes, a linha é ignorada usando o bloco `try-except`.
- **Verificar valores nulos**:
  - Se o valor do campo **'Violation Location'** for nulo ou ausente, a linha é descartada.
  - Se o valor do campo **'Violation Code'** for nulo ou ausente, o código de infração é substituído por `'unknown'`.
- **Emitir chave-valor**: O `mapper` emite um par chave-valor onde a chave é a combinação (`bairro, código de infração`) e o valor é sempre `1`, indicando que uma infração ocorreu.

<br>

#### 2. Reducer:

---

```python
    # Reducer: Soma as ocorrências de cada (bairro, tipo de infração)
    def reducer_count_infracoes(self, key, values):
        yield key[0], (sum(values), key[1])

    # Reducer final: Determina a infração mais comum por bairro
    def reducer_find_max_infracoes(self, bairro, infracoes_contadas):
        yield bairro, max(infracoes_contadas)  # max() retorna a infração mais frequente
```

---

A função `reducer_count_infracoes` é responsável por:

- **Receber os dados do mapper**: Ela recebe a chave (`bairro, código da infração`) e os valores que representam o número de ocorrências desse par.
- **Contar as infrações**: A função soma todas as ocorrências (`sum(values)`) para cada combinação de bairro e código de infração, resultando no total de infrações desse tipo em determinado bairro.
- **Emitir chave-valor**: O `reducer` então emite o bairro (`key[0]`) como a nova chave, e como valor, emite uma tupla contendo o número total de ocorrências e o código da infração correspondente.
  - O primeiro elemento da tupla é o número total de ocorrências.
  - O segundo elemento da tupla é o código da infração correspondente (`key[1]`).

<br>

A função `reducer_find_max_infracoes` é responsável por:

- **Receber os dados do primeiro reducer**: Ela recebe o nome do bairro e uma lista de tuplas, onde cada tupla contém a contagem de infrações e o código da infração.
- **Determinar a infração mais comum**: Utiliza a função `max(infracoes_contadas)` para encontrar a tupla com o maior número de ocorrências. Isso identifica a infração mais comum em cada bairro.
- **Emitir chave-valor**: A função emite o bairro como chave e a tupla (`maior contagem, código da infração`) como valor, onde a infração mais frequente é retornada para cada bairro.

<br>

### 4.2 Resumo

- **Pipeline de duas etapas**:
  - **Primeira etapa (mapper e reducer)**: O mapeamento agrupa as infrações por bairro e tipo, e o primeiro `reducer` soma as ocorrências de cada tipo de infração em cada bairro.
  - **Segunda etapa (reducer final)**: O `reducer` final processa as infrações agregadas para encontrar a mais comum em cada bairro.
- **Funcionamento geral**:
  - O `mapper` extrai os bairros e os códigos das infrações, agrupando as ocorrências.
  - O primeiro `reducer` conta o número de infrações para cada combinação de bairro e código de infração.
  - O segundo `reducer` identifica a infração mais comum em cada bairro, retornando o código e a contagem dessa infração.

<br> <br>

---

<br> 

## 5. Aplicando o job MapReduce 1

<br>

No contexto do nosso laboratório, usaremos o arquivo Python `CodeTicket1.py` para aplicar o **job MapReduce**. Para isso basta ir ao diretório do arquivo e no terminal digitar:

<br>

---

```bash
    python CodeTicket1.py hdfs:///tickets/PV_2016.csv -r hadoop
```

---

#### 5.1 Explicando a Saída no Terminal (Indicadores de Execução Bem-Sucedida)

<br>

<i>**Job Completed Successfully**</i>: Quando o job é finalizado sem erros, você verá uma linha de confirmação, indicando que o processamento foi concluído com sucesso.

<br>

<i>**Resultado Final Exibido**</i>: Após a execução bem-sucedida do job, o resultado é extraído do **HDFS** e apresentado. No nosso exemplo, vemos a quantidade de filmes que receberam cada tipo de nota:

<br>

```bash
"0"	[1253511, "36"]
"1"	[72056, "14"]
"10"	[26544, "14"]
"100"	[5533, "21"]
"101"	[6391, "21"]
...
```

<br>

**Isso significa que**: 

- Cada linha exibe o bairro (por exemplo, `"0"` ou `"1"`) e uma tupla com o número total de infrações associadas àquele bairro (por exemplo, `1253511 infrações` para o bairro `"0"`).
- O segundo elemento da tupla é o código da infração mais comum naquele bairro (por exemplo, o código `"36"` para o bairro `"0"`).

Esses resultados indicam que o MapReduce identificou corretamente a infração mais comum em cada bairro e contou o número total de infrações para cada combinação de bairro e infração.




<br><br><br><br><br>

## 6. <u>Criando Arquivo Python com o Job para o MapReduce 2</u>

<br>

> Em quais horários ocorrem a maioria das infrações de estacionamento?

<br>

#### Código do Arquivo `CodeTicket2.py`

---

```python
from mrjob.job import MRJob
from mrjob.step import MRStep

class InfracoesPorTempo(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_tempo_infracao,
                   reducer=self.reducer_count_tempo),
            MRStep(reducer=self.reducer_top_30_tempos)
        ]

    # Mapper: Extrai o tempo da infração de cada linha
    def mapper_get_tempo_infracao(self, _, line):
        fields = line.split(',')
        
        if fields[0].isdigit():
            try:
                # Extrair o tempo (Violation Time)
                tempo = fields[19].strip()  # Posição 19 para Violation Time
                
                if not tempo:
                    tempo = 'unknown'
                
                yield tempo, 1
                
            except IndexError:
                return

    # Reducer: Soma as ocorrências de cada tempo
    def reducer_count_tempo(self, key, values):
        yield None, (sum(values), key)

    # Reducer final: Seleciona os 30 tempos com mais ocorrências
    def reducer_top_30_tempos(self, _, tempo_contagem):
        # Ordena pela contagem (primeiro valor na tupla) e seleciona os 30 maiores
        sorted_tempos = sorted(tempo_contagem, reverse=True, key=lambda x: x[0])
        for count, tempo in sorted_tempos[:30]:
            yield tempo, count

if __name__ == '__main__':
    InfracoesPorTempo.run()
```

---

<br>

### 6.1 Explicando Código do Job MapReduce 2 (`CodeTicket2.py`)

<br>

O script `CodeTicket2` foi criado para responder à pergunta: **Em quais horários ocorrem a maioria das infrações de estacionamento?**. Ele processa os dados das infrações e seleciona os 30 horários com o maior número de ocorrências, exibindo os resultados em ordem decrescente.

<br>

#### 1. Mapper:

---

```python
    # Mapper: Extrai o tempo da infração de cada linha
    def mapper_get_tempo_infracao(self, _, line):
        fields = line.split(',')
        
        if fields[0].isdigit():
            try:
                # Extrair o tempo (Violation Time)
                tempo = fields[19].strip()  # Posição 19 para Violation Time
                
                if not tempo:
                    tempo = 'unknown'
                
                yield tempo, 1
                
            except IndexError:
                return
```

---

A função `mapper_get_tempo_infracao`:

- **Dividir a linha de entrada**: A função divide a linha de entrada em campos separados por vírgula.
- **Verificar se não é o cabeçalho**: A função verifica se o primeiro campo da linha é numérico, descartando o cabeçalho.
- **Extrair o tempo de infração**: A função extrai o valor da coluna de **Violation Time** (posição 19).
- **Verificar e corrigir valores ausentes**: Se o tempo estiver ausente, ele é marcado como 'unknown'.
- **Emitir chave-valor**: A chave é o tempo da infração, e o valor é `1`, indicando uma ocorrência. 

<br>

#### 2. Reducer:

---

```python
    # Reducer: Soma as ocorrências de cada tempo
    def reducer_count_tempo(self, key, values):
        yield None, (sum(values), key)

    # Reducer final: Seleciona os 30 tempos com mais ocorrências
    def reducer_top_30_tempos(self, _, tempo_contagem):
        # Ordena pela contagem (primeiro valor na tupla) e seleciona os 30 maiores
        sorted_tempos = sorted(tempo_contagem, reverse=True, key=lambda x: x[0])
        for count, tempo in sorted_tempos[:30]:
            yield tempo, count
```

---

A função `reducer_count_tempo` é responsável por:

- **Receber o tempo da infração**: Ele recebe o tempo como chave e as ocorrências como valores.
- **Somar as ocorrências**: A função soma as ocorrências para cada tempo.
- **Emitir chave-valor**: Emite um par onde a chave é `None` (para agrupar os valores no próximo passo) e o valor é uma tupla contendo o número total de ocorrências e o tempo da infração. 
  - **Por que usar None?** O `None` faz com que todos os pares (`contagem, tempo`) sejam enviados para o **próximo reducer** como uma única lista, permitindo que o segundo reducer tenha acesso a todas as ocorrências de tempos para fazer a ordenação.

<br>

A função `reducer_top_30_tempos` é responsável por:

- **Ordenar e selecionar**: Ordena os tempos de infração pela contagem de ocorrências e seleciona os 30 maiores.
- **Emitir os resultados**: Exibe os 30 tempos com mais infrações, em ordem decrescente.

<br>

### 6.2 Resumo

- O **mapper** extrai o horário das infrações.
- O primeiro **reducer** soma o número de infrações em cada horário.
- O segundo **reducer** ordena e seleciona os 30 horários com mais ocorrências.


<br> <br>

---

<br> 

## 7. Aplicando o job MapReduce 2

<br>

No contexto do nosso laboratório, usaremos o arquivo Python `CodeTicket2.py` para aplicar o **job MapReduce**. Para isso basta ir ao diretório do arquivo e no terminal digitar:

<br>

---

```bash
    python CodeTicket2.py hdfs:///tickets/PV_2016.csv -r hadoop
```

---

#### 7.1 Explicando a Saída no Terminal (Indicadores de Execução Bem-Sucedida)

<br>

<i>**Job Completed Successfully**</i>: Quando o job é finalizado sem erros, você verá uma linha de confirmação, indicando que o processamento foi concluído com sucesso.

<br>

<i>**Resultado Final Exibido**</i>: Após a execução bem-sucedida do job, o resultado é extraído do **HDFS** e apresentado. No nosso exemplo, vemos a quantidade de filmes que receberam cada tipo de nota:

<br>

```bash
"0836A"	30688
"1136A"	30288
"1140A"	28297
"0936A"	26267
"0840A"	25891
"1138A"	25674
"0906A"	25633
"0940A"	25485
"1145A"	25461
"1139A"	25388
"1137A"	24926
"0806A"	24790
"1142A"	24640
"1141A"	24302
"0945A"	24101
"0838A"	24037
"1143A"	23657
"1150A"	23597
"0845A"	23573
"0839A"	23551
```

<br>

**Isso significa que**: O job identificou com sucesso os horários com maior número de infrações de estacionamento, e ordenou os 30 horários mais comuns, exibindo-os do mais alto para o mais baixo em termos de ocorrência. 




<br><br><br><br><br>

## 8. <u>Criando Arquivo Python com o Job para o MapReduce 3</u>

<br>

> Quais tipos de veículos (carro, SUV, caminhonete) estão mais propensos a receber multas?

<br>

#### Código do Arquivo `CodeTicket3.py`

---

```python
from mrjob.job import MRJob
from mrjob.step import MRStep

class InfracoesPorTipoVeiculo(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_veiculo_infracao,
                   reducer=self.reducer_count_infracoes),
            MRStep(reducer=self.reducer_top_veiculos)
        ]

    # Mapper: Extrai o tipo de veículo de cada linha
    def mapper_get_veiculo_infracao(self, _, line):
        fields = line.split(',')
        
        if fields[0].isdigit():
            try:
                # Extrair o tipo de veículo (Vehicle Body Type)
                tipo_veiculo = fields[6].strip().upper() if fields[6].strip() else "unknown"  # Posição 6 para Vehicle Body Type
                
                # Emitir o tipo de veículo (substitui por 'unknown' se estiver ausente/nulo)
                yield tipo_veiculo, 1
                
            except IndexError:
                return

    # Reducer: Soma as ocorrências de cada tipo de veículo
    def reducer_count_infracoes(self, key, values):
        yield None, (sum(values), key)

    # Reducer final: Seleciona os tipos de veículos com mais infrações
    def reducer_top_veiculos(self, _, tipo_veiculo_contagem):
        # Ordena pela contagem e exibe todos os tipos de veículos relevantes
        sorted_veiculos = sorted(tipo_veiculo_contagem, reverse=True, key=lambda x: x[0])
        for count, tipo_veiculo in sorted_veiculos:
            yield tipo_veiculo, count

if __name__ == '__main__':
    InfracoesPorTipoVeiculo.run()

```

---

<br>

### 8.1 Explicando Código do Job MapReduce 3 (`CodeTicket3.py`)

<br>

O script `CodeTicket3` é um job MapReduce que identifica quais tipos de veículos (carro, SUV, caminhonete, etc.) estão mais propensos a receber multas de estacionamento com base nos dados da coluna `Vehicle Body Type`. Ele realiza essa análise em duas etapas: um mapeamento inicial para contar as ocorrências de cada tipo de veículo e, em seguida, um segundo estágio que ordena os tipos de veículos por número de multas.

<br>

#### 1. Mapper:

---

```python
 # Mapper: Extrai o tipo de veículo de cada linha
    def mapper_get_veiculo_infracao(self, _, line):
        fields = line.split(',')
        
        if fields[0].isdigit():
            try:
                # Extrair o tipo de veículo (Vehicle Body Type)
                tipo_veiculo = fields[6].strip().upper() if fields[6].strip() else "unknown"  # Posição 6 para Vehicle Body Type
                
                # Emitir o tipo de veículo (substitui por 'unknown' se estiver ausente/nulo)
                yield tipo_veiculo, 1
                
            except IndexError:
                return
```

---

A função `mapper_get_veiculo_infracao`

- **Processa cada linha**: Ela divide as linhas do dataset por vírgulas e extrai o campo `Vehicle Body Type` (posição 6) para identificar o tipo de veículo.
- **Verifica valores ausentes**: Se o campo estiver ausente ou for nulo, o tipo de veículo será substituído por `"unknown"`.
- **Emite o tipo de veículo**: O Mapper gera uma chave-valor, onde a chave é o tipo de veículo e o valor é `1`, indicando que esse tipo de veículo recebeu uma multa.

<br>

#### 2. Reducer:

---

```python
    # Reducer: Soma as ocorrências de cada tipo de veículo
    def reducer_count_infracoes(self, key, values):
        yield None, (sum(values), key)

    # Reducer final: Seleciona os tipos de veículos com mais infrações
    def reducer_top_veiculos(self, _, tipo_veiculo_contagem):
        # Ordena pela contagem e exibe todos os tipos de veículos relevantes
        sorted_veiculos = sorted(tipo_veiculo_contagem, reverse=True, key=lambda x: x[0])
        for count, tipo_veiculo in sorted_veiculos:
            yield tipo_veiculo, count
```

---

A função `reducer_count_infracoes`:

- **Soma as ocorrências**: Ela recebe os tipos de veículos e soma o número de ocorrências para cada tipo.
- **Emite uma lista de ocorrências**: O Reducer emite uma tupla, onde o primeiro valor é a contagem total de multas e o segundo valor é o tipo de veículo.

<br>

A função `reducer_top_veiculos`:

- **Ordena os veículos**: A função ordena os veículos pelo número de infrações, do maior para o menor.
- **Emite o resultado**: O Reducer final emite os tipos de veículos ordenados pela quantidade de multas.

<br>

### 8.2 Resumo

- **Mapper**: Extrai o tipo de veículo da coluna `Vehicle Body Type` e emite esse valor com o número de ocorrências.
- **Reducer**: Soma as ocorrências para cada tipo de veículo e ordena os resultados, apresentando os tipos de veículos mais comuns entre as multas.



<br> <br>

---

<br> 

## 9. Aplicando o job MapReduce 3

<br>

No contexto do nosso laboratório, usaremos o arquivo Python `CodeTicket3.py` para aplicar o **job MapReduce**. Para isso basta ir ao diretório do arquivo AmigosIdade.py e no terminal digitar:

<br>

---

```bash
    python CodeTicket3.py hdfs:///tickets/PV_2016.csv -r hadoop
```

---

#### 9.1 Explicando a Saída no Terminal (Indicadores de Execução Bem-Sucedida)

<br>

<i>**Job Completed Successfully**</i>: Quando o job é finalizado sem erros, você verá uma linha de confirmação, indicando que o processamento foi concluído com sucesso.

<br>

<i>**Resultado Final Exibido**</i>: Após a execução bem-sucedida do job, o resultado é extraído do **HDFS** e apresentado. No nosso exemplo, vemos a quantidade de filmes que receberam cada tipo de nota:

<br>

```bash
"SUBN"	3466020
"4DSD"	2992093
"VAN"	1518294
"DELV"	755274
"SDN"	424043
"2DSD"	276455
"PICK"	264271
"REFG"	84019
...

```

<br>

**Isso significa que**: 

- O tipo de veículo `SUBN` (SUV) recebeu 3.466.020 multas, seguido por `4DSD` (sedan de 4 portas), que recebeu 2.992.093 multas.
- Os veículos `VAN`, `DELV` (entrega), e `SDN` (sedan) também aparecem frequentemente na lista de veículos multados. 








<br><br><br><br><br>

## 10. <u>Criando Arquivo Python com o Job para o MapReduce 4</u>

<br>

> Quais são as marcas de veículos mais propensas a receber multas por estacionamento irregular?

<br>

#### Código do Arquivo `CodeTicket4.py`

---

```python
from mrjob.job import MRJob
from mrjob.step import MRStep

class InfracoesPorMarcaVeiculo(MRJob):


    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_marca_veiculo_infracao,
                   reducer=self.reducer_count_infracoes),
            MRStep(reducer=self.reducer_top_veiculos)
        ]

    # Mapper: Extrai a marca de veículo de cada linha
    def mapper_get_marca_veiculo_infracao(self, _, line):

    	fields = line.split(',')

    	if fields[0].isdigit():
    		try:
    			# Extrair a marca de veículo (Vehicle Make)
    			marca = fields[7].strip().upper() if fields[7].strip() else "unknown"

    			# Emitir a marca do veículo
    			yield marca, 1

    		except IndexError:
    			return

    # Reducer: Soma as ocorrências de cada marca de veículo
    def reducer_count_infracoes(self, key, values):
    	yield None, (sum(values), key)


    # Reducer final: Seleciona os tipos de veículos com mais infrações
    def reducer_top_veiculos(self, _, marca_veiculo_contagem):

    	# Ordena pela contagem e exibe todos as marcas de veiculos
    	sorted_veiculos = sorted(marca_veiculo_contagem, reverse=True, key=lambda x: x[0])

    	for count, marca in sorted_veiculos:
    		yield marca, count



if __name__ == '__main__':
    InfracoesPorMarcaVeiculo.run()
```

---

<br>

### 10.1 Explicando Código do Job MapReduce 4 (`CodeTicket4.py`)

<br>

O script `CodeTicket4` tem como objetivo identificar as marcas de veículos mais propensas a receber multas de estacionamento irregular, utilizando o formato de MapReduce. Ele segue duas etapas principais: mapeamento e redução. A seguir, detalharemos cada etapa.

<br>

#### 1. Mapper:

---

```python
    # Mapper: Extrai a marca de veículo de cada linha
    def mapper_get_marca_veiculo_infracao(self, _, line):

    	fields = line.split(',')

    	if fields[0].isdigit():
    		try:
    			# Extrair a marca de veículo (Vehicle Make)
    			marca = fields[7].strip().upper() if fields[7].strip() else "unknown"

    			# Emitir a marca do veículo
    			yield marca, 1

    		except IndexError:
    			return
```

---

A função `mapper_get_marca_veiculo_infracao`

- **Divisão da linha**: A função divide cada linha do dataset utilizando a vírgula como delimitador (`line.split(',')`).
- **Validação de campos**: Ela verifica se a linha processada é válida, assegurando que o campo `Summons Number` (posição 0) seja numérico.
- **Extração da marca**: A marca do veículo é extraída da posição 7 (`Vehicle Make`). Se o campo estiver vazio ou nulo, a marca é substituída pelo valor `"unknown"`.
- **Emitir chave-valor**: A função emite um par chave-valor, onde a chave é a marca do veículo e o valor é `1`, indicando uma ocorrência.

<br>

#### 2. Reducer:

---

```python
    # Reducer: Soma as ocorrências de cada marca de veículo
    def reducer_count_infracoes(self, key, values):
    	yield None, (sum(values), key)


    # Reducer final: Seleciona os tipos de veículos com mais infrações
    def reducer_top_veiculos(self, _, marca_veiculo_contagem):

    	# Ordena pela contagem e exibe todos as marcas de veiculos
    	sorted_veiculos = sorted(marca_veiculo_contagem, reverse=True, key=lambda x: x[0])

    	for count, marca in sorted_veiculos:
    		yield marca, count
```

---

A função `reducer_count_infracoes`:  Esta função reduz os dados somando todas as ocorrências para cada marca de veículo, agrupando-as por marca.

- **Entrada**: A marca do veículo como chave e a lista de valores (contagens individuais).
- **Saída**: Uma tupla contendo a soma das infrações e a marca do veículo.


<br>

A função `reducer_top_veiculos`: Esta função final ordena os resultados por número de infrações (da maior para a menor) e exibe as marcas com o maior número de multas.

- **Entrada**: A tupla com a contagem de infrações e a marca.
- **Ordenação**: Os resultados são ordenados pela contagem (`x[0]`), de forma decrescente.
- **Saída**: O resultado final exibe a marca e o número de multas recebidas.



<br>

### 10.2 Resumo

- Este código segue a abordagem MapReduce para processar grandes quantidades de dados de multas de estacionamento, respondendo à pergunta: **Quais são as marcas de veículos mais propensas a receber multas por estacionamento irregular?** O mapeador extrai as marcas de veículos, enquanto o reducer soma as ocorrências e organiza os resultados, mostrando as marcas que mais receberam multas.



<br> <br>

---

<br> 

## 11. Aplicando o job MapReduce 4

<br>

No contexto do nosso laboratório, usaremos o arquivo Python `CodeTicket4.py` para aplicar o **job MapReduce**. Para isso basta ir ao diretório do arquivo e no terminal digitar:

<br>

---

```bash
    python CodeTicket4.py hdfs:///tickets/PV_2016.csv -r hadoop
```

---

#### 11.1 Explicando a Saída no Terminal (Indicadores de Execução Bem-Sucedida)

<br>

<i>**Job Completed Successfully**</i>: Quando o job é finalizado sem erros, você verá uma linha de confirmação, indicando que o processamento foi concluído com sucesso.

<br>

<i>**Resultado Final Exibido**</i>: Após a execução bem-sucedida do job, o resultado é extraído do **HDFS** e apresentado. No nosso exemplo, vemos a quantidade de filmes que receberam cada tipo de nota:

<br>

```bash
"FORD"	1324765
"TOYOT"	1154783
"HONDA"	1014073
"NISSA"	834832
"CHEVR"	759659
"FRUEH"	423583
"ME/BE"	362575
"DODGE"	359201
"BMW"	353302
"JEEP"	302510
"INTER"	285073
"GMC"	282249
"HYUND"	266087
"LEXUS"	238846
"ACURA"	193039
"CHRYS"	186158
"VOLKS"	183601
"INFIN"	158301
"NS/OT"	136078
"SUBAR"	130597
"ISUZU"	123733
"AUDI"	123702
"MITSU"	108494
"MAZDA"	104430
"LINCO"	103822
"HINO"	99459
"KIA"	98040
"CADIL"	85540
"MERCU"	77175
"VOLVO"	76708
"unknown"	63578
"ROVER"	60906
"WORKH"	44654
"KENWO"	43845
"BUICK"	39685
"PETER"	37505
"MACK"	26999
"PONTI"	24083
"MINI"	23533
"PORSC"	22454
"SATUR"	22168
"SMART"	18953
"JAGUA"	14317
"UD"	14107
"WORK"	13606
"FIAT"	13228
...

```

<br>

**Isso significa que**: A marca **FORD** foi a mais multada, com **1.324.765 infrações**. As marcas **TOYOT** (Toyota) e **HONDA** também aparecem frequentemente na lista, com **1.154.783** e **1.014.073** infrações, respectivamente.

<br><br><br><br>










## 12. <u>Criando Arquivo Python com o Job para o MapReduce 5</u>

<br>

> Quais são as ruas mais propensas a receber multas por estacionamento irregular?

<br>

#### Código do Arquivo `CodeTicket5.py`

---

```python
from mrjob.job import MRJob
from mrjob.step import MRStep

class InfracoesPorNomeRua(MRJob):


    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_nome_rua_infracao,
                   reducer=self.reducer_count_infracoes),
            MRStep(reducer=self.reducer_top_nomes_ruas)
        ]

    # Mapper: Extrai o nome de rua de cada linha
    def mapper_get_nome_rua_infracao(self, _, line):

    	fields = line.split(',')

    	if fields[0].isdigit():
    		try:
    			# Extrair o nome da rua (Street Name)
    			nome_rua = fields[24].strip().upper() if fields[24].strip() else "unknown"

    			# Emitir a marca do veículo
    			yield nome_rua, 1

    		except IndexError:
    			return

    # Reducer: Soma as ocorrências de cada nome de rua
    def reducer_count_infracoes(self, key, values):
    	yield None, (sum(values), key)


    # Reducer final: Seleciona os nomes de ruas com mais infrações
    def reducer_top_nomes_ruas(self, _, nome_ruas):

        # Ordena pela contagem em ordem decrescente e exibe todos os nomes de ruas
        ruas_ordenadas = sorted(nome_ruas, reverse=True, key=lambda x: x[0])
        
        for count, nome in ruas_ordenadas:
            # Exibe apenas ruas com 10000 ou mais infrações
            if count >= 1000:
                yield nome, count


if __name__ == '__main__':
    InfracoesPorNomeRua.run()
```

---

<br>

### 12.1 Explicando Código do Job MapReduce 5 (`CodeTicket5.py`)

<br>

O script `CodeTicket5` foi desenvolvido para identificar as ruas com o maior número de multas de estacionamento, utilizando o framework MapReduce para processamento de dados. Este código segue duas etapas principais: um mapeamento para extrair e contar as ocorrências de cada rua e uma redução para ordenar e selecionar as ruas com mais infrações.

<br>

#### 1. Mapper:

---

```python
    # Mapper: Extrai o nome de rua de cada linha
    def mapper_get_nome_rua_infracao(self, _, line):

    	fields = line.split(',')

    	if fields[0].isdigit():
    		try:
    			# Extrair o nome da rua (Street Name)
    			nome_rua = fields[24].strip().upper() if fields[24].strip() else "unknown"

    			# Emitir a marca do veículo
    			yield nome_rua, 1

    		except IndexError:
    			return
```

---

A função `mapper_get_nome_rua_infracao`:

- **Divisão da linha**: A função divide cada linha de entrada, utilizando a vírgula como delimitador (`line.split(',')`).
- **Validação de campos**: A linha é validada para garantir que o campo `Summons Number` (posição 0) é numérico. Isso ajuda a evitar o cabeçalho e linhas inválidas.
- **Extração do nome da rua**: Extrai o nome da rua da coluna na posição 24. Caso esteja vazio, o nome da rua é substituído por `"unknown"`.
- **Emitir chave-valor**: A função emite o nome da rua como chave e `1` como valor, representando uma ocorrência de infração.

<br>

#### 2. Reducer:

---

```python
    # Reducer: Soma as ocorrências de cada nome de rua
    def reducer_count_infracoes(self, key, values):
    	yield None, (sum(values), key)


    # Reducer final: Seleciona os nomes de ruas com mais infrações
    def reducer_top_nomes_ruas(self, _, nome_ruas):

        # Ordena pela contagem em ordem decrescente e exibe todos os nomes de ruas
        ruas_ordenadas = sorted(nome_ruas, reverse=True, key=lambda x: x[0])
        
        for count, nome in ruas_ordenadas:
            # Exibe apenas ruas com 10000 ou mais infrações
            if count >= 1000:
                yield nome, count
```

---

A função `reducer_count_infracoes`:

- **Contagem das ocorrências**: Soma todas as ocorrências de cada rua, agrupando-as por nome.
- **Saída**: Emite uma tupla com o total de infrações (soma) e o nome da rua, para cada rua identificada.

<br>

A função `reducer_top_nomes_ruas`:

- **Ordenação e filtragem**: Ordena as ruas pelo número de infrações, de forma decrescente. Em seguida, exibe apenas as ruas com pelo menos 1.000 infrações.
- **Saída**: Emite o nome da rua e o número de infrações, mostrando apenas as ruas com contagem mínima de infrações conforme o critério especificado.

<br>

### 12.2 Resumo

- O `CodeTicket5.py` utiliza a metodologia MapReduce para processar o conjunto de dados de infrações de estacionamento e identificar as ruas com maior incidência de multas. O mapeador extrai e conta as infrações por rua, enquanto o reducer organiza e filtra os resultados para exibir apenas as ruas com pelo menos 1.000 infrações, atendendo ao critério de exibição.



<br> <br>

---

<br> 

## 13. Aplicando o job MapReduce 5

<br>

No contexto do nosso laboratório, usaremos o arquivo Python `CodeTicket5.py` para aplicar o **job MapReduce**. Para isso basta ir ao diretório do arquivo e no terminal digitar:

<br>

---

```bash
    python CodeTicket5.py hdfs:///tickets/PV_2016.csv -r hadoop
```

---

#### 13.1 Explicando a Saída no Terminal (Indicadores de Execução Bem-Sucedida)

<br>

<i>**Job Completed Successfully**</i>: Quando o job é finalizado sem erros, você verá uma linha de confirmação, indicando que o processamento foi concluído com sucesso.

<br>

<i>**Resultado Final Exibido**</i>: Após a execução bem-sucedida do job, o resultado é extraído do **HDFS** e apresentado. No nosso exemplo, vemos a quantidade de filmes que receberam cada tipo de nota:

<br>

```bash
"BROADWAY"	229790
"3RD AVE"	173303
"5TH AVE"	113589
"MADISON AVE"	107016
"LEXINGTON AVE"	89513
"2ND AVE"	83166
"1ST AVE"	70951
"7TH AVE"	70265
"8TH AVE"	65123
"AMSTERDAM AVE"	63560
"QUEENS BLVD"	60794
"6TH AVE"	58061
"JAMAICA AVE"	48297
...

```

<br>

**Isso significa que**: As ruas **BROADWAY**, **3RD AVE**, e **5TH AVE** destacam-se como as vias mais propensas a receber multas de estacionamento irregular, com contagens de **229.790**, **173.303** e **113.589** infrações, respectivamente. Essas ruas representam áreas com alta concentração de infrações, sinalizando pontos críticos para intervenções regulatórias e de fiscalização. 

<br><br><br><br>






## 14. <u>Criando Arquivo Python com o Job para o MapReduce 6</u>

<br>

> Quais são códigos de rua mais propensos a receber multas por estacionamento irregular?

<br>

#### Código do Arquivo `CodeTicket6.py`

---

```python
from mrjob.job import MRJob
from mrjob.step import MRStep

class InfracoesPorCodigoRua(MRJob):

    def steps(self):
        return [
            MRStep(mapper=self.mapper_get_street_codes,
                   reducer=self.reducer_count_codes),
            MRStep(reducer=self.reducer_sort_top_codes)
        ]

    # Mapper: Extrai cada código de rua presente nas três colunas
    def mapper_get_street_codes(self, _, line):
        fields = line.split(',')
        
        if fields[0].isdigit():
            try:
                # Extrair os códigos de rua (Street Code1, Street Code2, Street Code3)
                street_codes = [fields[9].strip(), fields[10].strip(), fields[11].strip()]
                
                # Emitir cada código de rua válido como uma chave
                for code in street_codes:
                    if code:  # Ignorar códigos vazios
                        yield code, 1

            except IndexError:
                return  # Ignorar linhas com índice inválido

    # Reducer: Soma as ocorrências de cada código de rua
    def reducer_count_codes(self, code, counts):
        yield None, (sum(counts), code)

    # Reducer final: Ordena os códigos de rua por contagem de ocorrências
    def reducer_sort_top_codes(self, _, code_counts):
        # Ordena pela contagem em ordem decrescente
        sorted_codes = sorted(code_counts, reverse=True, key=lambda x: x[0])
        
        for count, code in sorted_codes:
            # Exibe apenas códigos com 10000 ou mais infrações
            if count >= 1000:
                yield code, count

if __name__ == '__main__':
    InfracoesPorCodigoRua.run()
```

---

<br>

### 14.1 Explicando Código do Job MapReduce 6 (`CodeTicket6.py`)

<br>

O script `CodeTicket6` foi desenvolvido para identificar os códigos de rua que aparecem com maior frequência em infrações de estacionamento, indicando as áreas mais propensas a multas. Este código usa o framework MapReduce e segue duas etapas principais: mapeamento e redução, conforme detalhado a seguir.

<br>

#### 1. Mapper:

---

```python
    def mapper_get_street_codes(self, _, line):
        fields = line.split(',')
        
        if fields[0].isdigit():
            try:
                # Extrair os códigos de rua (Street Code1, Street Code2, Street Code3)
                street_codes = [fields[9].strip(), fields[10].strip(), fields[11].strip()]
                
                # Emitir cada código de rua válido como uma chave
                for code in street_codes:
                    if code:  # Ignorar códigos vazios
                        yield code, 1

            except IndexError:
                return  # Ignorar linhas com índice inválido
```

---

A função `mapper_get_street_codes`:

- **Divisão da linha**: Divide a linha de entrada usando vírgulas como delimitador para obter cada campo individual (`line.split(',')`).
- **Validação de campos**: A função verifica se o campo `Summons Number` (posição 0) é numérico, filtrando o cabeçalho ou linhas inválidas.
- **Extração dos códigos de rua**: Extrai os códigos das colunas `Street Code1`, `Street Code2` e `Street Code3` (posições 9, 10 e 11).
- **Emitir chave-valor**: Emite cada código de rua válido como uma chave e `1` como valor, representando uma ocorrência de infração.

<br>

#### 2. Reducer:

---

```python
    # Reducer: Soma as ocorrências de cada código de rua
    def reducer_count_codes(self, code, counts):
        yield None, (sum(counts), code)

    # Reducer final: Ordena os códigos de rua por contagem de ocorrências
    def reducer_sort_top_codes(self, _, code_counts):
        # Ordena pela contagem em ordem decrescente
        sorted_codes = sorted(code_counts, reverse=True, key=lambda x: x[0])
        
        for count, code in sorted_codes:
            # Exibe apenas códigos com 10000 ou mais infrações
            if count >= 1000:
                yield code, count
```

---

A função `reducer_count_codes`:

- **Contagem das ocorrências**: Soma as ocorrências de cada código de rua individualmente.
- **:Saída**:: Emite uma tupla com o total de ocorrências e o código de rua para cada código identificado.

<br>

A função `reducer_sort_top_codes`:

- **Ordenação e filtragem**: Ordena os códigos de rua pela contagem de ocorrências em ordem decrescente. Após a ordenação, filtra os resultados para mostrar apenas os códigos com um mínimo de 1.000 ocorrências.
- **Saída**: Emite o código de rua e o número total de infrações, exibindo apenas os códigos mais frequentes.

<br>

### 14.2 Resumo

- Este script utiliza MapReduce para processar os dados de infrações de estacionamento e identificar os códigos de rua mais recorrentes em multas. O mapeador extrai e conta as infrações por código de rua, enquanto o reducer organiza e exibe os resultados para destacar os códigos de rua com o maior número de infrações, considerando apenas aqueles que aparecem com frequência mínima de 1.000 infrações.


<br> <br>

---

<br> 

## 15. Aplicando o job MapReduce 6

<br>

No contexto do nosso laboratório, usaremos o arquivo Python `CodeTicket6.py` para aplicar o **job MapReduce**. Para isso basta ir ao diretório do arquivo e no terminal digitar:

<br>

---

```bash
    python CodeTicket6.py hdfs:///tickets/PV_2016.csv -r hadoop
```

---

#### 15.1 Explicando a Saída no Terminal (Indicadores de Execução Bem-Sucedida)

<br>

<i>**Job Completed Successfully**</i>: Quando o job é finalizado sem erros, você verá uma linha de confirmação, indicando que o processamento foi concluído com sucesso.

<br>

<i>**Resultado Final Exibido**</i>: Após a execução bem-sucedida do job, o resultado é extraído do **HDFS** e apresentado. No nosso exemplo, vemos a quantidade de filmes que receberam cada tipo de nota:

<br>

```bash
"0"	8504821
"40404"	624805
"13610"	408288
"10210"	315060
"10410"	290852
"10510"	263155
...

```

<br>

**Isso significa que**: O código de rua `"0"` foi o mais frequente, com **8.504.821 infrações**, seguido por outros códigos como `"40404"` e `"13610"`, com **624.805** e **408.288** infrações, respectivamente. Esses códigos indicam áreas específicas que têm uma alta concentração de infrações de estacionamento, sendo potenciais focos para uma fiscalização mais intensa e para revisões de regulamentação.


<br><br><br><br><br>

---

<br><br><br><br><br> 