# Práctica Sistemas Distribuidos III 
## Spark Streaming. Quatar GP 2014

Utilizando como base las herramientas presentadas en clase (productor y consumidor de Kafka
genéricos en Python), crear una aplicación local de Spark Streaming que lea progresivamente los
tweets insertados en una cola de Kafka identificada por el topic "Quatar_GP_2014", defina un
intervalo de procesamiento de datos de 5 segundos y realice las siguientes tareas:

a) Calcular el número total de menciones recibidas por cada cuenta de usuario durante el intervalo de 5 segundos.

b) Calcular la frecuencia total acumulada de apariciones de cada hashtag en el campo body, actualizando un ranking con los 5 hashtags con mayor frecuencia de aparición.

c) Calcular en una ventana temporal 20 segundos con offset de 10 segundos la frecuencia de aparición de cada uno de los 3 posibles tipos de tweets (TW-RT-MT).

#### Importamos Librerias y creamos el contexto

In [None]:
# Importación de dependencias y funciones
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from operator import add
from operator import sub
import re

In [None]:
# Load external packages programatically
import os
packages = "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1"

os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages {0} pyspark-shell".format(packages))

In [None]:
sc = SparkContext(appName="Quatar_GP_2014")

In [None]:
# Crear el contexto de Spark Streaming
ssc = StreamingContext(sc, 5)

#### Métodos necesarios

In [None]:
from datetime import datetime

def parse_tweet(line):
  s = re.split(r",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)",line) # Usamos expresion regular para no dividir por , dentro de ""
  try:
      return [{"Id": int(s[0]), "Parent_sys_id": s[1], "Source": s[2], 
               "Mentions": s[3], "Target": s[4], "Name_source": s[5], 
               "Body": s[6], "Pub_date": datetime.strptime(s[7], "%d/%m/%Y %H:%M"), 
               "URLs": s[8], "Tipe_action": s[9], "Link": s[10],"Has_link": s[11],
               "Has_picture": s[12], "Website": s[13], "Country": s[14], 
               "Acrivity": int(s[15]), "Followers": int(s[16]), 
               "Following": int(s[17]) , "Location": s[18]}]
  except Exception as err:
      print("Wrong line format (%s): " % line)
      return []

### Lectura de datos

In [None]:
# Kafka: Lectura de datos
# python practica_kafka_producer.py 0.4 0.9 Quatar_GP_2014 data/DATASET-Twitter-23-26-Mar-2014-MotoGP-Qatar.csv
kafkaBrokerIPPort = "127.0.0.1:9092"

kafkaParams = {"metadata.broker.list": kafkaBrokerIPPort}
stream = KafkaUtils.createDirectStream(ssc, ["Quatar_GP_2014"], kafkaParams)
stream = stream.map(lambda o: str(o[1])) # lo que devuelve es un dstream

### a) Calcular el número total de menciones recibidas por cada cuenta de usuario durante el intervalo de 5 segundos.

In [None]:
tweet = stream.flatMap(parse_tweet)

numMenUser = tweet.flatMap(lambda o : (o["Mentions"].split(",")))\
                  .map(lambda o : o.strip('"'))\
                  .map(lambda o : (o, 1))\
                  .reduceByKey(add) 

numMenUser.pprint()

numMenUser.repartition(1).saveAsTextFiles("data/output_a/metrics", "txt") # 

### b) Calcular la frecuencia total acumulada de apariciones de cada hashtag en el campo body, actualizando un ranking con los 5 hashtags con mayor frecuencia de aparición.

In [None]:
tweet = stream.flatMap(parse_tweet)

numHash = tweet.filter(lambda o : o != "")\
               .flatMap(lambda o : (o["Body"].split()))\
               .map(lambda o : o.strip('"'))\
               .filter(lambda o : o[0] == "#")\
               .map(lambda o : (o, 1))\
               .reduceByKey(add) 

acumHash = numHash.updateStateByKey(lambda vals , totalHash : sum(vals) + totalHash if totalHash != None else sum(vals)) 
top5hash = acumHash.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False).map(lambda x: x[0]).zipWithIndex().filter(lambda x: x[1] < 5)) 

sc.setCheckpointDir("data/checkpoint_b/")
top5hash.pprint()

### c) Calcular en una ventana temporal 20 segundos con offset de 10 segundos la frecuencia de aparición de cada uno de los 3 posibles tipos de tweets (TW-RT-MT).

In [None]:
tweet = stream.flatMap(parse_tweet)

tweetPerWindow = tweet.map(lambda o: (o['Tipe_action'],1))\
                      .reduceByKeyAndWindow(add, sub, 20, 10) 

sc.setCheckpointDir("data/checkpoint_c/")

tweetPerWindow.pprint()


### Start Streaming context

In [None]:
ssc.start()
ssc.awaitTerminationOrTimeout(30)  # Espera 30 segs. antes de acabar

In [None]:
ssc.stop(False)  