Acesse http://localhost:4040 sempre que quiser acompanhar a execução dos jobs

## Spark Streaming - Twitter

### Author: Leandro Correa; Versão: 1.0.0-beta; Update: 01-02-2022

In [1]:
# Pode ser necessário instalar esses pacotes
#!pip install requests_oauthlib
#!pip install twython
#!pip install nltk

In [2]:
# Módulos usados
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from requests_oauthlib import OAuth1Session
from operator import add
import requests_oauthlib
from time import gmtime, strftime
import requests
import time
import string
import ast
import json
import pandas as pd
from sklearn.metrics import precision_score, recall_score, f1_score, accuracy_score, roc_auc_score
from unicodedata import normalize 
import re
from pathlib import Path

In [3]:
# Pacote NLTK
import nltk
from nltk.classify import NaiveBayesClassifier
from nltk.sentiment import SentimentAnalyzer
from nltk.corpus import subjectivity
from nltk.corpus import stopwords
from nltk.sentiment.util import *

## Setando variáveis de controle do streaming

In [4]:
# Frequência de update do spark
INTERVALO_BATCH = 5

# Numero de arquivos contendo análises salvos em cada pasta
# Cada arquivo contém duas análises
UPDATE = 5

# Total de tweets por análise 
# Numeros muito altos (acima de 1500) podem fazer com que o Twitter interrompa o serviço na conta Free)
NUM_TWEETS = 500

# Termo de busca desejado
SEARCH_TERM = 'AVON'

# Horário final de execução
FINAL_TIME = '2022-02-11-11-30-00'


In [5]:
# Ajustuando PATHS de input e output
PATH = re.sub("notebooks","",os.getcwd())

# Setando conunto de treino, teste, onde o termo e os resultados firacão armazenados
#DATASET = PATH + "data/train/dataset_analise_sentimento_ptbr.csv"
DATASET = PATH + "data/train/train_sentimento_ptbr_theme.csv"
#TEST = PATH + 'data/test/test_sentimento_ptbr.csv'
TEST = PATH + "data/test/teste_sentimento_ptbr_theme.csv"
TERM = PATH + 'data/term/output.txt'
RDD_ADDRESS = PATH + "data/record/r"
OUTPUT_DATA = PATH + "data/tweeters/t"


output ={'search_term': SEARCH_TERM}
with open(TERM, 'w') as outfile:
    json.dump(output, outfile)

In [6]:
# Criando o StreamingContext
ssc = StreamingContext(sc, INTERVALO_BATCH)

## Treinando o Classificador de Análise de Sentimento

Tomaremos como base para treino o dataset obtido a partir da competição do Kaggle -> https://www.kaggle.com/augustop/portuguese-tweets-for-sentiment-analysis


Esse dataset contém 500.000 tweets classificados e cada linha é marcada como: 

### 1 para o sentimento positivo 
### 0 para o sentimento negativo 

In [7]:
# Lendo o arquivo texto e criando um RDD em memória com Spark
arquivo = sc.textFile(DATASET)


In [8]:
# Removendo o cabeçalho
header = arquivo.take(1)[0]
dataset = arquivo.filter(lambda line: line != header)

In [9]:
# Essa função separa as colunas em cada linha, cria uma tupla e remove a pontuação.
def get_row(line):
  row = line.split(',')
  sentimento = row[1]
  tweet = row[3].strip()
  translator = str.maketrans({key: None for key in string.punctuation})
  tweet = tweet.translate(translator)
  tweet = tweet.split(' ')
  tweet_lower = []
  for word in tweet:
    tweet_lower.append(word.lower())
  
  tweet_lower = [re.sub("(.)\\1{"+str(2)+",}", '', w) for w in tweet_lower] 
  tweet_lower = [normalize('NFKD', w).encode('ASCII', 'ignore').decode('ASCII') for w in tweet_lower]
  
  return (tweet_lower, sentimento)

In [10]:
# Aplica a função a cada linha do dataset
dataset_treino = dataset.map(lambda line: get_row(line))

In [11]:
# Cria um objeto SentimentAnalyzer 
sentiment_analyzer = SentimentAnalyzer()

In [12]:
# Certifique-se de ter espaço em disco - Aproximadamente 5GB
# https://raw.githubusercontent.com/nltk/nltk_data/gh-pages/index.xml
# nltk.download()
nltk.download("stopwords")

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/leandro/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [13]:
# Obtém a lista de stopwords em Inglês 
stopwords_all = []
#for word in stopwords.words('english'):
stopwords = stopwords.words('portuguese')
stopwords.extend(['pra', 'pro', 'muito', 'muita', 'pq', 'ai', 'coisa', 'tipo'])

for word in stopwords:
  stopwords_all.append(word)
  stopwords_all.append(word + '_NEG')

In [14]:
# Obtém 50.000 tweets do dataset de treino 
dataset_treino_amostra = dataset_treino.take(50000)

In [15]:
# retorna todas as palavras que não são stopwords
all_words_neg = sentiment_analyzer.all_words([mark_negation(doc) for doc in dataset_treino_amostra])
all_words_neg_nostops = [x for x in all_words_neg if x not in stopwords_all]

In [16]:
# Cria um unigram (n-grama) e extrai as features
unigram_feats = sentiment_analyzer.unigram_word_feats(all_words_neg_nostops, top_n = 200)
sentiment_analyzer.add_feat_extractor(extract_unigram_feats, unigrams = unigram_feats)
training_set = sentiment_analyzer.apply_features(dataset_treino_amostra)

In [17]:
# Treinar o modelo
trainer = NaiveBayesClassifier.train
classifier = sentiment_analyzer.train(trainer, training_set)

Training classifier


## Testando o classificador

In [18]:
# Obtendo o conjunto de teste
test_dataset = pd.read_csv(TEST)
predictions = []

for tweet in test_dataset.SentimentText:
    translator = str.maketrans({key: None for key in string.punctuation})
    tweet = tweet.translate(translator)
    tweet = tweet.split(' ')
    tweet_lower = []
    for word in tweet:
        if word != '':
            tweet_lower.append(word.lower())
    
    #r = '(.*).com+|\W+|_+|[0-9]+|(http|www)\S+|(http|www)' # 0.70
    #r = '\W+|_+|[0-9]+' # 0.712
    #tweet_lower = [w for w in tweet_lower if len(w) > 2] # 0.68
    
    tweet_lower = [re.sub("(.)\\1{"+str(2)+",}", '', w) for w in tweet_lower]
    tweet_lower = [normalize('NFKD', w).encode('ASCII', 'ignore').decode('ASCII') for w in tweet_lower]

    test_set = sentiment_analyzer.apply_features([([tweet_lower,''])])
    
    y = classifier.classify(test_set[0][0])
    predictions.append(int(y.replace('"', '')))

In [19]:
# Escrevendo os resultados da acurácia da previsão
y_teste = [y for y in test_dataset.Sentiment]
dict_previsions = {'Modelo':'NaiveBayes',
                   'Versão':'1',
                   'Precision':precision_score(predictions, y_teste),
                   'Recall':recall_score(predictions, y_teste),
                   'F1 Score':f1_score(predictions, y_teste),
                   'Acurácia':accuracy_score(predictions, y_teste),
                   'AUC':roc_auc_score(y_teste, predictions)}
dict_previsions

{'Modelo': 'NaiveBayes',
 'Versão': '1',
 'Precision': 0.852,
 'Recall': 0.613479262672811,
 'F1 Score': 0.7133288680509042,
 'Acurácia': 0.6576,
 'AUC': 0.6576}

## Integrando com o Twitter


In [20]:
# Autenticação do Twitter 
consumer_key = "#########################"
consumer_secret = "#########################"
access_token = "#########################"
access_token_secret = "#########################"

In [21]:
# Especifica a URL termo de busca
sample_url = 'https://stream.twitter.com/1.1/statuses/sample.json'
filter_url = 'https://stream.twitter.com/1.1/statuses/filter.json?track='+SEARCH_TERM
#callback_url = "http://127.0.0.1:8080"

In [22]:
# Criando o objeto de atutenticação para o Twitter
auth = requests_oauthlib.OAuth1(consumer_key, consumer_secret, access_token, access_token_secret)

In [23]:
# Configurando o Stream
rdd = ssc.sparkContext.parallelize([0])
stream = ssc.queueStream([], default = rdd)

In [24]:
# Essa função conecta ao Twitter e retorna um número específico de Tweets (NUM_TWEETS)
def tfunc(t, rdd):
  return rdd.flatMap(lambda x: stream_twitter_data())

def stream_twitter_data():
  response = requests.get(filter_url, auth = auth, stream = True)
  print(filter_url, response)
  count = 0
  for line in response.iter_lines():
    try:
      if count > NUM_TWEETS:
        break
      post = json.loads(line.decode('utf-8'))
      contents = [post['text']]
      count += 1
      yield str(contents)
    except:
      result = False

In [25]:
stream = stream.transform(tfunc)

In [26]:
coord_stream = stream.map(lambda line: ast.literal_eval(line))

In [27]:
# Essa função classifica os tweets, aplicando as features do modelo criado anteriormente
def classifica_tweet(tweet):
  sentence = [(tweet, '')]
  test_set = sentiment_analyzer.apply_features(sentence)
  print(tweet, classifier.classify(test_set[0][0]))
  return(tweet, classifier.classify(test_set[0][0]))

In [28]:
# Essa função retorna o texto do Twitter
def get_tweet_text(rdd):
  for line in rdd:
    tweet = line.strip()
    translator = str.maketrans({key: None for key in string.punctuation})
    tweet = tweet.translate(translator)
    tweet = tweet.split(' ')
    tweet_lower = []
    for word in tweet:
      tweet_lower.append(word.lower())
    
    tweet_lower = [re.sub("(.)\\1{"+str(2)+",}", '', w) for w in tweet_lower]
    tweet_lower = [normalize('NFKD', w).encode('ASCII', 'ignore').decode('ASCII') for w in tweet_lower]
    
    return(classifica_tweet(tweet_lower))

In [29]:
# Essa função grava os twitters em um arquivo
def record_twitter_text(rdd):
    list_tweeters = [] 
    for line in rdd:
      tweet = line.strip()
      translator = str.maketrans({key: None for key in string.punctuation})
      tweet = tweet.translate(translator)
      tweet = tweet.split(' ')
      tweet_lower = []
      for word in tweet:
        tweet_lower.append(word.lower())
      
      tweet_lower = [re.sub("(.)\\1{"+str(2)+",}", '', w) for w in tweet_lower]
      tweet_lower = [normalize('NFKD', w).encode('ASCII', 'ignore').decode('ASCII') for w in tweet_lower]
      
      list_tweeters.append(tweet_lower)
    
    return(list_tweeters)    
    

In [30]:
# Cria uma lista vazia para os resultados
resultados = []
tweet_list = []

In [31]:
import pdb

# Essa função salva o resultado dos batches de Tweets junto com o timestamp
def output_rdd(rdd):
  global resultados
  global tweet_list
  
  pairs = rdd.map(lambda x: (get_tweet_text(x)[1],1))
  
  counts = pairs.reduceByKey(add)
  output = []
  for count in counts.collect():
    output.append(count)
    
  result = [time.strftime("%d:%m:%I:%M:%S"), output]
  resultados.append(result)
    
  tweets = rdd.map(lambda x: get_tweet_text(x))
  output = []
  for count in tweets.collect():
    output.append(count)
    
  tweet_list.append(output)

  print(result)
  print(tweet_list)

In [32]:
# A função foreachRDD() aplica uma função a cada RDD to streaming de dados
coord_stream.foreachRDD(lambda t, rdd: output_rdd(rdd))

## Startando o streaming e processando as análises

In [33]:
# Start streaming
ssc.start()
# ssc.awaitTermination()

In [34]:
# Obtendo os resultados e salvando em disco
time_now = time.strftime("%Y-%m-%d-%H-%M-%S")

while time_now < FINAL_TIME:
    RDD_SAVE = RDD_ADDRESS + time.strftime("%I%M%S")+'/'
    RDD_TWEET = OUTPUT_DATA + time.strftime("%I%M%S")+'/'
    cont = True
    while cont:
      if len(resultados) > UPDATE:
        cont = False

    # Grava os resultados
    resultados_rdd = sc.parallelize(resultados)
    resultados_rdd.saveAsTextFile(RDD_SAVE)
    
    tweet_list_rdd = sc.parallelize(tweet_list)
    tweet_list_rdd.saveAsTextFile(RDD_TWEET)

    # Visualiza os resultados
    resultados_rdd.collect()
    tweet_list_rdd.collect()
    
    resultados = []
    tweet_list = []

    time_now = time.strftime("%Y-%m-%d-%H-%M-%S")

['10:02:11:06:51', []]
[[]]
['10:02:11:12:10', [('1', 433), ('0', 68)]]
[[], [(['cheirar', 'revistinha', 'da', 'avon', 'sem', 'duvidas', 'foi', 'meu', 'primeiro', 'vicio'], '1'), (['avon', 'me', 'lembra', 'minha', 'avo', ''], '1'), (['rt', 'centralreaiity', 'to', 'passada', 'chocada', '22'], '0'), (['rt', 'danbarbosa', 'e', 'hj', 'a', 'prova', 'do', 'lider', 'e', 'com', 'aquela', 'marca', 'la', 'avon\n\ntadeu', 'schimidt', '\n22', 'provadolider', 'httpstcofobggkrd1w'], '1'), (['bchartsnet', 'avon', 'tapete', 'da', 'natura', 'avisa', 'bc'], '1'), (['avon', 'colocou', 'o', 'cheiro', 'na', 'parede', '\nagr', 'ela', 'passou', 'dos', 'limites\n22'], '1'), (['ninguem', 'deu', 'moral', 'pro', 'perfume', 'da', 'avon', 'poxa', '22'], '0'), (['rt', 'tracklist', 'tadeu', 'falando', 'aquela', 'marca', 'la', 'para', 'falar', 'da', 'avon', '', '22', 'httpstco3tiusg4win'], '1'), (['a', 'publi', 'de', 'milhoes', 'que', 'tadeu', 'pediu', 'pra', 'nat', 'agr', 'avon', 'de', 'oportunidade', 'a', 'quem', '

KeyboardInterrupt: 

In [None]:
# Visualiza os resultados
resultados_rdd.collect()
#tweet_list_rdd.collect()

In [None]:
# Finaliza o streaming
ssc.stop()

# Fim