# Práctica de Spark Streaming (2017 - 2018) 

## Carlos Sánchez Vega (Campus Ferraz) 

## Sistemas distribuidos 3

### Importación de dependencias y funciones

In [1]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from operator import add
from operator import sub
from pandas import pandas as pd

### Cargamos los paquetes externos que necesitamos

In [2]:
import os
packages = "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1"

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages {0} pyspark-shell".format(packages)
)

### Creamos una ventana de 5 segundos, tal y como dice en el enunciado

In [3]:
sc = SparkContext(appName="gp")
ssc = StreamingContext(sc, 5)

In [4]:
kafkaParams = {"metadata.broker.list": "localhost:9092"}
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["gp"], kafkaParams)

### a) Calcular el número total de menciones recibidas por cada cuenta de usuario durante el intervalo de 5 segundos.   

In [5]:
ssc.checkpoint("checkpoint")

# Parseamos el mensaje de entrada
stream = directKafkaStream.map(lambda line:line[1])

# El csv divide los campos por ",". La columna que nos interesa es la de "Mentions"
# que es la cuarta columna (el primer índice es el índice 0)
total_count = (stream.map(lambda line:line.split(',')[3])
                       # hay casos en los que la columna "Mentions" está vacía
                       # en esos casos, no tendremos en cuenta el campo para el conteo
                      .filter(lambda line: len(line)>0)                       
                      .map(lambda line:(line,1))
                      .reduceByKey(add))
total_count.pprint()

In [None]:
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2018-07-15 18:00:05
-------------------------------------------
('"valeyellow46', 1)
('"jessansan', 2)
('"valeyellow46"', 1)
('"marcmarquez93"', 1)

-------------------------------------------
Time: 2018-07-15 18:00:10
-------------------------------------------

-------------------------------------------
Time: 2018-07-15 18:00:15
-------------------------------------------
('"libertaddigital"', 1)

-------------------------------------------
Time: 2018-07-15 18:00:20
-------------------------------------------
('"valeyellow46"', 1)
('"motogp', 1)
('"prischilala', 1)

-------------------------------------------
Time: 2018-07-15 18:00:25
-------------------------------------------
('"motogp', 3)

-------------------------------------------
Time: 2018-07-15 18:00:30
-------------------------------------------
('"motogp', 1)
('"birtymotogp"', 1)
('"dhika46', 1)
('"bahtiar_', 1)
('"cindypoluta"', 1)
('"marcmarquez93"', 1)

----------------

### b) Calcular la frecuencia total acumulada de apariciones de cada hashtag en el campo body, actualizando un ranking con los 5 hashtags con mayor frecuencia de aparición.

In [5]:
ssc.checkpoint("checkpoint")

# Parseamos el mensaje de entrada
stream = directKafkaStream.map(lambda line:line[1])

# Los campos del csv están separados por el campo ",". La columna que nos interesa 
# es la de "Body", que se corresponde con la séptima columna 
# (los índices de las columnas empiezan por 0)
hashtags = (stream.map(lambda line: line.split(',')[6])
                    # se separan las palabras del campo y se toman aquellos que sean "hashtags"
                    # (los hashtags son los que comienzan por "#")
                  .flatMap(lambda line: filter(lambda word: word[:1]=='#', list(set(line.split()))))
                  .map(lambda hastag:(hastag ,1))                    
                   # se realiza el conteo parcial de los hashtags encontrados 
                  .updateStateByKey(lambda aux, totalSum: 
                                    sum(aux) + totalSum if totalSum != None else sum(aux))
)

# se ordenan los hashtags para poder mostrarlos en orden descendente
most_frequency =  (hashtags.transform(lambda rdd: rdd.context.parallelize( 
                                rdd.takeOrdered(5, key = lambda hashtag: -hashtag[1])
                            ) 
          ))

most_frequency.pprint()


In [None]:
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2018-07-15 18:03:10
-------------------------------------------

-------------------------------------------
Time: 2018-07-15 18:03:15
-------------------------------------------
('#marc', 1)
('#marquesista', 1)
('#repsol', 1)
('#93', 1)
('#marquez', 1)

-------------------------------------------
Time: 2018-07-15 18:03:20
-------------------------------------------
('#marc', 1)
('#marquesista', 1)
('#repsol', 1)
('#motogp', 1)
('#93', 1)

-------------------------------------------
Time: 2018-07-15 18:03:25
-------------------------------------------
('#marc', 1)
('#marquesista', 1)
('#repsol', 1)
('#motogp', 1)
('#93', 1)

-------------------------------------------
Time: 2018-07-15 18:03:30
-------------------------------------------
('#motogp', 4)
('#', 3)
('#marquez', 3)
('#qatar', 2)
('#93', 2)

-------------------------------------------
Time: 2018-07-15 18:03:35
-------------------------------------------
('#', 7)
('#motogp', 7)

-------------------------------------------
Time: 2018-07-15 18:07:00
-------------------------------------------
('#', 8)
('#motogp', 7)
('#gp', 6)
('#moto', 6)
('#marquez', 5)

-------------------------------------------
Time: 2018-07-15 18:07:05
-------------------------------------------
('#', 8)
('#motogp', 7)
('#gp', 6)
('#moto', 6)
('#marquez', 5)

-------------------------------------------
Time: 2018-07-15 18:07:10
-------------------------------------------
('#', 8)
('#motogp', 7)
('#gp', 6)
('#moto', 6)
('#marquez', 5)

-------------------------------------------
Time: 2018-07-15 18:07:15
-------------------------------------------
('#', 8)
('#motogp', 7)
('#gp', 6)
('#moto', 6)
('#marquez', 5)

-------------------------------------------
Time: 2018-07-15 18:07:20
-------------------------------------------
('#', 8)
('#motogp', 7)
('#gp', 6)
('#moto', 6)
('#marquez', 5)

-------------------------------------------
Time: 2018-07-15 18:07:25
-----------------------------------

### c) Calcular en una ventana temporal 20 segundos con offset de 10 segundos la frecuencia de aparición de cada uno de los 3 posibles tipos de tweets (TW-RT-MT).

In [5]:
ssc.checkpoint("checkpoint")
stream = directKafkaStream.map(lambda line:line[1])

# se define un conjunto, entre los que están los diferentes tipos de tweets
# se escapan los caracteres de doble comilla(")
type_tw_rt_mt = ["\"MT\"", "\"RT\"", "\"TW\""]

total_count = (stream.map(lambda line: line.split(',')[9])
                               #si el campo está entre los diferentes tipos de tweets
                               # lo tomo en cuenta para el conteo total
                             .filter(lambda line: line in type_tw_rt_mt)                              
                             .map(lambda line:(line,1))
                               # añado los parámetros de la ventana temporal (20) y el offset (10)
                             .reduceByKeyAndWindow(add, sub, 20, 10))
total_count.pprint()

In [None]:
ssc.start()
ssc.awaitTermination()

-------------------------------------------
Time: 2018-07-15 17:57:35
-------------------------------------------
('"TW"', 24)

-------------------------------------------
Time: 2018-07-15 17:57:45
-------------------------------------------
('"TW"', 34)

-------------------------------------------
Time: 2018-07-15 17:57:55
-------------------------------------------
('"TW"', 10)

-------------------------------------------
Time: 2018-07-15 17:58:05
-------------------------------------------

-------------------------------------------
Time: 2018-07-15 17:58:15
-------------------------------------------
('"TW"', 7)

-------------------------------------------
Time: 2018-07-15 17:58:25
-------------------------------------------
('"TW"', 23)

-------------------------------------------
Time: 2018-07-15 17:58:35
-------------------------------------------
('"MT"', 3)
('"TW"', 28)

-------------------------------------------
Time: 2018-07-15 17:58:45
------------------------------------

In [None]:
ssc.stop()