## Jorge Pablo Ávila Gómez

# Ejercicio 2 (hashtags más utilizados): (2 puntos)

Desarrollar un notebook de Jupyter, denominado “hashtags.ipynb”, en el que se utilice como fuente de datos Kafka, y en concreto el topic kafkaTwitter. La duración del batch será de 5 segundos. Se procesarán los tweets que lleguen para extraer  los hashtags que contengan (tener en cuenta que todos los hashtags comienzan por el carácter ‘#’). Se irán mostrando, 
cada vez que se procese el batch (5 segundos) los diez hashtags más utilizados desde el inicio del programa hasta ese momento y el número total de apariciones de cada uno, ordenados de mayor a menor frecuencia.

In [1]:
import findspark

# Se indica la ruta de spark:
findspark.init("C:\\Users\\JorgeAvila\\Documents\\spark-2.4.7-bin-hadoop2.7")

import os

os.environ[
    "PYSPARK_SUBMIT_ARGS"
] = "--packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.7 pyspark-shell"

In [2]:
# Se importan los diferentes paquetes necesarios:
import pyspark
import pyspark.streaming
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

### Preparación del entorno Spark Streaming

In [3]:
sc = SparkContext("local[*]")
# Se inicializa SparkContext con la opción "local[*]" para que use todos los núcleos del equipo
ssc = StreamingContext(sc, batchDuration=5)
# Se crea el StreamingContext indicando que la duración del batch es 5s como pide el enunciado.
ssc.checkpoint("checkpoint")
# El uso de checkpoints es necesario cuando se van a utilizar la función updateStateByKey, para ir almacenando los estados anteriores.

### Conexión con kafka e introducción de los datos

In [4]:
# Para crear el stream de datos utilizamos la siguiente función:
tweetsDS = KafkaUtils.createDirectStream(
    ssc,  # StreamingContext con el que conectar kafka
    topics=["kafkaTwitter"],  # El topic con el que queremos conectar.
    kafkaParams={
        "bootstrap.servers": "192.168.1.100:9092, 192.168.1.100:9093"
    },  # Las direcciones de los brokers.
)

Se va a usar el transformador 'updateStateByKey' para mantener una cuenta de cuantas veces ha aparecido cada tweet. Para ello necesitamos especificar la siguiente función, que mantiene la suma para cada tweet.

In [5]:
# Función para actualizar las cuentas totales de los hashtags
def updateFunction(newValues, runningCount):
    return sum(newValues) + (runningCount or 0)

El funcionamiento básico propuesto es el dividir el tweet en palabras, filtrar las palabras para quedarnos solo con los hashtags. Y finalmente, actualizar la cuenta de hashtags totales que tenemos, ordenarlos por frecuencia e imprimir los 10 primeros.

In [6]:
# Extraemos el texto del tweet que se encuentra en la posición 1 de una tupla.
tweets = tweetsDS.map(lambda tweet: tweet[1])
# Separamos cada tweet en las palabras que lo forman, dividiendo por los espacios en blanco.
words = tweets.flatMap(lambda line: line.strip().split(" "))
# Usamos una expresión regular para filtrar los hashtags, y los ponemos en forma de tupla (#hashtag,1)
import re

p = re.compile("\#[a-zA-Z0-9]+")
hashtags = words.filter(lambda word: p.match(word) != None).map(lambda x: (x, 1))
# Utilizamos el transformador 'updateStateByKey' con la función 'updateFunction', para mantener la cuenta de cuantas veces
# ha salido cada tweet.
total = hashtags.updateStateByKey(updateFunction)
# Ordenamos todas las entradas por el número de veces que ha salido, en orden descendente. Imprimimos las 10 primeras.
total.transform(lambda rdd: rdd.sortBy(lambda x: x[1], ascending=False)).pprint(10)

### Ejecución del programa

In [7]:
ssc.start()

-------------------------------------------
Time: 2021-01-10 10:06:20
-------------------------------------------
('#peoresserorian', 1)
('#PresidentaCospedal', 1)
('#GALAXYNoteEdge', 1)
('#FelizJueves', 1)
('#UAM', 1)
('#videojuegos', 1)
('#xbox', 1)
('#nds', 1)
('#L6Nratopalma', 1)
('#PedroSanchezARV', 1)
...

-------------------------------------------
Time: 2021-01-10 10:06:30
-------------------------------------------
('#UAM', 5)
('#ForoCambio11J', 5)
('#OSX', 3)
('#Ciencia', 3)
('#Apple', 3)
('#RutaDelCambio', 3)
('#videojuegos', 2)
('#TuitUtil', 2)
('#Latch', 2)
('#PSOE24M', 2)
...

-------------------------------------------
Time: 2021-01-10 10:06:40
-------------------------------------------
('#ForoCambio11J', 7)
('#UAM', 5)
('#UniDeVerano', 5)
('#OSX', 4)
('#Ciencia', 4)
('#Apple', 4)
('#digoSIvotoPSOE', 4)
('#EnPodemosTúDecides', 3)
('#xelFuturodeEspaña', 3)
('#PGE2016', 3)
...

-------------------------------------------
Time: 2021-01-10 10:06:50
-------------------------

In [8]:
ssc.stop(stopSparkContext=True, stopGraceFully=True)

-------------------------------------------
Time: 2021-01-10 10:07:30
-------------------------------------------
('#UAM', 19)
('#ForoCambio11J', 19)
('#UniDeVerano', 12)
('#Ciencia', 11)
('#Apple', 11)
('#TuitUtil', 9)
('#TrabajarHacerCrecer', 9)
('#xelFuturodeEspaña', 9)
('#FelizMiercoles', 8)
('#OSX', 7)
...

-------------------------------------------
Time: 2021-01-10 10:07:40
-------------------------------------------
('#ForoCambio11J', 23)
('#UAM', 20)
('#UniDeVerano', 15)
('#Ciencia', 12)
('#Apple', 12)
('#TrabajarHacerCrecer', 11)
('#TuitUtil', 9)
('#xelFuturodeEspaña', 9)
('#FelizMiercoles', 8)
('#RutaDelCambio', 8)
...

-------------------------------------------
Time: 2021-01-10 10:07:50
-------------------------------------------
('#ForoCambio11J', 24)
('#UAM', 22)
('#UniDeVerano', 17)
('#TrabajarHacerCrecer', 15)
('#xelFuturodeEspaña', 14)
('#Ciencia', 13)
('#Apple', 13)
('#TuitUtil', 10)
('#CMin', 9)
('#FelizMiercoles', 8)
...



Se puede ver que el programa funciona correctamente. Nos imprime la lista de los 10 hashtags que más han aparecido hasta el momento. Se puede ver que la lista es la misma y se va actualizando, aumentando la frecuencia de los hashtags más populares.

El único detalle que se desconoce porque pasa es que se recibe una actualización de la lista cada 10 segundos. Cuando en el StreamingContext se ha indicado una duración de batch de 5 segundos. Se ha intentado incluso usar una ventana de 5 segundos con desplazamiento de 5 segundos y sigue pasando igual. Además, se ha intentado modificar otros parámetros como el número de núcleos y no se soluciona ese problema. El problema no ocurre si el número de operaciones por bach es menor, así que puede que sea un problema de que hay muchas operaciones que hacer y no da tiempo a terminarlas todas en 5 segundos.