### Uso de Structured streaming con Spark ML.

Se recogen los datos en streaming desde Kafka, y se les aplicará el análisis de sentimiento para etiquetar el sentimiento de los tweets recogidos en streaming, utilizando Spark ML. También se almacenarán los tweets y su predicción en distintos formatos como serían una tabla de MongoDB, ficheros csv y en ElasticSearch para su posterior visualización con Kibana.

In [1]:
# imports y configuraciones necesarias
# Spark Streaming
from pyspark.streaming import StreamingContext  

from pyspark import SparkContext
from pyspark import SparkConf
from pyspark.sql import SparkSession

from pyspark.sql.types import *
import pyspark.sql.functions as F

from pyspark.ml import Pipeline
from pyspark.ml.classification import LogisticRegression
from pyspark.ml.evaluation import MulticlassClassificationEvaluator
from pyspark.ml.feature import Tokenizer, CountVectorizer, IDF
from pyspark.ml.tuning import ParamGridBuilder, TrainValidationSplit

from pyspark.mllib.util import *

from pyspark.ml.linalg import SparseVector, DenseVector

from pymongo import MongoClient

import pandas as pd

import json
import string
import re
import unicodedata

import time

import nltk
from nltk.corpus import stopwords
# cargamos las stopwords para cada idioma
spanish_stopwords = stopwords.words('spanish')
english_stopwords = stopwords.words('english')

# !pip install elasticsearch --si fuera necesario instalarlo
from elasticsearch import Elasticsearch
from elasticsearch import helpers


spark = SparkSession\
        .builder\
        .appName("twitter")\
        .master("spark://MacBook-Pro-de-Jose.local:7077")\
        .config("spark.io.compression.codec", "snappy")\
        .config("spark.streaming.stopGracefullyOnShutdown", True)\
        .config("spark.mongodb.input.uri","mongodb://localhost:27017/tfm_twitter")\
        .config("spark.mongodb.output.uri","mongodb://localhost:27017/tfm_twitter")\
        .getOrCreate()

spark.sparkContext.setLogLevel('ERROR')

#### Creamos las funciones necesarias para los procesos posteriores.

In [2]:
# Función de carga de datos desde los csv generados con anterioridad con registros ya con el sentimiento anotado.
# Estos datos servirán para entrenar el modelo con el que predecir los tweets en streaming.
def carga_datos_csv(csv):
    schema_csv = StructType([ 
        StructField("text", StringType(), True),
        StructField("sentiment", IntegerType(), True)
    ])
    
    # cargamos los csv
    if(csv=='english_full'): fichero = './data/df_result_english.csv'
    elif(csv=='spanish_full'): fichero = './data/df_result_spanish.csv'
    elif(csv=='english_neutro'): fichero = './data/df_result_english_neutral.csv'
    elif(csv=='spanish_neutro'): fichero = './data/df_result_spanish_neutral.csv'
    elif(csv=='english_noNeutro'): fichero = './data/df_result_english_noNeutral.csv'
    elif(csv=='spanish_noNeutro'): fichero = './data/df_result_spanish_noNeutral.csv'
    
    df_csv = sqlContext.\
        read.format("com.databricks.spark.csv").\
        option("header", "true").\
        option("inferschema", "true").\
        option("mode", "DROPMALFORMED").\
        schema(schema_csv).\
        load(fichero).\
        cache()
    
    return df_csv


# Funciones de visualización de DFs para ver las columnas, el número de registros o dimensiones, y el recuento
# de valores del atributo sentimiento.
def visualizar_datos_csv(df):
    print("Columnas del dataframe: ", df.columns)
    print("Numero de registros = %d" % df.count())
    print("\n")
    print(df.limit(10).toPandas())
    print("\n")
    recuento_sentiment = df.select('sentiment').groupBy("sentiment").count().show()
    print("\n")


# Función para eliminar palabras que no queramos analizar. Serán las palabras más comunes de cada lenguaje, que no
# tienen valor para hacer un análisis de sentimiento, como podrían ser artículos o pronombres.
def eliminar_stopwords(texto, palabras_eliminar):
    tok = nltk.tokenize
    palabras = tok.word_tokenize(texto)
        
    palabras_salida = []
        
    for palabra in palabras:
        if palabra not in palabras_eliminar:
            palabras_salida.append(palabra)
        
    salida = ""
    for i in range(len(palabras_salida)):
        if palabras_salida[i] in string.punctuation:
            salida = salida.strip()+palabras_salida[i] + " "
        else:
            salida += palabras_salida[i] + " "

    return salida


# Funciones de limpieza de los tweets, para limpiar los textos de caracteres extraños, números, urls o emoticonos.
def limpieza_tweets_spanish(tokens : list) -> list:
    tweet = [re.sub('  +', ' ', s).strip() for s in tokens]
    tweet = [re.sub(r'http\S+', '', s) for s in tweet]  
    tweet = [re.sub(r'@[\S]+', '', s) for s in tweet]
    tweet = [re.sub(r'#(\S+)', r' \1 ', s) for s in tweet]
    tweet = [re.sub(r'\brt\b', '', s) for s in tweet]
    tweet = [re.sub(r'\.{2,}', ' ', s) for s in tweet]
    tweet = [re.sub(r'\s+', ' ', s) for s in tweet]
    tweet = [re.sub('','',s).lower() for s in tweet]
    # eliminar full_text que es como comienzan los textos que son extendidos
    tweet = [re.sub(r'full_text', '', s) for s in tweet] 
    
    # convertir la repetición de una letra más de 2 veces a 1
    # biennnnn --> bien
    tweet = [re.sub(r'(.)\1+', r'\1\1', s) for s in tweet]
    # remover - & '
    tweet = [re.sub(r'(-|\')', '', s) for s in tweet] 
    # eliminar acentos
    tweet = [''.join((c for c in unicodedata.normalize('NFD',s) if unicodedata.category(c) != 'Mn')) for s in tweet]
        
    # reemplazar emojis
    emoji_pattern = re.compile(u'['u'\U0001F300-\U0001F64F'u'\U0001F680-\U0001F6FF'u'\
                               \u2600-\u26FF\u2700-\u27BF]+', re.UNICODE)
 
    tweet = [emoji_pattern.sub(r' ', s) for s in tweet]
    tweet = [re.sub("[^A-Za-z]+$",'',s) for s in tweet]
    tweet = [re.sub("^[^A-Za-z]+",'',s) for s in tweet]
    tweet = [re.sub("[\$*&!?///\º\'\’\‘\|()%/\"{}@;:+\[\]\–\”\…\“\】\【=]",'',s) for s in tweet]  
    
    # remover stopwords
    tweet = [eliminar_stopwords(s, spanish_stopwords) for s in tweet]

    filtered = filter(None, tweet)
    
    return list(filtered)

# A los datos en inglés le aplicamos sus stopwords correspondientes y no les quitamos los acentos
def limpieza_tweets_english(tokens : list) -> list:
    tweet = [re.sub('  +', ' ', s).strip() for s in tokens]
    tweet = [re.sub(r'http\S+', '', s) for s in tweet]  
    tweet = [re.sub(r'@[\S]+', '', s) for s in tweet]
    tweet = [re.sub(r'#(\S+)', r' \1 ', s) for s in tweet]
    tweet = [re.sub(r'\brt\b', '', s) for s in tweet]
    tweet = [re.sub(r'\.{2,}', ' ', s) for s in tweet]
    tweet = [re.sub(r'\s+', ' ', s) for s in tweet]
    tweet = [re.sub('','',s).lower() for s in tweet]
    # eliminar full_text que es como comienzan los textos que son extendidos
    tweet = [re.sub(r'full_text', '', s) for s in tweet] 
    
    # convertir la repetición de una letra más de 2 veces a 1
    # biennnnn --> bien
    tweet = [re.sub(r'(.)\1+', r'\1\1', s) for s in tweet]
    # remover - & '
    tweet = [re.sub(r'(-|\')', '', s) for s in tweet] 

    # reemplazar emojis
    emoji_pattern = re.compile(u'['u'\U0001F300-\U0001F64F'u'\U0001F680-\U0001F6FF'u'\
                               \u2600-\u26FF\u2700-\u27BF]+', re.UNICODE)
 
    tweet = [emoji_pattern.sub(r' ', s) for s in tweet]
    tweet = [re.sub("[^A-Za-z]+$",'',s) for s in tweet]
    tweet = [re.sub("^[^A-Za-z]+",'',s) for s in tweet]
    tweet = [re.sub("[\$*&!?///\º\'\’\‘\|()%/\"{}@;:+\[\]\–\”\…\“\】\【=]",'',s) for s in tweet]  
    
    # remover stopwords
    tweet = [eliminar_stopwords(s, english_stopwords) for s in tweet]

    filtered = filter(None, tweet)
    
    return list(filtered)


# Función de preprocesado de los datos que tokeniza los textos y llama a las funciones de limpieza anteriores.
# Devolverá los dos dataframes, con datos en inglés y español, con los datos listos para aplicar el modelo.
def preprocesado_dataframe(df1, df2, tipo):
    # primero usamos tokenizer y vamos a partir los tweets por palabras
    if(tipo==0): tokenizer = Tokenizer(inputCol = "text", outputCol = "token")
    elif(tipo==1): tokenizer = Tokenizer(inputCol = "texto", outputCol = "token")

    df_tokens_english = tokenizer.transform(df1)
    df_tokens_spanish = tokenizer.transform(df2)  
    
    # usamos las funciones de limpieza y preprocesado con ambos DFs ya con los textos divididos en tokens
    limpiezaUDF_english = F.udf(limpieza_tweets_english, ArrayType(StringType()))
    limpiezaUDF_spanish = F.udf(limpieza_tweets_spanish, ArrayType(StringType()))

    df_tokens_english = df_tokens_english.withColumn("tokens_clean", limpiezaUDF_english(df_tokens_english["token"]))
    df_tokens_spanish = df_tokens_spanish.withColumn("tokens_clean", limpiezaUDF_spanish(df_tokens_spanish["token"]))

    df_tokens_english = df_tokens_english.drop("token")
    df_tokens_spanish = df_tokens_spanish.drop("token")

    df_tokens_english_clean = df_tokens_english.where(F.size(F.col("tokens_clean")) > 0)
    df_tokens_spanish_clean = df_tokens_spanish.where(F.size(F.col("tokens_clean")) > 0)
    
    return df_tokens_english_clean, df_tokens_spanish_clean


# Función que según el valor del atributo Truncated, se queda con el valor de Text o de Extended_tweet, si el texto
# del tweet es demasiado largo y está extendido.
def udf_cambiar_texto(col1,col2,col3):
    if(col1==True):
        col2=col3
    return col2


# Función udf para calcular fecha y hora y añadirla posteriormente al dataframe.
def add_timestamp():
    timestamp = time.strftime("%Y-%m-%d %H:%M:%S")
    return timestamp


# Función que pasa del tipo DenseVector a un array, que es mejor para tratar con estos datos.
def sparse_to_array(v):
    v = DenseVector(v)
    new_array = list([float(x) for x in v])
    return new_array


# Función udf para según el valor del sentimiento calculado por el modelo, quedarnos con el valor
# del array de probabilidad que corresponde a esa etiqueta de sentimiento, y así ver en cada caso el % con el
# cual el modelo ha etiquetado el sentimiento.
def udf_probability(col1,col2):
    if(col2==0.0):
        col1=col1[0]
    elif(col2==1.0):
        col1=col1[1]
    elif(col2==2.0):
        col1=col1[2]
        
    return col1


# Función que engloba las transformaciones y predicciones a realizar a los datos en streaming, para acabar teniendo
# las columnas que queremos de los datos y el sentimiento calculado.
def generacion_dataframe_streaming(df_1, df_2, model_1, model_2):
    # sumamos las 4 columnas del recuento de replies, favorites, quotes y retweet para tener una sola columna con la
    # suma de todas estas interacciones de un tweet.
    df_english = df_1.withColumn('interacciones', df_1.favorite_count\
                + df_1.quote_count + df_1.reply_count + df_1.retweet_count)
    df_spanish = df_2.withColumn('interacciones', df_2.favorite_count\
                + df_2.quote_count + df_2.reply_count + df_2.retweet_count)

    # aplicamos la función para coger el texto de la columna extended_tweet si es necesario, y sustituir al texto.
    cambiar_texto_udf=F.udf(udf_cambiar_texto)
    df_spanish = df_spanish.select('*').withColumn("texto", cambiar_texto_udf("truncated","text","extended_tweet"))
    df_english = df_english.select('*').withColumn("texto", cambiar_texto_udf("truncated","text","extended_tweet"))
    
    # llamamos a la función que engloba el preprocesado de los datos, con la tokenización y la limpieza de tweets.
    df_tokens_english, df_tokens_spanish = preprocesado_dataframe(df_english,df_spanish,1)
    df_english_clean = df_tokens_english.withColumnRenamed("sentiment", "label")
    df_spanish_clean = df_tokens_spanish.withColumnRenamed("sentiment", "label")

    # usamos el modelo para predecir el sentimiento de los tweets en los distintos DFs por idioma.
    pred_eng = model_1.transform(df_english_clean)
    pred_spa = model_2.transform(df_spanish_clean)

    # usamos la función para añadir la fecha y hora a cada registro
    add_timestamp_udf = F.udf(add_timestamp, StringType())
    # añadimos la columna con el timestamp a los datos
    pred_eng = pred_eng.withColumn("timestamp", add_timestamp_udf())
    pred_spa = pred_spa.withColumn("timestamp", add_timestamp_udf())

    # pasamos a array el DenseVector con la probabilidad calculada por el modelo al predecir el sentimiento.
    sparse_to_array_udf = F.udf(sparse_to_array, ArrayType(FloatType()))
    pred_eng = pred_eng.withColumn('probability_array', sparse_to_array_udf('probability'))
    pred_spa = pred_spa.withColumn('probability_array', sparse_to_array_udf('probability'))
    # del array de probabilidad nos quedamos solo con la probabilidad del sentimiento predecido en cada registro.
    probability_udf=F.udf(udf_probability)
    pred_eng = pred_eng.withColumn('probabilidad', probability_udf('probability_array','prediction'))
    pred_spa = pred_spa.withColumn('probabilidad', probability_udf('probability_array','prediction'))

    # eliminamos las columnas que no queremos sacar con los datos, y posteriormente las ordenamos según se desea.
    columns_to_drop = ['favorite_count', 'quote_count', 'reply_count', 'retweet_count', 'label',\
                       'countfeatures', 'features', 'rawPrediction', 'tokens_clean', 'sentiment']
    pred_spa = pred_spa.drop(*columns_to_drop)
    pred_eng = pred_eng.drop(*columns_to_drop)

    columns = ['id', 'texto', 'interacciones', 'probabilidad', 'prediction', 'timestamp']
    pred_eng = pred_eng[columns] 
    pred_spa = pred_spa[columns]
    
    return pred_eng, pred_spa

#### Carga de los datos guardados en formato .csv para entrenar el modelo a usar con los tweets que vienen en streaming.

In [3]:
# cargamos los distintos csv generados con anterioridad y con el sentimiento ya etiquetado.
df_csv_english_full = carga_datos_csv('english_full')
df_csv_spanish_full = carga_datos_csv('spanish_full')

In [4]:
# cogemos un sample del DF con datos en inglés ya que tiene muchos datos y da problemas de cómputo y tiempos.
df_csv_english_sample = df_csv_english_full.sample(False, 0.1, 45)

visualizar_datos_csv(df_csv_english_sample)

Columnas del dataframe:  ['text', 'sentiment']
Numero de registros = 175818


                                                text  sentiment
0  @VirginAmerica amazing to me that we can't get...          0
1  @VirginAmerica View of downtown Los Angeles, t...          2
2  @VirginAmerica plz help me win my bid upgrade ...          1
3  @VirginAmerica I'm #elevategold for a good rea...          2
4  @VirginAmerica @ladygaga @carrieunderwood Juli...          1
5  @VirginAmerica I’m having trouble adding this ...          0
6  @VirginAmerica you have the absolute best team...          2
7  @VirginAmerica has flight number 276 from SFO ...          1
8  @VirginAmerica Another delayed flight? #liking...          0
9  @VirginAmerica Can you find us a flt out of LA...          1


+---------+-----+
|sentiment|count|
+---------+-----+
|        1| 1372|
|        2|87372|
|        0|87074|
+---------+-----+





In [5]:
visualizar_datos_csv(df_csv_spanish_full)

Columnas del dataframe:  ['text', 'sentiment']
Numero de registros = 46787


                                                text  sentiment
0  @PauladeLasHeras No te libraras de ayudar me/n...          1
1                          @marodriguezb Gracias MAR          2
2  Off pensando en el regalito Sinde, la que se v...          0
3  Conozco a alguien q es adicto al drama! Ja ja ...          2
4  Toca @crackoviadeTV3 . Grabación dl especial N...          2
5  Buen día todos! Lo primero mandar un abrazo gr...          2
6  Desde el escaño. Todo listo para empezar #endi...          2
7  Bdías. EM no se ira de puente. Si vosotros os ...          2
8  Un sistema económico q recorta dinero para pre...          2
9                  #programascambiados caca d ajuste          0


+---------+-----+
|sentiment|count|
+---------+-----+
|        1| 2927|
|        2|25392|
|        0|18468|
+---------+-----+





#### Generamos el modelo que se probó anteriormente y dio los mejores resultados, y lo entrenamos con los datos recogidos de los csv.

In [6]:
# llamamos a la función que engloba el preprocesado de los datos, con la tokenización y la limpieza de tweets.
df_tokens_english_clean, df_tokens_spanish_clean = preprocesado_dataframe(df_csv_english_sample,df_csv_spanish_full,0)

In [7]:
df_tokens_english_clean.limit(5).toPandas()

Unnamed: 0,text,sentiment,tokens_clean
0,@VirginAmerica amazing to me that we can't get...,0,"[amazing , cant , get , cold , air , vents , v..."
1,"@VirginAmerica View of downtown Los Angeles, t...",2,"[view , downtown , los , angeles , hollywood ,..."
2,@VirginAmerica plz help me win my bid upgrade ...,1,"[plz , help , win , bid , upgrade , flight , l..."
3,@VirginAmerica I'm #elevategold for a good rea...,2,"[im , elevategold , good , reason , rock ]"
4,@VirginAmerica @ladygaga @carrieunderwood Juli...,1,"[julie , andrews , first , lady , gaga , wowd ..."


In [8]:
df_tokens_spanish_clean.limit(5).toPandas()

Unnamed: 0,text,sentiment,tokens_clean
0,@PauladeLasHeras No te libraras de ayudar me/n...,1,"[libraras , ayudar , menos , besos , gracias ]"
1,@marodriguezb Gracias MAR,2,"[gracias , mar ]"
2,"Off pensando en el regalito Sinde, la que se v...",0,"[off , pensando , regalito , sinde , va , sgae..."
3,Conozco a alguien q es adicto al drama! Ja ja ...,2,"[conozco , alguien , q , adicto , drama , ja ,..."
4,Toca @crackoviadeTV3 . Grabación dl especial N...,2,"[toca , grabacion , dl , especial , navideno m..."


In [9]:
# a partir del dataframe anterior, vamos a eliminar la columna texto ya que nos interesa quedarnos con los textos en
# limpio y tras pasar por el preprocesado para el modelo.
df_tokens_english_clean = df_tokens_english_clean.drop("text")
df_tokens_spanish_clean = df_tokens_spanish_clean.drop("text")

In [10]:
# cambiamos el nombre de la columna sentiment por label a los dataframes para los modelos.
df_english_clean = df_tokens_english_clean.withColumnRenamed("sentiment", "label").cache()
df_spanish_clean = df_tokens_spanish_clean.withColumnRenamed("sentiment", "label").cache()

In [11]:
# generación de conjunto de training y test de los dataframes.
train_english_clean, test_english_clean = df_english_clean.randomSplit([0.9, 0.1], seed=12345)
train_spanish_clean, test_spanish_clean = df_spanish_clean.randomSplit([0.9, 0.1], seed=12345)

In [12]:
# generamos el pipeline del modelo con sus etapas y parámetros, además de utilizar TrainValidationSplit,
# sería la configuración del modelo con el que se obtuvieron los mejores resultados en el notebook de pruebas de ML.
countVec = CountVectorizer(inputCol="tokens_clean", outputCol="countfeatures")
idf = IDF(inputCol=countVec.getOutputCol(), outputCol="features", minDocFreq=2)
lr = LogisticRegression(maxIter=10, regParam=0.001, elasticNetParam=0)

pipeline = Pipeline(stages=[countVec, idf, lr])

paramGrid = ParamGridBuilder() \
    .addGrid(lr.regParam, [0.1, 0.01, 0.005]) \
    .addGrid(lr.elasticNetParam, [0.1, 0.2])\
    .addGrid(lr.maxIter, [5, 20])\
    .addGrid(lr.fitIntercept, [False, True])\
    .addGrid(countVec.minTF, [1.0, 3.0, 5.0])\
    .build()

tvs = TrainValidationSplit(estimator=pipeline,
                           estimatorParamMaps=paramGrid,
                           evaluator=MulticlassClassificationEvaluator(),
                           trainRatio=0.9)

In [13]:
# entrenamos los modelos
model_eng = tvs.fit(train_english_clean)

In [14]:
model_spa = tvs.fit(train_spanish_clean)

#### Conectamos con el streaming para recoger los datos de Kafka y tenerlos en un dataframe, al que poder aplicar los modelos para calcular el sentimiento de cada tweet en streaming.

In [15]:
# definimos el schema de los datos que queremos guardar de los datos de entrada al sistema.
schema = StructType([
    StructField("id", LongType(), True),
    StructField("text", StringType(), True),
    StructField("truncated", BooleanType(), True),
    StructField("extended_tweet", StringType(), True),
    StructField("favorite_count", IntegerType(), True),
    StructField("quote_count", IntegerType(), True),
    StructField("reply_count", IntegerType(), True),
    StructField("retweet_count", IntegerType(), True),
    StructField("sentiment", IntegerType(), True)
])

In [16]:
# se obtienen desde Kafka los datos del topic en español mediante el schema.
df_spanish = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("startingOffsets", "latest")\
.option("subscribe", "TopicSpanish")\
.option("failOnDataLoss", "false") \
.load()

df_spanish = df_spanish.selectExpr("CAST(value AS STRING)")

df_spanish = df_spanish.select(F.from_json(F.col("value"), schema).alias("data")).select("data.*")           
                                                                                          
df_spanish.createOrReplaceTempView("spanish_schema")

In [17]:
display(df_spanish)

DataFrame[id: bigint, text: string, truncated: boolean, extended_tweet: string, favorite_count: int, quote_count: int, reply_count: int, retweet_count: int, sentiment: int]

In [18]:
# se obtienen desde Kafka los datos del topic en inglés mediante el schema.
df_english = spark\
.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9093")\
.option("startingOffsets", "latest")\
.option("subscribe", "TopicEnglish")\
.option("failOnDataLoss", "false") \
.load()

df_english = df_english.selectExpr("CAST(value AS STRING)")

df_english = df_english.select(F.from_json(F.col("value"), schema).alias("data")).select("data.*")

df_english.createOrReplaceTempView("english_schema")

In [19]:
display(df_english)

DataFrame[id: bigint, text: string, truncated: boolean, extended_tweet: string, favorite_count: int, quote_count: int, reply_count: int, retweet_count: int, sentiment: int]

In [20]:
# chequeamos los datos en streaming
print(" ")
print("Is the stream ready?")
print(df_spanish.isStreaming, df_english.isStreaming)

 
Is the stream ready?
True True


In [21]:
# llamamos a la función que engloba todo el preprocesado y análisis a aplicar a los datos, y nos devuelve los datos
# recogidos en streaming con el sentimiento ya calculado usando los modelos entrenados anteriormente.
pred_eng, pred_spa = generacion_dataframe_streaming(df_english, df_spanish, model_eng, model_spa)

Nos quedamos en los dataframes con las siguientes columnas:
- ID: id único del tweet
- Texto: el texto del tweet
- Interacciones: sería la suma de respuestas, citas, retweets y favoritos del tweet.
- Probability: probabilidad con la que se obtiene el valor del sentimiento de cada tweet.
- Prediction: etiqueta de sentimiento elegida por el modelo para el tweet.
- Timestamp: añadimos fecha y hora del procesamiento.

In [22]:
display(pred_eng)
display(pred_spa)

DataFrame[id: bigint, texto: string, interacciones: int, probabilidad: string, prediction: double, timestamp: string]

DataFrame[id: bigint, texto: string, interacciones: int, probabilidad: string, prediction: double, timestamp: string]

In [23]:
# por si acaso hubiera duplicados los eliminamos.
pred_eng = pred_eng.dropDuplicates(subset=['id'])
pred_spa = pred_spa.dropDuplicates(subset=['id'])

In [24]:
# creamos una función a la que pasamos los datos, y la configuración para lanzar el streaming, y poder ver por
# consola el procesamiento de datos en streaming. Procesamos cada segundo, y usando time.sleep paramos luego la 
# query de forma programada y cómoda.
def mostrar_datos(df, outputMode, formato, tiempo, query_name):
    query = df.writeStream.outputMode(outputMode).queryName(query_name).format(formato)\
            .option("truncate", "False").trigger(processingTime='1 second').start()
    time.sleep(tiempo)
    query.stop()

In [25]:
# probamos con los datos en inglés y modo append, donde en cada ejecución sólo saldrán los nuevos registros.
mostrar_datos(pred_eng, 'append', 'console', 60, 'query_eng_append')

In [26]:
# probamos con los datos en español y modo append, donde en cada ejecución sólo saldrán los nuevos registros.
mostrar_datos(pred_spa, 'append', 'console', 60, 'query_spa_append')

In [27]:
# probamos con los datos en inglés y modo update, donde en cada ejecución sólo saldrán los nuevos registros y 
# los que hayan sido actualizados.
mostrar_datos(pred_eng, 'update', 'console', 60, 'query_eng_update')

In [28]:
# probamos con los datos en español y modo update, donde en cada ejecución sólo saldrán los nuevos registros y
# los que hayan sido actualizados.
mostrar_datos(pred_spa, 'update', 'console', 60, 'query_spa_update')

In [29]:
# para poder insertar en streaming los datos en MongoDB debemos hacer uso de funciones foreachBatch.
def foreach_batch_function(df, epoch_id):
    df.write.format("com.mongodb.spark.sql.DefaultSource")\
        .mode("append")\
        .option("database", "tfm_twitter")\
        .option("collection", "tweets_streaming_english").save()
    
def foreach_batch_function_spa(df, epoch_id):
    df.write.format("com.mongodb.spark.sql.DefaultSource")\
        .mode("append")\
        .option("database", "tfm_twitter")\
        .option("collection", "tweets_streaming_spanish").save()

In [30]:
# insertamos los datos en inglés
mongo_eng = pred_eng.writeStream\
        .outputMode("append")\
        .foreachBatch(foreach_batch_function)\
        .start()

time.sleep(60)
mongo_eng.stop()

In [31]:
# insertamos los datos en español
mongo_spa = pred_spa.writeStream\
        .outputMode("append")\
        .foreachBatch(foreach_batch_function_spa)\
        .start()

time.sleep(60)
mongo_spa.stop()

In [32]:
# función para generar el índice
def generateIndexName(name):
    return(name + time.strftime("%Y%m%d-%H%M%S"))

In [33]:
# se insertan los datos en inglés en streaming en elasticsearch
elastic_eng = pred_eng.writeStream\
    .outputMode("append")\
    .format("es")\
    .option("es.nodes","localhost")\
    .option("es.port","9200")\
    .option("checkpointLocation","/tmp/")\
    .option("es.resource", generateIndexName('tweets_sentiment_eng_') + "/tweets_sentiment_eng_") \
    .option("es.codec", "best_compression") \
    .start()

time.sleep(60)
elastic_eng.stop()

In [34]:
# se insertan los datos en español en streaming en elasticsearch
elastic_spa = pred_spa.writeStream\
    .outputMode("append")\
    .format("es")\
    .option("es.nodes","localhost")\
    .option("es.port","9200")\
    .option("checkpointLocation","/tmp/")\
    .option("es.resource", generateIndexName('tweets_sentiment_spa_') + "/tweets_sentiment_spa_") \
    .option("es.codec", "best_compression") \
    .start()

time.sleep(60)
elastic_spa.stop()

#### Se prueba a hacer las consultas en memoria, obteniendo un dataframe por idioma e insertándolo posteriormente en Mongo y ElasticSearch.

In [92]:
# vamos a obtener los datos en memoria
query_spanish_memory = pred_spa.writeStream.outputMode("append").queryName("spanish_memory").format("memory")\
.option("truncate", "False").start()

query_english_memory = pred_eng.writeStream.outputMode("append").queryName("english_memory").format("memory")\
.option("truncate", "False").start()

print(query_spanish_memory.status)
print(query_english_memory.status)

{'message': 'Getting offsets from KafkaV2[Subscribe[TopicSpanish]]', 'isDataAvailable': False, 'isTriggerActive': True}
{'message': 'Getting offsets from KafkaV2[Subscribe[TopicEnglish]]', 'isDataAvailable': False, 'isTriggerActive': True}


In [93]:
result_spanish = spark.table("spanish_memory")
result_english = spark.table("english_memory")

In [94]:
# visualizamos los datos en español
result_spanish.limit(20).toPandas()

Unnamed: 0,id,texto,interacciones,probabilidad,prediction,timestamp
0,1167084394420592642,RT @LegalAppMJ: El Estado obliga a las entidad...,0,0.8828351497650146,0.0,2019-08-29 16:39:58
1,1167084394395459584,RT @Zuanna_Sol: Buenos días 😈😈 https://t.co/tr...,0,0.8228078484535217,2.0,2019-08-29 16:39:58
2,1167084394395394051,RT @petrogustavo: Por ahora los violentos gana...,0,0.9783394932746888,2.0,2019-08-29 16:39:58
3,1167084394399580165,Fuck el desayuno merezco el almuerzo,0,0.4916864037513733,1.0,2019-08-29 16:39:58
4,1167084394407956483,@Poetadelciel0 Amén 🙏,0,0.7538945078849792,2.0,2019-08-29 16:39:58
5,1167084394387050497,"RT @anacastelan_: Donde no te buscan, no haces...",0,0.5875048637390137,0.0,2019-08-29 16:39:58


In [95]:
# visualizamos los datos en inglés
result_english.limit(20).toPandas()

Unnamed: 0,id,texto,interacciones,probabilidad,prediction,timestamp
0,1167084394382790657,@BTS_twt OMG MY LOVE😭😭,0,0.6848946213722229,2.0,2019-08-29 16:39:58
1,1167084394382856193,@EmmaSpoonley Yes! I know I'm as appalled at m...,0,0.5481786727905273,2.0,2019-08-29 16:39:58
2,1167084394399633410,RT @sawxcy: If you ain’t trynna b like this I ...,0,0.7860847115516663,0.0,2019-08-29 16:39:58
3,1167084394391265286,RT @nielo_2wit: You cannot always be right... ...,0,0.6511359214782715,2.0,2019-08-29 16:39:58
4,1167084394387054592,"RT @AccidentalNut: I got y’all , rt Bc these h...",0,0.5655750632286072,2.0,2019-08-29 16:39:58


In [96]:
# end streaming
query_spanish_memory.stop()
query_english_memory.stop()

#### Almacenar datos en CSV, MongoDB y ElasticSearch.

Probamos a almacenar los datos etiquetados en diferentes formatos: archivos csv, MongoDB, y en ElasticSearch para su posterior visualización con Kibana.

In [97]:
# sacamos una cadena con fecha y hora para añadirla al nombre de los csv que se generan a continuación
timestr = time.strftime("%Y%m%d-%H%M%S")
print(timestr)

20190829-164021


In [98]:
# pasamos los dataframes con el sentimiento calculado a pandas
df_eng_pandas = result_english.toPandas()
df_spa_pandas = result_spanish.toPandas()

In [99]:
# se guardan los dataframes con el sentimiento calculado en csv
df_eng_pandas.to_csv('./data/predict_streaming_english_'+timestr+'.csv', index=False)
df_spa_pandas.to_csv('./data/predict_streaming_spanish_'+timestr+'.csv', index=False)

In [100]:
# guardar datos en MongoDB
conexion = 'mongodb://localhost:27017'
client = MongoClient(conexion)

# accedemos a la base de datos
db = client.tfm_twitter
# insertamos los dataframes en la tabla correspondiente
db.tweets_streaming_english.insert_many(df_eng_pandas.to_dict("records"))
db.tweets_streaming_spanish.insert_many(df_spa_pandas.to_dict("records"))

<pymongo.results.InsertManyResult at 0x1a22f76388>

In [101]:
# guardar datos en ElasticSearch
es = Elasticsearch('http://localhost:9200/')

# añadimos fecha y hora a los índices
indice_spa = "tweets_sentiment_spa_"+timestr
indice_eng = "tweets_sentiment_eng_"+timestr

# primero comprobamos si ya existen los índices y se borran
if es.indices.exists(indice_spa):
    !curl -X DELETE localhost:9200/{indice_spa}
if es.indices.exists(indice_eng):
    !curl -X DELETE localhost:9200/{indice_eng}

# generación de índices
!curl -X PUT localhost:9200/{indice_spa}
!curl -X PUT localhost:9200/{indice_eng}

TYPE = "record"

def rec_to_actions(df, lang):
    for record in df.to_dict(orient="records"):
        if(lang==0): yield ('{ "index" : { "_index" : "%s", "_type" : "%s" }}'% (indice_spa, TYPE))
        elif(lang==1): yield ('{ "index" : { "_index" : "%s", "_type" : "%s" }}'% (indice_eng, TYPE))
        yield (json.dumps(record, default=int))

if not es.indices.exists(indice_spa):
    raise RuntimeError('index does not exists, use `curl -X PUT "localhost:9200/%s"` and try again'%indice_spa)
if not es.indices.exists(indice_eng):
    raise RuntimeError('index does not exists, use `curl -X PUT "localhost:9200/%s"` and try again'%indice_eng)

r_spa = es.bulk(rec_to_actions(df_spa_pandas, 0))
r_eng = es.bulk(rec_to_actions(df_eng_pandas, 1)) 

print(not r_spa["errors"], not r_eng["errors"])

{"acknowledged":true,"shards_acknowledged":true,"index":"tweets_sentiment_spa_20190829-164021"}{"acknowledged":true,"shards_acknowledged":true,"index":"tweets_sentiment_eng_20190829-164021"}True True


In [102]:
# se guardan los modelos utilizados para la predicción
model_eng_best = model_eng.bestModel
model_spa_best = model_spa.bestModel
model_eng_best.write().overwrite().save('./data/models/model_eng_'+timestr)
model_spa_best.write().overwrite().save('./data/models/model_spa_'+timestr)