In [11]:
# libs do Spark
from pyspark.sql import SparkSession
from pyspark.sql.functions import explode
from pyspark.sql.functions import split
from pyspark.sql.functions import col, udf
from pyspark.sql.types import StringType, FloatType

# libs para análise de texto
from textblob import TextBlob
from googletrans import Translator
# decodifica caracteres não textuais, emojis
from unidecode import unidecode

In [12]:
# cria seção para estabelecer conexão
spark = SparkSession \
        .builder \
        .appName('StructuredNetworkTwitter') \
        .getOrCreate()

In [14]:
# cria dataframe que será responsável por ler cada uma das linhas recebidas através do localhost na porta 9995
# define a fonte (source) de dados
twitters = spark \
    .readStream \
    .format('socket') \
    .option('host', 'localhost') \
    .option('port', 9995) \
    .load()

In [15]:
# função que realiza tradução para o inglês
def translate_udf(col):
    trans_obj = Translator().translate(col)
    return trans_obj.tex

#função para realizar análise de sentimento
def sentiment_udf(col):
    sentiment_text = TextBlob(col)
    return sentiment_text.polarity

In [16]:
# definição das funções udf - User-Defined-Function
# define função de decode a ser utilizada no daframe
unicode_udf_string = udf(lambda z: unidecode(z), StringType())
group_by_sentiment = udf(lambda x: 'negativo' if x < -0.1 else 'positivo' if x > 0.1 else 'neutro', StringType())
# define a função de tradução
translate_udf_string = udf(translate_udf, StringType())
# define a função de análise de sentimento
sentiment_udf_float = udf(sentiment_udf, FloatType())

In [19]:
teste = 'Eu 💓 o meu cachorro, ele é o meu melhor amigo'
decode = unidecode(teste)
print(decode)
decodeEN = Translator().translate(decode)
print(decodeEN.text)
a = str(decodeEN)
sentiment = TextBlob(a)
print(sentiment.polarity)

Eu  o meu cachorro, ele e o meu melhor amigo
I my dog, he and my best friend
1.0


In [24]:
# aplica as funções udf para a seleção de colunas
# decodifica
twitters_unicode = twitters.select('value', unicode_udf_string(twitters.value).alias('unicoded'))
#traduz
twitters_uni_trans = twitters_unicode.select('value', 'unicoded',
                                              translate_udf_string(col('unicoded')).alias('twitter_EN'))
# análise de sentimento
twitters_uni_trans_sent = twitters_uni_trans.select('value', 'unicoded', 'twitter_EN',
                                                   sentiment_udf_float(col('twitter_EN')).alias('analise'))
t_sent_label = twitters_uni_trans_sent.select('value', 'unicoded', 'twitter_EN', 'analise',
                                            group_by_sentiment(col('analise')).alias('classificacao'))

In [28]:
t_sent_count = t_sent_label.groupBy('classificacao').count()

In [30]:
# define a consult (query) e como deve ser realizada a saida (sink) para o stream criado
query = t_sent_count \
    .writeStream \
    .outputMode("update") \
    .format("console") \
    .start()

# aguarta até que a streaming query termine
query.awaitTermination()

AnalysisException: 'Multiple streaming aggregations are not supported with streaming DataFrames/Datasets;;\nAggregate [classificacao#52], [classificacao#52, count(1) AS count#70L]\n+- Aggregate [classificacao#52], [classificacao#52, count(1) AS count#64L]\n   +- Project [value#0, unicoded#37, twitter_EN#41, analise#46, <lambda>(analise#46) AS classificacao#52]\n      +- Project [value#0, unicoded#37, twitter_EN#41, sentiment_udf(twitter_EN#41) AS analise#46]\n         +- Project [value#0, unicoded#37, translate_udf(unicoded#37) AS twitter_EN#41]\n            +- Project [value#0, <lambda>(value#0) AS unicoded#37]\n               +- StreamingRelationV2 org.apache.spark.sql.execution.streaming.sources.TextSocketSourceProvider@7ebd136c, socket, Map(host -> localhost, port -> 9995), [value#0]\n'