In [1]:
# Based on the YouTube video: https://www.youtube.com/watch?v=nh-NMicbU40
# Show the version of the `python` found on the shell PATH
# (NOTE(review): this may differ from the kernel's own interpreter).
!python --version

Python 3.9.2


In [2]:
!pip install --user -U nltk

Collecting nltk
  Using cached nltk-3.6.5-py3-none-any.whl (1.5 MB)
  Using cached nltk-3.6.3-py3-none-any.whl (1.5 MB)


In [3]:
from pyspark.streaming import StreamingContext
from pyspark.sql import SparkSession
from pyspark.conf import SparkConf
from pyspark.sql.types import StructType, StructField, StringType, IntegerType, FloatType
from pyspark.sql.functions import col
from operator import add
import time
import string
import ast
import json

In [4]:
import nltk
from nltk.classify import NaiveBayesClassifier
from nltk.sentiment import SentimentAnalyzer
from nltk.corpus import subjectivity
from nltk.corpus import stopwords
from nltk.sentiment.util import *
from nltk import tokenize

In [6]:
# Read the Reddit COVID comments CSV with an explicit schema.
# - `inferSchema` is not set: Spark ignores it whenever `.schema(...)` is given.
# - NOTE(review): Reddit comment bodies commonly contain newlines and double
#   quotes; `multiLine` + `escape='"'` let a quoted body span several physical
#   lines instead of being split into malformed rows. Confirm against the file.
# NOTE(review): `spark` is not created in this notebook; it is assumed to be the
# SparkSession provided by the PySpark kernel/shell.
schema = StructType([
    StructField("type", StringType(), True),
    StructField("id", StringType(), True),
    StructField("subreddit.id", StringType(), True),
    StructField("subreddit.name", StringType(), True),
    StructField("subreddit.nsfw", StringType(), True),
    StructField("created_utc", IntegerType(), True),
    StructField("permalink", StringType(), True),
    StructField("body", StringType(), True),
    StructField("sentiment", FloatType(), True),
    StructField("score", IntegerType(), True),
])

dataframe = (
    spark.read
    .schema(schema)
    .option("header", True)
    .option("delimiter", ",")
    .option("multiLine", True)
    .option("escape", '"')
    .csv("the-reddit-covid-dataset-comments.csv")
)

In [7]:
# Keep only rows that actually have a comment body and expose it as "comment".
df_body = dataframe.where(col("body").isNotNull()).select("body")
df_comment = df_body.select(col("body").alias("comment"))
df_comment.take(1)

                                                                                

[Row(comment='When you scheduled your booster with CVS does it just give you the option of Vaccines: COVID-19 (Vaccine brand) or does it specifically say booster')]

In [8]:
# Translation table that deletes every ASCII punctuation character; built once
# instead of once per row.
_PUNCTUATION_TABLE = str.maketrans("", "", string.punctuation)


def map_row(row):
    """Turn a row with a ``comment`` field into an NLTK-style labelled document.

    The comment has ASCII punctuation removed, is split on any run of
    whitespace (spaces, tabs, newlines) and is lowercased.

    Args:
        row: object exposing a ``comment`` string attribute (e.g. a Spark Row).

    Returns:
        ``(tokens, label)`` where ``tokens`` is a list of lowercase words and
        ``label`` is always ``''`` (no label is attached here).
    """
    text = row.comment.translate(_PUNCTUATION_TABLE)
    # split() with no argument also trims the ends, drops the empty tokens that
    # repeated spaces produced with split(" "), and breaks on newlines.
    return ([word.lower() for word in text.split()], '')

In [8]:
# nltk.download('punkt')

[nltk_data] Downloading package punkt to /root/nltk_data...
[nltk_data]   Package punkt is already up-to-date!


True

In [9]:
# RDD of (tokens, label) documents, one per non-null comment, in the shape NLTK's
# SentimentAnalyzer expects.
# NOTE(review): map_row sets every label to '', so there is only one class to learn.
dataset_training = df_comment.rdd.map(map_row)

In [10]:
# Peek at the first two tokenised documents.
dataset_training.take(2)

                                                                                

[(['when',
   'you',
   'scheduled',
   'your',
   'booster',
   'with',
   'cvs',
   'does',
   'it',
   'just',
   'give',
   'you',
   'the',
   'option',
   'of',
   'vaccines',
   'covid19',
   'vaccine',
   'brand',
   'or',
   'does',
   'it',
   'specifically',
   'say',
   'booster'],
  ''),
 (['didnt',
   'stop',
   'prices',
   'there',
   'though',
   'new',
   'zealand',
   'and',
   'canada',
   'grew',
   'at',
   'about',
   'the',
   'same',
   'rate',
   'through',
   'covid'],
  '')]

In [11]:
# Holds the feature extractors and trains/evaluates the NLTK classifier below.
# NOTE(review): the trailing comment suggests NLTK's VADER
# SentimentIntensityAnalyzer was considered as an alternative.
sentiment_analyzer = SentimentAnalyzer() # alternative: SentimentIntensityAnalyzer

In [12]:
# Download NLTK data
# nltk.download()  # (opens the interactive downloader for all packages)
# NOTE(review): the stopword corpus is only referenced by commented-out code in
# this notebook, so this download appears to be unused.
nltk.download("stopwords")

[nltk_data] Downloading package stopwords to /root/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [13]:
# Obtém a lista de stopwords em Inglês
# stopwords_all = []
# for word in stopwords.words("english"):
#     stopwords_all.append(word)

In [13]:
# Bring the first TRAINING_SAMPLE_SIZE (tokens, label) documents to the driver as
# a Python list for NLTK. take() returns the first rows (not a random sample),
# and no stopword filtering happens here.
TRAINING_SAMPLE_SIZE = 10_000
dataset_training_sample = dataset_training.take(TRAINING_SAMPLE_SIZE)

                                                                                

In [14]:
# Mark words that follow a negation, then flatten every document's tokens into one list.
negated_documents = [mark_negation(document) for document in dataset_training_sample]
all_words_neg = sentiment_analyzer.all_words(negated_documents)

In [15]:
# Build unigram features: keep the words whose frequency passes the min_freq=4
# threshold, and register an extractor that checks which of them a document contains.
# NOTE(review): add_feat_extractor appends to the analyzer's extractor registry,
# so re-running this cell registers the extractor a second time.
unigram_feats = sentiment_analyzer.unigram_word_feats(all_words_neg, min_freq = 4)
sentiment_analyzer.add_feat_extractor(extract_unigram_feats, unigrams = unigram_feats)
# Lazily map each training document to a (feature_dict, label) pair.
training_set = sentiment_analyzer.apply_features(dataset_training_sample)

In [16]:
# apply_features returns a lazy sequence (LazyMap); features are computed on access.
type(training_set)

nltk.collections.LazyMap

In [17]:
# NOTE(review): leftover debug output; it printed nothing useful in the saved run.
print(training_set)



In [18]:
# Hand-written sanity-check sentences, labelled '' like the training documents.
test_sentence1 = [(['this', 'program', 'is', 'bad'], '')]
test_sentence2 = [(['tough', 'day', 'at', 'work', 'today'], '')]
# 'awesom' was a typo; it would never match the correctly spelled word.
test_sentence3 = [(['good', 'wonderful', 'amazing', 'awesome'], '')]
# Turn each sentence into lazily computed (feature_dict, label) pairs.
test_set, test_set2, test_set3 = (
    sentiment_analyzer.apply_features(sentence)
    for sentence in (test_sentence1, test_sentence2, test_sentence3)
)

In [20]:
# Train a Naive Bayes classifier on the extracted unigram features.
# NOTE(review): every training label is '' (map_row), so the model can only predict ''.
trainer = NaiveBayesClassifier.train
classifier = sentiment_analyzer.train(trainer=trainer, training_set=training_set)

Training classifier


In [22]:
# Evaluate the trained classifier on the third hand-written sentence.
# NOTE(review): the perfect 1.0 scores are trivial; '' is the only label in both
# the training and the test data.
print(sentiment_analyzer.evaluate(test_set3).items())

Evaluating NaiveBayesClassifier results...
dict_items([('Accuracy', 1.0), ('Precision []', 1.0), ('Recall []', 1.0), ('F-measure []', 1.0)])


In [23]:
# Create the StreamingContext on top of the existing SparkContext.
# NOTE(review): this is the legacy DStream API; Structured Streaming is the
# maintained alternative.
spark_context = spark.sparkContext
INTERVALO = 5 # Stream batch interval, in seconds
stream_context = StreamingContext(spark_context, INTERVALO)

In [24]:
# Set up the stream source: an empty queue whose `default` RDD (one element, 0)
# is used for every batch, so each batch carries a single placeholder element
# that the Reddit transform (rfunc, below) turns into fetched comments.
rdd = stream_context.sparkContext.parallelize([0])
stream = stream_context.queueStream([], default=rdd)

In [25]:
# Sanity check: the queue stream is a DStream.
type(stream)

pyspark.streaming.dstream.DStream

In [26]:
# Reddit API credentials are read from the environment instead of being
# hard-coded. The values previously committed here in plain text should be
# revoked and rotated. Set REDDIT_CLIENT_ID / REDDIT_CLIENT_SECRET before
# starting the kernel.
import os
import warnings

reddit_client_id = os.environ.get("REDDIT_CLIENT_ID", "")
reddit_client_secret = os.environ.get("REDDIT_CLIENT_SECRET", "")
if not (reddit_client_id and reddit_client_secret):
    warnings.warn("REDDIT_CLIENT_ID / REDDIT_CLIENT_SECRET are not set; Reddit API calls will fail.")

In [27]:
!pip install praw



In [28]:
import praw

In [29]:
import itertools

# Upper bound on the comments fetched from Reddit per streaming batch. praw's
# comment stream never ends, so without a bound a batch could never complete.
COMMENTS_PER_BATCH = 100


def rfunc(t, rdd):
    """DStream transform: replace each batch's placeholder RDD with Reddit comments.

    Args:
        t: batch time supplied by Spark Streaming (unused).
        rdd: the batch RDD; each of its elements triggers one bounded fetch.

    Returns:
        An RDD of stringified ``[body]`` lists from ``reddit_stream_data``.
    """
    # Keep exactly two parameters: DStream.transform only accepts callbacks
    # taking one or two arguments.
    return rdd.flatMap(lambda _: reddit_stream_data(limit=COMMENTS_PER_BATCH))


def reddit_stream_data(limit=None):
    """Yield comments from r/COVID19, each as ``str([body])``.

    Args:
        limit: maximum number of comments to yield. ``None`` (the default)
            streams forever, which only suits interactive use such as ``next()``.

    Yields:
        The string form of a one-element list holding the comment body; the
        downstream stream parses it back with ``ast.literal_eval``.
    """
    # NOTE(review): a fresh praw stream may start by replaying recent comments,
    # so successive batches can contain repeats; consider skip_existing=True.
    reddit = praw.Reddit(client_id=reddit_client_id, client_secret=reddit_client_secret, user_agent="Marcus Maciel")
    comments = reddit.subreddit("COVID19").stream.comments()
    for comment in itertools.islice(comments, limit):
        yield str([comment.body])

In [30]:
# Smoke test: pull a single comment from the live r/COVID19 stream and show it.
x = reddit_stream_data()
print(next(x))

["Garbage click bait study? I don't see the physiologic basis for NAC anyway."]


In [31]:
# Replace each batch's placeholder RDD with comments fetched from Reddit.
# NOTE(review): this rebinds `stream` to the derived stream, so re-running the
# cell stacks a second transform on top of the first.
stream = stream.transform(rfunc)

In [32]:
# Parse each "['<body>']" string back into a one-element Python list.
# ast.literal_eval only evaluates literals, so comment text cannot execute code.
coord_stream = stream.map(ast.literal_eval)

In [33]:
def classifier_comment(comment):
    """Classify one tokenised comment with the trained model.

    Args:
        comment: list of lowercase tokens.

    Returns:
        ``(comment, label)`` where ``label`` is the classifier's prediction.
    """
    # Wrap the tokens in the same (tokens, label) document shape used for
    # training so the registered feature extractors apply; '' is a placeholder.
    features = sentiment_analyzer.apply_features([(comment, '')])[0][0]
    # Classify once and reuse the result (previously computed twice).
    label = classifier.classify(features)
    print(comment, label)
    return (comment, label)

In [35]:
# Quick manual check of the classifier on a tiny hand-made comment.
x = classifier_comment(["bad", "covid"])
x

['bad', 'covid'] 


(['bad', 'covid'], '')

In [34]:
# Translation table that deletes every ASCII punctuation character.
_COMMENT_PUNCTUATION_TABLE = str.maketrans("", "", string.punctuation)


def get_comment(rdd):
    """Normalise and classify the first comment body in ``rdd``.

    Despite its name, ``rdd`` is one element of ``coord_stream`` (a list such as
    ``[body]`` produced by ``ast.literal_eval``), not a Spark RDD.

    Returns:
        ``(tokens, label)`` from ``classifier_comment`` for the first body, or
        ``None`` when ``rdd`` is empty. Any further bodies are ignored.
    """
    for body in rdd:
        # split() with no argument also trims the ends, drops empty tokens from
        # repeated spaces, and splits on newlines inside the comment.
        tokens = [word.lower() for word in body.translate(_COMMENT_PUNCTUATION_TABLE).split()]
        return classifier_comment(tokens)
    return None

In [35]:
# Create a list that collects one [timestamp, counts] entry per streaming batch.
results = []

In [36]:
def _comment_key(classified):
    """Make a ``(tokens, label)`` result usable as a Spark key (lists are unhashable)."""
    if classified is None:
        return None
    tokens, label = classified
    return (tuple(tokens), label)


def output_rdd(rdd):
    """Count the classified comments of one streaming batch and record them.

    Args:
        rdd: one batch of ``coord_stream``; each element is passed to ``get_comment``.

    Side effects:
        Appends ``[timestamp, [((tokens, label), count), ...]]`` to the global
        ``results`` list; ``tokens`` is a tuple.
    """
    # reduceByKey hashes its keys, and get_comment returns a *list* of tokens,
    # which raised "unhashable type: 'list'"; convert it to a tuple first.
    pairs = rdd.map(lambda element: (_comment_key(get_comment(element)), 1))
    counts = pairs.reduceByKey(add).collect()
    # 24-hour clock: "%I" (12-hour without AM/PM) made timestamps ambiguous.
    results.append([time.strftime("%H:%M:%S"), counts])

In [37]:
# Record the per-batch counts; foreachRDD also accepts a one-argument callback (rdd only).
coord_stream.foreachRDD(output_rdd)

In [38]:
# Start processing batches. After start(), no new streaming computations can be
# added to this context.
# NOTE(review): call stream_context.stop(...) when done, otherwise batches keep
# running in the background.
stream_context.start()

In [39]:
#stream_context.stop()

In [44]:
# Inspect the [timestamp, counts] entries collected so far.
print(results)

[['08:38:02', []]]


