# Librerías

In [6]:
from collections import defaultdict
import pandas as pd
import psycopg2
from psycopg2 import sql
import re

# Archivo de conexion 

In [13]:
%run Paswords.py

# Probar conexión

In [None]:


try:
    # Establecer conexión
    conn = psycopg2.connect(
        dbname= DB_,            # Nombre de tu base
        user= username_,        # Usuario
        password= password_,    # Password
        host= host_,            
        port= port_             # puerto    
    )

    # Crear cursor
    cur = conn.cursor()

    # Ejecutar una consulta de prueba
    cur.execute("SELECT version();")
    version = cur.fetchone()
    print("Conectado a PostgreSQL:", version[0])

    # Cerrar cursor y conexión
    cur.close()
    conn.close()

except Exception as e:
    print("Error al conectar:", e)


Conectado a PostgreSQL: PostgreSQL 17.6 on x86_64-windows, compiled by msvc-19.44.35213, 64-bit


In [8]:
df = pd.read_excel("Diccionario de Datos SINCO.xlsx")

In [9]:
df.columns

Index(['Orden', 'Esquema', 'Nombre Tabla', 'Nombre Columna', 'Tipo De Dato',
       'Descripcion'],
      dtype='object')

In [10]:
df['Tipo De Dato'].unique()

array(['bigint', 'smallint', 'varchar', 'int', 'char', 'date', 'tinyint',
       'nvarchar', 'money', 'float', 'numeric', 'smallmoney', 'decimal'],
      dtype=object)

In [None]:
# Mapa base 
PG_TYPE_MAP = {
    "bigint":     "BIGINT",
    "smallint":   "SMALLINT",
    "int":        "INTEGER",
    "tinyint":    "SMALLINT",         
    "varchar":    "VARCHAR",          # si trae (n) lo respeta; si no, usa default
    "nvarchar":   "VARCHAR",          # PG es UTF-8 por defecto
    "char":       "CHAR",             # si trae (n) lo respeta; si no, usa default
    "text":       "TEXT",
    "date":       "DATE",
    "float":      "DOUBLE PRECISION", # en SQL Server float ~ double precision
    "numeric":    "NUMERIC",          # si trae (p,s) lo respeta; si no, default
    "decimal":    "NUMERIC",          # sinónimo en PG
    "money":      "NUMERIC(19,4)",    # recomendado en vez de PG money
    "smallmoney": "NUMERIC(10,4)",
    # extras por si aparecen
    "boolean":    "BOOLEAN",
    "bool":       "BOOLEAN",
    "bytea":      "BYTEA",
    "uuid":       "UUID",
}
DEFAULTS = {
    "varchar": 255,   # VARCHAR sin longitud → VARCHAR(255)
    "char": 1,        # CHAR sin longitud → CHAR(1)
    "numeric": (18, 2)  # NUMERIC sin (p,s) → NUMERIC(18,2)
}
_PAREN_RE = re.compile(r"^([a-zA-Z_]+)\s*\(\s*([0-9]+)(?:\s*,\s*([0-9]+))?\s*\)\s*$")

# Creación de plantilla de BD

In [None]:
# ========= CONFIGURA AQUÍ =========
EXCEL_PATH   = "Diccionario de Datos SINCO.xlsx"  # Ruta archivo Excel
SHEET_NAME   = 0                                  # índice o nombre de hoja

HOST         = host_
PORT         = port_n
DBNAME       = DB_          # <-- tu base de datos en PG
USER         = username_
PWD          = password_

INFERIR_PK   = True              # Detecta PK por nombre (ID o <TABLA>ID)
DEFAULT_SCHEMA = "public"        # Si no viene esquema en el Excel
# ==================================


def map_tipo_postgres(raw: str) -> str:
    if raw is None:
        return ""
    t = str(raw).strip().lower()
    if not t:
        return ""
    m = _PAREN_RE.match(t)
    if m:
        base, p1, p2 = m.group(1).lower(), m.group(2), m.group(3)
        base = PG_TYPE_MAP.get(base, base)
        if base.upper() in ("VARCHAR", "CHAR"):
            return f"{base.upper()}({int(p1)})"
        if base.upper() in ("NUMERIC", "DECIMAL"):
            if p2 is not None:
                return f"{base.upper()}({int(p1)},{int(p2)})"
            else:
                return f"{base.upper()}({int(p1)})"
        return f"{base.upper()}({p1}{','+p2 if p2 else ''})"
    base = PG_TYPE_MAP.get(t, None)
    if base:
        if base.upper() == "VARCHAR":
            return f"VARCHAR({DEFAULTS['varchar']})"
        if base.upper() == "CHAR":
            return f"CHAR({DEFAULTS['char']})"
        if base.upper() == "NUMERIC":
            p, s = DEFAULTS["numeric"]
            return f"NUMERIC({p},{s})"
        return base.upper()
    return f"VARCHAR({DEFAULTS['varchar']})"

def qident(name: str) -> str:
    s = str(name).replace('"', '""').strip()
    return f'"{s}"'

def pick_col(df: pd.DataFrame, name_like: str) -> str:
    target = name_like.lower()
    for c in df.columns:
        lc = c.lower().strip()
        if lc == target or target in lc:
            return c
    raise ValueError(f"No se encontró la columna requerida: {name_like}")

def main():
    # Conexión
    conn = psycopg2.connect(dbname=DBNAME, user=USER, password=PWD, host=HOST, port=PORT)
    conn.autocommit = False
    cur = conn.cursor()
    print(f"✅ Conexión OK a PostgreSQL")

    # Leer Excel
    df = pd.read_excel(EXCEL_PATH, sheet_name=SHEET_NAME, dtype=str).fillna("")
    print(f"📄 Filas leídas: {len(df)}")

    COL_ESQUEMA = pick_col(df, "Esquema")
    COL_TABLA   = pick_col(df, "Nombre Tabla")
    COL_COLUMNA = pick_col(df, "Nombre Columna")
    COL_TIPO    = pick_col(df, "Tipo De Dato")

    norm = lambda x: str(x).strip()
    norm_schema = lambda x: (norm(x) if norm(x) else DEFAULT_SCHEMA)

    df["_schema"] = df[COL_ESQUEMA].apply(norm_schema).str.lower()
    df["_table"]  = df[COL_TABLA].apply(norm).str.lower()
    df["_col"]    = df[COL_COLUMNA].apply(norm).str.lower()
    df["_type"]   = df[COL_TIPO].apply(map_tipo_postgres)

    bad = df[(df["_table"]=="") | (df["_col"]=="") | (df["_type"]=="")]
    if not bad.empty:
        print("⚠️ Filas con tabla/columna/tipo vacío (se omiten):")
        print(bad[[COL_ESQUEMA, COL_TABLA, COL_COLUMNA, COL_TIPO]].head(10))
        df = df[(df["_table"]!="") & (df["_col"]!="") & (df["_type"]!="")]

    group = defaultdict(list)
    for _, r in df.iterrows():
        group[(r["_schema"], r["_table"])].append(r)

    tablas_proc = 0
    for (sch, tbl), rows in group.items():
        if not tbl:
            continue

        # Crear esquema si no existe
        cur.execute(sql.SQL("CREATE SCHEMA IF NOT EXISTS {}").format(sql.Identifier(sch)))

        cols_sql = []
        pk_cols = []
        candidatos_pk = {f"{tbl}id", "id"} if INFERIR_PK else set()

        for r in rows:
            col = r["_col"]
            t   = r["_type"].upper()
            cols_sql.append(f'{qident(col)} {t}')
            if col in candidatos_pk:
                pk_cols.append(col)

        pk_clause = ""
        if pk_cols:
            pk_list = ", ".join(qident(c) for c in pk_cols)
            pk_clause = f", CONSTRAINT {qident('pk_'+tbl)} PRIMARY KEY ({pk_list})"

        create_stmt = (
            f'CREATE TABLE IF NOT EXISTS {qident(sch)}.{qident(tbl)} '
            f'({", ".join(cols_sql)}{pk_clause});'
        )

        try:
            cur.execute(create_stmt)
            tablas_proc += 1
        except Exception as e:
            print("❌ Error creando tabla:", f"{sch}.{tbl}")
            print("DDL que falló:\n", create_stmt)
            conn.rollback()
            raise e

    conn.commit()
    cur.close(); conn.close()
    print(f"🏁 Listo. Tablas procesadas/aseguradas: {tablas_proc}.")
    print(f"   Ejemplo para listar tablas del esquema:")
    print(f"   SELECT tablename FROM pg_tables WHERE schemaname = '{DEFAULT_SCHEMA}';")

if __name__ == "__main__":
    main()

✅ Conexión OK a PostgreSQL DB=ARPRO_Postgre como arpro
📄 Filas leídas: 531
🏁 Listo. Tablas procesadas/aseguradas: 50.
   Ejemplo para listar tablas del esquema:
   SELECT tablename FROM pg_tables WHERE schemaname = 'public';
