In [5]:
import pandas as pd, csv, textwrap
from pathlib import Path
from collections import defaultdict, deque

# -----------------------------
# 1) Sample insumo_base
#    Structure: tabla_origen (source table), tabla_destino (destination table), query
# -----------------------------
# One row per source -> destination relation. A destination fed by several
# sources (e.g. a join) appears once per source. Zones are derived later from
# the schema prefix: s_* -> S, proceso* -> P, resultados* -> R.
insumo_rows = [
    ("s_core.clientes", "proceso_bipa_vpr.stg_clientes",
     "create table proceso_bipa_vpr.stg_clientes as select * from s_core.clientes;"),
    ("s_core.prestamos", "proceso_bipa_vpr.stg_prestamos",
     "create table proceso_bipa_vpr.stg_prestamos as select * from s_core.prestamos;"),
    ("proceso_bipa_vpr.stg_clientes", "proceso_bipa_vpr.dim_clientes",
     "create table proceso_bipa_vpr.dim_clientes as select ... from proceso_bipa_vpr.stg_clientes;"),
    ("proceso_bipa_vpr.stg_prestamos", "proceso_bipa_vpr.fact_prestamos_stg",
     "create table proceso_bipa_vpr.fact_prestamos_stg as select ... from proceso_bipa_vpr.stg_prestamos;"),
    # tb_fact_prestamos has two sources, so it gets two rows with the same query.
    ("proceso_bipa_vpr.dim_clientes", "resultados_bipa_vpr.tb_fact_prestamos",
     "create table resultados_bipa_vpr.tb_fact_prestamos as select ... from proceso_bipa_vpr.fact_prestamos_stg fp join proceso_bipa_vpr.dim_clientes dc on ...;"),
    ("proceso_bipa_vpr.fact_prestamos_stg", "resultados_bipa_vpr.tb_fact_prestamos",
     "create table resultados_bipa_vpr.tb_fact_prestamos as select ... from proceso_bipa_vpr.fact_prestamos_stg fp join proceso_bipa_vpr.dim_clientes dc on ...;"),

    # Lineage starting at s_core.pagos.
    ("s_core.pagos", "proceso_bipa_vpr.stg_pagos",
     "create table proceso_bipa_vpr.stg_pagos as select * from s_core.pagos;"),
    ("proceso_bipa_vpr.stg_pagos", "proceso_bipa_vpr.fact_pagos_stg",
     "create table proceso_bipa_vpr.fact_pagos_stg as select ... from proceso_bipa_vpr.stg_pagos;"),
    # NOTE(review): these two rows for tb_fact_pagos carry different query
    # texts; only the second one mentions the join - sample data, confirm.
    ("proceso_bipa_vpr.fact_pagos_stg", "resultados_bipa_vpr.tb_fact_pagos",
     "create table resultados_bipa_vpr.tb_fact_pagos as select ... from proceso_bipa_vpr.fact_pagos_stg;"),
    ("proceso_bipa_vpr.dim_clientes", "resultados_bipa_vpr.tb_fact_pagos",
     "create table resultados_bipa_vpr.tb_fact_pagos as select ... from proceso_bipa_vpr.fact_pagos_stg fp join proceso_bipa_vpr.dim_clientes dc on ...;"),

    # A result table built from two other result tables (R -> R relations).
    ("resultados_bipa_vpr.tb_fact_prestamos", "resultados_bipa_vpr.tb_reporte_mensual",
     "create table resultados_bipa_vpr.tb_reporte_mensual as select ... from resultados_bipa_vpr.tb_fact_prestamos;"),
    ("resultados_bipa_vpr.tb_fact_pagos", "resultados_bipa_vpr.tb_reporte_mensual",
     "create table resultados_bipa_vpr.tb_reporte_mensual as select ... from resultados_bipa_vpr.tb_fact_pagos;"),

    # An independent chain in other schemas (proceso_riesgo / resultados_riesgo).
    ("s_core.cartera", "proceso_riesgo.stg_cartera",
     "create table proceso_riesgo.stg_cartera as select * from s_core.cartera;"),
    ("proceso_riesgo.stg_cartera", "proceso_riesgo.mora_clientes",
     "create table proceso_riesgo.mora_clientes as select ... from proceso_riesgo.stg_cartera;"),
    ("proceso_riesgo.mora_clientes", "resultados_riesgo.tb_alertas_mora",
     "create table resultados_riesgo.tb_alertas_mora as select ... from proceso_riesgo.mora_clientes;"),
]
insumo_base = pd.DataFrame(insumo_rows, columns=["tabla_origen","tabla_destino","query"])



In [3]:
# -----------------------------
# 2) Utilidades: zona, color e IDs
# -----------------------------
def parse_schema(fqn: str):
    """Return the schema part of a ``schema.table`` name, lower-cased.

    The input is stripped and lower-cased first; ``None`` or ``""`` yields
    ``""``. A name without a dot is returned whole.
    """
    normalized = (fqn or "").strip().lower()
    schema, _dot, _table = normalized.partition(".")
    return schema

def zone_prefix(fqn: str):
    """Classify a table into a lineage zone letter from its schema prefix.

    * ``s_*``        -> ``"S"``
    * ``proceso*``   -> ``"P"``
    * ``resultado*`` -> ``"R"`` (covers both ``resultado`` and ``resultados``)
    * anything else  -> ``"U"``

    The singular ``resultado`` prefix matches the zone rules used by the
    other definitions of this helper in the notebook, so the result no longer
    depends on which cell was executed last.
    """
    schema = parse_schema(fqn)
    if schema.startswith("s_"):
        return "S"
    if schema.startswith("proceso"):
        return "P"
    if schema.startswith("resultado"):
        return "R"
    return "U"

def zone_color(fqn: str):
    """Return the box fill color for a table's zone.

    S -> ``#e74c3c``, P -> ``#f1c40f``, R -> ``#2ecc71``; any other zone
    falls back to grey ``#95a5a6``.
    """
    palette = {"S": "#e74c3c", "P": "#f1c40f", "R": "#2ecc71"}
    return palette.get(zone_prefix(fqn), "#95a5a6")

def svg_escape(s: str):
    """Escape ``&``, ``<``, ``>`` and ``"`` for embedding in SVG/XML.

    Safe for element text and double-quoted attribute values; single quotes
    are left as-is. Non-string inputs are converted with ``str`` first.
    """
    entities = str.maketrans({"&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;"})
    return str(s).translate(entities)


def upstream_closure(target: str, index=None):
    """Return ``(nodes, edges)`` for the complete upstream lineage of ``target``.

    Breadth-first walk from ``target`` towards its sources.

    Parameters
    ----------
    target : str
        Objective table; surrounding whitespace is stripped.
    index : mapping, optional
        ``destination -> [(source, query), ...]``. Defaults to the module-level
        ``incoming`` index (looked up only when ``index`` is not given).

    Returns
    -------
    nodes : set of str
        ``target`` plus every table reachable upstream of it.
    edges : list of (source, destination, query)
        One tuple per index entry visited, in BFS order; duplicate index
        entries therefore produce duplicate edges.
    """
    graph = incoming if index is None else index
    target = target.strip()
    visited = {target}
    queue = deque([target])
    edges = []
    while queue:
        dst = queue.popleft()
        for src, qry in graph.get(dst, []):
            edges.append((src, dst, qry))
            if src not in visited:
                visited.add(src)
                queue.append(src)
    return visited, edges

def assign_ids_per_objective(nodes, target):
    """Assign short per-objective IDs to the tables of an upstream lineage.

    IDs restart for every objective and are numbered separately per zone:
    S1.., P1.., R1.. (and U1.. when applicable). The order is deterministic:
    most upstream first (highest level), then alphabetical (case-insensitive).

    Parameters
    ----------
    nodes : iterable of str
        Tables to label.
    target : str
        Objective table (level 0); surrounding whitespace is ignored, matching
        the stripped names produced by ``upstream_closure``.

    Returns
    -------
    dict
        Table name -> ID string.
    """
    target = target.strip()
    _, edges = upstream_closure(target)

    # Longest-path level of each table relative to the target. Self-references
    # and edges back into the target are ignored, and levels are capped at
    # len(tables) - 1 (longest simple path), so cycles cannot inflate levels and
    # the loop always converges.
    tables = {target}
    for s, d, _ in edges:
        tables.update((s, d))
    max_depth = len(tables) - 1
    level = {target: 0}
    changed = True
    while changed:
        changed = False
        for s, d, _ in edges:
            if s == d or s == target or d not in level:
                continue
            cand = min(level[d] + 1, max_depth)
            if cand > level.get(s, -1):
                level[s] = cand
                changed = True

    def sort_key(n):
        return (-level.get(n, -1), n.lower())

    ordered = sorted(nodes, key=sort_key)

    counters = {"S": 1, "P": 1, "R": 1, "U": 1}
    node_id = {}
    for n in ordered:
        z = zone_prefix(n)
        node_id[n] = f"{z}{counters[z]}"
        counters[z] += 1
    return node_id

def build_svg_for_target(target, nodes, edges, node_id_map):
    """Render the upstream lineage of ``target`` as an SVG inside a scrollable div.

    The layout is hierarchical by level (hops to ``target``) so boxes do not
    overlap: ``target`` is the right-most column, more-upstream tables are
    further left, and each column is stacked alphabetically.

    Parameters
    ----------
    target : str
        Objective table (level 0); surrounding whitespace is ignored.
    nodes : iterable of str
        Tables to draw.
    edges : iterable of (source, destination, query)
        Relations drawn as arrows; edges with an endpoint outside ``nodes``
        are skipped.
    node_id_map : dict
        Table -> short ID label printed in bold on each box.

    Returns
    -------
    str
        HTML snippet: a ``<div>`` wrapping a standalone ``<svg>``.
    """
    target = target.strip()
    nodes = list(nodes)
    edges = list(edges)

    # Longest-path levels. Self-references and edges back into the target are
    # ignored and levels are capped at len(tables) - 1, so cycles (e.g. a table
    # loaded incrementally from itself) cannot inflate the layout width.
    tables = {target}
    for s, d, _ in edges:
        tables.update((s, d))
    max_depth = len(tables) - 1
    level = {target: 0}
    changed = True
    while changed:
        changed = False
        for s, d, _ in edges:
            if s == d or s == target or d not in level:
                continue
            cand = min(level[d] + 1, max_depth)
            if cand > level.get(s, -1):
                level[s] = cand
                changed = True
    max_level = max(level.values())

    dx = 280
    node_w, node_h = 255, 70
    x_level = {n: (max_level - level.get(n, max_level)) * dx for n in nodes}

    buckets = defaultdict(list)
    for n in nodes:
        buckets[level.get(n, max_level)].append(n)
    for k in buckets:
        buckets[k].sort(key=lambda n: n.lower())

    y_pos = {}
    for ns in buckets.values():
        for i, n in enumerate(ns):
            y_pos[n] = 60 + i * 130

    width = (max_level + 1) * dx + 420
    height = (max(y_pos.values()) if y_pos else 0) + 220

    parts = []
    parts.append('<div style="width:100%;height:100%;overflow:auto;background:#ffffff;">')
    parts.append(f'<svg xmlns="http://www.w3.org/2000/svg" width="{width}" height="{height}" style="background:#ffffff;">')
    parts.append("""
<defs>
  <marker id="arrow" viewBox="0 0 10 10" refX="10" refY="5" markerWidth="7" markerHeight="7" orient="auto-start-reverse">
    <path d="M 0 0 L 10 5 L 0 10 z" fill="#555"></path>
  </marker>
</defs>
""".strip())

    # edges: cubic curve from the right side of the source box to the left
    # side of the destination box
    for s, d, _ in edges:
        if s not in x_level or d not in x_level:
            continue
        x1 = x_level[s] + node_w
        y1 = y_pos[s] + node_h/2
        x2 = x_level[d]
        y2 = y_pos[d] + node_h/2
        mx = (x1 + x2)/2
        parts.append(f'<path d="M {x1} {y1} C {mx} {y1}, {mx} {y2}, {x2} {y2}" stroke="#555" stroke-width="2" fill="none" marker-end="url(#arrow)"/>')

    # nodes
    for n in sorted(nodes, key=lambda n: (x_level[n], y_pos[n])):
        x, y = x_level[n], y_pos[n]
        color = zone_color(n)
        nid = node_id_map.get(n, "")
        parts.append(f'<rect x="{x}" y="{y}" rx="10" ry="10" width="{node_w}" height="{node_h}" fill="{color}" stroke="#333" stroke-width="1.2"/>')
        parts.append(f'<text x="{x+12}" y="{y+26}" font-size="14" fill="#111" font-weight="700">{svg_escape(nid)}</text>')
        # Wrap the raw name first and escape each line afterwards: escaping
        # first lets textwrap split an entity such as "&amp;" across two
        # <text> elements, which makes the SVG invalid.
        lines = textwrap.wrap(n, width=34)
        for j, line in enumerate(lines[:2]):
            parts.append(f'<text x="{x+12}" y="{y+46 + j*16}" font-size="12" fill="#111">{svg_escape(line)}</text>')

    parts.append("</svg></div>")
    return "\n".join(parts)

In [9]:
# -----------------------------
# 3) Index destination -> [(source, query)], used to walk lineage upstream
# -----------------------------
incoming = defaultdict(list)
for src, dst, qry in zip(insumo_base["tabla_origen"], insumo_base["tabla_destino"], insumo_base["query"]):
    incoming[dst.strip()].append((src.strip(), qry))


# -----------------------------
# 4) Build tb_linaje and tb_html for every destination table (objective)
# -----------------------------
tb_linaje_rows = []
tb_html_rows = []

# Strip names so objectives match the (stripped) keys of `incoming`;
# dict.fromkeys de-duplicates while keeping first-seen order, so the sort stays
# deterministic for names that differ only by case.
targets = sorted(dict.fromkeys(t.strip() for t in insumo_base["tabla_destino"]), key=lambda s: s.lower())

for target in targets:
    nodes, edges = upstream_closure(target)
    node_id_map = assign_ids_per_objective(nodes, target)

    # tb_linaje: one row per source -> destination relation; id = ID of the destination node
    for s, d, qry in edges:
        tb_linaje_rows.append({
            "tabla_linaje_objetivo": target,
            "id": node_id_map.get(d, ""),
            "tabla_origen": s,
            "tabla_destino": d,
            "query": qry
        })

    tb_html_rows.append({
        "tabla_destino": target,
        "html": build_svg_for_target(target, nodes, edges, node_id_map)
    })

# Explicit columns keep the schema (and the sort_values below) valid even when
# there are no rows.
tb_linaje = pd.DataFrame(tb_linaje_rows, columns=["tabla_linaje_objetivo", "id", "tabla_origen", "tabla_destino", "query"])
tb_html = pd.DataFrame(tb_html_rows, columns=["tabla_destino", "html"])

# -----------------------------
# 5) Save CSVs
# -----------------------------
out_dir = "./resultados_linaje"
# DataFrame.to_csv does not create missing directories; create it so a fresh
# run does not fail.
Path(out_dir).mkdir(parents=True, exist_ok=True)
insumo_path = out_dir + "/insumo_base_ejemplo.csv"
linaje_path = out_dir + "/tb_linaje_generado.csv"
html_path = out_dir + "/tb_html_generado.csv"

insumo_base.to_csv(insumo_path, index=False, quoting=csv.QUOTE_ALL)
tb_linaje.to_csv(linaje_path, index=False, quoting=csv.QUOTE_ALL)
tb_html.to_csv(html_path, index=False, quoting=csv.QUOTE_ALL)

# Quick previews
insumo_base_head = insumo_base.head(8)
tb_linaje_sample = tb_linaje.sort_values(["tabla_linaje_objetivo","tabla_destino","tabla_origen"]).head(15)
tb_html_list = tb_html[["tabla_destino"]].copy()

(insumo_base_head, tb_linaje_sample, tb_html_list, str(insumo_path), str(linaje_path), str(html_path))

(                          tabla_origen                          tabla_destino  \
 0                      s_core.clientes          proceso_bipa_vpr.stg_clientes   
 1                     s_core.prestamos         proceso_bipa_vpr.stg_prestamos   
 2        proceso_bipa_vpr.stg_clientes          proceso_bipa_vpr.dim_clientes   
 3       proceso_bipa_vpr.stg_prestamos    proceso_bipa_vpr.fact_prestamos_stg   
 4        proceso_bipa_vpr.dim_clientes  resultados_bipa_vpr.tb_fact_prestamos   
 5  proceso_bipa_vpr.fact_prestamos_stg  resultados_bipa_vpr.tb_fact_prestamos   
 6                         s_core.pagos             proceso_bipa_vpr.stg_pagos   
 7           proceso_bipa_vpr.stg_pagos        proceso_bipa_vpr.fact_pagos_stg   
 
                                                query  
 0  create table proceso_bipa_vpr.stg_clientes as ...  
 1  create table proceso_bipa_vpr.stg_prestamos as...  
 2  create table proceso_bipa_vpr.dim_clientes as ...  
 3  create table proceso_bipa_vpr.fa

In [20]:
import pandas as pd, csv, textwrap
from pathlib import Path
from collections import defaultdict, deque

# -----------------------------
# Sample insumo_base (same as before)
# -----------------------------
# One row per source -> destination relation (columns: tabla_origen,
# tabla_destino, query). A destination fed by several sources appears once per
# source.
insumo_rows = [
    ("s_core.clientes", "proceso_bipa_vpr.stg_clientes",
     "create table proceso_bipa_vpr.stg_clientes as select * from s_core.clientes;"),
    ("s_core.prestamos", "proceso_bipa_vpr.stg_prestamos",
     "create table proceso_bipa_vpr.stg_prestamos as select * from s_core.prestamos;"),
    ("proceso_bipa_vpr.stg_clientes", "proceso_bipa_vpr.dim_clientes",
     "create table proceso_bipa_vpr.dim_clientes as select ... from proceso_bipa_vpr.stg_clientes;"),
    ("proceso_bipa_vpr.stg_prestamos", "proceso_bipa_vpr.fact_prestamos_stg",
     "create table proceso_bipa_vpr.fact_prestamos_stg as select ... from proceso_bipa_vpr.stg_prestamos;"),
    # tb_fact_prestamos has two sources, so it gets two rows with the same query.
    ("proceso_bipa_vpr.dim_clientes", "resultados_bipa_vpr.tb_fact_prestamos",
     "create table resultados_bipa_vpr.tb_fact_prestamos as select ... from proceso_bipa_vpr.fact_prestamos_stg fp join proceso_bipa_vpr.dim_clientes dc on ...;"),
    ("proceso_bipa_vpr.fact_prestamos_stg", "resultados_bipa_vpr.tb_fact_prestamos",
     "create table resultados_bipa_vpr.tb_fact_prestamos as select ... from proceso_bipa_vpr.fact_prestamos_stg fp join proceso_bipa_vpr.dim_clientes dc on ...;"),

    # Lineage starting at s_core.pagos.
    ("s_core.pagos", "proceso_bipa_vpr.stg_pagos",
     "create table proceso_bipa_vpr.stg_pagos as select * from s_core.pagos;"),
    ("proceso_bipa_vpr.stg_pagos", "proceso_bipa_vpr.fact_pagos_stg",
     "create table proceso_bipa_vpr.fact_pagos_stg as select ... from proceso_bipa_vpr.stg_pagos;"),
    # NOTE(review): the two tb_fact_pagos rows carry different query texts - sample data, confirm.
    ("proceso_bipa_vpr.fact_pagos_stg", "resultados_bipa_vpr.tb_fact_pagos",
     "create table resultados_bipa_vpr.tb_fact_pagos as select ... from proceso_bipa_vpr.fact_pagos_stg;"),
    ("proceso_bipa_vpr.dim_clientes", "resultados_bipa_vpr.tb_fact_pagos",
     "create table resultados_bipa_vpr.tb_fact_pagos as select ... from proceso_bipa_vpr.fact_pagos_stg fp join proceso_bipa_vpr.dim_clientes dc on ...;"),

    # A result table built from two other result tables (R -> R relations).
    ("resultados_bipa_vpr.tb_fact_prestamos", "resultados_bipa_vpr.tb_reporte_mensual",
     "create table resultados_bipa_vpr.tb_reporte_mensual as select ... from resultados_bipa_vpr.tb_fact_prestamos;"),
    ("resultados_bipa_vpr.tb_fact_pagos", "resultados_bipa_vpr.tb_reporte_mensual",
     "create table resultados_bipa_vpr.tb_reporte_mensual as select ... from resultados_bipa_vpr.tb_fact_pagos;"),

    # An independent chain in the proceso_riesgo / resultados_riesgo schemas.
    ("s_core.cartera", "proceso_riesgo.stg_cartera",
     "create table proceso_riesgo.stg_cartera as select * from s_core.cartera;"),
    ("proceso_riesgo.stg_cartera", "proceso_riesgo.mora_clientes",
     "create table proceso_riesgo.mora_clientes as select ... from proceso_riesgo.stg_cartera;"),
    ("proceso_riesgo.mora_clientes", "resultados_riesgo.tb_alertas_mora",
     "create table resultados_riesgo.tb_alertas_mora as select ... from proceso_riesgo.mora_clientes;"),
]
insumo_base = pd.DataFrame(insumo_rows, columns=["tabla_origen","tabla_destino","query"])

# -----------------------------
# Utilidades
# -----------------------------
def parse_schema(fqn: str):
    """Return the lower-cased schema of a ``schema.table`` name.

    Whitespace is stripped, ``None``/empty becomes ``""``, and a dotless name
    is returned as-is (lower-cased).
    """
    head, _sep, _tail = (fqn or "").strip().lower().partition(".")
    return head

def zone_prefix(fqn: str):
    """Map a table to its zone letter by schema prefix.

    ``s_*`` -> S, ``proceso*`` -> P, ``resultado*`` (which includes
    ``resultados*``) -> R, anything else -> U. Rules are tried in that order.
    """
    schema = parse_schema(fqn)
    for prefix, zone in (("s_", "S"), ("proceso", "P"), ("resultado", "R")):
        if schema.startswith(prefix):
            return zone
    return "U"

def zone_color(fqn: str):
    """Return the fill color for a table's zone (grey ``#95a5a6`` if unknown)."""
    palette = {"S": "#e74c3c", "P": "#f1c40f", "R": "#2ecc71"}
    return palette.get(zone_prefix(fqn), "#95a5a6")

def svg_escape(s: str):
    """Escape ``& < > "`` so ``s`` can be embedded in SVG text or attributes."""
    entities = str.maketrans({"&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;"})
    return str(s).translate(entities)

# -----------------------------
# Build lookup indexes
# -----------------------------
incoming = defaultdict(list)  # destination -> [(source, query)]
outgoing = defaultdict(list)  # source -> [(destination, query)]
for raw_src, raw_dst, qry in zip(insumo_base["tabla_origen"], insumo_base["tabla_destino"], insumo_base["query"]):
    s, d = raw_src.strip(), raw_dst.strip()
    incoming[d].append((s, qry))
    outgoing[s].append((d, qry))

# -----------------------------
# Closures
# -----------------------------
def upstream_closure(target: str, index=None):
    """Return ``(nodes, edges)`` for the complete upstream lineage of ``target``.

    Parameters
    ----------
    target : str
        Objective table; surrounding whitespace is stripped.
    index : mapping, optional
        ``destination -> [(source, query), ...]``. Defaults to the module-level
        ``incoming`` index (looked up only when ``index`` is not given).

    Returns
    -------
    nodes : set of str
        ``target`` plus every table reachable upstream of it.
    edges : list of (source, destination, query)
        One tuple per index entry visited, in breadth-first order.
    """
    graph = incoming if index is None else index
    target = target.strip()
    visited = {target}
    queue = deque([target])
    edges = []
    while queue:
        dst = queue.popleft()
        for src, qry in graph.get(dst, []):
            edges.append((src, dst, qry))
            if src not in visited:
                visited.add(src)
                queue.append(src)
    return visited, edges

def downstream_closure(root: str, index=None):
    """Return ``(nodes, edges)`` for everything built downstream of ``root``.

    Parameters
    ----------
    root : str
        Starting table; surrounding whitespace is stripped.
    index : mapping, optional
        ``source -> [(destination, query), ...]``. Defaults to the module-level
        ``outgoing`` index (looked up only when ``index`` is not given).

    Returns
    -------
    nodes : set of str
        ``root`` plus every table reachable downstream of it.
    edges : list of (source, destination, query)
        One tuple per index entry visited, in breadth-first order.
    """
    graph = outgoing if index is None else index
    root = root.strip()
    visited = {root}
    queue = deque([root])
    edges = []
    while queue:
        src = queue.popleft()
        for dst, qry in graph.get(src, []):
            edges.append((src, dst, qry))
            if dst not in visited:
                visited.add(dst)
                queue.append(dst)
    return visited, edges

# -----------------------------
# Levels para layout
# -----------------------------
def levels_upstream(edges, target):
    """Compute each table's level: the longest hop count down to ``target``.

    ``target`` is level 0, its direct sources level 1, and so on; when a table
    reaches the target through paths of different lengths, the longest wins.

    Cycles are tolerated. Self-references (``source == destination``, e.g. an
    incremental load reading its own table) and edges feeding back into
    ``target`` are ignored. Levels are capped at ``len(tables) - 1``, the
    longest possible simple path. The relaxation therefore always converges
    and levels never blow up the layout.

    Parameters
    ----------
    edges : iterable of (source, destination, query)
    target : str
        Objective table; surrounding whitespace is ignored so it matches the
        stripped names produced by the closures.

    Returns
    -------
    dict
        Table -> level, for ``target`` and every table that reaches it.
    """
    target = target.strip()
    edges = list(edges)
    tables = {target}
    for s, d, _ in edges:
        tables.update((s, d))
    max_depth = len(tables) - 1

    level = {target: 0}
    changed = True
    while changed:
        changed = False
        for s, d, _ in edges:
            if s == d or s == target or d not in level:
                continue
            cand = min(level[d] + 1, max_depth)
            if cand > level.get(s, -1):
                level[s] = cand
                changed = True
    return level

def levels_downstream(edges, root):
    """Compute each table's level: the longest hop count from ``root``.

    ``root`` is level 0, tables built directly from it level 1, and so on;
    the longest path wins when several exist.

    Cycles are tolerated. Self-references and edges pointing back into
    ``root`` are ignored, and levels are capped at ``len(tables) - 1``. The
    relaxation therefore always converges.

    Parameters
    ----------
    edges : iterable of (source, destination, query)
    root : str
        Starting table; surrounding whitespace is ignored.

    Returns
    -------
    dict
        Table -> level, for ``root`` and every table reachable from it.
    """
    root = root.strip()
    edges = list(edges)
    tables = {root}
    for s, d, _ in edges:
        tables.update((s, d))
    max_depth = len(tables) - 1

    level = {root: 0}
    changed = True
    while changed:
        changed = False
        for s, d, _ in edges:
            if s == d or d == root or s not in level:
                continue
            cand = min(level[s] + 1, max_depth)
            if cand > level.get(d, -1):
                level[d] = cand
                changed = True
    return level

# -----------------------------
# IDs (reinicio por objetivo)
# -----------------------------
def assign_ids_per_objective(nodes, level_map, kind):
    """Give every node a short, zone-scoped ID such as ``S1``, ``P2`` or ``R1``.

    Numbering restarts for each call and runs separately per zone letter
    (from ``zone_prefix``). Nodes are numbered by level - highest first when
    ``kind == "upstream"``, lowest first otherwise - then by case-insensitive
    name. Nodes missing from ``level_map`` count as level -1.

    Returns
    -------
    dict
        Node name -> ID string.
    """
    descending = kind == "upstream"

    def rank(name):
        lv = level_map.get(name, -1)
        return (-lv if descending else lv), name.lower()

    next_seq = dict.fromkeys("SPRU", 1)
    labels = {}
    for name in sorted(nodes, key=rank):
        zone = zone_prefix(name)
        labels[name] = zone + str(next_seq[zone])
        next_seq[zone] += 1
    return labels

# -----------------------------
# SVG/HTML (con scrollbars "fijos" al borde del visual)
# -----------------------------
def build_svg_html(nodes, edges, level_map, kind, node_ids):
    """Render a lineage graph as an SVG inside a fixed-height scrollable container.

    Parameters
    ----------
    nodes : iterable of str
        Tables to draw.
    edges : iterable of (source, destination, query)
        Relations drawn as arrows; edges with an endpoint outside ``nodes``
        are skipped.
    level_map : dict
        Table -> level relative to the objective (e.g. from
        ``levels_upstream`` / ``levels_downstream``).
    kind : str
        ``"upstream"`` puts level 0 in the right-most column; any other value
        puts level 0 in the left-most column.
    node_ids : dict
        Table -> short ID label printed in bold on each box.

    Returns
    -------
    str
        HTML snippet: a 320px-high outer div, an absolutely positioned div
        with both scrollbars, and the ``<svg>`` sized to fit every box.
    """
    nodes = list(nodes)
    dx = 350  # horizontal spacing between level columns
    dy = 140
    node_w, node_h = 270, 74

    max_level = max(level_map.values()) if level_map else 0

    if kind == "upstream":
        x_level = {n: (max_level - level_map.get(n, max_level)) * dx for n in nodes}
    else:
        x_level = {n: level_map.get(n, 0) * dx for n in nodes}

    buckets = defaultdict(list)
    for n in nodes:
        buckets[level_map.get(n, 0)].append(n)
    for k in buckets:
        buckets[k].sort(key=lambda n: n.lower())

    y_pos = {}
    for ns in buckets.values():
        for i, n in enumerate(ns):
            y_pos[n] = 60 + i * dy

    width = (max_level + 1) * dx + 520
    height = (max(y_pos.values()) if y_pos else 0) + 260

    parts = []
    parts.append('<div style="position:relative;width:100%;height:320px;background:#ffffff;">')
    parts.append('<div style="position:absolute;inset:0;overflow-x:scroll;overflow-y:scroll;background:#ffffff;">')
    parts.append(f'<div style="width:{width}px;height:{height}px;">')
    parts.append(f'<svg xmlns="http://www.w3.org/2000/svg" width="{width}" height="{height}" style="background:#ffffff;">')

    parts.append("""
<defs>
  <marker id="arrow" viewBox="0 0 10 10" refX="10" refY="5" markerWidth="7" markerHeight="7" orient="auto-start-reverse">
    <path d="M 0 0 L 10 5 L 0 10 z" fill="#555"></path>
  </marker>
</defs>
""".strip())

    # edges: curve from the right side of the source box to the left side of
    # the destination box
    for s, d, _ in edges:
        if s not in x_level or d not in x_level:
            continue
        x1 = x_level[s] + node_w
        y1 = y_pos[s] + node_h/2
        x2 = x_level[d]
        y2 = y_pos[d] + node_h/2
        mx = (x1 + x2) / 2
        parts.append(
            f'<path d="M {x1} {y1} C {mx} {y1}, {mx} {y2}, {x2} {y2}" '
            f'stroke="#555" stroke-width="2" fill="none" marker-end="url(#arrow)"/>'
        )

    ordered = sorted(nodes, key=lambda n: (x_level.get(n, 0), y_pos.get(n, 0)))

    # node rectangles first, then all labels, so labels are painted on top
    for n in ordered:
        x, y = x_level[n], y_pos[n]
        parts.append(
            f'<rect x="{x}" y="{y}" rx="10" ry="10" width="{node_w}" height="{node_h}" '
            f'fill="{zone_color(n)}" stroke="#333" stroke-width="1.2"/>'
        )

    for n in ordered:
        x, y = x_level[n], y_pos[n]
        nid = node_ids.get(n, "")
        parts.append(f'<text x="{x+12}" y="{y+28}" font-size="14" fill="#111" font-weight="700">{svg_escape(nid)}</text>')
        # Wrap the raw name, then escape each line: escaping first lets
        # textwrap split an entity such as "&amp;" across lines (invalid SVG).
        lines = textwrap.wrap(n, width=34)
        for j, line in enumerate(lines[:2]):
            parts.append(f'<text x="{x+12}" y="{y+50 + j*16}" font-size="12" fill="#111">{svg_escape(line)}</text>')

    parts.append("</svg></div></div></div>")
    return "\n".join(parts)

# -----------------------------
# Objectives:
#  A) every tabla_destino (upstream lineage)
#  B) every raw s_* table that appears as tabla_origen (downstream lineage)
# -----------------------------
# Names are stripped so they match the (stripped) keys of incoming/outgoing;
# dict.fromkeys de-duplicates while keeping first-seen order, so the sort stays
# deterministic for names that differ only by case.
dest_targets = sorted(
    dict.fromkeys(t.strip() for t in insumo_base["tabla_destino"]),
    key=lambda s: s.lower(),
)
raw_sources = sorted(
    dict.fromkeys(t.strip() for t in insumo_base["tabla_origen"] if parse_schema(t).startswith("s_")),
    key=lambda s: s.lower(),
)

objectives = [("upstream", t) for t in dest_targets] + [("downstream", s) for s in raw_sources]

# -----------------------------
# Generate tb_linaje and tb_html
# -----------------------------
tb_linaje_rows = []
tb_html_rows = []

for kind, obj in objectives:
    if kind == "upstream":
        nodes, edges = upstream_closure(obj)
        level_map = levels_upstream(edges, obj)
    else:
        nodes, edges = downstream_closure(obj)
        level_map = levels_downstream(edges, obj)

    node_ids = assign_ids_per_objective(nodes, level_map, kind)
    html = build_svg_html(nodes, edges, level_map, kind, node_ids)

    # tb_linaje: one row per relation; id = ID of the destination node
    for s, d, qry in edges:
        tb_linaje_rows.append({
            "tabla_linaje_objetivo": obj,
            "id": node_ids.get(d, ""),
            "tabla_origen": s,
            "tabla_destino": d,
            "query": qry
        })

    tb_html_rows.append({"tabla_destino": obj, "html": html})

# Explicit columns keep the CSV schema stable even when there are no rows.
tb_linaje_v2 = pd.DataFrame(tb_linaje_rows, columns=["tabla_linaje_objetivo", "id", "tabla_origen", "tabla_destino", "query"])
tb_html_v2 = pd.DataFrame(tb_html_rows, columns=["tabla_destino", "html"])

# -----------------------------
# Save CSVs
# -----------------------------
out_dir = "./resultados_linaje"
# DataFrame.to_csv does not create missing directories.
Path(out_dir).mkdir(parents=True, exist_ok=True)
linaje_path = out_dir + "/tb_linaje_generado.csv"
html_path = out_dir + "/tb_html_generado.csv"

tb_linaje_v2.to_csv(linaje_path, index=False, quoting=csv.QUOTE_ALL)
tb_html_v2.to_csv(html_path, index=False, quoting=csv.QUOTE_ALL)

summary = pd.DataFrame({
    "objetivos_destino_upstream": [len(dest_targets)],
    "objetivos_crudos_downstream": [len(raw_sources)],
    "total_objetivos": [len(objectives)],
    "filas_tb_linaje": [len(tb_linaje_v2)],
    "filas_tb_html": [len(tb_html_v2)]
})

(str(linaje_path), str(html_path), summary)


('./resultados_linaje/tb_linaje_generado.csv',
 './resultados_linaje/tb_html_generado.csv',
    objetivos_destino_upstream  objetivos_crudos_downstream  total_objetivos  \
 0                          12                            4               16   
 
    filas_tb_linaje  filas_tb_html  
 0               56             16  )

In [15]:
import pandas as pd, csv, textwrap
from pathlib import Path
from collections import defaultdict, deque

# Example insumo_base: one row per source -> destination relation
# (columns tabla_origen, tabla_destino, query). A destination fed by several
# sources appears once per source.
insumo_rows = [
    ("s_core.clientes", "proceso_bipa_vpr.stg_clientes",
     "create table proceso_bipa_vpr.stg_clientes as select * from s_core.clientes;"),
    ("s_core.prestamos", "proceso_bipa_vpr.stg_prestamos",
     "create table proceso_bipa_vpr.stg_prestamos as select * from s_core.prestamos;"),
    ("proceso_bipa_vpr.stg_clientes", "proceso_bipa_vpr.dim_clientes",
     "create table proceso_bipa_vpr.dim_clientes as select ... from proceso_bipa_vpr.stg_clientes;"),
    ("proceso_bipa_vpr.stg_prestamos", "proceso_bipa_vpr.fact_prestamos_stg",
     "create table proceso_bipa_vpr.fact_prestamos_stg as select ... from proceso_bipa_vpr.stg_prestamos;"),
    # tb_fact_prestamos has two sources, so it gets two rows with the same query.
    ("proceso_bipa_vpr.dim_clientes", "resultados_bipa_vpr.tb_fact_prestamos",
     "create table resultados_bipa_vpr.tb_fact_prestamos as select ... from proceso_bipa_vpr.fact_prestamos_stg fp join proceso_bipa_vpr.dim_clientes dc on ...;"),
    ("proceso_bipa_vpr.fact_prestamos_stg", "resultados_bipa_vpr.tb_fact_prestamos",
     "create table resultados_bipa_vpr.tb_fact_prestamos as select ... from proceso_bipa_vpr.fact_prestamos_stg fp join proceso_bipa_vpr.dim_clientes dc on ...;"),

    # Lineage starting at s_core.pagos.
    ("s_core.pagos", "proceso_bipa_vpr.stg_pagos",
     "create table proceso_bipa_vpr.stg_pagos as select * from s_core.pagos;"),
    ("proceso_bipa_vpr.stg_pagos", "proceso_bipa_vpr.fact_pagos_stg",
     "create table proceso_bipa_vpr.fact_pagos_stg as select ... from proceso_bipa_vpr.stg_pagos;"),
    # NOTE(review): the two tb_fact_pagos rows carry different query texts - sample data, confirm.
    ("proceso_bipa_vpr.fact_pagos_stg", "resultados_bipa_vpr.tb_fact_pagos",
     "create table resultados_bipa_vpr.tb_fact_pagos as select ... from proceso_bipa_vpr.fact_pagos_stg;"),
    ("proceso_bipa_vpr.dim_clientes", "resultados_bipa_vpr.tb_fact_pagos",
     "create table resultados_bipa_vpr.tb_fact_pagos as select ... from proceso_bipa_vpr.fact_pagos_stg fp join proceso_bipa_vpr.dim_clientes dc on ...;"),

    # A result table built from two other result tables (R -> R relations).
    ("resultados_bipa_vpr.tb_fact_prestamos", "resultados_bipa_vpr.tb_reporte_mensual",
     "create table resultados_bipa_vpr.tb_reporte_mensual as select ... from resultados_bipa_vpr.tb_fact_prestamos;"),
    ("resultados_bipa_vpr.tb_fact_pagos", "resultados_bipa_vpr.tb_reporte_mensual",
     "create table resultados_bipa_vpr.tb_reporte_mensual as select ... from resultados_bipa_vpr.tb_fact_pagos;"),

    # An independent chain in the proceso_riesgo / resultados_riesgo schemas.
    ("s_core.cartera", "proceso_riesgo.stg_cartera",
     "create table proceso_riesgo.stg_cartera as select * from s_core.cartera;"),
    ("proceso_riesgo.stg_cartera", "proceso_riesgo.mora_clientes",
     "create table proceso_riesgo.mora_clientes as select ... from proceso_riesgo.stg_cartera;"),
    ("proceso_riesgo.mora_clientes", "resultados_riesgo.tb_alertas_mora",
     "create table resultados_riesgo.tb_alertas_mora as select ... from proceso_riesgo.mora_clientes;"),
]
insumo_base = pd.DataFrame(insumo_rows, columns=["tabla_origen","tabla_destino","query"])

def parse_schema(fqn: str):
    """Return the lower-cased schema of ``schema.table`` (whole name if dotless)."""
    cleaned = (fqn or "").strip().lower()
    return cleaned.partition(".")[0]

def zone_prefix(fqn: str):
    """Map a table to its zone letter: S (``s_*``), P (``proceso*``),
    R (``resultado*``, which includes ``resultados*``) or U (anything else)."""
    schema = parse_schema(fqn)
    rules = (("s_", "S"), ("proceso", "P"), ("resultado", "R"))
    return next((zone for prefix, zone in rules if schema.startswith(prefix)), "U")

def zone_color(fqn: str):
    """Return the box fill color for a table's zone; grey ``#95a5a6`` if unknown."""
    colors = {"S": "#e74c3c", "P": "#f1c40f", "R": "#2ecc71"}
    zone = zone_prefix(fqn)
    return colors[zone] if zone in colors else "#95a5a6"

def svg_escape(s: str):
    """Escape ``&``, ``<``, ``>`` and ``"`` for SVG/XML text and attributes."""
    mapping = {"&": "&amp;", "<": "&lt;", ">": "&gt;", '"': "&quot;"}
    return "".join(mapping.get(ch, ch) for ch in str(s))

# Adjacency indexes over insumo_base (names stripped):
incoming = defaultdict(list)  # destination -> [(source, query)]
outgoing = defaultdict(list)  # source -> [(destination, query)]
for raw_src, raw_dst, qry in zip(insumo_base["tabla_origen"], insumo_base["tabla_destino"], insumo_base["query"]):
    s, d = raw_src.strip(), raw_dst.strip()
    incoming[d].append((s, qry))
    outgoing[s].append((d, qry))

def upstream_closure(target: str, index=None):
    """Return ``(nodes, edges)`` for the complete upstream lineage of ``target``.

    Parameters
    ----------
    target : str
        Objective table; surrounding whitespace is stripped.
    index : mapping, optional
        ``destination -> [(source, query), ...]``. Defaults to the module-level
        ``incoming`` index (looked up only when ``index`` is not given).

    Returns
    -------
    nodes : set of str
        ``target`` plus every table reachable upstream of it.
    edges : list of (source, destination, query)
        One tuple per index entry visited, in breadth-first order.
    """
    graph = incoming if index is None else index
    target = target.strip()
    visited = {target}
    queue = deque([target])
    edges = []
    while queue:
        dst = queue.popleft()
        for src, qry in graph.get(dst, []):
            edges.append((src, dst, qry))
            if src not in visited:
                visited.add(src)
                queue.append(src)
    return visited, edges

def downstream_closure(root: str, index=None):
    """Return ``(nodes, edges)`` for everything built downstream of ``root``.

    Parameters
    ----------
    root : str
        Starting table; surrounding whitespace is stripped.
    index : mapping, optional
        ``source -> [(destination, query), ...]``. Defaults to the module-level
        ``outgoing`` index (looked up only when ``index`` is not given).

    Returns
    -------
    nodes : set of str
        ``root`` plus every table reachable downstream of it.
    edges : list of (source, destination, query)
        One tuple per index entry visited, in breadth-first order.
    """
    graph = outgoing if index is None else index
    root = root.strip()
    visited = {root}
    queue = deque([root])
    edges = []
    while queue:
        src = queue.popleft()
        for dst, qry in graph.get(src, []):
            edges.append((src, dst, qry))
            if dst not in visited:
                visited.add(dst)
                queue.append(dst)
    return visited, edges

def levels_upstream(edges, target):
    """Compute each table's level: the longest hop count down to ``target``.

    ``target`` is level 0, its direct sources level 1, and so on; the longest
    path wins when several exist.

    Cycles are tolerated. Self-references (``source == destination``) and
    edges feeding back into ``target`` are ignored, and levels are capped at
    ``len(tables) - 1`` (longest possible simple path). The relaxation
    therefore always converges instead of growing levels on every pass.

    Parameters
    ----------
    edges : iterable of (source, destination, query)
    target : str
        Objective table; surrounding whitespace is ignored.

    Returns
    -------
    dict
        Table -> level, for ``target`` and every table that reaches it.
    """
    target = target.strip()
    edges = list(edges)
    tables = {target}
    for s, d, _ in edges:
        tables.update((s, d))
    max_depth = len(tables) - 1

    level = {target: 0}
    changed = True
    while changed:
        changed = False
        for s, d, _ in edges:
            if s == d or s == target or d not in level:
                continue
            cand = min(level[d] + 1, max_depth)
            if cand > level.get(s, -1):
                level[s] = cand
                changed = True
    return level

def levels_downstream(edges, root):
    """Compute each table's level: the longest hop count from ``root``.

    ``root`` is level 0, tables built directly from it level 1, and so on;
    the longest path wins when several exist.

    Cycles are tolerated. Self-references and edges pointing back into
    ``root`` are ignored, and levels are capped at ``len(tables) - 1``. The
    relaxation therefore always converges.

    Parameters
    ----------
    edges : iterable of (source, destination, query)
    root : str
        Starting table; surrounding whitespace is ignored.

    Returns
    -------
    dict
        Table -> level, for ``root`` and every table reachable from it.
    """
    root = root.strip()
    edges = list(edges)
    tables = {root}
    for s, d, _ in edges:
        tables.update((s, d))
    max_depth = len(tables) - 1

    level = {root: 0}
    changed = True
    while changed:
        changed = False
        for s, d, _ in edges:
            if s == d or d == root or s not in level:
                continue
            cand = min(level[s] + 1, max_depth)
            if cand > level.get(d, -1):
                level[d] = cand
                changed = True
    return level

def assign_ids_per_objective(nodes, level_map, kind):
    """Label each node with a zone letter plus a per-zone sequence number.

    Sequences restart on every call and are independent per zone
    (``zone_prefix``). Order: by level (descending for ``kind == "upstream"``,
    ascending otherwise), then by case-insensitive name; nodes absent from
    ``level_map`` count as level -1.

    Returns
    -------
    dict
        Node name -> ID string such as ``"P3"``.
    """
    if kind == "upstream":
        def rank(name):
            return -level_map.get(name, -1), name.lower()
    else:
        def rank(name):
            return level_map.get(name, -1), name.lower()

    seq = {"S": 1, "P": 1, "R": 1, "U": 1}
    ids = {}
    for name in sorted(nodes, key=rank):
        zone = zone_prefix(name)
        ids[name] = "%s%d" % (zone, seq[zone])
        seq[zone] = seq[zone] + 1
    return ids

def build_svg_html_v3(nodes, edges, level_map, kind, node_ids):
    """Render a lineage graph as an SVG inside a scrollable div (min 650px high).

    Parameters
    ----------
    nodes : iterable of str
        Tables to draw.
    edges : iterable of (source, destination, query)
        Relations drawn as arrows; edges with an endpoint outside ``nodes``
        are skipped.
    level_map : dict
        Table -> level relative to the objective (e.g. from
        ``levels_upstream`` / ``levels_downstream``).
    kind : str
        ``"upstream"`` puts level 0 in the right-most column; any other value
        puts level 0 in the left-most column.
    node_ids : dict
        Table -> short ID label printed in bold on each box.

    Returns
    -------
    str
        HTML snippet: a ``<div>`` with ``overflow:scroll`` wrapping the ``<svg>``.
    """
    nodes = list(nodes)
    dx = 420
    dy = 140
    node_w, node_h = 270, 74

    max_level = max(level_map.values()) if level_map else 0

    if kind == "upstream":
        x_level = {n: (max_level - level_map.get(n, max_level)) * dx for n in nodes}
    else:
        x_level = {n: level_map.get(n, 0) * dx for n in nodes}

    buckets = defaultdict(list)
    for n in nodes:
        buckets[level_map.get(n, 0)].append(n)
    for k in buckets:
        buckets[k].sort(key=lambda n: n.lower())

    y_pos = {}
    for ns in buckets.values():
        for i, n in enumerate(ns):
            y_pos[n] = 60 + i * dy

    width = (max_level + 1) * dx + 520
    height = (max(y_pos.values()) if y_pos else 0) + 260

    parts = []
    parts.append('<div style="width:100%;height:100%;min-height:650px;overflow:scroll;background:#ffffff;">')
    parts.append(f'<svg xmlns="http://www.w3.org/2000/svg" width="{width}" height="{height}" style="display:block;background:#ffffff;">')

    parts.append("""
<defs>
  <marker id="arrow" viewBox="0 0 10 10" refX="10" refY="5" markerWidth="7" markerHeight="7" orient="auto-start-reverse">
    <path d="M 0 0 L 10 5 L 0 10 z" fill="#555"></path>
  </marker>
</defs>
""".strip())

    # edges: curve from the right side of the source box to the left side of
    # the destination box
    for s, d, _ in edges:
        if s not in x_level or d not in x_level:
            continue
        x1 = x_level[s] + node_w
        y1 = y_pos[s] + node_h/2
        x2 = x_level[d]
        y2 = y_pos[d] + node_h/2
        mx = (x1 + x2) / 2
        parts.append(
            f'<path d="M {x1} {y1} C {mx} {y1}, {mx} {y2}, {x2} {y2}" '
            f'stroke="#555" stroke-width="2" fill="none" marker-end="url(#arrow)"/>'
        )

    ordered = sorted(nodes, key=lambda n: (x_level.get(n, 0), y_pos.get(n, 0)))

    # node rectangles first, then all labels, so labels are painted on top
    for n in ordered:
        x, y = x_level[n], y_pos[n]
        parts.append(
            f'<rect x="{x}" y="{y}" rx="10" ry="10" width="{node_w}" height="{node_h}" '
            f'fill="{zone_color(n)}" stroke="#333" stroke-width="1.2"/>'
        )

    for n in ordered:
        x, y = x_level[n], y_pos[n]
        nid = node_ids.get(n, "")
        parts.append(f'<text x="{x+12}" y="{y+28}" font-size="14" fill="#111" font-weight="700">{svg_escape(nid)}</text>')
        # Wrap the raw name, then escape each line: escaping first lets
        # textwrap split an entity such as "&amp;" across lines (invalid SVG).
        lines = textwrap.wrap(n, width=34)
        for j, line in enumerate(lines[:2]):
            parts.append(f'<text x="{x+12}" y="{y+50 + j*16}" font-size="12" fill="#111">{svg_escape(line)}</text>')

    parts.append("</svg></div>")
    return "\n".join(parts)

# Objectives: every tabla_destino (upstream) plus every raw s_* tabla_origen
# (downstream). Names are stripped to match the (stripped) index keys, and
# dict.fromkeys de-duplicates while keeping first-seen order so the sort stays
# deterministic for names that differ only by case.
dest_targets = sorted(
    dict.fromkeys(t.strip() for t in insumo_base["tabla_destino"]),
    key=lambda s: s.lower(),
)
raw_sources = sorted(
    dict.fromkeys(t.strip() for t in insumo_base["tabla_origen"] if parse_schema(t).startswith("s_")),
    key=lambda s: s.lower(),
)
objectives = [("upstream", t) for t in dest_targets] + [("downstream", s) for s in raw_sources]

tb_linaje_rows = []
tb_html_rows = []

for kind, obj in objectives:
    if kind == "upstream":
        nodes, edges = upstream_closure(obj)
        level_map = levels_upstream(edges, obj)
    else:
        nodes, edges = downstream_closure(obj)
        level_map = levels_downstream(edges, obj)

    node_ids = assign_ids_per_objective(nodes, level_map, kind)
    html = build_svg_html_v3(nodes, edges, level_map, kind, node_ids)

    # One tb_linaje row per relation; id = ID of the destination node.
    for s, d, qry in edges:
        tb_linaje_rows.append({
            "tabla_linaje_objetivo": obj,
            "id": node_ids.get(d, ""),
            "tabla_origen": s,
            "tabla_destino": d,
            "query": qry
        })

    tb_html_rows.append({"tabla_destino": obj, "html": html})

# Explicit columns keep the CSV schema stable even when there are no rows.
tb_linaje_v3 = pd.DataFrame(tb_linaje_rows, columns=["tabla_linaje_objetivo", "id", "tabla_origen", "tabla_destino", "query"])
tb_html_v3 = pd.DataFrame(tb_html_rows, columns=["tabla_destino", "html"])

out_dir = "./resultados_linaje"
# DataFrame.to_csv does not create missing directories.
Path(out_dir).mkdir(parents=True, exist_ok=True)
linaje_path = out_dir + "/tb_linaje_generado.csv"
html_path = out_dir + "/tb_html_generado.csv"

tb_linaje_v3.to_csv(linaje_path, index=False, quoting=csv.QUOTE_ALL)
tb_html_v3.to_csv(html_path, index=False, quoting=csv.QUOTE_ALL)

(str(linaje_path), str(html_path), tb_html_v3.shape, tb_linaje_v3.shape)


('./resultados_linaje/tb_linaje_generado.csv',
 './resultados_linaje/tb_html_generado.csv',
 (16, 2),
 (56, 5))