---

## 1. Rationale: An Alternative to Discord

### Why not use Discord?

Direct integration with Discord poses several technical challenges that make it **impractical** to run in a notebook environment:

#### 1.1 OAuth Authentication Complexity
- Discord requires creating an application in the [Discord Developer Portal](https://discord.com/developers/applications)
- A Bot Token and specific permissions must be configured
- Gateway Intents (Privileged Intents) are required and must be enabled or approved manually
- Sensitive tokens must not be exposed in shareable notebooks

#### 1.2 Persistent Infrastructure Requirements
- Discord bots need a **persistent WebSocket connection** to Discord's servers
- A notebook runs cells **non-persistently** (one-off execution)
- Frequent disconnections would break continuous streaming
- Discord's rate limits (50 requests/second globally) complicate testing

#### 1.3 Server/Channel Dependencies
- Requires a specific Discord server with administrator permissions
- Channels and webhooks must be configured outside the test environment
- Not reproducible without access to the configured Discord server

### Chosen Alternative: HTTP/RSS Streaming Simulator

To ensure **reproducibility** and **self-contained execution**, we implemented:

1. **Python producer** that simulates social network messages
2. **Data source**: a public news API (NewsAPI) or synthetic generation (the producer in Section 3 uses synthetic data)
3. **Publishing to Kafka** on the `social-input` topic
4. **JSON format** compatible with the Spark pipeline
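
A message in this format might look like the following minimal sketch. The field names follow the producer defined in Section 3.2; the concrete values (user, text, date) are made up for illustration.

```python
import json
from datetime import datetime

# Illustrative message: field names follow the producer in Section 3.2,
# the values themselves are invented for this example.
sample_message = {
    "user": "alice",
    "text": "Kafka provides reliable message streaming capabilities",
    "timestamp": datetime(2024, 1, 1, 12, 0, 0).isoformat(),
    "platform": "SimulatedSocial",
    "message_id": 0,
}

# Serialize the way the producer's value_serializer does: JSON encoded as UTF-8.
payload = json.dumps(sample_message).encode("utf-8")

# A consumer reverses the process to recover the original fields.
decoded = json.loads(payload.decode("utf-8"))
print(decoded["user"], decoded["timestamp"])
```

Keeping the payload as flat JSON with string and integer fields is what lets the Spark side describe it with a simple `StructType` schema.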

**Advantages:**
- ✅ Fully executable inside the notebook
- ✅ No dependency on external accounts
- ✅ Reproducible in any environment with Docker and Python
- ✅ Full control over the message rate
- ✅ Simulates social network characteristics (timestamp, author, text)

**References:**
- [Discord Developer Docs - Rate Limits](https://discord.com/developers/docs/topics/rate-limits)
- [Kafka Documentation - Use Cases](https://kafka.apache.org/documentation/#uses)

---

## 2. Environment Setup

### 2.1 Imports and Initial Configuration

In [None]:
import os
import sys
import time
import json
import subprocess
import threading
from datetime import datetime
from pathlib import Path

# Global settings
KAFKA_BOOTSTRAP_SERVERS = "localhost:29092"
KAFKA_TOPIC_INPUT = "social-input"
KAFKA_TOPIC_OUTPUT = "wordcount-output"
ELASTICSEARCH_HOST = "http://localhost:9200"
ELASTICSEARCH_INDEX = "wordcount-realtime"
SPARK_MASTER = "spark://localhost:7077"

# Project directory
PROJECT_DIR = Path("/home/edilberto/pspd/atividade-extraclasse-2-pspd/spark")
os.chdir(PROJECT_DIR)

print("✓ Settings loaded")
print(f"  - Kafka: {KAFKA_BOOTSTRAP_SERVERS}")
print(f"  - Input topic: {KAFKA_TOPIC_INPUT}")
print(f"  - Output topic: {KAFKA_TOPIC_OUTPUT}")
print(f"  - Elasticsearch: {ELASTICSEARCH_HOST}")
print(f"  - ES index: {ELASTICSEARCH_INDEX}")

### 2.2 Start the Docker Infrastructure

In [None]:
%%bash
cd /home/edilberto/pspd/atividade-extraclasse-2-pspd/spark

echo "=== Starting Docker containers ==="
docker-compose up -d

echo ""
echo "=== Waiting for services to initialize (30s) ==="
sleep 30

echo ""
echo "=== Container status ==="
docker-compose ps

### 2.3 Check Service Health

In [None]:
import requests

def check_service(name, url, timeout=5):
    """Check whether a service responds over HTTP."""
    try:
        response = requests.get(url, timeout=timeout)
        if response.status_code < 400:
            print(f"✓ {name}: OK (status {response.status_code})")
            return True
        else:
            print(f"✗ {name}: ERROR (status {response.status_code})")
            return False
    except requests.RequestException as e:
        print(f"✗ {name}: UNREACHABLE ({e})")
        return False

print("=== Checking Services ===")
es_ok = check_service("Elasticsearch", "http://localhost:9200")
kibana_ok = check_service("Kibana", "http://localhost:5601")
spark_ok = check_service("Spark Master", "http://localhost:8080")

if es_ok and kibana_ok and spark_ok:
    print("\n✓ All services are up!")
else:
    print("\n⚠ Some services did not respond. Wait longer or check the logs.")
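
A fixed 30-second sleep followed by a single check can be either too short or needlessly long. As a hedged alternative, the sketch below polls a check function until it succeeds or a timeout elapses. The helper name `wait_until` and its parameters are assumptions made for this example, not part of the original notebook; the demo uses a fake check so it runs without any service.

```python
import time

def wait_until(check, timeout=60.0, interval=2.0):
    """Call check() repeatedly until it returns True or the timeout elapses."""
    deadline = time.monotonic() + timeout
    while True:
        if check():
            return True
        if time.monotonic() >= deadline:
            return False
        time.sleep(interval)

# Demo with a fake check that only succeeds on its third call.
attempts = {"n": 0}

def flaky_check():
    attempts["n"] += 1
    return attempts["n"] >= 3

ok = wait_until(flaky_check, timeout=5.0, interval=0.01)
print(ok, attempts["n"])
```

In the notebook, the check could be something like `lambda: check_service("Elasticsearch", "http://localhost:9200")`.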

### 2.4 Create Kafka Topics

In [None]:
%%bash
echo "=== Creating Kafka topics ==="

# Create the input topic
docker exec kafka kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic social-input \
  --partitions 3 \
  --replication-factor 1 \
  --if-not-exists

# Create the output topic
docker exec kafka kafka-topics --create \
  --bootstrap-server localhost:9092 \
  --topic wordcount-output \
  --partitions 3 \
  --replication-factor 1 \
  --if-not-exists

echo ""
echo "=== Listing topics ==="
docker exec kafka kafka-topics --list --bootstrap-server localhost:9092

---

## 3. Kafka Producer: Social Network Simulator

### 3.1 Install Python Dependencies

In [None]:
!pip install -q kafka-python elasticsearch pyspark==3.5.0

### 3.2 Producer Implementation

In [None]:
from kafka import KafkaProducer
import random
import time
import json
from datetime import datetime

# Synthetic dataset simulating social network messages
SAMPLE_MESSAGES = [
    "Apache Spark is amazing for big data processing and analytics",
    "Learning distributed systems with Kafka and Spark",
    "Real-time data processing using Structured Streaming",
    "Elasticsearch and Kibana provide powerful visualization tools",
    "Docker containers make deployment much easier",
    "Big data analytics requires scalable infrastructure",
    "Cloud computing enables elastic scalability",
    "Machine learning models benefit from distributed training",
    "Parallel computing accelerates data processing pipelines",
    "Kafka provides reliable message streaming capabilities",
    "Python is a versatile language for data science",
    "Distributed databases handle massive datasets efficiently",
    "Stream processing enables real-time analytics",
    "Microservices architecture improves system modularity",
    "Data pipelines transform raw data into insights"
]

SAMPLE_USERS = ["alice", "bob", "charlie", "diana", "eve", "frank", "grace", "henry"]

class SocialMediaProducer:
    """Producer that simulates social network messages"""
    
    def __init__(self, bootstrap_servers, topic):
        self.producer = KafkaProducer(
            bootstrap_servers=bootstrap_servers,
            value_serializer=lambda v: json.dumps(v).encode('utf-8')
        )
        self.topic = topic
        self.running = False
        self.message_count = 0
        
    def generate_message(self):
        """Generate one synthetic message"""
        return {
            "user": random.choice(SAMPLE_USERS),
            "text": random.choice(SAMPLE_MESSAGES),
            "timestamp": datetime.now().isoformat(),
            "platform": "SimulatedSocial",
            "message_id": self.message_count
        }
    
    def start(self, duration_seconds=300, messages_per_second=2):
        """Run the producer for a fixed period"""
        self.running = True
        start_time = time.time()
        interval = 1.0 / messages_per_second
        
        print(f"▶ Producer started: {messages_per_second} msgs/s for {duration_seconds}s")
        
        try:
            while self.running and (time.time() - start_time) < duration_seconds:
                message = self.generate_message()
                self.producer.send(self.topic, value=message)
                self.message_count += 1
                
                if self.message_count % 10 == 0:
                    print(f"  [{self.message_count}] Sent: {message['user']}: {message['text'][:50]}...")
                
                time.sleep(interval)
        
        except KeyboardInterrupt:
            print("\n⏸ Producer interrupted by the user")
        finally:
            self.stop()
    
    def stop(self):
        """Stop the producer, flushing any buffered messages"""
        self.running = False
        self.producer.flush()
        self.producer.close()
        print(f"■ Producer stopped. Total messages: {self.message_count}")

# Create the producer (it will be started later)
producer = SocialMediaProducer(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC_INPUT)
print("✓ Producer configured and ready")
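
Before pointing the producer at a broker, the generation and pacing logic can be exercised on its own. The following is a minimal sketch under stated assumptions: `dry_run`, `USERS`, and `TEXTS` are hypothetical stand-ins invented for this example, and `send` is any callable that accepts the serialized bytes (here a plain list's `append`), so no Kafka connection is needed.

```python
import json
import random
import time

# Shortened, hypothetical stand-ins for SAMPLE_USERS and SAMPLE_MESSAGES.
USERS = ["alice", "bob"]
TEXTS = ["Kafka and Spark", "Stream processing enables real-time analytics"]

def dry_run(send, n_messages=5, messages_per_second=50, seed=42):
    """Generate n_messages and hand each serialized payload to send()."""
    rng = random.Random(seed)            # seeded so runs are repeatable
    interval = 1.0 / messages_per_second
    for message_id in range(n_messages):
        message = {
            "user": rng.choice(USERS),
            "text": rng.choice(TEXTS),
            "platform": "SimulatedSocial",
            "message_id": message_id,
        }
        send(json.dumps(message).encode("utf-8"))
        time.sleep(interval)             # same fixed-interval pacing as the class

sent = []
dry_run(sent.append)
print(len(sent), json.loads(sent[0])["message_id"])
```

Swapping `sent.append` for a function that calls a real producer's `send` would exercise the same path against Kafka.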

### 3.3 Test the Producer (Sample Send)

In [None]:
# Send about 20 test messages (10 s at 2 msgs/s)
print("=== Producer Test (about 20 messages) ===")
test_producer = SocialMediaProducer(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC_INPUT)
test_producer.start(duration_seconds=10, messages_per_second=2)

### 3.4 Check Messages in the Kafka Topic

In [None]:
%%bash
echo "=== First 5 messages in the social-input topic ==="
docker exec kafka kafka-console-consumer \
  --bootstrap-server localhost:9092 \
  --topic social-input \
  --from-beginning \
  --max-messages 5 \
  --timeout-ms 5000 2>/dev/null | tail -5

---

## 4. Spark Structured Streaming Pipeline

### 4.1 Spark Session Configuration

In [None]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import (
    col, explode, split, window, from_json, to_json, struct, current_timestamp
)
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, TimestampType

# Create a Spark session with Kafka support
spark = SparkSession.builder \
    .appName("B2_SocialMedia_WordCount") \
    .master("local[*]") \
    .config("spark.jars.packages", "org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0") \
    .config("spark.sql.streaming.forceDeleteTempCheckpointLocation", "true") \
    .getOrCreate()

spark.sparkContext.setLogLevel("WARN")

print("✓ Spark session created")
print(f"  - Version: {spark.version}")
print(f"  - App: {spark.sparkContext.appName}")

### 4.2 Input Message Schema

In [None]:
# JSON schema of the producer's messages
message_schema = StructType([
    StructField("user", StringType(), True),
    StructField("text", StringType(), True),
    StructField("timestamp", StringType(), True),
    StructField("platform", StringType(), True),
    StructField("message_id", IntegerType(), True)
])

print("✓ Schema defined:")
# Print each field explicitly; this works regardless of which tree-printing
# helpers the installed PySpark version exposes on StructType
for field in message_schema.fields:
    print(f"  - {field.name}: {field.dataType.simpleString()}")

### 4.3 Read the Kafka Stream (Input)

In [None]:
# Read messages from the input topic
raw_stream = spark.readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
    .option("subscribe", KAFKA_TOPIC_INPUT) \
    .option("startingOffsets", "latest") \
    .load()

# Parse the JSON and extract its fields.
# Note: event_time is the processing time at ingestion, not the producer's "timestamp" field.
messages = raw_stream.selectExpr("CAST(value AS STRING) as json") \
    .select(from_json(col("json"), message_schema).alias("data")) \
    .select("data.*") \
    .withColumn("event_time", current_timestamp())

print("✓ Input stream configured")
print("  Stream schema:")
messages.printSchema()
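
`from_json` yields null for values it cannot parse, so malformed records surface as rows of nulls instead of failing the stream. The sketch below is a plain-Python analogy of that behavior, not Spark's implementation; `parse_message` and `FIELDS` are names assumed for this example.

```python
import json

# Field names taken from the message schema defined above.
FIELDS = ("user", "text", "timestamp", "platform", "message_id")

def parse_message(raw):
    """Return a dict with the schema's fields, or None if raw is not a JSON object."""
    try:
        data = json.loads(raw)
    except (TypeError, ValueError):
        return None                       # malformed input maps to "null"
    if not isinstance(data, dict):
        return None
    # Fields missing from the payload come back as None, like nullable columns.
    return {name: data.get(name) for name in FIELDS}

good = parse_message('{"user": "alice", "text": "hello world"}')
bad = parse_message("not json at all")
print(good["user"], good["message_id"], bad)
```

On the Spark side, one possible way to discard such rows would be a filter on a required column (for example, keeping only rows where `text` is not null) before the word split.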

### 4.4 Processing: Word Count with Time Windows

In [None]:
# Extract words from the text
words = messages.select(
    explode(split(col("text"), "\\s+")).alias("word"),
    col("event_time"),
    col("user")
).filter(col("word") != "")

# Normalize words (lowercase, strip punctuation)
from pyspark.sql.functions import lower, regexp_replace

words_clean = words.withColumn(
    "word", 
    lower(regexp_replace(col("word"), "[^a-zA-Z0-9]", ""))
).filter(col("word") != "")

# Aggregate over 30-second windows that slide every 10 seconds
WINDOW_DURATION = "30 seconds"
SLIDE_DURATION = "10 seconds"

word_counts = words_clean \
    .withWatermark("event_time", "1 minute") \
    .groupBy(
        window(col("event_time"), WINDOW_DURATION, SLIDE_DURATION),
        col("word")
    ) \
    .count() \
    .select(
        col("window.start").alias("window_start"),
        col("window.end").alias("window_end"),
        col("word"),
        col("count")
    )

print("✓ Word count pipeline configured")
print(f"  - Window: {WINDOW_DURATION}")
print(f"  - Slide: {SLIDE_DURATION}")
print("  - Watermark: 1 minute")
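
With a 30-second window sliding every 10 seconds, each event belongs to three overlapping windows, so every word is counted once per window that contains it. The following plain-Python sketch illustrates that assignment under simplifying assumptions: integer timestamps in seconds and windows aligned to multiples of the slide. The function names are invented for this example and this is not how Spark implements it internally.

```python
import re
from collections import Counter

WINDOW = 30  # window length in seconds
SLIDE = 10   # slide interval in seconds

def tokenize(text):
    """Split on whitespace, drop non-alphanumeric characters, lowercase."""
    cleaned = (re.sub(r"[^a-zA-Z0-9]", "", w).lower() for w in text.split())
    return [w for w in cleaned if w]

def window_starts(event_ts, window=WINDOW, slide=SLIDE):
    """Start times of every [start, start + window) window containing event_ts."""
    start = (event_ts // slide) * slide   # latest window start at or before the event
    starts = []
    while start > event_ts - window:      # window must still cover the event
        starts.append(start)
        start -= slide
    return sorted(starts)

def windowed_counts(events):
    """Count words per (window_start, word) for (timestamp, text) events."""
    counts = Counter()
    for ts, text in events:
        for start in window_starts(ts):
            for word in tokenize(text):
                counts[(start, word)] += 1
    return counts

counts = windowed_counts([(125, "Kafka, Spark!"), (131, "kafka streams")])
print(window_starts(125))
print(counts[(110, "kafka")], counts[(100, "kafka")])
```

Because each occurrence lands in three windows, summing `count` across all windows overstates raw word frequency by roughly a factor of three; comparisons between words remain meaningful.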

### 4.5 Write Results to Kafka (Output)

In [None]:
# Convert to JSON
output_json = word_counts.select(
    to_json(struct(
        col("word"),
        col("count"),
        col("window_start"),
        col("window_end")
    )).alias("value")
)

# Write to the output topic
query_kafka = output_json.writeStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS) \
    .option("topic", KAFKA_TOPIC_OUTPUT) \
    .option("checkpointLocation", "/tmp/spark-checkpoint-kafka") \
    .outputMode("update") \
    .start()

print("✓ Output stream to Kafka started")
print(f"  - Query ID: {query_kafka.id}")
print(f"  - Topic: {KAFKA_TOPIC_OUTPUT}")

### 4.6 Console Output (Debug)

In [None]:
# Additional query to display results on the console
query_console = word_counts.writeStream \
    .format("console") \
    .option("truncate", "false") \
    .option("numRows", 20) \
    .outputMode("update") \
    .start()

print("✓ Debug stream (console) started")
print(f"  - Query ID: {query_console.id}")

---

## 5. Start the Producer in the Background

### 5.1 Run the Producer in a Separate Thread

In [None]:
import threading

# Producer settings
PRODUCER_DURATION = 180  # 3 minutes
MESSAGES_PER_SECOND = 3

# Create a new producer
background_producer = SocialMediaProducer(KAFKA_BOOTSTRAP_SERVERS, KAFKA_TOPIC_INPUT)

# Function to run in the thread
def run_producer():
    background_producer.start(
        duration_seconds=PRODUCER_DURATION,
        messages_per_second=MESSAGES_PER_SECOND
    )

# Start the thread
producer_thread = threading.Thread(target=run_producer, daemon=True)
producer_thread.start()

print("✓ Producer started in the background")
print(f"  - Duration: {PRODUCER_DURATION}s")
print(f"  - Rate: {MESSAGES_PER_SECOND} msgs/s")
print(f"  - Expected total: ~{PRODUCER_DURATION * MESSAGES_PER_SECOND} messages")
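
The background producer runs on a daemon thread until its duration elapses. If it needs to be stopped early and waited on, a stop signal plus `join` is a common pattern. The sketch below shows the idea with a `threading.Event`; the `worker` function is a hypothetical stand-in for the producer loop, not code from this notebook.

```python
import threading
import time

def worker(stop_event, produced, interval=0.01):
    """Append a timestamp per iteration until asked to stop."""
    while not stop_event.is_set():
        produced.append(time.monotonic())
        # wait() sleeps for the interval but returns early once the event is set
        stop_event.wait(interval)

stop_event = threading.Event()
produced = []
thread = threading.Thread(target=worker, args=(stop_event, produced), daemon=True)
thread.start()

time.sleep(0.1)           # let the worker run briefly
stop_event.set()          # request shutdown
thread.join(timeout=2.0)  # wait for the worker to finish cleanly
print(thread.is_alive(), len(produced) > 0)
```

With the `SocialMediaProducer` class, setting `background_producer.running = False` and then calling `producer_thread.join()` plays the same role, since its loop checks `self.running` on every iteration.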

### 5.2 Monitor the Spark Queries (Wait for Processing)

In [None]:
import time

print("=== Monitoring Spark Queries ===")
print("Waiting 60 seconds to accumulate data...\n")

for i in range(6):
    time.sleep(10)
    print(f"[{(i+1)*10}s] Kafka query: {query_kafka.status['message']}")
    print(f"[{(i+1)*10}s] Console query: {query_console.status['message']}")
    
    # Show progress; lastProgress is None until the first micro-batch completes
    progress = query_kafka.lastProgress
    if progress and 'numInputRows' in progress:
        input_rows = progress['numInputRows']
        print(f"       → Rows processed in the last batch: {input_rows}")
    print()

print("✓ Processing in progress. Check the console sink output for word counts.")

---

## 6. Elasticsearch Consumer

### 6.1 Create the Elasticsearch Index

In [None]:
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk

# Connect to Elasticsearch
es = Elasticsearch([ELASTICSEARCH_HOST])

# Check the connection
if es.ping():
    info = es.info()
    print("✓ Connected to Elasticsearch")
    print(f"  - Cluster: {info['cluster_name']}")
    print(f"  - Version: {info['version']['number']}")
else:
    print("✗ Failed to connect to Elasticsearch")

# Create the index with an explicit mapping
index_mapping = {
    "mappings": {
        "properties": {
            "word": {"type": "keyword"},
            "count": {"type": "integer"},
            "window_start": {"type": "date"},
            "window_end": {"type": "date"},
            "indexed_at": {"type": "date"}
        }
    }
}

# Delete the existing index (if any)
if es.indices.exists(index=ELASTICSEARCH_INDEX):
    es.indices.delete(index=ELASTICSEARCH_INDEX)
    print(f"  - Index '{ELASTICSEARCH_INDEX}' deleted")

# Create the new index
es.indices.create(index=ELASTICSEARCH_INDEX, body=index_mapping)
print(f"✓ Index '{ELASTICSEARCH_INDEX}' created successfully")

### 6.2 Kafka → Elasticsearch Consumer

In [None]:
from kafka import KafkaConsumer
import json
from datetime import datetime

class ElasticsearchConsumer:
    """Consumer that indexes word counts into Elasticsearch"""
    
    def __init__(self, bootstrap_servers, topic, es_client, es_index):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='latest',
            enable_auto_commit=True,
            group_id='elasticsearch-consumer-group'
        )
        self.es = es_client
        self.index = es_index
        self.running = False
        self.document_count = 0
        
    def start(self, duration_seconds=120, batch_size=50):
        """Run the consumer for a fixed period"""
        self.running = True
        start_time = time.time()
        batch = []
        
        print(f"▶ Elasticsearch consumer started ({duration_seconds}s)")
        
        try:
            # poll() returns within 1 s even when the topic is idle, so the time
            # limit is honored and the final partial batch is always flushed.
            # Iterating the consumer directly would block until a message arrives.
            while self.running and (time.time() - start_time) < duration_seconds:
                records = self.consumer.poll(timeout_ms=1000)
                
                for partition_messages in records.values():
                    for message in partition_messages:
                        data = message.value
                        
                        # Build the bulk action for this word count
                        batch.append({
                            "_index": self.index,
                            "_source": {
                                "word": data.get("word"),
                                "count": data.get("count"),
                                "window_start": data.get("window_start"),
                                "window_end": data.get("window_end"),
                                "indexed_at": datetime.now().isoformat()
                            }
                        })
                        self.document_count += 1
                
                # Index in batches
                if len(batch) >= batch_size:
                    success, _ = bulk(self.es, batch)
                    print(f"  [{self.document_count}] Indexed {success} documents")
                    batch = []
            
            # Index any remaining documents
            if batch:
                success, _ = bulk(self.es, batch)
                print(f"  [Final] Indexed {success} documents")
        
        except KeyboardInterrupt:
            print("\n⏸ Consumer interrupted by the user")
        finally:
            self.stop()
    
    def stop(self):
        """Stop the consumer"""
        self.running = False
        self.consumer.close()
        print(f"■ Consumer stopped. Total documents: {self.document_count}")

# Create the consumer
es_consumer = ElasticsearchConsumer(
    KAFKA_BOOTSTRAP_SERVERS,
    KAFKA_TOPIC_OUTPUT,
    es,
    ELASTICSEARCH_INDEX
)

print("✓ Elasticsearch consumer configured")
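
Because the Spark query writes in `update` mode, the same (window, word) pair can arrive several times with a growing count, and each arrival is indexed as a new document. One possible refinement, sketched here as an assumption and not as part of the original pipeline, is to derive a deterministic `_id` so that later updates overwrite earlier ones. The helper names `make_doc_id` and `to_bulk_action` are hypothetical.

```python
import hashlib

def make_doc_id(word, window_start):
    """Stable ID for a (window, word) pair, so re-indexing overwrites the same document."""
    raw = f"{window_start}|{word}".encode("utf-8")
    return hashlib.sha1(raw).hexdigest()

def to_bulk_action(index, data):
    """Build a bulk 'index' action; an existing document with the same _id is replaced."""
    return {
        "_index": index,
        "_id": make_doc_id(data["word"], data["window_start"]),
        "_source": data,
    }

# Two updates for the same window and word, as update mode might emit them.
first = to_bulk_action("wordcount-realtime",
                       {"word": "kafka", "count": 2, "window_start": "2024-01-01T12:00:00.000Z"})
second = to_bulk_action("wordcount-realtime",
                        {"word": "kafka", "count": 5, "window_start": "2024-01-01T12:00:00.000Z"})

# Simulate the index as a dict keyed by _id: only the latest count survives.
store = {}
for action in (first, second):
    store[action["_id"]] = action["_source"]
print(len(store), list(store.values())[0]["count"])
```

With this scheme the index holds one document per window and word, so a `Sum` over `count` no longer adds up intermediate updates.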

### 6.3 Run the Consumer in the Background

In [None]:
CONSUMER_DURATION = 120  # 2 minutes

def run_consumer():
    es_consumer.start(duration_seconds=CONSUMER_DURATION, batch_size=30)

# Start the consumer thread
consumer_thread = threading.Thread(target=run_consumer, daemon=True)
consumer_thread.start()

print("✓ Consumer started in the background")
print(f"  - Duration: {CONSUMER_DURATION}s")
print("  - Batch size: 30 documents")

### 6.4 Wait and Verify Indexing

In [None]:
print("=== Waiting for indexing (90 seconds) ===")
time.sleep(90)

# Check the document count
es.indices.refresh(index=ELASTICSEARCH_INDEX)
count = es.count(index=ELASTICSEARCH_INDEX)['count']

print(f"\n✓ Documents indexed: {count}")

# Show a sample
if count > 0:
    print("\n=== Sample of 10 word counts ===")
    result = es.search(
        index=ELASTICSEARCH_INDEX,
        body={
            "size": 10,
            "sort": [{"count": {"order": "desc"}}]
        }
    )
    
    for hit in result['hits']['hits']:
        doc = hit['_source']
        print(f"  {doc['word']:20s} → {doc['count']:3d} occurrences")
else:
    print("⚠ No documents indexed yet. Wait a little longer.")

---

## 7. Visualization in Kibana

### 7.1 Instructions for Creating a Word Cloud

#### Step 1: Open Kibana
1. Open your browser at: **http://localhost:5601**
2. Wait for Kibana to load (this may take a few seconds)

#### Step 2: Create a Data View
1. In the side menu, click **"Stack Management"** (gear icon)
2. Under **"Kibana"**, click **"Data Views"** (or **"Index Patterns"** in older versions)
3. Click **"Create data view"**
4. Configure:
   - **Name:** `WordCount Real-Time`
   - **Index pattern:** `wordcount-realtime*`
   - **Timestamp field:** `window_start`
5. Click **"Create data view"**

#### Step 3: Create the Tag Cloud Visualization
1. In the side menu, click **"Visualize Library"** (chart icon)
2. Click **"Create visualization"**
3. Select the **"Tag Cloud"** type
4. Select the **"WordCount Real-Time"** data view
5. Configure the visualization:
   - **Buckets → Tags:**
     - Aggregation: `Terms`
     - Field: `word` (mapped as `keyword` in Section 6.1, so there is no `word.keyword` subfield)
     - Order By: `Metric: Sum of count`
     - Order: `Descending`
     - Size: `50` (top 50 words)
   - **Metrics:**
     - Aggregation: `Sum`
     - Field: `count`
6. Click **"Update"** (▶️) to apply
7. Adjust the time range in the upper-right corner (e.g. "Last 15 minutes")
8. Save the visualization:
   - Click **"Save"** at the top
   - Name: `Word Cloud - Social Media Stream`

#### Step 4: Create the Dashboard
1. In the side menu, click **"Dashboard"**
2. Click **"Create dashboard"**
3. Click **"Add"** → select `Word Cloud - Social Media Stream`
4. Add complementary visualizations:
   - **Vertical Bar**: count per time window
   - **Data Table**: top 20 words
   - **Metric**: total number of unique words
5. Configure auto-refresh:
   - Click the clock at the top
   - Select **"Auto-refresh"** → `10 seconds`
6. Save the dashboard:
   - Name: `B2 - Real-Time Word Count Analytics`

### 7.2 Alternatives to the Tag Cloud

If the Tag Cloud is not available in your Kibana version:

**Option A: Horizontal Bar Chart**
- Type: `Horizontal Bar`
- Y-axis: `word` (Terms, top 30)
- X-axis: `count` (Sum)
- Shows the most frequent words as horizontal bars

**Option B: Data Table**
- Type: `Data Table`
- Rows: `word` (Terms, top 50)
- Metrics: `count` (Sum)
- Shows a table sorted by count

**Option C: Treemap**
- Type: `Treemap`
- Groups: `word` (Terms, top 40)
- Size: `count` (Sum)
- Shows words as blocks proportional to their frequency

### 7.3 Dashboard Screenshot

**MANUAL ACTION REQUIRED:**
1. After creating the dashboard, take a full screenshot
2. Save it as: `resultados_spark/kibana_dashboard_wordcloud.png`
3. Also capture the Tag Cloud visualization on its own
4. Save it as: `resultados_spark/kibana_tagcloud_detail.png`

### 7.4 Check the Data for Kibana via API

In [None]:
# Check that Kibana is reachable and that the index holds data
import requests

kibana_url = "http://localhost:5601"

print("=== Kibana Check ===")
try:
    # Check Kibana status
    response = requests.get(f"{kibana_url}/api/status", timeout=5)
    if response.status_code == 200:
        print("✓ Kibana is reachable")
        print(f"  URL: {kibana_url}")
        print("\n📊 Open Kibana and follow the instructions above to create the visualization")
    else:
        print(f"⚠ Kibana returned status {response.status_code}")
except Exception as e:
    print(f"✗ Error connecting to Kibana: {e}")

# Show index statistics
print(f"\n=== Statistics for Index '{ELASTICSEARCH_INDEX}' ===")
stats = es.indices.stats(index=ELASTICSEARCH_INDEX)
print(f"  - Total documents: {stats['_all']['total']['docs']['count']}")
print(f"  - Size on disk: {stats['_all']['total']['store']['size_in_bytes'] / 1024:.2f} KB")

---

## 8. Stop Streams and Clean Up Resources

### 8.1 Stop the Spark Queries

In [None]:
print("=== Stopping Spark Queries ===")

# Stop the Kafka query
if query_kafka.isActive:
    query_kafka.stop()
    print("✓ Kafka query stopped")

# Stop the console query
if query_console.isActive:
    query_console.stop()
    print("✓ Console query stopped")

# Give background threads time to finish
time.sleep(5)

print("\n✓ All queries have been stopped")

### 8.2 Close the Spark Session

In [None]:
spark.stop()
print("✓ Spark session closed")

### 8.3 Final Statistics

In [None]:
print("=== FINAL STATISTICS ===")

# Final count in Elasticsearch
es.indices.refresh(index=ELASTICSEARCH_INDEX)
final_count = es.count(index=ELASTICSEARCH_INDEX)['count']

print(f"\n📊 Documents indexed in Elasticsearch: {final_count}")

# Top 20 words
if final_count > 0:
    print("\n📈 Top 20 Most Frequent Words:")
    result = es.search(
        index=ELASTICSEARCH_INDEX,
        body={
            "size": 0,
            "aggs": {
                "top_words": {
                    "terms": {
                        # "word" is mapped as keyword, so aggregate on it directly
                        "field": "word",
                        "size": 20,
                        "order": {"total_count": "desc"}
                    },
                    "aggs": {
                        "total_count": {
                            "sum": {"field": "count"}
                        }
                    }
                }
            }
        }
    )
    
    for i, bucket in enumerate(result['aggregations']['top_words']['buckets'], 1):
        word = bucket['key']
        count = int(bucket['total_count']['value'])
        print(f"  {i:2d}. {word:20s} → {count:4d} occurrences")

# Check the Kafka topics
print("\n📨 Kafka topics:")
!docker exec kafka kafka-topics --list --bootstrap-server localhost:9092

print("\n✓ Pipeline B2 executed successfully!")
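
The top-words query combines a `terms` bucket per word with a `sum` sub-aggregation and orders the buckets by that sum. As a rough plain-Python equivalent for checking results by hand, the sketch below performs the same grouping over a list of documents. The `top_words` helper and the sample documents are invented for this example, and the alphabetical tie-break is an assumption made here for determinism.

```python
from collections import defaultdict

def top_words(docs, size=20):
    """Sum 'count' per word and return the highest totals first."""
    totals = defaultdict(int)
    for doc in docs:
        totals[doc["word"]] += doc["count"]
    # Sort by total descending, then alphabetically for a deterministic order.
    ranked = sorted(totals.items(), key=lambda item: (-item[1], item[0]))
    return ranked[:size]

# Invented sample documents shaped like the indexed word counts.
docs = [
    {"word": "kafka", "count": 3},
    {"word": "spark", "count": 5},
    {"word": "kafka", "count": 4},
    {"word": "data", "count": 1},
]
ranking = top_words(docs, size=2)
print(ranking)
```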

### 8.4.7 Sentiment Visualization in Kibana (OPTIONAL)

**After running the sentiment pipeline**, create the visualizations in Kibana:

#### 1. Create a Data View for Sentiments
- Stack Management → Data Views
- Name: `Social Sentiment Analysis`
- Index pattern: `social-sentiment*`
- Timestamp: `timestamp`

#### 2. Visualization: Pie Chart - Sentiment Distribution
- Type: **Pie Chart**
- Metric: Count
- Buckets: Terms by `sentiment_classification` (mapped as `keyword`, so no `.keyword` subfield exists)
- Shows the proportion of positive/neutral/negative messages

#### 3. Visualization: Line Chart - Sentiment Over Time
- Type: **Line**
- X-axis: Date Histogram on `timestamp`
- Y-axis: Average of `sentiment_compound`
- Split series: Terms by `sentiment_classification`
- Shows how sentiment evolves over time

#### 4. Visualization: Data Table - Statistics by Sentiment
- Type: **Data Table**
- Rows: Terms by `sentiment_classification`
- Metrics: 
  - Count
  - Average of `sentiment_compound`
  - Min/Max of `sentiment_compound`

#### 5. Complete Sentiment Dashboard
Combine:
- Pie Chart (distribution)
- Line Chart (over time)
- Data Table (statistics)
- Metric: total messages analyzed
- Filter: by sentiment classification

### Additional References for Sentiment Analysis

1. **Original VADER paper:**
   - Hutto & Gilbert (2014) - [ICWSM paper](http://comp.social.gatech.edu/papers/icwsm14.vader.hutto.pdf)

2. **Python implementation:**
   - [vaderSentiment GitHub](https://github.com/cjhutto/vaderSentiment)
   - [Documentation](https://github.com/cjhutto/vaderSentiment#about-the-scoring)

3. **Alternatives:**
   - TextBlob: simpler, less accurate on social media text
   - BERT/Transformers: more accurate, requires a GPU and training
   - NLTK: requires corpora and more configuration

4. **Background on sentiment analysis:**
   - Liu, B. (2015). Sentiment Analysis: Mining Opinions, Sentiments, and Emotions. Cambridge University Press.
   - Medhat, W., Hassan, A., & Korashy, H. (2014). Sentiment analysis algorithms and applications: A survey. Ain Shams Engineering Journal.

**What Sets This Work Apart:**
- Native integration with Kafka streaming (not batch)
- Real-time indexing in Elasticsearch
- Dynamic visualization in Kibana
- Pipeline runs entirely in a notebook (reproducible)

In [None]:
# OPTIONAL: run the consumer with sentiment analysis
# Uncomment to run

# sentiment_consumer = SentimentElasticsearchConsumer(
#     KAFKA_BOOTSTRAP_SERVERS,
#     "sentiment-input",
#     es,
#     SENTIMENT_INDEX
# )

# def run_sentiment_consumer():
#     sentiment_consumer.start(duration_seconds=120, batch_size=20)

# consumer_sentiment_thread = threading.Thread(target=run_sentiment_consumer, daemon=True)
# consumer_sentiment_thread.start()

# print("✓ Sentiment consumer started (120s, batch=20)")

In [None]:
# OPTIONAL: run the producer with sentiment messages
# Uncomment to run

# sentiment_producer = SentimentProducer(KAFKA_BOOTSTRAP_SERVERS, "sentiment-input")

# def run_sentiment_producer():
#     sentiment_producer.start(duration_seconds=120, messages_per_second=2)

# sentiment_thread = threading.Thread(target=run_sentiment_producer, daemon=True)
# sentiment_thread.start()

# print("✓ Sentiment producer started (120s, 2 msgs/s)")

In [None]:
# OPTIONAL: create the topic for sentiments
# Uncomment to run

# !docker exec kafka kafka-topics --create \
#   --bootstrap-server localhost:9092 \
#   --topic sentiment-input \
#   --partitions 3 \
#   --replication-factor 1 \
#   --if-not-exists

# print("✓ Topic 'sentiment-input' created")

### 8.4.6 Run the Pipeline with Sentiment Analysis (OPTIONAL)

**INSTRUCTIONS:**
1. Uncomment the optional cells in this section to run them
2. Or run them directly if you want to test sentiment analysis
3. Create the new Kafka topic `sentiment-input`
4. Start the producer with varied messages
5. Start the consumer with VADER analysis

**NOTE:** This is an optional extension. The main pipeline (Sections 1-8.3) is already complete.
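
The VADER-based consumer in this notebook labels each message from its compound score using cut-offs of ±0.05. As a minimal sketch of just that classification step, the function below takes a compound score directly, so it runs without the `vaderSentiment` package; in the real pipeline the score comes from `SentimentIntensityAnalyzer.polarity_scores`. The standalone `classify` function is an illustration written for this example.

```python
def classify(compound, threshold=0.05):
    """Map a compound score in [-1, 1] to a sentiment label."""
    if compound >= threshold:
        return "positive"
    if compound <= -threshold:
        return "negative"
    return "neutral"

# Illustrative scores, not real analyzer output.
for score in (0.6, 0.0, -0.3):
    print(score, classify(score))
```

Scores exactly at the threshold count as positive or negative; only the open interval between the two cut-offs is neutral.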

In [None]:
# Index for messages with sentiment analysis
SENTIMENT_INDEX = "social-sentiment"

# Explicit mapping for the sentiment fields
sentiment_mapping = {
    "mappings": {
        "properties": {
            "user": {"type": "keyword"},
            "text": {"type": "text"},
            "timestamp": {"type": "date"},
            "platform": {"type": "keyword"},
            "sentiment_classification": {"type": "keyword"},
            "sentiment_compound": {"type": "float"},
            "sentiment_pos": {"type": "float"},
            "sentiment_neu": {"type": "float"},
            "sentiment_neg": {"type": "float"},
            "indexed_at": {"type": "date"}
        }
    }
}

# Delete the existing index
if es.indices.exists(index=SENTIMENT_INDEX):
    es.indices.delete(index=SENTIMENT_INDEX)
    print(f"  - Index '{SENTIMENT_INDEX}' deleted")

# Create the index
es.indices.create(index=SENTIMENT_INDEX, body=sentiment_mapping)
print(f"✓ Index '{SENTIMENT_INDEX}' created")
print("  - Fields: user, text, timestamp, platform")
print("  - Sentiments: classification, compound, pos, neu, neg")

### 8.4.5 Create the Elasticsearch Index for Sentiments

In [None]:
class SentimentElasticsearchConsumer:
    """Consumer that analyzes sentiment and indexes into Elasticsearch"""
    
    def __init__(self, bootstrap_servers, topic, es_client, es_index):
        self.consumer = KafkaConsumer(
            topic,
            bootstrap_servers=bootstrap_servers,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')),
            auto_offset_reset='latest',
            enable_auto_commit=True,
            group_id='sentiment-consumer-group'
        )
        self.es = es_client
        self.index = es_index
        self.analyzer = SentimentIntensityAnalyzer()
        self.running = False
        self.document_count = 0
        self.sentiment_stats = {"positive": 0, "neutral": 0, "negative": 0}
        
    def analyze_sentiment(self, text):
        """Analyze the sentiment of the text"""
        scores = self.analyzer.polarity_scores(text)
        
        # Classify using the compound score
        if scores['compound'] >= 0.05:
            classification = "positive"
        elif scores['compound'] <= -0.05:
            classification = "negative"
        else:
            classification = "neutral"
        
        return {
            "classification": classification,
            "compound": scores['compound'],
            "positive": scores['pos'],
            "neutral": scores['neu'],
            "negative": scores['neg']
        }
    
    def start(self, duration_seconds=120, batch_size=30):
        """Consume messages, score their sentiment, and bulk-index them."""
        self.running = True
        start_time = time.time()
        batch = []
        
        print(f"▶ Sentiment+ES consumer started ({duration_seconds}s)")
        
        try:
            # Note: the time limit is only checked when a message arrives,
            # so an idle topic keeps this loop waiting.
            for message in self.consumer:
                if not self.running or (time.time() - start_time) > duration_seconds:
                    break
                
                data = message.value
                text = data.get("text", "")
                
                # Analyze sentiment
                sentiment = self.analyze_sentiment(text)
                
                # Build the bulk action for this document
                doc = {
                    "_index": self.index,
                    "_source": {
                        "user": data.get("user"),
                        "text": text,
                        "timestamp": data.get("timestamp"),
                        "platform": data.get("platform"),
                        "sentiment_classification": sentiment["classification"],
                        "sentiment_compound": sentiment["compound"],
                        "sentiment_pos": sentiment["positive"],
                        "sentiment_neu": sentiment["neutral"],
                        "sentiment_neg": sentiment["negative"],
                        "indexed_at": datetime.now().isoformat()
                    }
                }
                
                batch.append(doc)
                self.document_count += 1
                self.sentiment_stats[sentiment["classification"]] += 1
                
                # Index in batches
                if len(batch) >= batch_size:
                    success, _ = bulk(self.es, batch)
                    print(f"  [{self.document_count}] Indexed {success} docs | "
                          f"Pos: {self.sentiment_stats['positive']} | "
                          f"Neu: {self.sentiment_stats['neutral']} | "
                          f"Neg: {self.sentiment_stats['negative']}")
                    batch = []
            
            # Index any remaining documents
            if batch:
                success, _ = bulk(self.es, batch)
                print(f"  [Final] Indexed {success} documents")
        
        except KeyboardInterrupt:
            print("\n⏸ Consumer interrupted")
        finally:
            self.stop()
    
    def stop(self):
        """Stop the consumer and print a summary."""
        self.running = False
        self.consumer.close()
        print(f"■ Consumer stopped. Total: {self.document_count} docs")
        print(f"  Distribution: Positive={self.sentiment_stats['positive']}, "
              f"Neutral={self.sentiment_stats['neutral']}, "
              f"Negative={self.sentiment_stats['negative']}")

print("✓ Sentiment-analysis consumer defined")
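
The `for message in self.consumer` loop in the class above evaluates the elapsed time only when a message arrives, so an idle topic can keep it waiting past `duration_seconds`. One possible alternative, sketched below, polls with a timeout so the deadline is re-checked even when nothing is received. The function name `consume_for` and its parameters are hypothetical; the only library behavior assumed is kafka-python's `KafkaConsumer.poll(timeout_ms=...)`, which returns a dict mapping partitions to lists of records.

```python
import time


def consume_for(consumer, duration_seconds, handle, poll_timeout_ms=500):
    """Poll `consumer` for at most `duration_seconds`.

    Each record's deserialized value is passed to `handle`.
    Returns the number of records processed.
    """
    deadline = time.monotonic() + duration_seconds
    processed = 0
    while time.monotonic() < deadline:
        # poll() returns within poll_timeout_ms even if no data arrives,
        # so the deadline above is re-evaluated regularly.
        records_by_partition = consumer.poll(timeout_ms=poll_timeout_ms)
        for records in records_by_partition.values():
            for record in records:
                handle(record.value)
                processed += 1
    return processed
```

With the class above, this could be used as `consume_for(self.consumer, duration_seconds, process_one)`, where `process_one` (also hypothetical) would wrap the scoring and batching steps.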

### 8.4.4 Consumer with Sentiment Analysis + Elasticsearch

In [None]:
# Expanded dataset with messages of varied sentiment
SENTIMENT_MESSAGES = [
    # Positive
    "I absolutely love working with Spark Streaming! It's amazing and powerful!",
    "Kafka integration is fantastic and makes everything so much easier",
    "This distributed system is incredibly efficient and reliable",
    "Great tools for big data processing, highly recommended!",
    "Excellent performance with Elasticsearch indexing, very impressed",
    
    # Neutral
    "Apache Spark processes data using structured streaming",
    "Kafka is a distributed message broker for event streaming",
    "Elasticsearch provides indexing and search capabilities",
    "The pipeline consists of producer consumer and streaming components",
    "Docker containers run the infrastructure services",
    
    # Negative
    "Configuration is frustrating and takes too much time",
    "Terrible documentation, very difficult to understand",
    "Performance is poor and disappointing with large datasets",
    "This setup is awful and causes many problems",
    "I hate dealing with version compatibility issues",
    
    # Mixed
    "Spark is powerful but the learning curve is steep and challenging",
    "Good results but the process is slow and requires patience",
    "Effective solution despite some annoying configuration issues"
]

class SentimentProducer(SocialMediaProducer):
    """Producer that sends messages with varied sentiment."""
    
    def generate_message(self):
        """Build a message from the sentiment dataset."""
        return {
            "user": random.choice(SAMPLE_USERS),
            "text": random.choice(SENTIMENT_MESSAGES),
            "timestamp": datetime.now().isoformat(),
            "platform": "SimulatedSocial",
            "message_id": self.message_count
        }

print("✓ Producer with varied-sentiment messages defined")
print(f"  - Total templates: {len(SENTIMENT_MESSAGES)}")
print("  - Positive: 5 | Neutral: 5 | Negative: 5 | Mixed: 3")

### 8.4.3 Producer with Varied Messages (Mixed Sentiment)

In [None]:
from vaderSentiment.vaderSentiment import SentimentIntensityAnalyzer

# Initialize the analyzer
analyzer = SentimentIntensityAnalyzer()

# Test examples
test_messages = [
    "Apache Spark is amazing for big data processing!",
    "Learning distributed systems is challenging but rewarding",
    "This pipeline is terrible and doesn't work at all",
    "Kafka provides reliable message streaming capabilities",
    "I hate dealing with configuration files"
]

print("=== Sentiment Analysis Test ===\n")

for msg in test_messages:
    scores = analyzer.polarity_scores(msg)
    
    # Classify based on the compound score
    if scores['compound'] >= 0.05:
        sentiment = "POSITIVE 😊"
    elif scores['compound'] <= -0.05:
        sentiment = "NEGATIVE 😞"
    else:
        sentiment = "NEUTRAL 😐"
    
    print(f"Message: {msg[:60]}...")
    print(f"  Compound: {scores['compound']:.3f}")
    print(f"  Positive: {scores['pos']:.3f} | Neutral: {scores['neu']:.3f} | Negative: {scores['neg']:.3f}")
    print(f"  → Classification: {sentiment}\n")

### 8.4.2 Sentiment Analysis Test

In [None]:
!pip install -q vaderSentiment

print("✓ VADER Sentiment installed")
print("  - Library: vaderSentiment")
print("  - Model: pre-built lexicon tuned for social media text")

### 8.4.1 Installing VADER

---

## 8.4 Optional Extension: Sentiment Analysis (ML)

**NOTE:** This section is optional. It shows how to enrich the pipeline with sentiment analysis using a lexicon- and rule-based model.

### Why Sentiment Analysis?

Sentiment analysis makes it possible to:
- Classify messages as positive, neutral, or negative
- Identify sentiment trends in real time
- Raise alerts on extreme sentiment
- Complement the word count analysis with emotional context

### Library Used: VADER

**VADER (Valence Aware Dictionary and sEntiment Reasoner)**

- Developed specifically for social media text
- Requires no training (pre-built lexicon and rules)
- Accounts for context (negation, intensifiers, punctuation)
- Returns four scores: positive, negative, neutral, and compound
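
To make the compound score less of a black box, the sketch below mirrors how the reference `vaderSentiment` implementation appears to derive it: the summed word valences are squashed into [-1, 1] with `x / sqrt(x² + α)`, using `α = 15`. The function names here are hypothetical, the normalization is reproduced from the library's source as an assumption rather than a documented API, and the ±0.05 cut-offs are the ones this notebook applies.

```python
import math


def normalize_valence(total_valence, alpha=15):
    """Squash an unbounded valence sum into [-1, 1] (assumed VADER-style normalization)."""
    norm = total_valence / math.sqrt(total_valence * total_valence + alpha)
    return max(-1.0, min(1.0, norm))


def classify_compound(compound, threshold=0.05):
    """Map a compound score to a label using symmetric thresholds."""
    if compound >= threshold:
        return "positive"
    if compound <= -threshold:
        return "negative"
    return "neutral"


for valence_sum in (4.0, 0.1, -2.5):
    compound = normalize_valence(valence_sum)
    print(f"{valence_sum:+.1f} -> {compound:+.3f} -> {classify_compound(compound)}")
```

The narrow neutral band means that even a weak valence sum is enough to leave the neutral class, which is worth keeping in mind when reading the distribution counts.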

**Main Reference:**
> Hutto, C.J. & Gilbert, E.E. (2014). VADER: A Parsimonious Rule-based Model for Sentiment Analysis of Social Media Text. Eighth International Conference on Weblogs and Social Media (ICWSM-14). Ann Arbor, MI, June 2014.
> 
> [Original Paper](http://comp.social.gatech.edu/papers/icwsm14.vader.hutto.pdf)

**What This Work Adds:**
- Integration with a Spark Streaming pipeline (not batch)
- Simultaneous indexing of word count and sentiment results in Elasticsearch
- Real-time visualization of sentiment in Kibana
- Self-contained implementation in the notebook

---

## 9. Conclusions and Observations

### 9.1 Objectives Achieved

✅ **Input via Kafka:**
- Implemented a producer that simulates social media messages
- Documented the rationale for not using Discord (OAuth, persistent connection, reproducibility)
- Adopted an alternative based on synthetic data generation with realistic characteristics

✅ **Spark Structured Streaming Pipeline:**
- Reads the stream from Kafka with JSON deserialization
- Computes word counts over sliding time windows (30 s window, 10 s slide)
- Configures a watermark to handle late events
- Publishes results to an output Kafka topic
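
To illustrate what a 30 s window with a 10 s slide implies, the pure-Python sketch below lists the windows that contain a given event time. It is only an illustration of the assignment rule, assuming windows aligned to multiples of the slide as Spark's `window` function does by default; the function name `sliding_windows` is hypothetical and timestamps are plain integer seconds.

```python
def sliding_windows(event_ts, window=30, slide=10):
    """Return the [start, end) windows, in seconds, that contain `event_ts`."""
    # Latest window that starts at or before the event
    last_start = (event_ts // slide) * slide
    # Walk back one slide at a time while the window still covers the event
    starts = range(last_start, event_ts - window, -slide)
    return [(start, start + window) for start in sorted(starts)]


print(sliding_windows(42))  # [(20, 50), (30, 60), (40, 70)]
```

Each event therefore contributes to `window / slide = 3` overlapping windows, which is why the same word can appear in several consecutive result rows.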

✅ **Elasticsearch Integration:**
- Index created with an explicit, typed mapping
- Kafka consumer indexing results in real time
- Bulk indexing for performance
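
Bulk indexing amounts to grouping actions into fixed-size batches before sending them. A minimal sketch of that grouping follows; the helper name `batched` and the `word`/`count` field names are hypothetical, while the index name is taken from the architecture diagram.

```python
from itertools import islice


def batched(actions, size):
    """Yield lists of at most `size` items from any iterable of bulk actions."""
    iterator = iter(actions)
    while True:
        batch = list(islice(iterator, size))
        if not batch:
            return
        yield batch


# 70 illustrative actions, grouped with the notebook's batch size of 30
actions = (
    {"_index": "wordcount-realtime", "_source": {"word": f"w{i}", "count": i}}
    for i in range(70)
)
sizes = [len(batch) for batch in batched(actions, 30)]
print(sizes)  # [30, 30, 10]
```

Each batch could then be handed to `elasticsearch.helpers.bulk(es, batch)`, so 70 documents cost three requests instead of 70.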

✅ **Visualization in Kibana:**
- Detailed instructions for creating a Tag Cloud
- Documented alternatives (Bar Chart, Data Table, Treemap)
- Suggested dashboard with multiple visualizations

✅ **Self-Contained Execution:**
- **100% of operations run in notebook cells**
- No dependency on external scripts
- Infrastructure managed via docker-compose

### 9.2 Implemented Architecture

```
[Producer Python]  →  [Kafka: social-input]  →  [Spark Streaming]  →  [Kafka: wordcount-output]
                                                        ↓
                                                  [Console Debug]
                                                        
[ES Consumer]  ←  [Kafka: wordcount-output]
      ↓
[Elasticsearch: wordcount-realtime index]
      ↓
[Kibana Dashboard: Tag Cloud + Metrics]
```
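
Every hop through Kafka in this diagram carries JSON encoded as UTF-8 bytes. The sketch below shows that round trip for one message; `deserialize` follows the `value_deserializer` used by the notebook's consumer, while `serialize` is an assumption about the producer side, and the sample values are made up.

```python
import json


def serialize(message):
    """Encode a message dict as UTF-8 JSON bytes (assumed producer side)."""
    return json.dumps(message).encode("utf-8")


def deserialize(raw):
    """Decode UTF-8 JSON bytes back into a dict (consumer side)."""
    return json.loads(raw.decode("utf-8"))


example = {
    "user": "user_01",  # hypothetical sample values
    "text": "Kafka integration is fantastic",
    "timestamp": "2025-11-29T12:00:00",
    "platform": "SimulatedSocial",
    "message_id": 1,
}

payload = serialize(example)
restored = deserialize(payload)
print(type(payload).__name__, restored == example)
```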

### 9.3 Configuration Parameters

| Component | Parameter | Value |
|------------|-----------|-------|
| Producer | Message rate | 3 msgs/s |
| Producer | Duration | 180 seconds |
| Spark | Window length | 30 seconds |
| Spark | Slide interval | 10 seconds |
| Spark | Watermark | 1 minute |
| Kafka | Partitions (input) | 3 |
| Kafka | Partitions (output) | 3 |
| ES Consumer | Batch size | 30 documents |
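
A quick back-of-the-envelope calculation shows what these values imply for a full run. The variable names are illustrative, and the batch count assumes a consumer that indexes one document per input message, as the sentiment consumer in section 8.4 does; the word count consumer indexes one document per word and window, so its volume depends on the vocabulary.

```python
rate_per_second = 3
duration_seconds = 180
window_seconds = 30
slide_seconds = 10
batch_size = 30

# Messages produced over the whole run
total_messages = rate_per_second * duration_seconds

# Messages falling into a single 30 s window at a steady rate
messages_per_window = rate_per_second * window_seconds

# Overlapping windows that each message contributes to
windows_per_message = window_seconds // slide_seconds

# Bulk requests if every message becomes one document
full_batches, leftover = divmod(total_messages, batch_size)

print(total_messages, messages_per_window, windows_per_message, full_batches, leftover)
# 540 90 3 18 0
```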

### 9.4 Possible Improvements

- **Sentiment Analysis:** Integrate VADER or TextBlob to classify messages (section 8.4 prototypes this with VADER)
- **Stop Word Filter:** Remove common words (the, and, is) for more relevant results
- **Multiple Aggregations:** Word count per user, per time of day, etc.
- **Alerts:** Configure Kibana alerts for specific words
- **Persistence:** Run Kafka with replication for fault tolerance
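
As an example of the stop word filter, the sketch below counts words in plain Python while skipping a small stop list. The list contents and the function name `count_words` are illustrative assumptions; inside the Spark job, something like `pyspark.ml.feature.StopWordsRemover` or a simple filter on the exploded word column would probably play the same role.

```python
import re
from collections import Counter

# Small illustrative stop list; a real one would be much longer
STOP_WORDS = frozenset({"the", "and", "is", "a", "of", "to", "in", "for", "with"})


def count_words(text, stop_words=STOP_WORDS):
    """Count lowercase word tokens in `text`, ignoring stop words."""
    tokens = re.findall(r"[a-z']+", text.lower())
    return Counter(token for token in tokens if token not in stop_words)


counts = count_words("Kafka is a distributed message broker for event streaming")
print(sorted(counts))
```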

### 9.5 References

- [Apache Spark Structured Streaming Guide](https://spark.apache.org/docs/latest/structured-streaming-programming-guide.html)
- [Kafka Documentation](https://kafka.apache.org/documentation/)
- [Elasticsearch Python Client](https://elasticsearch-py.readthedocs.io/)
- [Kibana Visualizations](https://www.elastic.co/guide/en/kibana/current/dashboard.html)

---

**Notebook created by:** Edilberto Cantuaria  
**Date:** November 29, 2025  
**Course:** PSPD - UnB