In [1]:
from pyspark.sql import SparkSession
from pyspark.sql.functions import col, explode, lit, sum, year, month, dayofmonth 
from pyspark.sql.types import *
import json

In [2]:
# Create the SparkSession for this notebook, or reuse one that is already running.
spark = (
    SparkSession.builder
    .appName("LoadJSON")
    .getOrCreate()
)

In [3]:
# Location of the raw ERP export; the same path is passed to Spark further down.
input_path = "C:/Projetos/cocobambu-case/src/data/raw/ERP.json"

# Sanity-check that the file parses as JSON before handing it to Spark.
# The file is decoded explicitly as UTF-8 (the encoding JSON interchange uses)
# instead of the platform default, which on Windows is a legacy code page and
# can garble or reject non-ASCII text.
with open(input_path, "r", encoding="utf-8") as f:
    try:
        data = json.load(f)
        print("JSON carregado com sucesso!")
    except json.JSONDecodeError as e:
        # A parse failure is reported, not raised: the cell still completes.
        print("Erro ao carregar JSON:", e)


JSON carregado com sucesso!


In [None]:
# Load the ERP file into a DataFrame. The multiLine option is enabled so that a
# JSON document spread over several lines is read as a single record.
erp_raw_df = (
    spark.read
    .option("multiLine", "true")
    .json(input_path)
)

# Inspect the inferred schema and the loaded rows (untruncated).
erp_raw_df.printSchema()
erp_raw_df.show(truncate=False)

root
 |-- curUTC: string (nullable = true)
 |-- guestChecks: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- balDueTtl: string (nullable = true)
 |    |    |-- chkNum: long (nullable = true)
 |    |    |-- chkTtl: double (nullable = true)
 |    |    |-- clsdBusDt: string (nullable = true)
 |    |    |-- clsdFlag: boolean (nullable = true)
 |    |    |-- clsdLcl: string (nullable = true)
 |    |    |-- clsdUTC: string (nullable = true)
 |    |    |-- detailLines: array (nullable = true)
 |    |    |    |-- element: struct (containsNull = true)
 |    |    |    |    |-- aggQty: long (nullable = true)
 |    |    |    |    |-- aggTtl: double (nullable = true)
 |    |    |    |    |-- busDt: string (nullable = true)
 |    |    |    |    |-- chkEmpId: long (nullable = true)
 |    |    |    |    |-- chkEmpNum: long (nullable = true)
 |    |    |    |    |-- detailLcl: string (nullable = true)
 |    |    |    |    |-- detailUTC: string (nullable = true)
 |

-

In [9]:
# Explodir guestChecks e incluir locRef no contexto
guest_checks = erp_raw_df.select(
    col("locRef").alias("storeId"),  # Incluir locRef
    explode(col("guestChecks")).alias("guestCheck")
)

# Expandir guestChecks mantendo locRef
guest_checks = guest_checks.select(
    col("storeId"),  # Mantém locRef no contexto
    col("guestCheck.*")
)

In [10]:
# Criar Tabela Fato: Sales
fact_sales = guest_checks.select(
    col("guestCheckId"),
    col("storeId"),  # locRef já mapeado como storeId
    col("opnBusDt").alias("busDt"),
    col("subTtl").alias("subTotal"),
    col("dscTtl").alias("discountTotal"),
    col("chkTtl").alias("checkTotal"),
    col("payTtl").alias("paidTotal")
)

fact_sales.show()

+------------+--------+----------+--------+-------------+----------+---------+
|guestCheckId| storeId|     busDt|subTotal|discountTotal|checkTotal|paidTotal|
+------------+--------+----------+--------+-------------+----------+---------+
|  1122334455|99 CB CB|2024-01-01|   109.9|          -10|     109.9|    109.9|
+------------+--------+----------+--------+-------------+----------+---------+



In [11]:
# Adicionar somatório de taxes.taxCollTtl
taxes = guest_checks.select(
    col("guestCheckId"),
    explode(col("taxes")).alias("tax")
).select(
    col("guestCheckId"),
    col("tax.taxCollTtl").alias("taxCollected")
)

taxes.show()

+------------+------------+
|guestCheckId|taxCollected|
+------------+------------+
|  1122334455|       20.81|
+------------+------------+



In [12]:
# Attach the total tax collected per check to the sales fact table.
# `sum` here is pyspark.sql.functions.sum (imported at the top), not the builtin.
#
# This cell rebinds fact_sales to itself plus a taxCollected column, so running
# it twice would otherwise yield two taxCollected columns. Dropping the column
# first (a no-op when it is absent) keeps the cell safe to re-run.
fact_sales = (
    fact_sales
    .drop("taxCollected")
    .join(
        taxes.groupBy("guestCheckId").agg(sum("taxCollected").alias("taxCollected")),
        on="guestCheckId",
        how="left",  # keep checks that have no tax entries
    )
)
fact_sales.show()

+------------+--------+----------+--------+-------------+----------+---------+------------+
|guestCheckId| storeId|     busDt|subTotal|discountTotal|checkTotal|paidTotal|taxCollected|
+------------+--------+----------+--------+-------------+----------+---------+------------+
|  1122334455|99 CB CB|2024-01-01|   109.9|          -10|     109.9|    109.9|       20.81|
+------------+--------+----------+--------+-------------+----------+---------+------------+



In [13]:
# Criar Dimensão Date
dim_date = guest_checks.select(
    col("opnBusDt").alias("date"),
    year(col("opnBusDt")).alias("year"),
    month(col("opnBusDt")).alias("month"),
    dayofmonth(col("opnBusDt")).alias("day")
).distinct()

dim_date.show()

+----------+----+-----+---+
|      date|year|month|day|
+----------+----+-----+---+
|2024-01-01|2024|    1|  1|
+----------+----+-----+---+



In [21]:
# Criar Dimensão Store
dim_store = erp_raw_df.select(
    col("locRef").alias("storeId"),
    lit("Coco Bambu").alias("storeLocation")  # Pode ser ajustado conforme necessário
).distinct()

dim_store.show()

+--------+-------------+
| storeId|storeLocation|
+--------+-------------+
|99 CB CB|   Coco Bambu|
+--------+-------------+



In [16]:
# Criar Dimensão MenuItem
detail_lines = guest_checks.select(
    col("guestCheckId"),
    explode(col("detailLines")).alias("detailLine")
)
detail_lines.show()


+------------+--------------------+
|guestCheckId|          detailLine|
+------------+--------------------+
|  1122334455|{1, 119.9, 2024-0...|
+------------+--------------------+



In [17]:
# MenuItem dimension: distinct combinations of item number, price level and
# active taxes taken from the menuItem struct of each detail line.
dim_menu_item = (
    detail_lines
    .select(col("detailLine.menuItem").alias("menuItem"))
    .select(
        col("menuItem.miNum").alias("menuItemId"),
        col("menuItem.prcLvl").alias("priceLevel"),
        col("menuItem.activeTaxes").alias("activeTaxes"),
    )
    .distinct()
)

dim_menu_item.show()

+----------+----------+-----------+
|menuItemId|priceLevel|activeTaxes|
+----------+----------+-----------+
|      6042|         3|         28|
+----------+----------+-----------+



In [18]:
# Criar Dimensão Taxes
dim_taxes = guest_checks.select(
    col("guestCheckId"),
    explode(col("taxes")).alias("tax")
).select(
    col("tax.taxNum").alias("taxId"),
    col("tax.taxCollTtl").alias("taxAmount"),
    col("tax.taxRate").alias("taxRate")
).distinct()

dim_taxes.show()

+-----+---------+-------+
|taxId|taxAmount|taxRate|
+-----+---------+-------+
|   28|    20.81|     21|
+-----+---------+-------+



In [19]:
# Salvar Tabelas em Parquet
output_path = "C:\\Projetos\\cocobambu-case\\src\\data\\processed"
fact_sales.write.parquet(f"{output_path}/FactSales", mode="overwrite")
dim_date.write.parquet(f"{output_path}/DimDate", mode="overwrite")
dim_store.write.parquet(f"{output_path}/DimStore", mode="overwrite")
dim_menu_item.write.parquet(f"{output_path}/DimMenuItem", mode="overwrite")
dim_taxes.write.parquet(f"{output_path}/DimTaxes", mode="overwrite")

print("Pipeline concluído com sucesso!")

Pipeline concluído com sucesso!


---------------------------------------------------------------------------------------------------------------