# Streaming

In [None]:
# Importación de dependencias y funciones
from __future__ import print_function
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from operator import add
from operator import sub

# Crear el contexto de Spark Streaming
ssc = StreamingContext(sc, 5)

In [None]:
# Configura el endpoint para localizar el broker de Kafka
# kafkaBrokerIPPort = "172.20.1.21:9092"
kafkaBrokerIPPort = "127.0.0.1:9092"

# Productor simple (Singleton!)
# from kafka import KafkaProducer
import kafka
class KafkaProducerWrapper(object):
  producer = None
  @staticmethod
  def getProducer(brokerList):
    if KafkaProducerWrapper.producer != None:
      return KafkaProducerWrapper.producer
    else:
      KafkaProducerWrapper.producer = kafka.KafkaProducer(bootstrap_servers=brokerList, key_serializer=str.encode, value_serializer=str.encode)
      return KafkaProducerWrapper.producer

In [None]:
# Como los datos que vamos a usar no necesitan un tipado específico, dejamos que todos los campos sean tipo String

def parseTweet(line):
  s = line.split("\t")
  try:
        return [{"ID": s[0],
                 "PARENT-SYS-ID": s[1], 
                 "Source": s[2], 
                 "Mentions": s[3],
                 "Target": s[4],
                 "NAME Source": s[5],
                 "BODY": s[6],
                 "PUBDATE": s[7],
                 "URLs comma separated": s[8],
                 "Type TW-RT-MT": s[9],
                 "LINK": s[10],
                 "n1 Link": s[11],
                 "n1 Picture": s[12],
                 "PERSONAL-WEBSITE": s[13],
                 "COUNTRY": s[14],
                 "ALL-NICK-ACTIVITY-EVER": s[15],
                 "NICK-FOLLOWERS": s[16],
                 "FRIENDS-FOLLOWING-AUDIENCE": s[17],
                 "LOCATION": s[18]
                }]
  except Exception as err:
      print("Wrong line format (%s): " % line)
      return []

In [None]:
# Fichero de texto: Lectura de fuente de datos de fichero (no se usa en este ejemplo, en su lugar 
# enviamos los datos a Kafka para crear una simulación más realista)
# stream = ssc.textFileStream("/tmp/data/orders.txt")

# Kafka: Lectura de datos
kafkaParams = {"metadata.broker.list": kafkaBrokerIPPort}
stream = KafkaUtils.createDirectStream(ssc, ["test"], kafkaParams)
stream = stream.map(lambda t: str(t[1]))

## Ejecutar un ejercicio cada vez

### 1. Calcular el número total de menciones recibidas por cada cuenta de usuario durante el intervalo de 5 segundos.

In [None]:
tweets = stream.flatMap(parseTweet)

# Cálculo de resumen de compra-venta
numPerType = (tweets
              .flatMap(lambda t: t['Mentions'].split(","))
              .map(lambda m: (m,1) if m!='' else (m, 0))
              .reduceByKey(lambda x,y: x+y)
             )
                                     
numPerType.pprint()

sc.setCheckpointDir("data/checkpoint/")

### 2. Calcular la frecuencia total acumulada de apariciones de cada hashtag en el campo body, actualizando un ranking con los 5 hashtags con mayor frecuencia de aparición.

In [None]:
def updateFunction(newValues, runningCount):
    if runningCount is None:
        runningCount = 0
    return sum(newValues, runningCount)

tweets = stream.flatMap(parseTweet)

numHash = (tweets
           .flatMap(lambda t: t['BODY'].split(" "))
           .map(lambda w: (w,1) if w.startswith('#') else ('',0))
           .reduceByKey(lambda x,y: x+y)
           .updateStateByKey(updateFunction)
          )

top5Hash = numHash.transform(lambda rdd: rdd.sortBy(lambda x: x[1], False)
                             .map(lambda x: x[0])
                             .zipWithIndex()
                             .filter(lambda x: x[1] < 5)
                            )

top5Hash.pprint()

sc.setCheckpointDir("data/checkpoint/")

### 3. Calcular en una ventana temporal 20 segundos con offset de 10 segundos la frecuencia de aparición de cada uno de los 3 posibles tipos de tweets (TW-RT-MT).

In [None]:
tweets = stream.flatMap(parseTweet)

# Cálculo de resumen de compra-venta
numPerType = tweets.map(lambda t: (t['Type TW-RT-MT'], 1)).reduceByKeyAndWindow(add, sub, 10, 20)
numPerType.pprint()

sc.setCheckpointDir("data/checkpoint/")

## Start

In [None]:
ssc.start()

In [None]:
ssc.stop(False)

Es necesario reiniciar el kernel tras la ejecución de cada ejercicio