# <font color='Purple'>Ciência dos Dados na Pratica</font>
# <font color='blue'>Analise de Sentimentos em Tempo Real no Twitter</font>



### *********** Atenção: *********** 
Utilize Java JDK 1.8 / Apache Spark 2.4.2  / Python 3.7 / Ubuntu 18.04

****** Caso receba mensagem de erro "name 'sc' is not defined", interrompa o pyspark e apague o diretório metastore_db no mesmo diretório onde está este Jupyter notebook ******

Acesse http://localhost:4040 sempre que quiser acompanhar a execução dos jobs

## Vamos usar o Framework Spark Streaming para processar dados do Twitter

In [1]:
# Pode ser necessário instalar esses pacotes
!pip install requests_oauthlib
!pip install twython
!pip install nltk

Collecting requests_oauthlib
  Downloading requests_oauthlib-1.3.0-py2.py3-none-any.whl (23 kB)
Collecting oauthlib>=3.0.0
  Downloading oauthlib-3.1.0-py2.py3-none-any.whl (147 kB)
[K     |████████████████████████████████| 147 kB 2.0 MB/s eta 0:00:01
Installing collected packages: oauthlib, requests-oauthlib
Successfully installed oauthlib-3.1.0 requests-oauthlib-1.3.0
Collecting twython
  Downloading twython-3.8.2-py3-none-any.whl (33 kB)
Installing collected packages: twython
Successfully installed twython-3.8.2


In [1]:
# Módulos usados
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from requests_oauthlib import OAuth1Session
from operator import add
import requests_oauthlib
from time import gmtime, strftime
import requests
import time
import string
import ast
import json
import pylab as pl

In [2]:
# Pacote NLTK
import nltk
from nltk.classify import NaiveBayesClassifier
from nltk.sentiment import SentimentAnalyzer
from nltk.corpus import subjectivity
from nltk.corpus import stopwords
from nltk.sentiment.util import *

In [3]:
# Frequência de update - a cada 5 Segundos a gente grava os resultados
INTERVALO_BATCH = 5

In [4]:
# Criando o StreamingContext - Contexto de manipulacao do Streaming de dados
ssc = StreamingContext(sc, INTERVALO_BATCH)

## Treinando o Classificador de Análise de Sentimento

Uma parte essencial da criação de um algoritmo de análise de sentimento (ou qualquer algoritmo de mineração de dados) é ter um conjunto de dados abrangente ou "Corpus" para o aprendizado, bem como um conjunto de dados de teste para garantir que a precisão do seu algoritmo atende aos padrões que você espera. Isso também permitirá que você ajuste o seu algoritmo a fim de deduzir melhores (ou mais precisas) características de linguagem natural que você poderia extrair do texto e que vão contribuir para a classificação de sentimento, em vez de usar uma abordagem genérica. Tomaremos como base o dataset de treino fornecido pela Universidade de Michigan, para competições do Kaggle --> https://inclass.kaggle.com/c/si650winter11.

Esse dataset contém 1,578,627 tweets classificados e cada linha é marcada como: 

### 1 para o sentimento positivo 
### 0 para o sentimento negativo 

In [5]:
# Lendo o arquivo texto e criando um RDD em memória com Spark
arquivo = sc.textFile("dataset_analise_sentimento.csv")

In [7]:
# Removendo o cabeçalho
header = arquivo.take(1)[0]
dataset = arquivo.filter(lambda line: line != header)

In [8]:
type(dataset)

pyspark.rdd.PipelinedRDD

In [9]:
# Essa função separa as colunas de cada linha, cria uma tupla e remove a pontuação. Faz limpeza de dados
def get_row(line):
  row = line.split(',')
  sentimento = row[1]
  tweet = row[3].strip()
  translator = str.maketrans({key: None for key in string.punctuation})
  tweet = tweet.translate(translator)
  tweet = tweet.split(' ')
  tweet_lower = []
  for word in tweet:
    tweet_lower.append(word.lower())
  return (tweet_lower, sentimento)

In [10]:
# Aplica a função a cada linha do dataset
dataset_treino = dataset.map(lambda line: get_row(line))

In [11]:
# Cria um objeto SentimentAnalyzer 
sentiment_analyzer = SentimentAnalyzer()

In [12]:
# Certifique-se de ter espaço em disco - Aproximadamente 5GB
# https://raw.githubusercontent.com/nltk/nltk_data/gh-pages/index.xml
nltk.download()
nltk.download("stopwords")

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/eduardo/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [13]:
from IPython.display import Image
Image(url = 'ntlkdata.png')

In [13]:
# Obtém a lista de stopwords em Inglês 
stopwords_all = []
for word in stopwords.words('english'):
  stopwords_all.append(word)
  stopwords_all.append(word + '_NEG')

In [14]:
# Obtém 10.000 tweets do dataset de treino e retorna todas as palavras que não são stopwords
dataset_treino_amostra = dataset_treino.take(10000)

In [15]:
all_words_neg = sentiment_analyzer.all_words([mark_negation(doc) for doc in dataset_treino_amostra])
all_words_neg_nostops = [x for x in all_words_neg if x not in stopwords_all]

In [16]:
# Cria um unigram (n-grama) e extrai as features
unigram_feats = sentiment_analyzer.unigram_word_feats(all_words_neg_nostops, top_n = 200)
sentiment_analyzer.add_feat_extractor(extract_unigram_feats, unigrams = unigram_feats)
training_set = sentiment_analyzer.apply_features(dataset_treino_amostra)

In [18]:
type(training_set)

nltk.collections.LazyMap

In [19]:
print(training_set)

[({'contains()': False, 'contains(im)': False, 'contains(_NEG)': False, 'contains(followfriday)': False, 'contains(amp)': False, 'contains(dont)': False, 'contains(day)': False, 'contains(love)': False, 'contains(like)': False, 'contains(cant)': False, 'contains(good)': False, 'contains(get)': False, 'contains(go)': False, 'contains(today)': False, 'contains(got)': False, 'contains(want)': False, 'contains(time)': False, 'contains(going)': False, 'contains(back)': False, 'contains(one)': False, 'contains(sad)': True, 'contains(really)': False, 'contains(miss)': False, 'contains(u)': False, 'contains(work)': False, 'contains(new)': False, 'contains(2)': False, 'contains(last)': False, 'contains(still)': False, 'contains(twitter)': False, 'contains(night)': False, 'contains(great)': False, 'contains(lol)': False, 'contains(follow)': False, 'contains(need)': False, 'contains(see)': False, 'contains(much)': False, 'contains(myweakness)': False, 'contains(get_NEG)': False, 'contains(didnt)'

In [21]:
# Treinar o modelo
trainer = NaiveBayesClassifier.train
classifier = sentiment_analyzer.train(trainer, training_set)

Training classifier


In [22]:
# Testa o classificador em algumas sentenças
test_sentence1 = [(['this', 'program', 'is', 'bad'], '')]
test_sentence2 = [(['tough', 'day', 'at', 'work', 'today'], '')]
test_sentence3 = [(['good', 'wonderful', 'amazing', 'awesome'], '')]
test_set = sentiment_analyzer.apply_features(test_sentence1)
test_set2 = sentiment_analyzer.apply_features(test_sentence2)
test_set3 = sentiment_analyzer.apply_features(test_sentence3)

In [24]:
# Autenticação do Twitter 
consumer_key = "xxx"
consumer_secret = "xxx"
access_token = "xxx"
access_token_secret = "xxx"

In [25]:
# Especifica a URL termo de busca
search_term = 'Trump'
sample_url = 'https://stream.twitter.com/1.1/statuses/sample.json'
filter_url = 'https://stream.twitter.com/1.1/statuses/filter.json?track='+search_term

In [26]:
# Criando o objeto de atutenticação para o Twitter
auth = requests_oauthlib.OAuth1(consumer_key, consumer_secret, access_token, access_token_secret)

In [28]:
# Configurando o Stream
rdd = ssc.sparkContext.parallelize([0])
stream = ssc.queueStream([], default = rdd)

In [29]:
type(stream)

pyspark.streaming.dstream.DStream

In [30]:
# Total de tweets por update
NUM_TWEETS = 500  

In [31]:
# Essa função conecta ao Twitter e retorna um número específico de Tweets (NUM_TWEETS)
def tfunc(t, rdd):
  return rdd.flatMap(lambda x: stream_twitter_data())

def stream_twitter_data():
  response = requests.get(filter_url, auth = auth, stream = True)
  print(filter_url, response)
  count = 0
  for line in response.iter_lines():
    try:
      if count > NUM_TWEETS:
        break
      post = json.loads(line.decode('utf-8'))
      contents = [post['text']]
      count += 1
      yield str(contents)
    except:
      result = False

In [32]:
stream = stream.transform(tfunc)

In [33]:
coord_stream = stream.map(lambda line: ast.literal_eval(line))

In [34]:
# Essa função classifica os tweets, aplicando as features do modelo criado anteriormente
def classifica_tweet(tweet):
  sentence = [(tweet, '')]
  test_set = sentiment_analyzer.apply_features(sentence)
  print(tweet, classifier.classify(test_set[0][0]))
  return(tweet, classifier.classify(test_set[0][0]))

In [35]:
# Essa função retorna o texto do Twitter
def get_tweet_text(rdd):
  for line in rdd:
    tweet = line.strip()
    translator = str.maketrans({key: None for key in string.punctuation})
    tweet = tweet.translate(translator)
    tweet = tweet.split(' ')
    tweet_lower = []
    for word in tweet:
      tweet_lower.append(word.lower())
    return(classifica_tweet(tweet_lower))

In [36]:
# Cria uma lista vazia para os resultados
resultados = []

In [37]:
# Essa função salva o resultado dos batches de Tweets junto com o timestamp
def output_rdd(rdd):
  global resultados
  pairs = rdd.map(lambda x: (get_tweet_text(x)[1],1))
  counts = pairs.reduceByKey(add)
  output = []
  for count in counts.collect():
    output.append(count)
  result = [time.strftime("%I:%M:%S"), output]
  resultados.append(result)
  print(result)

In [38]:
# A função foreachRDD() aplica uma função a cada RDD to streaming de dados
coord_stream.foreachRDD(lambda t, rdd: output_rdd(rdd))

In [40]:
# Start streaming
ssc.start()
# ssc.awaitTermination()

In [41]:
cont = True
while cont:
  if len(resultados) > 5:
    cont = False

['03:11:23', []]
['03:11:56', [('1', 96), ('0', 405)]]
['03:12:17', [('0', 408), ('1', 93)]]
['03:12:36', [('1', 90), ('0', 411)]]
['03:12:54', [('1', 115), ('0', 386)]]
['03:13:11', [('0', 390), ('1', 111)]]
['03:13:27', [('0', 392), ('1', 109)]]
['03:13:43', [('0', 411), ('1', 90)]]
['03:13:57', [('1', 104), ('0', 397)]]
['03:14:11', [('0', 403), ('1', 98)]]
['03:14:25', [('1', 95), ('0', 406)]]
['03:14:39', [('0', 415), ('1', 86)]]
['03:14:52', [('0', 397), ('1', 104)]]
['03:15:06', [('0', 408), ('1', 93)]]
['03:15:19', [('0', 399), ('1', 102)]]
['03:15:34', [('0', 390), ('1', 111)]]
['03:15:49', [('0', 409), ('1', 92)]]
['03:16:04', [('0', 397), ('1', 104)]]


In [42]:
# Grava os resultados
rdd_save = '/resultados/r'+time.strftime("%I%M%S")
resultados_rdd = sc.parallelize(resultados)
resultados_rdd.saveAsTextFile(rdd_save)

['03:16:21', [('0', 405), ('1', 96)]]


Py4JJavaError: An error occurred while calling o2672.saveAsTextFile.
: org.apache.spark.SparkException: Job aborted.
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:100)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopDataset$1(PairRDDFunctions.scala:1096)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopDataset(PairRDDFunctions.scala:1094)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$4(PairRDDFunctions.scala:1067)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:1032)
	at org.apache.spark.rdd.PairRDDFunctions.$anonfun$saveAsHadoopFile$1(PairRDDFunctions.scala:958)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.PairRDDFunctions.saveAsHadoopFile(PairRDDFunctions.scala:958)
	at org.apache.spark.rdd.RDD.$anonfun$saveAsTextFile$1(RDD.scala:1499)
	at scala.runtime.java8.JFunction0$mcV$sp.apply(JFunction0$mcV$sp.java:23)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
	at org.apache.spark.rdd.RDD.withScope(RDD.scala:363)
	at org.apache.spark.rdd.RDD.saveAsTextFile(RDD.scala:1478)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile(JavaRDDLike.scala:550)
	at org.apache.spark.api.java.JavaRDDLike.saveAsTextFile$(JavaRDDLike.scala:549)
	at org.apache.spark.api.java.AbstractJavaRDDLike.saveAsTextFile(JavaRDDLike.scala:45)
	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.lang.reflect.Method.invoke(Method.java:498)
	at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
	at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
	at py4j.Gateway.invoke(Gateway.java:282)
	at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
	at py4j.commands.CallCommand.execute(CallCommand.java:79)
	at py4j.GatewayConnection.run(GatewayConnection.java:238)
	at java.lang.Thread.run(Thread.java:748)
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 41.0 failed 1 times, most recent failure: Lost task 0.0 in stage 41.0 (TID 40, localhost, executor driver): java.io.IOException: Mkdirs failed to create file:/resultados/r031617/_temporary/0/_temporary/attempt_20200629151618_0171_m_000000_0 (exists=false, cwd=file:/home/eduardo/Downloads/spark-notebook)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:455)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:804)
	at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.initWriter(SparkHadoopWriter.scala:228)
	at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:122)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:83)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	at java.lang.Thread.run(Thread.java:748)

Driver stacktrace:
	at org.apache.spark.scheduler.DAGScheduler.failJobAndIndependentStages(DAGScheduler.scala:1889)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2(DAGScheduler.scala:1877)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$abortStage$2$adapted(DAGScheduler.scala:1876)
	at scala.collection.mutable.ResizableArray.foreach(ResizableArray.scala:62)
	at scala.collection.mutable.ResizableArray.foreach$(ResizableArray.scala:55)
	at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1876)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGScheduler.$anonfun$handleTaskSetFailed$1$adapted(DAGScheduler.scala:926)
	at scala.Option.foreach(Option.scala:274)
	at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:926)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.doOnReceive(DAGScheduler.scala:2110)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2059)
	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onReceive(DAGScheduler.scala:2048)
	at org.apache.spark.util.EventLoop$$anon$1.run(EventLoop.scala:49)
	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:737)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2061)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2082)
	at org.apache.spark.SparkContext.runJob(SparkContext.scala:2114)
	at org.apache.spark.internal.io.SparkHadoopWriter$.write(SparkHadoopWriter.scala:78)
	... 38 more
Caused by: java.io.IOException: Mkdirs failed to create file:/resultados/r031617/_temporary/0/_temporary/attempt_20200629151618_0171_m_000000_0 (exists=false, cwd=file:/home/eduardo/Downloads/spark-notebook)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:455)
	at org.apache.hadoop.fs.ChecksumFileSystem.create(ChecksumFileSystem.java:440)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:911)
	at org.apache.hadoop.fs.FileSystem.create(FileSystem.java:804)
	at org.apache.hadoop.mapred.TextOutputFormat.getRecordWriter(TextOutputFormat.java:123)
	at org.apache.spark.internal.io.HadoopMapRedWriteConfigUtil.initWriter(SparkHadoopWriter.scala:228)
	at org.apache.spark.internal.io.SparkHadoopWriter$.executeTask(SparkHadoopWriter.scala:122)
	at org.apache.spark.internal.io.SparkHadoopWriter$.$anonfun$write$1(SparkHadoopWriter.scala:83)
	at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
	at org.apache.spark.scheduler.Task.run(Task.scala:121)
	at org.apache.spark.executor.Executor$TaskRunner.$anonfun$run$3(Executor.scala:411)
	at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
	at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
	... 1 more


In [43]:
# Visualiza os resultados
resultados_rdd.collect()

[['03:11:23', []],
 ['03:11:56', [('1', 96), ('0', 405)]],
 ['03:12:17', [('0', 408), ('1', 93)]],
 ['03:12:36', [('1', 90), ('0', 411)]],
 ['03:12:54', [('1', 115), ('0', 386)]],
 ['03:13:11', [('0', 390), ('1', 111)]],
 ['03:13:27', [('0', 392), ('1', 109)]],
 ['03:13:43', [('0', 411), ('1', 90)]],
 ['03:13:57', [('1', 104), ('0', 397)]],
 ['03:14:11', [('0', 403), ('1', 98)]],
 ['03:14:25', [('1', 95), ('0', 406)]],
 ['03:14:39', [('0', 415), ('1', 86)]],
 ['03:14:52', [('0', 397), ('1', 104)]],
 ['03:15:06', [('0', 408), ('1', 93)]],
 ['03:15:19', [('0', 399), ('1', 102)]],
 ['03:15:34', [('0', 390), ('1', 111)]],
 ['03:15:49', [('0', 409), ('1', 92)]],
 ['03:16:04', [('0', 397), ('1', 104)]]]

['03:16:43', [('1', 104), ('0', 397)]]
['03:17:00', [('0', 396), ('1', 105)]]
['03:17:14', [('0', 419), ('1', 82)]]


In [44]:
# Finaliza o streaming
ssc.stop()

# Fim

### Valeu! - Ciencia dos Dados - <a href="http://facebook.com/cienciadosdadosbr">facebook.com/cienciadosdadosbr</a>

### Telegram - Scripts e Datasets - Comunidade Telegram <a href="https://t.me/cienciadosdadosraiz">https://t.me/cienciadosdadosraiz</a>

### #**YouTube** - Mais Aulas como essa no YouTube <a href="https://www.youtube.com/watch?v=IaIc5oHd3II&t=1569s">https://www.youtube.com/watch?v=IaIc5oHd3II&t=1569s</a>

In [3]:
from IPython.core.display import HTML

HTML('<iframe width="300" height="200" src="https://www.youtube.com/embed/okAOIlqSfUg" frameborder="0" allow="accelerometer; autoplay; encrypted-media; gyroscope; picture-in-picture" allowfullscreen></iframe>')

