## Data Engineering sprint MVP
### Questions to be answered:
- How is the company performing over time? Is there seasonality in sales? Do sales grow or decline over the years?
- Which region is the most profitable for the company? In which region should it invest in new stores?
- Which product type does the company sell the most? Do the product types perform similarly? Does the top-selling region influence which product sells best?

In [0]:
display(dbutils.fs.ls("dbfs:/FileStore/tables/"))

path,name,size,modificationTime
dbfs:/FileStore/tables/categories-1.csv,categories-1.csv,2465,1744339816000
dbfs:/FileStore/tables/categories.csv,categories.csv,2465,1744339764000
dbfs:/FileStore/tables/customers-1.csv,customers-1.csv,11842,1744339816000
dbfs:/FileStore/tables/customers.csv,customers.csv,11842,1744339764000
dbfs:/FileStore/tables/employee_territories-1.csv,employee_territories-1.csv,415,1744339816000
dbfs:/FileStore/tables/employee_territories.csv,employee_territories.csv,415,1744339762000
dbfs:/FileStore/tables/employees-1.csv,employees-1.csv,6161,1744339816000
dbfs:/FileStore/tables/employees.csv,employees.csv,6161,1744339762000
dbfs:/FileStore/tables/order_details-1.csv,order_details-1.csv,44378,1744339816000
dbfs:/FileStore/tables/order_details.csv,order_details.csv,44378,1744339763000


In [0]:
# Paths for the bronze, silver, and gold layers
caminho_bronze = "/FileStore/bronze/"
caminho_prata = "/FileStore/silver/"
caminho_ouro = "/FileStore/gold/"

In [0]:
from pyspark.sql.functions import current_timestamp

# Tables (CSV files) to ingest
tabelas = [
    "categories",
    "customers",
    "employees",
    "employee_territories",
    "order_details",
    "orders",
    "products",
    "regions",
    "shippers",
    "suppliers",
    "territories"
]

# Base path where the CSV files are stored
caminho_base = "/FileStore/tables/"

# Read each CSV, add an ingestion timestamp, and save it as Delta
for tabela in tabelas:
    caminho_csv = f"{caminho_base}{tabela}.csv"
    df = (
        spark.read.format("csv")
        .option("header", True)
        .option("inferSchema", True)
        .load(caminho_csv)
        .withColumn("data_ingestao", current_timestamp())
    )
    df.write.format("delta").mode("overwrite").save(f"{caminho_bronze}{tabela}")
    print(f"Table {tabela} saved to the Bronze layer successfully.")

Table categories saved to the Bronze layer successfully.
Table customers saved to the Bronze layer successfully.
Table employees saved to the Bronze layer successfully.
Table employee_territories saved to the Bronze layer successfully.
Table order_details saved to the Bronze layer successfully.
Table orders saved to the Bronze layer successfully.
Table products saved to the Bronze layer successfully.
Table regions saved to the Bronze layer successfully.
Table shippers saved to the Bronze layer successfully.
Table suppliers saved to the Bronze layer successfully.
Table territories saved to the Bronze layer successfully.


### Categories table

In [0]:
# Table: categories
df_categories_bronze = spark.read.format("delta").load(f"{caminho_bronze}categories")

df_categories_prata = (
    df_categories_bronze
    .withColumnRenamed("CategoryID", "category_id")
    .withColumnRenamed("CategoryName", "category_name")
    .withColumnRenamed("Description", "description")
    # If there are additional columns, handle them as needed
)

print("Schema of categories:")
df_categories_prata.printSchema()
display(df_categories_prata.limit(10))

# Save to the Silver layer
df_categories_prata.write.format("delta").mode("overwrite").save(f"{caminho_prata}categories")


Schema of categories:
root
 |-- category_id: integer (nullable = true)
 |-- category_name: string (nullable = true)
 |-- description: string (nullable = true)
 |-- picture: string (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



category_id,category_name,description,picture,data_ingestao
1,Beverages,"Soft drinks, coffees, teas, beers, and ales",0x151C2F00020000000D000E0014002100FFFFFFFF4269746D617020496D616765005061696E742E5069637475726500010500000200000007000000504272757368000000000000000000A0290000424D98290000000000005600000028000000AC00000078000000010004000000000000000000880B0000880B0000080000,2025-04-12T00:43:16.698+0000
2,Condiments,"Sweet and savory sauces, relishes, spreads, and seasonings",0x151C2F00020000000D000E0014002100FFFFFFFF4269746D617020496D616765005061696E742E5069637475726500010500000200000007000000504272757368000000000000000000A0290000424D98290000000000005600000028000000AC00000078000000010004000000000000000000880B0000880B0000080000,2025-04-12T00:43:16.698+0000
3,Confections,"Desserts, candies, and sweet breads",0x151C2F00020000000D000E0014002100FFFFFFFF4269746D617020496D616765005061696E742E5069637475726500010500000200000007000000504272757368000000000000000000A0290000424D98290000000000005600000028000000AC00000078000000010004000000000000000000880B0000880B0000080000,2025-04-12T00:43:16.698+0000
4,Dairy Products,Cheeses,0x151C2F00020000000D000E0014002100FFFFFFFF4269746D617020496D616765005061696E742E5069637475726500010500000200000007000000504272757368000000000000000000A0290000424D98290000000000005600000028000000AC00000078000000010004000000000000000000880B0000880B0000080000,2025-04-12T00:43:16.698+0000
5,Grains/Cereals,"Breads, crackers, pasta, and cereal",0x151C2F00020000000D000E0014002100FFFFFFFF4269746D617020496D616765005061696E742E5069637475726500010500000200000007000000504272757368000000000000000000A0290000424D98290000000000005600000028000000AC00000078000000010004000000000000000000880B0000880B0000080000,2025-04-12T00:43:16.698+0000
6,Meat/Poultry,Prepared meats,0x151C2F00020000000D000E0014002100FFFFFFFF4269746D617020496D616765005061696E742E5069637475726500010500000200000007000000504272757368000000000000000000A0290000424D98290000000000005600000028000000AC00000078000000010004000000000000000000880B0000880B0000080000,2025-04-12T00:43:16.698+0000
7,Produce,Dried fruit and bean curd,0x151C2F00020000000D000E0014002100FFFFFFFF4269746D617020496D616765005061696E742E5069637475726500010500000200000007000000504272757368000000000000000000A0290000424D98290000000000005600000028000000AC00000078000000010004000000000000000000880B0000880B0000080000,2025-04-12T00:43:16.698+0000
8,Seafood,Seaweed and fish,0x151C2F00020000000D000E0014002100FFFFFFFF4269746D617020496D616765005061696E742E5069637475726500010500000200000007000000504272757368000000000000000000A0290000424D98290000000000005600000028000000AC00000078000000010004000000000000000000880B0000880B0000080000,2025-04-12T00:43:16.698+0000
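
The rename chains in this notebook map CamelCase source names to snake_case one column at a time. As a minimal sketch, the same mapping can be derived with a small helper. `to_snake_case` is a hypothetical name that does not appear in the notebook, and the rule it applies (an underscore at each lower-to-upper boundary) is an assumption that happens to fit the column names shown here.

```python
import re


def to_snake_case(name: str) -> str:
    """Convert a CamelCase or camelCase column name to snake_case.

    Hypothetical helper: inserts an underscore wherever a lowercase letter
    or digit is followed by an uppercase letter, then lowercases the result.
    """
    return re.sub(r"(?<=[a-z0-9])(?=[A-Z])", "_", name).lower()


# Column names taken from the tables in this notebook
for original in ["CategoryID", "CategoryName", "PostalCode", "UnitsInStock", "shipPostalCode"]:
    print(original, "->", to_snake_case(original))
```

With a Spark DataFrame, the derived names could then be applied in one call, for example `df.toDF(*[to_snake_case(c) for c in df.columns])`. Renames that change the meaning of a name, such as `CompanyName` to `shipper_name` in the shippers table, still need an explicit `withColumnRenamed`.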


### Customers table

In [0]:
# Table: customers
df_customers_bronze = spark.read.format("delta").load(f"{caminho_bronze}customers")

df_customers_prata = (
    df_customers_bronze
    .withColumnRenamed("CustomerID", "customer_id")
    .withColumnRenamed("CompanyName", "company_name")
    .withColumnRenamed("ContactName", "contact_name")
    .withColumnRenamed("ContactTitle", "contact_title")
    .withColumnRenamed("Address", "address")
    .withColumnRenamed("City", "city")
    .withColumnRenamed("Region", "region")
    .withColumnRenamed("PostalCode", "postal_code")
    .withColumnRenamed("Country", "country")
    # Keep only rows with a non-null customer_id
    .filter("customer_id is not null")
)

print("Schema of customers:")
df_customers_prata.printSchema()
display(df_customers_prata.limit(10))

df_customers_prata.write.format("delta").mode("overwrite").save(f"{caminho_prata}customers")


Schema of customers:
root
 |-- customer_id: string (nullable = true)
 |-- company_name: string (nullable = true)
 |-- contact_name: string (nullable = true)
 |-- contact_title: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- fax: string (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



customer_id,company_name,contact_name,contact_title,address,city,region,postal_code,country,phone,fax,data_ingestao
ALFKI,Alfreds Futterkiste,Maria Anders,Sales Representative,Obere Str. 57,Berlin,,12209,Germany,030-0074321,030-0076545,2025-04-12T00:43:38.005+0000
ANATR,Ana Trujillo Emparedados y helados,Ana Trujillo,Owner,Avda. de la Constitución 2222,México D.F.,,05021,Mexico,(5) 555-4729,(5) 555-3745,2025-04-12T00:43:38.005+0000
ANTON,Antonio Moreno Taquería,Antonio Moreno,Owner,Mataderos 2312,México D.F.,,05023,Mexico,(5) 555-3932,,2025-04-12T00:43:38.005+0000
AROUT,Around the Horn,Thomas Hardy,Sales Representative,120 Hanover Sq.,London,,WA1 1DP,UK,(171) 555-7788,(171) 555-6750,2025-04-12T00:43:38.005+0000
BERGS,Berglunds snabbköp,Christina Berglund,Order Administrator,Berguvsvägen 8,Luleå,,S-958 22,Sweden,0921-12 34 65,0921-12 34 67,2025-04-12T00:43:38.005+0000
BLAUS,Blauer See Delikatessen,Hanna Moos,Sales Representative,Forsterstr. 57,Mannheim,,68306,Germany,0621-08460,0621-08924,2025-04-12T00:43:38.005+0000
BLONP,Blondesddsl père et fils,Frédérique Citeaux,Marketing Manager,24,place Kléber,Strasbourg,,67000,France,88.60.15.31,2025-04-12T00:43:38.005+0000
BOLID,Bólido Comidas preparadas,Martín Sommer,Owner,C/ Araquil,67,Madrid,,28023,Spain,(91) 555 22 82,2025-04-12T00:43:38.005+0000
BONAP,Bon app',Laurence Lebihan,Owner,12,rue des Bouchers,Marseille,,13008,France,91.24.45.40,2025-04-12T00:43:38.005+0000
BOTTM,Bottom-Dollar Markets,Elizabeth Lincoln,Accounting Manager,23 Tsawassen Blvd.,Tsawassen,BC,T2F 8M4,Canada,(604) 555-4729,(604) 555-3745,2025-04-12T00:43:38.005+0000
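
In the sample above, the rows for BLONP, BOLID, and BONAP look shifted: the street address appears to have been split at its comma, so the city lands in `region` and the country in `phone`. That suggests the customers file may have the same unquoted-comma problem that this notebook handles for orders. A minimal sketch for flagging such lines, assuming the raw lines are available as plain strings; `find_misaligned_lines` is a hypothetical name and the sample lines are shortened illustrations, not the real file contents.

```python
def find_misaligned_lines(lines):
    """Return (line_number, field_count) for every line whose naive comma
    split does not match the header's field count. Line numbers are 1-based."""
    expected = len(lines[0].split(","))
    flagged = []
    for number, line in enumerate(lines[1:], start=2):
        count = len(line.split(","))
        if count != expected:
            flagged.append((number, count))
    return flagged


# Shortened, illustrative lines (not the real file contents)
sample = [
    "customerID,companyName,address,city,country",
    "ALFKI,Alfreds Futterkiste,Obere Str. 57,Berlin,Germany",
    "BLONP,Blondesddsl père et fils,24, place Kléber,Strasbourg,France",
]
print(find_misaligned_lines(sample))
```

On the real file, the lines could be read with `spark.read.text`, as the notebook does for orders. A naive split also miscounts properly quoted fields, so the check is only meaningful for files that carry no quoting.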


### Employees table

In [0]:
from pyspark.sql.functions import col, to_date

# Read the employees table from the Bronze layer
df_employees_bronze = spark.read.format("delta").load(f"{caminho_bronze}employees")

# Drop unneeded columns
df_temp = df_employees_bronze.drop("PhotoPath", "Notes", "TitleOfCourtesy")

# Convert the date columns to the date type under their final names,
# then drop the original columns
df_temp = (
    df_temp
    .withColumn("birth_date", to_date(col("BirthDate"), "yyyy-MM-dd"))
    .withColumn("hire_date", to_date(col("HireDate"), "yyyy-MM-dd"))
    .drop("BirthDate", "HireDate")
)

# Rename the remaining columns
df_employees_prata = (
    df_temp
    .withColumnRenamed("EmployeeID", "employee_id")
    .withColumnRenamed("LastName", "last_name")
    .withColumnRenamed("FirstName", "first_name")
    .withColumnRenamed("Title", "title")
    .withColumnRenamed("Address", "address")
    .withColumnRenamed("City", "city")
    .withColumnRenamed("Region", "region")
    .withColumnRenamed("PostalCode", "postal_code")
    .withColumnRenamed("Country", "country")
    .withColumnRenamed("HomePhone", "home_phone")
    .withColumnRenamed("Extension", "extension")
)

# Optional: reorder the columns for readability
colunas_ordenadas = [
    "employee_id", "last_name", "first_name", "title", "birth_date", "hire_date",
    "address", "city", "region", "postal_code", "country", "home_phone", "extension", "data_ingestao"
]
df_employees_prata = df_employees_prata.select(*colunas_ordenadas)

# Inspect the result
print("Final schema of the employees table:")
df_employees_prata.printSchema()
display(df_employees_prata.limit(10))

# Save to the Silver layer
df_employees_prata.write.format("delta").mode("overwrite").save(f"{caminho_prata}employees")


Final schema of the employees table:
root
 |-- employee_id: integer (nullable = true)
 |-- last_name: string (nullable = true)
 |-- first_name: string (nullable = true)
 |-- title: string (nullable = true)
 |-- birth_date: date (nullable = true)
 |-- hire_date: date (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- home_phone: string (nullable = true)
 |-- extension: integer (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



employee_id,last_name,first_name,title,birth_date,hire_date,address,city,region,postal_code,country,home_phone,extension,data_ingestao
1,Davolio,Nancy,Sales Representative,1948-12-08,1992-05-01,507 - 20th Ave. E. Apt. 2A,Seattle,WA,98122,USA,(206) 555-9857,5467,2025-04-12T00:43:50.318+0000
2,Fuller,Andrew,"Vice President, Sales",1952-02-19,1992-08-14,908 W. Capital Way,Tacoma,WA,98401,USA,(206) 555-9482,3457,2025-04-12T00:43:50.318+0000
3,Leverling,Janet,Sales Representative,1963-08-30,1992-04-01,722 Moss Bay Blvd.,Kirkland,WA,98033,USA,(206) 555-3412,3355,2025-04-12T00:43:50.318+0000
4,Peacock,Margaret,Sales Representative,1937-09-19,1993-05-03,4110 Old Redmond Rd.,Redmond,WA,98052,USA,(206) 555-8122,5176,2025-04-12T00:43:50.318+0000
5,Buchanan,Steven,Sales Manager,1955-03-04,1993-10-17,14 Garrett Hill,London,,SW1 8JR,UK,(71) 555-4848,3453,2025-04-12T00:43:50.318+0000
6,Suyama,Michael,Sales Representative,1963-07-02,1993-10-17,Coventry House Miner Rd.,London,,EC2 7JR,UK,(71) 555-7773,428,2025-04-12T00:43:50.318+0000
7,King,Robert,Sales Representative,1960-05-29,1994-01-02,Edgeham Hollow Winchester Way,London,,RG1 9SP,UK,(71) 555-5598,465,2025-04-12T00:43:50.318+0000
8,Callahan,Laura,Inside Sales Coordinator,1958-01-09,1994-03-05,4726 - 11th Ave. N.E.,Seattle,WA,98105,USA,(206) 555-1189,2344,2025-04-12T00:43:50.318+0000
9,Dodsworth,Anne,Sales Representative,1966-01-27,1994-11-15,7 Houndstooth Rd.,London,,WG2 7LT,UK,(71) 555-4444,452,2025-04-12T00:43:50.318+0000


---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-540095536058286>:49
     46 display(df_employees_prata.limit(10))
     48 # Save to the Silver layer
---> 49 df_employees_prata.write.format("delta").mode("overwrite").save(f"{caminho_prata}employees")

File /databricks/spark/python/pyspark/instrumentation_utils.py:48, in _wrap_function.<locals>.wrapper(*args, **kwargs)
     46 start = time.perf_counter()
     47 
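
The traceback above is cut off before the error message, so the cause of this `AnalysisException` is not visible here. One common cause when overwriting an existing Delta table is that the new DataFrame's schema differs from the schema already stored at the path; that is an assumption about this failure, not something the output confirms. Under that assumption, the sketch below compares two schemas given as `(name, type)` pairs, which is the shape `DataFrame.dtypes` returns. `diff_schemas` and both example schemas are hypothetical.

```python
def diff_schemas(stored, incoming):
    """Compare two schemas given as lists of (column_name, type_string).

    Returns three sorted lists: columns only in the stored table, columns
    only in the incoming DataFrame, and (name, stored_type, incoming_type)
    for columns present in both with different types.
    """
    stored_map, incoming_map = dict(stored), dict(incoming)
    only_stored = sorted(set(stored_map) - set(incoming_map))
    only_incoming = sorted(set(incoming_map) - set(stored_map))
    changed = sorted(
        (name, stored_map[name], incoming_map[name])
        for name in set(stored_map) & set(incoming_map)
        if stored_map[name] != incoming_map[name]
    )
    return only_stored, only_incoming, changed


# Hypothetical schemas, for illustration only
stored = [("employee_id", "int"), ("birth_date", "string"), ("notes", "string")]
incoming = [("employee_id", "int"), ("birth_date", "date"), ("hire_date", "date")]

print(diff_schemas(stored, incoming))
```

In the notebook, the two inputs could come from `spark.read.format("delta").load(path).dtypes` and `df_employees_prata.dtypes`. If a schema change is intended, Delta Lake's `overwriteSchema` writer option, set to `"true"` alongside `mode("overwrite")`, allows the stored schema to be replaced.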

### Employee Territories table

In [0]:
# Table: employee_territories
df_emp_territories_bronze = spark.read.format("delta").load(f"{caminho_bronze}employee_territories")

df_emp_territories_prata = (
    df_emp_territories_bronze
    .withColumnRenamed("EmployeeID", "employee_id")
    .withColumnRenamed("TerritoryID", "territory_id")
)

print("Schema of employee_territories:")
df_emp_territories_prata.printSchema()
display(df_emp_territories_prata.limit(10))

df_emp_territories_prata.write.format("delta").mode("overwrite").save(f"{caminho_prata}employee_territories")


Schema of employee_territories:
root
 |-- employee_id: integer (nullable = true)
 |-- territory_id: integer (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



employee_id,territory_id,data_ingestao
1,6897,2025-04-12T00:44:02.007+0000
1,19713,2025-04-12T00:44:02.007+0000
2,1581,2025-04-12T00:44:02.007+0000
2,1730,2025-04-12T00:44:02.007+0000
2,1833,2025-04-12T00:44:02.007+0000
2,2116,2025-04-12T00:44:02.007+0000
2,2139,2025-04-12T00:44:02.007+0000
2,2184,2025-04-12T00:44:02.007+0000
2,40222,2025-04-12T00:44:02.007+0000
3,30346,2025-04-12T00:44:02.007+0000


### Order Details table

In [0]:
from pyspark.sql.functions import col

df_order_details_bronze = spark.read.format("delta").load(f"{caminho_bronze}order_details")

# Cast the original columns before renaming them
df_cast = (
    df_order_details_bronze
    .withColumn("UnitPrice", col("UnitPrice").cast("double"))
    .withColumn("Quantity", col("Quantity").cast("int"))
    .withColumn("Discount", col("Discount").cast("double"))
)

# Then rename the columns to standardized snake_case names
df_order_details_prata = (
    df_cast
    .withColumnRenamed("OrderID", "order_id")
    .withColumnRenamed("ProductID", "product_id")
    .withColumnRenamed("UnitPrice", "unit_price")
    .withColumnRenamed("Quantity", "quantity")
    .withColumnRenamed("Discount", "discount")
)

# Inspect the result
print("Final schema of the order_details table:")
df_order_details_prata.printSchema()
display(df_order_details_prata.limit(10))

# Save to the Silver layer
df_order_details_prata.write.format("delta").mode("overwrite").save(f"{caminho_prata}order_details")


Final schema of the order_details table:
root
 |-- order_id: integer (nullable = true)
 |-- product_id: integer (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- quantity: integer (nullable = true)
 |-- discount: double (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



order_id,product_id,unit_price,quantity,discount,data_ingestao
10248,11,14.0,12,0.0,2025-04-12T00:44:12.689+0000
10248,42,9.8,10,0.0,2025-04-12T00:44:12.689+0000
10248,72,34.8,5,0.0,2025-04-12T00:44:12.689+0000
10249,14,18.6,9,0.0,2025-04-12T00:44:12.689+0000
10249,51,42.4,40,0.0,2025-04-12T00:44:12.689+0000
10250,41,7.7,10,0.0,2025-04-12T00:44:12.689+0000
10250,51,42.4,35,0.15,2025-04-12T00:44:12.689+0000
10250,65,16.8,15,0.15,2025-04-12T00:44:12.689+0000
10251,22,16.8,6,0.05,2025-04-12T00:44:12.689+0000
10251,57,15.6,15,0.05,2025-04-12T00:44:12.689+0000
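
The questions at the top of this notebook are about sales, and `order_details` holds the inputs for a revenue figure. A minimal sketch of a per-line revenue calculation follows. It assumes `discount` is a fraction of the line's gross value, which the notebook does not state; `line_revenue` is a hypothetical name.

```python
def line_revenue(unit_price: float, quantity: int, discount: float) -> float:
    """Net value of one order line, assuming discount is a fraction (0.15 = 15%)."""
    return unit_price * quantity * (1.0 - discount)


# Rows taken from the order_details sample above:
# (order_id, product_id, unit_price, quantity, discount)
rows = [
    (10250, 41, 7.7, 10, 0.0),
    (10250, 51, 42.4, 35, 0.15),
    (10250, 65, 16.8, 15, 0.15),
]

order_total = sum(line_revenue(price, qty, disc) for _, _, price, qty, disc in rows)
print(f"Order 10250 total: {order_total:.2f}")
```

In Spark, the same calculation would be a column expression such as `col("unit_price") * col("quantity") * (1 - col("discount"))`.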


### Orders table
In this file, the ship address column is not enclosed in double quotes, so an address that contains a comma splits into extra fields and breaks the CSV on import. The file therefore needs custom parsing.
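
As a plain-Python sketch of the repair this section relies on: split the line on every comma, keep the first nine and the last four fields, and rejoin whatever is left in the middle as the address. The field positions follow the 14-column layout of orders. `merge_address_fields` is a hypothetical name, the sample line is reconstructed from a row shown in the output with its quoting removed, and the approach assumes `shipAddress` is the only field that can contain a comma.

```python
EXPECTED_FIELDS = 14  # columns in orders.csv, per the notebook's column list
ADDRESS_INDEX = 9     # zero-based position of shipAddress


def merge_address_fields(line: str):
    """Split a raw orders line and rejoin an address broken up by commas.

    Returns a list of EXPECTED_FIELDS strings, or None when the line has
    too few fields to be repaired.
    """
    parts = line.strip().split(",")
    if len(parts) < EXPECTED_FIELDS:
        return None
    fields_after_address = EXPECTED_FIELDS - ADDRESS_INDEX - 1  # 4 trailing fields
    cut = len(parts) - fields_after_address
    prefix = parts[:ADDRESS_INDEX]
    address = ",".join(parts[ADDRESS_INDEX:cut]).strip()
    suffix = parts[cut:]
    return prefix + [address] + suffix


# Reconstructed from order 10250 in the output, with the quoting removed
raw = (
    "10250,HANAR,4,1996-07-08 00:00:00.000,1996-08-05 00:00:00.000,"
    "1996-07-12 00:00:00.000,2,65.83,Hanari Carnes,Rua do Paço, 67,"
    "Rio de Janeiro,RJ,05454-876,Brazil"
)
fields = merge_address_fields(raw)
print(len(fields), fields[ADDRESS_INDEX])
```

The heuristic anchors on position from both ends of the line, so a comma in any other field (for example `shipName`) would be merged into the address by mistake.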

In [0]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import current_timestamp, to_date, col

# Path to the raw file (the CSV that fails to parse)
raw_csv_path = "/FileStore/tables/orders.csv"
# Path where the table is saved in the Silver layer
silver_path = "/FileStore/silver/orders"

# Read the CSV as text (each line lands in the "linha_original" column)
raw_df = spark.read.text(raw_csv_path).withColumnRenamed("value", "linha_original")
# Add the ingestion timestamp
raw_df = raw_df.withColumn("data_ingestao", current_timestamp())

expected_columns = [
    "orderID", "customerID", "employeeID", "orderDate", "requiredDate",
    "shippedDate", "shipVia", "freight", "shipName", "shipAddress",
    "shipCity", "shipRegion", "shipPostalCode", "shipCountry"
]

# Parse one line: skip the header and merge the pieces of shipAddress back together
def parse_line_with_ingestao(row):
    data_ingestao = row["data_ingestao"]
    line = row["linha_original"]
    # Split the line on every comma
    parts = line.strip().split(",")
    # Skip the header
    if parts[0] == "orderID":
        return None
    # The line already has the expected number of fields: return it unchanged
    if len(parts) == len(expected_columns):
        return Row(*parts, data_ingestao)
    # Extra fields mean shipAddress contained commas: keep the first 9 and
    # last 4 fields and rejoin everything in between as the address
    elif len(parts) > len(expected_columns):
        fixed_prefix = parts[:9]
        fixed_suffix = parts[-4:]
        address_parts = parts[9:-4]
        ship_address = ",".join(address_parts).strip()
        full_row = fixed_prefix + [ship_address] + fixed_suffix + [data_ingestao]
        return Row(*full_row)
    # Too few fields: the line cannot be repaired, so it is dropped
    else:
        return None

parsed_rdd = raw_df.rdd.map(parse_line_with_ingestao).filter(lambda x: x is not None)

schema = StructType([
    StructField("orderID", StringType(), True),
    StructField("customerID", StringType(), True),
    StructField("employeeID", StringType(), True),
    StructField("orderDate", StringType(), True),
    StructField("requiredDate", StringType(), True),
    StructField("shippedDate", StringType(), True),
    StructField("shipVia", StringType(), True),
    StructField("freight", StringType(), True),
    StructField("shipName", StringType(), True),
    StructField("shipAddress", StringType(), True),
    StructField("shipCity", StringType(), True),
    StructField("shipRegion", StringType(), True),
    StructField("shipPostalCode", StringType(), True),
    StructField("shipCountry", StringType(), True),
    StructField("data_ingestao", TimestampType(), True)
])

# Create the DataFrame with the schema defined above
df_orders = spark.createDataFrame(parsed_rdd, schema=schema)

# Optional: show a few records for validation
print("Schema of the rebuilt DataFrame:")
df_orders.printSchema()
display(df_orders.limit(10))


df_orders_prata = (
    df_orders
    .withColumn("order_date", to_date(col("orderDate")))
    .withColumn("required_date", to_date(col("requiredDate")))
    .withColumn("shipped_date", to_date(col("shippedDate")))
    .withColumn("freight", col("freight").cast("double"))
    .withColumn("employee_id", col("employeeID").cast("int"))
    .withColumn("ship_via", col("shipVia").cast("int"))
    .withColumn("order_id", col("orderID").cast("int"))
    .withColumn("customer_id", col("customerID"))
    .withColumn("ship_name", col("shipName"))
    .withColumn("ship_address", col("shipAddress"))
    .withColumn("ship_city", col("shipCity"))
    .withColumn("ship_region", col("shipRegion"))
    .withColumn("ship_postal_code", col("shipPostalCode"))
    .withColumn("ship_country", col("shipCountry"))
    .select(
        "order_id", "customer_id", "employee_id", "order_date", "required_date", "shipped_date",
        "ship_via", "freight", "ship_name", "ship_address", "ship_city",
        "ship_region", "ship_postal_code", "ship_country", "data_ingestao"
    )
)

# Final inspection: show the schema and a few records
print("Final schema of orders in the Silver layer:")
df_orders_prata.printSchema()
display(df_orders_prata.limit(10))

df_orders_prata.write.format("delta").mode("overwrite").save(silver_path)
print("orders table saved to the Silver layer at:", silver_path)


Schema of the rebuilt DataFrame:
root
 |-- orderID: string (nullable = true)
 |-- customerID: string (nullable = true)
 |-- employeeID: string (nullable = true)
 |-- orderDate: string (nullable = true)
 |-- requiredDate: string (nullable = true)
 |-- shippedDate: string (nullable = true)
 |-- shipVia: string (nullable = true)
 |-- freight: string (nullable = true)
 |-- shipName: string (nullable = true)
 |-- shipAddress: string (nullable = true)
 |-- shipCity: string (nullable = true)
 |-- shipRegion: string (nullable = true)
 |-- shipPostalCode: string (nullable = true)
 |-- shipCountry: string (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



orderID,customerID,employeeID,orderDate,requiredDate,shippedDate,shipVia,freight,shipName,shipAddress,shipCity,shipRegion,shipPostalCode,shipCountry,data_ingestao
10248,VINET,5,1996-07-04 00:00:00.000,1996-08-01 00:00:00.000,1996-07-16 00:00:00.000,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,France,2025-04-12T00:46:09.719+0000
10249,TOMSP,6,1996-07-05 00:00:00.000,1996-08-16 00:00:00.000,1996-07-10 00:00:00.000,1,11.61,Toms Spezialitäten,Luisenstr. 48,Münster,,44087,Germany,2025-04-12T00:46:09.719+0000
10250,HANAR,4,1996-07-08 00:00:00.000,1996-08-05 00:00:00.000,1996-07-12 00:00:00.000,2,65.83,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil,2025-04-12T00:46:09.719+0000
10251,VICTE,3,1996-07-08 00:00:00.000,1996-08-05 00:00:00.000,1996-07-15 00:00:00.000,1,41.34,Victuailles en stock,"2, rue du Commerce",Lyon,,69004,France,2025-04-12T00:46:09.719+0000
10252,SUPRD,4,1996-07-09 00:00:00.000,1996-08-06 00:00:00.000,1996-07-11 00:00:00.000,2,51.3,Suprêmes délices,"Boulevard Tirou, 255",Charleroi,,B-6000,Belgium,2025-04-12T00:46:09.719+0000
10253,HANAR,3,1996-07-10 00:00:00.000,1996-07-24 00:00:00.000,1996-07-16 00:00:00.000,2,58.17,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil,2025-04-12T00:46:09.719+0000
10254,CHOPS,5,1996-07-11 00:00:00.000,1996-08-08 00:00:00.000,1996-07-23 00:00:00.000,2,22.98,Chop-suey Chinese,Hauptstr. 31,Bern,,3012,Switzerland,2025-04-12T00:46:09.719+0000
10255,RICSU,9,1996-07-12 00:00:00.000,1996-08-09 00:00:00.000,1996-07-15 00:00:00.000,3,148.33,Richter Supermarkt,Starenweg 5,Genève,,1204,Switzerland,2025-04-12T00:46:09.719+0000
10256,WELLI,3,1996-07-15 00:00:00.000,1996-08-12 00:00:00.000,1996-07-17 00:00:00.000,2,13.97,Wellington Importadora,"Rua do Mercado, 12",Resende,SP,08737-363,Brazil,2025-04-12T00:46:09.719+0000
10257,HILAA,4,1996-07-16 00:00:00.000,1996-08-13 00:00:00.000,1996-07-22 00:00:00.000,3,81.91,HILARION-Abastos,Carrera 22 con Ave. Carlos Soublette #8-35,San Cristóbal,Táchira,5022,Venezuela,2025-04-12T00:46:09.719+0000


Final schema of orders in the Silver layer:
root
 |-- order_id: integer (nullable = true)
 |-- customer_id: string (nullable = true)
 |-- employee_id: integer (nullable = true)
 |-- order_date: date (nullable = true)
 |-- required_date: date (nullable = true)
 |-- shipped_date: date (nullable = true)
 |-- ship_via: integer (nullable = true)
 |-- freight: double (nullable = true)
 |-- ship_name: string (nullable = true)
 |-- ship_address: string (nullable = true)
 |-- ship_city: string (nullable = true)
 |-- ship_region: string (nullable = true)
 |-- ship_postal_code: string (nullable = true)
 |-- ship_country: string (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



order_id,customer_id,employee_id,order_date,required_date,shipped_date,ship_via,freight,ship_name,ship_address,ship_city,ship_region,ship_postal_code,ship_country,data_ingestao
10248,VINET,5,1996-07-04,1996-08-01,1996-07-16,3,32.38,Vins et alcools Chevalier,59 rue de l'Abbaye,Reims,,51100,France,2025-04-12T00:46:09.719+0000
10249,TOMSP,6,1996-07-05,1996-08-16,1996-07-10,1,11.61,Toms Spezialitäten,Luisenstr. 48,Münster,,44087,Germany,2025-04-12T00:46:09.719+0000
10250,HANAR,4,1996-07-08,1996-08-05,1996-07-12,2,65.83,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil,2025-04-12T00:46:09.719+0000
10251,VICTE,3,1996-07-08,1996-08-05,1996-07-15,1,41.34,Victuailles en stock,"2, rue du Commerce",Lyon,,69004,France,2025-04-12T00:46:09.719+0000
10252,SUPRD,4,1996-07-09,1996-08-06,1996-07-11,2,51.3,Suprêmes délices,"Boulevard Tirou, 255",Charleroi,,B-6000,Belgium,2025-04-12T00:46:09.719+0000
10253,HANAR,3,1996-07-10,1996-07-24,1996-07-16,2,58.17,Hanari Carnes,"Rua do Paço, 67",Rio de Janeiro,RJ,05454-876,Brazil,2025-04-12T00:46:09.719+0000
10254,CHOPS,5,1996-07-11,1996-08-08,1996-07-23,2,22.98,Chop-suey Chinese,Hauptstr. 31,Bern,,3012,Switzerland,2025-04-12T00:46:09.719+0000
10255,RICSU,9,1996-07-12,1996-08-09,1996-07-15,3,148.33,Richter Supermarkt,Starenweg 5,Genève,,1204,Switzerland,2025-04-12T00:46:09.719+0000
10256,WELLI,3,1996-07-15,1996-08-12,1996-07-17,2,13.97,Wellington Importadora,"Rua do Mercado, 12",Resende,SP,08737-363,Brazil,2025-04-12T00:46:09.719+0000
10257,HILAA,4,1996-07-16,1996-08-13,1996-07-22,3,81.91,HILARION-Abastos,Carrera 22 con Ave. Carlos Soublette #8-35,San Cristóbal,Táchira,5022,Venezuela,2025-04-12T00:46:09.719+0000


---------------------------------------------------------------------------
AnalysisException                         Traceback (most recent call last)
File <command-4035143838159746>:123
    118 display(df_orders_prata.limit(10))
    120 # -------------------------------------------------------------------
    121 # 5. Write the DataFrame to the Silver layer in Delta format
    122 # -------------------------------------------------------------------
--> 123 df_orders_prata.write.format("delta").mode("overwrite").save(silver_path)
    124 print("orders table saved to the Silver layer at:"

### Products table

In [0]:
from pyspark.sql.functions import col
# Layer paths
caminho_bronze = "/FileStore/bronze/"
caminho_prata  = "/FileStore/silver/"

# Read the products table from the Bronze layer
df_products_bronze = spark.read.format("delta").load(f"{caminho_bronze}products")

# First, cast the columns using their original names (from the file)
df_temp = df_products_bronze \
    .withColumn("UnitPrice", col("UnitPrice").cast("double")) \
    .withColumn("UnitsInStock", col("UnitsInStock").cast("int")) \
    .withColumn("UnitsOnOrder", col("UnitsOnOrder").cast("int")) \
    .withColumn("ReorderLevel", col("ReorderLevel").cast("int"))

# Then rename the columns to the snake_case standard
df_products_prata = (
    df_temp.withColumnRenamed("ProductID", "product_id")
           .withColumnRenamed("ProductName", "product_name")
           .withColumnRenamed("SupplierID", "supplier_id")
           .withColumnRenamed("CategoryID", "category_id")
           .withColumnRenamed("QuantityPerUnit", "quantity_per_unit")
           .withColumnRenamed("UnitPrice", "unit_price")
           .withColumnRenamed("UnitsInStock", "units_in_stock")
           .withColumnRenamed("UnitsOnOrder", "units_on_order")
           .withColumnRenamed("ReorderLevel", "reorder_level")
           .withColumnRenamed("Discontinued", "discontinued")
)

# Optional: show the schema and a sample of the data for verification
print("Schema of products in the Silver layer:")
df_products_prata.printSchema()
display(df_products_prata.limit(10))

# Write the processed DataFrame to the Silver layer
df_products_prata.write.format("delta").mode("overwrite").save(f"{caminho_prata}products")
print("products table saved to the Silver layer at:", caminho_prata + "products")


Schema of products in the Silver layer:
root
 |-- product_id: integer (nullable = true)
 |-- product_name: string (nullable = true)
 |-- supplier_id: integer (nullable = true)
 |-- category_id: integer (nullable = true)
 |-- quantity_per_unit: string (nullable = true)
 |-- unit_price: double (nullable = true)
 |-- units_in_stock: integer (nullable = true)
 |-- units_on_order: integer (nullable = true)
 |-- reorder_level: integer (nullable = true)
 |-- discontinued: integer (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



product_id,product_name,supplier_id,category_id,quantity_per_unit,unit_price,units_in_stock,units_on_order,reorder_level,discontinued,data_ingestao
1,Chai,1,1,10 boxes x 20 bags,18.0,39,0,10,0,2025-04-12T00:44:33.449+0000
2,Chang,1,1,24 - 12 oz bottles,19.0,17,40,25,0,2025-04-12T00:44:33.449+0000
3,Aniseed Syrup,1,2,12 - 550 ml bottles,10.0,13,70,25,0,2025-04-12T00:44:33.449+0000
4,Chef Anton's Cajun Seasoning,2,2,48 - 6 oz jars,22.0,53,0,0,0,2025-04-12T00:44:33.449+0000
5,Chef Anton's Gumbo Mix,2,2,36 boxes,21.35,0,0,0,1,2025-04-12T00:44:33.449+0000
6,Grandma's Boysenberry Spread,3,2,12 - 8 oz jars,25.0,120,0,25,0,2025-04-12T00:44:33.449+0000
7,Uncle Bob's Organic Dried Pears,3,7,12 - 1 lb pkgs.,30.0,15,0,10,0,2025-04-12T00:44:33.449+0000
8,Northwoods Cranberry Sauce,3,2,12 - 12 oz jars,40.0,6,0,0,0,2025-04-12T00:44:33.449+0000
9,Mishi Kobe Niku,4,6,18 - 500 g pkgs.,97.0,29,0,0,1,2025-04-12T00:44:33.449+0000
10,Ikura,4,8,12 - 200 ml jars,31.0,31,0,0,0,2025-04-12T00:44:33.449+0000


products table saved to the Silver layer at: /FileStore/silver/products


### Regions table

In [0]:
# Table: regions
df_regions_bronze = spark.read.format("delta").load(f"{caminho_bronze}regions")

df_regions_prata = (
    df_regions_bronze
    .withColumnRenamed("RegionID", "region_id")
    .withColumnRenamed("RegionDescription", "region_description")
)

print("Schema de regions:")
df_regions_prata.printSchema()
display(df_regions_prata.limit(10))

df_regions_prata.write.format("delta").mode("overwrite").save(f"{caminho_prata}regions")


Schema de regions:
root
 |-- region_id: integer (nullable = true)
 |-- region_description: string (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



region_id,region_description,data_ingestao
1,Eastern,2025-04-12T00:44:42.995+0000
2,Western,2025-04-12T00:44:42.995+0000
3,Northern,2025-04-12T00:44:42.995+0000
4,Southern,2025-04-12T00:44:42.995+0000


### Shippers Table

In [0]:
# Table: shippers
df_shippers_bronze = spark.read.format("delta").load(f"{caminho_bronze}shippers")

df_shippers_prata = (
    df_shippers_bronze
    .withColumnRenamed("ShipperID", "shipper_id")
    .withColumnRenamed("CompanyName", "shipper_name")
    .withColumnRenamed("Phone", "phone")
)

print("Schema de shippers:")
df_shippers_prata.printSchema()
display(df_shippers_prata.limit(10))

df_shippers_prata.write.format("delta").mode("overwrite").save(f"{caminho_prata}shippers")


Schema de shippers:
root
 |-- shipper_id: integer (nullable = true)
 |-- shipper_name: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



shipper_id,shipper_name,phone,data_ingestao
1,Speedy Express,(503) 555-9831,2025-04-12T00:44:52.854+0000
2,United Package,(503) 555-3199,2025-04-12T00:44:52.854+0000
3,Federal Shipping,(503) 555-9931,2025-04-12T00:44:52.854+0000


### Suppliers Table

In [0]:
from pyspark.sql import Row
from pyspark.sql.types import StructType, StructField, StringType, TimestampType
from pyspark.sql.functions import current_timestamp, col

input_suppliers = "/FileStore/tables/suppliers.csv"  # Path to the original CSV
raw_suppliers_df = spark.read.text(input_suppliers).withColumnRenamed("value", "linha_original")
# Add the data_ingestao column (current timestamp)
raw_suppliers_df = raw_suppliers_df.withColumn("data_ingestao", current_timestamp())

def parse_line_suppliers(row):
    data_ingestao = row["data_ingestao"]
    line = row["linha_original"]
    parts = line.strip().split(",")
    # Discard the header
    if parts[0] == "supplierID":
        return None
    # If the number of fields is exactly the expected 12, return the line unchanged
    if len(parts) == 12:
        return Row(*parts, data_ingestao)
    elif len(parts) == 13:
        # One extra field: a comma inside the company name or the address split it in two
        if parts[2].startswith(" "):
            # Merge companyName (fields 1 and 2)
            new_companyName = parts[1].strip() + ", " + parts[2].strip()
            # Build the new line: [supplierID, new_companyName] + the rest from index 3 onward
            new_parts = [parts[0], new_companyName] + parts[3:]
            return Row(*new_parts, data_ingestao)
        else:
            # Merge the address: assumes fields 4 and 5 belong together
            new_address = parts[4].strip() + ", " + parts[5].strip()
            # Build the new line: [fields 0 to 3] + [new_address] + the rest from index 6 onward
            new_parts = parts[:4] + [new_address] + parts[6:]
            return Row(*new_parts, data_ingestao)
    else:
        # If the number of fields is neither 12 nor 13, discard the line (or implement other logic)
        return None

parsed_suppliers_rdd = raw_suppliers_df.rdd.map(parse_line_suppliers).filter(lambda x: x is not None)

# Define the expected schema (every field as String, except data_ingestao)
schema_suppliers = StructType([
    StructField("supplierID", StringType(), True),
    StructField("companyName", StringType(), True),
    StructField("contactName", StringType(), True),
    StructField("contactTitle", StringType(), True),
    StructField("address", StringType(), True),
    StructField("city", StringType(), True),
    StructField("region", StringType(), True),
    StructField("postalCode", StringType(), True),
    StructField("country", StringType(), True),
    StructField("phone", StringType(), True),
    StructField("fax", StringType(), True),
    StructField("homePage", StringType(), True),
    StructField("data_ingestao", TimestampType(), True)
])

df_suppliers = spark.createDataFrame(parsed_suppliers_rdd, schema=schema_suppliers)


# Rename to snake_case and cast the id; columns listed by name alone are kept as-is
df_suppliers_prata = df_suppliers.select(
    col("supplierID").cast("int").alias("supplier_id"),
    col("companyName").alias("supplier_name"),
    col("contactName").alias("contact_name"),
    col("contactTitle").alias("contact_title"),
    "address", "city", "region",
    col("postalCode").alias("postal_code"),
    "country", "phone", "fax",
    col("homePage").alias("homepage"),
    "data_ingestao",
)


df_suppliers_prata.write.format("delta") \
    .option("overwriteSchema", "true") \
    .mode("overwrite") \
    .save("/FileStore/silver/suppliers")

print("Schema de suppliers:")
df_suppliers_prata.printSchema()
display(df_suppliers_prata.limit(10))

print("Tabela suppliers salva na camada Prata em: /FileStore/silver/suppliers")


Schema de suppliers:
root
 |-- supplier_id: integer (nullable = true)
 |-- supplier_name: string (nullable = true)
 |-- contact_name: string (nullable = true)
 |-- contact_title: string (nullable = true)
 |-- address: string (nullable = true)
 |-- city: string (nullable = true)
 |-- region: string (nullable = true)
 |-- postal_code: string (nullable = true)
 |-- country: string (nullable = true)
 |-- phone: string (nullable = true)
 |-- fax: string (nullable = true)
 |-- homepage: string (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



supplier_id,supplier_name,contact_name,contact_title,address,city,region,postal_code,country,phone,fax,homepage,data_ingestao
1,Exotic Liquids,Charlotte Cooper,Purchasing Manager,49 Gilbert St.,London,,EC1 4SD,UK,(171) 555-2222,,,2025-04-12T00:46:42.028+0000
2,New Orleans Cajun Delights,Shelley Burke,Order Administrator,P.O. Box 78934,New Orleans,LA,70117,USA,(100) 555-4822,,#CAJUN.HTM#,2025-04-12T00:46:42.028+0000
3,Grandma Kelly's Homestead,Regina Murphy,Sales Representative,707 Oxford Rd.,Ann Arbor,MI,48104,USA,(313) 555-5735,(313) 555-3349,,2025-04-12T00:46:42.028+0000
4,Tokyo Traders,Yoshi Nagase,Marketing Manager,9-8 Sekimai Musashino-shi,Tokyo,,100,Japan,(03) 3555-5011,,,2025-04-12T00:46:42.028+0000
5,Cooperativa de Quesos 'Las Cabras',Antonio del Valle Saavedra,Export Administrator,Calle del Rosal 4,Oviedo,Asturias,33007,Spain,(98) 598 76 54,,,2025-04-12T00:46:42.028+0000
6,Mayumi's,Mayumi Ohno,Marketing Representative,92 Setsuko Chuo-ku,Osaka,,545,Japan,(06) 431-7877,,Mayumi's (on the World Wide Web)#http://www.microsoft.com/accessdev/sampleapps/mayumi.htm#,2025-04-12T00:46:42.028+0000
7,"Pavlova, Ltd.",Ian Devling,Marketing Manager,74 Rose St. Moonie Ponds,Melbourne,Victoria,3058,Australia,(03) 444-2343,(03) 444-6588,,2025-04-12T00:46:42.028+0000
8,"Specialty Biscuits, Ltd.",Peter Wilson,Sales Representative,29 King's Way,Manchester,,M14 GSD,UK,(161) 555-4448,,,2025-04-12T00:46:42.028+0000
9,PB Knäckebröd AB,Lars Peterson,Sales Agent,Kaloadagatan 13,Göteborg,,S-345 67,Sweden,031-987 65 43,031-987 65 91,,2025-04-12T00:46:42.028+0000
10,Refrescos Americanas LTDA,Carlos Diaz,Marketing Manager,Av. das Americanas 12.890,Sao Paulo,,5442,Brazil,(11) 555 4640,,,2025-04-12T00:46:42.028+0000


Tabela suppliers salva na camada Prata em: /FileStore/silver/suppliers
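
The `supplier_name` values in rows 7 and 8 above still carry literal double quotes, which suggests the source file quotes fields that contain commas. As a minimal sketch of an alternative to splitting on every comma, Python's standard `csv` module honors quoted fields. The sample lines below are an assumption reconstructed from the displayed output (trimmed to three columns), and `parse_suppliers` is a hypothetical helper, not part of the notebook.

```python
import csv
import io

# Illustrative raw lines, reconstructed from the output above (assumption:
# the source file wraps comma-containing names in double quotes).
sample = (
    'supplierID,companyName,contactName\n'
    '1,Exotic Liquids,Charlotte Cooper\n'
    '7,"Pavlova, Ltd.",Ian Devling\n'
)

def parse_suppliers(text):
    """Parse CSV text, honoring quoted fields, and return a list of dicts."""
    reader = csv.DictReader(io.StringIO(text))
    return [dict(row) for row in reader]

rows = parse_suppliers(sample)
for row in rows:
    # The quoted name comes back as a single field, without the quotes
    print(row["supplierID"], "|", row["companyName"], "|", row["contactName"])
```

If the whole file turns out to be consistently quoted, Spark's built-in CSV reader (which treats `"` as the quote character by default) might make the manual parser unnecessary. That has not been checked against this file.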


### Territories Table

In [0]:
# Table: territories
df_territories_bronze = spark.read.format("delta").load(f"{caminho_bronze}territories")

df_territories_prata = (
    df_territories_bronze
    .withColumnRenamed("TerritoryID", "territory_id")
    .withColumnRenamed("TerritoryDescription", "territory_description")
    .withColumnRenamed("RegionID", "region_id")
)

print("Schema de territories:")
df_territories_prata.printSchema()
display(df_territories_prata.limit(10))

df_territories_prata.write.format("delta").mode("overwrite").save(f"{caminho_prata}territories")


Schema de territories:
root
 |-- territory_id: integer (nullable = true)
 |-- territory_description: string (nullable = true)
 |-- region_id: integer (nullable = true)
 |-- data_ingestao: timestamp (nullable = true)



territory_id,territory_description,region_id,data_ingestao
1581,Westboro,1,2025-04-12T00:45:10.769+0000
1730,Bedford,1,2025-04-12T00:45:10.769+0000
1833,Georgetow,1,2025-04-12T00:45:10.769+0000
2116,Boston,1,2025-04-12T00:45:10.769+0000
2139,Cambridge,1,2025-04-12T00:45:10.769+0000
2184,Braintree,1,2025-04-12T00:45:10.769+0000
2903,Providence,1,2025-04-12T00:45:10.769+0000
3049,Hollis,3,2025-04-12T00:45:10.769+0000
3801,Portsmouth,3,2025-04-12T00:45:10.769+0000
6897,Wilton,1,2025-04-12T00:45:10.769+0000


### Converting data_ingestao
Because the column was created as a timestamp, we convert it to a plain date.

In [0]:
from pyspark.sql.functions import to_date

# List of tables
tabelas = [
    "categories",
    "customers",
    "employees",
    "employee_territories",
    "order_details",
    "orders",
    "products",
    "regions",
    "shippers",
    "suppliers",
    "territories"
]

# Base path of the Silver layer
caminho_prata = "/FileStore/silver/"

for tabela in tabelas:
    caminho = f"{caminho_prata}{tabela}"
    
    # Read
    df = spark.read.format("delta").load(caminho)

    # Convert data_ingestao from timestamp to date
    df = df.withColumn("data_ingestao", to_date("data_ingestao"))

    # Write back, overwriting the schema
    df.write.format("delta") \
        .mode("overwrite") \
        .option("overwriteSchema", "true") \
        .save(caminho)

    print(f"✅ Tabela '{tabela}' atualizada com 'data_ingestao' como DATE.")


✅ Tabela 'categories' atualizada com 'data_ingestao' como DATE.
✅ Tabela 'customers' atualizada com 'data_ingestao' como DATE.
✅ Tabela 'employees' atualizada com 'data_ingestao' como DATE.
✅ Tabela 'employee_territories' atualizada com 'data_ingestao' como DATE.
✅ Tabela 'order_details' atualizada com 'data_ingestao' como DATE.
✅ Tabela 'orders' atualizada com 'data_ingestao' como DATE.
✅ Tabela 'products' atualizada com 'data_ingestao' como DATE.
✅ Tabela 'regions' atualizada com 'data_ingestao' como DATE.
✅ Tabela 'shippers' atualizada com 'data_ingestao' como DATE.
✅ Tabela 'suppliers' atualizada com 'data_ingestao' como DATE.
✅ Tabela 'territories' atualizada com 'data_ingestao' como DATE.
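
For intuition, `to_date` keeps only the calendar date of each timestamp and drops the time of day. Below is a small pure-Python sketch of the same truncation, using one of the `data_ingestao` values shown earlier. The helper name `truncate_to_date` is hypothetical, and Spark's handling of the session time zone is not modeled here.

```python
from datetime import datetime, date

def truncate_to_date(ts_text):
    """Parse an ISO-like timestamp with a numeric UTC offset and return its date."""
    parsed = datetime.strptime(ts_text, "%Y-%m-%dT%H:%M:%S.%f%z")
    return parsed.date()

ingested = truncate_to_date("2025-04-12T00:44:33.449+0000")
print(ingested)  # prints 2025-04-12
```

The time of day is lost after this step, so the conversion only makes sense when the ingestion day is all that later analysis needs.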


In [0]:
# Import the functions module under an alias so that Spark's min/max
# do not shadow Python's built-in min/max
from pyspark.sql import functions as F
import pandas as pd
import os

# List of tables in the Silver layer
tabelas = [
    "categories",
    "customers",
    "employees",
    "employee_territories",
    "order_details",
    "orders",
    "products",
    "regions",
    "shippers",
    "suppliers",
    "territories"
]

# Base path of the Silver layer
caminho_prata = "/FileStore/silver/"

# List that accumulates the metadata
catalogo_dados = []

# Build the catalog: min/max for numeric and date columns, sample values otherwise
for tabela in tabelas:
    df = spark.read.format("delta").load(f"{caminho_prata}{tabela}")
    for nome_coluna, tipo in df.dtypes:
        info = {"tabela": tabela, "coluna": nome_coluna, "tipo": tipo}
        if tipo in ['int', 'double', 'float', 'bigint']:
            stats = df.select(F.min(F.col(nome_coluna)).alias('min'), F.max(F.col(nome_coluna)).alias('max')).collect()[0]
            info["dominio"] = f"Mín: {stats['min']}, Máx: {stats['max']}"
        elif tipo.startswith("date") or tipo == "timestamp":
            stats = df.select(F.min(F.col(nome_coluna)).alias('min'), F.max(F.col(nome_coluna)).alias('max')).collect()[0]
            info["dominio"] = f"Data Mín: {stats['min']}, Data Máx: {stats['max']}"
        else:
            valores = df.select(nome_coluna).distinct().limit(10).rdd.flatMap(lambda x: x).collect()
            info["dominio"] = f"Exemplos: {valores}"
        catalogo_dados.append(info)

# Convert to a pandas DataFrame
catalogo_df = pd.DataFrame(catalogo_dados)[["tabela", "coluna", "tipo", "dominio"]]

# Output path on Databricks Community
caminho_saida = "/dbfs/FileStore/dictionary/dicionario_dados_northwind.csv"

# Create the directory if it does not exist
os.makedirs(os.path.dirname(caminho_saida), exist_ok=True)

# Save as CSV (utf-8-sig writes a BOM so accented characters display correctly)
catalogo_df.to_csv(caminho_saida, index=False, encoding="utf-8-sig")

print("✅ Arquivo gerado com sucesso!")
print("Acesse o link para download:")
print("➡️ https://community.cloud.databricks.com/files/dictionary/dicionario_dados_northwind.csv")


✅ Arquivo gerado com sucesso!
Acesse o link para download:
➡️ https://community.cloud.databricks.com/files/dictionary/dicionario_dados_northwind.csv
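
The per-column rule used to build the data dictionary can be illustrated without Spark. The sketch below applies the same three branches (numeric range, date range, sample values) to the `regions` values shown earlier. `describe_column` is a hypothetical helper, and keeping distinct values in first-seen order is an assumption made for readability.

```python
def describe_column(name, dtype, values):
    """Return a data-dictionary entry for one column, mirroring the rules above."""
    present = [v for v in values if v is not None]
    entry = {"coluna": name, "tipo": dtype}
    if dtype in ("int", "double", "float", "bigint"):
        entry["dominio"] = f"Mín: {min(present)}, Máx: {max(present)}"
    elif dtype.startswith("date") or dtype == "timestamp":
        entry["dominio"] = f"Data Mín: {min(present)}, Data Máx: {max(present)}"
    else:
        # Up to 10 distinct values, in order of first appearance
        distinct = list(dict.fromkeys(values))[:10]
        entry["dominio"] = f"Exemplos: {distinct}"
    return entry

# Values taken from the regions output shown earlier
region_ids = [1, 2, 3, 4]
region_names = ["Eastern", "Western", "Northern", "Southern"]

catalog = [
    describe_column("region_id", "int", region_ids),
    describe_column("region_description", "string", region_names),
]
for entry in catalog:
    print(entry)
```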


### Load step and creation of the DW

In [0]:
spark.sql("CREATE DATABASE IF NOT EXISTS northwind_dw")

# List of tables
tabelas = [
    "categories",
    "customers",
    "employees",
    "employee_territories",
    "order_details",
    "orders",
    "products",
    "regions",
    "shippers",
    "suppliers",
    "territories"
]

# Base path of the Silver layer
caminho_prata = "/FileStore/silver/"

# Loop that registers each table in the Databricks catalog
for tabela in tabelas:
    df = spark.read.format("delta").load(f"{caminho_prata}{tabela}")
    # Register as a permanent table in the data warehouse (northwind_dw)
    df.write.format("delta") \
      .mode("overwrite") \
      .saveAsTable(f"northwind_dw.{tabela}")
    print(f"Tabela {tabela} registrada no banco northwind_dw.")


Tabela categories registrada no banco northwind_dw.
Tabela customers registrada no banco northwind_dw.
Tabela employees registrada no banco northwind_dw.
Tabela employee_territories registrada no banco northwind_dw.
Tabela order_details registrada no banco northwind_dw.
Tabela orders registrada no banco northwind_dw.
Tabela products registrada no banco northwind_dw.
Tabela regions registrada no banco northwind_dw.
Tabela shippers registrada no banco northwind_dw.
Tabela suppliers registrada no banco northwind_dw.
Tabela territories registrada no banco northwind_dw.


## Analysis and answers to the questions:
### Sales Trend Over Time
There is a slight seasonal dip in sales in the mid-year months. April 1998 was the best-performing month, and it capped a run of strong months that began in October 1997.
Performance has also clearly grown over the long term: 1996 is the weakest year, while 1998 is the strongest.

In [0]:
%sql
SELECT 
    date_trunc('month', order_date) AS mes,
    SUM(od.quantity * od.unit_price) AS total_vendas
FROM northwind_dw.orders o
JOIN northwind_dw.order_details od 
  ON o.order_id = od.order_id
GROUP BY date_trunc('month', order_date)
ORDER BY total_vendas DESC;

mes,total_vendas
1998-04-01T00:00:00.000+0000,134630.56
1998-03-01T00:00:00.000+0000,109825.44999999998
1998-02-01T00:00:00.000+0000,104561.94999999995
1998-01-01T00:00:00.000+0000,100854.72
1997-12-01T00:00:00.000+0000,77476.26
1997-10-01T00:00:00.000+0000,70328.50000000001
1997-01-01T00:00:00.000+0000,66692.8
1997-09-01T00:00:00.000+0000,59733.02
1997-05-01T00:00:00.000+0000,56823.7
1997-04-01T00:00:00.000+0000,55699.39000000001
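
Because the query sorts by `total_vendas`, the trend is easier to read once the same rows are put back in chronological order. The sketch below does that for the ten rows displayed above (values rounded to cents) and counts how many of them fall in each year. It covers only the displayed rows, not the full result set.

```python
from collections import Counter

# (month, total_vendas) pairs copied from the top-10 output above
top_months = [
    ("1998-04", 134630.56),
    ("1998-03", 109825.45),
    ("1998-02", 104561.95),
    ("1998-01", 100854.72),
    ("1997-12", 77476.26),
    ("1997-10", 70328.50),
    ("1997-01", 66692.80),
    ("1997-09", 59733.02),
    ("1997-05", 56823.70),
    ("1997-04", 55699.39),
]

# Re-sort chronologically; "YYYY-MM" strings sort correctly as text
chronological = sorted(top_months)

# Count how many of the ten best months fall in each year
months_per_year = Counter(month[:4] for month, _ in top_months)

for month, total in chronological:
    print(month, f"{total:,.2f}")
print(dict(months_per_year))
```

In the SQL cell itself, replacing `ORDER BY total_vendas DESC` with `ORDER BY mes` would give the chronological view for every month.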


## Analysis and answers to the questions:
### Sales by Region
This analysis was hampered by poor data ingestion, probably stemming from a free-text field in the company's internal system: the region column has many null values and is poorly standardized. Rio de Janeiro and Québec are regions, but other rows hold state abbreviations such as WA and OR. As a result, little can be concluded about which region is the most profitable.
All that can be stated is that the region with the best combination of standardization and sales appears to be Rio de Janeiro. Even so, the data analyst should consider whether Rio de Janeiro and São Paulo ought to be treated as a single region called Brazil, or even South America or Latin America. Because the company is not Brazilian, that grouping may better reflect its global position.

In [0]:
%sql
SELECT 
    c.region AS regiao,
    SUM(od.quantity * od.unit_price) AS total_vendas
FROM northwind_dw.orders o
JOIN northwind_dw.order_details od 
  ON o.order_id = od.order_id
JOIN northwind_dw.customers c 
  ON o.customer_id = c.customer_id
GROUP BY c.region
ORDER BY total_vendas DESC;


regiao,total_vendas
,678427.7500000001
ID,115673.39
Co. Cork,57317.39
Rio de Janeiro,53999.18
NM,52245.9
Sao Paulo,45786.37
Québec,32203.9
WA,31001.65
OR,30393.93
Charleroi,24704.4
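
The consolidation suggested above can be sketched by mapping each raw region value to a broader bucket before summing. The mapping below is hypothetical and covers only the ten rows displayed in the output. A real mapping would need to be validated against the customer data.

```python
from collections import defaultdict

# (regiao, total_vendas) pairs copied from the output above; None is the null region
region_sales = [
    (None, 678427.75),
    ("ID", 115673.39),
    ("Co. Cork", 57317.39),
    ("Rio de Janeiro", 53999.18),
    ("NM", 52245.90),
    ("Sao Paulo", 45786.37),
    ("Québec", 32203.90),
    ("WA", 31001.65),
    ("OR", 30393.93),
    ("Charleroi", 24704.40),
]

# Hypothetical mapping, for illustration only
REGION_TO_COUNTRY = {
    "Rio de Janeiro": "Brazil",
    "Sao Paulo": "Brazil",
    "ID": "USA", "NM": "USA", "WA": "USA", "OR": "USA",
    "Québec": "Canada",
    "Co. Cork": "Ireland",
    "Charleroi": "Belgium",
}

def consolidate(rows, mapping):
    """Sum sales per mapped bucket; unmapped or null regions go to 'Unknown'."""
    totals = defaultdict(float)
    for region, total in rows:
        totals[mapping.get(region, "Unknown")] += total
    return {country: round(total, 2) for country, total in totals.items()}

by_country = consolidate(region_sales, REGION_TO_COUNTRY)
for country, total in sorted(by_country.items(), key=lambda kv: kv[1], reverse=True):
    print(country, total)
```

Even after grouping, the null bucket remains the largest in these ten rows, which is consistent with the caution expressed above about drawing conclusions from this column.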


## Analysis and answers to the questions:
### Contribution of Each Category to Sales
Finally, this analysis could be carried out in full, since the categories table is one of the most consistent tables in the Northwind database.
Beverages and Dairy Products clearly outperform the company's other categories. Beverages sold about 60% more than Meat/Poultry, the third best-selling category, while the two lowest-selling categories combined (Produce and Grains/Cereals) still fall short of the second best seller, Dairy Products.

In [0]:
%sql
SELECT 
    cat.category_name,
    SUM(od.quantity * od.unit_price) AS total_vendas_categoria
FROM northwind_dw.order_details od
JOIN northwind_dw.products p 
  ON od.product_id = p.product_id
JOIN northwind_dw.categories cat 
  ON p.category_id = cat.category_id
GROUP BY cat.category_name
ORDER BY total_vendas_categoria DESC;


category_name,total_vendas_categoria
Beverages,286526.94999999995
Dairy Products,251330.5
Meat/Poultry,178188.80000000002
Confections,177099.09999999995
Seafood,141623.09000000003
Condiments,113694.75
Produce,105268.6
Grains/Cereals,100726.8
