In [1]:
#import pyspark.sql.functions as F
from pyspark.sql import Row #Converte RDDs em objetos do tipo Row
from pyspark.sql.functions import col, isnan, when, count # Encontra a contagem para valores None, Null, Nan, etc.
from pyspark.sql.types import IntegerType, FloatType

from pyspark.ml.feature import StringIndexer, OneHotEncoder #Converte strings em valores numéricos
from pyspark.ml.linalg import Vectors #Serve para criar um vetor denso
from pyspark.ml.evaluation import MulticlassClassificationEvaluator # Para avaliar o modelo com as métricas de avaliação.
from pyspark.ml.feature import RobustScaler, StandardScaler, MinMaxScaler, Normalizer # Métodos para escalas dos dados
from pyspark.ml.classification import RandomForestClassifier, LogisticRegression, GBTClassifier, LinearSVC # Algoritmos de ML
from pyspark.ml import Pipeline # Criação de um Pipeline de execução.
from pyspark.ml.functions import vector_to_array
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator # GridSearch e Validação Cruzada
from pyspark.ml.evaluation import BinaryClassificationEvaluator # Evaluator para classificação binária

from math import floor

In [2]:
# Módulos usados
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from requests_oauthlib import OAuth1Session
from operator import add
import requests_oauthlib
from time import gmtime, strftime
import requests
import time
import string
import ast
import json
#import re

In [3]:
# Pacote NLTK
import nltk
from nltk.classify import NaiveBayesClassifier
from nltk.sentiment import SentimentAnalyzer
from nltk.corpus import subjectivity
from nltk.corpus import stopwords
from nltk.sentiment.util import *

In [4]:
# Frequência de update
INTERVALO_BATCH = 10

In [30]:
# Criando o StreamingContext
ssc = StreamingContext(sc, INTERVALO_BATCH)

## Treinando o Classificador de Análise de Sentimento

Uma parte essencial da criação de um algoritmo de análise de sentimento (ou qualquer algoritmo de mineração de dados) é ter um conjunto de dados abrangente ou "Corpus" para o aprendizado, bem como um conjunto de dados de teste para garantir que a precisão do seu algoritmo atende aos padrões que você espera. Isso também permitirá que você ajuste o seu algoritmo a fim de deduzir melhores (ou mais precisas) características de linguagem natural que você poderia extrair do texto e que vão contribuir para a classificação de sentimento, em vez de usar uma abordagem genérica. Tomaremos como base o dataset de treino fornecido pela Universidade de Michigan, para competições do Kaggle (https://inclass.kaggle.com/c/si650winter11).


Esse dataset contém 1,578,627 tweets classificados e cada linha é marcada como: 

#### 1 para o sentimento positivo 
#### 0 para o sentimento negativo 

In [6]:
# Spark Session - usada quando se trabalha com Dataframes no Spark
spSession = SparkSession.builder.master("local").appName("tw-session").config("spark.some.config.option", "session").getOrCreate()

In [7]:
# Lendo o arquivo texto e criando um RDD em memória com Spark
rdd = sc.textFile("data/train.csv")

In [8]:
header = rdd.first()
rdd_body = rdd.filter(lambda x: header not in x)#.map(lambda l: l.split(','))

list_columns = header.replace('.', '_').upper().split(',')
list_columns

['SENTENCE', 'LABEL']

In [9]:
rdd_body = rdd_body.map(lambda x: x.replace(',0', ';0')).map(lambda x: x.replace(',1', ';1'))

In [10]:
rdd_body.take(10)

['Ok brokeback mountain is such a horrible movie.;0',
 'Brokeback Mountain was so awesome.;1',
 'friday hung out with kelsie and we went and saw The Da Vinci Code SUCKED!!!!!;0',
 'I am going to start reading the Harry Potter series again because that is one awesome story.;1',
 'Is it just me, or does Harry Potter suck?...;0',
 'The Da Vinci Code sucked big time.;0',
 'I am going to start reading the Harry Potter series again because that is one awesome story.;1',
 'For those who are Harry Potter ignorant, the true villains of this movie are awful creatures called dementors.;0',
 'Harry Potter dragged Draco Malfoy ’ s trousers down past his hips and sucked him into his throat with vigor, making whimpering noises and panting and groaning around the blonds rock-hard, aching cock...;0',
 "So as felicia's mom is cleaning the table, felicia grabs my keys and we dash out like freakin mission impossible.;1"]

In [11]:
rdd_row = rdd_body.map(lambda p: Row(
    TEXT = p[0], 
    SENTIMENT = p[1]
))

rdd_row.take(10)

# Criando um Dataframe
rdd_df = spSession.createDataFrame(rdd_row)
rdd_df.cache()

rdd_df.head(5)

[Row(TEXT='O', SENTIMENT='k'),
 Row(TEXT='B', SENTIMENT='r'),
 Row(TEXT='f', SENTIMENT='r'),
 Row(TEXT='I', SENTIMENT=' '),
 Row(TEXT='I', SENTIMENT='s')]

In [12]:
# Essa função separa as colunas em cada linha, cria uma tupla e remove a pontuação.
def get_row(line):
    row = line.split(';')
    
    tweet = row[0].strip()
    sentimento = int(re.sub('[^\d]+', '', row[1]))
    
    translator = str.maketrans({key: None for key in string.punctuation})
    #translator = re.compile('[%s]' % re.escape(string.punctuation))
    #tweet = regex.sub('', tweet)
    tweet = tweet.translate(translator)
    tweet = tweet.split(' ')
    tweet_lower = []
    for word in tweet:
        tweet_lower.append(word.lower())
    return (tweet_lower, sentimento)

In [13]:
# Aplcia a função a cada linha do dataset
dataset_treino = rdd_body.map(lambda line: get_row(line))

In [14]:
dataset_treino.take(2)

[(['ok', 'brokeback', 'mountain', 'is', 'such', 'a', 'horrible', 'movie'], 0),
 (['brokeback', 'mountain', 'was', 'so', 'awesome'], 1)]

In [15]:
# Cria um objeto SentimentAnalyzer 
sentiment_analyzer = SentimentAnalyzer()

In [16]:
# Obtém a lista de stopwords em Inglês 
stopwords_all = []
for word in stopwords.words('english'):
    stopwords_all.append(word)
    stopwords_all.append(word + '_NEG')

In [17]:
# Obtém 10.000 tweets do dataset de treino e retorna todas as palavras que não são stopwords
dataset_treino_amostra = dataset_treino.take(1000)

In [18]:
all_words_neg = sentiment_analyzer.all_words([mark_negation(doc) for doc in dataset_treino_amostra])
all_words_neg_nostops = [x for x in all_words_neg if x not in stopwords_all]

In [19]:
# Cria um unigram e extrai as features
unigram_feats = sentiment_analyzer.unigram_word_feats(all_words_neg_nostops, top_n = 200)
sentiment_analyzer.add_feat_extractor(extract_unigram_feats, unigrams = unigram_feats)
training_set = sentiment_analyzer.apply_features(dataset_treino_amostra)

In [20]:
type(training_set)

nltk.collections.LazyMap

In [21]:
# Treinar o modelo
trainer = NaiveBayesClassifier.train
classifier = sentiment_analyzer.train(trainer, training_set)

Training classifier


In [22]:
# Testa o classificador em algumas sentenças
test_sentence1 = [(['this', 'program', 'is', 'suck'], '')]
test_sentence2 = [(['tough', 'day', 'at', 'work', 'today'], '')]
test_sentence3 = [(['good', 'wonderful', 'amazing', 'awesome'], '')]
test_set1 = sentiment_analyzer.apply_features(test_sentence1)
test_set2 = sentiment_analyzer.apply_features(test_sentence2)
test_set3 = sentiment_analyzer.apply_features(test_sentence3)

In [23]:
test_set1[0][0]

{'contains(da)': False,
 'contains(vinci)': False,
 'contains(code)': False,
 'contains(brokeback)': False,
 'contains(mountain)': False,
 'contains(harry)': False,
 'contains(potter)': False,
 'contains(love)': False,
 'contains(awesome)': False,
 'contains(impossible)': False,
 'contains(mission)': False,
 'contains()': False,
 'contains(like)': False,
 'contains(movie)': False,
 'contains(sucked)': False,
 'contains(sucks)': False,
 'contains(hate)': False,
 'contains(movies)': False,
 'contains(stupid)': False,
 'contains(suck)': True,
 'contains(really)': False,
 'contains(much)': False,
 'contains(one)': False,
 'contains(3)': False,
 'contains(loved)': False,
 'contains(right)': False,
 'contains(series)': False,
 'contains(depressing)': False,
 'contains(fucking)': False,
 'contains(left)': False,
 'contains(want)': False,
 'contains(horrible)': False,
 'contains(harry_NEG)': False,
 'contains(potter_NEG)': False,
 'contains(start)': False,
 'contains(reading)': False,
 'contai

In [24]:
classifier.classify(test_set1[0][0])

0

---

---

In [25]:
import requests
import configparser
import tweepy
from tweepy import OAuthHandler
from tweepy import Stream
import socket
import json
import re
import time

In [26]:
# Autenticação do Twitter 
api_key = 'Olk6wAb5NqOnXztjIs9bR6FaF'
api_key_secret = 'vkUwO1OGEjKH5CGlmLsKP1KBTYoESycOGopwAjVTlDqG98Ajzj'
access_token = '1228500338912743425-fLQRIKtrHJe9mkvWM7EXToYmtfzzxc'
access_token_secret = 'q1cx5rGXqmIqeWX6gBsMGG0ymGzYLTfAe4Eagxin9ohRh'
bearer_token = r'AAAAAAAAAAAAAAAAAAAAAHY5hgEAAAAA2FSk9Qi3r1OKDdlXRzhkzox9InI%3DR3U3eL0OI5G8ka9GO1OlXtrZnvfuktbrEJozzVwPz1LssHYWcG'

In [27]:
def bearer_oauth(r):
    """
    Method required by bearer token authentication.
    """

    r.headers["Authorization"] = f"Bearer {bearer_token}"
    r.headers["User-Agent"] = "v2FilteredStreamPython"
    return r

In [28]:
def get_set_rules():
    print('Getting rules...')
    response = requests.get("https://api.twitter.com/2/tweets/search/stream/rules", auth=bearer_oauth)
    print(json.dumps(response.json()))
    rules = response.json()
    
    #########################################################################
    print('\nDeleting all rules...')
    if rules is None or "data" not in rules:
        return None

    ids = list(map(lambda rule: rule["id"], rules["data"]))
    payload = {"delete": {"ids": ids}}
    response = requests.post("https://api.twitter.com/2/tweets/search/stream/rules", auth=bearer_oauth, json=payload)
    
    print(json.dumps(response.json()))
    
    #########################################################################
    print('\nSetting rules...')
    sample_rules = [{'value': 'russia'},{'value': 'war'},{'value': 'putin'}]

    payload = {"add": sample_rules}
    response = requests.post("https://api.twitter.com/2/tweets/search/stream/rules", auth=bearer_oauth, json=payload)
    print(json.dumps(response.json()))

In [29]:
# Gainaing access and connecting to Twitter API using Credentials
client = tweepy.Client(bearer_token, api_key, api_key_secret, access_token, access_token_secret)

auth = tweepy.OAuth1UserHandler(api_key, api_key_secret, access_token, access_token_secret)
api = tweepy.API(auth, wait_on_rate_limit=True)

get_set_rules()

Getting rules...
{"data": [{"id": "1579447944444526593", "value": "russia"}, {"id": "1579447950945689601", "value": "war"}, {"id": "1579447955551051777", "value": "putin"}], "meta": {"sent": "2022-10-10T12:28:19.992Z", "result_count": 3}}

Deleting all rules...
{"meta": {"sent": "2022-10-10T12:28:21.341Z", "summary": {"deleted": 3, "not_deleted": 0}}}

Setting rules...
{"data": [{"value": "putin", "id": "1579448750732464129"}, {"value": "russia", "id": "1579448750732464131"}, {"value": "war", "id": "1579448750732464130"}], "meta": {"sent": "2022-10-10T12:28:22.788Z", "summary": {"created": 3, "not_created": 0, "valid": 3, "invalid": 0}}}


In [31]:
stream = ssc.queueStream([], default = rdd)
type(stream)

pyspark.streaming.dstream.DStream

In [32]:
# Total de tweets por update
NUM_TWEETS = 1  

In [33]:
# Essa função conecta ao Twitter e retorna um número específico de Tweets (NUM_TWEETS)
def tfunc(t, rdd):
    return rdd.flatMap(lambda x: stream_twitter_data())

def stream_twitter_data():
    response = requests.get("https://api.twitter.com/2/tweets/search/stream", auth=bearer_oauth, stream=True)
    print('Status code {}'.format(response.status_code))
    count = 0
    for line in response.iter_lines():
        try:
            if count > NUM_TWEETS:
                break
            post = json.loads(line.decode('utf-8'))
            contents = [post['data']['text']]
            count += 1
            yield str(contents)
        except:
            result = False

In [34]:
stream = stream.transform(tfunc)
type(stream)

pyspark.streaming.dstream.TransformedDStream

In [35]:
coord_stream = stream.map(lambda line: ast.literal_eval(line))
type(coord_stream)

pyspark.streaming.dstream.TransformedDStream

In [36]:
# Essa função classifica os tweets, aplicando as features do modelo criado anteriormente
def classifica_tweet(tweet):
    sentence = [(tweet, '')]
    test_set = sentiment_analyzer.apply_features(sentence)
    print(tweet, classifier.classify(test_set[0][0]))
    return(tweet, classifier.classify(test_set[0][0]))

In [37]:
# Essa função retorna o texto do Twitter
def get_tweet_text(rdd):
    for line in rdd:
        tweet = line.strip()
        translator = str.maketrans({key: None for key in string.punctuation})
        tweet = tweet.translate(translator)
        tweet = tweet.split(' ')
        tweet_lower = []
        for word in tweet:
            tweet_lower.append(word.lower())
    return(classifica_tweet(tweet_lower))

In [38]:
# Cria uma lista vazia para os resultados
resultados = []

In [39]:
# Essa função salva o resultado dos batches de Tweets junto com o timestamp
def output_rdd(rdd):
    global resultados
    pairs = rdd.map(lambda x: (get_tweet_text(x)[1],1))
    counts = pairs.reduceByKey(add)
    output = []
    for count in counts.collect():
        output.append(count)
    result = [time.strftime("%I:%M:%S"), output]
    resultados.append(result)
    print(result)

In [43]:
# A função foreachRDD() aplica uma função a cada RDD to streaming de dados
coord_stream.foreachRDD(lambda t, rdd: output_rdd(rdd))

In [44]:
# Start streaming
ssc.start()
# ssc.awaitTermination()

In [None]:
cont = True
while cont:
    if len(resultados) > 5:
        cont = False