## Mini Projeto 2 - Spark Streaming - Twitter

In [None]:
# Pode ser necessário instalar esses pacotes
#!pip install requests_oauthlib # Pacote de autenticação necessário
#!pip install twython
#!pip install nltk

In [1]:
# Módulos usados
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from requests_oauthlib import OAuth1Session
from operator import add
import requests_oauthlib
from time import gmtime, strftime
import requests
import time
import string
import ast
import json

In [2]:
# Pacote NLTK
import nltk
from nltk.classify import NaiveBayesClassifier
from nltk.sentiment import SentimentAnalyzer
from nltk.corpus import subjectivity
from nltk.corpus import stopwords
from nltk.sentiment.util import *

In [3]:
# Frequência de update
# Intervalo com o qual vamos capturar os batchs
INTERVALO_BATCH = 5

In [4]:
# Criando o StreamingContext
ssc = StreamingContext(sc, INTERVALO_BATCH)

## Treinando o Classificador de Análise de Sentimento

Uma parte essencial da criação de um algoritmo de análise de sentimento (ou qualquer algoritmo de mineração de dados) é ter um conjunto de dados abrangente ou "Corpus" para o aprendizado, bem como um conjunto de dados de teste para garantir que a precisão do seu algoritmo atende aos padrões que você espera. Isso também permitirá que você ajuste o seu algoritmo a fim de deduzir melhores (ou mais precisas) características de linguagem natural que você poderia extrair do texto e que vão contribuir para a classificação de sentimento, em vez de usar uma abordagem genérica. Tomaremos como base o dataset de treino fornecido pela Universidade de Michigan, para competições do Kaggle --> https://inclass.kaggle.com/c/si650winter11.

Esse dataset contém 1,578,627 tweets classificados e cada linha é marcada como: 

### 1 para o sentimento positivo 
### 0 para o sentimento negativo 

O dataset dataset_analise_sentimento.csv fornecido no Kaggle está sendo utilizado como "dados já treinados" que será utilizado durante o treinamento supervisionado, ou seja, quando existe o output.

O Algoritmo que será implementado será o NaiveBayes, que é um algoritmo utilizado para resolver problemas de classificação.

In [5]:
# Lendo o arquivo texto e criando um RDD em memória com Spark
arquivo = sc.textFile("~/dataset_analise_sentimento.csv")

In [6]:
# Removendo o cabeçalho
# arquivo.take(1) -> Retorna a primeira Linha do dataset
# [0] -> Utilizado para selecionar o resultado do index 0, para que o retorno não seja como Lista
header = arquivo.take(1)[0] 
# Aplica o filtro retornando todos registros EXCETO o cabeçalho
dataset = arquivo.filter(lambda line: line != header)

In [7]:
type(dataset)

pyspark.rdd.PipelinedRDD

pyspark.rdd.PipelinedRDD

In [8]:
# Limpeza de dados
# Essa função separa as colunas em cada linha, cria uma tupla e remove a pontuação.
def get_row(line):
    row = line.split(',')
    sentimento = row[1] # Seleciona a linha de index 1, que representa o sentimento (0 ou 1) do RDD importado
    
    tweet = row[3].strip() # Remover os espaços em branco no inicio e fim (Semelhando a operação TRIM)
    translator = str.maketrans({key: None for key in string.punctuation}) # Remove as pontuações do tweet
    tweet = tweet.translate(translator) # Realiza a operação definida 

    tweet = tweet.split(' ') # Realiza um segundo split para cada espaço em branco encontrado
    tweet_lower = []

    # Para cada registro da Lista tweet gerada, obtem-se o registro e realiza a transformação para lower-case
    for i in tweet:
        tweet_lower.append(i.lower())
        
    return (tweet_lower, sentimento)

In [9]:
# Aplica a função a cada linha do dataset e cria-se o dataset de treino
dataset_treino = dataset.map(lambda line: get_row(line))

In [10]:
# Cria um objeto SentimentAnalyzer 
# Ferramenta implementada e utilizada para facilitar operações de Análise de Sentimento utilizando o pacote NLTK.
sentiment_analyzer = SentimentAnalyzer()

In [11]:
# Certifique-se de ter espaço em disco - Aproximadamente 5GB para realizar o Download total do nltk
# https://raw.githubusercontent.com/nltk/nltk_data/gh-pages/index.xml
# nltk.download()
nltk.download("stopwords") # stopwords são Palavras com muita frequencia sem relevância já fornecida pelo pacote NLTK.

[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\rafael.rampineli\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!
[nltk_data] Downloading package stopwords to
[nltk_data]     C:\Users\rafael.rampineli\AppData\Roaming\nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

True

In [12]:
# Obtém a lista de stopwords em Inglês pois o dataset fornecido pela Universidade de Michigan possui tweets coletados em ingles.
stopwords_all = []
for word in stopwords.words('english'):    
    stopwords_all.append(word)
    stopwords_all.append(word + '_NEG') # Séra utilizado para remover as palavras classificadas como _NEG pelo mark_negation

In [13]:
# Obtém 10.000 tweets do dataset de treino e retorna todas as palavras que não são stopwords
dataset_treino_amostra = dataset_treino.take(10000)

In [14]:
# mark_negation -> Quando identifica uma palavra negativa, adiciona _NEG as palavras seguintes até que uma pontuação ocorra.
# https://www.nltk.org/api/nltk.sentiment.html
words_analyzed = sentiment_analyzer.all_words([mark_negation(doc) for doc in dataset_treino_amostra])

# Selecionando somente as palavras obtidas no words_analyzed que não representam stopwords classificadas anteriormente.
words_analyzed_not_stopwords = [x for x in words_analyzed if x not in stopwords_all]

In [15]:
# Cria um unigram (n-grama) e extrai as features
# n-grama -> Sequencia continua de itens (fonemas, silabas, levras, palavras...)
unigram_feats = sentiment_analyzer.unigram_word_feats(words_analyzed_not_stopwords, top_n = 200)

sentiment_analyzer.add_feat_extractor(extract_unigram_feats, unigrams = unigram_feats)
training_set = sentiment_analyzer.apply_features(dataset_treino_amostra)

In [None]:
type(training_set)

In [None]:
print(training_set)

In [16]:
# Treinar o modelo
trainer = NaiveBayesClassifier.train
classifier = sentiment_analyzer.train(trainer, training_set)

Training classifier
Training classifier


In [17]:
# Testa o classificador em algumas sentenças
test_sentence1 = [(['this', 'program', 'is', 'bad'], '')]
test_sentence2 = [(['tough', 'day', 'at', 'work', 'today'], '')]
test_sentence3 = [(['good', 'wonderful', 'amazing', 'awesome'], '')]
test_set = sentiment_analyzer.apply_features(test_sentence1)
test_set2 = sentiment_analyzer.apply_features(test_sentence2)
test_set3 = sentiment_analyzer.apply_features(test_sentence3)

In [None]:
print(test_set)

In [18]:
# Autenticação do Twitter 
consumer_key = "XXXX"
consumer_secret = "XXXX"
access_token = "XXXX"
access_token_secret = "XXXX"

In [19]:
# Especifica a URL termo de busca e conecta ao stream do próprio Twitter
search_term = 'BigData'
sample_url = 'https://stream.twitter.com/1.1/statuses/sample.json'
filter_url = 'https://stream.twitter.com/1.1/statuses/filter.json?track='+search_term

In [20]:
# Criando o objeto de atutenticação para o Twitter
auth = requests_oauthlib.OAuth1(consumer_key, consumer_secret, access_token, access_token_secret)

In [21]:
# Configurando o Stream
rdd = ssc.sparkContext.parallelize([0])
stream = ssc.queueStream([], default = rdd)

In [None]:
type(stream)

In [22]:
# Total de tweets que serão coletados por cada interação com o twitter por update
NUM_TWEETS = 50  

In [23]:
# Essa função conecta ao Twitter e retorna um número específico de Tweets (NUM_TWEETS)
def tfunc(t, rdd):
    return rdd.flatMap(lambda x: stream_twitter_data())

# Requests.get pertence ao pacote importado: Requests
def stream_twitter_data():
    response = requests.get(filter_url, auth = auth, stream = True)
    print(filter_url, response)
    count = 0
    # Para cada linha obtida
    for line in response.iter_lines():
        try:
            if count > NUM_TWEETS:
                break                
            post = json.loads(line.encode("utf-8")) # decode para garantir que os dados venham com formato especifico.
            contents = [post['text']]
            count += 1
            yield str(contents)
        except:
            result = False

In [24]:
stream = stream.transform(tfunc)

In [25]:
coord_stream = stream.map(lambda line: ast.literal_eval(line))

In [26]:
# Essa função classifica os tweets, aplicando as features do modelo criado anteriormente
def classifica_tweet(tweet):
    sentence = [(tweet, '')]
    test_set = sentiment_analyzer.apply_features(sentence)
    print(tweet, classifier.classify(test_set[0][0]))
    return(tweet, classifier.classify(test_set[0][0]))

In [27]:
# Essa função retorna o texto do Twitter
def get_tweet_text(rdd):
    for line in rdd:        
        tweet = line.strip()
        translator = str.maketrans({key: None for key in string.punctuation})
        tweet = tweet.translate(translator)
        tweet = tweet.split(' ')
        tweet_lower = []
    for word in tweet:
        tweet_lower.append(word.lower())
    return(classifica_tweet(tweet_lower))

In [28]:
# Cria uma lista vazia para os resultados
resultados = []

In [29]:
# Essa função salva o resultado dos batches de Tweets junto com o timestamp
def output_rdd(rdd):
    global resultados
    pairs = rdd.map(lambda x: (get_tweet_text(x)[1],1))
    counts = pairs.reduceByKey(add)
    output = []
    for count in counts.collect():
        output.append(count)
        result = [time.strftime("%I:%M:%S"), output]
        resultados.append(result)
        print(result)

In [30]:
# A função foreachRDD() aplica uma função a cada RDD to streaming de dados
coord_stream.foreachRDD(lambda t, rdd: output_rdd(rdd))

In [31]:
# Start streaming
ssc.start()
# ssc.awaitTermination()

In [None]:
# Finaliza o streaming
ssc.stop()