**Introduction: Preparing and transforming the orders dataset**


This notebook prepares and transforms the orders dataset (order.json) so that it is ready for subsequent analyses.
- The orders data stores nested information in the `items` column, a JSON string that holds an array of items, each of which carries its own `garnishItems` array. To make the data easy to query and analyze in tabular form, these structures must be unnested. The notebook applies unnesting techniques so that every component is represented clearly and accessibly, without any loss of information.
- Given the size of the original dataset (order), each of its 9 files is processed individually. After the transformations and the unnesting are applied to each part, the processed tables will be unified into a final table.
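Since all nine parts go through identical steps, the per-part cells could be driven by a loop instead of being copied. The minimal sketch below only builds the list of input paths and output table names. It assumes the parts are named `part-00000.json` through `part-00008.json` and the tables `order00_sql` through `order08_sql`, extrapolating the pattern of the parts shown in this notebook; `build_part_plan` is a hypothetical helper, not part of the original code.

```python
from typing import List, Tuple

# Same locations used throughout the notebook
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"


def build_part_plan(n_parts: int = 9) -> List[Tuple[str, str]]:
    """Hypothetical helper: return (json_path, table_name) for parts 0..n_parts-1.

    Assumes the naming pattern part-0000N.json -> order0N_sql seen in this notebook.
    """
    plan = []
    for i in range(n_parts):
        json_path = f"/Volumes/{CATALOGO}/{SCHEMA}/{VOLUME}/part-{i:05d}.json"
        table_name = f"{CATALOGO}.{SCHEMA}.order{i:02d}_sql"
        plan.append((json_path, table_name))
    return plan


for path, table in build_part_plan():
    print(path, "->", table)
```

In the notebook, each `(path, table)` pair could feed a single function wrapping the read, `from_json`, `explode_outer` and `saveAsTable` steps, so that `item_schema` is defined only once.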

In [0]:
# Required imports
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, from_json, explode_outer, length
from pyspark.sql.types import (StructType, StructField, StringType, ArrayType, DoubleType, LongType, BooleanType)

# Standard Libraries
from datetime import date, timedelta

# Spark
import pyspark.sql.functions as F
from pyspark.sql import Window
from pyspark.sql.functions import lower, get_json_object, when, coalesce
from pyspark.sql.utils import AnalysisException

spark = SparkSession.builder.appName("ProcessaJsonPedidos").getOrCreate()

## Part 00

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_ARQUIVO_JSON = "part-00000.json"

caminho_arquivo_json_volume = f"/Volumes/{CATALOGO}/{SCHEMA}/{VOLUME}/{NOME_ARQUIVO_JSON}"

print(f"Lendo o arquivo JSON de: {caminho_arquivo_json_volume}")

Lendo o arquivo JSON de: /Volumes/workspace/default/arq_json/part-00000.json


In [0]:
# Read the JSON file; multiLine=True lets Spark parse records that span multiple lines (a single JSON object or array)
df_raw = spark.read.json(caminho_arquivo_json_volume, multiLine=True, encoding='UTF-8')

print(f"Total de linhas: {df_raw.count()}")

Total de linhas: 367033


In [0]:
print("Retornando 10 primeiras linhas:")
try:
    df_raw_text = spark.read.text(caminho_arquivo_json_volume)
    for row in df_raw_text.limit(10).collect():
        print(row[0])
except Exception as e:
    print(f"Erro ao ler arquivo como texto: {e}")

# The data contains two nested arrays: `items` (stored as a JSON string) and `garnishItems` (nested inside each item). To turn this hierarchical structure into a tabular format and simplify analysis, both arrays must be unnested. This is done by applying explode_outer() to each array, which creates one row per array element; unlike explode(), it also keeps rows whose array is null or empty (filling the new column with null), so every original order and item is preserved.

Retornando 10 primeiras linhas:
{"cpf":"24917797900","customer_id":"35419c42dd9e77fa6b34811ef3cdf1f242255b6ab4aa62e936e4b1a2e5dd1952","customer_name":"ULISSES","delivery_address_city":"JUIZ DE FORA","delivery_address_country":"BR","delivery_address_district":"CASCATINHA","delivery_address_external_id":"6760884","delivery_address_latitude":"-43.36","delivery_address_longitude":"-21.78","delivery_address_state":"MG","delivery_address_zip_code":"36033","items":"[{\"name\": \"403 Mariano Procópio\", \"addition\": {\"value\": \"0\", \"currency\": \"BRL\"}, \"discount\": {\"value\": \"0\", \"currency\": \"BRL\"}, \"quantity\": 1.00, \"sequence\": 1, \"unitPrice\": {\"value\": \"0\", \"currency\": \"BRL\"}, \"externalId\": \"ee0f88c318af4612998392a0ead7b10c\", \"totalValue\": {\"value\": \"0\", \"currency\": \"BRL\"}, \"customerNote\": null, \"garnishItems\": [{\"name\": \"AO PONTO\", \"addition\": {\"value\": \"0\", \"currency\": \"BRL\"}, \"discount\": {\"value\": \"0\", \"currency\": \"BRL
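The difference between `explode()` and `explode_outer()` described above can be illustrated without Spark. The plain-Python functions below are a simplified model (lists of dicts standing in for DataFrame rows), not Spark's implementation: both add one row per array element while keeping the other columns, as `withColumn` does, but only the outer variant emits a row with `None` when the array is empty or null. The sample orders are made up.

```python
from typing import Any, Dict, List

Row = Dict[str, Any]


def explode(rows: List[Row], array_col: str, out_col: str) -> List[Row]:
    """Simplified model of explode(): rows with a null/empty array disappear."""
    out = []
    for row in rows:
        for element in row.get(array_col) or []:
            new_row = dict(row)          # keep all existing columns, like withColumn
            new_row[out_col] = element
            out.append(new_row)
    return out


def explode_outer(rows: List[Row], array_col: str, out_col: str) -> List[Row]:
    """Simplified model of explode_outer(): a null/empty array yields one row with None."""
    out = []
    for row in rows:
        for element in row.get(array_col) or [None]:
            new_row = dict(row)
            new_row[out_col] = element
            out.append(new_row)
    return out


orders = [
    {"order_id": "A", "items": [{"name": "x"}, {"name": "y"}]},
    {"order_id": "B", "items": []},      # empty array
    {"order_id": "C", "items": None},    # null array
]

print(len(explode(orders, "items", "item")))        # 2
print(len(explode_outer(orders, "items", "item")))  # 4
```

With this sample, `explode` loses orders `B` and `C`, while `explode_outer` keeps them with a null `item`, which is why the outer variant suits a transformation that must not drop records.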

In [0]:
# First, define how Spark should interpret the JSON string in the 'items' column

item_schema = ArrayType(StructType([
    StructField("name", StringType(), True),
    StructField("addition", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("discount", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("quantity", DoubleType(), True),
    StructField("sequence", LongType(), True),
    StructField("unitPrice", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("externalId", StringType(), True),
    StructField("totalValue", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("customerNote", StringType(), True),
    StructField("garnishItems", ArrayType(StructType([
        StructField("name", StringType(), True),
        StructField("addition", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("discount", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("quantity", DoubleType(), True),
        StructField("sequence", LongType(), True),
        StructField("unitPrice", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryId", StringType(), True),
        StructField("externalId", StringType(), True),
        StructField("totalValue", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryName", StringType(), True),
        StructField("integrationId", StringType(), True)
    ])), True)
]))
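To see what `from_json` does with one value of the `items` column, the plain-Python sketch below parses a hypothetical, shortened record modeled on the first raw line printed earlier (only a few fields are kept, and the garnish fields are assumed because that line is truncated). Here `json.loads` plays the role of `from_json(col("items"), item_schema)`: the string becomes a list of dicts, `quantity` becomes a float (as `DoubleType`), `sequence` an int (as `LongType`), and the monetary `value` fields stay strings, matching their `StringType` in the schema, so they would need an explicit cast (here to `Decimal`) before any arithmetic.

```python
import json
from decimal import Decimal

# Hypothetical, shortened 'items' value: a JSON *string* holding an array of items
items_str = (
    '[{"name": "403 Mariano Procópio", "quantity": 1.00, "sequence": 1, '
    '"unitPrice": {"value": "0", "currency": "BRL"}, '
    '"garnishItems": [{"name": "AO PONTO", '
    '"unitPrice": {"value": "0", "currency": "BRL"}}]}]'
)

# Parse the string, analogous to from_json(col("items"), item_schema) in Spark
items = json.loads(items_str)
first = items[0]

# Numeric JSON literals become float/int, like DoubleType/LongType in item_schema
quantity = first["quantity"]
sequence = first["sequence"]

# Amounts arrive as strings (StringType in the schema); cast explicitly before doing math
unit_price = Decimal(first["unitPrice"]["value"])

# garnishItems is an array nested inside each item, unnested in a second step
garnish_names = [g["name"] for g in first["garnishItems"]]

print(first["name"], quantity, sequence, unit_price, garnish_names)
```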

In [0]:
# Check whether 'items' is already an array; otherwise parse the JSON string with item_schema
if isinstance(df_raw.schema["items"].dataType, ArrayType):
    print("A coluna 'items' é um array. Pulando from_json.")
    df_parsed_items = df_raw
else:
    print("A coluna 'items' é uma string. Aplicando from_json.")
    df_parsed_items = df_raw.withColumn("items", from_json(col("items"), item_schema))

# display(df_parsed_items)

A coluna 'items' é uma string. Aplicando from_json.


In [0]:
# Explode the 'items' array to create one row per item (explode_outer keeps orders without items)
df_exploded_items = df_parsed_items.withColumn("item", explode_outer(col("items")))

# Select the order-level columns and flatten the fields of the 'item' struct
df_flattened = df_exploded_items.select(
    col("cpf"),
    col("customer_id"),
    col("customer_name"),
    col("delivery_address_city"),
    col("delivery_address_country"),
    col("delivery_address_district"),
    col("delivery_address_external_id"),
    col("delivery_address_latitude"),
    col("delivery_address_longitude"),
    col("delivery_address_state"),
    col("delivery_address_zip_code"),
    col("merchant_id"),
    col("merchant_latitude"),
    col("merchant_longitude"),
    col("merchant_timezone"),
    col("order_created_at"),
    col("order_id"),
    col("order_scheduled"),
    col("order_total_amount"),
    col("origin_platform"),
    # Item fields
    col("item.name").alias("item_name"),
    col("item.addition.value").alias("item_addition_value"),
    col("item.addition.currency").alias("item_addition_currency"),
    col("item.discount.value").alias("item_discount_value"),
    col("item.discount.currency").alias("item_discount_currency"),
    col("item.quantity").alias("item_quantity"),
    col("item.sequence").alias("item_sequence"),
    col("item.unitPrice.value").alias("item_unit_price_value"),
    col("item.unitPrice.currency").alias("item_unit_price_currency"),
    col("item.externalId").alias("item_external_id"),
    col("item.totalValue.value").alias("item_total_value_value"),
    col("item.totalValue.currency").alias("item_total_value_currency"),
    col("item.customerNote").alias("item_customer_note"),
    # Keep the 'garnishItems' array for now; it is exploded in the next step
    col("item.garnishItems").alias("garnish_items_array")
)

# display(df_flattened)

In [0]:
# Explode the 'garnishItems' array (one row per garnish; items without garnishes are kept with nulls)

df_final = df_flattened.withColumn("garnish_item", explode_outer(col("garnish_items_array"))) \
    .select(
        col("*"),  # Keep all existing columns
        # garnish_item fields
        col("garnish_item.name").alias("garnish_item_name"),
        col("garnish_item.addition.value").alias("garnish_item_addition_value"),
        col("garnish_item.addition.currency").alias("garnish_item_addition_currency"),
        col("garnish_item.discount.value").alias("garnish_item_discount_value"),
        col("garnish_item.discount.currency").alias("garnish_item_discount_currency"),
        col("garnish_item.quantity").alias("garnish_item_quantity"),
        col("garnish_item.sequence").alias("garnish_item_sequence"),
        col("garnish_item.unitPrice.value").alias("garnish_item_unit_price_value"),
        col("garnish_item.unitPrice.currency").alias("garnish_item_unit_price_currency"), 
        col("garnish_item.categoryId").alias("garnish_item_category_id"),
        col("garnish_item.externalId").alias("garnish_item_external_id"),
        col("garnish_item.totalValue.value").alias("garnish_item_total_value_value"),
        col("garnish_item.totalValue.currency").alias("garnish_item_total_value_currency"),
        col("garnish_item.categoryName").alias("garnish_item_category_name"),
        col("garnish_item.integrationId").alias("garnish_item_integration_id")
    ).drop("garnish_items_array", "garnish_item")  # Drop the intermediate array and struct columns

# display(df_final)
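Because both steps use `explode_outer`, the final table should hold one row per garnish of each item, one row for each item without garnishes, and one row for each order without items. The plain-Python sketch below computes that expected count for a few made-up orders; `expected_rows` is a hypothetical helper, not part of the original notebook. A corollary that can be checked directly in Spark is that `df_final.count()` should never be smaller than `df_raw.count()`.

```python
from typing import Any, Dict, List


def expected_rows(orders: List[Dict[str, Any]]) -> int:
    """Rows produced by explode_outer('items') followed by explode_outer('garnishItems')."""
    total = 0
    for order in orders:
        # A null or empty 'items' array still yields one row (with a null item)
        for item in order.get("items") or [None]:
            # Likewise, an item without garnishes yields one row (with a null garnish)
            garnishes = (item or {}).get("garnishItems") or [None]
            total += len(garnishes)
    return total


orders = [
    {"order_id": "A", "items": [
        {"name": "i1", "garnishItems": [{"name": "g1"}, {"name": "g2"}]},  # 2 rows
        {"name": "i2", "garnishItems": []},                                # 1 row
    ]},
    {"order_id": "B", "items": None},                                      # 1 row
]

print(expected_rows(orders))  # 4
```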

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_TABELA_SQL = "order00_sql"


# Save the result as a Delta table, overwriting both data and schema if it already exists
df_final.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}")

print(f"\nTabela '{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}' criada com sucesso.")


Tabela 'workspace.default.order00_sql' criada com sucesso.


## Part 01

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_ARQUIVO_JSON = "part-00001.json"

caminho_arquivo_json_volume = f"/Volumes/{CATALOGO}/{SCHEMA}/{VOLUME}/{NOME_ARQUIVO_JSON}"

print(f"Lendo o arquivo JSON de: {caminho_arquivo_json_volume}")

Lendo o arquivo JSON de: /Volumes/workspace/default/arq_json/part-00001.json


In [0]:
# Read the JSON file; multiLine=True lets Spark parse records that span multiple lines (a single JSON object or array)
df_raw = spark.read.json(caminho_arquivo_json_volume, multiLine=True, encoding='UTF-8')

print(f"Total de linhas: {df_raw.count()}")

Total de linhas: 367077


In [0]:
# First, define how Spark should interpret the JSON string in the 'items' column

item_schema = ArrayType(StructType([
    StructField("name", StringType(), True),
    StructField("addition", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("discount", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("quantity", DoubleType(), True),
    StructField("sequence", LongType(), True),
    StructField("unitPrice", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("externalId", StringType(), True),
    StructField("totalValue", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("customerNote", StringType(), True),
    StructField("garnishItems", ArrayType(StructType([
        StructField("name", StringType(), True),
        StructField("addition", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("discount", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("quantity", DoubleType(), True),
        StructField("sequence", LongType(), True),
        StructField("unitPrice", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryId", StringType(), True),
        StructField("externalId", StringType(), True),
        StructField("totalValue", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryName", StringType(), True),
        StructField("integrationId", StringType(), True)
    ])), True)
]))

In [0]:
# Check whether 'items' is already an array; otherwise parse the JSON string with item_schema
if isinstance(df_raw.schema["items"].dataType, ArrayType):
    print("A coluna 'items' é um array. Pulando from_json.")
    df_parsed_items = df_raw
else:
    print("A coluna 'items' é uma string. Aplicando from_json.")
    df_parsed_items = df_raw.withColumn("items", from_json(col("items"), item_schema))

# display(df_parsed_items)

A coluna 'items' é uma string. Aplicando from_json.


In [0]:
# Explode the 'items' array to create one row per item (explode_outer keeps orders without items)
df_exploded_items = df_parsed_items.withColumn("item", explode_outer(col("items")))

# Select the order-level columns and flatten the fields of the 'item' struct
df_flattened = df_exploded_items.select(
    col("cpf"),
    col("customer_id"),
    col("customer_name"),
    col("delivery_address_city"),
    col("delivery_address_country"),
    col("delivery_address_district"),
    col("delivery_address_external_id"),
    col("delivery_address_latitude"),
    col("delivery_address_longitude"),
    col("delivery_address_state"),
    col("delivery_address_zip_code"),
    col("merchant_id"),
    col("merchant_latitude"),
    col("merchant_longitude"),
    col("merchant_timezone"),
    col("order_created_at"),
    col("order_id"),
    col("order_scheduled"),
    col("order_total_amount"),
    col("origin_platform"),
    # Item fields
    col("item.name").alias("item_name"),
    col("item.addition.value").alias("item_addition_value"),
    col("item.addition.currency").alias("item_addition_currency"),
    col("item.discount.value").alias("item_discount_value"),
    col("item.discount.currency").alias("item_discount_currency"),
    col("item.quantity").alias("item_quantity"),
    col("item.sequence").alias("item_sequence"),
    col("item.unitPrice.value").alias("item_unit_price_value"),
    col("item.unitPrice.currency").alias("item_unit_price_currency"),
    col("item.externalId").alias("item_external_id"),
    col("item.totalValue.value").alias("item_total_value_value"),
    col("item.totalValue.currency").alias("item_total_value_currency"),
    col("item.customerNote").alias("item_customer_note"),
    # Keep the 'garnishItems' array for now; it is exploded in the next step
    col("item.garnishItems").alias("garnish_items_array")
)

# display(df_flattened)

In [0]:
# Explode the 'garnishItems' array (one row per garnish; items without garnishes are kept with nulls)

df_final = df_flattened.withColumn("garnish_item", explode_outer(col("garnish_items_array"))) \
    .select(
        col("*"),  # Keep all existing columns
        # garnish_item fields
        col("garnish_item.name").alias("garnish_item_name"),
        col("garnish_item.addition.value").alias("garnish_item_addition_value"),
        col("garnish_item.addition.currency").alias("garnish_item_addition_currency"),
        col("garnish_item.discount.value").alias("garnish_item_discount_value"),
        col("garnish_item.discount.currency").alias("garnish_item_discount_currency"),
        col("garnish_item.quantity").alias("garnish_item_quantity"),
        col("garnish_item.sequence").alias("garnish_item_sequence"),
        col("garnish_item.unitPrice.value").alias("garnish_item_unit_price_value"),
        col("garnish_item.unitPrice.currency").alias("garnish_item_unit_price_currency"), 
        col("garnish_item.categoryId").alias("garnish_item_category_id"),
        col("garnish_item.externalId").alias("garnish_item_external_id"),
        col("garnish_item.totalValue.value").alias("garnish_item_total_value_value"),
        col("garnish_item.totalValue.currency").alias("garnish_item_total_value_currency"),
        col("garnish_item.categoryName").alias("garnish_item_category_name"),
        col("garnish_item.integrationId").alias("garnish_item_integration_id")
    ).drop("garnish_items_array", "garnish_item")  # Drop the intermediate array and struct columns

# display(df_final)

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_TABELA_SQL = "order01_sql"


# Save the result as a Delta table, overwriting both data and schema if it already exists
df_final.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}")

print(f"\nTabela '{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}' criada com sucesso.")


Tabela 'workspace.default.order01_sql' criada com sucesso.


## Part 02

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_ARQUIVO_JSON = "part-00002.json"

caminho_arquivo_json_volume = f"/Volumes/{CATALOGO}/{SCHEMA}/{VOLUME}/{NOME_ARQUIVO_JSON}"

print(f"Lendo o arquivo JSON de: {caminho_arquivo_json_volume}")

Lendo o arquivo JSON de: /Volumes/workspace/default/arq_json/part-00002.json


In [0]:
# Read the JSON file; multiLine=True lets Spark parse records that span multiple lines (a single JSON object or array)
df_raw = spark.read.json(caminho_arquivo_json_volume, multiLine=True, encoding='UTF-8')

print(f"Total de linhas: {df_raw.count()}")

Total de linhas: 367129


In [0]:
# First, define how Spark should interpret the JSON string in the 'items' column

item_schema = ArrayType(StructType([
    StructField("name", StringType(), True),
    StructField("addition", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("discount", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("quantity", DoubleType(), True),
    StructField("sequence", LongType(), True),
    StructField("unitPrice", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("externalId", StringType(), True),
    StructField("totalValue", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("customerNote", StringType(), True),
    StructField("garnishItems", ArrayType(StructType([
        StructField("name", StringType(), True),
        StructField("addition", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("discount", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("quantity", DoubleType(), True),
        StructField("sequence", LongType(), True),
        StructField("unitPrice", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryId", StringType(), True),
        StructField("externalId", StringType(), True),
        StructField("totalValue", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryName", StringType(), True),
        StructField("integrationId", StringType(), True)
    ])), True)
]))

In [0]:
# Check whether 'items' is already an array; otherwise parse the JSON string with item_schema
if isinstance(df_raw.schema["items"].dataType, ArrayType):
    print("A coluna 'items' é um array. Pulando from_json.")
    df_parsed_items = df_raw
else:
    print("A coluna 'items' é uma string. Aplicando from_json.")
    df_parsed_items = df_raw.withColumn("items", from_json(col("items"), item_schema))

# display(df_parsed_items)

A coluna 'items' é uma string. Aplicando from_json.


In [0]:
# Explode the 'items' array to create one row per item (explode_outer keeps orders without items)
df_exploded_items = df_parsed_items.withColumn("item", explode_outer(col("items")))

# Select the order-level columns and flatten the fields of the 'item' struct
df_flattened = df_exploded_items.select(
    col("cpf"),
    col("customer_id"),
    col("customer_name"),
    col("delivery_address_city"),
    col("delivery_address_country"),
    col("delivery_address_district"),
    col("delivery_address_external_id"),
    col("delivery_address_latitude"),
    col("delivery_address_longitude"),
    col("delivery_address_state"),
    col("delivery_address_zip_code"),
    col("merchant_id"),
    col("merchant_latitude"),
    col("merchant_longitude"),
    col("merchant_timezone"),
    col("order_created_at"),
    col("order_id"),
    col("order_scheduled"),
    col("order_total_amount"),
    col("origin_platform"),
    # Item fields
    col("item.name").alias("item_name"),
    col("item.addition.value").alias("item_addition_value"),
    col("item.addition.currency").alias("item_addition_currency"),
    col("item.discount.value").alias("item_discount_value"),
    col("item.discount.currency").alias("item_discount_currency"),
    col("item.quantity").alias("item_quantity"),
    col("item.sequence").alias("item_sequence"),
    col("item.unitPrice.value").alias("item_unit_price_value"),
    col("item.unitPrice.currency").alias("item_unit_price_currency"),
    col("item.externalId").alias("item_external_id"),
    col("item.totalValue.value").alias("item_total_value_value"),
    col("item.totalValue.currency").alias("item_total_value_currency"),
    col("item.customerNote").alias("item_customer_note"),
    # Keep the 'garnishItems' array for now; it is exploded in the next step
    col("item.garnishItems").alias("garnish_items_array")
)

# display(df_flattened)

In [0]:
# Explode the 'garnishItems' array (one row per garnish; items without garnishes are kept with nulls)

df_final = df_flattened.withColumn("garnish_item", explode_outer(col("garnish_items_array"))) \
    .select(
        col("*"),  # Keep all existing columns
        # garnish_item fields
        col("garnish_item.name").alias("garnish_item_name"),
        col("garnish_item.addition.value").alias("garnish_item_addition_value"),
        col("garnish_item.addition.currency").alias("garnish_item_addition_currency"),
        col("garnish_item.discount.value").alias("garnish_item_discount_value"),
        col("garnish_item.discount.currency").alias("garnish_item_discount_currency"),
        col("garnish_item.quantity").alias("garnish_item_quantity"),
        col("garnish_item.sequence").alias("garnish_item_sequence"),
        col("garnish_item.unitPrice.value").alias("garnish_item_unit_price_value"),
        col("garnish_item.unitPrice.currency").alias("garnish_item_unit_price_currency"), 
        col("garnish_item.categoryId").alias("garnish_item_category_id"),
        col("garnish_item.externalId").alias("garnish_item_external_id"),
        col("garnish_item.totalValue.value").alias("garnish_item_total_value_value"),
        col("garnish_item.totalValue.currency").alias("garnish_item_total_value_currency"),
        col("garnish_item.categoryName").alias("garnish_item_category_name"),
        col("garnish_item.integrationId").alias("garnish_item_integration_id")
    ).drop("garnish_items_array", "garnish_item")  # Drop the intermediate array and struct columns

# display(df_final)

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_TABELA_SQL = "order02_sql"


# Save the result as a Delta table, overwriting both data and schema if it already exists
df_final.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}")

print(f"\nTabela '{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}' criada com sucesso.")


Tabela 'workspace.default.order02_sql' criada com sucesso.


## Part 03

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_ARQUIVO_JSON = "part-00003.json"

caminho_arquivo_json_volume = f"/Volumes/{CATALOGO}/{SCHEMA}/{VOLUME}/{NOME_ARQUIVO_JSON}"

print(f"Lendo o arquivo JSON de: {caminho_arquivo_json_volume}")

Lendo o arquivo JSON de: /Volumes/workspace/default/arq_json/part-00003.json


In [0]:
# Read the JSON file; multiLine=True lets Spark parse records that span multiple lines (a single JSON object or array)
df_raw = spark.read.json(caminho_arquivo_json_volume, multiLine=True, encoding='UTF-8')

print(f"Total de linhas: {df_raw.count()}")

Total de linhas: 367127


In [0]:
# First, define how Spark should interpret the JSON string in the 'items' column

item_schema = ArrayType(StructType([
    StructField("name", StringType(), True),
    StructField("addition", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("discount", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("quantity", DoubleType(), True),
    StructField("sequence", LongType(), True),
    StructField("unitPrice", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("externalId", StringType(), True),
    StructField("totalValue", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("customerNote", StringType(), True),
    StructField("garnishItems", ArrayType(StructType([
        StructField("name", StringType(), True),
        StructField("addition", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("discount", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("quantity", DoubleType(), True),
        StructField("sequence", LongType(), True),
        StructField("unitPrice", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryId", StringType(), True),
        StructField("externalId", StringType(), True),
        StructField("totalValue", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryName", StringType(), True),
        StructField("integrationId", StringType(), True)
    ])), True)
]))

In [0]:
# Check whether 'items' is already an array; otherwise parse the JSON string with item_schema
if isinstance(df_raw.schema["items"].dataType, ArrayType):
    print("A coluna 'items' é um array. Pulando from_json.")
    df_parsed_items = df_raw
else:
    print("A coluna 'items' é uma string. Aplicando from_json.")
    df_parsed_items = df_raw.withColumn("items", from_json(col("items"), item_schema))

# display(df_parsed_items)

A coluna 'items' é uma string. Aplicando from_json.


In [0]:
# Explode the 'items' array to create one row per item (explode_outer keeps orders without items)
df_exploded_items = df_parsed_items.withColumn("item", explode_outer(col("items")))

# Select the order-level columns and flatten the fields of the 'item' struct
df_flattened = df_exploded_items.select(
    col("cpf"),
    col("customer_id"),
    col("customer_name"),
    col("delivery_address_city"),
    col("delivery_address_country"),
    col("delivery_address_district"),
    col("delivery_address_external_id"),
    col("delivery_address_latitude"),
    col("delivery_address_longitude"),
    col("delivery_address_state"),
    col("delivery_address_zip_code"),
    col("merchant_id"),
    col("merchant_latitude"),
    col("merchant_longitude"),
    col("merchant_timezone"),
    col("order_created_at"),
    col("order_id"),
    col("order_scheduled"),
    col("order_total_amount"),
    col("origin_platform"),
    # Item fields
    col("item.name").alias("item_name"),
    col("item.addition.value").alias("item_addition_value"),
    col("item.addition.currency").alias("item_addition_currency"),
    col("item.discount.value").alias("item_discount_value"),
    col("item.discount.currency").alias("item_discount_currency"),
    col("item.quantity").alias("item_quantity"),
    col("item.sequence").alias("item_sequence"),
    col("item.unitPrice.value").alias("item_unit_price_value"),
    col("item.unitPrice.currency").alias("item_unit_price_currency"),
    col("item.externalId").alias("item_external_id"),
    col("item.totalValue.value").alias("item_total_value_value"),
    col("item.totalValue.currency").alias("item_total_value_currency"),
    col("item.customerNote").alias("item_customer_note"),
    # Keep the 'garnishItems' array for now; it is exploded in the next step
    col("item.garnishItems").alias("garnish_items_array")
)

# display(df_flattened)

In [0]:
# Explode the 'garnishItems' array (one row per garnish; items without garnishes are kept with nulls)

df_final = df_flattened.withColumn("garnish_item", explode_outer(col("garnish_items_array"))) \
    .select(
        col("*"),  # Keep all existing columns
        # garnish_item fields
        col("garnish_item.name").alias("garnish_item_name"),
        col("garnish_item.addition.value").alias("garnish_item_addition_value"),
        col("garnish_item.addition.currency").alias("garnish_item_addition_currency"),
        col("garnish_item.discount.value").alias("garnish_item_discount_value"),
        col("garnish_item.discount.currency").alias("garnish_item_discount_currency"),
        col("garnish_item.quantity").alias("garnish_item_quantity"),
        col("garnish_item.sequence").alias("garnish_item_sequence"),
        col("garnish_item.unitPrice.value").alias("garnish_item_unit_price_value"),
        col("garnish_item.unitPrice.currency").alias("garnish_item_unit_price_currency"), 
        col("garnish_item.categoryId").alias("garnish_item_category_id"),
        col("garnish_item.externalId").alias("garnish_item_external_id"),
        col("garnish_item.totalValue.value").alias("garnish_item_total_value_value"),
        col("garnish_item.totalValue.currency").alias("garnish_item_total_value_currency"),
        col("garnish_item.categoryName").alias("garnish_item_category_name"),
        col("garnish_item.integrationId").alias("garnish_item_integration_id")
    ).drop("garnish_items_array", "garnish_item")  # Drop the intermediate array and struct columns

# display(df_final)

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_TABELA_SQL = "order03_sql"


# Save the result as a Delta table, overwriting both data and schema if it already exists
df_final.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}")

print(f"\nTabela '{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}' criada com sucesso.")


Tabela 'workspace.default.order03_sql' criada com sucesso.


## Part 04

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_ARQUIVO_JSON = "part-00004.json"

caminho_arquivo_json_volume = f"/Volumes/{CATALOGO}/{SCHEMA}/{VOLUME}/{NOME_ARQUIVO_JSON}"

print(f"Lendo o arquivo JSON de: {caminho_arquivo_json_volume}")

Lendo o arquivo JSON de: /Volumes/workspace/default/arq_json/part-00004.json


In [0]:
# Read the file as one multi-line JSON document (a single object or an array of objects)
df_raw = spark.read.json(caminho_arquivo_json_volume, multiLine=True, encoding='UTF-8')

print(f"Total de linhas: {df_raw.count()}")

Total de linhas: 367093
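`multiLine=True` makes Spark parse each file as one JSON document, which is what a file holding a single top-level array of orders needs; without it, Spark expects JSON Lines (one complete object per line). Assuming each part file is such an array, the difference between the two layouts can be illustrated in plain Python with the standard `json` module. This is a model of the file formats, not Spark code, and the sample records are made up.

```python
import json

# Made-up sample: the same two orders in the two layouts.
json_array = """[
  {"order_id": "a1", "items": "[]"},
  {"order_id": "b2", "items": "[]"}
]"""
json_lines = '{"order_id": "a1", "items": "[]"}\n{"order_id": "b2", "items": "[]"}\n'

# Whole-document parsing (what multiLine=True does): a top-level array
# yields one record per element.
records_multiline = json.loads(json_array)

# JSON Lines parsing: every non-empty line is an independent document.
records_jsonl = [json.loads(line) for line in json_lines.splitlines() if line.strip()]

print(len(records_multiline), len(records_jsonl))  # 2 2

# Parsing the array file line by line breaks: "[" alone is not valid JSON.
try:
    json.loads(json_array.splitlines()[0])
except json.JSONDecodeError as exc:
    print("line-by-line parsing failed:", exc.msg)
```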


In [0]:
# First, define how Spark should interpret the JSON string stored in the 'items' column

item_schema = ArrayType(StructType([
    StructField("name", StringType(), True),
    StructField("addition", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("discount", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("quantity", DoubleType(), True),
    StructField("sequence", LongType(), True),
    StructField("unitPrice", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("externalId", StringType(), True),
    StructField("totalValue", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("customerNote", StringType(), True),
    StructField("garnishItems", ArrayType(StructType([
        StructField("name", StringType(), True),
        StructField("addition", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("discount", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("quantity", DoubleType(), True),
        StructField("sequence", LongType(), True),
        StructField("unitPrice", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryId", StringType(), True),
        StructField("externalId", StringType(), True),
        StructField("totalValue", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryName", StringType(), True),
        StructField("integrationId", StringType(), True)
    ])), True)
]))
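The schema reads every monetary `value` field as `StringType`. If those strings are later converted to numbers, a fixed-precision decimal (in Spark, for example `col(...).cast("decimal(12,2)")`, where the precision is an assumption about the data) avoids the rounding error that binary floating point (`DoubleType`) introduces when prices are summed. The plain-Python sketch below, with made-up prices, shows the effect with `float` versus `decimal.Decimal`.

```python
from decimal import Decimal

# Made-up unit prices, kept as strings exactly as the schema above reads them.
precos = ["0.10", "0.20"]

soma_float = sum(float(p) for p in precos)                       # binary floating point
soma_decimal = sum((Decimal(p) for p in precos), Decimal("0"))   # exact decimal arithmetic

print(soma_float)    # 0.30000000000000004
print(soma_decimal)  # 0.30
```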

In [0]:
# Check whether 'items' is already an array; apply from_json only when it is a JSON string
if isinstance(df_raw.schema["items"].dataType, ArrayType):
    print("A coluna 'items' é um array. Pulando from_json.")
    df_parsed_items = df_raw
else:
    print("A coluna 'items' é uma string. Aplicando from_json.")
    df_parsed_items = df_raw.withColumn("items", from_json(col("items"), item_schema))

# display(df_parsed_items)

A coluna 'items' é uma string. Aplicando from_json.
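The cell above distinguishes two cases: `items` already inferred as an array, or `items` stored as a JSON string that needs `from_json`. The same decision can be modelled in plain Python with a hypothetical `garantir_lista` helper: lists pass through, strings are parsed with `json.loads`, and `None` stays `None`. Turning malformed text into `None` is a choice made for this sketch, not a statement about Spark's behaviour.

```python
import json
from typing import Any, Optional

def garantir_lista(items: Any) -> Optional[list]:
    """Hypothetical helper: return `items` as a Python list.

    - list -> returned unchanged (already parsed, like an inferred ArrayType)
    - str  -> parsed with json.loads (like applying from_json)
    - None -> None
    Malformed JSON, or JSON that is not an array, also becomes None.
    """
    if items is None or isinstance(items, list):
        return items
    if isinstance(items, str):
        try:
            parsed = json.loads(items)
        except json.JSONDecodeError:
            return None
        return parsed if isinstance(parsed, list) else None
    raise TypeError(f"unexpected type for items: {type(items).__name__}")

print(garantir_lista('[{"name": "Pizza", "quantity": 1.0}]'))
print(garantir_lista([{"name": "Pizza"}]))
print(garantir_lista("not json"))  # None
```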


In [0]:
# Explode the 'items' array into one row per item (explode_outer keeps orders whose array is null or empty)
df_exploded_items = df_parsed_items.withColumn("item", explode_outer(col("items")))

# Select the top-level columns and flatten the fields of 'item'
df_flattened = df_exploded_items.select(
    col("cpf"),
    col("customer_id"),
    col("customer_name"),
    col("delivery_address_city"),
    col("delivery_address_country"),
    col("delivery_address_district"),
    col("delivery_address_external_id"),
    col("delivery_address_latitude"),
    col("delivery_address_longitude"),
    col("delivery_address_state"),
    col("delivery_address_zip_code"),
    col("merchant_id"),
    col("merchant_latitude"),
    col("merchant_longitude"),
    col("merchant_timezone"),
    col("order_created_at"),
    col("order_id"),
    col("order_scheduled"),
    col("order_total_amount"),
    col("origin_platform"),
    # Item fields
    col("item.name").alias("item_name"),
    col("item.addition.value").alias("item_addition_value"),
    col("item.addition.currency").alias("item_addition_currency"),
    col("item.discount.value").alias("item_discount_value"),
    col("item.discount.currency").alias("item_discount_currency"),
    col("item.quantity").alias("item_quantity"),
    col("item.sequence").alias("item_sequence"),
    col("item.unitPrice.value").alias("item_unit_price_value"),
    col("item.unitPrice.currency").alias("item_unit_price_currency"),
    col("item.externalId").alias("item_external_id"),
    col("item.totalValue.value").alias("item_total_value_value"),
    col("item.totalValue.currency").alias("item_total_value_currency"),
    col("item.customerNote").alias("item_customer_note"),
    # Keep the 'garnishItems' array for now; it is exploded in the next step
    col("item.garnishItems").alias("garnish_items_array")
)

# display(df_flattened)

In [0]:
# Explode the 'garnishItems' array into one row per garnish item (explode_outer keeps items without garnishes)

df_final = df_flattened.withColumn("garnish_item", explode_outer(col("garnish_items_array"))) \
    .select(
        col("*"),  # Select all existing columns
        # garnish_item fields
        col("garnish_item.name").alias("garnish_item_name"),
        col("garnish_item.addition.value").alias("garnish_item_addition_value"),
        col("garnish_item.addition.currency").alias("garnish_item_addition_currency"),
        col("garnish_item.discount.value").alias("garnish_item_discount_value"),
        col("garnish_item.discount.currency").alias("garnish_item_discount_currency"),
        col("garnish_item.quantity").alias("garnish_item_quantity"),
        col("garnish_item.sequence").alias("garnish_item_sequence"),
        col("garnish_item.unitPrice.value").alias("garnish_item_unit_price_value"),
        col("garnish_item.unitPrice.currency").alias("garnish_item_unit_price_currency"), 
        col("garnish_item.categoryId").alias("garnish_item_category_id"),
        col("garnish_item.externalId").alias("garnish_item_external_id"),
        col("garnish_item.totalValue.value").alias("garnish_item_total_value_value"),
        col("garnish_item.totalValue.currency").alias("garnish_item_total_value_currency"),
        col("garnish_item.categoryName").alias("garnish_item_category_name"),
        col("garnish_item.integrationId").alias("garnish_item_integration_id")
    ).drop("garnish_items_array", "garnish_item")  # Drop the intermediate array and struct columns

# display(df_final)

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_TABELA_SQL = "order04_sql" 


# Save as a Delta table, overwriting any previous data and schema
df_final.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}")

print(f"\nTabela '{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}' criada com sucesso.")


Tabela 'workspace.default.order04_sql' criada com sucesso.


# Part 05

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_ARQUIVO_JSON = "part-00005.json"

caminho_arquivo_json_volume = f"/Volumes/{CATALOGO}/{SCHEMA}/{VOLUME}/{NOME_ARQUIVO_JSON}"

print(f"Lendo o arquivo JSON de: {caminho_arquivo_json_volume}")

Lendo o arquivo JSON de: /Volumes/workspace/default/arq_json/part-00005.json


In [0]:
# Read the file as one multi-line JSON document (a single object or an array of objects)
df_raw = spark.read.json(caminho_arquivo_json_volume, multiLine=True, encoding='UTF-8')

print(f"Total de linhas: {df_raw.count()}")

Total de linhas: 367030


In [0]:
# First, define how Spark should interpret the JSON string stored in the 'items' column

item_schema = ArrayType(StructType([
    StructField("name", StringType(), True),
    StructField("addition", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("discount", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("quantity", DoubleType(), True),
    StructField("sequence", LongType(), True),
    StructField("unitPrice", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("externalId", StringType(), True),
    StructField("totalValue", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("customerNote", StringType(), True),
    StructField("garnishItems", ArrayType(StructType([
        StructField("name", StringType(), True),
        StructField("addition", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("discount", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("quantity", DoubleType(), True),
        StructField("sequence", LongType(), True),
        StructField("unitPrice", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryId", StringType(), True),
        StructField("externalId", StringType(), True),
        StructField("totalValue", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryName", StringType(), True),
        StructField("integrationId", StringType(), True)
    ])), True)
]))

In [0]:
# Check whether 'items' is already an array; apply from_json only when it is a JSON string
if isinstance(df_raw.schema["items"].dataType, ArrayType):
    print("A coluna 'items' é um array. Pulando from_json.")
    df_parsed_items = df_raw
else:
    print("A coluna 'items' é uma string. Aplicando from_json.")
    df_parsed_items = df_raw.withColumn("items", from_json(col("items"), item_schema))

# display(df_parsed_items)

A coluna 'items' é uma string. Aplicando from_json.


In [0]:
# Explode the 'items' array into one row per item (explode_outer keeps orders whose array is null or empty)
df_exploded_items = df_parsed_items.withColumn("item", explode_outer(col("items")))

# Select the top-level columns and flatten the fields of 'item'
df_flattened = df_exploded_items.select(
    col("cpf"),
    col("customer_id"),
    col("customer_name"),
    col("delivery_address_city"),
    col("delivery_address_country"),
    col("delivery_address_district"),
    col("delivery_address_external_id"),
    col("delivery_address_latitude"),
    col("delivery_address_longitude"),
    col("delivery_address_state"),
    col("delivery_address_zip_code"),
    col("merchant_id"),
    col("merchant_latitude"),
    col("merchant_longitude"),
    col("merchant_timezone"),
    col("order_created_at"),
    col("order_id"),
    col("order_scheduled"),
    col("order_total_amount"),
    col("origin_platform"),
    # Item fields
    col("item.name").alias("item_name"),
    col("item.addition.value").alias("item_addition_value"),
    col("item.addition.currency").alias("item_addition_currency"),
    col("item.discount.value").alias("item_discount_value"),
    col("item.discount.currency").alias("item_discount_currency"),
    col("item.quantity").alias("item_quantity"),
    col("item.sequence").alias("item_sequence"),
    col("item.unitPrice.value").alias("item_unit_price_value"),
    col("item.unitPrice.currency").alias("item_unit_price_currency"),
    col("item.externalId").alias("item_external_id"),
    col("item.totalValue.value").alias("item_total_value_value"),
    col("item.totalValue.currency").alias("item_total_value_currency"),
    col("item.customerNote").alias("item_customer_note"),
    # Keep the 'garnishItems' array for now; it is exploded in the next step
    col("item.garnishItems").alias("garnish_items_array")
)

# display(df_flattened)
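`explode_outer` differs from `explode` in one respect that matters for this base: when `items` is null or an empty array, `explode` drops the order, while `explode_outer` keeps it as a single row with null item fields. A plain-Python model of both functions (hypothetical helper names, made-up orders) makes the difference visible; it mirrors Spark's documented semantics but is not Spark code.

```python
from typing import Iterable, Iterator

def explode(rows: Iterable[dict], coluna: str, saida: str) -> Iterator[dict]:
    """Model of explode: one output row per array element;
    rows whose array is null or empty are dropped."""
    for row in rows:
        for element in row.get(coluna) or []:
            yield {**row, saida: element}

def explode_outer(rows: Iterable[dict], coluna: str, saida: str) -> Iterator[dict]:
    """Model of explode_outer: like explode, but a null or empty
    array still yields one row with the element set to None."""
    for row in rows:
        elements = row.get(coluna) or []
        if not elements:
            yield {**row, saida: None}
        for element in elements:
            yield {**row, saida: element}

pedidos = [
    {"order_id": "a1", "items": [{"name": "Pizza"}, {"name": "Refrigerante"}]},
    {"order_id": "b2", "items": []},    # empty array
    {"order_id": "c3", "items": None},  # null
]

print([r["order_id"] for r in explode(pedidos, "items", "item")])        # ['a1', 'a1']
print([r["order_id"] for r in explode_outer(pedidos, "items", "item")])  # ['a1', 'a1', 'b2', 'c3']
```

Using `explode_outer` at both levels is what keeps orders without items, and items without garnishes, in the final table.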

In [0]:
# Explode the 'garnishItems' array into one row per garnish item (explode_outer keeps items without garnishes)

df_final = df_flattened.withColumn("garnish_item", explode_outer(col("garnish_items_array"))) \
    .select(
        col("*"),  # Select all existing columns
        # garnish_item fields
        col("garnish_item.name").alias("garnish_item_name"),
        col("garnish_item.addition.value").alias("garnish_item_addition_value"),
        col("garnish_item.addition.currency").alias("garnish_item_addition_currency"),
        col("garnish_item.discount.value").alias("garnish_item_discount_value"),
        col("garnish_item.discount.currency").alias("garnish_item_discount_currency"),
        col("garnish_item.quantity").alias("garnish_item_quantity"),
        col("garnish_item.sequence").alias("garnish_item_sequence"),
        col("garnish_item.unitPrice.value").alias("garnish_item_unit_price_value"),
        col("garnish_item.unitPrice.currency").alias("garnish_item_unit_price_currency"), 
        col("garnish_item.categoryId").alias("garnish_item_category_id"),
        col("garnish_item.externalId").alias("garnish_item_external_id"),
        col("garnish_item.totalValue.value").alias("garnish_item_total_value_value"),
        col("garnish_item.totalValue.currency").alias("garnish_item_total_value_currency"),
        col("garnish_item.categoryName").alias("garnish_item_category_name"),
        col("garnish_item.integrationId").alias("garnish_item_integration_id")
    ).drop("garnish_items_array", "garnish_item")  # Drop the intermediate array and struct columns

# display(df_final)

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_TABELA_SQL = "order05_sql" 


# Save as a Delta table, overwriting any previous data and schema
df_final.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}")

print(f"\nTabela '{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}' criada com sucesso.")


Tabela 'workspace.default.order05_sql' criada com sucesso.


# Part 06

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_ARQUIVO_JSON = "part-00006.json"

caminho_arquivo_json_volume = f"/Volumes/{CATALOGO}/{SCHEMA}/{VOLUME}/{NOME_ARQUIVO_JSON}"

print(f"Lendo o arquivo JSON de: {caminho_arquivo_json_volume}")

Lendo o arquivo JSON de: /Volumes/workspace/default/arq_json/part-00006.json


In [0]:
# Read the file as one multi-line JSON document (a single object or an array of objects)
df_raw = spark.read.json(caminho_arquivo_json_volume, multiLine=True, encoding='UTF-8')

print(f"Total de linhas: {df_raw.count()}")

Total de linhas: 367083


In [0]:
# First, define how Spark should interpret the JSON string stored in the 'items' column

item_schema = ArrayType(StructType([
    StructField("name", StringType(), True),
    StructField("addition", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("discount", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("quantity", DoubleType(), True),
    StructField("sequence", LongType(), True),
    StructField("unitPrice", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("externalId", StringType(), True),
    StructField("totalValue", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("customerNote", StringType(), True),
    StructField("garnishItems", ArrayType(StructType([
        StructField("name", StringType(), True),
        StructField("addition", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("discount", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("quantity", DoubleType(), True),
        StructField("sequence", LongType(), True),
        StructField("unitPrice", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryId", StringType(), True),
        StructField("externalId", StringType(), True),
        StructField("totalValue", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryName", StringType(), True),
        StructField("integrationId", StringType(), True)
    ])), True)
]))

In [0]:
# Check whether 'items' is already an array; apply from_json only when it is a JSON string
if isinstance(df_raw.schema["items"].dataType, ArrayType):
    print("A coluna 'items' é um array. Pulando from_json.")
    df_parsed_items = df_raw
else:
    print("A coluna 'items' é uma string. Aplicando from_json.")
    df_parsed_items = df_raw.withColumn("items", from_json(col("items"), item_schema))

# display(df_parsed_items)

A coluna 'items' é uma string. Aplicando from_json.


In [0]:
# Explode the 'items' array into one row per item (explode_outer keeps orders whose array is null or empty)
df_exploded_items = df_parsed_items.withColumn("item", explode_outer(col("items")))

# Select the top-level columns and flatten the fields of 'item'
df_flattened = df_exploded_items.select(
    col("cpf"),
    col("customer_id"),
    col("customer_name"),
    col("delivery_address_city"),
    col("delivery_address_country"),
    col("delivery_address_district"),
    col("delivery_address_external_id"),
    col("delivery_address_latitude"),
    col("delivery_address_longitude"),
    col("delivery_address_state"),
    col("delivery_address_zip_code"),
    col("merchant_id"),
    col("merchant_latitude"),
    col("merchant_longitude"),
    col("merchant_timezone"),
    col("order_created_at"),
    col("order_id"),
    col("order_scheduled"),
    col("order_total_amount"),
    col("origin_platform"),
    # Item fields
    col("item.name").alias("item_name"),
    col("item.addition.value").alias("item_addition_value"),
    col("item.addition.currency").alias("item_addition_currency"),
    col("item.discount.value").alias("item_discount_value"),
    col("item.discount.currency").alias("item_discount_currency"),
    col("item.quantity").alias("item_quantity"),
    col("item.sequence").alias("item_sequence"),
    col("item.unitPrice.value").alias("item_unit_price_value"),
    col("item.unitPrice.currency").alias("item_unit_price_currency"),
    col("item.externalId").alias("item_external_id"),
    col("item.totalValue.value").alias("item_total_value_value"),
    col("item.totalValue.currency").alias("item_total_value_currency"),
    col("item.customerNote").alias("item_customer_note"),
    # Keep the 'garnishItems' array for now; it is exploded in the next step
    col("item.garnishItems").alias("garnish_items_array")
)

# display(df_flattened)

In [0]:
# Explode the 'garnishItems' array into one row per garnish item (explode_outer keeps items without garnishes)

df_final = df_flattened.withColumn("garnish_item", explode_outer(col("garnish_items_array"))) \
    .select(
        col("*"),  # Select all existing columns
        # garnish_item fields
        col("garnish_item.name").alias("garnish_item_name"),
        col("garnish_item.addition.value").alias("garnish_item_addition_value"),
        col("garnish_item.addition.currency").alias("garnish_item_addition_currency"),
        col("garnish_item.discount.value").alias("garnish_item_discount_value"),
        col("garnish_item.discount.currency").alias("garnish_item_discount_currency"),
        col("garnish_item.quantity").alias("garnish_item_quantity"),
        col("garnish_item.sequence").alias("garnish_item_sequence"),
        col("garnish_item.unitPrice.value").alias("garnish_item_unit_price_value"),
        col("garnish_item.unitPrice.currency").alias("garnish_item_unit_price_currency"), 
        col("garnish_item.categoryId").alias("garnish_item_category_id"),
        col("garnish_item.externalId").alias("garnish_item_external_id"),
        col("garnish_item.totalValue.value").alias("garnish_item_total_value_value"),
        col("garnish_item.totalValue.currency").alias("garnish_item_total_value_currency"),
        col("garnish_item.categoryName").alias("garnish_item_category_name"),
        col("garnish_item.integrationId").alias("garnish_item_integration_id")
    ).drop("garnish_items_array", "garnish_item")  # Drop the intermediate array and struct columns

# display(df_final)
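After both explosions, each row of `df_final` corresponds to one (item, garnish item) pair. The number of rows an order produces is therefore the sum, over its items, of max(1, number of garnish items), and an order with a null or empty `items` array still contributes one row. A small plain-Python sketch (hypothetical `linhas_esperadas` helper, made-up orders) that can serve as a sanity check on the size of `df_final`:

```python
from typing import Optional

def linhas_esperadas(itens: Optional[list]) -> int:
    """Rows one order produces after explode_outer on 'items'
    followed by explode_outer on each item's 'garnishItems'."""
    if not itens:  # null or empty 'items' -> one row of nulls
        return 1
    return sum(max(1, len(item.get("garnishItems") or [])) for item in itens)

pedidos = {
    "a1": [{"name": "Pizza", "garnishItems": [{"name": "Borda"}, {"name": "Bacon"}]},
           {"name": "Refrigerante", "garnishItems": None}],
    "b2": [],
    "c3": None,
}

for order_id, itens in pedidos.items():
    print(order_id, linhas_esperadas(itens))
# a1 3  (2 garnish rows for the pizza + 1 row for the drink)
# b2 1
# c3 1
```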

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_TABELA_SQL = "order06_sql" 


# Save as a Delta table, overwriting any previous data and schema
df_final.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}")

print(f"\nTabela '{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}' criada com sucesso.")


Tabela 'workspace.default.order06_sql' criada com sucesso.
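Because order-level columns such as `order_total_amount` are copied onto every item and garnish row of the flattened table, summing them directly over `df_final` (or over the saved `orderNN_sql` tables) counts each order several times. The usual remedy is to keep one row per `order_id` before aggregating; in PySpark that would be something along the lines of `df_final.select("order_id", "order_total_amount").dropDuplicates(["order_id"])`. The plain-Python sketch below, with made-up values, shows the size of the error.

```python
# Made-up flattened rows: order a1 produced 3 rows (items/garnishes), b2 produced 1.
linhas = [
    {"order_id": "a1", "order_total_amount": 50.0, "item_name": "Pizza"},
    {"order_id": "a1", "order_total_amount": 50.0, "item_name": "Pizza"},
    {"order_id": "a1", "order_total_amount": 50.0, "item_name": "Refrigerante"},
    {"order_id": "b2", "order_total_amount": 20.0, "item_name": None},
]

# Naive sum over the flattened rows: a1 is counted three times.
total_ingenuo = sum(r["order_total_amount"] for r in linhas)

# Keep one value per order_id before summing (like dropDuplicates(["order_id"])).
por_pedido = {}
for r in linhas:
    por_pedido.setdefault(r["order_id"], r["order_total_amount"])
total_correto = sum(por_pedido.values())

print(total_ingenuo, total_correto)  # 170.0 70.0
```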


# Part 07

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_ARQUIVO_JSON = "part-00007.json"

caminho_arquivo_json_volume = f"/Volumes/{CATALOGO}/{SCHEMA}/{VOLUME}/{NOME_ARQUIVO_JSON}"

print(f"Lendo o arquivo JSON de: {caminho_arquivo_json_volume}")

Lendo o arquivo JSON de: /Volumes/workspace/default/arq_json/part-00007.json


In [0]:
# Read the file as one multi-line JSON document (a single object or an array of objects)
df_raw = spark.read.json(caminho_arquivo_json_volume, multiLine=True, encoding='UTF-8')

print(f"Total de linhas: {df_raw.count()}")

Total de linhas: 367086


In [0]:
# First, define how Spark should interpret the JSON string stored in the 'items' column

item_schema = ArrayType(StructType([
    StructField("name", StringType(), True),
    StructField("addition", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("discount", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("quantity", DoubleType(), True),
    StructField("sequence", LongType(), True),
    StructField("unitPrice", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("externalId", StringType(), True),
    StructField("totalValue", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("customerNote", StringType(), True),
    StructField("garnishItems", ArrayType(StructType([
        StructField("name", StringType(), True),
        StructField("addition", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("discount", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("quantity", DoubleType(), True),
        StructField("sequence", LongType(), True),
        StructField("unitPrice", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryId", StringType(), True),
        StructField("externalId", StringType(), True),
        StructField("totalValue", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryName", StringType(), True),
        StructField("integrationId", StringType(), True)
    ])), True)
]))

In [0]:
# Check whether 'items' is already an array; apply from_json only when it is a JSON string
if isinstance(df_raw.schema["items"].dataType, ArrayType):
    print("A coluna 'items' é um array. Pulando from_json.")
    df_parsed_items = df_raw
else:
    print("A coluna 'items' é uma string. Aplicando from_json.")
    df_parsed_items = df_raw.withColumn("items", from_json(col("items"), item_schema))

# display(df_parsed_items)

A coluna 'items' é uma string. Aplicando from_json.


In [0]:
# Explode the 'items' array into one row per item (explode_outer keeps orders whose array is null or empty)
df_exploded_items = df_parsed_items.withColumn("item", explode_outer(col("items")))

# Select the top-level columns and flatten the fields of 'item'
df_flattened = df_exploded_items.select(
    col("cpf"),
    col("customer_id"),
    col("customer_name"),
    col("delivery_address_city"),
    col("delivery_address_country"),
    col("delivery_address_district"),
    col("delivery_address_external_id"),
    col("delivery_address_latitude"),
    col("delivery_address_longitude"),
    col("delivery_address_state"),
    col("delivery_address_zip_code"),
    col("merchant_id"),
    col("merchant_latitude"),
    col("merchant_longitude"),
    col("merchant_timezone"),
    col("order_created_at"),
    col("order_id"),
    col("order_scheduled"),
    col("order_total_amount"),
    col("origin_platform"),
    # Item fields
    col("item.name").alias("item_name"),
    col("item.addition.value").alias("item_addition_value"),
    col("item.addition.currency").alias("item_addition_currency"),
    col("item.discount.value").alias("item_discount_value"),
    col("item.discount.currency").alias("item_discount_currency"),
    col("item.quantity").alias("item_quantity"),
    col("item.sequence").alias("item_sequence"),
    col("item.unitPrice.value").alias("item_unit_price_value"),
    col("item.unitPrice.currency").alias("item_unit_price_currency"),
    col("item.externalId").alias("item_external_id"),
    col("item.totalValue.value").alias("item_total_value_value"),
    col("item.totalValue.currency").alias("item_total_value_currency"),
    col("item.customerNote").alias("item_customer_note"),
    # Keep the 'garnishItems' array for now; it is exploded in the next step
    col("item.garnishItems").alias("garnish_items_array")
)

# display(df_flattened)

In [0]:
# Explode the 'garnishItems' array into one row per garnish item (explode_outer keeps items without garnishes)

df_final = df_flattened.withColumn("garnish_item", explode_outer(col("garnish_items_array"))) \
    .select(
        col("*"),  # Select all existing columns
        # garnish_item fields
        col("garnish_item.name").alias("garnish_item_name"),
        col("garnish_item.addition.value").alias("garnish_item_addition_value"),
        col("garnish_item.addition.currency").alias("garnish_item_addition_currency"),
        col("garnish_item.discount.value").alias("garnish_item_discount_value"),
        col("garnish_item.discount.currency").alias("garnish_item_discount_currency"),
        col("garnish_item.quantity").alias("garnish_item_quantity"),
        col("garnish_item.sequence").alias("garnish_item_sequence"),
        col("garnish_item.unitPrice.value").alias("garnish_item_unit_price_value"),
        col("garnish_item.unitPrice.currency").alias("garnish_item_unit_price_currency"), 
        col("garnish_item.categoryId").alias("garnish_item_category_id"),
        col("garnish_item.externalId").alias("garnish_item_external_id"),
        col("garnish_item.totalValue.value").alias("garnish_item_total_value_value"),
        col("garnish_item.totalValue.currency").alias("garnish_item_total_value_currency"),
        col("garnish_item.categoryName").alias("garnish_item_category_name"),
        col("garnish_item.integrationId").alias("garnish_item_integration_id")
    ).drop("garnish_items_array", "garnish_item")  # Drop the intermediate array and struct columns

# display(df_final)

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_TABELA_SQL = "order07_sql" 


# Save as a Delta table, overwriting any previous data and schema
df_final.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}")

print(f"\nTabela '{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}' criada com sucesso.")


Tabela 'workspace.default.order07_sql' criada com sucesso.
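Once all nine parts (00 to 08) have been saved, the per-part tables are meant to be unified into a single final table. A minimal sketch of that step, assuming every `orderNN_sql` table has the identical column list produced by the cells above (so a positional `UNION ALL` is safe) and using a hypothetical target name `order_sql_final`: a helper that builds a `CREATE OR REPLACE TABLE ... AS SELECT` statement to be run with `spark.sql`.

```python
# Hypothetical helper: builds the SQL that stacks the nine per-part tables
# (order00_sql ... order08_sql) into a single Delta table.
CATALOGO = "workspace"
SCHEMA = "default"
N_PARTES = 9  # part-00000.json ... part-00008.json

def build_union_sql(target_table: str, n_parts: int = N_PARTES) -> str:
    """Return a CREATE OR REPLACE TABLE ... AS SELECT statement that
    concatenates every per-part table with UNION ALL (duplicates are kept)."""
    selects = [
        f"SELECT * FROM {CATALOGO}.{SCHEMA}.order{i:02d}_sql"
        for i in range(n_parts)
    ]
    union_body = "\nUNION ALL\n".join(selects)
    return (
        f"CREATE OR REPLACE TABLE {CATALOGO}.{SCHEMA}.{target_table} AS\n"
        f"{union_body}"
    )

sql = build_union_sql("order_sql_final")  # hypothetical target table name
print(sql)
# In the notebook, the statement would then be executed with: spark.sql(sql)
```

`UNION ALL` is used rather than `UNION` because the rows of different parts are distinct records; `UNION` would add a costly deduplication step and could merge legitimately repeated item rows.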


# Part 08

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_ARQUIVO_JSON = "part-00008.json"

caminho_arquivo_json_volume = f"/Volumes/{CATALOGO}/{SCHEMA}/{VOLUME}/{NOME_ARQUIVO_JSON}"

print(f"Lendo o arquivo JSON de: {caminho_arquivo_json_volume}")

Lendo o arquivo JSON de: /Volumes/workspace/default/arq_json/part-00008.json


In [0]:
# Read the file as one multi-line JSON document (a single object or an array of objects)
df_raw = spark.read.json(caminho_arquivo_json_volume, multiLine=True, encoding='UTF-8')

print(f"Total de linhas: {df_raw.count()}")

Total de linhas: 367071


In [0]:
# First, define how Spark should interpret the JSON string stored in the 'items' column

item_schema = ArrayType(StructType([
    StructField("name", StringType(), True),
    StructField("addition", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("discount", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("quantity", DoubleType(), True),
    StructField("sequence", LongType(), True),
    StructField("unitPrice", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("externalId", StringType(), True),
    StructField("totalValue", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("customerNote", StringType(), True),
    StructField("garnishItems", ArrayType(StructType([
        StructField("name", StringType(), True),
        StructField("addition", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("discount", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("quantity", DoubleType(), True),
        StructField("sequence", LongType(), True),
        StructField("unitPrice", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryId", StringType(), True),
        StructField("externalId", StringType(), True),
        StructField("totalValue", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryName", StringType(), True),
        StructField("integrationId", StringType(), True)
    ])), True)
]))

In [0]:
# Check whether 'items' is already an array; apply from_json only when it is a JSON string
if isinstance(df_raw.schema["items"].dataType, ArrayType):
    print("A coluna 'items' é um array. Pulando from_json.")
    df_parsed_items = df_raw
else:
    print("A coluna 'items' é uma string. Aplicando from_json.")
    df_parsed_items = df_raw.withColumn("items", from_json(col("items"), item_schema))

# display(df_parsed_items)

A coluna 'items' é uma string. Aplicando from_json.


In [0]:
# Explode the 'items' array into one row per item (explode_outer keeps orders whose array is null or empty)
df_exploded_items = df_parsed_items.withColumn("item", explode_outer(col("items")))

# Select the top-level columns and flatten the fields of 'item'
df_flattened = df_exploded_items.select(
    col("cpf"),
    col("customer_id"),
    col("customer_name"),
    col("delivery_address_city"),
    col("delivery_address_country"),
    col("delivery_address_district"),
    col("delivery_address_external_id"),
    col("delivery_address_latitude"),
    col("delivery_address_longitude"),
    col("delivery_address_state"),
    col("delivery_address_zip_code"),
    col("merchant_id"),
    col("merchant_latitude"),
    col("merchant_longitude"),
    col("merchant_timezone"),
    col("order_created_at"),
    col("order_id"),
    col("order_scheduled"),
    col("order_total_amount"),
    col("origin_platform"),
    # Item fields
    col("item.name").alias("item_name"),
    col("item.addition.value").alias("item_addition_value"),
    col("item.addition.currency").alias("item_addition_currency"),
    col("item.discount.value").alias("item_discount_value"),
    col("item.discount.currency").alias("item_discount_currency"),
    col("item.quantity").alias("item_quantity"),
    col("item.sequence").alias("item_sequence"),
    col("item.unitPrice.value").alias("item_unit_price_value"),
    col("item.unitPrice.currency").alias("item_unit_price_currency"),
    col("item.externalId").alias("item_external_id"),
    col("item.totalValue.value").alias("item_total_value_value"),
    col("item.totalValue.currency").alias("item_total_value_currency"),
    col("item.customerNote").alias("item_customer_note"),
    # Keep the 'garnishItems' array for now; it is exploded in the next cell
    col("item.garnishItems").alias("garnish_items_array") 
)

# display(df_flattened)
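`explode_outer` differs from `explode` in one way that matters here. An order whose array is null or empty is kept as a single row with a null 'item', whereas `explode` would drop it.

Below is a minimal pure-Python sketch of that behaviour, with rows represented as dictionaries. The function and sample data are illustrative, not Spark code. As with `withColumn`, the source array column is kept on each output row.

```python
from typing import Any, Dict, List


def explode_outer(rows: List[Dict[str, Any]], array_col: str, out_col: str) -> List[Dict[str, Any]]:
    """Illustrative explode_outer: one output row per element of row[array_col]."""
    result = []
    for row in rows:
        elements = row.get(array_col) or []  # a null array behaves like an empty one
        if not elements:
            # The row is kept once, with a null element (plain explode would drop it)
            result.append({**row, out_col: None})
        for element in elements:
            result.append({**row, out_col: element})
    return result


orders = [
    {"order_id": "A", "items": [{"name": "YAKISOBA CLASSICO GRANDE"}, {"name": "COCA-COLA 600 ML"}]},
    {"order_id": "B", "items": []},
    {"order_id": "C", "items": None},
]
exploded = explode_outer(orders, "items", "item")
print([(r["order_id"], r["item"]["name"] if r["item"] else None) for r in exploded])
# [('A', 'YAKISOBA CLASSICO GRANDE'), ('A', 'COCA-COLA 600 ML'), ('B', None), ('C', None)]
```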

In [0]:
# Explode the 'garnishItems' array into one row per garnish item (again keeping rows without garnish items).

df_final = df_flattened.withColumn("garnish_item", explode_outer(col("garnish_items_array"))) \
    .select(
        col("*"),  # Keep all existing columns
        # garnish_item fields
        col("garnish_item.name").alias("garnish_item_name"),
        col("garnish_item.addition.value").alias("garnish_item_addition_value"),
        col("garnish_item.addition.currency").alias("garnish_item_addition_currency"),
        col("garnish_item.discount.value").alias("garnish_item_discount_value"),
        col("garnish_item.discount.currency").alias("garnish_item_discount_currency"),
        col("garnish_item.quantity").alias("garnish_item_quantity"),
        col("garnish_item.sequence").alias("garnish_item_sequence"),
        col("garnish_item.unitPrice.value").alias("garnish_item_unit_price_value"),
        col("garnish_item.unitPrice.currency").alias("garnish_item_unit_price_currency"), 
        col("garnish_item.categoryId").alias("garnish_item_category_id"),
        col("garnish_item.externalId").alias("garnish_item_external_id"),
        col("garnish_item.totalValue.value").alias("garnish_item_total_value_value"),
        col("garnish_item.totalValue.currency").alias("garnish_item_total_value_currency"),
        col("garnish_item.categoryName").alias("garnish_item_category_name"),
        col("garnish_item.integrationId").alias("garnish_item_integration_id")
    ).drop("garnish_items_array", "garnish_item")  # Drop the intermediate array/struct columns

# display(df_final)
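Because the two `explode_outer` calls are chained, the number of rows per order follows three rules:

- Each item contributes one row per garnish item.
- An item without garnish items still contributes one row.
- An order without items contributes a single row of nulls.

So each flattened table has at least as many rows as its raw file, and usually more.

The pure-Python sketch below computes that per-order row count. The function name is illustrative. The sample orders are reconstructed from the display output further below (ULISSES, MARIA and JULIANA), and the empty garnish lists are an assumption.

```python
from typing import List, Optional


def flattened_row_count(items: Optional[List[dict]]) -> int:
    """Rows produced for one order by explode_outer(items) followed by explode_outer(garnishItems)."""
    if not items:
        return 1  # order kept once, with null item and garnish fields
    return sum(max(1, len(item.get("garnishItems") or [])) for item in items)


ulisses = [
    {"name": "403 Mariano Procópio", "garnishItems": [{"name": "AO PONTO"}, {"name": "Inteira"}]},
    {"name": "Refrigerantes 2LT", "garnishItems": [{"name": "Coca-cola"}]},
]
maria = [
    {"name": "HOT FILADÉFIA CROCANTE", "garnishItems": [{"name": "465- Salmão"}]},
    {"name": "673 - HASHI EXTRA", "garnishItems": []},  # assumed empty (shown as nulls)
]
juliana = [{"name": n, "garnishItems": []} for n in (
    "COCA-COLA 600 ML", "YAKISOBA CLASSICO GRANDE",
    "CHA ICE TEA FUZE PESSEGO 300ML", "YAKISOBA CLASSICO PEQUENO")]

print(flattened_row_count(ulisses), flattened_row_count(maria), flattened_row_count(juliana))  # 3 2 4
```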

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_TABELA_SQL = "order08_sql" 


# Save the table as Delta, replacing any existing data and schema
df_final.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}")

print(f"\nTabela '{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}' criada com sucesso.")


Tabela 'workspace.default.order08_sql' criada com sucesso.


# Part 09

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_ARQUIVO_JSON = "part-00009.json"

caminho_arquivo_json_volume = f"/Volumes/{CATALOGO}/{SCHEMA}/{VOLUME}/{NOME_ARQUIVO_JSON}"

print(f"Lendo o arquivo JSON de: {caminho_arquivo_json_volume}")

Lendo o arquivo JSON de: /Volumes/workspace/default/arq_json/part-00009.json
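Each part repeats the same cells; only the input file name and the target table change (`part-0000N.json` becomes `order0N_sql`). A hypothetical helper could derive both names from the part index, following the naming pattern visible in this notebook. One loop over the indices could then drive a single copy of the read, `from_json`, explode and `saveAsTable` steps, instead of one copy per part.

```python
def part_paths(index: int, catalog: str = "workspace", schema: str = "default",
               volume: str = "arq_json") -> tuple:
    """Hypothetical helper: JSON path and Delta table name for one part of order.json."""
    json_path = f"/Volumes/{catalog}/{schema}/{volume}/part-{index:05d}.json"
    table_name = f"{catalog}.{schema}.order{index:02d}_sql"
    return json_path, table_name


# Parts 00 to 09, as processed in this notebook
for i in range(10):
    path, table = part_paths(i)
    print(path, "->", table)
```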


In [0]:
# Read the JSON file; multiLine=True lets a single JSON object or array span several lines
df_raw = spark.read.json(caminho_arquivo_json_volume, multiLine=True, encoding='UTF-8')

print(f"Total de linhas: {df_raw.count()}")

Total de linhas: 367097


In [0]:
# First, define how Spark should interpret the JSON string stored in the 'items' column

item_schema = ArrayType(StructType([
    StructField("name", StringType(), True),
    StructField("addition", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("discount", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("quantity", DoubleType(), True),
    StructField("sequence", LongType(), True),
    StructField("unitPrice", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("externalId", StringType(), True),
    StructField("totalValue", StructType([
        StructField("value", StringType(), True),
        StructField("currency", StringType(), True)
    ]), True),
    StructField("customerNote", StringType(), True),
    StructField("garnishItems", ArrayType(StructType([
        StructField("name", StringType(), True),
        StructField("addition", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("discount", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("quantity", DoubleType(), True),
        StructField("sequence", LongType(), True),
        StructField("unitPrice", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryId", StringType(), True),
        StructField("externalId", StringType(), True),
        StructField("totalValue", StructType([
            StructField("value", StringType(), True),
            StructField("currency", StringType(), True)
        ]), True),
        StructField("categoryName", StringType(), True),
        StructField("integrationId", StringType(), True)
    ])), True)
]))

In [0]:
# Check whether Spark already inferred 'items' as an array; parse it with from_json only if it is a JSON string.
if isinstance(df_raw.schema["items"].dataType, ArrayType):
    print("A coluna 'items' é um array. Pulando from_json.")
    df_parsed_items = df_raw
else:
    print("A coluna 'items' é uma string. Aplicando from_json.")
    df_parsed_items = df_raw.withColumn("items", from_json(col("items"), item_schema))

# display(df_parsed_items)

A coluna 'items' é uma string. Aplicando from_json.


In [0]:
# Explode the 'items' array into one row per item (explode_outer also keeps orders whose array is null or empty).
df_exploded_items = df_parsed_items.withColumn("item", explode_outer(col("items")))

# Select the order-level columns and flatten the fields of 'item'
df_flattened = df_exploded_items.select(
    col("cpf"),
    col("customer_id"),
    col("customer_name"),
    col("delivery_address_city"),
    col("delivery_address_country"),
    col("delivery_address_district"),
    col("delivery_address_external_id"),
    col("delivery_address_latitude"),
    col("delivery_address_longitude"),
    col("delivery_address_state"),
    col("delivery_address_zip_code"),
    col("merchant_id"),
    col("merchant_latitude"),
    col("merchant_longitude"),
    col("merchant_timezone"),
    col("order_created_at"),
    col("order_id"),
    col("order_scheduled"),
    col("order_total_amount"),
    col("origin_platform"),
    # Item fields
    col("item.name").alias("item_name"),
    col("item.addition.value").alias("item_addition_value"),
    col("item.addition.currency").alias("item_addition_currency"),
    col("item.discount.value").alias("item_discount_value"),
    col("item.discount.currency").alias("item_discount_currency"),
    col("item.quantity").alias("item_quantity"),
    col("item.sequence").alias("item_sequence"),
    col("item.unitPrice.value").alias("item_unit_price_value"),
    col("item.unitPrice.currency").alias("item_unit_price_currency"),
    col("item.externalId").alias("item_external_id"),
    col("item.totalValue.value").alias("item_total_value_value"),
    col("item.totalValue.currency").alias("item_total_value_currency"),
    col("item.customerNote").alias("item_customer_note"),
    # Keep the 'garnishItems' array for now; it is exploded in the next cell
    col("item.garnishItems").alias("garnish_items_array") 
)

# display(df_flattened)

In [0]:
# Explode the 'garnishItems' array into one row per garnish item (again keeping rows without garnish items).

df_final = df_flattened.withColumn("garnish_item", explode_outer(col("garnish_items_array"))) \
    .select(
        col("*"),  # Keep all existing columns
        # garnish_item fields
        col("garnish_item.name").alias("garnish_item_name"),
        col("garnish_item.addition.value").alias("garnish_item_addition_value"),
        col("garnish_item.addition.currency").alias("garnish_item_addition_currency"),
        col("garnish_item.discount.value").alias("garnish_item_discount_value"),
        col("garnish_item.discount.currency").alias("garnish_item_discount_currency"),
        col("garnish_item.quantity").alias("garnish_item_quantity"),
        col("garnish_item.sequence").alias("garnish_item_sequence"),
        col("garnish_item.unitPrice.value").alias("garnish_item_unit_price_value"),
        col("garnish_item.unitPrice.currency").alias("garnish_item_unit_price_currency"), 
        col("garnish_item.categoryId").alias("garnish_item_category_id"),
        col("garnish_item.externalId").alias("garnish_item_external_id"),
        col("garnish_item.totalValue.value").alias("garnish_item_total_value_value"),
        col("garnish_item.totalValue.currency").alias("garnish_item_total_value_currency"),
        col("garnish_item.categoryName").alias("garnish_item_category_name"),
        col("garnish_item.integrationId").alias("garnish_item_integration_id")
    ).drop("garnish_items_array", "garnish_item")  # Drop the intermediate array/struct columns

# display(df_final)

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"
VOLUME = "arq_json"
NOME_TABELA_SQL = "order09_sql" 


# Save the table as Delta, replacing any existing data and schema
df_final.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}")

print(f"\nTabela '{CATALOGO}.{SCHEMA}.{NOME_TABELA_SQL}' criada com sucesso.")


Tabela 'workspace.default.order09_sql' criada com sucesso.


# Final order table

In [0]:
from functools import reduce

CATALOGO = "workspace"
SCHEMA = "default"
NOME_TABELA_FINAL_UNIFICADA = "final_order"

# Read the ten per-part tables (order00_sql ... order09_sql) and stack them by column name;
# allowMissingColumns=True fills columns that are absent from some of the tables with nulls.
tabelas_parciais = [spark.table(f"{CATALOGO}.{SCHEMA}.order{i:02d}_sql") for i in range(10)]
df_unificado = reduce(
    lambda df_acc, df_parte: df_acc.unionByName(df_parte, allowMissingColumns=True),
    tabelas_parciais,
)

print(f"Número total de registros no DataFrame unificado: {df_unificado.count()}")

# Save the final table

print(f"\nSalvando o DataFrame unificado em '{CATALOGO}.{SCHEMA}.{NOME_TABELA_FINAL_UNIFICADA}'...")
df_unificado.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA}.{NOME_TABELA_FINAL_UNIFICADA}")
print(f"Tabela final '{CATALOGO}.{SCHEMA}.{NOME_TABELA_FINAL_UNIFICADA}' criada com sucesso.")

Número total de registros no DataFrame unificado: 14385062

Salvando o DataFrame unificado em 'workspace.default.final_order'...
Tabela final 'workspace.default.final_order' criada com sucesso.
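`unionByName` matches columns by name rather than by position. With `allowMissingColumns=True`, a column that exists in only some of the partial tables is filled with nulls in the others.

The pure-Python sketch below illustrates that behaviour. One assumption differs from Spark: column names are collected from the rows themselves, whereas Spark uses each table's schema.

```python
from typing import Any, Dict, List


def union_by_name(tables: List[List[Dict[str, Any]]]) -> List[Dict[str, Any]]:
    """Stack tables (lists of dict rows) by column name, filling missing columns with None."""
    columns: List[str] = []
    for table in tables:
        for row in table:
            for name in row:
                if name not in columns:
                    columns.append(name)  # first-seen order; new columns go to the end
    return [{name: row.get(name) for name in columns} for table in tables for row in table]


part_a = [{"order_id": "A", "item_name": "Mini Kibe Frito"}]
part_b = [{"item_name": "COCA-COLA 600 ML", "order_id": "B", "garnish_item_name": "Coca-cola"}]
print(union_by_name([part_a, part_b]))
```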


In [0]:
display(df_unificado.limit(10))

cpf,customer_id,customer_name,delivery_address_city,delivery_address_country,delivery_address_district,delivery_address_external_id,delivery_address_latitude,delivery_address_longitude,delivery_address_state,delivery_address_zip_code,merchant_id,merchant_latitude,merchant_longitude,merchant_timezone,order_created_at,order_id,order_scheduled,order_total_amount,origin_platform,item_name,item_addition_value,item_addition_currency,item_discount_value,item_discount_currency,item_quantity,item_sequence,item_unit_price_value,item_unit_price_currency,item_external_id,item_total_value_value,item_total_value_currency,item_customer_note,garnish_item_name,garnish_item_addition_value,garnish_item_addition_currency,garnish_item_discount_value,garnish_item_discount_currency,garnish_item_quantity,garnish_item_sequence,garnish_item_unit_price_value,garnish_item_unit_price_currency,garnish_item_category_id,garnish_item_external_id,garnish_item_total_value_value,garnish_item_total_value_currency,garnish_item_category_name,garnish_item_integration_id
24917797900,35419c42dd9e77fa6b34811ef3cdf1f242255b6ab4aa62e936e4b1a2e5dd1952,ULISSES,JUIZ DE FORA,BR,CASCATINHA,6760884,-43.36,-21.78,MG,36033,dd3915c8589797c626a80b506abf4ad85f7ff7fb3a2a61c865da0c3c834ac0bd,-43.36,-21.78,America/Sao_Paulo,2019-01-06T23:07:53.000Z,1b3f0287d7cc080e3060df78054ba251745a48e2dd6c25c296bd61957a4b6040,False,90.9,DESKTOP,403 Mariano Procópio,0,BRL,0,BRL,1.0,1,0,BRL,ee0f88c318af4612998392a0ead7b10c,0,BRL,,AO PONTO,0.0,BRL,0.0,BRL,1.0,2.0,0.0,BRL,ASC2,82d8b31676374f889675a55ea1504796,0.0,BRL,PONTO DA CARNE,
24917797900,35419c42dd9e77fa6b34811ef3cdf1f242255b6ab4aa62e936e4b1a2e5dd1952,ULISSES,JUIZ DE FORA,BR,CASCATINHA,6760884,-43.36,-21.78,MG,36033,dd3915c8589797c626a80b506abf4ad85f7ff7fb3a2a61c865da0c3c834ac0bd,-43.36,-21.78,America/Sao_Paulo,2019-01-06T23:07:53.000Z,1b3f0287d7cc080e3060df78054ba251745a48e2dd6c25c296bd61957a4b6040,False,90.9,DESKTOP,403 Mariano Procópio,0,BRL,0,BRL,1.0,1,0,BRL,ee0f88c318af4612998392a0ead7b10c,0,BRL,,Inteira,0.0,BRL,0.0,BRL,1.0,3.0,8290.0,BRL,AXUI,2dc5b08c22954891a931d4f3c6b63c73,8290.0,BRL,Tamanho,
24917797900,35419c42dd9e77fa6b34811ef3cdf1f242255b6ab4aa62e936e4b1a2e5dd1952,ULISSES,JUIZ DE FORA,BR,CASCATINHA,6760884,-43.36,-21.78,MG,36033,dd3915c8589797c626a80b506abf4ad85f7ff7fb3a2a61c865da0c3c834ac0bd,-43.36,-21.78,America/Sao_Paulo,2019-01-06T23:07:53.000Z,1b3f0287d7cc080e3060df78054ba251745a48e2dd6c25c296bd61957a4b6040,False,90.9,DESKTOP,Refrigerantes 2LT,0,BRL,0,BRL,1.0,4,0,BRL,21240296966a461989f5027060697375,0,BRL,,Coca-cola,0.0,BRL,0.0,BRL,1.0,5.0,800.0,BRL,AN5P,87b411e3029b4c319565e01663856586,800.0,BRL,Escolha,
40103289230,23033ace6c281bc27e3525f16fa0339c5e7a988e15912e647cebe00207749785,MARIA,RIO DE JANEIRO,BR,JARDIM GUANABARA,2190167,-43.21,-22.82,RJ,21940,fa140fe3df83f225f95a07243a0b7134a3a119756a428c377a847e0b38e8c853,-43.21,-22.82,America/Sao_Paulo,2018-12-29T00:07:25.000Z,f9fa11446bab9d526332677d693e28e6d7467d15bc9d44082425aef289b5c28b,False,37.9,IOS,HOT FILADÉFIA CROCANTE,0,BRL,0,BRL,1.0,1,0,BRL,98fcfbce64da4889a852ff2ea5fe7983,0,BRL,,465- Salmão,0.0,BRL,0.0,BRL,2.0,2.0,1850.0,BRL,19VS,7877e08f04644d0c8f841f0d806c82f1,3700.0,BRL,ESCOLHA UM ITEM,
40103289230,23033ace6c281bc27e3525f16fa0339c5e7a988e15912e647cebe00207749785,MARIA,RIO DE JANEIRO,BR,JARDIM GUANABARA,2190167,-43.21,-22.82,RJ,21940,fa140fe3df83f225f95a07243a0b7134a3a119756a428c377a847e0b38e8c853,-43.21,-22.82,America/Sao_Paulo,2018-12-29T00:07:25.000Z,f9fa11446bab9d526332677d693e28e6d7467d15bc9d44082425aef289b5c28b,False,37.9,IOS,673 - HASHI EXTRA,0,BRL,0,BRL,3.0,3,30,BRL,be71ce367a774194822cbb2d1be84b6b,90,BRL,,,,,,,,,,,,,,,,
94554280152,e2a4d4bd02ea325628327d1eb0a4d2536c98327c4363fddd706c9f1405a0b130,JULIANA,SAO PAULO,BR,CONJUNTO HABITACIONAL TURISTICA,9411004,-46.76,-23.48,SP,51640,410be56997129cd52afbfe92d6101ea82c6c4c9885969c40ffc2dad07bf3d71c,-46.76,-23.48,America/Sao_Paulo,2019-01-21T17:51:35.000Z,43d25ec73d3f01f27adea95082f22432f7d908cb4d94fc6f18fec664144869b2,False,70.7,IOS,COCA-COLA 600 ML,0,BRL,0,BRL,1.0,4,590,BRL,da77ebaded2a42748a8f5fa7358ae3f3,590,BRL,,,,,,,,,,,,,,,,
94554280152,e2a4d4bd02ea325628327d1eb0a4d2536c98327c4363fddd706c9f1405a0b130,JULIANA,SAO PAULO,BR,CONJUNTO HABITACIONAL TURISTICA,9411004,-46.76,-23.48,SP,51640,410be56997129cd52afbfe92d6101ea82c6c4c9885969c40ffc2dad07bf3d71c,-46.76,-23.48,America/Sao_Paulo,2019-01-21T17:51:35.000Z,43d25ec73d3f01f27adea95082f22432f7d908cb4d94fc6f18fec664144869b2,False,70.7,IOS,YAKISOBA CLASSICO GRANDE,0,BRL,0,BRL,1.0,1,3350,BRL,d49ce7f899ee4dd5b077a7c713df48c3,3350,BRL,,,,,,,,,,,,,,,,
94554280152,e2a4d4bd02ea325628327d1eb0a4d2536c98327c4363fddd706c9f1405a0b130,JULIANA,SAO PAULO,BR,CONJUNTO HABITACIONAL TURISTICA,9411004,-46.76,-23.48,SP,51640,410be56997129cd52afbfe92d6101ea82c6c4c9885969c40ffc2dad07bf3d71c,-46.76,-23.48,America/Sao_Paulo,2019-01-21T17:51:35.000Z,43d25ec73d3f01f27adea95082f22432f7d908cb4d94fc6f18fec664144869b2,False,70.7,IOS,CHA ICE TEA FUZE PESSEGO 300ML,0,BRL,0,BRL,1.0,3,550,BRL,8f2c50416d7442289ab6cd756f18f3d5,550,BRL,,,,,,,,,,,,,,,,
94554280152,e2a4d4bd02ea325628327d1eb0a4d2536c98327c4363fddd706c9f1405a0b130,JULIANA,SAO PAULO,BR,CONJUNTO HABITACIONAL TURISTICA,9411004,-46.76,-23.48,SP,51640,410be56997129cd52afbfe92d6101ea82c6c4c9885969c40ffc2dad07bf3d71c,-46.76,-23.48,America/Sao_Paulo,2019-01-21T17:51:35.000Z,43d25ec73d3f01f27adea95082f22432f7d908cb4d94fc6f18fec664144869b2,False,70.7,IOS,YAKISOBA CLASSICO PEQUENO,0,BRL,0,BRL,1.0,2,2580,BRL,26235047a2d64f1e8fc8fd12ebc4a1e6,2580,BRL,,,,,,,,,,,,,,,,
36394163232,ec5d13d99badeb9108b1c04a5b94ab4f7c8915165acf4ea9a3757ee354582047,VICTOR,RIO DE JANEIRO,BR,COPACABANA,8149407,-43.18,-22.97,RJ,22020,f5b685fda71643a4b5a29201c9fda17a3183499d1107eadd029c196ffa4ae854,-43.18,-22.97,America/Sao_Paulo,2018-12-31T22:46:13.000Z,9227ef1b80d2158e433cc3199ddedd57373cc358bb51d6cf532bc433d1453e2b,False,46.6,IOS,Mini Kibe Frito,0,BRL,0,BRL,2.0,3,380,BRL,bf778edf94d74ce39c35a40822f98376,760,BRL,,,,,,,,,,,,,,,,
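In the sample rows above, the item and garnish values look like amounts in cents, while `order_total_amount` is in reais:

- JULIANA's order: the four item totals add up to 590 + 3350 + 550 + 2580 = 7070, matching 70.7.
- ULISSES's order: the garnish totals add up to 0.0 + 8290.0 + 800.0 = 9090, matching 90.9.
- MARIA's order: 3700.0 + 90 = 3790, matching 37.9.

This is an inference from these rows only, not a documented rule. The flattened table also repeats an item's values on every garnish row, so any sum must count each item and each garnish item once.

Below is a pure-Python sketch of that check. The function name is hypothetical, the order ids are shortened, and only the relevant columns of the rows shown above are kept.

```python
from decimal import Decimal
from typing import Dict, Iterable


def order_totals_from_lines(rows: Iterable[dict]) -> Dict[str, Decimal]:
    """Sum item and garnish total values per order, counting each sequence once.

    Assumption (inferred from the sample rows, not documented): the values are in cents.
    """
    seen = set()
    cents: Dict[str, Decimal] = {}
    for row in rows:
        order_id = row["order_id"]
        cents.setdefault(order_id, Decimal(0))
        for prefix in ("item", "garnish_item"):
            sequence = row.get(f"{prefix}_sequence")
            value = row.get(f"{prefix}_total_value_value")
            if sequence is None or value in (None, ""):
                continue  # no item/garnish on this row (nulls from explode_outer)
            key = (order_id, prefix, sequence)
            if key not in seen:  # an item's values repeat on each of its garnish rows
                seen.add(key)
                cents[order_id] += Decimal(str(value))
    return {order_id: total / 100 for order_id, total in cents.items()}  # cents -> reais


rows = [
    # ULISSES (order_total_amount = 90.9)
    {"order_id": "1b3f0287", "item_sequence": 1, "item_total_value_value": "0",
     "garnish_item_sequence": 2.0, "garnish_item_total_value_value": "0.0"},
    {"order_id": "1b3f0287", "item_sequence": 1, "item_total_value_value": "0",
     "garnish_item_sequence": 3.0, "garnish_item_total_value_value": "8290.0"},
    {"order_id": "1b3f0287", "item_sequence": 4, "item_total_value_value": "0",
     "garnish_item_sequence": 5.0, "garnish_item_total_value_value": "800.0"},
    # MARIA (order_total_amount = 37.9)
    {"order_id": "f9fa1144", "item_sequence": 1, "item_total_value_value": "0",
     "garnish_item_sequence": 2.0, "garnish_item_total_value_value": "3700.0"},
    {"order_id": "f9fa1144", "item_sequence": 3, "item_total_value_value": "90",
     "garnish_item_sequence": None, "garnish_item_total_value_value": None},
]
print(order_totals_from_lines(rows))
```

In Spark, the same check could deduplicate on `(order_id, item_sequence)` and `(order_id, garnish_item_sequence)` with `dropDuplicates` before summing. Orders where the result differs from `order_total_amount` would be worth inspecting before using either value.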


# Users, restaurants, and A/B test tables

In [0]:
df_consumer = (spark.table('workspace.default.consumer'))

# display(df_consumer.limit(100))

In [0]:
df_rest = (spark.table('workspace.default.restaurant'))

# display(df_rest.limit(100))

In [0]:
df_teste = (spark.table('workspace.default.ab_test_ref'))

print(f"Número total de registros na tabela teste ab: {df_teste.count()}")

# display(df_teste.limit(100))

Número total de registros na tabela teste ab: 806467


In [0]:
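# Frequency and percentage of customers in each A/B test group (target vs. control).
# The window over a single constant partition sums the counts of all groups, giving the grand total.
df_freq_teste = (spark.table("workspace.default.ab_test_ref")
    .groupBy("is_target")
    .agg(
        F.count("is_target").alias("Freq"),
        (F.count("is_target") * 1.0 / F.sum(F.count("is_target")).over(Window.partitionBy(F.lit(1))) * 100).alias("Percent")
    )
    .withColumn("Percent", F.round("Percent", 2))
    .orderBy(F.desc("Freq"))
)

display(df_freq_teste)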

is_target,Freq,Percent
target,445925,55.29
control,360542,44.71
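The Percent column is each group's count divided by the grand total, times 100. The grand total is 806,467 customers, the row count of ab_test_ref:

- target: 445,925 / 806,467 × 100 ≈ 55.29
- control: 360,542 / 806,467 × 100 ≈ 44.71

A pure-Python sketch of the same calculation can be used to sanity-check the Spark output. The function name is illustrative.

```python
from typing import Dict, List, Tuple


def frequency_table(counts: Dict[str, int]) -> List[Tuple[str, int, float]]:
    """Frequency and percentage (rounded to 2 decimals) per group, most frequent first."""
    total = sum(counts.values())
    rows = [(group, n, round(n * 100.0 / total, 2)) for group, n in counts.items()]
    return sorted(rows, key=lambda r: r[1], reverse=True)


print(frequency_table({"control": 360542, "target": 445925}))
# [('target', 445925, 55.29), ('control', 360542, 44.71)]
```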


# Joins between the tables

In [0]:
CATALOGO = "workspace"
SCHEMA = "default"

# 1. Read the tables, renaming columns that would otherwise clash after the joins
df_teste = spark.table("workspace.default.ab_test_ref").withColumnRenamed("customer_id", "customer_id_teste_ab")
df_order = spark.table("workspace.default.final_order").withColumnRenamed("customer_id", "customer_id_order")
df_consumer = spark.table("workspace.default.consumer").withColumnRenamed("created_at", "consumer_created_at")
df_restaurant = spark.table("workspace.default.restaurant").withColumnRenamed("created_at", "restaurant_created_at")


# 2. Left joins starting from the order table: A/B test, then consumer, then restaurant (every order row is kept)
df_final_result = (
    df_order.join(
        df_teste,
        on=df_teste["customer_id_teste_ab"] == df_order["customer_id_order"],
        how="left_outer"
    )
    .join(
        df_consumer,
        on=df_consumer["customer_id"] == df_order["customer_id_order"],
        how="left_outer"
    ).drop(df_consumer["customer_name"])
    .join(
        df_restaurant,
        on=df_order["merchant_id"] == df_restaurant["id"],
        how="left_outer"
    ).drop(df_restaurant["id"])
)


print(f"Número total de registros na tabela teste ab: {df_teste.count()}")
print(f"Número total de registros na tabela order: {df_order.count()}")
print(f"Número total de registros no DataFrame final: {df_final_result.count()}")

# display(df_final_result.limit(10))

Número total de registros na tabela teste ab: 806467
Número total de registros na tabela order: 14385062
Número total de registros no DataFrame final: 14385062
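The final count equals the order count (14,385,062) because all three joins are left joins starting from the order table. This holds only if each right-side join key is unique: `customer_id` in ab_test_ref and consumer, and `id` in restaurant. A duplicated key would multiply the matching order rows.

The pure-Python sketch below (an illustration, not Spark code) makes this visible.

```python
from collections import defaultdict
from typing import Any, Dict, List


def left_join(left: List[Dict[str, Any]], right: List[Dict[str, Any]],
              left_key: str, right_key: str) -> List[Dict[str, Any]]:
    """Left outer join of two lists of dict rows (illustration only)."""
    index = defaultdict(list)
    for right_row in right:
        index[right_row[right_key]].append(right_row)
    result = []
    for left_row in left:
        matches = index.get(left_row[left_key], [])
        if not matches:
            result.append(dict(left_row))  # unmatched: kept once, without right-side fields
        for right_row in matches:
            result.append({**right_row, **left_row})  # one output row per match
    return result


orders = [{"order_id": 1, "customer_id": "a"}, {"order_id": 2, "customer_id": "a"},
          {"order_id": 3, "customer_id": "z"}]
ab_unique = [{"customer_id_teste_ab": "a", "is_target": "target"}]
ab_duplicated = ab_unique + [{"customer_id_teste_ab": "a", "is_target": "control"}]

print(len(left_join(orders, ab_unique, "customer_id", "customer_id_teste_ab")))      # 3
print(len(left_join(orders, ab_duplicated, "customer_id", "customer_id_teste_ab")))  # 5
```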


In [0]:
CATALOGO = "workspace"
SCHEMA = "default"

NOME_TABELA_RESULTADO_FINAL = "result_order"

df_final_result.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA}.{NOME_TABELA_RESULTADO_FINAL}")
print(f"Tabela final '{CATALOGO}.{SCHEMA}.{NOME_TABELA_RESULTADO_FINAL}' criada com sucesso.")

Tabela final 'workspace.default.result_order' criada com sucesso.


In [0]:
CATALOGO = "workspace"
SCHEMA = "default"

# 1. Read the tables, renaming columns that would otherwise clash after the joins
df_consumer = spark.table("workspace.default.consumer").withColumnRenamed("created_at", "consumer_created_at")
df_teste = spark.table("workspace.default.ab_test_ref").withColumnRenamed("customer_id", "customer_id_teste_ab")
df_restaurant = spark.table("workspace.default.restaurant").withColumnRenamed("created_at", "restaurant_created_at")
df_order = spark.table("workspace.default.final_order")


# First join: A/B test table with the consumer table
df_final1 = df_teste.join(
    df_consumer,
    on=df_teste["customer_id_teste_ab"] == df_consumer["customer_id"],
    how="left_outer"
).drop(df_consumer["customer_id"])

# Second join: order table with the restaurant table
df_final2 = df_order.join(
    df_restaurant,
    on=df_order["merchant_id"] == df_restaurant["id"],
    how="left_outer"
).drop(df_restaurant["id"], df_order["customer_name"])

# Third and last join: df_final1 (A/B test + consumer) with df_final2 (orders + restaurants), keeping every A/B test customer
df_final_result = df_final1.join(
    df_final2,
    on=df_final1["customer_id_teste_ab"] == df_final2["customer_id"],
    how="left_outer"
).drop(df_final2["customer_id"])

print(f"Número total de registros na tabela teste ab: {df_teste.count()}")
print(f"Número total de registros na tabela order: {df_order.count()}")
print(f"Número total de registros no primeiro join: {df_final1.count()}")
print(f"Número total de registros no segundo join: {df_final2.count()}")
print(f"Número total de registros no DataFrame final: {df_final_result.count()}")

# display(df_final_result.limit(10))

Número total de registros na tabela teste ab: 806467
Número total de registros na tabela order: 14385062
Número total de registros no primeiro join: 806467
Número total de registros no segundo join: 14385062
Número total de registros no DataFrame final: 14337897
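Here the last join is a left join starting from df_final1 (the A/B test side), so the rows kept are different:

- Order rows of customers outside the A/B test are dropped.
- A/B test customers with no orders are kept once, with null order fields.

That is why the result (14,337,897 rows) differs from the order table (14,385,062 rows).

Assuming `customer_id` is unique in the A/B test table, the expected count can be computed from key counts alone. The pure-Python sketch below does this; names and data are illustrative, and null keys (which never match in Spark) are not modelled.

```python
from collections import Counter
from typing import Iterable


def left_join_row_count(left_keys: Iterable[str], right_keys: Iterable[str]) -> int:
    """Row count of `left LEFT OUTER JOIN right` on one key, without building the join."""
    right_counts = Counter(right_keys)
    # Each left key yields one row per matching right row, or a single row with nulls if none match
    return sum(max(1, right_counts[key]) for key in left_keys)


ab_test_customers = ["a", "b", "c"]               # one entry per customer in the A/B test
order_customers = ["a", "a", "a", "b", "z", "z"]  # one entry per flattened order row
print(left_join_row_count(ab_test_customers, order_customers))  # 5 (A/B side kept)
print(left_join_row_count(order_customers, ab_test_customers))  # 6 (order side kept)
```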


In [0]:
CATALOGO = "workspace"
SCHEMA = "default"

NOME_TABELA_RESULTADO_FINAL = "result_teste"

df_final_result.write \
    .format("delta") \
    .mode("overwrite") \
    .option("overwriteSchema", "true") \
    .saveAsTable(f"{CATALOGO}.{SCHEMA}.{NOME_TABELA_RESULTADO_FINAL}")
print(f"Tabela final '{CATALOGO}.{SCHEMA}.{NOME_TABELA_RESULTADO_FINAL}' criada com sucesso.")

Tabela final 'workspace.default.result_teste' criada com sucesso.


In [0]:
caminho_exportacao_csv = "/Volumes/workspace/default/arq_json/arq_csv"

print(f"Exportando o DataFrame para CSV em: {caminho_exportacao_csv}")

# coalesce(1) moves all data into a single partition, so Spark writes one part-*.csv file inside the target directory (easier to download).
# For a DataFrame this large (about 14 million rows), a single task does all the writing; remove '.coalesce(1)' to let Spark write several files in parallel.
df_final_result.coalesce(1).write \
    .format("csv") \
    .option("header", "true") \
    .mode("overwrite") \
    .save(caminho_exportacao_csv)

print(f"O arquivo CSV (ou diretório com arquivos CSV) foi salvo em: {caminho_exportacao_csv}")

Exportando o DataFrame para CSV em: /Volumes/workspace/default/arq_json/arq_csv
O arquivo CSV (ou diretório com arquivos CSV) foi salvo em: /Volumes/workspace/default/arq_json/arq_csv
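Even with `coalesce(1)`, Spark writes a directory rather than a single named file. The data lands in one file whose name starts with `part-` and ends in `.csv`, next to metadata files such as `_SUCCESS` (on Databricks, other marker files may also appear).

The sketch below locates that file with the standard library. It assumes the volume is reachable through the local file system at its `/Volumes/...` path, as Unity Catalog volumes usually are on Databricks clusters. The function name is hypothetical.

```python
import glob
import os


def find_single_csv(output_dir: str) -> str:
    """Return the only part-*.csv file that Spark wrote into output_dir."""
    files = sorted(glob.glob(os.path.join(output_dir, "part-*.csv")))
    if len(files) != 1:
        raise ValueError(f"Expected exactly one part-*.csv in {output_dir}, found {len(files)}")
    return files[0]


# Example (assumes the export above has already run):
# csv_path = find_single_csv("/Volumes/workspace/default/arq_json/arq_csv")
```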
