## Bronze Layer

#### Criei um volume delta já que no free tier o acesso a mnt/ é negado

#### Verificando os dados de teste disponíveis

In [0]:
%py
# Base locations of the bronze and silver retail layers.
# The legacy "/mnt/delta/bronze" mount is not used: the data lives in a volume
# because access to mnt/ is denied on the free tier.
BRONZE_PATH, SILVER_PATH = (
    f"/Volumes/workspace/default/delta/{layer}/retail"
    for layer in ("bronze", "silver")
)


# Criando diretório Silver
# Definir caminhos base


In [0]:
# List the entries currently stored under the bronze path and render them.
bronze_entries = dbutils.fs.ls(BRONZE_PATH)
display(bronze_entries)


### Dataset retail silver


In [0]:
# Bronze tables to expose to SQL; each one is read from f"{BRONZE_PATH}/<name>"
# and registered as the temp view "tmp_<name>".
bronze_tables = [
    "customers",
    "company_employees",
    "active_promotions",
    "loyalty_segment",
    "products",
    "promotions",
    "purchase_orders",
    "sales_orders",
    "sales_stream",
    "suppliers",
]

# Mapping of temp-view name -> Delta path in the bronze layer.
silver_map = {f"tmp_{table}": f"{BRONZE_PATH}/{table}" for table in bronze_tables}

# Load every Delta table and (re)register it as a session-scoped temp view.
for view_name, path in silver_map.items():
    print(f"Loading {view_name} from {path}")
    bronze_df = spark.read.format("delta").load(path)
    bronze_df.createOrReplaceTempView(view_name)
# DBTITLE 1,customers

### Lendo as tabelas

In [0]:
%sql
-- Preview every column of the suppliers temp view.
SELECT
    *
FROM tmp_suppliers

#### Explodindo a tabela para armazenar produtos por fornecedor em python

In [0]:
from pyspark.sql import functions as F

# Address/geo columns that are not needed in the supplier -> item mapping.
# Each one also exists in a "<name>_ship" variant, which is dropped as well.
address_fields = [
    "street", "number", "unit", "region", "district",
    "postcode", "city", "state", "lat", "lon",
]
dropped_columns = [
    "item_array",
    "items_provided",
    *address_fields,
    *(f"{field}_ship" for field in address_fields),
]

# Read the temp view explicitly instead of relying on `_sqldf`: `_sqldf` is
# rebound by every %sql cell, so it only holds the suppliers data when the
# suppliers SELECT happened to be the last SQL cell executed.
df_exploded = (
    spark.table("tmp_suppliers")
    # turn the comma-separated string into an array
    .withColumn("item_array", F.split(F.col("items_provided"), ","))
    # one output row per array element
    .withColumn("item", F.explode(F.col("item_array")))
    # strip the whitespace left around each item by the split
    .withColumn("item", F.trim(F.col("item")))
    # remove the helper column and everything unrelated to the mapping
    .drop(*dropped_columns)
)

display(df_exploded)

#### O mesmo processo agora em sql

In [0]:
%sql
-- One row per (supplier, item): split the comma-separated items_provided
-- string, explode the resulting array and trim each item.
SELECT
    s.SUPPLIER_ID,
    s.TAX_ID,
    s.supplier_name,
    TRIM(raw_item) AS item
FROM tmp_suppliers AS s
LATERAL VIEW EXPLODE(SPLIT(s.items_provided, ',')) AS raw_item


In [0]:
%sql
-- Persist the supplier -> item mapping as a Delta table: one row per
-- (supplier, item), obtained by splitting and exploding items_provided.
CREATE OR REPLACE TABLE workspace.retail.silver_supplier_items
USING DELTA
AS
SELECT
    s.SUPPLIER_ID,
    s.TAX_ID,
    s.supplier_name,
    TRIM(raw_item) AS item
FROM tmp_suppliers AS s
LATERAL VIEW EXPLODE(SPLIT(s.items_provided, ',')) AS raw_item

#### Salvando tabela de localização e tax id dos fornecedores

In [0]:
%sql
-- Show the full suppliers temp view, including the lat/lon columns.
SELECT
    *
FROM tmp_suppliers

In [0]:
# Supplier identity plus coordinates, used for the reverse-geocoding lookup.
# The temp view is read explicitly rather than through `_sqldf`, because
# `_sqldf` is rebound by every %sql cell and therefore depends on which SQL
# cell was executed last.
df_supplier = spark.table("tmp_suppliers").select(
    "SUPPLIER_ID", "TAX_ID", "supplier_name", "lat", "lon"
)

#####  - Consulta à API que retorna o endereço a partir das coordenadas
#####  - Conceito de cache: evitar consultas desnecessárias em APIs

lat = 33.985047200000004

lon = -118.2286537

f"https://nominatim.openstreetmap.org/reverse?lat={lat}&lon={lon}&format=json"

!head -n 20 {CACHE_FILE} 


In [0]:
import json
import requests
import time
from pathlib import Path

# File where reverse-geocoding results are persisted between runs.
CACHE_FILE = f"{BRONZE_PATH}/suppliers_geocache.json"

# Upper bound (seconds) for a single HTTP call, so a stalled request cannot
# hang the cell indefinitely.
REQUEST_TIMEOUT_S = 10

# Load the existing cache, or start with an empty one.
if Path(CACHE_FILE).exists():
    with open(CACHE_FILE, "r") as f:
        geocache = json.load(f)
else:
    geocache = {}

# The cache maps "<lat>,<lon>" -> address dict. The loop runs inside
# try/finally so that lookups already made are written to disk even when a
# later request fails; a re-run then resumes from the cache.
try:
    for row in df_supplier.collect():  # .limit(10)
        # Suppliers without coordinates cannot be geocoded; skipping them also
        # keeps non-numeric keys such as "None,None" out of the cache.
        if row["lat"] is None or row["lon"] is None:
            print(f"⚠️ Skipping supplier {row['SUPPLIER_ID']}: missing coordinates")
            continue

        lat = str(row["lat"])
        lon = str(row["lon"])
        key = f"{lat},{lon}"

        if key in geocache:
            print(f"🔁 Cache hit: {key}")
        else:
            url = f"https://nominatim.openstreetmap.org/reverse?lat={lat}&lon={lon}&format=json"
            r = requests.get(
                url,
                headers={'User-Agent': 'databricks-app'},
                timeout=REQUEST_TIMEOUT_S,
            )
            # Fail loudly on HTTP errors instead of caching an empty address.
            r.raise_for_status()
            data = r.json()
            addr = data.get("address", {})
            # Keep only the address fields used downstream; absent ones are None.
            geocache[key] = {
                "road": addr.get("road"),
                "town": addr.get("town"),
                "state": addr.get("state"),
                "postcode": addr.get("postcode"),
                "country": addr.get("country")
            }
            print(f"✅ Nova requisição: {key}")
            # Pause between real requests to avoid hammering the public API.
            time.sleep(1)

        print({
            "SUPPLIER_ID": row["SUPPLIER_ID"],
            **geocache[key]
        })
finally:
    # Persist the (possibly partially) updated cache.
    with open(CACHE_FILE, "w") as f:
        json.dump(geocache, f, indent=2)


In [0]:
import json

# Re-read the persisted geocache from disk into a plain dict.
with open(CACHE_FILE, "r") as cache_fp:
    cache_dict = json.loads(cache_fp.read())

# Show the raw structure: "<lat>,<lon>" keys mapped to address dicts.
print(cache_dict)


In [0]:
from pyspark.sql import Row

# Explicit schema for the cache DataFrame. Without it Spark has to infer the
# column types from the data, which fails when the cache is empty or when an
# address field (e.g. "town") is None for every cached entry.
CACHE_SCHEMA = (
    "lat double, lon double, road string, town string, "
    "state string, postcode string, country string"
)

# Flatten the cache dict ("<lat>,<lon>" -> address dict) into one Row per entry.
rows = []
for coords, addr in cache_dict.items():
    try:
        lat, lon = map(float, coords.split(","))
    except ValueError:
        # Keys that are not two numeric coordinates (e.g. "None,None" written
        # for a supplier without lat/lon) cannot be joined back; skip them.
        print(f"Skipping malformed cache key: {coords!r}")
        continue
    rows.append(Row(
        lat=lat,
        lon=lon,
        road=addr.get("road"),
        town=addr.get("town"),
        state=addr.get("state"),
        postcode=addr.get("postcode"),
        country=addr.get("country")
    ))

# Build the Spark DataFrame; Row fields are declared in the same order as the schema.
cache_supplier = spark.createDataFrame(rows, schema=CACHE_SCHEMA)
display(cache_supplier)


In [0]:
# Faz o join entre df_supplier e cache_supplier
# A supplier matches a cache entry when both coordinates are equal.
same_coordinates = (
    (df_supplier.lat == cache_supplier.lat)
    & (df_supplier.lon == cache_supplier.lon)
)

# Left join keeps every supplier, even those with no cached address; the
# cache's own lat/lon copies are dropped so each coordinate appears once.
df_joined = (
    df_supplier
    .join(cache_supplier, on=same_coordinates, how="left")
    .drop(cache_supplier.lat)
    .drop(cache_supplier.lon)
)

# Inspect the result.
display(df_joined)

In [0]:
# Persist the enriched suppliers as a Silver table, replacing any previous contents.
silver_writer = df_joined.write.mode("overwrite")
silver_writer.saveAsTable("workspace.retail.silver_suppliers")
