Databricks notebook source
Criar uma estrutura file system
Criação de diretórios do filestore

In [None]:
dbutils.fs.mkdirs('/FileStore/tables/raw') # colocar os arquivos recebidosneste diretório
dbutils.fs.mkdirs('/FileStore/tables/parquet') # depois de ler o arquivo csv, transforma o arquivo para parquet
dbutils.fs.mkdirs('/FileStore/tables/checkpoint') # guarda os metadados dos arquivos no momento em que estão sendo lidos, para caso de falhas no nó, rede e etc, para saber onde parou o processamento

COMMAND ----------

criação dos schemas do arquivo CSV, para o spark saber o tipo de cada um dos campos/atributos no momento da leitura
O StructType, define a estrutura do dataframe e o StructField são os tipos/formatos dos campos/atributos, sendo TRUE/FALSE (NULL OU NOT NULL)

In [None]:
from pyspark.sql.types import StructType, StructField, IntegerType, DoubleType, TimestampType
from pyspark.sql.functions import sum ,window,col

In [None]:
schema = StructType([
    StructField("TransactionID",IntegerType(),True),
    StructField("ProductID",IntegerType(),True),
    StructField("Quantity",IntegerType(),True),
    StructField("Price",DoubleType(),True),
    StructField("Date",TimestampType(),True),
])

COMMAND ----------

Stream
Leitura do arquivo para verificar processamento dos dados
cria dataframe
sequencia: 1 - Formato do arquivo a ser lido, nome do schema determinado, se existe ou não cabeçalho no arquivo, qual o tipo de separação do arquivo (, ou ; e etc) e de onde ele vai ler estes arquivos ( no caso o caminho onde o arquivo estará)

In [None]:
df = spark.readStream.format('csv').schema(schema).option('header',True).option('sep',';').load('/FileStore/tables/raw')

In [None]:
result_df = df.groupBy('ProductID', 'Date') \
    .agg(
        sum('Quantity').alias('TotalQuantity'),
        sum('Price').alias('TotalPrice')    
            )
display(result_df)

COMMAND ----------

Apaga os arquivos de maneira recursiva nas pastas/diretórios criados

dbutils.fs.rm('/FileStore/tables/raw',recurse =True) # colocar os arquivos recebidosneste diretório
dbutils.fs.rm('/FileStore/tables/parquet',recurse =True) # depois de ler o arquivo csv, transforma o arquivo para parquet
dbutils.fs.rm('/FileStore/tables/checkpoint',recurse =True)

COMMAND ----------

Ler o arquivo ,  e fazer transformação do arquivo
Criando o Dataframe de streaming usando o schema definido e buscando o arquivo
no diretório raw

In [None]:
streaming_df = spark.readStream \
    .option("header","true")\
    .schema(schema)\
    .csv('/FileStore/tables/raw')

Agregando os dados antes de salvar em parquet
withWatermark verifica arquivos atrasados para atualizar

In [None]:
aggregated_df = streaming_df \
    .withWatermark("Date","1 hour") \
    .groupBy("ProductID", window("Date", "1 hour")) \
    .agg(
        sum("Quantity").alias("TotalQuantity"),
        sum("Price").alias("TotalPrice"),
    ).select(
        col("ProductID"),
        col("window").start.alias("WindowsStart"),
        col("window").end.alias("WindowsEnd"),
        col("TotalQuantity"),
        col("TotalPrice")
    )

Caminhos de saída e checkpoint
criando dois df para receber os arquivos dos diretórios

In [None]:
output_path = '/FileStore/tables/parquet'
checkpoint_path = '/FileStore/tables/checkpoint'

In [None]:
# Configurando a escrita do stream
# sequencia: 1 - o tipo de atualização do arquivo ( append), 2- o formato gerado do arquivo, 3 - o nome do novo diretório e caminho do arquivo,
query = aggregated_df.writeStream \
    .outputMode("append") \
    .format("parquet") \
    .option("path",output_path)\
    .option("checkpointlocation",checkpoint_path)\
    .start()

In [None]:
query.awaitTermination()

COMMAND ----------

Listando arquivos no diretório especificado

In [None]:
display(dbutils.fs.ls("/FileStore/tables/raw"))

COMMAND ----------

In [None]:
display(dbutils.fs.ls("/FileStore/tables/parquet"))

COMMAND ----------

In [None]:
display(dbutils.fs.ls("/FileStore/tables/checkpoint"))