# Análise de Dados do Twitter em Tempo Real com Python e Spark

***Atenção***

Utilize Java JDK 1.8 ou 11 e Apache Spark 2.3.1

## Instalação do Spark no Colab


In [None]:
!apt-get update
!apt-get install openjdk-8-jdk-headless -qq > /dev/null
!wget -q http://archive.apache.org/dist/spark/spark-2.3.1/spark-2.3.1-bin-hadoop2.7.tgz
!tar xf spark-2.3.1-bin-hadoop2.7.tgz
!pip install -q findspark

0% [Working]            Hit:1 https://cloud.r-project.org/bin/linux/ubuntu bionic-cran40/ InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Waiting for headers] [Wa                                                                               Ign:2 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  InRelease
0% [Connecting to archive.ubuntu.com (91.189.88.142)] [Waiting for headers] [Wa0% [1 InRelease gpgv 3,626 B] [Connecting to archive.ubuntu.com (91.189.88.142)                                                                               Ign:3 https://developer.download.nvidia.com/compute/machine-learning/repos/ubuntu1804/x86_64  InRelease
0% [1 InRelease gpgv 3,626 B] [Waiting for headers] [Waiting for headers] [Wait                                                                               Hit:4 https://developer.download.nvidia.com/compute/cuda/repos/ubuntu1804/x86_64  Release
0% [1 InRelease gpgv 3,626 B] [Waiting for head

## Configuração de variáveis

In [None]:
import os
os.environ["JAVA_HOME"] = "/usr/lib/jvm/java-8-openjdk-amd64"
os.environ["SPARK_HOME"] = "/content/spark-2.3.1-bin-hadoop2.7"

In [None]:
!ls

dataset_analise_sentimento.csv	spark-2.3.1-bin-hadoop2.7.tgz
sample_data			spark-2.3.1-bin-hadoop2.7.tgz.1
spark-2.3.1-bin-hadoop2.7


## Spark Streaming Twitter

In [None]:
# pacotes necessário
!pip install requests_oauthlib
!pip install twython
!pip install nltk
import findspark
findspark.init()



In [None]:
# Módulos usados
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from requests_oauthlib import OAuth1Session
from operator import add
from time import gmtime, strftime
import requests_oauthlib
import requests
import time
import string
import ast
import json

In [None]:
# Pacote NLTK
import nltk
from nltk.classify import NaiveBayesClassifier
from nltk.sentiment import SentimentAnalyzer
from nltk.corpus import subjectivity
from nltk.corpus import stopwords
from nltk.sentiment.util import *

In [None]:
# Frequencia de update
INTERVALO_BATCH = 5

In [None]:
# Criando o StreamingContext
sc =SparkContext.getOrCreate()
ssc = StreamingContext(sc, INTERVALO_BATCH)

## Treinando o Classificador de Análise de Sentimento

Uma parte essencial da criação de um algoritmo de análise de sentimento (ou qualquer algoritmo de mineiração de dados) é ter um conjunto de dados abrangente ou "Corpus" para o aprendizado, bem como um conjunto de dados de teste para garantir que a precisão do seu algoritmo atende aos padrões que você espera. Isso também permitirá que você ajuste o seu algoritmo a fim de deduzir melhores (ou mais precisas) características de linguagem natural que você poderia extrair do texto e que vão contribuir para a classificação de sentimento, em vez de usar uma abordagem genérica. Tomaremos como base o dataset de treino fornecido pela Universidade de Michigan, para competições do Kaggle --> https://inclass.kaggle.com/c/si650winter11.

Esse dataset contém 1,578,627 tweets classificados e cada linha linha é marcada como:

1 para o sentimento positivo<br>
0 para o sentimento negativo

In [None]:
# Lendo o arquivo de texto e criando um RDD em memória com Spark
arquivo = sc.textFile("dataset_analise_sentimento.csv")

In [None]:
# Removendo o cabeçalho
header = arquivo.take(1)[0]
dataset = arquivo.filter(lambda line: line != header)

In [None]:
type(dataset)

pyspark.rdd.PipelinedRDD

In [None]:
# Essa função separa as colunas em cada linha, cria uma tupla e remove a pontuação.
def get_row(line):
  row = line.split(',')
  sentimento = row[1]
  tweet = row[3].strip()
  translator = str.maketrans({key: None for key in string.punctuation})
  tweet = tweet.translate(translator)
  tweet = tweet.split(' ')
  tweet_lower = []
  for word in tweet:
    tweet_lower.append(word.lower())
  return (tweet_lower, sentimento)

In [None]:
# Aplicação a função a cada linha do dataset
dataset_treino = dataset.map(lambda line: get_row(line))

In [None]:
# Cria um objeto SentimentAnalyzer
sentiment_analyzer = SentimentAnalyzer()

In [None]:
# Certifique-se de ter espaço em disco - Aproximadamente 5GB
nltk.download("stopwords")

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
from IPython.display import Image
Image(url = 'nltkdata.png')

In [None]:
# Obtém a lista de stopwords em Inglês
stopwords_all = []
for word in stopwords.words('english'):
  stopwords_all.append(word)
  stopwords_all.append(word + '_NEG')

In [None]:
# Obtém 10.000 tweets do dataset de treino e retorna todas as palavras que não são stopwords
dataset_treino_amostra = dataset_treino.take(10000)

In [None]:
all_words_neg = sentiment_analyzer.all_words([mark_negation(doc) for doc in dataset_treino_amostra])
all_words_neg_nostops = [x for x in all_words_neg if x not in stopwords_all]

In [None]:
# Cria um unigram (n-grama) e extrai as features
# n-grama sequência contínua de itens, como sílabas, letras, palavras
unigram_feats = sentiment_analyzer.unigram_word_feats(all_words_neg_nostops, top_n = 200)
sentiment_analyzer.add_feat_extractor(extract_unigram_feats, unigrams = unigram_feats)
training_set = sentiment_analyzer.apply_features(dataset_treino_amostra)

In [None]:
type(training_set)

nltk.collections.LazyMap

In [None]:
# Treinar o modelo
trainer = NaiveBayesClassifier.train
classifier = sentiment_analyzer.train(trainer, training_set)

Training classifier


In [None]:
# Testa o classificador em algumas sentenças
test_sentence1 = [(['this', 'program', 'is', 'bad'], '')]
test_sentence2 = [(['tough','day','at','work','today']), '']
test_sentence3 = [(['good','wonderful','amzing','awesome'], '')]
test_set = sentiment_analyzer.apply_features(test_sentence1)
test_set2 = sentiment_analyzer.apply_features(test_sentence2)
test_set3 = sentiment_analyzer.apply_features(test_sentence3)

In [None]:
# Autenticação do Twitter
consumer_key = "XXX"
consumer_secret = "XXX"
access_token = "XXX"
access_token_secret = "XXX"

In [None]:
# Especifica a URL termo de busca
search_term = 'Trump'
sample_url = 'https://stream.twitter.com/1.1/statuses/sample.json'
filter_url = 'https://stream.twitter.com/1.1/statuses/filter.json?track='+search_term

In [None]:
# Criando o objeto de atutenticação para o Twitter
auth = requests_oauthlib.OAuth1(consumer_key, consumer_secret, access_token, access_token_secret)

In [None]:
# Configurando o Stream
rdd = ssc.sparkContext.parallelize([0])
stream = ssc.queueStream([], default = rdd)

In [None]:
# Coleção de RDDs
type(stream)

pyspark.streaming.dstream.DStream

In [None]:
# Total de tweets por update
NUM_TWEETS = 500

In [None]:
# Essa função conecta ao Twitter e retorna um número específico de Tweets (NUM_TWEETS)
def tfunc(t, rdd):
  return rdd.flatMap(lambda x: stream_twitter_data())

def stream_twitter_data():
  response = requests.get(filter_url, auth = auth, stream = True)
  print(filter_url, response)
  count = 0
  for line in response.iter_lines():
    try:
      if count > NUM_TWEETS:
        break
      post = json.loads(line.decode('utf-8'))
      contents = [post['text']]
      count += 1
      yield str(contents)
    except:
      result = False

In [None]:
stream = stream.transform(tfunc)

In [None]:
coord_stream = stream.map(lambda line: ast.literal_eval(line))

In [None]:
# Essa função classifica os tweets, aplicando as features do modelo criado anteriormente
def classifica_tweet(tweet):
  sentence = [(tweet, '')]
  test_set = sentiment_analyzer.apply_features(sentence)
  print(tweet, classifier.classify(test_set[0][0]))
  return(tweet, classifier.classify(test_set[0][0]))

In [None]:
# Essa função retorna o texto do Twitter
def get_tweet_text(rdd):
  for line in rdd:
    tweet = line.strip()
    translator = str.maketrans({key: None for key in string.punctuation})
    tweet = tweet.translate(translator)
    tweet = tweet.split(' ')
    tweet_lower = []
    for word in tweet:
      tweet_lower.append(word.lower())
    return(classifica_tweet(tweet_lower))

In [None]:
# Cria uma lista vazia para os resultados
resultados = []

In [None]:
# Essa função salva o resultado dos batches de Tweets junto com o timestamp
def output_rdd(rdd):
  global resultados
  pairs = rdd.map(lambda x: (get_tweet_text(x)[1],1))
  counts = pairs.reduceByKey(add)
  output = []
  for count in counts.collect():
    output.append(count)
  result = [time.strftime("%I:%M:%S"), output]
  resultados.append(result)
  print(result)

In [None]:
# A função foreachRDD() aplica uma função a cada RDD to streaming de dados
coord_stream.foreachRDD(lambda t, rdd: output_rdd(rdd))

In [None]:
# Start streaming
ssc.start()
# ssc.awaitTermination()

In [None]:
cont = True
while cont:
  if len(resultados) > 5:
    cont = False

In [None]:
# Grava os resultados
rdd_save = 'sentiment_analysis'+time.strftime("%I%M%S")
resultados_rdd = sc.parallelize(resultados)
resultados_rdd.saveAsTextFile(rdd_save)

In [None]:
# Visualiza os resultados
resultados_rdd.collect()

[['02:22:56', []],
 ['02:23:13', [('1', 132), ('0', 369)]],
 ['02:23:24', [('0', 373), ('1', 128)]],
 ['02:23:36', [('0', 367), ('1', 134)]],
 ['02:23:48', [('0', 389), ('1', 112)]],
 ['02:24:00', [('0', 355), ('1', 146)]],
 ['02:24:12', [('0', 374), ('1', 127)]],
 ['02:24:23', [('0', 382), ('1', 119)]],
 ['02:24:33', [('0', 383), ('1', 118)]],
 ['02:24:41', [('1', 117), ('0', 384)]],
 ['02:24:52', [('0', 381), ('1', 120)]],
 ['02:25:03', [('0', 367), ('1', 134)]],
 ['02:25:14', [('0', 365), ('1', 136)]],
 ['02:25:25', [('0', 368), ('1', 133)]],
 ['02:25:36', [('0', 371), ('1', 130)]],
 ['02:25:46', [('0', 393), ('1', 108)]],
 ['02:25:56', [('1', 123), ('0', 378)]],
 ['02:26:07', [('0', 370), ('1', 131)]],
 ['02:26:18', [('0', 377), ('1', 124)]],
 ['02:26:29', [('0', 377), ('1', 124)]],
 ['02:26:40', [('0', 372), ('1', 129)]],
 ['02:26:51', [('0', 381), ('1', 120)]],
 ['02:27:02', [('0', 391), ('1', 110)]],
 ['02:27:13', [('0', 364), ('1', 137)]],
 ['02:27:24', [('0', 379), ('1', 122)]

In [None]:
# Finaliza o streaming
ssc.stop()

Fim