In [12]:
import pandas as pd
import re
import math
from sqlalchemy import create_engine, text

In [13]:
CONN_STR = "postgresql+psycopg2://estufas_user:estufas_pass_123@localhost:5432/estufas_kibala"
engine = create_engine(CONN_STR)

In [14]:
# ---------- helper ----------
def cultura_key(s: str) -> str:
    s = str(s).strip()
    s = re.sub(r"\s*\(.*\)\s*$", "", s)   # remove "(...)"
    s = re.sub(r"\.\s*$", "", s)         # remove ponto no fim
    s = re.sub(r"\s+", " ", s)           # normaliza espa√ßos
    return s

In [16]:
ddl = """
CREATE SCHEMA IF NOT EXISTS silver;

DROP TABLE IF EXISTS silver.fact_colheita_linha;

CREATE TABLE silver.fact_colheita_linha (
  ano_semana TEXT NOT NULL,
  ano        INT  NOT NULL,
  semana     INT  NOT NULL,

  bloco_id   INT  NOT NULL,
  cultura_id INT  NOT NULL,

  -- üîë chave l√≥gica do resultado
  tipo_resultado TEXT NOT NULL, -- SEMANAL | ACUMULADO | TOTALIZADOR

  -- m√©tricas
  peso_normal_kg NUMERIC(12,2),
  caixas_normal  NUMERIC(12,2),
  peso_refugo_kg NUMERIC(12,2),
  caixas_refugo  NUMERIC(12,2),

  -- üîê gr√£o correto
  CONSTRAINT pk_fact_colheita_linha
    PRIMARY KEY (ano_semana, bloco_id, cultura_id, tipo_resultado)
);
"""
with engine.begin() as conn:
    conn.execute(text(ddl))

print("‚úÖ silver.fact_colheita_linha recriada com sucesso")


‚úÖ silver.fact_colheita_linha recriada com sucesso


In [17]:
sql_bronze = """
SELECT
  ano,
  semana,
  bloco::int AS bloco_id,
  TRIM(cultura) AS cultura,
  UPPER(TRIM(tipo_resultado)) AS tipo_resultado,
  caixas_normal,
  peso_normal_kg,
  caixas_refugo,
  peso_refugo_kg
FROM bronze.colheitas_bronze
WHERE ano IS NOT NULL
  AND semana IS NOT NULL
  AND bloco IS NOT NULL
  AND cultura IS NOT NULL
  AND TRIM(cultura) <> ''
  AND tipo_resultado IS NOT NULL
  AND TRIM(tipo_resultado) <> '';
"""
df = pd.read_sql(sql_bronze, engine)
print("BRONZE colheita rows:", len(df))
print(df["tipo_resultado"].value_counts(dropna=False).head(20))

BRONZE colheita rows: 5598
tipo_resultado
SEMANAL      2799
ACUMULADO    2799
Name: count, dtype: int64


In [18]:
df["ano"] = pd.to_numeric(df["ano"], errors="coerce").astype("Int64")
df["semana"] = pd.to_numeric(df["semana"], errors="coerce").astype("Int64")
df["bloco_id"] = pd.to_numeric(df["bloco_id"], errors="coerce").astype("Int64")

df["ano_semana"] = df["ano"].astype(str) + "-" + df["semana"].astype(str).str.zfill(2)

# normaliza tipo_resultado para apenas 3 labels fixos
map_tipo = {
    "SEMANAL": "SEMANAL",
    "ACUMULADO": "ACUMULADO",
    "TOTALIZADOR": "TOTALIZADOR",
}
df["tipo_resultado"] = df["tipo_resultado"].map(map_tipo).fillna(df["tipo_resultado"])

# opcional: manter s√≥ os 3 esperados (evita sujeira)
df = df[df["tipo_resultado"].isin(["SEMANAL", "ACUMULADO", "TOTALIZADOR"])].copy()

df["cultura_key"] = df["cultura"].apply(cultura_key)

In [19]:
dim = pd.read_sql("SELECT cultura_id, cultura_nome FROM silver.dim_cultura", engine)
dim["cultura_key"] = dim["cultura_nome"].apply(cultura_key)

map_cult = dict(zip(dim["cultura_key"], dim["cultura_id"]))
df["cultura_id"] = df["cultura_key"].map(map_cult)

print("Mapeadas:", df["cultura_id"].notna().sum(), "| N√£o mapeadas:", df["cultura_id"].isna().sum())
if df["cultura_id"].isna().any():
    print("\nCulturas n√£o mapeadas (amostra):")
    print(df.loc[df["cultura_id"].isna(), "cultura"].drop_duplicates().head(50).to_string(index=False))


Mapeadas: 5598 | N√£o mapeadas: 0


In [20]:
for col in ["caixas_normal","peso_normal_kg","caixas_refugo","peso_refugo_kg"]:
    df[col] = pd.to_numeric(df[col], errors="coerce").fillna(0)

# mant√©m s√≥ linhas mapeadas
df = df[df["cultura_id"].notna()].copy()
df["cultura_id"] = df["cultura_id"].astype("Int64")

In [21]:
fact = (
    df.groupby(["ano_semana","ano","semana","bloco_id","cultura_id","tipo_resultado"], as_index=False)
      .agg({
          "peso_normal_kg":"sum",
          "caixas_normal":"sum",
          "peso_refugo_kg":"sum",
          "caixas_refugo":"sum"
      })
)

print("FACT (agrupada) linhas:", len(fact))
print(fact["tipo_resultado"].value_counts())
fact.head()


FACT (agrupada) linhas: 5570
tipo_resultado
ACUMULADO    2785
SEMANAL      2785
Name: count, dtype: int64


Unnamed: 0,ano_semana,ano,semana,bloco_id,cultura_id,tipo_resultado,peso_normal_kg,caixas_normal,peso_refugo_kg,caixas_refugo
0,2025-01,2025,1,1,11,ACUMULADO,56.0,2.0,0.0,0.0
1,2025-01,2025,1,1,11,SEMANAL,0.0,0.0,0.0,0.0
2,2025-01,2025,1,1,12,ACUMULADO,504.0,28.0,0.0,0.0
3,2025-01,2025,1,1,12,SEMANAL,0.0,0.0,0.0,0.0
4,2025-01,2025,1,4,26,ACUMULADO,4945.0,0.0,0.0,0.0


In [22]:
sql_upsert = text("""
INSERT INTO silver.fact_colheita_linha (
  ano_semana, ano, semana, bloco_id, cultura_id, tipo_resultado,
  peso_normal_kg, caixas_normal, peso_refugo_kg, caixas_refugo
)
VALUES (
  :ano_semana, :ano, :semana, :bloco_id, :cultura_id, :tipo_resultado,
  :peso_normal_kg, :caixas_normal, :peso_refugo_kg, :caixas_refugo
)
ON CONFLICT (ano_semana, bloco_id, cultura_id, tipo_resultado)
DO UPDATE SET
  ano = EXCLUDED.ano,
  semana = EXCLUDED.semana,
  peso_normal_kg = EXCLUDED.peso_normal_kg,
  caixas_normal = EXCLUDED.caixas_normal,
  peso_refugo_kg = EXCLUDED.peso_refugo_kg,
  caixas_refugo = EXCLUDED.caixas_refugo;
""")

records = fact.to_dict("records")

# limpa NaN/NaT no dict (defensivo)
for r in records:
    for k, v in list(r.items()):
        if isinstance(v, float) and (math.isnan(v) or math.isinf(v)):
            r[k] = None
        elif pd.isna(v):
            r[k] = None

with engine.begin() as conn:
    conn.execute(sql_upsert, records)

print("‚úÖ silver.fact_colheita_linha carregada. Linhas:", len(records))


‚úÖ silver.fact_colheita_linha carregada. Linhas: 5570
