# O que é Streaming?
  - processamento de dados contínuo
  - Em tempo real, ou próximo a tempo real

# Por que Streaming é importante?
  - Analisar dados é um processo de transformação:
    - Dados -> processamento -> informação
  - A informação tem um valor
  - O valor de qualquer informação está relacionada diretamente ao tempo!

# Streaming
  - Contínuo - sem fim
  - Carregado a medida que é produzido
  - Processado a medida que é produzido

# Batch
  - Com inicio e fim
  - Carregamento em lote
  - Processamento em lote




# Como funciona o processo de Streaming no Spark?
  - Micro-Batchs
    - Bloco de dados produzidos em intervalo de tempo
  - Structured Streaming
    - Segunda geração de processamento de streaming de Spark (Dstream foi a primeira)
    - Garantia de processamento único de cada registro (end-to-end exactly-once guarantees)

# Modo de Saída
  - APPEND: Só novas linhas. Suporta apenas consultas stateless (não depende da informação dos registros anteriores)
  - UPDATE: apenas linhas que foram atualizadas
  - COMPLETE: toda a tabela é atualizada

# TRIGGER (Como o processo de monitoramento vai funcionar?)
  - Formas:
    - Default: dispara quando o micro batch termina
    - Tempo
    - Once: apenas uma única vez
    - Continuous: processamento contínuo

  - Parar o processo
    - stop()

# Checkpointdir
 - Diretório onde o estado de andamento é salvo
 - Se você parar o prcoesso e reiniciar com o mesmo diretório, ele segue de onde parou

# Métodos semelhantes os de batch
 - readstream em vez de read
 - writestream em vez de write

# Source e Sinks (origem e destino) que não tem suporte
  - Métodos de batch podem ser usados (read, write):
    - foreachbatch: opera no micro-batch
    - Foreach: opera a cada linha
  - Algumas garantias são perdidas: por exemplo, exactly-once::




In [None]:
%%sh
pip install spark
pip install pyspark

Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting spark
  Downloading spark-0.2.1.tar.gz (41 kB)
Building wheels for collected packages: spark
  Building wheel for spark (setup.py): started
  Building wheel for spark (setup.py): finished with status 'done'
  Created wheel for spark: filename=spark-0.2.1-py3-none-any.whl size=58762 sha256=dc402f0205a0765351a9f1d4043fedf27881daaf5134e7105131528aca3e0544
  Stored in directory: /root/.cache/pip/wheels/4e/0e/f1/164619f9920fb447d294afaae11a7715bd442ded7225953d72
Successfully built spark
Installing collected packages: spark
Successfully installed spark-0.2.1
Looking in indexes: https://pypi.org/simple, https://us-python.pkg.dev/colab-wheels/public/simple/
Collecting pyspark
  Downloading pyspark-3.3.0.tar.gz (281.3 MB)
Collecting py4j==0.10.9.5
  Downloading py4j-0.10.9.5-py2.py3-none-any.whl (199 kB)
Building wheels for collected packages: pyspark
  Building wheel for pyspark (setup

In [None]:
from pyspark.sql import SparkSession

### De JSON para o postgreSQL

In [None]:
# obs: ao executar o código, lembre-se de adicionar um arquivo .json na pasta teststreaming
# estrutura pastas: https://prnt.sc/nFgt2LDD4Yeu

if __name__ == '__main__':
  spark = SparkSession.builder.appName('Streaming').getOrCreate()

  # Definindo um schema pro json
  jsonschema = """nome STRING, 
                postagem STRING, 
                data INT"""

# lê todos os arquivos .json que estivar na pasta teststreaming
df = spark.readStream.json('/content/murilo/teststreaming', schema=jsonschema)

# guardar o estado da sessão da aplicacao streaming na pasta temp
# de cada arquivo .json inserido em teststreaming
diretorio = '/content/murilo/temp'

def atualiza_postgresql(data_f, batch_id):
  data_f.write.format('jdbc').option('url', 'jdbc:postgresql://localhost:5432/posts')\
                             .option('dbtable', 'posts')\
                             .option('user', 'postgres')\
                             .option('password', '123456')\
                             .option('driver', 'org.postgresql.Driver')\
                             .mode('append').save()
              

stcal = df.writeStream.foreachBatch(atualiza_postgresql)\
                      .outputMode('append')\
                      .trigger(processingTime = '5 second')\
                      .option('checkpointlocation', diretorio)\
                      .start()

# espera o fim do processo de alguma forma
stcal.awaitTermination()