In [24]:
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from operator import add
from operator import sub

In [25]:
# Load external packages programatically
import os
packages = "org.apache.spark:spark-streaming-kafka-0-8_2.11:2.2.1"

os.environ["PYSPARK_SUBMIT_ARGS"] = (f"--packages {packages} pyspark-shell")

In [26]:
sc = SparkContext(appName="PythonStreamingKafkaMOTOGP")
ssc = StreamingContext(sc, 5)

In [27]:
kafkaParams = {"metadata.broker.list": "localhost:9092"}
directKafkaStream = KafkaUtils.createDirectStream(ssc, ["motogp"], kafkaParams)

In [6]:
#a) Calcular el número total de menciones recibidas por cada cuenta de usuario durante el intervalo de 5 segundos.   
stream = directKafkaStream.map(lambda line:line[1])
tweet_stream = (stream.map(lambda line:line.split(',')[3]
                           if len(line.split(',')) > 3 else '')
                      .filter(lambda line: line != '')
                      .map(lambda line:(line,1))
                      .reduceByKey(add))
tweet_stream.pprint()
                           


-------------------------------------------
Time: 2018-07-13 12:01:20
-------------------------------------------

-------------------------------------------
Time: 2018-07-13 12:01:25
-------------------------------------------

-------------------------------------------
Time: 2018-07-13 12:01:30
-------------------------------------------

-------------------------------------------
Time: 2018-07-13 12:01:35
-------------------------------------------

-------------------------------------------
Time: 2018-07-13 12:01:40
-------------------------------------------
('correoorinoco', 2)
('trans7', 2)
('motogp', 10)
('motogpt7', 1)
('onehd', 1)
('ryanjepe', 1)
('tutahderevista', 2)
('elesteban98', 5)
('laureussport', 1)
('danipedrosa26', 1)
...

-------------------------------------------
Time: 2018-07-13 12:01:45
-------------------------------------------
('dario46young', 1)
('motogp', 7)
('andreadovizioso', 1)
('thomasluthi', 6)
('26_danipedrosa', 2)
('jonasfolger94', 1)
('luissalom

KeyboardInterrupt: 

-------------------------------------------
Time: 2018-07-13 12:02:05
-------------------------------------------
('motogp', 3)
('efrentxu7', 1)
('26_danipedrosa', 2)
('andreaiannone29', 1)
('lorenzo99', 3)
('marcmarquez93', 3)
('danielricciardo', 1)
('yamahaindonesia', 6)
('valeyellow46', 36)
('joanolive', 1)
...

-------------------------------------------
Time: 2018-07-13 12:02:10
-------------------------------------------
('domexcourier', 1)
('ansar_javi', 1)
('flacojonas', 8)

-------------------------------------------
Time: 2018-07-13 12:02:15
-------------------------------------------
('eliammedina', 3)
('olle_jorge5', 2)
('yeisonfressh', 3)

-------------------------------------------
Time: 2018-07-13 12:02:20
-------------------------------------------
('dmjeferson', 10)
('dmediison_21', 5)
('diime_daniiel', 18)

-------------------------------------------
Time: 2018-07-13 12:02:25
-------------------------------------------

-------------------------------------------
Time

In [28]:
ssc.checkpoint("checkpoint")
#b) Calcular la frecuencia total acumulada de apariciones de cada hashtag en el campo body,
#actualizando un ranking con los 5 hashtags con mayor frecuencia de aparición.
stream = directKafkaStream.map(lambda line:line[1])
hashtags = (stream.map(lambda line: line.split(',')[6] if len(line.split(','))>6 else '')
                  .flatMap(lambda line: filter(lambda word: word.startswith('#'), list(set(line.split(' ')))))
                  .map(lambda hastag:(hastag ,1))
                  .updateStateByKey(lambda currentVal, totalVal: 
                                    sum(currentVal) + totalVal if totalVal != None else sum(currentVal))
)

top_5_hastags =  (hashtags.transform(lambda rdd: rdd.context.parallelize( 
                                rdd.takeOrdered(5, key = lambda hashtag: -hashtag[1])
                            ) 
          ))

top_5_hastags.pprint()

-------------------------------------------
Time: 2018-07-13 13:09:10
-------------------------------------------
('#motogp', 162)
('#qatar', 143)
('#vr46', 140)
('#motogp"', 34)
('#moto2', 11)

-------------------------------------------
Time: 2018-07-13 13:09:15
-------------------------------------------
('#motogp', 205)
('#qatar', 146)
('#vr46', 140)
('#motogp"', 34)
('#gpqatar', 30)

-------------------------------------------
Time: 2018-07-13 13:09:20
-------------------------------------------
('#motogp', 255)
('#qatar', 146)
('#vr46', 140)
('#gpqatar', 111)
('#moto2', 93)

-------------------------------------------
Time: 2018-07-13 13:09:25
-------------------------------------------
('#motogp', 255)
('#qatar', 146)
('#vr46', 140)
('#moto2', 111)
('#gpqatar', 111)

-------------------------------------------
Time: 2018-07-13 13:09:30
-------------------------------------------
('#motogp', 267)
('#qatar', 146)
('#vr46', 140)
('#moto2', 119)
('#gpqatar', 111)

------------------

KeyboardInterrupt: 

-------------------------------------------
Time: 2018-07-13 13:09:40
-------------------------------------------
('#motogp', 287)
('#qatar', 146)
('#vr46', 140)
('#moto2', 125)
('#gpqatar', 111)

-------------------------------------------
Time: 2018-07-13 13:09:45
-------------------------------------------
('#motogp', 324)
('#qatar', 150)
('#vr46', 140)
('#moto2', 128)
('#moto3', 122)



In [36]:
ssc.checkpoint("checkpoint")
#c) Calcular en una ventana temporal 20 segundos con offset de 10 segundos la frecuencia de aparición de cada uno de los 3 posibles tipos de tweets (TW-RT-MT).
stream = directKafkaStream.map(lambda line:line[1])
tweet_window_stream = (stream.map(lambda line: line.split(',')[9].replace("'",'').replace('"','') 
                                  if line !=None and len(line.split(','))>9
                                  else '')
                             .filter(lambda line: line in ("MT","RT","TW"))
                             .map(lambda line:(line,1))
                             .reduceByKeyAndWindow(add, sub, 20, 10))

tweet_window_stream.pprint()

-------------------------------------------
Time: 2018-07-13 12:21:55
-------------------------------------------
('MT', 77)
('TW', 430)
('RT', 2)

-------------------------------------------
Time: 2018-07-13 12:22:05
-------------------------------------------
('MT', 132)
('TW', 623)
('RT', 2)

-------------------------------------------
Time: 2018-07-13 12:22:15
-------------------------------------------
('MT', 125)
('TW', 377)

-------------------------------------------
Time: 2018-07-13 12:22:25
-------------------------------------------
('MT', 140)
('TW', 416)

-------------------------------------------
Time: 2018-07-13 12:22:35
-------------------------------------------
('MT', 107)
('TW', 416)

-------------------------------------------
Time: 2018-07-13 12:22:45
-------------------------------------------
('MT', 141)
('TW', 370)
('RT', 2)

-------------------------------------------
Time: 2018-07-13 12:22:55
-------------------------------------------
('MT', 201)
('TW', 353)

KeyboardInterrupt: 

-------------------------------------------
Time: 2018-07-13 12:24:15
-------------------------------------------
('MT', 225)
('TW', 287)
('RT', 8)



In [None]:
ssc.start()
ssc.awaitTermination()

In [None]:
ssc.stop()