### Streaming dos dados JSON
Para o streaming dos dados foi configurada uma arquitetura Kappa, onde os dados são ingeridos pelo Kafka, processados no Spark e armazenados diretamente no Delta Lake. Por esse motivo este notebook foi criado em um cluster <a href="community.cloud.databricks.com/" target="_blank">**databricks community**</a>, sua versão gratuita. Para acesso/teste desta solução posso receber a chave pública de vocês para liberar o acesso ao servidor kafka, onde pode ser feito o teste da ingestão dos dados através do tópico Kafka. Já a execução desse notebook pode ser feita em qualquer cluster criado no databricks community.

[![kappa](https://storage.googleapis.com/repo-files/kappa.png)]()

## Kafka
Uma instância Ubuntu foi criada no GCP e possui o confluent instalado. Asim que os arquivos json são recebidos, um tópico do kafka ingere os dados e os deixa disponíveis para serem processados pelo Spark, conforme o código contido neste notebook.

## Spark
Os dados disponibilizados pelo Kafka são recebidos em binário, são lidos e convertidos em string e posteriormente processados no spark.

## Delta
Após o processamento feito no Spark, os dados são armazenados no Delta Lake, onde podemos integrar SQL diretamente com o streaming do dado.

## Importando todas as bibliotecas necessárias

In [3]:
import pyspark

from pyspark.context import SparkContext
from pyspark.sql.session import SparkSession
from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.streaming import *


sc = SparkContext.getOrCreate()
spark = SparkSession(sc)

## Testando a conexão ao servidor e ao broker do kafka

In [5]:
%sh telnet 35.238.128.197 9092

## Definindo as variáveis
As variáveis abaixo serão utilizadas na leitura do Kafka pelo Spark. São elas referente a string de conexão do broker, do schema registry e dos tópicos do Kafka.

In [7]:
#kafka
kafkaBrokers = "35.238.128.197:9092"

#schema registry
schemaRegistryAddr = "http://35.238.128.197:8081"

#topic
spooldir_topic = "spooldir2009-json-topic, spooldir2010-json-topic, spooldir2011-json-topic, spooldir2012-json-topic"

## Definindo o schema dos dados
O dado recebido do Kafka é do tipo binário. Aqui é difinido o schema que será associado ao dado no Spark, o que converterá o dado em uma estrutura do tipo Spark Struct

In [9]:
jsonSchema = StructType([ StructField('dropoff_datetime', TimestampType(), True), \
                         StructField('dropoff_latitude', DoubleType(), True), \
                         StructField('dropoff_longitude', DoubleType(), True), \
                         StructField('fare_amount', DoubleType(), True), \
                         StructField('passenger_count', LongType(), True), \
                         StructField('payment_type', StringType(), True), \
                         StructField('pickup_datetime', TimestampType(), True), \
                         StructField('pickup_latitude', DoubleType(), True), \
                         StructField('pickup_longitude', DoubleType(), True), \
                         StructField('rate_code', StringType(), True), \
                         StructField('store_and_fwd_flag', LongType(), True), \
                         StructField('surcharge', DoubleType(), True), \
                         StructField('tip_amount', DoubleType(), True), \
                         StructField('tolls_amount', DoubleType(), True), \
                         StructField('total_amount', DoubleType(), True), \
                         StructField('trip_distance', DoubleType(), True), \
                         StructField('vendor_id', StringType(), True)])

## Leitura do tópico kafka
Esse é o primeiro passo para especificar a localização do Kafka e quais tópicos serão lidos. O Spark permite a leitura de mais de um tópico simultaneamente. Neste caso serão lidos quatro tópicos, cada um referente a um arquivo (2009, 2010, 2011, 2012).

In [11]:
df_kafka_nyctaxitrips = spark.readStream \
                        .format("kafka") \
                        .option("kafka.bootstrap.servers", kafkaBrokers) \
                        .option("subscribe", spooldir_topic) \
                        .option("startingOffsets", "earliest") \
                        .load()

## Convertendo o valor binário lido do kafka para string
Os bytes do registro do Kafka representam Strings UTF8. Neste caso basta converter os dados binários para o tipo desejado.

In [13]:
df = df_kafka_nyctaxitrips.selectExpr("CAST(value AS STRING)")

Os dados estão chegando no formato JSON. Para associar o schema correto, é utilizada a função nativa do Spark `from_json` junto ao schema já definido previamente.

In [15]:
dfnyctaxitrips = df.select(from_json(df.value, jsonSchema).alias("json"))

## Validando se o dataframe está transmitindo dados.

In [17]:
df_kafka_nyctaxitrips.isStreaming

## Carga dos dados de streaming no delta lake
Os dados em streaming serão escritos no Delta Lake, onde poderão ser manipulados.

In [19]:
dfnyctaxitrips.writeStream \
                     .format("delta") \
                     .outputMode("append") \
                     .option("checkpointLocation", "/home/fabricio_dutra87/ingest-nyctaxitrips") \
                     .start("/home/fabricio_dutra87/ingest-nyctaxitrips")

## Queries em SQL diretamente no Delta Lake após a ingestão dos dados
Como exemplo, foi executada diretamente no Delta Lake a consulta referente a questão número 4 do teste técnico:
- Faça um gráfico de série temporal contando a quantidade de gorjetas de cada dia, nos últimos 3 meses de 2012

In [21]:
%sql

select to_date(substr(json.dropoff_datetime, 1,10)) as day,
                       sum(json.tip_amount) as sum_tips
                       from delta.`/home/fabricio_dutra87/ingest-nyctaxitrips`
                       where json.dropoff_datetime >= ('2012-10-01')
                       group by day
                       order by day asc

day,sum_tips
2012-10-01,1438.2900000000004
2012-10-02,1444.7399999999998
2012-10-03,1335.76
2012-10-04,1407.6700000000003
2012-10-05,1392.94
2012-10-06,1362.4500000000005
2012-10-07,1347.4699999999998
2012-10-08,1307.6199999999997
2012-10-09,1396.24
2012-10-10,1443.66
