##### Configurações

In [0]:
# Name of the S3 bucket that is mounted at PATH_ROOT and addressed through the s3:// table paths
# built further down; replace the placeholder with a real bucket name before running.
S3_BUCKET_NAME = "YOUR_S3_BUCKET_NAME"

##### Schemas e bibliotecas

In [0]:
from pyspark.sql.types import StructType, StructField, IntegerType, StringType, LongType, \
DoubleType, BooleanType, TimestampType, DateType, ArrayType, MapType, DecimalType, ByteType, BinaryType
from pyspark.sql.functions import to_json, from_json, explode
from pyspark.sql.functions import col, upper, date_format
import uuid
from delta.tables import DeltaTable
from Crypto.Cipher import AES
from Crypto.Hash import SHA256
from pyspark.sql.functions import lit, concat, upper
import time

# Every input schema is declared explicitly so that Spark does not need an extra pass over the
# data set to infer it.

# (column name, Spark type) pairs of the restaurant source, in column order.
# NOTE(review): DecimalType() uses PySpark's defaults (precision 10, scale 0) - confirm that
# minimum_order_value never carries fractional digits.
_restaurant_columns = [
  ("id", StringType()),
  ("created_at", TimestampType()),
  ("enabled", BooleanType()),
  ("price_range", StringType()),
  ("average_ticket", IntegerType()),
  ("takeout_time", IntegerType()),
  ("delivery_time", IntegerType()),
  ("minimum_order_value", DecimalType()),
  ("merchant_zip_code", IntegerType()),
  ("merchant_city", StringType()),
  ("merchant_state", StringType()),
  ("merchant_country", StringType()),
]

# Every column is declared nullable.
restaurant_schema = StructType([
  StructField(column_name, column_type, True)
  for column_name, column_type in _restaurant_columns
])

# Consumer source schema: every column is read as a nullable string.
consumer_schema = StructType([
  StructField(column_name, StringType(), True)
  for column_name in (
    "customer_id",
    "language",
    "created_at",
    "active",
    "customer_name",
    "customer_phone_area",
    "customer_phone_number",
  )
])

# Order columns that are not plain strings, mapped to the Spark type class they are read as.
# NOTE(review): DecimalType() uses PySpark's defaults (precision 10, scale 0) - confirm that
# order_total_amount never carries fractional digits.
_order_typed_columns = {
  "order_created_at": TimestampType,
  "order_scheduled": BooleanType,
  "order_scheduled_date": TimestampType,
  "order_total_amount": DecimalType,
}

# Order source columns in schema order; "items" is kept as a raw string and parsed later.
_order_column_names = [
  "cpf",
  "customer_id",
  "customer_name",
  "delivery_address_city",
  "delivery_address_country",
  "delivery_address_district",
  "delivery_address_external_id",
  "delivery_address_latitude",
  "delivery_address_longitude",
  "delivery_address_state",
  "delivery_address_zip_code",
  "merchant_id",
  "merchant_latitude",
  "merchant_longitude",
  "merchant_timezone",
  "order_created_at",
  "order_id",
  "order_scheduled",
  "order_scheduled_date",
  "order_total_amount",
  "origin_platform",
  "items",
]

# Every column is nullable; anything not listed in _order_typed_columns is a string.
order_schema = StructType([
  StructField(column_name, _order_typed_columns.get(column_name, StringType)(), True)
  for column_name in _order_column_names
])

def _money_type():
  """Build a new struct with a ``value`` and a ``currency`` field, both nullable strings."""
  return StructType([
    StructField("value", StringType(), True),
    StructField("currency", StringType(), True),
  ])


def _garnish_item_type():
  """Build a new struct describing a single garnish item; every field is nullable."""
  return StructType([
    StructField("name", StringType(), True),
    StructField("quantity", DoubleType(), True),
    StructField("addition", _money_type(), True),
    StructField("discount", _money_type(), True),
    StructField("sequence", IntegerType(), True),
    StructField("unitPrice", _money_type(), True),
    StructField("categoryId", StringType(), True),
    StructField("categoryName", StringType(), True),
    StructField("integrationId", StringType(), True),
    StructField("totalValue", _money_type(), True),
  ])


# Schema used to parse the JSON array held in the order "items" column.
items_schema = ArrayType(StructType([
  StructField("name", StringType(), True),
  StructField("quantity", DoubleType(), True),
  StructField("externalId", StringType(), True),
  StructField("sequence", IntegerType(), True),
  StructField("addition", _money_type(), True),
  StructField("discount", _money_type(), True),
  StructField("unitPrice", _money_type(), True),
  StructField("totalValue", _money_type(), True),
  StructField("customerNote", StringType(), True),
  StructField("integrationId", StringType(), True),
  StructField("categoryName", StringType(), True),
  StructField("totalAddition", _money_type(), True),
  StructField("totalDiscount", _money_type(), True),
  StructField("garnishItems", ArrayType(_garnish_item_type()), True),
]))

# Stand-alone schema for an array of garnish items; same fields as "garnishItems" above.
items_garnish_schema = ArrayType(_garnish_item_type())

In [0]:
# Root path under which the pipeline bucket is mounted.
PATH_ROOT = "/mnt/s3"

#display(dbutils.fs.ls(PATH_ROOT))

# Bucket that holds the raw source files.
SOURCE_BUCKET_NAME = 'ifood-data-architect-test-source'

# Delta Lake layer roots
_PATH_DELTA_LAKE = PATH_ROOT + '/delta_lake'
PATH_DELTA_BRONZE = _PATH_DELTA_LAKE + '/bronze'
PATH_DELTA_SILVER = _PATH_DELTA_LAKE + '/silver'
PATH_DELTA_GOLD = _PATH_DELTA_LAKE + '/gold'
PATH_CONSUMER_ORDER = _PATH_DELTA_LAKE + '/lookup'
PATH_TEMP = _PATH_DELTA_LAKE + '/tmp'

# Raw Paths
_PATH_RAW = "s3://" + SOURCE_BUCKET_NAME
PATH_RAW_RESTAURANT = _PATH_RAW + "/restaurant.csv.gz"
PATH_RAW_CONSUMER = _PATH_RAW + "/consumer.csv.gz"
PATH_RAW_ORDER = _PATH_RAW + "/order.json.gz"

# Bronze Paths
PATH_DELTA_BRONZE_RESTAURANT = PATH_DELTA_BRONZE + "/restaurant/"
PATH_DELTA_BRONZE_CONSUMER = PATH_DELTA_BRONZE + "/consumer/"
PATH_DELTA_BRONZE_ORDER = PATH_DELTA_BRONZE + "/order/"

# Silver Paths
PATH_DELTA_SILVER_RESTAURANT = PATH_DELTA_SILVER + "/restaurant/"
PATH_DELTA_SILVER_CONSUMER = PATH_DELTA_SILVER + "/consumer/"
PATH_DELTA_SILVER_ORDER = PATH_DELTA_SILVER + "/order/"
PATH_DELTA_SILVER_ORDER_ITEM = PATH_DELTA_SILVER + "/order_item/"
PATH_DELTA_SILVER_ORDER_GARNISH_ITEM = PATH_DELTA_SILVER + "/garnish_item/"

# Gold Paths
PATH_DELTA_GOLD_ORDER = PATH_DELTA_GOLD + "/order/"
PATH_DELTA_GOLD_ORDER_ITEM = PATH_DELTA_GOLD + "/order_item/"
PATH_DELTA_GOLD_ORDER_GARNISH_ITEM = PATH_DELTA_GOLD + "/garnish_item/"

# Lookup table
PATH_CONSUMER_ORDER_LOOKUP = PATH_CONSUMER_ORDER + "/consumer_order/"

#dbutils.fs.rm("{}/delta_lake/".format(PATH_ROOT), True)
#dbutils.fs.mkdirs("{}/delta_lake/".format(PATH_ROOT))

In [0]:
# Table names that exist in each Delta Lake layer, keyed by layer name.
dict_layer_tables = dict(
  bronze=['restaurant', 'consumer', 'order'],
  silver=['restaurant', 'consumer', 'order', 'order_item', 'garnish_item'],
  gold=['order', 'order_item', 'garnish_item'],
)

# UDF that returns a new random UUID string on every invocation.
# Spark assumes UDFs are deterministic unless told otherwise and may then eliminate or duplicate
# invocations while optimizing; a random generator must be flagged as non-deterministic.
uuid_udf = udf(lambda : str(uuid.uuid4()),StringType()).asNondeterministic()

def generate_manifest_files(*delta_layers):
  """Generate a ``symlink_format_manifest`` for every table of the given layers.

  Args:
    delta_layers: Layer names; each one must be a key of ``dict_layer_tables``.
  """
  # Lazy on purpose: each layer is looked up only when the loop below reaches it.
  table_locations = (
    's3://{}/delta_lake/{}/{}'.format(S3_BUCKET_NAME, layer_name, table_name)
    for layer_name in delta_layers
    for table_name in dict_layer_tables[layer_name]
  )
  for table_location in table_locations:
    DeltaTable.forPath(spark, table_location).generate("symlink_format_manifest")
  print('Manifest file generated to layers: {}'.format(delta_layers))
  
def optimize_delta_tables(*delta_layers):
  """Run ``OPTIMIZE`` on every table of the given layers.

  Guards against performance problems caused by many small files by compacting them into
  larger ones.

  Args:
    delta_layers: Layer names; each one must be a key of ``dict_layer_tables``.
  """
  optimize_template = "OPTIMIZE delta.`s3://{}/delta_lake/{}/{}/`;"
  for layer_name in delta_layers:
    for table_name in dict_layer_tables[layer_name]:
      spark.sql(optimize_template.format(S3_BUCKET_NAME, layer_name, table_name))
  print('Tables optimized in layers: {}'.format(delta_layers))
  
def generate_symlink_format_manifest(*delta_layers):
  """Enable automatic manifest synchronization on every table of the given layers.

  Sets ``delta.compatibility.symlinkFormatManifest.enabled=true`` on each table. This is useful
  when the data is also updated outside this pipeline.

  Args:
    delta_layers: Layer names; each one must be a key of ``dict_layer_tables``.
  """
  # Lazy on purpose: each layer is looked up only when the loop below reaches it.
  statements = (
    "ALTER TABLE delta.`s3://{}/delta_lake/{}/{}/` SET TBLPROPERTIES(delta.compatibility.symlinkFormatManifest.enabled=true);".format(S3_BUCKET_NAME, layer_name, table_name)
    for layer_name in delta_layers
    for table_name in dict_layer_tables[layer_name]
  )
  for statement in statements:
    spark.sql(statement)
  print('symlinkFormatManifest enabled to all tables in layers: {}'.format(delta_layers))
  
def file_exists(path):
  """Tell whether ``dbutils.fs.ls`` can list ``path``.

  Returns:
    True when the listing succeeds, False when it fails with an error that mentions
    ``java.io.FileNotFoundException``.

  Raises:
    Exception: Any other listing failure is re-raised unchanged.
  """
  try:
    dbutils.fs.ls(path)
  except Exception as error:
    path_is_missing = 'java.io.FileNotFoundException' in str(error)
    if not path_is_missing:
      raise
    return False
  return True
      
def mount_s3_dir(mount_dir, path_root):
  """Mount ``mount_dir`` at ``path_root`` with ``dbutils.fs.mount`` and log what was mounted.

  Args:
    mount_dir: Source URI to mount (``init_s3_env`` passes ``s3a://<bucket>``).
    path_root: Mount point.
  """
  dbutils.fs.mount(mount_dir, path_root)
  # Source and mount point are reported separately; printing them back to back produced a single
  # path that does not exist anywhere.
  print('S3 path {} mounted on: {}'.format(mount_dir, path_root))
      
def init_s3_env(bucket_name, path_root):
  """(Re)mount the S3 bucket ``bucket_name`` at ``path_root``.

  Any existing mount at ``path_root`` is removed first so the mount always reflects
  ``bucket_name``.

  Args:
    bucket_name: Name of the S3 bucket; mounted as ``s3a://<bucket_name>``.
    path_root: Mount point.

  Raises:
    Exception: Unmount failures that do not mention ``java.rmi.RemoteException``, and any
      failure of the mount itself.
  """
  mount_dir = "s3a://%s" % bucket_name
  try:
    # Unmount the path that was asked for, not the module-level PATH_ROOT.
    dbutils.fs.unmount(path_root)
  except Exception as e:
    # The original treated a RemoteException from unmount as "nothing mounted yet"; every other
    # error used to be swallowed, leaving the pipeline to run without a mount.
    if 'java.rmi.RemoteException' not in str(e):
      raise
    print('Directory not mounted')
  mount_s3_dir(mount_dir, path_root)
    
# Mount the pipeline bucket at PATH_ROOT so the PATH_ROOT-based paths resolve.
init_s3_env(S3_BUCKET_NAME, PATH_ROOT)
# dbutils.fs.rm("{}/delta_lake/".format(PATH_ROOT), True)
# dbutils.fs.mkdirs("{}/delta_lake/".format(PATH_ROOT))

##### Aquisição dos dados

In [0]:
# Read the raw source files from the data lake, applying the explicit schemas.
def _read_csv_with_header(csv_path, csv_schema):
  """Load a CSV file whose first line is a header row, applying ``csv_schema``."""
  return spark.read.schema(csv_schema).option("header", "true").csv(csv_path)

raw_restaurant = _read_csv_with_header(PATH_RAW_RESTAURANT, restaurant_schema)
raw_consumer = _read_csv_with_header(PATH_RAW_CONSUMER, consumer_schema)
raw_order = spark.read.json(PATH_RAW_ORDER, schema=order_schema)

##### Privacidade: Pseudonimização

In [0]:
# Prefix every source column with its origin ("consumer_" / "order_") so both sides of the join
# can live in one DataFrame without name clashes.
columns_consumer_to_pseudonymous = [col("consumer.{}".format(c)).alias('consumer_{}'.format(c)) for c in raw_consumer.columns]
columns_order_to_pseudonymous = [col("order.{}".format(c)).alias("order_{}".format(c)) for c in raw_order.columns]

# Personal-data columns that are dropped from each data set once the pseudonymous id exists.
consumer_personal_informations = ['customer_name', 'customer_phone_area','customer_phone_number']
# The order source calls this column "customer_name" (see order_schema). The previous value,
# "consumer_name", matched no column, so drop() silently left the customer's name in the order data.
order_personal_informations = ['cpf', 'customer_name']
# Columns kept in the lookup table that maps a pseudonymous id back to the personal data.
consumer_order_lookup_columns = ['pseudonymous_id','order_customer_name', 'consumer_customer_phone_area','consumer_customer_phone_number','order_cpf']

# Inner join: only consumers with at least one order (and orders with a known consumer) remain.
consumer_order = raw_consumer.alias('consumer').join(raw_order.alias('order'), raw_consumer.customer_id == raw_order.customer_id).select(columns_consumer_to_pseudonymous + columns_order_to_pseudonymous)

# Serialize the personal fields, upper-cased and "|"-separated, into the text that gets pseudonymized.
sep_char = lit("|")
coluns_serialization = concat(upper('order_cpf'), sep_char, upper('order_customer_name'), sep_char, upper('consumer_customer_phone_area'), sep_char, upper('consumer_customer_phone_number')).alias('id_serialize')
consumer_order = consumer_order.withColumn('id_serialize', coluns_serialization)

# IMPORTANT: the literal key below is for illustration only. In a real deployment the key comes
# from a secret-management service such as AWS Secrets Manager or Databricks Secret Scopes, as in
# the commented line below.
# encryption_key = dbutils.secrets.get(scope = "scope-etl", key = "etlid")
encryption_key = 'A4NECBPIU5LDUEAC9A8DGLBBKSGLBYEB'

@udf(returnType=BinaryType())
def udf_pseudonymous(text, encrypt_key):
  """Pseudonymize ``text``: hash it with SHA-256, then AES-ECB encrypt the hex digest.

  Args:
    text: Text to pseudonymize; ``None`` yields ``None``.
    encrypt_key: AES key, as text or bytes.

  Returns:
    The encrypted digest as bytes, or ``None`` when ``text`` is ``None``.
  """
  # Spark hands NULL column values to the UDF as None; return NULL for them instead of failing
  # while hashing.
  if text is None:
    return None

  def to_bytes(value):
    # The Crypto primitives work on bytes; encode text explicitly instead of relying on the
    # library to coerce str.
    if isinstance(value, (bytes, bytearray)):
      return value
    return value.encode('utf8')

  def encrypt(digest):
    # The hex digest is 64 characters long, a multiple of the AES block size, so no padding
    # is applied.
    cipher = AES.new(to_bytes(encrypt_key), AES.MODE_ECB)
    return cipher.encrypt(to_bytes(digest))

  def sha_256(value):
    hash_object = SHA256.new()
    hash_object.update(to_bytes(value))
    return hash_object.hexdigest()

  return encrypt(sha_256(text))

# Attach the pseudonymous id computed from the serialized personal fields.
consumer_order_pseudonymous = consumer_order.withColumn(
  'pseudonymous_id',
  udf_pseudonymous(col('id_serialize'), lit(encryption_key)),
)
consumer_order_lookup = consumer_order_pseudonymous.select(consumer_order_lookup_columns)

# Demonstrating compliance with the GDPR in Recital 78 “pseudonymising personal data as soon as possible.”
consumer_order_lookup.write.mode('overwrite').format("delta").save(PATH_CONSUMER_ORDER_LOOKUP)

# Prefixed column names as they appear in the joined DataFrame.
consumer_pseudonymous_columns = ['consumer_' + name for name in raw_consumer.columns]
order_pseudonymous_columns = ['order_' + name for name in raw_order.columns]

# Rename the prefixed columns back to their source names and keep the pseudonymous id.
columns_pseudonymous_to_order = [
  col(prefixed_name).alias(source_name)
  for prefixed_name, source_name in zip(order_pseudonymous_columns, raw_order.columns)
]
columns_pseudonymous_to_order.append(col('pseudonymous_id'))
columns_pseudonymous_to_consumer = [
  col(prefixed_name).alias(source_name)
  for prefixed_name, source_name in zip(consumer_pseudonymous_columns, raw_consumer.columns)
]
columns_pseudonymous_to_consumer.append(col('pseudonymous_id'))

#display(consumer_order_pseudonymous.select(columns_lookup_to_consumer).drop(*consumer_personal_informations))
#display(consumer_order_pseudonymous.select(columns_lookup_to_order).drop(*order_personal_informations))

# Split the joined data back into consumer and order data sets, minus the personal-data columns.
# NOTE(review): both are taken from the joined rows, so consumer holds one row per matching order - confirm this is intended.
consumer_without_ids = consumer_order_pseudonymous.select(columns_pseudonymous_to_consumer)
consumer = consumer_without_ids.drop(*consumer_personal_informations)
order_without_ids = consumer_order_pseudonymous.select(columns_pseudonymous_to_order)
order = order_without_ids.drop(*order_personal_informations)

In [0]:
# This snippet is only an example and is not part of the pipeline; it shows how a pseudonymous id
# can be decrypted if that is ever needed.
@udf(returnType=StringType())
def udf_decrypt(text, k):
  """Reverse the AES-ECB encryption applied by ``udf_pseudonymous``.

  What comes back is the text that ``udf_pseudonymous`` encrypted, i.e. the SHA-256 hex digest
  of the serialized personal data - not the personal data itself.

  Args:
    text: Encrypted bytes; ``None`` yields ``None``.
    k: AES key, as text or bytes.

  Returns:
    The decrypted text, or ``None`` when ``text`` is ``None``.
  """
  # Spark hands NULL column values to the UDF as None.
  if text is None:
    return None
  # The Crypto primitives work on bytes; encode a text key explicitly.
  key_bytes = k if isinstance(k, (bytes, bytearray)) else k.encode('utf8')
  decipher = AES.new(key_bytes, AES.MODE_ECB)
  texts_decrypt =  decipher.decrypt(bytes(text))
  return str(texts_decrypt, encoding='utf8')
#display(consumer_order.withColumn('pseudonymous_id', udf_pseudonymous(col('id_serialize'), lit(encryption_key))))

##### Ingestão

In [0]:
def _overwrite_delta(frame, target_path):
  """Replace the Delta table at ``target_path`` with the rows of ``frame``."""
  frame.write.mode('overwrite').format("delta").save(target_path)


def _load_delta(source_path):
  """Read the Delta table stored at ``source_path``."""
  return spark.read.format("delta").load(source_path)


# Raw ingestion into the bronze layer
_overwrite_delta(raw_restaurant, PATH_DELTA_BRONZE_RESTAURANT)
_overwrite_delta(consumer, PATH_DELTA_BRONZE_CONSUMER)
_overwrite_delta(order, PATH_DELTA_BRONZE_ORDER)

# Read the data back from the Delta Lake bronze layer
restaurant = _load_delta(PATH_DELTA_BRONZE_RESTAURANT)
consumer = _load_delta(PATH_DELTA_BRONZE_CONSUMER)
order = _load_delta(PATH_DELTA_BRONZE_ORDER)

In [0]:
# Generate the symlink manifests for the bronze tables written above.
generate_manifest_files('bronze')

##### Transformação

In [0]:
# Derive date/time text columns from the order timestamp, give the coordinate columns a numeric
# type and tag every order with a generated id that links it to its items.
# A bare 'decimal' is decimal(10,0) in Spark and rounded every coordinate to whole degrees, so an
# explicit precision and scale are used (six decimal places).
coordinate_type = 'decimal(9,6)'
# uuid_udf() is random: without materializing the rows here, `order` and `order_items` would each
# re-run the UDF when written and end up with different ids. localCheckpoint() computes the rows
# once and cuts the lineage, so both DataFrames see the same ids.
order = order.withColumn('order_created_at_time', date_format('order_created_at', 'HH:mm:ss')) \
  .withColumn('order_created_at_date', date_format('order_created_at', 'yyyy-MM-dd')) \
  .withColumn('delivery_address_latitude', col('delivery_address_latitude').cast(coordinate_type)) \
  .withColumn('delivery_address_longitude', col('delivery_address_longitude').cast(coordinate_type)) \
  .withColumn('merchant_latitude', col('merchant_latitude').cast(coordinate_type)) \
  .withColumn('merchant_longitude', col('merchant_longitude').cast(coordinate_type)) \
  .withColumn('order_created_at', col('order_created_at').cast('timestamp')) \
  .withColumn("order_items_id", uuid_udf()) \
  .localCheckpoint()

# Parse the raw items JSON; it is flattened into its own tables further down.
order_items = order.select(order.order_items_id, from_json("items", items_schema).alias("items"))
order = order.drop('items')

order_id_columns = ['pseudonymous_id', 'customer_id', 'order_id', 'order_items_id','merchant_id']
# Upper-case text columns only: upper() on a timestamp, boolean or decimal column implicitly casts
# it to string, which used to undo the typing applied above.
order_string_columns = {name for name, dtype in order.dtypes if dtype == 'string'}
order = order.select(
  *order_id_columns,
  *((upper(col(c)) if c in order_string_columns else col(c)).alias(c) for c in order.columns if c not in order_id_columns)
)

# Enrich every order with attributes of its restaurant (inner join on the merchant id).
order = order.alias('order').join(restaurant.alias('restaurant'), order.merchant_id == restaurant.id).select(
  'order.*',
  col('restaurant.price_range').alias('merchant_price_range'),
  col('restaurant.takeout_time').alias('merchant_takeout_time'),
  col('delivery_time').alias('merchant_delivery_time'),
  col('minimum_order_value').alias('merchant_minimum_order_value'),
  col('merchant_city').alias('merchant_city'),
  col('merchant_zip_code').alias('merchant_zip_code'),
  col('merchant_state').alias('merchant_state'),
  col('merchant_country').alias('merchant_country')
).cache()

# overwriteSchema lets the overwrite replace tables written earlier with the all-string layout.
order.write.format("delta").mode('overwrite').option("overwriteSchema", "true").save(PATH_DELTA_SILVER_ORDER)
order.write.format("delta").mode('overwrite').option("overwriteSchema", "true").save(PATH_DELTA_GOLD_ORDER)

In [0]:
# Explode the parsed item arrays into one row per item, flatten the nested value/currency structs
# and tag every item with a generated id that links it to its garnish items.
# uuid_udf() is random: without materializing the rows here, `order_items` and
# `order_garnish_items` would each re-run the UDF when written and end up with different ids.
# localCheckpoint() computes the rows once and cuts the lineage, so both share the same ids.
order_items = order_items.select(col("order_items_id"), explode("items").alias('items')) \
  .select('order_items_id',
            col('items.name'),
            'items.quantity',
            col('items.addition.value').alias('addition_value'),
            col('items.addition.currency').alias('addition_currency'),
            col('items.discount.value').alias('discount_value'),
            col('items.discount.currency').alias('discount_currency'),
            col('items.unitPrice.value').alias('unit_price_value'),
            col('items.unitPrice.currency').alias('unit_price_currency'),
            col('items.totalValue.value').alias('total_value_value'),
            col('items.totalValue.currency').alias('total_value_currency'),
            col('items.customerNote').alias('customer_note'),
            col('items.integrationId').alias('integration_id'),
            col('items.categoryName').alias('category_name'),
            col('items.totalAddition.value').alias('total_addition_value'),
            col('items.totalAddition.currency').alias('total_addition_currency'),
            col('items.totalDiscount.value').alias('total_discount_value'),
            col('items.totalDiscount.currency').alias('total_discount_currency'),
            col('items.garnishItems').alias("garnish_items")
           ) \
  .withColumn("garnish_items_id", uuid_udf()) \
  .localCheckpoint()

# One row per garnish item, keyed by the id of the item it belongs to.
order_garnish_items = order_items.select(col("garnish_items_id"), explode("garnish_items").alias('garnish_items'))
order_items = order_items.drop('garnish_items')

# Upper-case every non-id column; this turns them all into strings, the numeric ones are cast next.
order_items_id_columns = ['order_items_id']
order_items = order_items.select(*order_items_id_columns,*(upper(col(c)).alias(c) for c in order_items.columns if c not in order_items_id_columns))

# NOTE(review): 'decimal' resolves to decimal(10,0) - confirm the *_value fields never carry
# fractional digits, otherwise they are lost here.
order_items = order_items.withColumn('quantity', col('quantity').cast('integer')) \
  .withColumn('addition_value', col('addition_value').cast('decimal')) \
  .withColumn('discount_value', col('discount_value').cast('decimal')) \
  .withColumn('unit_price_value', col('unit_price_value').cast('decimal')) \
  .withColumn('total_value_value', col('total_value_value').cast('decimal')) \
  .withColumn('total_addition_value', col('total_addition_value').cast('decimal')) \
  .withColumn('total_discount_value', col('total_discount_value').cast('decimal')).cache()

order_items.write.format("delta").mode('overwrite').save(PATH_DELTA_SILVER_ORDER_ITEM)
order_items.write.format("delta").mode('overwrite').save(PATH_DELTA_GOLD_ORDER_ITEM)

In [0]:
# Flatten the garnish item struct into plain columns.
order_garnish_items = order_garnish_items.select('garnish_items_id',
                'garnish_items.name',
                'garnish_items.quantity',
                col('garnish_items.addition.value').alias('addition_value'),
                col('garnish_items.addition.currency').alias('addition_currency'),
                col('garnish_items.discount.value').alias('discount_value'),
                col('garnish_items.discount.currency').alias('discount_currency'),
                'garnish_items.sequence',
                col('garnish_items.unitPrice.value').alias('unit_price_value'),
                col('garnish_items.unitPrice.currency').alias('unit_price_currency'),
                col('garnish_items.categoryId').alias('category_id'),
                col('garnish_items.categoryName').alias('category_name'),
                col('garnish_items.integrationId').alias('integration_id'),
                col('garnish_items.totalValue.value').alias('total_value_value'),
                col('garnish_items.totalValue.currency').alias('total_value_currency')
           )

# Upper-case text columns only: upper() on a numeric column implicitly casts it to string, which
# used to leave the integer `sequence` column stored as text.
garnish_string_columns = {name for name, dtype in order_garnish_items.dtypes if dtype == 'string'}
order_garnish_items = order_garnish_items.select(
  'garnish_items_id',
  *((upper(col(c)) if c in garnish_string_columns else col(c)).alias(c) for c in order_garnish_items.columns if c != 'garnish_items_id')
)

# NOTE(review): 'decimal' resolves to decimal(10,0) - confirm the *_value fields never carry
# fractional digits, otherwise they are lost here.
order_garnish_items = order_garnish_items.withColumn('quantity', col('quantity').cast('integer')) \
  .withColumn('addition_value', col('addition_value').cast('decimal')) \
  .withColumn('discount_value', col('discount_value').cast('decimal')) \
  .withColumn('unit_price_value', col('unit_price_value').cast('decimal')) \
  .withColumn('total_value_value', col('total_value_value').cast('decimal')).cache()

# overwriteSchema lets the overwrite replace tables written earlier with `sequence` as a string.
order_garnish_items.write.format("delta").mode('overwrite').option("overwriteSchema", "true").save(PATH_DELTA_SILVER_ORDER_GARNISH_ITEM)
order_garnish_items.write.format("delta").mode('overwrite').option("overwriteSchema", "true").save(PATH_DELTA_GOLD_ORDER_GARNISH_ITEM)

In [0]:
# Id columns are kept as they are; every other consumer column is upper-cased.
consumer_id_columns = ['pseudonymous_id', 'customer_id']
consumer_upper_columns = [
  upper(col(column_name)).alias(column_name)
  for column_name in consumer.columns
  if column_name not in consumer_id_columns
]
consumer = consumer.select(*consumer_id_columns, *consumer_upper_columns)

# Give the two non-text columns their proper types.
for typed_column, spark_type in (('created_at', 'timestamp'), ('active', 'boolean')):
  consumer = consumer.withColumn(typed_column, col(typed_column).cast(spark_type))

consumer.write.mode('overwrite').format("delta").save(PATH_DELTA_SILVER_CONSUMER)

In [0]:
# The restaurant data is written to the silver layer without further transformation.
restaurant.write.format("delta").mode('overwrite').save(PATH_DELTA_SILVER_RESTAURANT)

In [0]:
# Generate the symlink manifests for the silver and gold tables.
generate_manifest_files('silver','gold')

In [0]:
# Additionally, co-locate in the same set of files the columns that the API is expected to use
# very frequently for aggregations.
zorder_gold_order_sql = "OPTIMIZE delta.`s3://{}/delta_lake/gold/order/` ZORDER BY (order_scheduled_date, delivery_address_state, delivery_address_city);".format(S3_BUCKET_NAME)
spark.sql(zorder_gold_order_sql)

# If needed, the same can be done by id
# spark.sql("OPTIMIZE delta.`s3://{}/delta_lake/gold/order/` ZORDER BY (customer_id);".format(S3_BUCKET_NAME))

##### Fim pipeline

##### Otimizações

In [0]:
# Every write in this pipeline uses overwrite mode, but the right mode depends on the business rules. With an incremental model, for example, the data could be merged instead.

In [0]:
# Turn on automatic manifest updates for the tables of every layer.
generate_symlink_format_manifest('bronze','silver','gold')
#optimize_delta_tables('bronze','silver','gold')

##### Exemplo de streaming

In [0]:
%sql
-- Remove the table left over from a previous run; the streaming example below writes to it.
drop table if exists order_table;

In [0]:
# Start the example from an empty scratch area.
if file_exists(PATH_TEMP):
  dbutils.fs.rm(PATH_TEMP, True)

# Orders delivered to these cities feed the streaming example.
city = ['SAO PAULO','RIO DE JANEIRO','BELO HORIZONTE','MINAS GERAIS']
stream_input_path = PATH_TEMP+'/raw_in'

stream_input = order.filter(order.delivery_address_city.isin(city)) \
  .select('delivery_address_city', 'order_created_at', 'order_id') \
  .withColumn('order_created_at_date', date_format('order_created_at', 'yyyy-MM-dd')) \
  .withColumn('order_created_at_time', date_format('order_created_at', 'HH:mm:ss'))
stream_input.write \
  .format("delta") \
  .partitionBy("order_created_at_date") \
  .mode('overwrite') \
  .save(stream_input_path)

df_order = spark.read.format("delta").load(stream_input_path).limit(10)
# NOTE(review): rowsPerSecond is an option of Spark's "rate" source - confirm it has any effect
# on a Delta source.
stream_order = spark.readStream.format("delta").option("rowsPerSecond", 100).load(stream_input_path)

# Continuously copy the input table into order_table, one micro-batch every 5 seconds.
stream_order.writeStream \
  .format("delta") \
  .option('checkpointLocation', PATH_TEMP + "/_chk") \
  .trigger(processingTime = "5 seconds") \
  .table("order_table")

In [0]:
# Show a streaming count of the rows in order_table per delivery city.
display(spark.readStream.format("delta").table("order_table").groupBy("delivery_address_city").count())

In [0]:
def stop_all_streams(runtime):
  """Wait ``runtime`` seconds, then stop every active streaming query.

  The streaming part of the notebook is only an example, so it is left running for a fixed time
  and then shut down. Stopping is best effort: a query that fails to stop is reported and the
  remaining ones are still stopped.

  Args:
    runtime: Number of seconds to wait before stopping the queries.
  """
  print("Waiting for {} sec's to streaming example...".format(runtime))
  time.sleep(runtime)
  active_streams = spark.streams.active
  if not active_streams:
    print("No stream to stop.")
    return
  all_stopped = True
  for stream in active_streams:
    try:
      stream.stop()
    except Exception as error:
      # Report the real failure instead of claiming there was nothing to stop.
      all_stopped = False
      print("Failed to stop stream {}: {}".format(stream.id, error))
  if all_stopped:
    print("Streams stoped.")
    
# Let the streaming example run for 60 seconds, then stop it.
stop_all_streams(60)