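Este notebook gera um perfil (profiling) de cada tabela. Os itens 1 a 4 são as saídas geradas; os itens 5 a 9 são cuidados de implementação: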

1- nulos por coluna

2- min/max/mean (numéricos)

3- top categorias (categóricos)

4- checagens de integridade

5- calcula o total de linhas uma única vez por tabela

6- converte min/max/avg sempre para float (ou None)

7- converte top_values para string (JSON-like) para caber num schema simples

8- evita erro de merge de tipos

9- converte qualquer valor “top” para string antes de json.dumps

4.1 Função de profiling

In [0]:
from pyspark.sql import functions as F
import json

# Base Spark SQL type names (lowercase, without precision/scale arguments)
# for which profile_table computes min/max/avg instead of top values.
NUMERIC_TYPES = {
    "tinyint",
    "smallint",
    "int",
    "bigint",
    "long",
    "float",
    "double",
    "decimal",
}

def _to_float(x):
    """Best-effort conversion of ``x`` to ``float``.

    Returns ``None`` when ``x`` is ``None`` or when ``float(x)`` raises, so
    callers always receive either a float or ``None``.
    """
    try:
        return None if x is None else float(x)
    except Exception:
        # Deliberately lenient: an unconvertible value is reported as missing.
        return None

def profile_table(table_name, top_n=10):
    """Build one profiling record per column of a Spark table.

    Uses the ``spark`` session provided by the notebook runtime. The table's
    row count is computed once; null and distinct counts are computed per
    column.

    Args:
        table_name: Name passed to ``spark.table``.
        top_n: Maximum number of most frequent values kept for each
            non-numeric column.

    Returns:
        A list of dicts, one per column, with the keys ``table``, ``column``,
        ``type``, ``rows``, ``nulls``, ``null_pct`` (``None`` when the table
        has no rows), ``distinct``, ``min``/``max``/``avg`` (floats or
        ``None``; filled for numeric columns only) and ``top_values`` (a JSON
        array string; filled for non-numeric columns only).
    """
    df = spark.table(table_name)
    total = df.count()

    # Alias for the per-value frequency; avoids an ambiguous reference when
    # the profiled column itself is named "count".
    freq_col = "_profile_freq"

    rows = []
    for c, t in df.dtypes:
        nulls = df.filter(F.col(c).isNull()).count()
        distinct = df.select(c).distinct().count()

        info = {
            "table": table_name,
            "column": c,
            "type": t,
            "rows": int(total),
            "nulls": int(nulls),
            "null_pct": (nulls / total) if total else None,
            "distinct": int(distinct),
            "min": None,
            "max": None,
            "avg": None,
            "top_values": None,
        }

        # Parameterized types are reported with their arguments, e.g.
        # "decimal(10,2)"; compare on the base name so they count as numeric.
        base_type = t.lower().split("(", 1)[0].strip()

        if base_type in NUMERIC_TYPES:
            stats = df.select(
                F.min(F.col(c)).alias("min"),
                F.max(F.col(c)).alias("max"),
                F.avg(F.col(c)).alias("avg"),
            ).collect()[0]

            # Always float (or None) so every record shares one value type.
            info["min"] = _to_float(stats["min"])
            info["max"] = _to_float(stats["max"])
            info["avg"] = _to_float(stats["avg"])

        else:
            top = (
                df.groupBy(F.col(c))
                .agg(F.count(F.lit(1)).alias(freq_col))
                .orderBy(F.desc(freq_col))
                .limit(top_n)
            )
            vals = [row[0] for row in top.collect()]
            # Stringify first: datetime, date, etc. are not JSON serializable.
            vals_str = [None if v is None else str(v) for v in vals]
            info["top_values"] = json.dumps(vals_str, ensure_ascii=False)

        rows.append(info)

    return rows

from pyspark.sql.types import (
    DoubleType,
    LongType,
    StringType,
    StructField,
    StructType,
)

# Tables included in the profiling run.
tables_to_profile = [
    "gold_fato_pedido_item",
    "gold_dim_produto",
    "gold_dim_cliente",
    "gold_dim_vendedor",
    "gold_dim_pedido",
    "bronze_olist_reviews",
    "bronze_olist_payments",
]

# One flat list holding the per-column records of every table.
profiles = []
for t in tables_to_profile:
    profiles.extend(profile_table(t))

# Explicit schema instead of inference from dicts: inference fails on an empty
# list or on a column that is None in every record, and can raise type-merge
# errors. Field order is alphabetical, matching what inference produced, so
# downstream tables keep the same layout.
profile_schema = StructType([
    StructField("avg", DoubleType()),
    StructField("column", StringType()),
    StructField("distinct", LongType()),
    StructField("max", DoubleType()),
    StructField("min", DoubleType()),
    StructField("null_pct", DoubleType()),
    StructField("nulls", LongType()),
    StructField("rows", LongType()),
    StructField("table", StringType()),
    StructField("top_values", StringType()),
    StructField("type", StringType()),
])

profile_df = spark.createDataFrame(profiles, schema=profile_schema)
display(profile_df)




avg,column,distinct,max,min,null_pct,nulls,rows,table,top_values,type
,order_id,98666,,,0.0,0,112650,gold_fato_pedido_item,"[""8272b63d03f5f79c56e9e4120aec44ef"", ""ab14fdcfbe524636d65ee38360e22ce8"", ""1b15974a0141d54e36626dca3fdc731a"", ""9ef13efd6949e4573a18964dd1bbe7f5"", ""428a2f660dc84138d969ccd69a0ab6d5"", ""9bdc4d4c71aa1de4606060929dee888c"", ""73c8ab38f07dc94389065f7eba4f297a"", ""37ee401157a3a0b28c9c6d0ed8c3b24b"", ""c05d6a79e55da72ca780ce90364abed9"", ""af822dacd6f5cff7376413c03a388bb7""]",string
1.1978339991122948,order_item_id,21,21.0,1.0,0.0,0,112650,gold_fato_pedido_item,,bigint
,product_id,32951,,,0.0,0,112650,gold_fato_pedido_item,"[""aca2eb7d00ea1a7b8ebd4e68314663af"", ""99a4788cb24856965c36a24e339b6058"", ""422879e10f46682990de24d770e7f83d"", ""389d119b48cf3043d311335e499d9c6b"", ""368c6c730842d78016ad823897a372db"", ""53759a2ecddad2bb87a079a1f1519f73"", ""d1c427060a0f73f6b889a5c7c61f2ac4"", ""53b36df67ebb7c41585e8d54d6772e08"", ""154e7e31ebfa092203795c972e5804a6"", ""3dd2a17168ec895c781a9191c1e95ad7""]",string
,seller_id,3095,,,0.0,0,112650,gold_fato_pedido_item,"[""6560211a19b47992c3666cc44a7e94c0"", ""4a3ca9315b744ce9f8e9374361493884"", ""1f50f920176fa81dab994f9023523100"", ""cc419e0650a3c5ba77189a1882b7556a"", ""da8622b14eb17ae2831f4ac5b9dab84a"", ""955fee9216a65b617aa5c0531780ce60"", ""1025f0e2d44d7041d6cf58b6550e0bfa"", ""7c67e1448b00f6e969d365cea6b010ab"", ""ea8482cd71df3c1969d7b9473ff13abc"", ""7a67c85e85bb2ce8582c35f2203ad736""]",string
,shipping_limit_date,93318,,,0.0,0,112650,gold_fato_pedido_item,"[""2018-03-01 02:50:48"", ""2017-07-21 18:25:23"", ""2017-08-30 14:30:23"", ""2017-11-30 10:30:51"", ""2017-02-03 21:44:49"", ""2017-12-21 02:30:41"", ""2018-02-28 11:48:12"", ""2018-06-13 17:30:35"", ""2018-04-19 02:30:52"", ""2018-04-25 22:11:43""]",timestamp
120.65373901464896,price,5968,6735.0,0.85,0.0,0,112650,gold_fato_pedido_item,,double
19.99031992898358,freight_value,6999,409.68,0.0,0.0,0,112650,gold_fato_pedido_item,,double
,customer_id,98666,,,0.0,0,112650,gold_fato_pedido_item,"[""fc3d1daec319d62d49bfb5e1f83123e9"", ""bd5d39761aa56689a265d95d8d32b8be"", ""be1b70680b9f9694d8c70f41fa3dc92b"", ""adb32467ecc74b53576d9d13a5a55891"", ""10de381f8a8d23fff822753305f71cae"", ""a7693fba2ff9583c78751f2b66ecab9d"", ""d5f2b3f597c7ccafbb5cac0bcc3d6024"", ""7d321bd4e8ba1caf74c4c1aabd9ae524"", ""3b54b5978e9ace64a63f90d176ffb158"", ""9eb3d566e87289dcb0acf28e1407c839""]",string
,order_approved_at,90175,,,0.00013315579227696404,15,112650,gold_fato_pedido_item,"[""2018-02-24 03:20:27"", ""2017-07-17 18:25:23"", ""2017-08-24 14:30:23"", ""2018-06-08 19:31:06"", ""2017-12-15 02:30:41"", ""2017-11-24 10:31:10"", null, ""2017-01-30 22:33:45"", ""2018-02-22 11:48:42"", ""2018-04-14 02:31:43""]",timestamp
,order_delivered_carrier_date,81018,,,0.0105992010652463,1194,112650,gold_fato_pedido_item,"[null, ""2018-05-09 15:48:00"", ""2018-05-10 18:29:00"", ""2018-08-08 15:01:00"", ""2018-05-07 12:31:00"", ""2017-07-20 15:45:53"", ""2018-03-02 00:18:01"", ""2017-08-25 20:07:36"", ""2018-06-08 14:40:00"", ""2018-08-15 12:53:00""]",timestamp


4.2 Exportar o catálogo para Markdown

In [0]:
import pandas as pd
import math
from pyspark.sql import Row

# Convert the profiling DataFrame to pandas for the Markdown rendering below.
pdf = profile_df.toPandas()

def fmt(v):
    """Render one cell value as text.

    ``None`` and float NaN become an empty string, other floats are shown
    with four decimal places, and everything else goes through ``str``.
    """
    is_float = isinstance(v, float)
    if v is None or (is_float and math.isnan(v)):
        return ""
    return f"{v:.4f}" if is_float else str(v)

def df_to_md_table(df, cols):
    """Render the selected columns of a pandas DataFrame as a Markdown table.

    Args:
        df: pandas DataFrame containing at least the columns in ``cols``.
        cols: Column names to emit, in output order.

    Returns:
        The table as one string: a header row, a ``---`` separator row, and
        one row per DataFrame row, joined with newlines. Cell values are
        formatted with ``fmt``.
    """
    def cell(text):
        # A line break would end the table row and a bare "|" would be read
        # as a column separator, so neutralize both inside cell text.
        return (
            text.replace("\r\n", " ")
            .replace("\n", " ")
            .replace("\r", " ")
            .replace("|", "\\|")
        )

    lines = [
        "| " + " | ".join(cell(str(c)) for c in cols) + " |",
        "| " + " | ".join(["---"] * len(cols)) + " |",
    ]
    for _, r in df[cols].iterrows():
        lines.append("| " + " | ".join(cell(fmt(r[c])) for c in cols) + " |")
    return "\n".join(lines)

# Column order used for every per-table Markdown section.
cols = [
    "column", "type", "rows", "nulls", "null_pct",
    "distinct", "min", "max", "avg", "top_values",
]

# Document title first, then one "## <table>" section per profiled table.
md_lines = ["# Catálogo e Qualidade de Dados (gerado automaticamente)\n"]

for table in pdf["table"].unique():
    sub = pdf.loc[pdf["table"] == table].copy()
    # Backfill any expected column that is absent so the layout stays fixed.
    for missing in [name for name in cols if name not in sub.columns]:
        sub[missing] = None
    md_lines += [f"## {table}\n", df_to_md_table(sub, cols), "\n"]

md_text = "\n".join(md_lines)

# Without DBFS: persist the Markdown document as a single-row Delta table.
doc_rows = [Row(doc_name="catalogo_dados.md", content=md_text)]
(
    spark.createDataFrame(doc_rows)
    .write.format("delta")
    .mode("overwrite")
    .saveAsTable("gold_docs_markdown")
)

# Optional but recommended: also persist the structured profiling result.
(
    profile_df.write.format("delta")
    .mode("overwrite")
    .saveAsTable("gold_data_catalog_profile")
)

# Preview the beginning of the generated document as the cell output.
md_text[:1000]



'# Catálogo e Qualidade de Dados (gerado automaticamente)\n\n## gold_fato_pedido_item\n\n| column | type | rows | nulls | null_pct | distinct | min | max | avg | top_values |\n| --- | --- | --- | --- | --- | --- | --- | --- | --- | --- |\n| order_id | string | 112650 | 0 | 0.0000 | 98666 |  |  |  | ["8272b63d03f5f79c56e9e4120aec44ef", "ab14fdcfbe524636d65ee38360e22ce8", "1b15974a0141d54e36626dca3fdc731a", "9ef13efd6949e4573a18964dd1bbe7f5", "428a2f660dc84138d969ccd69a0ab6d5", "9bdc4d4c71aa1de4606060929dee888c", "73c8ab38f07dc94389065f7eba4f297a", "37ee401157a3a0b28c9c6d0ed8c3b24b", "c05d6a79e55da72ca780ce90364abed9", "af822dacd6f5cff7376413c03a388bb7"] |\n| order_item_id | bigint | 112650 | 0 | 0.0000 | 21 | 1.0000 | 21.0000 | 1.1978 |  |\n| product_id | string | 112650 | 0 | 0.0000 | 32951 |  |  |  | ["aca2eb7d00ea1a7b8ebd4e68314663af", "99a4788cb24856965c36a24e339b6058", "422879e10f46682990de24d770e7f83d", "389d119b48cf3043d311335e499d9c6b", "368c6c730842d78016ad823897a372db", "53759