# **Laboratorio sobre Spark**

# **Alunos:**
- Arthur Taylor de Jesus Popov (190084642)
- João Lucas Pinto Vasconcelos (190089601)
- Pablo Guilherme de J B Silva(200025791)
- Pedro Lucas Siqueira Fernandes(190115564)



## Inicialização

In [None]:
# Instalando a biblioteca que permite copiar conteúdos do Gdrive compartilhado do professor
%pip install gdown

In [None]:
# Copiando a pasta de laboratório (material do professor) para o contexto do aluno
import gdown
url = 'https://drive.google.com/drive/folders/1z_l8RO6YYwjLdPrMBtnSNpfzfznJt1ja'
gdown.download_folder(url)

## Definição de variaveis e instalação

In [None]:
# Definindo as variáveis de ambiente do Spark 
import os
# Variáveis gerais
os.environ['JAVA_HOME']="/usr/lib/jvm/java-11-openjdk-amd64" # readlink -f /usr/bin/javac
os.environ['BASHRC_PATH']= "/root/.bashrc"
# Variáveis específicas do Spark
os.environ['SPARK_INSTALL_DIR']="/content"
os.environ['SPARK_HOME']="/content/spark"
os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages org.apache.spark:spark-streaming-kafka-0-10_2.12:3.5.0,org.apache.spark:spark-sql-kafka-0-10_2.12:3.5.0 pyspark-shell'

In [None]:
# Copiando os fonte do hadoop para a pasta $SPARK_INSTALL_DIR
!wget https://downloads.apache.org/spark/spark-3.5.4/spark-3.5.4-bin-hadoop3.tgz -P $SPARK_INSTALL_DIR

In [None]:
# Descompactando os arquivos do hadoop na pasta $SPARK_INSTALL_DIR
!tar -xvzf $SPARK_INSTALL_DIR/spark-3.5.4-bin-hadoop3.tgz -C $SPARK_INSTALL_DIR
!mv $SPARK_INSTALL_DIR/spark-3.5.4-bin-hadoop3 $SPARK_INSTALL_DIR/spark
!rm $SPARK_INSTALL_DIR/spark-3.5.4-bin-hadoop3.tgz

## Ativando servidor Spark

In [None]:
# Iniciando os processos NameNode e DataNode, daemons do HDFS
!$SPARK_HOME/sbin/start-master.sh --host localhost --port 7077

In [None]:
# Iniciando os processos relativos ao gerenciador de recursos YARN
!$SPARK_HOME/sbin/start-worker.sh spark://localhost:7077

In [None]:
%pip install pyspark
#!$SPARK_HOME/bin/pyspark --master spark://localhost:7077

## Instalando o KAFKA

In [None]:
# Instalando o kafka python
%pip install kafka-python

In [None]:
# Fazendo os imports de Producer e Consumer do Kafka
from kafka import KafkaProducer
from kafka import KafkaConsumer

In [None]:
# Fazendo download do binário do kafka
!curl -sSOL https://dlcdn.apache.org/kafka/3.8.0/kafka_2.13-3.8.0.tgz

In [None]:
# Descompactando o kafka e criando um link para a pasta do kafka
!tar xvfz kafka_2.13-3.8.0.tgz
!ln -s kafka_2.13-3.8.0 kafka
!rm kafka_2.13-3.8.0.tgz

In [None]:
# Ativando os daemons do kafka...
!./kafka/bin/zookeeper-server-start.sh -daemon ./kafka/config/zookeeper.properties
!./kafka/bin/kafka-server-start.sh -daemon ./kafka/config/server.properties

In [None]:
#Criando canais kafka
!./kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic canalinput
!./kafka/bin/kafka-topics.sh --create --bootstrap-server localhost:9092 --replication-factor 1 --partitions 1 --topic canaloutput

In [None]:
%pip install confluent_kafka

## 🔗 Conectando à API do Twitter
Esta seção configura a conexão com a API do Twitter usando `tweepy`.

In [None]:
%pip install tweepy

import tweepy

# Substitua pelas suas credenciais do Twitter
API_KEY = "SUA_API_KEY"
API_SECRET = "SUA_API_SECRET"
ACCESS_TOKEN = "SEU_ACCESS_TOKEN"
ACCESS_SECRET = "SEU_ACCESS_SECRET"

auth = tweepy.OAuthHandler(API_KEY, API_SECRET)
auth.set_access_token(ACCESS_TOKEN, ACCESS_SECRET)
api = tweepy.API(auth, wait_on_rate_limit=True)

# Teste de autenticação
user = api.verify_credentials()
print(f"Autenticado como: {user.screen_name}")

## 📡 Capturando Tweets em Tempo Real e Enviando para Kafka
Esta seção captura tweets sobre tecnologia e os envia para um tópico no Kafka.

In [None]:
%pip install confluent_kafka

from confluent_kafka import Producer
import json

producer = Producer({"bootstrap.servers": "localhost:9092"})

class StreamListener(tweepy.Stream):
    def on_status(self, status):
        if status.lang == "pt":  # Apenas tweets em português
            tweet_data = {
                "user": status.user.screen_name,
                "text": status.text,
                "created_at": str(status.created_at)
            }
            producer.produce("tweets_stream", json.dumps(tweet_data))
            print("Tweet enviado para Kafka:", tweet_data)

listener = StreamListener(API_KEY, API_SECRET, ACCESS_TOKEN, ACCESS_SECRET)
listener.filter(track=["tecnologia", "inteligência artificial"])

## 🧠 Processamento com IA (NLP) para Extração de Palavras-Chave
Usamos `spaCy` para processar os tweets e extrair palavras relevantes.

In [None]:
%pip install spacy
!python -m spacy download pt_core_news_sm

import spacy
from pyspark.sql.functions import udf
from pyspark.sql.types import StringType

nlp = spacy.load("pt_core_news_sm")

def process_tweet(text):
    doc = nlp(text)
    palavras_chave = [token.lemma_ for token in doc if token.is_alpha and not token.is_stop]
    return " ".join(palavras_chave)

process_tweet_udf = udf(process_tweet, StringType())

df = df.withColumn("keywords", process_tweet_udf(df.text))
df.show()

## 📤 Enviando os Dados Processados para o Elasticsearch
Os tweets processados serão armazenados no Elasticsearch para visualização no Kibana.

In [None]:
%pip install elasticsearch

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

for row in df.collect():
    doc = {
        "user": row.user,
        "text": row.text,
        "keywords": row.keywords,
        "created_at": row.created_at
    }
    es.index(index="tweets", body=doc)