In [19]:
import json
from sqlalchemy import create_engine, inspect, text
from sqlalchemy.exc import OperationalError
from getpass import getpass

def create_db_engine(username=None, password=None):
    """Build a SQLAlchemy engine for the ``postgres`` database on localhost:5432.

    Args:
        username: Role to connect as. When falsy (``None`` or ``""``) the
            ``postgres`` role is used and ``password`` is ignored.
        password: Password for ``username``. When falsy, no password is put
            in the URL.

    Returns:
        The engine returned by ``create_engine``. No connection is opened here.
    """
    # Local import keeps this block self-contained without touching the
    # notebook's import cell.
    from urllib.parse import quote

    if not username:
        return create_engine('postgresql://postgres@localhost:5432/postgres')

    # Credentials are percent-encoded so characters such as '@', ':', '/' or
    # '%' cannot be mistaken for URL delimiters. safe='' also encodes '/',
    # and spaces become %20 rather than '+', which decodes unambiguously.
    credentials = quote(username, safe='')
    if password:
        credentials += ':' + quote(password, safe='')
    return create_engine(f'postgresql://{credentials}@localhost:5432/postgres')

def _connect_or_dispose(username, password):
    """Return an engine that has been verified by opening one connection.

    If the connection attempt raises ``OperationalError`` the engine is
    disposed before the error is re-raised, so failed attempts do not leave
    an unused engine behind.
    """
    engine = create_db_engine(username, password)
    try:
        # Open and immediately close a connection purely as a liveness check.
        with engine.connect():
            pass
    except OperationalError:
        engine.dispose()
        raise
    return engine


def attempt_connection():
    """Connect to PostgreSQL, trying password-less logins before prompting.

    The password-less attempts are tried in order; the first one that
    connects wins. If none connects, the user is prompted for a username
    and password (the password is read with ``getpass``).

    Returns:
        A connected-and-verified engine, or ``None`` if the prompted
        credentials also fail with ``OperationalError``.
    """
    attempts = [
        (None, None),  # Try default connection
        ('postgres', None),  # Try with 'postgres' user, no password
    ]

    for username, password in attempts:
        try:
            engine = _connect_or_dispose(username, password)
        except OperationalError:
            # Best-effort probing: a failed automatic attempt is expected
            # and simply moves on to the next candidate.
            continue
        print(f"Successfully connected to PostgreSQL with user: {username or 'default'}")
        return engine

    # If all attempts fail, prompt for credentials
    print("Automatic connection failed. Please enter your PostgreSQL credentials.")
    username = input("Username: ")
    password = getpass("Password: ")
    try:
        engine = _connect_or_dispose(username, password)
    except OperationalError as e:
        print(f"Failed to connect to PostgreSQL: {str(e)}")
        print("Please ensure PostgreSQL is running and accessible, and that your credentials are correct.")
        return None
    print(f"Successfully connected to PostgreSQL with user: {username}")
    return engine

def list_schemas(engine):
    """Return the schema names visible through ``engine``, minus system schemas.

    Schemas whose name starts with ``pg_`` and the ``information_schema``
    schema are filtered out; the remaining names keep the inspector's order.
    """
    user_schemas = []
    for name in inspect(engine).get_schema_names():
        if name == 'information_schema' or name.startswith('pg_'):
            continue
        user_schemas.append(name)
    return user_schemas

def get_databases(engine):
    """Return the names of all non-template databases listed in ``pg_database``."""
    query = text("SELECT datname FROM pg_database WHERE datistemplate = false;")
    names = []
    with engine.connect() as conn:
        # Consume the result while the connection is still open.
        for row in conn.execute(query):
            names.append(row[0])
    return names

def get_schema_metadata(engine, schema_name):
    """Collect table names and key columns for one schema.

    Returns:
        A dict with three entries:
        ``'tables'`` - list of table names in ``schema_name``;
        ``'primary_keys'`` - table name -> list of primary-key column names;
        ``'foreign_keys'`` - table name -> list with one entry per foreign-key
        constraint, each entry being that constraint's list of constrained
        column names (the referenced table/columns are not included).
    """
    insp = inspect(engine)
    table_names = insp.get_table_names(schema=schema_name)

    # Primary-key columns per table.
    primary_keys = {
        name: insp.get_pk_constraint(name, schema=schema_name)['constrained_columns']
        for name in table_names
    }

    # Constrained columns of every foreign key, per table.
    foreign_keys = {
        name: [
            constraint['constrained_columns']
            for constraint in insp.get_foreign_keys(name, schema=schema_name)
        ]
        for name in table_names
    }

    return {
        'tables': table_names,
        'primary_keys': primary_keys,
        'foreign_keys': foreign_keys,
    }


In [20]:
# Open a connection (falls back to prompting for credentials)
engine = attempt_connection()

if not engine:
    print("Unable to connect to the database. Please check your PostgreSQL setup and try again.")
else:
    # Databases on the server
    databases = get_databases(engine)
    print("Available databases:", databases)

    # Non-system schemas of the database the engine is connected to;
    # `schemas` is reused by the cells below to validate user input
    schemas = list_schemas(engine)
    print("Available schemas in the current database:", schemas)


Automatic connection failed. Please enter your PostgreSQL credentials.
Successfully connected to PostgreSQL with user: postgres
Available databases: ['postgres']
Available schemas in the current database: ['public', 'star_schema']


In [21]:
# Show the metadata of a user-chosen schema.
# The engine is checked first so the user is only prompted when a lookup
# is actually possible.
if engine:
    # Prompt for schema name
    schema_name = input("Enter the schema name to get metadata: ")

    if schema_name in schemas:
        # Get schema metadata
        metadata = get_schema_metadata(engine, schema_name)
        print("Schema Metadata:", json.dumps(metadata, indent=4))
    else:
        print(f"Schema '{schema_name}' does not exist in the current database.")
else:
    print("Engine not available. Please ensure you have connected to the database.")


Schema Metadata: {
    "tables": [
        "dim_date",
        "dim_date2",
        "dim_date_2",
        "fact_sales_2",
        "dim_product_2",
        "dim_customer_2"
    ],
    "primary_keys": {
        "dim_date": [
            "date_key"
        ],
        "dim_date2": [
            "date_key"
        ],
        "dim_date_2": [
            "date_key"
        ],
        "fact_sales_2": [
            "sale_key"
        ],
        "dim_product_2": [
            "product_key"
        ],
        "dim_customer_2": [
            "customer_key"
        ]
    },
    "foreign_keys": {
        "dim_date": [],
        "dim_date2": [],
        "dim_date_2": [],
        "fact_sales_2": [
            [
                "customer_key"
            ],
            [
                "date_key"
            ],
            [
                "product_key"
            ]
        ],
        "dim_product_2": [],
        "dim_customer_2": []
    }
}


In [34]:
# Requires `engine` and `schemas` from the connection cell.
# NOTE(review): store_metadata_and_keys() must already be defined in the
# kernel when this cell runs; no cell above this one defines it.
if engine:
    schema_name = input("Enter the schema name to get metadata: ")

    if schema_name not in schemas:
        print(f"Schema '{schema_name}' does not exist in the current database.")
    else:
        # Inspect the chosen schema
        metadata = get_schema_metadata(engine, schema_name)
        pk_by_table = metadata['primary_keys']
        fk_by_table = metadata['foreign_keys']

        # Foreign keys per table (empty list when a table has none recorded)
        foreign_keys = {
            name: fk_by_table.get(name, [])
            for name in metadata['tables']
        }

        # Per-table record combining primary and foreign keys
        table_metadata = {
            name: {
                'primary_keys': pk_by_table.get(name, []),
                'foreign_keys': foreign_keys[name],
            }
            for name in metadata['tables']
        }

        # Persist both mappings
        store_metadata_and_keys(engine, schema_name, table_metadata, foreign_keys)
else:
    print("Engine not available. Please ensure you have connected to the database.")


Metadata and foreign keys successfully stored.


In [None]:
# Persistence layer for the collected schema metadata.
# NOTE: this cell defines store_metadata_and_keys(); it has to be executed
# before any cell that calls that function.
from sqlalchemy import Column, Integer, String, JSON
from sqlalchemy.orm import sessionmaker

try:
    # Location since SQLAlchemy 1.4.
    from sqlalchemy.orm import declarative_base
except ImportError:
    # Older SQLAlchemy releases only expose it here.
    from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()


# Define tables for storing schema metadata and foreign keys
class SchemaMetadata(Base):
    """One row per inspected table, holding its key metadata as JSON."""
    __tablename__ = 'schema_metadata'
    id = Column(Integer, primary_key=True, autoincrement=True)
    schema_name = Column(String, nullable=False)
    table_name = Column(String, nullable=False)
    table_metadata = Column(JSON, nullable=False)  # Store table metadata as JSON


class ForeignKeyRelations(Base):
    """One row per inspected table, holding its foreign-key columns as JSON."""
    __tablename__ = 'foreign_key_relations'
    id = Column(Integer, primary_key=True, autoincrement=True)
    schema_name = Column(String, nullable=False)
    table_name = Column(String, nullable=False)
    fk_constraints = Column(JSON, nullable=False)  # Store foreign key constraints as JSON


def store_metadata_and_keys(engine, schema_name, metadata, fk_constraints):
    """Insert per-table metadata and foreign-key data into the storage tables.

    Args:
        engine: SQLAlchemy engine to write through.
        schema_name: Name of the schema the metadata was collected from.
        metadata: Mapping of table name -> JSON-serialisable metadata.
        fk_constraints: Mapping of table name -> JSON-serialisable
            foreign-key description.

    The two storage tables are created if missing. Rows are always appended,
    so calling this twice for the same schema stores the data twice. On any
    error during insert/commit the transaction is rolled back and the error
    is printed rather than raised.
    """
    # Create tables if they do not exist
    Base.metadata.create_all(engine)

    # Create a session
    Session = sessionmaker(bind=engine)
    session = Session()

    try:
        # Insert table metadata. The JSON column type serialises Python
        # objects itself, so the value is passed as-is; wrapping it in
        # json.dumps() would store a JSON *string* instead of an object.
        for table_name, meta in metadata.items():
            session.add(SchemaMetadata(
                schema_name=schema_name,
                table_name=table_name,
                table_metadata=meta,
            ))

        # Insert foreign key constraints (same serialisation rule as above)
        for table_name, fks in fk_constraints.items():
            session.add(ForeignKeyRelations(
                schema_name=schema_name,
                table_name=table_name,
                fk_constraints=fks,
            ))

        # Commit the session to save the data
        session.commit()
        print("Metadata and foreign keys successfully stored.")
    except Exception as e:
        session.rollback()  # Rollback in case of error
        print(f"Error storing metadata: {str(e)}")
    finally:
        session.close()
