# <font color='blue'>Data Science Academy - Formação Cientista de Dados</font>
# <font color='blue'>Author: Evandro Eulálio Cleto</font>

## <font color='blue'>Start Date: 07/06/2023</font>
## <font color='blue'>End Date: 13/06/2023</font>


![title](imagens/Projeto_imagem.png)

## <font color='blue'>Project objective:</font>
### <font color='blue'>By analyzing tweets about ChatGPT, this project builds an analysis process that identifies the prevailing sentiment about ChatGPT, specifically on Twitter.</font>

Project summary: build a sentiment prediction project for ChatGPT from online tweets using Machine Learning.
Each tweet's sentiment is predicted as positive, negative, or neutral.

Open http://localhost:4040 (the Spark UI) to monitor job execution.

## Spark Streaming - Twitter

In [None]:
# Install the packages required by the project
#!pip install requests_oauthlib
#!pip install twython
#!pip install nltk
#!pip install emoji

In [None]:
# https://pypi.org/project/findspark/
!pip install -q findspark

In [1]:
# Import findspark and initialize it
import findspark
findspark.init()

In [2]:
# Modules used
from pyspark.streaming import StreamingContext
#from pyspark.streaming.twitter import TwitterUtils
from pyspark import SparkContext
from pyspark.sql import SparkSession
from requests_oauthlib import OAuth1Session
from operator import add
import requests_oauthlib
from time import gmtime, strftime
import pandas as pd
import re
import requests
import time
import string
import ast
import json
import warnings
warnings.filterwarnings("ignore", category=FutureWarning)

In [3]:
# NLTK package
import nltk
from nltk.classify import NaiveBayesClassifier
from nltk.sentiment import SentimentAnalyzer
from nltk.corpus import subjectivity
from nltk.corpus import stopwords
from nltk.sentiment.util import *

In [4]:
# Download the stopwords from the NLTK package
# https://raw.githubusercontent.com/nltk/nltk_data/gh-pages/index.xml
nltk.download("stopwords")

[nltk_data] Downloading package stopwords to
[nltk_data]     /home/evandro/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [None]:
# Load the CSV file as a pandas DataFrame
# This dataset has 219294 ChatGPT tweets labeled with positive, negative, or neutral sentiment and
# is used to train the NaiveBayesClassifier.
# The dataset was obtained from: https://www.kaggle.com/datasets/charunisa/chatgpt-sentiment-analysis
df = pd.read_csv("dados/ChatGPT_sentiment_orig.csv",sep=",")

In [None]:
df.head(10)

In [None]:
# Check the object type
type(df)

In [None]:
# Check the shape of the data
df.shape

In [None]:
# Drop the 'code' column, which is not relevant to this project
df = df.drop('code', axis=1)

In [None]:
df.head(10)

In [None]:
# Move the 'labels' column to position 0
cols = df.columns.tolist()
cols = ['labels'] + cols[:cols.index('labels')] + cols[cols.index('labels')+1:]
df = df[cols]

In [None]:
df.head(10)

In [None]:
# Remove commas from the 'tweets' column (the lookbehind keeps only a comma that directly follows a newline) to avoid problems in the function that removes punctuation
df['tweets'] = df['tweets'].apply(lambda x: re.sub(r'(?<!\n),', '', x))

In [None]:
df.head(10)

In [None]:
# Map the classes to numeric values
label_mapping = {'bad': 0, 'good': 1, 'neutral': 2}
df['labels'] = df['labels'].map(label_mapping)

In [None]:
df.head(10)

In [None]:
# Remove emojis and special characters from the 'tweets' column
df['tweets'] = df['tweets'].apply(lambda x: re.sub(r'[^\w\s,]', '', x))

### To do: write a function to strip HTTP content (URLs) from the 'tweets' column
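
A minimal sketch of such a function, assuming it would run before the special-character removal above (once `:`, `/` and `.` are stripped, links collapse into tokens such as `httpstco...`). The name `remove_urls` and the regular expression are illustrative choices, not part of the original notebook.

```python
import re

# Hypothetical helper: the pattern matches http/https links and bare www. links
URL_PATTERN = re.compile(r'(?:https?://|www\.)\S+')

def remove_urls(text):
    """Return text with URLs removed and runs of whitespace collapsed."""
    without_urls = URL_PATTERN.sub('', text)
    return ' '.join(without_urls.split())

print(remove_urls('ChatGPT is out now https://t.co/bjYugyPsp0'))
```

In this notebook it could be applied as `df['tweets'] = df['tweets'].apply(remove_urls)` ahead of the cell that removes special characters.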

In [None]:
df.head(10)

In [None]:
type(df)

### Filtering the dataset down to English-language tweets

In [None]:
# Build the stopword sets for every language available in NLTK
dicionario_stopwords = {lang: set(nltk.corpus.stopwords.words(lang)) for lang in nltk.corpus.stopwords.fileids()}

dicionario_stopwords

In [None]:
# Detects whether English is the predominant language, based on stopword overlap
def descobre_idioma(text):
    
    # Tokenize, treating punctuation as separate tokens
    palavras = set(nltk.wordpunct_tokenize(text.lower()))
    
    # For each language, count how many tokens are in its stopword set and keep the best match
    lang = max(((lang, len(palavras & stops)) for lang, stops in dicionario_stopwords.items()), key = lambda x: x[1])[0]
    
    # True only when the best-matching language is English
    return lang == 'english'
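
The overlap heuristic can be seen in isolation with a small sketch. The stopword sets below are tiny hand-written samples used only for illustration (the notebook relies on NLTK's full lists), and the helper names `overlap_scores` and `best_language` are hypothetical.

```python
# Tiny hand-written stopword sets, for illustration only
SAMPLE_STOPWORDS = {
    'english': {'the', 'is', 'and', 'a', 'of', 'to'},
    'portuguese': {'o', 'a', 'e', 'de', 'que', 'para'},
    'spanish': {'el', 'la', 'y', 'de', 'que', 'para'},
}

def overlap_scores(text, stopword_sets=SAMPLE_STOPWORDS):
    """Count how many distinct tokens of text appear in each language's stopword set."""
    tokens = set(text.lower().split())
    return {lang: len(tokens & stops) for lang, stops in stopword_sets.items()}

def best_language(text, stopword_sets=SAMPLE_STOPWORDS):
    """Return the language with the highest overlap (ties go to the first language listed)."""
    scores = overlap_scores(text, stopword_sets)
    return max(scores, key=scores.get)

print(overlap_scores('ChatGPT is the future of search'))
print(best_language('o futuro de ChatGPT é para todos'))
```

One consequence of this design is that a text with no stopwords at all scores zero for every language, so the winner is decided by dictionary order rather than by the text.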

In [None]:
# Keep only the tweets written in English
df_ingles = df[df['tweets'].apply(descobre_idioma)]

In [None]:
df_ingles.head(10)

In [None]:
# Check the shape of the data
df_ingles.shape

In [None]:
# Save the cleaned DataFrame as CSV
df_ingles.to_csv('dados/ChatGPT_sentiment_limpo.csv', index=False)

In [5]:
# Batch interval in seconds (how often the stream is updated)
INTERVALO_BATCH = 5

In [6]:
# Create the Spark session and get its SparkContext
spark = SparkSession.builder.appName("TwitterSentimentAnalysis").getOrCreate()
sc = spark.sparkContext
sc.setLogLevel("ERROR")

23/08/27 11:59:57 WARN Utils: Your hostname, DataScience resolves to a loopback address: 127.0.1.1; using 10.0.2.15 instead (on interface enp0s3)
23/08/27 11:59:57 WARN Utils: Set SPARK_LOCAL_IP if you need to bind to another address
Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).
23/08/27 11:59:58 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [7]:
## Create the StreamingContext
ssc = StreamingContext(sc, INTERVALO_BATCH)

## Training the Sentiment Analysis Classifier

The cleaned dataset, filtered to English, contains 44785 labeled tweets. Each row is marked as:

### 0 for negative sentiment
### 1 for positive sentiment
### 2 for neutral sentiment

In [8]:
## Read the text file and create an in-memory RDD with Spark
arquivo = sc.textFile("dados/ChatGPT_sentiment_limpo.csv")

In [9]:
arquivo.collect()

['labels,tweets',
 '0,GOD DAMN IT OpenAI STOP ANNOUNCING THINGS I AM TOO BUSY httpstcoF7Xd511FAf',
 '2,OpenAI announced ChatGPT a model optimized for dialogue httpstcof5FWklJE88',
 '2,Just in nChatGPT  research early stage GPT3 optimised for chat and remembers the conversation from OpenAI nhttpstco3RKUbLy8uO',
 '0,Google is donennCompare the quality of these responses ChatGPT httpstcoVGO7usvlIB',
 '0,Asked ChatGPT to write a Telugu song about cereal and it mixed milk with chicken masala  httpstcoTpEp8wa6Lk httpstcooHM9z7IS39',
 '1,Heres ChatGPT from OpenAI and my quick walkthrough nn Walkthrough httpstco1o93VT2afG nnAccess for free at httpstcoVooEL34xHx nnThats a brilliant idea to turn InstructGPT for Conversations httpstcogzCGZ6RUMQ',
 '1,I chatted with OpenAIs newly released tool ChatGPT about Web3 its advantages and challenges for Web3 and the futurennChatGPT is a model optimized for dialoguennRead full httpstco3sQTDVnFFrnnLets dive in  httpstcooayypVqb2D',
 '0,ChatGPT is out now ht

In [10]:
## Remove the header
header = arquivo.take(1)[0]
dataset = arquivo.filter(lambda line: line!=header)

In [11]:
dataset.collect()

['0,GOD DAMN IT OpenAI STOP ANNOUNCING THINGS I AM TOO BUSY httpstcoF7Xd511FAf',
 '2,OpenAI announced ChatGPT a model optimized for dialogue httpstcof5FWklJE88',
 '2,Just in nChatGPT  research early stage GPT3 optimised for chat and remembers the conversation from OpenAI nhttpstco3RKUbLy8uO',
 '0,Google is donennCompare the quality of these responses ChatGPT httpstcoVGO7usvlIB',
 '0,Asked ChatGPT to write a Telugu song about cereal and it mixed milk with chicken masala  httpstcoTpEp8wa6Lk httpstcooHM9z7IS39',
 '1,Heres ChatGPT from OpenAI and my quick walkthrough nn Walkthrough httpstco1o93VT2afG nnAccess for free at httpstcoVooEL34xHx nnThats a brilliant idea to turn InstructGPT for Conversations httpstcogzCGZ6RUMQ',
 '1,I chatted with OpenAIs newly released tool ChatGPT about Web3 its advantages and challenges for Web3 and the futurennChatGPT is a model optimized for dialoguennRead full httpstco3sQTDVnFFrnnLets dive in  httpstcooayypVqb2D',
 '0,ChatGPT is out now httpstcobjYugyPsp0',

In [None]:
type(dataset)

In [12]:
## Splits each line into its columns, removes punctuation, and returns a (tokens, sentiment) tuple
def get_row(line):
    # Split only on the first comma: the label comes first, the rest is the tweet
    sentimento, tweet = line.split(',', 1)
    translator = str.maketrans('', '', string.punctuation)
    tweet = tweet.strip().translate(translator)
    # Lowercase every space-separated token
    tweet_lower = [word.lower() for word in tweet.split(' ')]
    return (tweet_lower, sentimento)
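
To make the row format concrete, the sketch below traces the same steps on single lines. It is a standalone re-implementation under the hypothetical name `tokenize_labeled_line`, so it can be run without Spark.

```python
import string

def tokenize_labeled_line(line):
    """Split 'label,tweet' into (lowercased tokens, label), dropping punctuation."""
    label, tweet = line.split(',', 1)
    tweet = tweet.strip().translate(str.maketrans('', '', string.punctuation))
    return ([word.lower() for word in tweet.split(' ')], label)

tokens, label = tokenize_labeled_line('2,OpenAI announced ChatGPT a model optimized for dialogue')
print(tokens, label)

# Two consecutive spaces produce an empty-string token
print(tokenize_labeled_line('0,Just in  chat'))
```

Splitting on a single space is what produces the empty-string tokens visible in the sample output further down; calling `tweet.split()` with no argument would drop them, if that is preferred.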

In [13]:
# Apply the function to every line of the dataset
dataset_treino = dataset.map(lambda line: get_row(line))

In [14]:
# Create a SentimentAnalyzer object
sentiment_analyser = SentimentAnalyzer()

In [15]:
# Build the stopword list, including the negated (_NEG) form of each stopword
stopwords_all = []
for word in stopwords.words('english'):
    stopwords_all.append(word)
    stopwords_all.append(word + '_NEG')

In [29]:
# Take the first 4200 tweets of the training dataset as the training sample (70% of the full dataset would be about 31,350 tweets)
dataset_treino_amostra = dataset_treino.take(4200)
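
`take(n)` returns the first `n` rows in file order. If a shuffled split with a held-out portion is wanted instead, one possible sketch is shown below; the helper `split_train_test`, the 70% fraction, and the seed are assumptions for illustration, and the rows are stand-in data shaped like the notebook's `(tokens, label)` tuples.

```python
import random

def split_train_test(rows, train_fraction=0.7, seed=42):
    """Shuffle a copy of rows and split it into (train, test) lists."""
    shuffled = list(rows)
    random.Random(seed).shuffle(shuffled)
    cut = int(len(shuffled) * train_fraction)
    return shuffled[:cut], shuffled[cut:]

# Stand-in data shaped like the notebook's (tokens, label) tuples
rows = [(['token%d' % i], str(i % 3)) for i in range(100)]
train, test = split_train_test(rows)
print(len(train), len(test))
```

A fixed seed keeps the split reproducible between runs, which makes later accuracy numbers comparable.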

In [30]:
dataset_treino_amostra

[(['god',
   'damn',
   'it',
   'openai',
   'stop',
   'announcing',
   'things',
   'i',
   'am',
   'too',
   'busy',
   'httpstcof7xd511faf'],
  '0'),
 (['openai',
   'announced',
   'chatgpt',
   'a',
   'model',
   'optimized',
   'for',
   'dialogue',
   'httpstcof5fwklje88'],
  '2'),
 (['just',
   'in',
   'nchatgpt',
   '',
   'research',
   'early',
   'stage',
   'gpt3',
   'optimised',
   'for',
   'chat',
   'and',
   'remembers',
   'the',
   'conversation',
   'from',
   'openai',
   'nhttpstco3rkubly8uo'],
  '2'),
 (['google',
   'is',
   'donenncompare',
   'the',
   'quality',
   'of',
   'these',
   'responses',
   'chatgpt',
   'httpstcovgo7usvlib'],
  '0'),
 (['asked',
   'chatgpt',
   'to',
   'write',
   'a',
   'telugu',
   'song',
   'about',
   'cereal',
   'and',
   'it',
   'mixed',
   'milk',
   'with',
   'chicken',
   'masala',
   '',
   'httpstcotpep8wa6lk',
   'httpstcoohm9z7is39'],
  '0'),
 (['heres',
   'chatgpt',
   'from',
   'openai',
   'and',
  

In [None]:
# Collect every word in the sample (with negation marking) and drop the stopwords
all_words_neg = sentiment_analyser.all_words([mark_negation(doc) for doc in dataset_treino_amostra])
all_words_neg_nostops = [x for x in all_words_neg if x not in stopwords_all]

In [None]:
# Select the 200 most frequent unigrams and register them as the feature extractor
unigram_feats = sentiment_analyser.unigram_word_feats(all_words_neg_nostops, top_n=200)
sentiment_analyser.add_feat_extractor(extract_unigram_feats, unigrams = unigram_feats)
training_set = sentiment_analyser.apply_features(dataset_treino_amostra)
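
The shape of the resulting features can be illustrated in plain Python. This is a sketch of the idea only, assuming the features are boolean "contains(word)" flags over the most frequent words; `top_unigrams` and `unigram_features` are hypothetical stand-ins, not NLTK functions.

```python
from collections import Counter

def top_unigrams(words, top_n):
    """Return the top_n most frequent words, most frequent first."""
    return [word for word, _ in Counter(words).most_common(top_n)]

def unigram_features(tokens, unigrams):
    """Map each selected unigram to whether it occurs in tokens."""
    present = set(tokens)
    return {'contains({})'.format(word): word in present for word in unigrams}

all_words = ['chatgpt', 'good', 'chatgpt', 'bad', 'good', 'chatgpt']
unigrams = top_unigrams(all_words, top_n=2)
print(unigrams)
print(unigram_features(['chatgpt', 'is', 'bad'], unigrams))
```

Because only the selected unigrams become features, a word outside the top `n` (here `bad`) has no influence on the classifier.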

In [None]:
type(training_set)

In [None]:
print(training_set)

In [None]:
# Train the model
trainer = NaiveBayesClassifier.train
classifier = sentiment_analyser.train(trainer, training_set)
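
The notebook does not measure how well the trained model performs. A minimal sketch of a held-out accuracy check follows; `accuracy` is a hypothetical helper, and `toy_classify` together with the four rows are made-up stand-ins so the example runs on its own.

```python
def accuracy(classify, labeled_rows):
    """Fraction of (features, label) rows for which classify(features) == label."""
    if not labeled_rows:
        return 0.0
    hits = sum(1 for features, label in labeled_rows if classify(features) == label)
    return hits / len(labeled_rows)

# Stand-in classifier used only to make the sketch runnable
def toy_classify(features):
    return '1' if features.get('contains(good)') else '0'

held_out = [
    ({'contains(good)': True}, '1'),
    ({'contains(good)': False}, '0'),
    ({'contains(good)': False}, '2'),
    ({'contains(good)': True}, '1'),
]
print(accuracy(toy_classify, held_out))
```

With the notebook's objects, `classify` would presumably be `classifier.classify`, and the rows would come from `sentiment_analyser.apply_features(...)` over tweets that were not used for training.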

In [None]:
# Prepare a few test sentences and apply the classifier's feature extractors to them
test_sentence1 = [(['model', 'is', 'people', 'bad'], '')]
test_sentence2 = [(['learning', 'day', 'bit', 'work', 'today'], '')]
test_sentence3 = [(['good', 'wonderful', 'results', 'awesome'], '')]
test_set = sentiment_analyser.apply_features(test_sentence1)
test_set2 = sentiment_analyser.apply_features(test_sentence2)
test_set3 = sentiment_analyser.apply_features(test_sentence3)

In [None]:
test_set

In [None]:
test_set2

In [None]:
test_set3

In [None]:
# Twitter authentication
# Credentials are read from environment variables so that secrets are never stored in the notebook
# (the variable names below are an assumption; use whatever names your environment defines)
import os
consumer_key = os.environ['TWITTER_CONSUMER_KEY']
consumer_secret = os.environ['TWITTER_CONSUMER_SECRET']
access_token = os.environ['TWITTER_ACCESS_TOKEN']
access_token_secret = os.environ['TWITTER_ACCESS_TOKEN_SECRET']
bearer_token = os.environ['TWITTER_BEARER_TOKEN']

In [None]:
# Set up the Twitter authentication header
auth_header = {
    "Authorization": "Bearer " + bearer_token,
    "Content-Type": "application/json"
}

In [None]:
# Define the search term and the endpoint URLs
search_term = 'chatgpt'
sample_url ='https://stream.twitter.com/1.1/statuses/sample.json'
#filter_url = 'https://stream.twitter.com/1.1/statuses/filter.json?track='+search_term
#filter_url = 'https://api.twitter.com/2/tweets/search/stream?tweet.fields=text'+search_term
filter_url = "https://api.twitter.com/2/tweets/search/stream"
# Value for the "tweet.fields" query parameter (the parameter name itself is the dictionary key)
tweet_fields = "text"

In [None]:
query_params = {
    "expansions": "author_id",
    "tweet.fields": tweet_fields,
    "user.fields": "username",
    "track": search_term
}
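
This sketch assumes that the v2 filtered-stream endpoint used as `filter_url` selects tweets through rules registered beforehand at a companion `/rules` endpoint, rather than through a `track` query parameter (which belonged to the v1.1 `statuses/filter` endpoint). The helper `build_rule_payload` and the tag value are hypothetical; the block only builds the request body and sends nothing.

```python
import json

RULES_URL = 'https://api.twitter.com/2/tweets/search/stream/rules'

def build_rule_payload(term, lang='en', tag=None):
    """Build the JSON body that adds one filtered-stream rule."""
    rule = {'value': '{} lang:{}'.format(term, lang)}
    if tag:
        rule['tag'] = tag
    return {'add': [rule]}

payload = build_rule_payload('chatgpt', tag='chatgpt-english')
print(json.dumps(payload))
# Sending it would look like: requests.post(RULES_URL, headers=auth_header, json=payload)
```

Restricting the rule to English in the query itself would keep the streamed tweets consistent with the English-only training data.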

In [None]:
# Create the OAuth1 authentication object for Twitter
auth = requests_oauthlib.OAuth1(consumer_key, consumer_secret, access_token, access_token_secret)

In [None]:
auth

In [None]:
# OAuth1 session built from the credentials defined above
auth = OAuth1Session(consumer_key,
                     client_secret=consumer_secret,
                     resource_owner_key=access_token,
                     resource_owner_secret=access_token_secret)


In [None]:
url = 'https://api.twitter.com/1/account/settings.json'

In [None]:
r = auth.get(url)

In [None]:
r

In [None]:
# Set up the stream
rdd = ssc.sparkContext.parallelize([0])
stream = ssc.queueStream([], default=rdd)

In [None]:
# Total number of tweets per update
NUM_TWEETS = 500

In [None]:
type(stream)

In [None]:
# Connects to Twitter and yields up to NUM_TWEETS tweets per batch
def tfunc(t, rdd):
  return rdd.flatMap(lambda x: stream_twitter_data())

def stream_twitter_data():
  # The v2 filtered stream endpoint authenticates with the bearer token header only
  response = requests.get(filter_url, headers=auth_header, stream=True, params=query_params)
  print(filter_url, response)
  count = 0
  for line in response.iter_lines():
    if count >= NUM_TWEETS:
      break
    try:
      post = json.loads(line.decode('utf-8'))
      # v2 responses nest the tweet under "data"; v1.1 responses carry "text" at the top level
      contents = [post.get('data', post)['text']]
    except (ValueError, KeyError, TypeError, AttributeError):
      # Skip keep-alive blank lines and payloads without tweet text
      continue
    count += 1
    yield str(contents)
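
Parsing one raw stream line can be isolated in a small function that is easy to test without a network connection. The sketch below assumes two possible payload shapes (tweet text nested under `data`, or `text` at the top level); `extract_tweet_text` is a hypothetical helper name.

```python
import json

def extract_tweet_text(raw_line):
    """Return the tweet text from one raw stream line, or None if there is none."""
    if not raw_line:
        return None  # blank keep-alive line
    try:
        post = json.loads(raw_line.decode('utf-8'))
    except (UnicodeDecodeError, ValueError):
        return None
    if not isinstance(post, dict):
        return None
    tweet = post.get('data', post)
    return tweet.get('text') if isinstance(tweet, dict) else None

print(extract_tweet_text(b'{"data": {"id": "1", "text": "ChatGPT is out now"}}'))
print(extract_tweet_text(b''))
```

Returning `None` for anything unparseable lets the caller skip the line explicitly instead of hiding every error behind a bare `except`.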

In [None]:
stream = stream.transform(tfunc)

In [None]:
stream

In [None]:
coord_stream = stream.map(lambda line: ast.literal_eval(line))

In [None]:
# Classifies a tweet by applying the features of the model trained earlier
def classifica_tweet(tweet):
  sentence = [(tweet, '')]
  test_set = sentiment_analyser.apply_features(sentence)
  label = classifier.classify(test_set[0][0])
  print(tweet, label)
  return (tweet, label)

In [None]:
# Cleans and tokenizes the tweet text, then classifies it
# (each stream element is a one-item list, so only the first item is processed)
def get_tweet_text(rdd):
  for line in rdd:
    translator = str.maketrans('', '', string.punctuation)
    tweet = line.strip().translate(translator)
    tweet_lower = [word.lower() for word in tweet.split(' ')]
    return classifica_tweet(tweet_lower)

In [None]:
# Create an empty list for the results
resultados = []

In [None]:
# Saves the result of each batch of tweets along with its timestamp
def output_rdd(rdd):
  global resultados
  pairs = rdd.map(lambda x: (get_tweet_text(x)[1],1))
  counts = pairs.reduceByKey(add)
  output = []
  for count in counts.collect():
    output.append(count)
  result = [time.strftime("%I:%M:%S"), output]
  resultados.append(result)
  print(result)
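
What the `map` plus `reduceByKey(add)` step computes for one batch can be reproduced locally with a `Counter`. This is a sketch for illustration only; `summarize_batch` is a hypothetical name and the labels are made up.

```python
from collections import Counter
import time

def summarize_batch(labels):
    """Return [timestamp, [(label, count), ...]] for one batch of predicted labels."""
    counts = Counter(labels)
    return [time.strftime('%I:%M:%S'), sorted(counts.items())]

batch = summarize_batch(['1', '0', '1', '2', '1'])
print(batch)
```

Each entry appended to `resultados` has this same two-part shape: a timestamp followed by the per-label counts of that batch.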

In [None]:
# foreachRDD() applies a function to every RDD in the data stream
coord_stream.foreachRDD(lambda t, rdd: output_rdd(rdd))

In [None]:
# Start streaming
ssc.start()
# ssc.awaitTermination()

In [None]:
# Wait until more than 5 batches have been collected, sleeping between checks instead of busy-waiting
while len(resultados) <= 5:
  time.sleep(1)
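
If the stream never delivers enough batches, a loop like the one above never ends. One possible variant with a timeout is sketched below; `wait_for_batches` and its parameters are hypothetical.

```python
import time

def wait_for_batches(results, minimum, timeout_seconds, poll_seconds=0.5):
    """Block until results holds more than `minimum` items or the timeout expires."""
    deadline = time.monotonic() + timeout_seconds
    while len(results) <= minimum:
        if time.monotonic() >= deadline:
            return False
        time.sleep(poll_seconds)
    return True

print(wait_for_batches([1, 2, 3], minimum=2, timeout_seconds=1))
print(wait_for_batches([], minimum=5, timeout_seconds=0.2, poll_seconds=0.05))
```

In the notebook the call might be `wait_for_batches(resultados, 5, timeout_seconds=120)`, with the return value deciding whether to save the results or report that the stream stalled.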

In [None]:
# Save the results
rdd_save = 'dados/r'+time.strftime("%I%M%S")
resultados_rdd = sc.parallelize(resultados)
resultados_rdd.saveAsTextFile(rdd_save)

In [None]:
# View the results
resultados_rdd.collect()
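
The collected batches can be condensed into an overall tally with readable sentiment names. The sketch below assumes the `[timestamp, [(label, count), ...]]` structure built by `output_rdd`; the sample values are made up for illustration and `total_by_sentiment` is a hypothetical helper.

```python
from collections import Counter

# Label codes as defined for the training data
LABEL_NAMES = {'0': 'negative', '1': 'positive', '2': 'neutral'}

def total_by_sentiment(results):
    """Sum label counts over every [timestamp, [(label, count), ...]] entry."""
    totals = Counter()
    for _timestamp, counts in results:
        for label, count in counts:
            totals[LABEL_NAMES.get(label, label)] += count
    return dict(totals)

# Made-up sample with the same structure as `resultados`
sample = [
    ['10:00:05', [('1', 3), ('0', 1)]],
    ['10:00:10', [('2', 2), ('1', 1)]],
]
print(total_by_sentiment(sample))
```

The label with the largest total is the predominant sentiment that the project sets out to identify.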

In [None]:
# Stop streaming
ssc.stop()