<span style="color: green; font-size: 40px; font-weight: bold;">Projeto 2 (Análise de Dados em Tempo Real) </span>

<br> <br>

# Análise de Dados de Sensores de Movimento de Clientes em Tempo Real com Apache Spark Streaming e Apache Kafka

<br><br>

### Contexto

Uma grande rede de lojas de varejo está interessada em entender melhor o comportamento de seus clientes dentro das lojas físicas. Cada loja está equipada com sensores de movimento instalados em diferentes áreas, como entradas, corredores e caixas. Esses sensores detectam a presença e o movimento dos clientes em tempo real.

A empresa deseja ter uma **solução de análise de dados em tempo real que rastreie o fluxo de clientes dentro das lojas, calcule métricas como o tempo de permanência em cada área e a contagem de tráfego em cada seção da loja**. Esses insights ajudarão a otimizar a disposição dos produtos, melhorar o atendimento ao cliente e aumentar as vendas.

Além de construir a solução com Spark e Kafka, vamos desenvolver um simulador para gerar dados de sensores de movimento em uma loja física.

<br>

### Objetivo

O objetivo deste projeto é **demonstrar como configurar e executar uma pipeline de dados em tempo real que coleta, processa e analisa dados de sensores de movimento utilizando Apache Kafka e Apache Spark Structured Streaming**. A análise se concentra em calcular métricas como o tempo de permanência dos clientes e o tráfego em diferentes áreas da loja, permitindo otimizar a experiência de compra e o layout da loja.

<br>

### Pergunta de Negócio Principal

> A principal pergunta de negócio que este projeto visa responder é: "**Quais áreas da loja recebem mais tráfego de clientes em tempo real, e como podemos otimizar a disposição dos produtos para melhorar a experiência de compra?**"

<br>

### Entregável

O entregável deste projeto é uma aplicação de streaming em tempo real que:

- Coleta dados de sensores de movimento em tempo real usando Apache Kafka.
- Processa e analisa esses dados em tempo real usando Apache Spark Structured Streaming.
- Calcula e exibe métricas como o tempo de permanência por cliente e a contagem de tráfego por área.
- Permite a consulta em tempo real das áreas da loja com maior tráfego de clientes.

<br>

### Sobre a Fonte de Dados

Os dados utilizados no projeto são gerados por sensores de movimento instalados em várias áreas de uma loja física. Cada entrada de dados inclui:

- **timestamp**: Data e hora exatas em que o sensor de movimento detectou um evento.
- **id_sensor**: Identificador único do sensor de movimento.
- **location**: Localização ou descrição da área onde o sensor está instalado dentro da loja (por exemplo, "Entrada", "Corredor A", "Caixa").
- **customer_id**: Identificador único e anônimo do cliente que foi detectado pelo sensor.
- **movement_detected**: Valor booleano indicando se o sensor detectou movimento (sempre True neste caso).
- **duration**: Tempo, em segundos, que o cliente permaneceu na área monitorada pelo sensor antes de sair ou se mover para outra área.
- **traffic_count**: Número de clientes que passaram pela área monitorada pelo sensor dentro de um período de tempo.

#### Exemplo de Entrada de Dados (json):

<br>

```
{
  "timestamp": "2024-08-23T15:22:16.968007Z",
  "id_sensor": "SM-123AB",
  "location": "Corredor A",
  "customer_id": "Cliente_5678",
  "movement_detected": true,
  "duration": 152.34,
  "traffic_count": 4
}
```

Cada leitura de movimento é capturada em um formato JSON e enviada para o tópico Kafka, que é então consumido pelo Spark Structured Streaming para análise em tempo real.

#### Como simular isso?

Precisamos encontrar uma forma de simular a geração de dados em tempo real a partir de sensores de movimento. Para isso será necessário a construção de um simulador usando a linguagem python para gerar dados de sensores de movimento.

No dia a dia, bastaria solicitar os dados ao responsável pelas lojas os arquivos gerados pelos sensores de movimento.

<br><br>

### Considerações Finais

Este mini-projeto demonstra como é possível utilizar ferramentas modernas de big data para implementar soluções de análise em tempo real. A combinação de Apache Kafka e Apache Spark Structured Streaming oferece uma solução robusta e escalável para lidar com fluxos de dados contínuos, como os gerados por sensores de movimento em lojas físicas. Através desta pipeline, é possível monitorar, analisar e reagir aos dados à medida que são gerados, fornecendo insights imediatos e acionáveis para o negócio.

<br><br><br>

# Instruções para executar o projeto.

<br>

### Etapa 1 - Simulador

1. Abra o terminal ou prompt de comando e acesse a pasta do projeto e vá para a pasta `simulador` que contém o script `simulador.py`. Esta pasta é onde o simulador IoT está localizado, e esse script foi desenvolvido para gerar leituras simuladas de sensores IoT.

> **O que o script faz:** O script `simulador.py` gera dados simulados de sensores de movimento em formato JSON. Ele atribui valores de movimentação a sensores fictícios e os salva em um arquivo de saída. Esses dados são então usados no restante do projeto para simular um fluxo de dados de sensores de movimento em tempo real.

2. Execute o comando abaixo para gerar um arquivo com 10.000 leituras de sensores de movimento (você pode ajustar o número de registros conforme desejar).

   `python simulador.py 10000 > ../dados/dados_movimento.txt`

### Etapa 2 - Apache Kafka

**O que é o Apache Kafka:** 

O Apache Kafka é uma plataforma de streaming distribuída que permite publicar, subscrever, armazenar e processar fluxos de registros em tempo real. Neste projeto, o Kafka atua como uma ponte entre a fonte de dados (sensores IoT) e o Spark Streaming, permitindo que os dados de sensores sejam capturados e transmitidos para processamento em tempo real.

<br><br>

- 1. Acesse a página do Kafka e faça o download da versão usada no curso conforme mostrado na aula em vídeo.

<br>

- 2. Descompacte o arquivo do Kafka dentro da pasta do Mini-Projeto 6.

> **Nota:** As instruções abaixo são para MacOS e Linux. Para Windows as instruções estão no manual em pdf no Capítulo 15 do curso.
   
<br>

- 3. Abra o **terminal 1**, navegue até a pasta do Kafka (`kafka_2.13-3.3.1`) e execute o comando abaixo para inicializar o Zookeepper (gerenciador de cluster do Kafka):

   `bin/zookeeper-server-start.sh config/zookeeper.properties`

<br>

- 4. Abra o **terminal 2**, navegue até a pasta do Kafka (`kafka_2.13-3.3.1`) e execute o comando abaixo para inicializar o Kafka:

   `bin/kafka-server-start.sh config/server.properties`

<br>

- 5. Abra o **terminal 3**, navegue até a pasta do Kafka (`kafka_2.13-3.3.1`) e execute o comando abaixo para criar um tópico no Kafka 
  - (**modificar nome `topic1`**):

   `bin/kafka-topics.sh --create --topic topic1 --bootstrap-server localhost:9092`

<br>

- 6. No mesmo **terminal 3**, execute o comando abaixo para descrever o tópico:

   `bin/kafka-topics.sh --describe --topic topic1 --bootstrap-server localhost:9092`

<br>

- 7. No mesmo **terminal 3**, execute o comando abaixo para produzir o streaming de dados no Kafka (como um produtor de streaming):

   `bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1 < ../dados/dados_movimento.txt`

<br>

- 8. No mesmo **terminal 3**, execute o comando abaixo para listar o conteúdo do tópico (como um consumidor de streaming):

   `bin/kafka-console-consumer.sh --topic topic1 --from-beginning --bootstrap-server localhost:9092`

<br>

- 9. Pressione `Ctrl+C` a qualquer momento para interromper qualquer uma das janelas. Mantenha todas elas abertas enquanto executa a Etapa 3 do projeto.

<br>

### Etapa 3 - Apache Spark

1. Execute o Jupyter Notebook do projeto e execute célula a célula.

<br><br><br><br>

# Importando Pacotes e Configurando Ambiente

<br>

#### Importanto Pacotes

In [1]:
# Importa o findspark e inicializa
import findspark
findspark.init()

# Import required modules
import pyspark
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, DoubleType, BooleanType, IntegerType
from pyspark.sql.functions import col, from_json, sum, avg, count, when

####  Conector de integração do Spark Streaming com o Apache Kafka
https://spark.apache.org/docs/latest/structured-streaming-kafka-integration.html

In [2]:
# Conector
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-sql-kafka-0-10_2.12:3.3.0 pyspark-shell'

<br>

#### Criando a Sessão Spark

In [3]:
# Cria a sessão Spark
spark = SparkSession.builder.appName("Projeto2").getOrCreate()

24/08/23 18:42:43 WARN Utils: Your hostname, eduardo-Inspiron-15-3520 resolves to a loopback address: 127.0.1.1; using 172.20.10.8 instead (on interface wlp0s20f3)
24/08/23 18:42:43 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address


:: loading settings :: url = jar:file:/home/eduardo/anaconda3/lib/python3.9/site-packages/pyspark/jars/ivy-2.5.1.jar!/org/apache/ivy/core/settings/ivysettings.xml


Ivy Default Cache set to: /home/eduardo/.ivy2/cache
The jars for the packages stored in: /home/eduardo/.ivy2/jars
org.apache.spark#spark-sql-kafka-0-10_2.12 added as a dependency
:: resolving dependencies :: org.apache.spark#spark-submit-parent-dd9c8693-21c4-4d75-a073-65e3e86a59da;1.0
	confs: [default]
	found org.apache.spark#spark-sql-kafka-0-10_2.12;3.3.0 in central
	found org.apache.spark#spark-token-provider-kafka-0-10_2.12;3.3.0 in central
	found org.apache.kafka#kafka-clients;2.8.1 in central
	found org.lz4#lz4-java;1.8.0 in central
	found org.xerial.snappy#snappy-java;1.1.8.4 in central
	found org.slf4j#slf4j-api;1.7.32 in central
	found org.apache.hadoop#hadoop-client-runtime;3.3.2 in central
	found org.spark-project.spark#unused;1.0.0 in central
	found org.apache.hadoop#hadoop-client-api;3.3.2 in central
	found commons-logging#commons-logging;1.1.3 in central
	found com.google.code.findbugs#jsr305;3.0.0 in central
	found org.apache.commons#commons-pool2;2.11.1 in central
:: re

<br><br><br>


# Leitura do Stream

#### Configurando a leitura de dados em tempo real a partir de um tópico Kafka utilizando Apache Spark.

In [4]:
# Vamos criar uma subscrição no tópico que tem o streaming de dados que desejamos "puxar" os dados.
df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "topic1") \
  .load()

<br><br><br>

# Definição do Schema:

<br>

#### Definindo o Schema

In [5]:
# Definimos o schema dos dados de sensores de movimento
esquema_dados_movimento = StructType([
    StructField("timestamp", StringType(), True), 
    StructField("id_sensor", StringType(), True), 
    StructField("location", StringType(), True), 
    StructField("customer_id", StringType(), True), 
    StructField("movement_detected", BooleanType(), True), 
    StructField("duration", DoubleType(), True), 
    StructField("traffic_count", IntegerType(), True)
])

print('\nSchema definido.')


Schema definido.


<br>

### Resumo de tudo que foi feito

1. **Simulação dos Dados e Gravação em Disco**

- Primeiro, criamos um simulador que gera dados fictícios de sensores de movimento em formato JSON. Este simulador produz um conjunto de dados que imita o comportamento dos sensores em uma loja física, como o tempo em que os clientes permanecem em uma área e o número de clientes detectados. 
- O simulador cria 10.000 linhas de dados JSON, cada uma representando uma leitura de sensor. Estes dados são salvos em um arquivo de texto.

<br>

2. **Uso do Apache Kafka para Streaming de Dados**

Em seguida, usamos o Apache Kafka para atuar como um intermediário entre os dados gerados (agora em disco) e o Apache Spark, que fará o processamento em tempo real.

- **Produção de Dados**: O arquivo JSON gerado pelo simulador é enviado para um tópico Kafka, onde cada linha do arquivo é enviada como uma mensagem JSON para o Kafka.
- **Consumo de Dados**: O Apache Spark se inscreve nesse tópico Kafka e consome essas mensagens em tempo real para processá-las.

<br>

3. **Definição do Schema para Processamento no Spark**

Quando os dados JSON chegam ao Apache Spark através do Kafka, o Spark precisa entender a estrutura desses dados para processá-los corretamente. É aqui que entra a definição do schema.

- **O que é um Schema**: O schema é como um "molde" que define a estrutura dos dados JSON. Ele informa ao Spark quais são os campos (colunas) presentes em cada objeto JSON e quais são os tipos de dados associados a cada campo.

#### Por que precisamos de um Schema?

- **Estruturação**: O schema ajuda o Spark a entender que `timestamp` é uma string que representa data e hora, `id_sensor` é uma string que identifica o sensor, `duration` é um número decimal, e assim por diante.
- **Performance**: Com o schema, o Spark não precisa "adivinhar" a estrutura dos dados durante o processamento, o que torna a análise mais eficiente.
- **Consistência**: O schema assegura que os dados sejam interpretados de forma consistente ao longo do pipeline de processamento.

<br>

4. **Processamento dos Dados no Spark**

Com o schema definido, o Apache Spark pode agora processar as mensagens JSON recebidas do Kafka:

- **Leitura dos Dados**: O Spark lê os dados JSON e aplica o schema definido, mapeando cada campo do JSON para uma coluna correspondente.
- **Análise**: Com os dados estruturados, o Spark pode realizar operações de análise, como calcular a média de duration (tempo que os clientes passam em uma área) ou somar o traffic_count (número de clientes em uma área).

<br><br>

# Parse e Preparo dos Dados:

<br>

#### Conversão de cada linha de dado do stream para JSON e transformação em um DataFrame estruturado

In [6]:
# Capturamos cada linha de dado (cada valor) como string
df_conversao = df.selectExpr("CAST(value AS STRING)")

# Parse do formato JSON em dataframe
df_conversao = df_conversao.withColumn("jsonData", from_json(col("value"), esquema_dados_movimento)).select("jsonData.*")

df_conversao = df_conversao.filter((col("duration").isNotNull()) & (col("location").isNotNull()))

df_conversao.printSchema()

root
 |-- timestamp: string (nullable = true)
 |-- id_sensor: string (nullable = true)
 |-- location: string (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- movement_detected: boolean (nullable = true)
 |-- duration: double (nullable = true)
 |-- traffic_count: integer (nullable = true)



In [7]:
# Selecionar as colunas relevantes para a análise
df_movimento_preparado = df_conversao.select(
    col("location"),
    col("duration"),
    col("traffic_count")
)

df_movimento_preparado.printSchema()

root
 |-- location: string (nullable = true)
 |-- duration: double (nullable = true)
 |-- traffic_count: integer (nullable = true)



<br><br>

# Análise de Dados em Tempo Real:

<br>

#### Criando Objeto Para Análise

In [8]:
# Agrupar os dados por 'location' e calcular as métricas agregadas
df_analise_movimento = df_movimento_preparado.groupBy("location").agg(
    sum("duration").alias("total_duration"),
    sum("traffic_count").alias("total_traffic"),
    avg("duration").alias("avg_duration")
)

# Exibir o esquema do DataFrame de análise para verificação
df_analise_movimento.printSchema()

root
 |-- location: string (nullable = true)
 |-- total_duration: double (nullable = true)
 |-- total_traffic: long (nullable = true)
 |-- avg_duration: double (nullable = true)



<br>

#### Imprimindo o resultado no console.

Abaixo abrimos o streaming para análise de dados em tempo real,

In [9]:
# Objeto que inicia a consulta ao streaming com formato de console
query = df_analise_movimento.writeStream.outputMode("complete").format("console").start()

24/08/23 18:42:49 WARN ResolveWriteToStream: Temporary checkpoint location created which is deleted normally when the query didn't fail: /tmp/temporary-b619d1ef-c54b-4afa-914b-3949945c2b37. If it's required to delete it under any circumstances, please set spark.sql.streaming.forceDeleteTempCheckpointLocation to true. Important to know deleting temp checkpoint folder is best effort.
24/08/23 18:42:49 WARN ResolveWriteToStream: spark.sql.adaptive.enabled is not supported in streaming DataFrames/Datasets and will be disabled.
24/08/23 18:42:50 WARN AdminClientConfig: The configuration 'key.deserializer' was supplied but isn't a known config.
24/08/23 18:42:50 WARN AdminClientConfig: The configuration 'value.deserializer' was supplied but isn't a known config.
24/08/23 18:42:50 WARN AdminClientConfig: The configuration 'enable.auto.commit' was supplied but isn't a known config.
24/08/23 18:42:50 WARN AdminClientConfig: The configuration 'max.poll.records' was supplied but isn't a known con

-------------------------------------------
Batch: 0
-------------------------------------------
+--------+--------------+-------------+------------+
|location|total_duration|total_traffic|avg_duration|
+--------+--------------+-------------+------------+
+--------+--------------+-------------+------------+



                                                                                

-------------------------------------------
Batch: 1
-------------------------------------------
+--------------------+------------------+-------------+------------------+
|            location|    total_duration|total_traffic|      avg_duration|
+--------------------+------------------+-------------+------------------+
|          Corredor_B|2063.7500000000005|           86| 147.4107142857143|
|          Corredor_A|2804.4199999999996|           95| 186.9613333333333|
|               Caixa|2340.5799999999995|          107| 137.6811764705882|
|    Sessao_de_Roupas|1634.6099999999997|           89|125.73923076923074|
|Sessao_de_Eletron...| 3381.460000000001|          110| 161.0219047619048|
+--------------------+------------------+-------------+------------------+



                                                                                

-------------------------------------------
Batch: 2
-------------------------------------------
+--------------------+------------------+-------------+------------------+
|            location|    total_duration|total_traffic|      avg_duration|
+--------------------+------------------+-------------+------------------+
|          Corredor_B|         309394.91|        11261| 153.0142977250247|
|          Corredor_A|307633.29000000044|        10786|154.97898740554177|
|               Caixa| 309626.8599999998|        10969|154.35037886340967|
|    Sessao_de_Roupas| 301651.1699999999|        10996|152.65747469635625|
|Sessao_de_Eletron...| 309193.3300000002|        11068|  153.751034311288|
+--------------------+------------------+-------------+------------------+



                                                                                

-------------------------------------------
Batch: 3
-------------------------------------------
+--------------------+------------------+-------------+------------------+
|            location|    total_duration|total_traffic|      avg_duration|
+--------------------+------------------+-------------+------------------+
|          Corredor_B|          314005.3|        11465|152.94948855333658|
|          Corredor_A|313090.68000000046|        10983|155.07215453194672|
|               Caixa| 314679.0899999998|        11170| 154.1788780009798|
|    Sessao_de_Roupas| 305591.3499999999|        11163| 152.7193153423288|
|Sessao_de_Eletron...| 314260.3300000002|        11254|153.59742424242432|
+--------------------+------------------+-------------+------------------+



                                                                                

-------------------------------------------
Batch: 4
-------------------------------------------
+--------------------+-----------------+-------------+------------------+
|            location|   total_duration|total_traffic|      avg_duration|
+--------------------+-----------------+-------------+------------------+
|          Corredor_B|618789.8200000001|        22522|153.01429772502473|
|          Corredor_A| 615266.580000001|        21572| 154.9789874055418|
|               Caixa|619253.7199999997|        21938| 154.3503788634097|
|    Sessao_de_Roupas|603302.3399999999|        21992|152.65747469635625|
|Sessao_de_Eletron...|618386.6600000004|        22136|  153.751034311288|
+--------------------+-----------------+-------------+------------------+



## Importante

- Agora é necessário abrir um **novo terminal 1** na pasta `simulador` onde está o conjunto de dados e digitar:

`python simulador.py 10000 > ../dados/dados_movimento.txt`

- Agora é necessário abrir um **novo terminal 2** na pasta do Kafka (`kafka_2.13-3.3.1`) e digitar:

`bin/kafka-console-producer.sh --bootstrap-server localhost:9092 --topic topic1 < ../dados/dados_movimento.txt`

<br>

> Após isso, a tabela acima será atualizada

<br>



<br><br>

# FIM