In [1]:
import os
from sqlalchemy import create_engine, text

# Connection target: DATABASE_URL when set, otherwise the local default below.
db_url = os.environ.get("DATABASE_URL", "postgresql://postgres:postgres@127.0.0.1:54322/postgres")
engine = create_engine(db_url)

schema = "public"
table = None

# Every base table (views excluded by table_type) in the schema, sorted by name.
base_tables_query = text("""
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema=:s AND table_type='BASE TABLE'
        ORDER BY table_name
    """)

with engine.begin() as conn:
    result = conn.execute(base_tables_query, {"s": schema})
    tables = result.scalars().all()
print("tables:", tables)

# Describe one sample table: the first entry of `tables`, or nothing when the
# schema has no base tables.
table = tables[0] if tables else None
if table:
    table_params = {"s": schema, "t": table}
    with engine.begin() as c:
        # Column definitions in declaration order.
        cols = c.execute(text("""
            SELECT column_name, data_type, is_nullable, column_default
            FROM information_schema.columns
            WHERE table_schema=:s AND table_name=:t
            ORDER BY ordinal_position
        """), table_params).mappings().all()

        # Primary-key columns in key order. The join also matches on table_name
        # so a same-named constraint on another table cannot leak in.
        pks = c.execute(text("""
            SELECT kcu.column_name
            FROM information_schema.table_constraints tc
            JOIN information_schema.key_column_usage kcu
              ON kcu.constraint_schema = tc.constraint_schema
             AND kcu.constraint_name   = tc.constraint_name
             AND kcu.table_schema      = tc.table_schema
             AND kcu.table_name        = tc.table_name
            WHERE tc.table_schema=:s AND tc.table_name=:t AND tc.constraint_type='PRIMARY KEY'
            ORDER BY kcu.ordinal_position
        """), table_params).scalars().all()

        # Foreign keys, one row per (child column, referenced column) pair.
        # Joining constraint_column_usage by constraint name alone pairs every
        # child column with every parent column of a composite key, and
        # requiring the parent to live in the child's schema hides cross-schema
        # references. Instead, resolve the referenced unique/primary key through
        # referential_constraints and line columns up by
        # position_in_unique_constraint.
        # NOTE(review): a FK that targets a bare unique index (no constraint)
        # presumably has no unique_constraint_name, so its parent_* fields come
        # back NULL here -- confirm none exist in this database.
        fks = c.execute(text("""
            SELECT kcu.column_name AS child_column,
                   pk.table_schema AS parent_schema,
                   pk.table_name   AS parent_table,
                   pk.column_name  AS parent_column
            FROM information_schema.table_constraints tc
            JOIN information_schema.key_column_usage kcu
              ON kcu.constraint_schema = tc.constraint_schema
             AND kcu.constraint_name   = tc.constraint_name
             AND kcu.table_schema      = tc.table_schema
             AND kcu.table_name        = tc.table_name
            JOIN information_schema.referential_constraints rc
              ON rc.constraint_schema = tc.constraint_schema
             AND rc.constraint_name   = tc.constraint_name
            LEFT JOIN information_schema.key_column_usage pk
              ON pk.constraint_schema = rc.unique_constraint_schema
             AND pk.constraint_name   = rc.unique_constraint_name
             AND pk.ordinal_position  = kcu.position_in_unique_constraint
             AND pk.position_in_unique_constraint IS NULL
            WHERE tc.table_schema=:s AND tc.table_name=:t AND tc.constraint_type='FOREIGN KEY'
            ORDER BY tc.constraint_name, kcu.ordinal_position
        """), table_params).mappings().all()

    print("columns:", cols)
    print("primary_keys:", pks)
    print("foreign_keys:", [dict(r) for r in fks])

def _tables_matching(conn, schema_name, pattern):
    """Return the sorted names of base tables in `schema_name` matching `pattern`.

    `pattern` is a case-insensitive ILIKE pattern (e.g. "%fish%"). It is sent
    as a bound parameter, so the SQL text is shared by every search.
    """
    return conn.execute(text("""
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema=:s AND table_type='BASE TABLE' AND table_name ILIKE :p
        ORDER BY table_name
    """), {"s": schema_name, "p": pattern}).scalars().all()


# Both searches share one connection and one query definition.
with engine.begin() as c:
    fish = _tables_matching(c, schema, "%fish%")
    mounts = _tables_matching(c, schema, "%mount%")
print("fish_tables:", fish)
print("mount_tables:", mounts)


tables: ['cassettes', 'cassettes_elements', 'elements', 'fish', 'fish_mounts', 'fish_mutations', 'fish_selectedphenotypes', 'fish_strains', 'fish_transgenes', 'fish_treatments', 'fish_year_counters', 'mounts', 'mutations', 'plasmids', 'plasmids_cassettes', 'profiles', 'selectedphenotypes', 'strains', 'tanks', 'transgenes', 'treatments']
columns: [{'column_name': 'id', 'data_type': 'bigint', 'is_nullable': 'NO', 'column_default': "nextval('cassettes_id_seq'::regclass)"}, {'column_name': 'name', 'data_type': 'text', 'is_nullable': 'NO', 'column_default': None}, {'column_name': 'notes', 'data_type': 'text', 'is_nullable': 'YES', 'column_default': None}, {'column_name': 'created_by', 'data_type': 'uuid', 'is_nullable': 'NO', 'column_default': "'00000000-0000-0000-0000-000000000000'::uuid"}, {'column_name': 'created_at', 'data_type': 'timestamp with time zone', 'is_nullable': 'NO', 'column_default': 'now()'}, {'column_name': 'updated_at', 'data_type': 'timestamp with time zone', 'is_nullabl

In [2]:
import os
import pandas as pd
from sqlalchemy import create_engine, text

# Schema to document and the database to read it from (DATABASE_URL wins over
# the local default).
SCHEMA = "public"
DB_URL = os.environ.get("DATABASE_URL", "postgresql://postgres:postgres@127.0.0.1:54322/postgres")
engine = create_engine(DB_URL)

with engine.begin() as c:
    # Every column of every table in the schema, in declaration order.
    cols = c.execute(text("""
        SELECT
          table_schema,
          table_name,
          ordinal_position,
          column_name,
          data_type,
          is_nullable,
          column_default
        FROM information_schema.columns
        WHERE table_schema = :s
        ORDER BY table_name, ordinal_position
    """), {"s": SCHEMA}).mappings().all()

    # Primary-key columns per table. The join also matches on table_name so a
    # same-named constraint on another table cannot leak in.
    pks = c.execute(text("""
        SELECT
          kcu.table_name,
          kcu.column_name
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage kcu
          ON kcu.constraint_schema = tc.constraint_schema
         AND kcu.constraint_name   = tc.constraint_name
         AND kcu.table_schema      = tc.table_schema
         AND kcu.table_name        = tc.table_name
        WHERE tc.table_schema = :s
          AND tc.constraint_type = 'PRIMARY KEY'
        ORDER BY kcu.table_name, kcu.ordinal_position
    """), {"s": SCHEMA}).mappings().all()

    # Foreign keys, one row per (child column, referenced column) pair.
    # Joining constraint_column_usage by constraint name alone pairs every
    # child column with every parent column of a composite key, and requiring
    # the parent to live in the child's schema hides cross-schema references.
    # Instead, resolve the referenced unique/primary key through
    # referential_constraints and line columns up by
    # position_in_unique_constraint.
    # NOTE(review): a FK that targets a bare unique index (no constraint)
    # presumably has no unique_constraint_name, so its parent_* fields come
    # back NULL here -- confirm none exist in this database.
    fks = c.execute(text("""
        SELECT
          tc.table_name       AS child_table,
          kcu.column_name     AS child_column,
          pk.table_schema     AS parent_schema,
          pk.table_name       AS parent_table,
          pk.column_name      AS parent_column
        FROM information_schema.table_constraints tc
        JOIN information_schema.key_column_usage kcu
          ON kcu.constraint_schema = tc.constraint_schema
         AND kcu.constraint_name   = tc.constraint_name
         AND kcu.table_schema      = tc.table_schema
         AND kcu.table_name        = tc.table_name
        JOIN information_schema.referential_constraints rc
          ON rc.constraint_schema = tc.constraint_schema
         AND rc.constraint_name   = tc.constraint_name
        LEFT JOIN information_schema.key_column_usage pk
          ON pk.constraint_schema = rc.unique_constraint_schema
         AND pk.constraint_name   = rc.unique_constraint_name
         AND pk.ordinal_position  = kcu.position_in_unique_constraint
         AND pk.position_in_unique_constraint IS NULL
        WHERE tc.table_schema = :s
          AND tc.constraint_type = 'FOREIGN KEY'
        ORDER BY tc.table_name, tc.constraint_name, kcu.ordinal_position
    """), {"s": SCHEMA}).mappings().all()

# Turn the three query results into frames; df_cols becomes the exported sheet.
df_cols = pd.DataFrame(cols)
df_pks  = pd.DataFrame(pks)
df_fks  = pd.DataFrame(fks)

# Column layout of the final sheet.
out_columns = ["schema","table","position","column","type","nullable","default","is_pk","fk_ref"]

if df_cols.empty:
    # No columns found: still expose the expected headers so the export and
    # the preview have a stable shape.
    df_cols = pd.DataFrame(columns=out_columns)
else:
    # (table, column) identifies a row of df_cols and is the lookup key for
    # both the primary-key flag and the foreign-key reference.
    column_keys = list(zip(df_cols["table_name"], df_cols["column_name"]))

    pk_keys = set()
    if not df_pks.empty:
        pk_keys = set(zip(df_pks["table_name"], df_pks["column_name"]))
    df_cols["is_pk"] = [key in pk_keys for key in column_keys]

    # Collect "parent_table.parent_column" per child column. A column can
    # appear in several FK rows, so references are de-duplicated and joined
    # with "; " rather than looked up in a Series whose index may not be
    # unique (Series.get would then return a whole Series for that cell).
    fk_refs = {}
    if not df_fks.empty:
        fk_fields = df_fks[["child_table", "child_column", "parent_table", "parent_column"]]
        for child_table, child_column, parent_table, parent_column in fk_fields.itertuples(index=False, name=None):
            if pd.isna(parent_table) or pd.isna(parent_column):
                continue  # parent side unresolved; nothing to reference
            ref = f"{parent_table}.{parent_column}"
            refs = fk_refs.setdefault((child_table, child_column), [])
            if ref not in refs:
                refs.append(ref)
    df_cols["fk_ref"] = [
        "; ".join(fk_refs[key]) if key in fk_refs else None
        for key in column_keys
    ]

    df_cols = df_cols.rename(columns={
        "table_schema":"schema",
        "table_name":"table",
        "ordinal_position":"position",
        "column_name":"column",
        "data_type":"type",
        "is_nullable":"nullable",
        "column_default":"default"
    })[out_columns]

# Write the sheet to ~/Desktop as CSV (always) and as Excel (best effort).
desktop = os.path.join(os.path.expanduser("~"), "Desktop")
os.makedirs(desktop, exist_ok=True)
csv_path = os.path.join(desktop, f"{SCHEMA}_tables_fields.csv")
xlsx_path = os.path.join(desktop, f"{SCHEMA}_tables_fields.xlsx")

df_cols.to_csv(csv_path, index=False)

xlsx_ok = False
xlsx_skip_reason = "(Excel skipped; install openpyxl to enable)"
try:
    with pd.ExcelWriter(xlsx_path) as w:
        df_cols.to_excel(w, index=False, sheet_name="schema")
    xlsx_ok = True
except ImportError:
    # Presumably no Excel writer engine is installed; the default hint applies.
    pass
except Exception as exc:
    # Any other failure (permissions, locked file, ...) is still non-fatal,
    # but report what actually went wrong instead of blaming openpyxl.
    xlsx_skip_reason = f"(Excel skipped; {type(exc).__name__}: {exc})"

print("Saved:", csv_path)
print("Saved:", xlsx_path if xlsx_ok else xlsx_skip_reason)
df_cols.head(20)


Saved: /Users/davekokel/Desktop/public_tables_fields.csv
Saved: /Users/davekokel/Desktop/public_tables_fields.xlsx


Unnamed: 0,schema,table,position,column,type,nullable,default,is_pk,fk_ref
0,public,cassettes,1,id,bigint,NO,nextval('cassettes_id_seq'::regclass),True,
1,public,cassettes,2,name,text,NO,,False,
2,public,cassettes,3,notes,text,YES,,False,
3,public,cassettes,4,created_by,uuid,NO,'00000000-0000-0000-0000-000000000000'::uuid,False,
4,public,cassettes,5,created_at,timestamp with time zone,NO,now(),False,
5,public,cassettes,6,updated_at,timestamp with time zone,NO,now(),False,
6,public,cassettes_elements,1,id,bigint,NO,nextval('cassettes_elements_id_seq'::regclass),True,
7,public,cassettes_elements,2,cassette_id,bigint,NO,,False,cassettes.id
8,public,cassettes_elements,3,element_id,bigint,NO,,False,elements.id
9,public,cassettes_elements,4,position,integer,NO,1,False,
