## [DEFINITION] Contexto do Spark

In [None]:
from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.sql import Row,SQLContext
import sys
import requests

# Criando a configuração (Informe um nome para a sua API)
conf = SparkConf().setMaster("local[2]").setAppName("twitterApp")

# Cria o contexto spark com a configuração acima
sc = SparkContext.getOrCreate(conf = conf)

#Para evitar muito lixo no log
sc.setLogLevel("ERROR")

# Cria o contexto do streaming utilizando o contexto do spark com intervalo de 3 segundos
ssc = StreamingContext(sc, 3)

# Setando o checkpoint para permitir a recuperação do RDD
ssc.checkpoint("checkpoint_TwitterApp")

# Lendo dados da porta 9009
dataStream = ssc.socketTextStream("localhost", 9009)

## [DEFINITION] Métodos para tratamento dos dados com o Spark

In [None]:
import pandas as pd
import json
import requests

# Obtem o contexto SQL do spark (Utilizado na criação do dataframe em send_to_mongo)
def get_sql_context_instance(spark_context):
    if ('sqlContextSingletonInstance' not in globals()):
        globals()['sqlContextSingletonInstance'] = SQLContext(spark_context)
    return globals()['sqlContextSingletonInstance']

# Filtra somente as hashtags em uma lista de palavras
def get_hashtags(words):
    return set(w[1:] for w in words.split() if w.startswith('#'))

# Faz um post na API com os dados da hashtag para atualização e obtem a resposta
def send_to_mongo(j):
    response = requests.post(url = "http://localhost:5001/v1/twits", json = j, headers={'Content-Type': 'application/json'})
    return response.text

# Printa as respostas 
def print_responses(r):
    print(r)

# Transforma cada rdd em um json válido para requisição na API
def transform_rdd(rdd):    
    try:
        if not (rdd.isEmpty()):
            print("RDD Recebido!")
            
            responses = []
            sql_context = get_sql_context_instance(rdd.context)
            
            rdd_row = rdd.map(lambda x: Row(hashtag = x[0], hashtag_count = x[1]))
            rdd.unpersist()
            
            jsons = sql_context.createDataFrame(rdd_row).toJSON()
            rdd_row.unpersist()
            
            print("Enviando para a API mongo: ")
            responses = jsons.map(lambda x: send_to_mongo(x))
            jsons.unpersist()
            
            print("Imprimindo as respostas: " + str(responses.count()))
            for x in responses.collect():
                print_responses(x)
            
        else:
            #Significa que tratou um tweet que não continha hashtags
            print("RDD vazio!")
    except Exception as ex:
        print("send_to_mongo Error: %s" % ex)

## [TRANSFORMATION] Tratamento dos tweets

In [None]:
# Quebrando cada tweet em palavras
words = dataStream.flatMap(lambda line: line.split(" "))

# Filtra as palavras para obter apenas as hashtags, então agrupa as hashtags e suas quantidades (count())
hashtags = words.filter(get_hashtags).countByValue()

# Transformando os totais de cada tag em formato json válido para enviar as requisições
hashtags.foreachRDD(transform_rdd)

# Startando a computação do streaming
ssc.start()

# Esperando a finalização do straming
ssc.awaitTermination()