# Spark Streaming para Elasticsearch

In [None]:
import os
import urllib.request
import zipfile
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
import json
import time
from datetime import datetime

In [1]:
#!pip install elasticsearch

In [None]:
from elasticsearch import Elasticsearch

In [None]:
# Conexão do Elasticsearch por padrão, nos conectamos ao elasticsearch: 9200, como estamos executando este notebook 
# no Spark-Node, precisamos usar 'elasticsearch-node' em vez de 'localhost', pois esse é o nome do docker container
# executando o Elasticsearch. Se o índice de teste de fluxo existir, limpe-o e crie um novo.

es = Elasticsearch('elasticsearch-node:9200')

if es.indices.exists('stream-test'):
    es.indices.delete('stream-test')
    
    body={
        'mappings': {
            'properties': {
                'count': {'type': 'text'},
                'name': {'type': 'text'},
                'value': {'type': 'text'},
                'timestamp': {'type': 'text'}
            }
        }
    }
    
    es.indices.create(index='stream-test', body=body)
    
print(es.info())

In [None]:
# Precisamos garantir que o conector ES-Hadoop esteja no caminho de classe do driver; 

os.environ['PYSPARK_SUBMIT_ARGS'] = '--jars elasticsearch-hadoop-7.5.2/dist/elasticsearch-spark-20_2.11-7.5.2.jar pyspark-shell'

# SparkContext -- É o portão de entrada da funcionalidade Apache Spark.

sc = SparkContext(appName="PythonSparkStreaming")
sc.setLogLevel("WARN")

In [None]:
# StreamingContext -- É o ponto de entrada para todas as funcionalidades do Spark Streaming.
# (representa a conexão com um cluster Spark e pode ser usado para criar várias fontes de entrada).

ssc = StreamingContext(sc, 3)

In [None]:
# 1. Transmitiremos todos os arquivos gravados no diretório de amostra. Isso está sendo bombeado com dados aleatórios.

print(os.getcwd())

json_folder_path = ('./sample/')

json_files = [ x for x in os.listdir(json_folder_path ) if x.endswith("json") ]

for json_file in json_files:

    json_file_path = os.path.join(json_folder_path, json_file)

    with open (json_file_path) as f:

        data_dict = json.loads(f.read())  

        data_dict['count'] = data_dict.pop('count')
        
        print(data_dict['count'])
        
        stream = ssc.textFileStream(json_file_path)

# 2. Gravar o RDD gerado por cada operação em lote de streaming no Elasticsearch.

        es_write_conf = {
            "es.nodes" : '927d8d22d8c7',
            "es.port" : '9200',
            "es.resource" : 'stream-test',
            "es.input.json" : "yes",
            "es.mapping.id": "count"
        }
    
# RDD -- Conjuntos de dados distribuídos resilientes (RDD) é uma estrutura de dados fundamental do Spark. 
#        É uma coleção imutável de objetos distribuídos. Cada conjunto de dados no RDD é dividido em partições lógicas,
#        que podem ser computadas em diferentes nós do cluster.

# PARALLELIZE -- É uma estrutura de dados fundamental do Spark, é uma coleção imutável de objetos distribuídos. 
#                Cada conjunto de dados no RDD é dividido em partições lógicas, que podem ser computadas em diferentes
#                nós do cluster

        rdd = sc.parallelize(data_dict)
        
        def format_data(x):
            return (data_dict['count'], json.dumps(data_dict))      

        
# LAMBDA -- Uma função lambda pode receber qualquer número de argumentos, mas pode ter apenas uma expressão.
#           Também conhecidas como funções anônimas, são pequenas funções restritas que não precisam de um 
#           nome (ou seja, um identificador).

# MAP -- A operação Map() se aplica a cada elemento do RDD e retorna o resultado como novo RDD. 
#        No mapa, o desenvolvedor pode definir sua própria lógica de negócios customizada. 
#        A mesma lógica será aplicada a todos os elementos do RDD.


        rdd = rdd.map(lambda x: format_data(x))
        
        parsed = stream.map(lambda x: format_data(x))
        
# saveAsNewAPIHadoopFile -- Método para salvar o RDD no ElasticSearch.  

        rdd.saveAsNewAPIHadoopFile(
            path='-',
            outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
            keyClass="org.apache.hadoop.io.NullWritable",
            valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
            conf=es_write_conf
        )
        

In [None]:
# foreachRDD --É um "operador de saída" no Spark Streaming. Ele permite que você acesse os RDDs subjacentes 
# do DStream para executar ações que fazem algo prático com os dados. Por exemplo, usando o foreachRDD, 
# você pode gravar dados em um banco de dados.

parsed.foreachRDD(lambda rdd: handler(rdd))

In [None]:
# Solicitar que também seja impresso em 'stdout'.

parsed.pprint()

In [None]:
# Começar o contexto do spark.

ssc.start()

In [None]:
# Parar o contexto.

ssc.stop()

# Bulk Processing ES with Spark

In [None]:
sc = SparkContext(appName="PythonSparkReading")  
sc.setLogLevel("WARN")

In [None]:
es_read_conf = {
    "es.nodes" : '927d8d22d8c7',
    "es.port" : '9200',
    "es.resource" : 'stream-test'
    }

In [None]:
# Obtenha um RDD para um determinado arquivo Hadoop com uma nova API InputFormat arbitrária e 
# opções de configuração extras para passar para o formato de entrada.

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_read_conf)

In [None]:
# Retornar os 5 primeiros documentos, numa lista com id(count) e o resto da informação.

es_rdd.take(5)

In [None]:
es_rdd = es_rdd.map(lambda x: x[1])

In [None]:
# Retornar um unico elemento, numa lista com todas as suas informações.

es_rdd.take(1)

In [None]:
from pyspark.sql import SparkSession, SQLContext, Row

spark = SparkSession \
    .builder \
    .appName("Spark SQL") \
    .getOrCreate()

In [None]:
df = es_rdd.map(lambda l: Row(**dict(l))).toDF()

In [None]:
df.take(1)

In [None]:
df \
    .groupby('value') \
    .count() \
    .collect()

In [None]:
df \
    .filter(df.name == 'Legolas')\
    .take(5)

In [None]:
sc.stop()