#  5. Práctica Spark Streaming

SPARK STREAMING+ KAFKA

Autora: Ester Cortés García

Utilizando como base las herramientas presentadas en clase (productor y consumidor de Kafka
genéricos en Python), crear una aplicación local de Spark Streaming que lea progresivamente los
tweets insertados en una cola de Kafka identificada por el topic "Qatar", defina un
intervalo de procesamiento de datos de 5 segundos y realice tres tareas. 

## Importaciones

In [None]:
from __future__ import print_function
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from operator import add
from operator import sub

## Creación del contexto

In [None]:
import os
packages = "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1"
os.environ["PYSPARK_SUBMIT_ARGS"] = (
    "--packages {0} pyspark-shell".format(packages)
)

In [None]:
sc = SparkContext(appName="KafkaStreamingEndtoEnd")

In [None]:
# Crear el contexto de Spark Streaming
ssc = StreamingContext(sc, 5) #tiempo de procesamiento - 5s

## Métodos auxiliares

### Método de parseo de tweets

Este método nos ayuda a parsear cada línea que llega por la cola de Kafka con datos sobre los tweets

In [None]:
import csv

def parseOrder(line):
    s = next(csv.reader([line]))
    try: 
        return[{"ID":s[0],"Source":s[2],"Mentions":s[3],
               "Body":s[6],"Type":s[9]}]
    except Exception as err:
        print("Wrong line format (%s) " % line)
        return []

### Lectura de datos de kafka

In [None]:
kafkaBrokerIPPort = "127.0.0.1:9092"
kafkaParams = {"metadata.broker.list": kafkaBrokerIPPort}
stream = KafkaUtils.createDirectStream(ssc, ["Qatar"], 
                                       kafkaParams)
stream = stream.map(lambda o: str(o[1]))

## Apartado A
### Calcular el número total de menciones recibidas por cada cuenta de usuario durante el intervalo de 5 segundos.

In [None]:
tweets = stream.flatMap(parseOrder)
mentions = (tweets.flatMap(lambda M: M["Mentions"].split(","))
                    .map(lambda x: (x,1))
                    .filter(lambda m: m[0] != '')
                    .reduceByKey(add))
                
mentions.pprint()

## Apartado B
### Calcular la frecuencia total acumulada de apariciones de cada hashtag en el campo body, actualizando un ranking con los 5 hashtags con mayor frecuencia de aparición.

In [None]:
tweets = stream.flatMap(parseOrder)
get5 = (tweets.flatMap(lambda M: M["Body"].split(" "))
                    .filter(lambda x: x.startswith("#"))
                    .map(lambda x: (x,1))
                    .updateStateByKey(lambda vals, totalOpt: sum(vals) + totalOpt 
                                      if totalOpt != None else sum(vals)))
result = (get5.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False)
                         .map(lambda x: x[0])
                          .zipWithIndex()
                         .filter(lambda x: x[1] < 5)))

                                               
result.pprint()

sc.setCheckpointDir("data/checkpoint/")

## Apartado C
### Calcular en una ventana temporal 20 segundos con offset de 10 segundos la frecuencia de aparición de cada uno de los 3 posibles tipos de tweets (TW-RT-MT).

In [None]:
tweets = stream.flatMap(parseOrder)
getType = (tweets.map(lambda t:(t["Type"],1))
                .reduceByKeyAndWindow(add,sub,windowDuration=20,slideDuration=10)
                .repartition(1))
getType.pprint()

sc.setCheckpointDir("data/checkpoint/")

## Start Streaming context

In [None]:
ssc.start()
ssc.awaitTerminationOrTimeout(10)  # Espera 10 segs. antes de acabar

## Stop Streaming Context

In [None]:
ssc.stop(False)