### Attention:
Use Java JDK 1.8 or 11 and Apache Spark 2.4.2

**If you get the error message "name 'sc' is not defined", stop pyspark and delete the metastore_db directory located in the same directory as this Jupyter notebook**

Open http://localhost:4040 whenever you want to follow the execution of the jobs

## Spark Streaming - Twitter

In [1]:
# You may need to install these packages
# !pip install requests_oauthlib
# !pip install twython
# !pip install nltk

In [2]:
# Modules used
from pyspark.streaming import StreamingContext
from pyspark import SparkContext
from requests_oauthlib import OAuth1Session
from operator import add
import requests_oauthlib
from time import gmtime, strftime
import requests
import time
import string
import ast
import json

In [3]:
# NLTK package
import nltk
from nltk.classify import NaiveBayesClassifier
from nltk.sentiment import SentimentAnalyzer
from nltk.corpus import subjectivity
from nltk.corpus import stopwords
from nltk.sentiment.util import *



In [4]:
# Update frequency (batch interval, in seconds)
INTERVALO_BATCH = 5

In [5]:
sc = SparkContext(appName='Twitter-Politics')

Setting default log level to "WARN".
To adjust logging level use sc.setLogLevel(newLevel). For SparkR, use setLogLevel(newLevel).


22/08/20 21:33:59 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable


In [6]:
# Creating the StreamingContext
ssc = StreamingContext(sc, INTERVALO_BATCH)

## Training the Sentiment Analysis Classifier

An essential part of building a sentiment analysis algorithm (or any data mining algorithm) is having a comprehensive dataset, or "corpus", to learn from, as well as a test dataset to verify that the accuracy of your algorithm meets the standards you expect. This also lets you tune the algorithm to derive better (or more accurate) natural-language features that can be extracted from the text and that contribute to sentiment classification, instead of relying on a generic approach. We use as a base the training dataset provided by the University of Michigan for Kaggle competitions --> https://inclass.kaggle.com/c/si650winter11.

This dataset contains 1,578,627 classified tweets, and each row is labeled as:

### 1 for positive sentiment
### 0 for negative sentiment
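The role of the held-out test set described above can be sketched in plain Python. This is only an illustrative sketch and not part of the original notebook: `split_train_test`, `accuracy`, the 80/20 ratio, the toy samples, and the rule-based `predict` function are all assumptions made for the example.

```python
import random

def split_train_test(samples, test_ratio=0.2, seed=42):
    """Shuffle labeled samples and split them into (train, test) lists."""
    shuffled = list(samples)
    random.Random(seed).shuffle(shuffled)
    n_test = int(len(shuffled) * test_ratio)
    return shuffled[n_test:], shuffled[:n_test]

def accuracy(predict, labeled_samples):
    """Fraction of samples whose predicted label equals the true label."""
    if not labeled_samples:
        return 0.0
    hits = sum(1 for tokens, label in labeled_samples if predict(tokens) == label)
    return hits / len(labeled_samples)

# Toy data in the same (tokens, label) shape used in this notebook
samples = [(['good', 'day'], '1'), (['sad', 'news'], '0')] * 50
train, test = split_train_test(samples)

# Hypothetical rule-based predictor, used only to exercise accuracy()
predict = lambda tokens: '1' if 'good' in tokens else '0'
print(len(train), len(test), accuracy(predict, test))
```

Measuring accuracy only on samples the classifier never saw during training is what makes the number meaningful.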

In [7]:
# Reading the text file and creating an in-memory RDD with Spark
arquivo = sc.textFile("dataset_analise_sentimento.csv")

In [8]:
arquivo.count()

                                                                                

1578628

In [9]:
# Getting the header line so it can be removed from the dataset
header = arquivo.take(1)[0];header

'ItemID,Sentiment,SentimentSource,SentimentText'

In [10]:
dataset = arquivo.filter(lambda line: line != header)

In [11]:
dataset.count()



1578627

In [12]:
type(dataset)

pyspark.rdd.PipelinedRDD

In [13]:
# This function splits each line into columns, removes punctuation, and returns a (tokens, sentiment) tuple.
def get_row(line):
  # maxsplit=3 keeps commas that appear inside the tweet text
  row = line.split(',', 3)
  sentimento = row[1]
  tweet = row[3].strip()
  translator = str.maketrans({key: None for key in string.punctuation})
  tweet = tweet.translate(translator)
  tweet = tweet.split(' ')
  tweet_lower = [ word.lower() for word in tweet ]
  return (tweet_lower, sentimento)

In [14]:
# Applies the function to each line of the dataset
dataset_treino = dataset.map(lambda line: get_row(line))

In [15]:
dataset_treino.take(5)

[(['is', 'so', 'sad', 'for', 'my', 'apl', 'friend'], '0'),
 (['i', 'missed', 'the', 'new', 'moon', 'trailer'], '0'),
 (['omg', 'its', 'already', '730', 'o'], '1'),
 (['',
   'omgaga',
   'im',
   'sooo',
   '',
   'im',
   'gunna',
   'cry',
   'ive',
   'been',
   'at',
   'this',
   'dentist',
   'since',
   '11',
   'i',
   'was',
   'suposed',
   '2',
   'just',
   'get',
   'a',
   'crown',
   'put',
   'on',
   '30mins'],
  '0'),
 (['i',
   'think',
   'mi',
   'bf',
   'is',
   'cheating',
   'on',
   'me',
   '',
   '',
   '',
   '',
   '',
   '',
   'tt'],
  '0')]
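The empty strings in the output above appear because the text is split on a single space after punctuation is removed, so consecutive spaces and punctuation-only tokens become `''`. Below is a minimal sketch of a variant that drops empty tokens and keeps commas inside the tweet text together. `parse_line` and the sample line are hypothetical and not part of the original notebook.

```python
import string

# Translation table that deletes every ASCII punctuation character
PUNCT_TABLE = str.maketrans('', '', string.punctuation)

def parse_line(line):
    """Return (tokens, label) from an 'ItemID,Sentiment,SentimentSource,SentimentText' line."""
    # maxsplit=3 keeps commas that appear inside the tweet text
    _, label, _, text = line.split(',', 3)
    # str.split() with no argument collapses whitespace and never yields ''
    tokens = text.translate(PUNCT_TABLE).lower().split()
    return tokens, label

# Made-up line in the same column layout as the dataset header
sample = '42,1,Sentiment140,Well... that was fun,  really FUN!'
print(parse_line(sample))
```

Whether dropping the empty tokens improves the classifier is not something this sketch shows; it only removes a feature (`contains()`) that carries no linguistic meaning.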

In [16]:
dataset_treino.count()

                                                                                

1578627

In [17]:
# Creates a SentimentAnalyzer object
sentiment_analyzer = SentimentAnalyzer()

In [18]:
# Make sure you have enough disk space - approximately 5GB
# https://raw.githubusercontent.com/nltk/nltk_data/gh-pages/index.xml
# nltk.download()
nltk.download("stopwords")

[nltk_data] Downloading package stopwords to
[nltk_data]     /Users/kayorenato/nltk_data...
[nltk_data]   Package stopwords is already up-to-date!


True

In [19]:
# Gets the list of English stopwords, plus their negated (_NEG) forms
stopwords_all = []
for word in stopwords.words('english'):
  stopwords_all.append(word)
  stopwords_all.append(word + '_NEG')

In [20]:
# Takes 10,000 tweets from the training dataset; all words that are not stopwords are extracted from this sample
dataset_treino_amostra = dataset_treino.take(10000)

In [21]:
all_words_neg = sentiment_analyzer.all_words([mark_negation(doc) for doc in dataset_treino_amostra])
all_words_neg_nostops = [x for x in all_words_neg if x not in stopwords_all]
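NLTK's `mark_negation` appends `_NEG` to words that follow a negation, which is why the stopword list above also contains a `word + '_NEG'` entry for every stopword. The idea can be sketched as follows. `mark_negation_simple` is a simplified stand-in written for illustration, not NLTK's implementation, and its negation list is a small assumed subset.

```python
# Small illustrative subset of negation words; NLTK uses a longer list
NEGATIONS = {'not', 'no', 'never', 'dont', 'cant', 'didnt', 'wont'}
CLAUSE_END = {'.', ',', ':', ';', '!', '?'}

def mark_negation_simple(tokens):
    """Append '_NEG' to every token that follows a negation word,
    until a clause-ending punctuation token is reached."""
    marked, in_scope = [], False
    for token in tokens:
        if token in CLAUSE_END:
            # Punctuation closes the negation scope
            in_scope = False
            marked.append(token)
        elif in_scope:
            marked.append(token + '_NEG')
        else:
            marked.append(token)
            if token in NEGATIONS:
                in_scope = True
    return marked

print(mark_negation_simple(['i', 'dont', 'like', 'mondays']))
print(mark_negation_simple(['not', 'good', ',', 'but', 'fine']))
```

Because punctuation is stripped from the tweets before this step in the notebook, a negation scope there likely runs to the end of the tweet.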

In [22]:
# Selects the 200 most frequent unigrams (n-grams with n = 1) and extracts the features
unigram_feats = sentiment_analyzer.unigram_word_feats(all_words_neg_nostops, top_n = 200)
sentiment_analyzer.add_feat_extractor(extract_unigram_feats, unigrams = unigram_feats)
training_set = sentiment_analyzer.apply_features(dataset_treino_amostra)

In [23]:
type(training_set)

nltk.collections.LazyMap

In [24]:
print(training_set)

[({'contains()': False, 'contains(im)': False, 'contains(_NEG)': False, 'contains(followfriday)': False, 'contains(amp)': False, 'contains(dont)': False, 'contains(day)': False, 'contains(love)': False, 'contains(like)': False, 'contains(cant)': False, 'contains(good)': False, 'contains(get)': False, 'contains(go)': False, 'contains(today)': False, 'contains(got)': False, 'contains(want)': False, 'contains(time)': False, 'contains(going)': False, 'contains(back)': False, 'contains(one)': False, 'contains(sad)': True, 'contains(really)': False, 'contains(miss)': False, 'contains(u)': False, 'contains(work)': False, 'contains(new)': False, 'contains(2)': False, 'contains(last)': False, 'contains(still)': False, 'contains(twitter)': False, 'contains(night)': False, 'contains(great)': False, 'contains(lol)': False, 'contains(follow)': False, 'contains(need)': False, 'contains(see)': False, 'contains(much)': False, 'contains(myweakness)': False, 'contains(get_NEG)': False, 'contains(didnt)'
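The `contains(word)` dictionaries printed above can be reproduced in miniature with plain Python. This is a sketch of the idea only: `top_unigrams`, `contains_features`, and the toy documents are hypothetical names and data, not the NLTK functions used in the notebook.

```python
from collections import Counter

def top_unigrams(words, top_n):
    """Return the top_n most frequent words."""
    return [word for word, _ in Counter(words).most_common(top_n)]

def contains_features(tokens, unigrams):
    """Map each selected unigram to True/False depending on its presence in tokens."""
    present = set(tokens)
    return {'contains({})'.format(word): word in present for word in unigrams}

# Toy corpus: 'sad' and 'today' each occur twice, everything else once
docs = [['so', 'sad', 'today'], ['sad', 'day'], ['love', 'today']]
all_words = [word for doc in docs for word in doc]
unigrams = top_unigrams(all_words, top_n=2)
print(contains_features(['sad', 'monday'], unigrams))
```

Every document is described by the same fixed set of keys, so a word outside the selected unigrams (here `monday`) has no influence on the classification.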

In [25]:
# Train the model
trainer = NaiveBayesClassifier.train
classifier = sentiment_analyzer.train(trainer, training_set)

Training classifier
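To make the training step less of a black box, here is a minimal Naive Bayes sketch over boolean feature dictionaries like the ones shown earlier. It is an assumption-laden illustration and not NLTK's `NaiveBayesClassifier`: the function names are hypothetical and it uses add-one smoothing, which may differ from the smoothing NLTK applies.

```python
import math
from collections import Counter, defaultdict

def train_naive_bayes(labeled_features):
    """labeled_features: list of (feature_dict, label). Returns a model dict."""
    label_counts = Counter(label for _, label in labeled_features)
    # value_counts[label][feature][value] = number of occurrences
    value_counts = defaultdict(lambda: defaultdict(Counter))
    for features, label in labeled_features:
        for name, value in features.items():
            value_counts[label][name][value] += 1
    return {'labels': label_counts, 'values': value_counts}

def classify(model, features):
    """Return the label with the highest log posterior (add-one smoothing)."""
    total = sum(model['labels'].values())
    best_label, best_score = None, -math.inf
    for label, count in model['labels'].items():
        score = math.log(count / total)
        for name, value in features.items():
            seen = model['values'][label][name][value]
            # Boolean features have two possible values, hence the +2
            score += math.log((seen + 1) / (count + 2))
        if score > best_score:
            best_label, best_score = label, score
    return best_label

# Toy training data in the same (features, label) shape as training_set
data = [
    ({'contains(sad)': True, 'contains(love)': False}, '0'),
    ({'contains(sad)': True, 'contains(love)': False}, '0'),
    ({'contains(sad)': False, 'contains(love)': True}, '1'),
    ({'contains(sad)': False, 'contains(love)': True}, '1'),
]
model = train_naive_bayes(data)
print(classify(model, {'contains(sad)': True, 'contains(love)': False}))
```

Working in log space avoids numeric underflow when many feature probabilities are multiplied together.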


In [26]:
# Tests the classifier on a few sentences
test_sentence1 = [(['this', 'program', 'is', 'bad'], '')]
test_sentence2 = [(['tough', 'day', 'new', 'work', 'today'], '')]
test_sentence3 = [(['good', 'wonderful', 'amazing', 'awesome'], '')]
test_set = sentiment_analyzer.apply_features(test_sentence1)
test_set2 = sentiment_analyzer.apply_features(test_sentence2)
test_set3 = sentiment_analyzer.apply_features(test_sentence3)

In [27]:
# Twitter authentication
consumer_key = "xxxx"
consumer_secret = "xxxx"
access_token = "xxxx"
access_token_secret = "xxxx"

In [28]:
# Specifies the search term and the URLs
search_term = 'Trump'
sample_url = 'https://stream.twitter.com/1.1/statuses/sample.json'
filter_url = 'https://stream.twitter.com/1.1/statuses/filter.json?track='+search_term

In [29]:
# Creating the authentication object for Twitter
auth = requests_oauthlib.OAuth1(consumer_key, consumer_secret, access_token, access_token_secret)

In [30]:
# Configuring the stream
rdd = ssc.sparkContext.parallelize([0])
stream = ssc.queueStream([], default = rdd)

In [31]:
type(stream)

pyspark.streaming.dstream.DStream

In [32]:
# Total number of tweets per update
NUM_TWEETS = 500  

In [33]:
# This function connects to Twitter and returns a specific number of tweets (NUM_TWEETS)
def tfunc(t, rdd):
  return rdd.flatMap(lambda x: stream_twitter_data())

def stream_twitter_data():
  response = requests.get(filter_url, auth = auth, stream = True)
  print(filter_url, response)
  count = 0
  for line in response.iter_lines():
    # Stop once NUM_TWEETS tweets have been yielded
    if count >= NUM_TWEETS:
      break
    try:
      post = json.loads(line.decode('utf-8'))
      contents = [post['text']]
    except (ValueError, KeyError, TypeError):
      # Skip blank lines, malformed JSON, and messages without a 'text' field
      continue
    count += 1
    yield str(contents)
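The parsing logic of `stream_twitter_data` can be exercised without network access by feeding it a list of byte strings instead of an HTTP response. The sketch below assumes a hypothetical helper, `take_tweet_texts`, and a made-up `fake_stream`; it is not the notebook's function and does not contact Twitter.

```python
import json

def take_tweet_texts(lines, limit):
    """Yield the 'text' field of up to `limit` JSON lines, skipping bad lines."""
    count = 0
    for raw in lines:
        if count >= limit:
            break
        try:
            post = json.loads(raw.decode('utf-8'))
            text = post['text']
        except (ValueError, KeyError, TypeError):
            # Blank lines, malformed JSON, or messages without a 'text' field
            continue
        count += 1
        yield text

# Made-up input standing in for response.iter_lines()
fake_stream = [
    b'{"text": "first tweet"}',
    b'',                              # blank line
    b'{"delete": {"status": {}}}',    # message without a "text" field
    b'{"text": "second tweet"}',
    b'{"text": "third tweet"}',
]
print(list(take_tweet_texts(fake_stream, limit=2)))
```

Catching only the specific exceptions keeps unrelated failures, such as authentication or connection errors, visible instead of silently swallowing them.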

In [34]:
stream = stream.transform(tfunc)

In [35]:
coord_stream = stream.map(lambda line: ast.literal_eval(line))
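The `ast.literal_eval` call above undoes the `str(contents)` conversion performed when each tweet is yielded: the stream carries the textual representation of a one-element list, and `literal_eval` turns it back into a list. A small round-trip sketch, with a made-up tweet text:

```python
import ast

# Made-up tweet containing quotes and commas
text = "It's a \"quoted\" tweet, with commas"

serialized = str([text])                 # same conversion as str(contents)
restored = ast.literal_eval(serialized)  # back to a Python list

print(restored[0] == text)
```

Unlike `eval`, `ast.literal_eval` only accepts Python literals, so it does not execute arbitrary code found in the tweet text.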

In [36]:
# This function classifies a tweet by applying the features of the model trained earlier
def classifica_tweet(tweet):
  sentence = [(tweet, '')]
  test_set = sentiment_analyzer.apply_features(sentence)
  # Classify once and reuse the label for both the log line and the return value
  label = classifier.classify(test_set[0][0])
  print(tweet, label)
  return (tweet, label)

In [37]:
# This function cleans the tweet text (removes punctuation, lowercases) and classifies it
def get_tweet_text(rdd):
  for line in rdd:
    tweet = line.strip()
    translator = str.maketrans({key: None for key in string.punctuation})
    tweet = tweet.translate(translator)
    tweet = tweet.split(' ')
    tweet_lower = []
    for word in tweet:
      tweet_lower.append(word.lower())
    return(classifica_tweet(tweet_lower))

In [38]:
# Creates an empty list for the results
resultados = []

In [39]:
# This function saves the result of each batch of tweets together with the timestamp
def output_rdd(rdd):
  global resultados
  pairs = rdd.map(lambda x: (get_tweet_text(x)[1],1))
  counts = pairs.reduceByKey(add)
  output = []
  for count in counts.collect():
    output.append(count)
  result = [time.strftime("%I:%M:%S"), output]
  resultados.append(result)
  print(result)

In [40]:
# The foreachRDD() function applies a function to each RDD of the data stream
coord_stream.foreachRDD(lambda t, rdd: output_rdd(rdd))
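The counting done in `output_rdd` follows the usual map-then-reduce pattern: each classified tweet becomes a `(label, 1)` pair, and pairs with the same label are summed. A single-machine sketch of that pattern follows. `reduce_by_key` and the sample labels are hypothetical; the function only mimics what `reduceByKey` does and is not the Spark API.

```python
from operator import add

def reduce_by_key(pairs, func):
    """Combine the values of pairs that share a key, on one machine."""
    result = {}
    for key, value in pairs:
        result[key] = func(result[key], value) if key in result else value
    return result

# Hypothetical classifier outputs for one batch
labels = ['1', '0', '1', '1', '0']
pairs = [(label, 1) for label in labels]
print(sorted(reduce_by_key(pairs, add).items()))
```

In Spark the same reduction runs per partition first and the partial sums are then merged, which is why the combining function has to be associative and commutative.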

In [41]:
# Start streaming
ssc.start()
# ssc.awaitTermination()

22/08/20 21:34:45 ERROR Executor: Exception in task 3.0 in stage 6.0 (TID 21)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/homebrew/anaconda3/envs/python10/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 668, in main
    func, profiler, deserializer, serializer = read_command(pickleSer, infile)
  File "/opt/homebrew/anaconda3/envs/python10/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 85, in read_command
    command = serializer._read_with_length(file)
  File "/opt/homebrew/anaconda3/envs/python10/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 173, in _read_with_length
    return self.loads(obj)
  File "/opt/homebrew/anaconda3/envs/python10/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 471, in loads
    return cloudpickle.loads(obj, encoding=encoding)
  File "/opt/homebrew/anaco

22/08/20 21:34:55 ERROR PythonRunner: Python worker exited unexpectedly (crashed)
org.apache.spark.api.python.PythonException: Traceback (most recent call last):
  File "/opt/homebrew/anaconda3/envs/python10/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/worker.py", line 666, in main
    eval_type = read_int(infile)
  File "/opt/homebrew/anaconda3/envs/python10/lib/python3.10/site-packages/pyspark/python/lib/pyspark.zip/pyspark/serializers.py", line 595, in read_int
    raise EOFError
EOFError

	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.handlePythonException(PythonRunner.scala:559)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:765)
	at org.apache.spark.api.python.PythonRunner$$anon$3.read(PythonRunner.scala:747)
	at org.apache.spark.api.python.BasePythonRunner$ReaderIterator.hasNext(PythonRunner.scala:512)
	at org.apache.spark.InterruptibleIterator.hasNext(InterruptibleIterator.scala:37)
	at scala.collection.Itera

In [None]:
# Waits until more than 5 batches have been collected, sleeping between checks
while len(resultados) <= 5:
  time.sleep(1)
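If the stream never produces results, as happens when every batch fails, a wait loop like the one above never ends. One possible safeguard is a timeout. The sketch below uses a hypothetical helper, `wait_for_results`, with assumed default values for `timeout` and `poll`.

```python
import time

def wait_for_results(results, minimum, timeout=60.0, poll=0.5):
    """Block until len(results) > minimum or `timeout` seconds pass.
    Returns True if enough results arrived, False on timeout."""
    deadline = time.monotonic() + timeout
    while len(results) <= minimum:
        if time.monotonic() >= deadline:
            return False
        # Sleep instead of spinning so the CPU stays free for the streaming job
        time.sleep(poll)
    return True

print(wait_for_results([1, 2, 3], minimum=2, timeout=1.0))
print(wait_for_results([], minimum=5, timeout=0.2, poll=0.05))
```

In the notebook this could be called as `wait_for_results(resultados, 5)` before saving the results, with the return value deciding whether there is anything worth saving.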

In [None]:
# Saves the results
rdd_save = './r'+time.strftime("%I%M%S")
resultados_rdd = sc.parallelize(resultados)
resultados_rdd.saveAsTextFile(rdd_save)

In [None]:
# Displays the results
resultados_rdd.collect()

In [None]:
# Stops the streaming
ssc.stop()