# Ejercicio 2 (hashtags más utilizados): (2 puntos)
Desarrollar un notebook de Jupyter, denominado “hashtags.ipynb”, en el que se utilice como fuente de datos Kafka, y en concreto el topic kafkaTwitter. La duración del batch será de 5 segundos. Se procesarán los tweets que lleguen para extraer los hashtags que contengan (tener en cuenta que todos los hashtags comienzan por el carácter ‘#’). 

Se irán mostrando, cada vez que se procese el batch (5 segundos) <b> los diez hashtags más utilizados desde el inicio del programa hasta ese momento y el número total de apariciones de cada uno, ordenados de mayor a menor frecuencia.</b>


In [1]:
import findspark
findspark.init('/opt/spark')

In [2]:
import os
os.environ['JAVA_HOME'] = '/Library/Java/JavaVirtualMachines/jdk1.8.0_171.jdk/Contents/Home/jre'
os.environ['PYSPARK_SUBMIT_ARGS'] = '--master local[*] --packages org.apache.spark:spark-streaming-kafka-0-8_2.11:2.4.4 pyspark-shell'

In [3]:
import pyspark
import pyspark.streaming
from pyspark import SparkConf, SparkContext
from pyspark.sql import SparkSession
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

In [4]:
#sc = SparkContext(master="local[*]", appName="Tweet Count")
spark = SparkSession.builder.appName("hashtags").master("local[*]").getOrCreate()

# 5s de batchDuration
ssc = StreamingContext(spark.sparkContext, 5)

ssc.checkpoint("checkpoint")

In [5]:
# DStream con el batch de 5s de Kafka.
kafkaStream = KafkaUtils.createStream(ssc,zkQuorum='localhost:2181',groupId="group_1" , topics={'kafkaTwitter':2})

In [6]:
words = kafkaStream.flatMap(lambda message: (message[1].split(" ")))
# Nos quedamos con las palabras que contienen # porque algunos elementos no tienen un espacio antes del #.
# ej. :D.#MTVHottest,
hastags = words.filter(lambda word: ("#" in word))

In [7]:
import re

def getRegex(word, regex):
    match_obj = re.search(regex, word)
    
    if type(match_obj) is None:
        return "NULL"
    else :
        return match_obj.group() 
    

In [8]:
# Filtramos con una expresión regular para quedarnos con el # y la palabra que lo acompaña.
clean_hastags = hastags.map(lambda x: (getRegex(x,'#\w+')))

In [9]:
# Mapeamos cada hastag como key y 1 como value para contarlos.
local_hastag_count = clean_hastags.map( lambda ht: (ht, 1))

In [10]:
# Función para actualizar el conteo global con la suma del conteo local del batch en ejecución.
def updateFunction(newValues, localCount):
    return sum(newValues) + (localCount or 0)

In [11]:
global_hastag_count = local_hastag_count.updateStateByKey(updateFunc=updateFunction)

In [12]:
# Ordenamos por value en orden descendiente.
result = global_hastag_count.transform( lambda ht_count: (ht_count.sortBy(lambda pair: ( pair[1]), ascending=False)))

In [13]:
# Mostramos los primero 10 valores -> los 10 valores mas altos.
result.pprint(10)

In [14]:
ssc.start()

-------------------------------------------
Time: 2020-01-11 20:51:40
-------------------------------------------

-------------------------------------------
Time: 2020-01-11 20:51:45
-------------------------------------------

-------------------------------------------
Time: 2020-01-11 20:51:50
-------------------------------------------

-------------------------------------------
Time: 2020-01-11 20:51:55
-------------------------------------------

-------------------------------------------
Time: 2020-01-11 20:52:00
-------------------------------------------
('#UAM', 1)
('#EmPPleo', 1)
('#YoTambiénAmoALosAnimales', 1)
('#Actualizaciones', 1)
('#hot', 1)
('#SID2014', 1)
('#CospedalAsturias', 1)
('#FelizMiércoles', 1)
('#ElCambioQueUne', 1)
('#Apple', 1)
...

-------------------------------------------
Time: 2020-01-11 20:52:05
-------------------------------------------
('#UAM', 2)
('#InvertirEnPersonas', 2)
('#EmPPleo', 1)
('#YoTambiénAmoALosAnimales', 1)
('#Actualizaciones', 