In [1]:
# Pacotes necessarios para dicionario e integrações ao Twitter.
!pip install requests_oauthlib
!pip install twython
!pip install nltk



In [1]:
#Alguns dos modulos usados:
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from requests_oauthlib import OAuth1Session
from operator import add
from time import gmtime, strftime
import requests_oauthlib
import requests
import time
import string
import ast
import json
import pylab as pl

In [2]:
#Pacotes do dicionario NLRK.
import nltk
from nltk.classify import NaiveBayesClassifier
from nltk.sentiment import SentimentAnalyzer
from nltk.corpus import subjectivity
from nltk.corpus import stopwords
from nltk.sentiment.util import *

In [3]:
from pyspark import SparkContext
sc = SparkContext()

In [4]:
# Intervalo de updade de resultados.
INTERVALO_BATCH = 5

In [5]:
#Criando o StreamingContext - basicamente ele manipula o Streaming de dados.
ssc = StreamingContext(sc, INTERVALO_BATCH)

In [6]:
# Aqui vou treinar o classificador de análise de sentimentos, realizando o Machine Learning.. usando um dataset 
# de treino oferecido pela Universidade do Michigan.
arquivo = sc.textFile("dataset_analise_sentimento.csv")

In [7]:
# Remove o cabeçalho.
header = arquivo.take(1)[0]
dataset = arquivo.filter(lambda line: line != header)

In [8]:
# Função utilizada para separar as colunas de cada linha, faz uma limpeza de dados.
def get_row(line):
  row = line.split(',')
  sentimento = row[1]
  tweet = row[3].strip()
  translator = str.maketrans({key: None for key in string.punctuation})
  tweet = tweet.translate(translator)
  tweet = tweet.split(' ')
  tweet_lower = []
  for word in tweet:
    tweet_lower.append(word.lower())
  return (tweet_lower, sentimento)

In [9]:
# Aplica a função anterior em cada linha do dataset
dataset_treino = dataset.map(lambda line: get_row(line))

In [10]:
# Cria um objeto SentimentAnalyzer
sentiment_analyzer = SentimentAnalyzer()

In [12]:
# Download do NLTK - com StopWords incluidas. 
nltk.download()
nltk.download("stopwords")

showing info https://raw.githubusercontent.com/nltk/nltk_data/gh-pages/index.xml


[nltk_data] Downloading package stopwords to
[nltk_data]     /home/vinicius/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [11]:
# Obtém a lista de StopWords no idioma desejado, como eu utilizei um dataset com tweets em inglês, usarei o inglês.
stopwords_all = []
for word in stopwords.words('english'):
  stopwords_all.append(word)
  stopwords_all.append(word + '_NEG')

In [12]:
# Aqui irei escolher quantos tweets irei utilizar do dataset.
dataset_treino_amostra = dataset_treino.take(5000)

In [13]:
all_words_neg = sentiment_analyzer.all_words([mark_negation(doc) for doc in dataset_treino_amostra])
all_words_neg_nostops = [x for x in all_words_neg if x not in stopwords_all]

In [14]:
# criando um unigram e extraindo suas features, unigram é basicamente uma quebra de frases.. 
# uni = uma palavra ('Eu', 'Sou')
# bigrama = duas palavras ('Eu sou')
unigram_feats = sentiment_analyzer.unigram_word_feats(all_words_neg_nostops, top_n = 200)
sentiment_analyzer.add_feat_extractor(extract_unigram_feats, unigrams = unigram_feats)
training_set = sentiment_analyzer.apply_features(dataset_treino_amostra)

In [15]:
type(training_set)

nltk.collections.LazyMap

In [16]:
# treinando o modelo criado.
trainer = NaiveBayesClassifier.train
classifier = sentiment_analyzer.train(trainer, training_set)

Training classifier


In [17]:
# testando o nosso classificador com algumas sentenças.
test_sentence1 = [(['this', 'program', 'is', 'bad'], '')]
test_sentence2 = [(['tough', 'day', 'at', 'work', 'today'], '')]
test_sentence3 = [(['good', 'wonderful', 'amazing', 'awesome'], '')]
test_set = sentiment_analyzer.apply_features(test_sentence1)
test_set2 = sentiment_analyzer.apply_features(test_sentence2)
test_set3 = sentiment_analyzer.apply_features(test_sentence3)

In [18]:
# Autenticação e permissões para utilização da API do Twitter
consumer_key = "Chave Pessoal"
consumer_secret = "Chave Pessoal"
access_token = "Chave Pessoal"
access_token_secret = "Chave Pessoal"

In [19]:
# Especificando a URL com o termo de busca.
search_term = 'Trump'
sample_url = 'https://stream.twitter.com/1.1/statuses/sample.json'
filter_url = 'https://stream.twitter.com/1.1/statuses/filter.json?track='+search_term

In [20]:
# Criando o objeto autenticação para o twitter.
auth = requests_oauthlib.OAuth1(consumer_key, consumer_secret, access_token, access_token_secret)

In [21]:
# Configurando o Stream
rdd = ssc.sparkContext.parallelize([0])
stream = ssc.queueStream([], default = rdd)

In [22]:
type(stream)

pyspark.streaming.dstream.DStream

In [23]:
# Quantidade de tweets que será utilizado por Update.
NUM_TWEETS = 200

In [24]:
# Essa função sincroniza em tempo real com o Twitter e retorna o número especifico de Tweets, especificado 
#anteriromente com (NUM_TWEETS)
def tfunc(t, rdd):
  return rdd.flatMap(lambda x: stream_twitter_data())

def stream_twitter_data():
  response = request.get(filter_url, auth = auth, stream = True)
  print(filter_url, response)
  count = 0
  for line in response.iter_lines():
    try:
      if count > NUM_TWEETS:
        break
      post = json.loads(line.decode('utf-8'))
      contents = [post['text']]
      count += 1
      yield str(contents)
    except:
      result = False       

In [25]:
stream = stream.transform(tfunc)

In [26]:
coord_stream = stream.map(lambda line: ast.literal_eval(line))

In [27]:
# Função que classifica os Tweets e aplica as features do modelo criado anteriormente.
def classifica_tweet(tweet):
  sentence = [(tweet, '')]
  test_set = sentiment_analyzer.apply_features(sentence)
  print(tweet, classifier.classify(test_set[0][0]))
  return(tweet, classifier.classify(test_set[0][0]))

In [28]:
# Função para retornar o texto do Twitter.
def get_tweet_text(rdd):
  for line in rdd:
    tweet = line.strip()
    translator = str.maketrans({key: None for key in string.punctuation})
    tweet = tweet.translate(translator)
    tweet = tweet.split(' ')
    tweet_lower = []
    for word in tweet:
      tweet_lower.append(word.lower())
    return(classifica_tweet(tweet_lower))

In [29]:
# Criando uma lista vazia para os resultados.
resultados = []

In [30]:
# Essa função salva o resultado dos batches de Tweets junto com o TimeStramp.
def output_rdd(rdd):
  global resultados 
  pairs = rdd.map(lambda x: (get_tweet_text(x)[1],1))
  counts = pairs.reduceByKey(add)
  output = []
  for count in counts.collect():
    output.append(count)
  result = [time.strftime("%I:%M:%S"), output]
  resultados.append(result)
  print(result)
    

In [31]:
# A função foreachRDD() aplica uma função a cada RDD de Streaming de Dados.
coord_stream.foreachRDD(lambda t, rdd: output_rdd(rdd))

In [32]:
# Começa o Streming 
ssc.start()
# ssc.awaitTermination()

In [None]:
cont = True 
while cont:
   if len(resultados) > 5:
     cont = False

['05:11:13', []]


In [None]:
# Grava os nossos resultados.
rdd_save =  '/resultados/r'+time.strftime("%I%M%S")
resultados_rdd = sc.parallelize(resultados)
resultados_rdd.saveAsTextFile(rdd_save)

In [None]:
# Vizualizando os nossos resultados.
resultados_rdd.collect()

In [None]:
# Finaliza o nosso Streaming.
ssc.stop()