In [None]:
# Create settings instance
from project_settings import Settings
settings = Settings()
print(f"Loaded settings: {settings}")


In [None]:
# models.py
from sqlalchemy import Column, Integer, String
from sqlalchemy.ext.declarative import declarative_base

Base = declarative_base()

class TestItem(Base):
    __tablename__ = "test_items"

    id = Column(Integer, primary_key=True, index=True)
    name = Column(String, nullable=False)

from google_session import SyncSessionLocal  # assuming this creates your engine/session

# Access the same engine used in SyncSessionLocal
# If SyncSessionLocal is a sessionmaker(engine), you can get the engine from it like:
engine = SyncSessionLocal.kw["bind"]  # Only works if it's initialized with `bind=engine`

# Or directly if you can import engine from wherever it's defined
# from google_session import engine

# Create tables
Base.metadata.create_all(bind=engine)

print("Table created successfully.")

In [5]:
# drop_table_test.py
from google_session import SyncSessionLocal  # or import engine directly if available

# Get the engine from the sessionmaker
engine = SyncSessionLocal.kw["bind"]  # Only works if SyncSessionLocal is sessionmaker(bind=engine)

# Drop the tables
Base.metadata.drop_all(bind=engine)

print("Table(s) dropped successfully.")

Table(s) dropped successfully.


In [None]:
# Get the engine from the sessionmaker
engine = SyncSessionLocal.kw["bind"]  # Only works if SyncSessionLocal is sessionmaker(bind=engine)

# Drop the tables
Base.metadata.drop_all(bind=engine)

print("Table(s) dropped successfully.")

In [None]:
from dependencies import get_db
db = get_db()


In [None]:
import os
import logging
import sqlalchemy
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker

from google.cloud.sql.connector import Connector, IPTypes
import pg8000

_engine = None

def get_db_engine() -> sqlalchemy.engine.base.Engine:
    """
    Returns a global SQLAlchemy engine instance with connection pooling.

    This creates a singleton engine that can be reused across requests,
    improving performance by avoiding repeated connection establishment.

    Returns:
        SQLAlchemy engine with connection pooling
    """
    global _engine

    if _engine is not None:
        # Return existing engine if already created
        return _engine

    # Check if we're running in GCP environment
    if os.environ.get("INSTANCE_CONNECTION_NAME"):
        # Create Cloud SQL connection using the connector
        instance_connection_name = settings.INSTANCE_CONNECTION_NAME
        db_user = settings.DB_USER
        db_pass = settings.DB_PASS
        db_name = settings.DB_NAME
        # db_user = os.environ.get("DB_USER", settings.DB_USER)
        # db_pass = os.environ.get("DB_PASS", settings.DB_PASS)
        # db_name = os.environ.get("DB_NAME", settings.DB_NAME)

        ip_type = IPTypes.PRIVATE if os.environ.get("PRIVATE_IP") else IPTypes.PUBLIC

        connector = Connector(refresh_strategy="LAZY")

        def getconn() -> pg8000.dbapi.Connection:
            conn = connector.connect(
                instance_connection_name,
                "pg8000",
                user=db_user,
                password=db_pass,
                db=db_name,
                ip_type=ip_type,
            )
            return conn

        # Configure pooling parameters for better performance
        _engine = sqlalchemy.create_engine(
            "postgresql+pg8000://",
            creator=getconn,
            # Pool settings
            pool_size=5,               # Default number of connections to maintain
            max_overflow=10,           # Allow up to 10 additional connections on high load
            pool_timeout=30,           # Wait up to 30 seconds for a connection
            pool_recycle=1800,         # Recycle connections after 30 minutes
            pool_pre_ping=True         # Check connection viability before using
        )

        logging.info("Google Cloud SQL connection pool initialized")
    else:
        raise ValueError('No instance connection provided')

    return _engine

get_db_engine()

In [None]:
with _engine.session()