In [50]:
"""PostgreSQL schema statistics utility.

Connects to a PostgreSQL database, discovers all tables in a given schema,
and computes descriptive statistics for each column based on its data type.

Usage (CLI):
    python -m scripts.pg_schema_stats \
        --host localhost --port 5432 --dbname mydb --user myuser --password mypass \
        --schema public --output stats.json

The script supports either a full DSN via --dsn or individual connection params.
"""

from __future__ import annotations

import argparse
import json
import os
import sys
from dataclasses import dataclass
from typing import Any, Dict, Iterable, List, Optional, Tuple

import psycopg2
from psycopg2 import sql

In [96]:
@dataclass
class ConnectionParams:
    host: Optional[str] = None
    port: Optional[int] = None
    dbname: Optional[str] = None
    user: Optional[str] = None
    password: Optional[str] = None
    dsn: Optional[str] = None


def connect(params: ConnectionParams):
    """Create a psycopg2 connection from provided parameters or DSN."""
    if params.dsn:
        return psycopg2.connect(params.dsn)
    conn_kwargs: Dict[str, Any] = {}
    if params.host:
        conn_kwargs["host"] = params.host
    if params.port:
        conn_kwargs["port"] = params.port
    if params.dbname:
        conn_kwargs["dbname"] = params.dbname
    if params.user:
        conn_kwargs["user"] = params.user
    if params.password:
        conn_kwargs["password"] = params.password
    return psycopg2.connect(**conn_kwargs)


def list_tables(conn, schema: str) -> List[str]:
    """Return all base table names for a given schema."""
    query = (
        """
        SELECT table_name
        FROM information_schema.tables
        WHERE table_schema = %s AND table_type = 'BASE TABLE'
        ORDER BY table_name
        """
    )
    with conn.cursor() as cur:
        cur.execute(query, (schema,))
        rows = cur.fetchall()
    return [r[0] for r in rows]

def list_views(conn, schemas: List[str]) -> List[str]:
    """Return all view names for a given schema."""
    query = (
        """
        SELECT table_schema, table_name
        FROM information_schema.views
        WHERE table_schema = ANY(%s)
        ORDER BY table_schema, table_name
        """
    )
    with conn.cursor() as cur:
        cur.execute(query, (schemas,))
        rows = cur.fetchall()
        print(f"Rows: {rows}")
    # result contains tuples of (table_schema, table_name)
    return [f"{r[0]}.{r[1]}" for r in rows]


def list_columns(conn, schema: str, table: str) -> List[Tuple[str, str, str]]:
    """Return list of (column_name, data_type, udt_name) for table."""
    query = (
        """
        SELECT column_name, data_type, udt_name
        FROM information_schema.columns
        WHERE table_schema = %s AND table_name = %s
        ORDER BY ordinal_position
        """
    )
    with conn.cursor() as cur:
        cur.execute(query, (schema, table))
        rows = cur.fetchall()
    return [(r[0], r[1], r[2]) for r in rows]


def categorize_type(data_type: str, udt_name: str) -> str:
    """Map PostgreSQL data types to a coarse category for statistics."""
    dt = (data_type or "").lower()
    udt = (udt_name or "").lower()

    numeric_types = {
        "smallint",
        "integer",
        "bigint",
        "decimal",
        "numeric",
        "real",
        "double precision",
        "smallserial",
        "serial",
        "bigserial",
        "money",
    }
    text_types = {"text", "character varying", "character", "varchar", "char", "citext"}
    bool_types = {"boolean"}
    date_types = {"date"}
    timestamp_types = {"timestamp without time zone", "timestamp with time zone"}
    time_types = {"time without time zone", "time with time zone"}
    json_types = {"json", "jsonb"}

    if dt in numeric_types or udt in {"int2", "int4", "int8", "float4", "float8"}:
        return "numeric"
    if dt in bool_types:
        return "boolean"
    if dt in date_types:
        return "date"
    if dt in timestamp_types:
        return "timestamp"
    if dt in time_types:
        return "time"
    if dt in json_types:
        return "json"
    if dt in text_types:
        return "text"
    # Default to text-like for unknowns
    return "text"


def fetchone_dict(cur) -> Dict[str, Any]:
    """Convert a single-row cursor result into a dict keyed by column names."""
    row = cur.fetchone()
    if row is None:
        return {}
    desc = cur.description or []
    return {desc[i].name: row[i] for i in range(len(desc))}


def compute_numeric_stats(conn, schema: str, table: str, column: str) -> Dict[str, Any]:
    query = sql.SQL(
        """
        SELECT
            COUNT(*)::bigint AS total_rows,
            COUNT({col})::bigint AS non_null_rows,
            (COUNT(*) - COUNT({col}))::bigint AS null_rows,
            COUNT(DISTINCT {col})::bigint AS distinct_values,
            AVG({col}) AS avg_value,
            MIN({col}) AS min_value,
            MAX({col}) AS max_value,
            STDDEV_SAMP({col}) AS stddev,
            PERCENTILE_CONT(0.25) WITHIN GROUP (ORDER BY {col}) AS p25,
            PERCENTILE_CONT(0.50) WITHIN GROUP (ORDER BY {col}) AS median,
            PERCENTILE_CONT(0.75) WITHIN GROUP (ORDER BY {col}) AS p75
        FROM {sch}.{tbl}
        """
    ).format(
        col=sql.Identifier(column),
        sch=sql.Identifier(schema),
        tbl=sql.Identifier(table),
    )
    with conn.cursor() as cur:
        cur.execute(query)
        return fetchone_dict(cur)


def compute_boolean_stats(conn, schema: str, table: str, column: str) -> Dict[str, Any]:
    query = sql.SQL(
        """
        SELECT
            COUNT(*)::bigint AS total_rows,
            SUM(CASE WHEN {col} = TRUE THEN 1 ELSE 0 END)::bigint AS true_count,
            SUM(CASE WHEN {col} = FALSE THEN 1 ELSE 0 END)::bigint AS false_count,
            SUM(CASE WHEN {col} IS NULL THEN 1 ELSE 0 END)::bigint AS null_count,
            COUNT(DISTINCT {col})::bigint AS distinct_values
        FROM {sch}.{tbl}
        """
    ).format(
        col=sql.Identifier(column),
        sch=sql.Identifier(schema),
        tbl=sql.Identifier(table),
    )
    with conn.cursor() as cur:
        cur.execute(query)
        base = fetchone_dict(cur)

    top_values = top_value_counts(conn, schema, table, column, limit=3)
    base["top_values"] = top_values
    return base


def compute_text_stats(conn, schema: str, table: str, column: str) -> Dict[str, Any]:
    query = sql.SQL(
        """
        SELECT
            COUNT(*)::bigint AS total_rows,
            COUNT({col})::bigint AS non_null_rows,
            (COUNT(*) - COUNT({col}))::bigint AS null_rows,
            COUNT(DISTINCT {col})::bigint AS distinct_values,
            AVG(LENGTH({col})) AS avg_length,
            MIN(LENGTH({col})) AS min_length,
            MAX(LENGTH({col})) AS max_length
        FROM {sch}.{tbl}
        """
    ).format(
        col=sql.Identifier(column),
        sch=sql.Identifier(schema),
        tbl=sql.Identifier(table),
    )
    with conn.cursor() as cur:
        cur.execute(query)
        base = fetchone_dict(cur)

    base["top_values"] = top_value_counts(conn, schema, table, column, limit=10)
    return base


def compute_date_like_stats(conn, schema: str, table: str, column: str) -> Dict[str, Any]:
    query = sql.SQL(
        """
        SELECT
            COUNT(*)::bigint AS total_rows,
            COUNT({col})::bigint AS non_null_rows,
            (COUNT(*) - COUNT({col}))::bigint AS null_rows,
            MIN({col}) AS min_value,
            MAX({col}) AS max_value,
            COUNT(DISTINCT {col})::bigint AS distinct_values
        FROM {sch}.{tbl}
        """
    ).format(
        col=sql.Identifier(column),
        sch=sql.Identifier(schema),
        tbl=sql.Identifier(table),
    )
    with conn.cursor() as cur:
        cur.execute(query)
        base = fetchone_dict(cur)

    base["top_values"] = top_value_counts(conn, schema, table, column, limit=10)
    return base


def compute_json_stats(conn, schema: str, table: str, column: str) -> Dict[str, Any]:
    query = sql.SQL(
        """
        SELECT
            COUNT(*)::bigint AS total_rows,
            COUNT({col})::bigint AS non_null_rows,
            (COUNT(*) - COUNT({col}))::bigint AS null_rows
        FROM {sch}.{tbl}
        """
    ).format(
        col=sql.Identifier(column),
        sch=sql.Identifier(schema),
        tbl=sql.Identifier(table),
    )
    with conn.cursor() as cur:
        cur.execute(query)
        base = fetchone_dict(cur)
    return base


def top_value_counts(
    conn, schema: str, table: str, column: str, limit: int = 10
) -> List[Dict[str, Any]]:
    """Return top-N value counts for a column. NULLs are excluded."""
    query = sql.SQL(
        """
        SELECT {col} AS value, COUNT(*)::bigint AS count
        FROM {sch}.{tbl}
        WHERE {col} IS NOT NULL
        GROUP BY {col}
        ORDER BY COUNT(*) DESC
        LIMIT {lim}
        """
    ).format(
        col=sql.Identifier(column),
        sch=sql.Identifier(schema),
        tbl=sql.Identifier(table),
        lim=sql.Literal(limit),
    )
    with conn.cursor() as cur:
        cur.execute(query)
        rows = cur.fetchall()
        desc = cur.description or []
        headings = [d.name for d in desc]
    results: List[Dict[str, Any]] = []
    for row in rows:
        item: Dict[str, Any] = {}
        for i in range(len(headings)):
            # Convert non-JSON-serializable values to string
            value = row[i]
            if isinstance(value, (bytes, memoryview)):
                value = value.tobytes().decode("utf-8", errors="replace")
            try:
                json.dumps(value)
                item[headings[i]] = value
            except TypeError:
                item[headings[i]] = str(value)
        results.append(item)
    return results


In [100]:
def get_table_statistics(conn, schema: str, table: str) -> Dict[str, Any]:
    """Generate statistics for all columns in a table."""
    print(f"Processing table: {schema}.{table}")
    cols = list_columns(conn, schema, table)
    print(f"  Found {len(cols)} columns")
    results: Dict[str, Any] = {}
    for i, (column_name, data_type, udt_name) in enumerate(cols, 1):
        print(f"  Processing column {i}/{len(cols)}: {column_name} ({data_type})")
        category = categorize_type(data_type, udt_name)
        if category == "numeric":
            stats = compute_numeric_stats(conn, schema, table, column_name)
        elif category == "boolean":
            stats = compute_boolean_stats(conn, schema, table, column_name)
        elif category in {"date", "timestamp", "time"}:
            stats = compute_date_like_stats(conn, schema, table, column_name)
        elif category == "json":
            stats = compute_json_stats(conn, schema, table, column_name)
        else:
            stats = compute_text_stats(conn, schema, table, column_name)

        results[column_name] = {
            "data_type": data_type,
            "udt_name": udt_name,
            "category": category,
            "stats": stats,
        }
    print(f"  Completed table: {schema}.{table}")
    return results


def get_schema_statistics(
    conn, schemas: List[str], include_tables: Optional[Iterable[str]] = None, exclude_tables: Optional[Iterable[str]] = None
) -> Dict[str, Any]:
    """Compute statistics for all (or selected) tables in a schema."""
    import time
    
    start_time = time.time()
    print(f"Starting schema statistics for: {schemas}")
    include = set(include_tables or [])
    exclude = set(exclude_tables or [])
    tables = list_views(conn, schemas)
    to_process = [t for t in tables if (not include or t in include) and (t not in exclude)]
    print(f"Found {len(tables)} total tables, processing {len(to_process)} tables")

    results: Dict[str, Any] = {}
    for i, full_table_name in enumerate(to_process, 1):
        table_start_time = time.time()
        table_schema, table_name = full_table_name.split(".")
        print(f"Table {i}/{len(to_process)}: {full_table_name}")
        results[full_table_name] = get_table_statistics(conn, table_schema, table_name)
        table_elapsed = time.time() - table_start_time
        print(f"  Completed {full_table_name} in {table_elapsed:.2f}s")
        
        # Show overall progress
        elapsed_time = time.time() - start_time
        avg_time_per_table = elapsed_time / i
        estimated_remaining = avg_time_per_table * (len(to_process) - i)
        print(f"  Progress: {i}/{len(to_process)} ({i/len(to_process)*100:.1f}%) - Elapsed: {elapsed_time:.1f}s - ETA: {estimated_remaining:.1f}s")
    
    total_elapsed = time.time() - start_time
    print(f"Completed schema statistics for: {schemas} in {total_elapsed:.2f}s")
    return results


In [101]:
import dotenv
dotenv.load_dotenv("../.env")

params = ConnectionParams(
        host=os.getenv("DB_HOST"),
        port=os.getenv("DB_PORT"),
        dbname=os.getenv("DB_NAME"),
        user=os.getenv("DB_USER"),
        password=os.getenv("DB_PASSWORD"),
        dsn=os.getenv("DB_DSN"),
    )

conn = connect(params)
print(conn)


<connection object at 0x00000202EE7AF780; dsn: 'user=postgres.mrmlzpkjfosavipxrwct password=xxx dbname=postgres host=aws-0-us-east-1.pooler.supabase.com port=5432', closed: 0>


In [102]:
schemas = [
    # "sp_api_thrive_2", 
    "amazon_ads_thrive"
]
views = list_views(conn, schemas)
views

Rows: [('amazon_ads_thrive', 'ad_group_history_view'), ('amazon_ads_thrive', 'ad_group_level_report_view'), ('amazon_ads_thrive', 'ad_group_serving_status_detail_view'), ('amazon_ads_thrive', 'ad_group_suggested_keyword_view'), ('amazon_ads_thrive', 'advertised_product_report_view'), ('amazon_ads_thrive', 'asin_bid_recommendation_value_view'), ('amazon_ads_thrive', 'asin_suggested_keyword_view'), ('amazon_ads_thrive', 'asin_theme_based_bid_recommendation_view'), ('amazon_ads_thrive', 'campaign_history_view'), ('amazon_ads_thrive', 'campaign_level_report_view'), ('amazon_ads_thrive', 'campaign_negative_keyword_history_view'), ('amazon_ads_thrive', 'campaign_negative_keyword_serving_status_detail_view'), ('amazon_ads_thrive', 'campaign_placement_bidding_view'), ('amazon_ads_thrive', 'campaign_placement_report_view'), ('amazon_ads_thrive', 'campaign_serving_status_detail_view'), ('amazon_ads_thrive', 'keyword_history_view'), ('amazon_ads_thrive', 'keyword_serving_status_detail_view'), ('a

['amazon_ads_thrive.ad_group_history_view',
 'amazon_ads_thrive.ad_group_level_report_view',
 'amazon_ads_thrive.ad_group_serving_status_detail_view',
 'amazon_ads_thrive.ad_group_suggested_keyword_view',
 'amazon_ads_thrive.advertised_product_report_view',
 'amazon_ads_thrive.asin_bid_recommendation_value_view',
 'amazon_ads_thrive.asin_suggested_keyword_view',
 'amazon_ads_thrive.asin_theme_based_bid_recommendation_view',
 'amazon_ads_thrive.campaign_history_view',
 'amazon_ads_thrive.campaign_level_report_view',
 'amazon_ads_thrive.campaign_negative_keyword_history_view',
 'amazon_ads_thrive.campaign_negative_keyword_serving_status_detail_view',
 'amazon_ads_thrive.campaign_placement_bidding_view',
 'amazon_ads_thrive.campaign_placement_report_view',
 'amazon_ads_thrive.campaign_serving_status_detail_view',
 'amazon_ads_thrive.keyword_history_view',
 'amazon_ads_thrive.keyword_serving_status_detail_view',
 'amazon_ads_thrive.negative_keyword_history_view',
 'amazon_ads_thrive.negati

In [103]:
stats = get_schema_statistics(conn, schemas)

print(stats)


Starting schema statistics for: ['amazon_ads_thrive']
Rows: [('amazon_ads_thrive', 'ad_group_history_view'), ('amazon_ads_thrive', 'ad_group_level_report_view'), ('amazon_ads_thrive', 'ad_group_serving_status_detail_view'), ('amazon_ads_thrive', 'ad_group_suggested_keyword_view'), ('amazon_ads_thrive', 'advertised_product_report_view'), ('amazon_ads_thrive', 'asin_bid_recommendation_value_view'), ('amazon_ads_thrive', 'asin_suggested_keyword_view'), ('amazon_ads_thrive', 'asin_theme_based_bid_recommendation_view'), ('amazon_ads_thrive', 'campaign_history_view'), ('amazon_ads_thrive', 'campaign_level_report_view'), ('amazon_ads_thrive', 'campaign_negative_keyword_history_view'), ('amazon_ads_thrive', 'campaign_negative_keyword_serving_status_detail_view'), ('amazon_ads_thrive', 'campaign_placement_bidding_view'), ('amazon_ads_thrive', 'campaign_placement_report_view'), ('amazon_ads_thrive', 'campaign_serving_status_detail_view'), ('amazon_ads_thrive', 'keyword_history_view'), ('amazon_a

In [107]:
import json
print(json.dumps(stats, indent=2, default=str))


{
  "amazon_ads_thrive.ad_group_history_view": {
    "id": {
      "data_type": "character varying",
      "udt_name": "varchar",
      "category": "text",
      "stats": {
        "total_rows": 728,
        "non_null_rows": 728,
        "null_rows": 0,
        "distinct_values": 728,
        "avg_length": "14.6304945054945055",
        "min_length": 12,
        "max_length": 15,
        "top_values": [
          {
            "value": "101180574487644",
            "count": 1
          },
          {
            "value": "101619008626738",
            "count": 1
          },
          {
            "value": "102237366048889",
            "count": 1
          },
          {
            "value": "10228173083157",
            "count": 1
          },
          {
            "value": "10289633950586",
            "count": 1
          },
          {
            "value": "102988116954951",
            "count": 1
          },
          {
            "value": "10298853894571",
            "cou

In [108]:
import json
import os
from datetime import datetime

# Create output directory if it doesn't exist
output_dir = "../data/schema_stats"
os.makedirs(output_dir, exist_ok=True)

# Generate filename with timestamp
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")

for schema in schemas:
    output_file = os.path.join(output_dir, f"{schema}_stats_{timestamp}.json")
    
    # Write statistics to file
    with open(output_file, "w", encoding="utf-8") as f:
        json.dump(stats, f, default=str, indent=2)
    
    print(f"Statistics written to: {output_file}")


Statistics written to: ../data/schema_stats\amazon_ads_thrive_stats_20250901_223956.json


In [109]:

from openai import OpenAI

def generate_column_descriptions(table_name: str, stats_text: str) -> Dict[str, str]:
    # Create the prompt for OpenAI
    prompt = f"""
        You are given metadata and descriptive statistics for every column in a Postgres table named "{table_name}", which contains Amazon e-commerce data.

        Your task is to generate a clear, semantically rich description for each column. 
        The purpose of these descriptions is to improve the accuracy of a text-to-SQL agent that will use them to translate natural language questions into SQL queries.

        Guidelines for writing descriptions:
        - Expand any abbreviations or acronyms into full words.
        - Clarify the real-world meaning of the column (e.g., 'asin' → 'Amazon Standard Identification Number, a unique product identifier').
        - Add hints about typical usage in queries (e.g., whether it’s useful for filtering, grouping, or joining).
        - Use the descriptive statistics and sample values provided below to infer semantic meaning 
        (e.g., categorical vs numeric, date fields, booleans, common text patterns).
        - Include representative example values from the column in the description to illustrate its contents.
        - Keep descriptions concise but informative, written in plain English.
        - Emphasize the Amazon e-commerce context (products, sellers, reviews, orders, customers, prices, etc.).

        Descriptive statistics and examples:
        {stats_text}

        Output format (JSON dictionary):
        {{
            "column_name": "description of the column, including semantic meaning, how it may be used in SQL queries, and 1–3 representative example values",
            "column_name2": "description of the column2, including semantic meaning, how it may be used in SQL queries, and 1–3 representative example values",
            ...
        }}
    """

    # Initialize OpenAI client
    client = OpenAI()

    # Call OpenAI API
    response = client.chat.completions.create(
        model="gpt-5",
        messages=[
            {
                "role": "system",
                "content": prompt,
            },
        ],
        reasoning_effort="high",
    )

    # Return the generated markdown
    return response.choices[0].message.content


In [112]:
views = list(stats.keys())
views

['amazon_ads_thrive.ad_group_history_view',
 'amazon_ads_thrive.ad_group_level_report_view',
 'amazon_ads_thrive.ad_group_serving_status_detail_view',
 'amazon_ads_thrive.ad_group_suggested_keyword_view',
 'amazon_ads_thrive.advertised_product_report_view',
 'amazon_ads_thrive.asin_bid_recommendation_value_view',
 'amazon_ads_thrive.asin_suggested_keyword_view',
 'amazon_ads_thrive.asin_theme_based_bid_recommendation_view',
 'amazon_ads_thrive.campaign_history_view',
 'amazon_ads_thrive.campaign_level_report_view',
 'amazon_ads_thrive.campaign_negative_keyword_history_view',
 'amazon_ads_thrive.campaign_negative_keyword_serving_status_detail_view',
 'amazon_ads_thrive.campaign_placement_bidding_view',
 'amazon_ads_thrive.campaign_placement_report_view',
 'amazon_ads_thrive.campaign_serving_status_detail_view',
 'amazon_ads_thrive.keyword_history_view',
 'amazon_ads_thrive.keyword_serving_status_detail_view',
 'amazon_ads_thrive.negative_keyword_history_view',
 'amazon_ads_thrive.negati

In [133]:
# Column Description Generator
import json
import time
from datetime import datetime
import asyncio
from concurrent.futures import ThreadPoolExecutor
import threading

def generate_descriptions_for_table(table_name, table_stats):
    """Generate descriptions for a single table."""
    try:
        # Format statistics into a readable string
        stats_text = f"Table: {table_name}\n"
        stats_text += "="*50 + "\n"
        
        for column_name, column_info in table_stats.items():
            stats_text += f"\nColumn: {column_name}\n"
            stats_text += f"Data Type: {column_info['data_type']}\n"
            stats_text += f"Category: {column_info['category']}\n"
            stats_text += "Statistics:\n"
            
            # Format the statistics nicely
            column_stats = column_info['stats']
            for stat_name, stat_value in column_stats.items():
                if stat_name == 'top_values' and isinstance(stat_value, list):
                    stats_text += f"  {stat_name}:\n"
                    for value_info in stat_value[:5]:  # Show top 5 values
                        stats_text += f"    '{value_info['value']}': {value_info['count']} occurrences\n"
                else:
                    stats_text += f"  {stat_name}: {stat_value}\n"
            stats_text += "-" * 30 + "\n"
        
        # Generate descriptions using the formatted stats
        descriptions = generate_column_descriptions(table_name, stats_text)

        # Convert descriptions to JSON if it's not already a dict
        if isinstance(descriptions, str):
            descriptions = json.loads(descriptions)

        return table_name, descriptions, None
        
    except Exception as e:
        return table_name, None, str(e)

def process_tables_parallel(tables_to_process, stats_dict, descriptions_dict=None, max_workers=5):
    """Process multiple tables in parallel using ThreadPoolExecutor."""
    if descriptions_dict is None:
        descriptions_dict = {}
    
    # Filter out tables that already have descriptions
    tables_to_process_filtered = [
        table for table in tables_to_process 
        if table not in descriptions_dict
    ]
    
    skipped_tables = len(tables_to_process) - len(tables_to_process_filtered)
    
    successful_updates = 0
    failed_updates = 0
    
    print(f"{'='*60}")
    print(f"Column Description Generation Started (Parallel)")
    print(f"{'='*60}")
    print(f"Total tables requested: {len(tables_to_process)}")
    print(f"Tables already processed (skipped): {skipped_tables}")
    print(f"Tables to process: {len(tables_to_process_filtered)}")
    print(f"Max workers: {max_workers}")
    print(f"Started at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    print(f"{'='*60}")
    
    if skipped_tables > 0:
        print(f"ℹ️ Skipping {skipped_tables} tables that already have descriptions")
    
    if not tables_to_process_filtered:
        print("ℹ️ No new tables to process - all tables already have descriptions")
        return descriptions_dict
    
    start_time = time.time()
    
    with ThreadPoolExecutor(max_workers=max_workers) as executor:
        # Submit all tasks
        future_to_table = {
            executor.submit(generate_descriptions_for_table, table_name, stats_dict[table_name]): table_name
            for table_name in tables_to_process_filtered
        }
        
        # Process completed tasks
        for i, future in enumerate(future_to_table, 1):
            table_name = future_to_table[future]
            try:
                table_name_result, descriptions, error = future.result()
                
                if error:
                    failed_updates += 1
                    print(f"[{i}/{len(tables_to_process_filtered)}] ❌ Error processing {table_name}: {error}")
                else:
                    descriptions_dict[table_name_result] = descriptions
                    successful_updates += 1
                    print(f"[{i}/{len(tables_to_process_filtered)}] ✅ Successfully processed {table_name} - {len(descriptions)} columns")
                    
            except Exception as e:
                failed_updates += 1
                print(f"[{i}/{len(tables_to_process_filtered)}] ❌ Unexpected error processing {table_name}: {str(e)}")
    
    # Final statistics
    total_elapsed = time.time() - start_time
    print(f"\n{'='*60}")
    print(f"Column Description Generation Summary:")
    print(f"{'='*60}")
    print(f"Total tables requested: {len(tables_to_process)}")
    print(f"Tables skipped (already processed): {skipped_tables}")
    print(f"Tables processed in this run: {len(tables_to_process_filtered)}")
    print(f"Successful generations: {successful_updates}")
    print(f"Failed generations: {failed_updates}")
    if tables_to_process_filtered:
        print(f"Success rate: {(successful_updates/len(tables_to_process_filtered)*100):.1f}%")
        print(f"Total elapsed time: {total_elapsed:.2f}s")
        print(f"Average time per table: {(total_elapsed/len(tables_to_process_filtered)):.2f}s")
    print(f"Total tables in descriptions_dict: {len(descriptions_dict)}")
    print(f"Completed at: {datetime.now().strftime('%Y-%m-%d %H:%M:%S')}")
    
    return descriptions_dict

# Process tables in parallel (limit to first table for testing)
tables_to_process = views  # Remove [:1] to process all tables
descriptions_dict = process_tables_parallel(tables_to_process, stats, descriptions_dict, max_workers=4)

Column Description Generation Started (Parallel)
Total tables requested: 92
Tables already processed (skipped): 20
Tables to process: 72
Max workers: 4
Started at: 2025-09-01 23:16:11
ℹ️ Skipping 20 tables that already have descriptions
[1/72] ✅ Successfully processed amazon_ads_thrive.negative_targeting_expression_view - 6 columns
[2/72] ✅ Successfully processed amazon_ads_thrive.negative_targeting_resolved_expression_view - 6 columns
[3/72] ✅ Successfully processed amazon_ads_thrive.negative_targeting_serving_status_detail_view - 5 columns
[4/72] ✅ Successfully processed amazon_ads_thrive.portfolio_history_view - 14 columns
[5/72] ✅ Successfully processed amazon_ads_thrive.product_ad_history_view - 11 columns
[6/72] ✅ Successfully processed amazon_ads_thrive.product_ad_serving_status_detail_view - 5 columns
[7/72] ✅ Successfully processed amazon_ads_thrive.profile_view - 13 columns
[8/72] ✅ Successfully processed amazon_ads_thrive.purchased_product_keyword_report_view - 39 columns
[9

In [135]:
len(descriptions_dict.keys())

92

In [136]:
# Save descriptions to file
import os
import json
from datetime import datetime

def save_descriptions_to_file(descriptions_dict, filename="table_descriptions.json"):
    """Save table descriptions to a JSON file, updating existing descriptions without deleting others."""
    
    # Check if file exists and load existing descriptions
    existing_descriptions = {}
    if os.path.exists(filename):
        try:
            with open(filename, 'r', encoding='utf-8') as f:
                existing_descriptions = json.load(f)
            print(f"  ✓ Loaded existing descriptions from {filename}")
        except Exception as e:
            print(f"  ⚠️ Could not load existing file {filename}: {str(e)}")
            print(f"  ℹ️ Will create new file")
    
    # Update existing descriptions with new ones
    existing_descriptions.update(descriptions_dict)
    
    # Add metadata
    metadata = {
        "_metadata": {
            "last_updated": datetime.now().isoformat(),
            "total_tables": len(existing_descriptions) - 1,  # -1 for metadata
            "updated_tables": list(descriptions_dict.keys()),
            "generator_version": "1.0"
        }
    }
    existing_descriptions.update(metadata)
    
    # Save to file
    try:
        with open(filename, 'w', encoding='utf-8') as f:
            json.dump(existing_descriptions, f, indent=2, ensure_ascii=False)
        
        print(f"  ✅ Successfully saved descriptions to {filename}")
        print(f"  ℹ️ File contains {len(existing_descriptions) - 1} tables total")
        print(f"  ℹ️ Updated {len(descriptions_dict)} tables in this run")
        return True
        
    except Exception as e:
        print(f"  ❌ Error saving descriptions to file: {str(e)}")
        return False

# Save the generated descriptions
print(f"\n{'='*60}")
print(f"Saving Descriptions to File:")
print(f"{'='*60}")

save_success = save_descriptions_to_file(descriptions_dict)

if save_success:
    print(f"  📁 Descriptions saved successfully")
else:
    print(f"  ❌ Failed to save descriptions")



Saving Descriptions to File:
  ✓ Loaded existing descriptions from table_descriptions.json
  ✅ Successfully saved descriptions to table_descriptions.json
  ℹ️ File contains 109 tables total
  ℹ️ Updated 92 tables in this run
  📁 Descriptions saved successfully


In [137]:
def update_table_column_descriptions(table_name, descriptions_dict, conn):
    """Update column descriptions for a view in the database."""
    if table_name not in descriptions_dict:
        print(f"No descriptions found for table {table_name}")
        return False
    
    descriptions = descriptions_dict[table_name]
    updated_count = 0
    
    try:
        with conn.cursor() as cursor:
            for column_name, description in descriptions.items():
                # Escape single quotes in the description
                escaped_description = description.replace("'", "''")
                
                # Update the column comment on the view
                sql = f"""
                COMMENT ON COLUMN {table_name}.{column_name} IS '{escaped_description}';
                """
                
                try:
                    cursor.execute(sql)
                    updated_count += 1
                    print(f"    ✓ Updated description for {table_name}.{column_name}")
                except Exception as e:
                    print(f"    ❌ Failed to update {table_name}.{column_name}: {str(e)}")
            
            # Commit all changes for this view
            conn.commit()
            print(f"  ✅ Successfully updated {updated_count}/{len(descriptions)} column descriptions for {table_name}")
            return True
            
    except Exception as e:
        print(f"  ❌ Error updating descriptions for {table_name}: {str(e)}")
        conn.rollback()
        return False

In [138]:


conn = connect(params)

total_tables = len(views)
for i, full_table_name in list(enumerate(views, 1)):
    # Use schema.table_name format for full table name
    print(f"[{i}/{total_tables}] Updating descriptions for {full_table_name}")
        
    # Get column count for progress tracking
    if full_table_name in descriptions_dict:
        column_count = len(descriptions_dict[full_table_name])
        print(f"  - Processing {column_count} columns...")
                
        # Create a modified descriptions dict with full table name
        full_descriptions_dict = {full_table_name: descriptions_dict[full_table_name]}
        update_table_column_descriptions(full_table_name, full_descriptions_dict, conn)
        
        print(f"  Progress: {i}/{total_tables} tables completed ({(i/total_tables)*100:.1f}%)")
    else:
        print(f"  - No descriptions found for {full_table_name}, skipping...")
    print()

print(f"🎉 All {total_tables} tables processed successfully!")


[1/92] Updating descriptions for amazon_ads_thrive.ad_group_history_view
  - Processing 9 columns...
    ✓ Updated description for amazon_ads_thrive.ad_group_history_view.id
    ✓ Updated description for amazon_ads_thrive.ad_group_history_view.last_updated_date
    ✓ Updated description for amazon_ads_thrive.ad_group_history_view.serving_status
    ✓ Updated description for amazon_ads_thrive.ad_group_history_view.creation_date
    ✓ Updated description for amazon_ads_thrive.ad_group_history_view.campaign_id
    ✓ Updated description for amazon_ads_thrive.ad_group_history_view.name
    ✓ Updated description for amazon_ads_thrive.ad_group_history_view.default_bid
    ✓ Updated description for amazon_ads_thrive.ad_group_history_view.state
    ✓ Updated description for amazon_ads_thrive.ad_group_history_view._fivetran_synced
  ✅ Successfully updated 9/9 column descriptions for amazon_ads_thrive.ad_group_history_view
  Progress: 1/92 tables completed (1.1%)

[2/92] Updating descriptions for

In [27]:
# Check current user privileges
try:
    with conn.cursor() as cursor:
        cursor.execute("""
            SELECT rolname, rolsuper
            FROM pg_roles
            WHERE rolname = current_user;
        """)
        result = cursor.fetchone()
        if result:
            username, is_superuser = result
            print(f"Current user: {username}")
            print(f"Is superuser: {is_superuser}")
        else:
            print("Could not retrieve current user information")
except Exception as e:
    print(f"Error checking user privileges: {str(e)}")


Current user: postgres
Is superuser: False


In [131]:
def get_view_schema_info(conn, schema_name, view_name):
    """
    Get formatted schema information for a specific view including comments.
    
    Args:
        conn: Database connection object
        schema_name: Name of the schema
        view_name: Name of the view
        
    Returns:
        str: Formatted schema information including view definition and column details
    """
    try:
        cursor = conn.cursor()
        
        # Check if view exists
        cursor.execute("""
            SELECT table_name
            FROM information_schema.views
            WHERE table_schema = %s AND table_name = %s;
        """, (schema_name, view_name))
        
        if not cursor.fetchone():
            return f"View '{view_name}' not found in schema '{schema_name}'"
        
        output = []
        output.append(f"View: {schema_name}.{view_name}")
        output.append("=" * 50)
        
        # Get view definition and comment
        cursor.execute("""
            SELECT 
                obj_description(c.oid, 'pg_class') as comment,
                pg_get_viewdef(c.oid, true) as definition
            FROM pg_class c
            JOIN pg_namespace n ON n.oid = c.relnamespace
            WHERE c.relname = %s 
            AND n.nspname = %s 
            AND c.relkind = 'v';
        """, (view_name, schema_name))
        
        view_info = cursor.fetchone()
        if view_info:
            comment, definition = view_info
            if comment:
                output.append(f"Comment: {comment}")
            else:
                output.append("Comment: No comment available")
            
            output.append("\nDefinition:")
            # Truncate very long definitions for readability
            if len(definition) > 1000:
                output.append(definition[:1000] + "...")
            else:
                output.append(definition)
        
        # Get column information with comments
        cursor.execute("""
            SELECT 
                column_name,
                data_type,
                character_maximum_length,
                is_nullable,
                column_default,
                col_description(pgc.oid, a.attnum) as column_comment
            FROM information_schema.columns c
            LEFT JOIN pg_class pgc ON pgc.relname = c.table_name
            LEFT JOIN pg_namespace n ON n.oid = pgc.relnamespace AND n.nspname = c.table_schema
            LEFT JOIN pg_attribute a ON a.attrelid = pgc.oid AND a.attname = c.column_name
            WHERE c.table_name = %s 
            AND c.table_schema = %s
            ORDER BY c.ordinal_position;
        """, (view_name, schema_name))
        
        columns = cursor.fetchall()
        if columns:
            output.append("\nColumns:")
            for col in columns:
                col_name, data_type, max_length, nullable, default, col_comment = col
                
                # Format data type with length if applicable
                type_str = data_type
                if max_length and data_type in ['character varying', 'varchar', 'char']:
                    type_str += f"({max_length})"
                
                output.append(f"  - {col_name} ({type_str})")
                
                if col_comment:
                    output.append(f"    Comment: {col_comment}")
                if default:
                    output.append(f"    Default: {default}")
                if nullable == 'NO':
                    output.append(f"    NOT NULL")
        
        return "\n".join(output)
        
    except Exception as e:
        return f"Error retrieving schema information for view '{view_name}': {str(e)}"

# Example usage:
# Print schema info for a specific view
view_schema_info = get_view_schema_info(conn, "amazon_ads_thrive", "ad_group_history_view")
print(view_schema_info)




View: amazon_ads_thrive.ad_group_history_view
Comment: No comment available

Definition:
 SELECT ad_group_history.id,
    ad_group_history.last_updated_date,
    ad_group_history.serving_status,
    ad_group_history.creation_date,
    ad_group_history.campaign_id,
    ad_group_history.name,
    ad_group_history.default_bid,
    ad_group_history.state,
    ad_group_history._fivetran_synced
   FROM amazon_ads_thrive.ad_group_history;

Columns:
  - id (character varying(256))
    Comment: Amazon Advertising ad group identifier (numeric-looking string). Unique per ad group in this table; treat as text when joining. Use as the key to join to ad-group-level metrics or targeting tables and to filter specific ad groups. Example values: '101180574487644', '10228173083157', '102237366048889'.
  - last_updated_date (timestamp with time zone)
    Comment: UTC timestamp when this ad group’s settings or status were last updated in Amazon Ads. Useful for incremental loads, change tracking, and filter

In [48]:
# Run the query from file_context_0 to get column information
cursor = conn.cursor()
cursor.execute("""
    SELECT 
        column_name,
        data_type,
        character_maximum_length,
        is_nullable,
        column_default,
        col_description(pgc.oid, a.attnum) as column_comment
    FROM information_schema.columns c
    LEFT JOIN pg_class pgc ON pgc.relname = c.table_name
    LEFT JOIN pg_namespace n ON n.oid = pgc.relnamespace AND n.nspname = c.table_schema
    LEFT JOIN pg_attribute a ON a.attrelid = pgc.oid AND a.attname = c.column_name
    WHERE c.table_name = 'search_terms_report_daily_view' 
    AND c.table_schema = 'sp_api_thrive_2'
    ORDER BY c.ordinal_position;
""")

results = cursor.fetchall()
for row in results:
    column_name, data_type, max_length, nullable, default, column_comment = row
    print(f"Column: {column_name}, Type: {data_type}, Comment: {column_comment}")


Column: clicked_asin, Type: character varying, Comment: Amazon Standard Identification Number (ASIN) of the product that received clicks for this search term on the given date and marketplace. Exactly 10 characters. Join key to product/ASIN dimensions; filter to a specific product; use with click_share_rank to find the top clicked item per term. Examples: 'B0007QCQGI', 'B0CFGYFCYL', 'B09MQWWP87'.
Column: department_name, Type: character varying, Comment: Amazon retail site (domain/locale) the data belongs to; pairs with marketplace_id. Useful for filtering or grouping by country/site. Examples: 'Amazon.com' (United States), 'Amazon.com.br' (Brazil).
Column: end_date, Type: date, Comment: End date (UTC) of the reporting period for this row. For daily data it equals start_date. Use to filter date ranges or join to a date/calendar dimension. Examples: '2025-08-06', '2025-08-08', '2025-08-12'.
Column: marketplace_id, Type: character varying, Comment: Amazon Marketplace identifier for the c