In [1]:
import os
import sqlalchemy
from sqlalchemy import create_engine
from sqlalchemy.orm import declarative_base
from sqlalchemy import Column, Integer, String, DateTime, func, cast, text, select
from sqlalchemy.dialects import postgresql
from sqlalchemy.dialects.postgresql import JSONB
from sqlalchemy.orm import Session, sessionmaker

In [2]:
def render_query(query):
    """
    Render a SQLAlchemy statement as PostgreSQL SQL text with its bind
    parameters inlined as literal values.

    Args:
        query: SQLAlchemy query object to render.

    Returns:
        str: The compiled SQL string, with literals in place of placeholders.
    """
    pg_dialect = postgresql.dialect()
    compiled = query.compile(
        dialect=pg_dialect,
        # literal_binds asks the compiler to inline parameter values.
        compile_kwargs={"literal_binds": True},
    )
    return str(compiled)

## Create a Normal Table

### Create a db Engine

In [3]:
# The connection string is taken from the DATABASE_URL environment variable.
database_url = os.getenv("DATABASE_URL")
engine = create_engine(database_url)  # add echo=True to log the emitted SQL
engine

Engine(postgresql+psycopg://admin:***@db:5432/db)

### Define a Table

In [4]:
from sqlalchemy.orm import declarative_base
# Declarative base that the plain (non-JSON) version of the model registers with.
Base = declarative_base()

class Listing(Base):
    """ORM model mapped to the ``listings`` table (id, unique name, timestamps)."""
    __tablename__ = 'listings'
    
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    # Filled with SQL now() on INSERT when no value is supplied.
    created_at = Column(DateTime, default=func.now())
    # Same INSERT default; onupdate also applies now() when the row is updated.
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
    
    def __repr__(self):
        """Return a short debug string showing the row's id and name."""
        return f"<Listing(id='{self.id}' name='{self.name}')>"


# Drop the table if it exists (checkfirst=True), then create every table
# registered on Base so the cell starts from an empty ``listings`` table.
Listing.__table__.drop(engine, checkfirst=True)
Base.metadata.create_all(engine)

### Add Some Records

In [5]:

# Insert two listings in a single transaction.
with Session(engine) as session:
    listing, listing2 = Listing(name="home"), Listing(name="away")
    session.add(listing)
    session.add(listing2)
    session.commit()

### Show the db Records

In [6]:
%load_ext sql

In [7]:
%%sql $DB_URL
select * from listings;

2 rows affected.


id,name,created_at,updated_at
1,home,2024-10-03 05:46:32.034380,2024-10-03 05:46:32.034380
2,away,2024-10-03 05:46:32.034380,2024-10-03 05:46:32.034380


## Create Table with JSONB

### Define a Table

In [8]:
# A new declarative base for the JSONB version of the model.
# NOTE(review): presumably re-created so this second ``listings`` definition
# does not clash with the one registered on the earlier Base — confirm.
Base = declarative_base()

class Listing(Base):
    """ORM model mapped to ``listings``, now with a JSONB ``rooms`` column."""
    __tablename__ = 'listings'
    
    id = Column(Integer, primary_key=True)
    name = Column(String, unique=True)
    # PostgreSQL JSONB document; the insert cell stores a dict keyed by room name.
    rooms = Column(JSONB)
    # Filled with SQL now() on INSERT when no value is supplied.
    created_at = Column(DateTime, default=func.now())
    # Same INSERT default; onupdate also applies now() when the row is updated.
    updated_at = Column(DateTime, default=func.now(), onupdate=func.now())
    
    def __repr__(self):
        """Return a short debug string showing the row's id and name."""
        return f"<Listing(id='{self.id}' name='{self.name}')>"


# Drop the table if it exists (checkfirst=True), then create every table
# registered on Base so the new ``rooms`` column is present.
Listing.__table__.drop(engine, checkfirst=True)
Base.metadata.create_all(engine)

### Add Some Records

In [9]:
from sqlalchemy.orm import Session
# Listing name -> rooms document, in the order the rows are inserted.
rooms_by_name = {
    "home": {
        "kitchen": {"color": "blue"},
    },
    "away": {
        "bedroom": {"color": "yellow"},
        "kitchen": {"color": "yellow"},
    },
    "far": {
        "bedroom": {"color": "blue"},
        "basement": {"color": "green"},
    },
    "city": {
        "basement": {"color": "green"},
        "bedroom": {"color": "green"},
    },
    "country": {
        "basement": {"color": "green"},
    },
}

with Session(engine) as session:
    session.add_all(
        [Listing(name=name, rooms=rooms) for name, rooms in rooms_by_name.items()]
    )
    session.commit()

### Show the db Records

In [10]:
%%sql $DB_URL
select * from listings;

5 rows affected.


id,name,rooms,created_at,updated_at
1,home,{'kitchen': {'color': 'blue'}},2024-10-03 05:46:33.888458,2024-10-03 05:46:33.888458
2,away,"{'bedroom': {'color': 'yellow'}, 'kitchen': {'color': 'yellow'}}",2024-10-03 05:46:33.888458,2024-10-03 05:46:33.888458
3,far,"{'bedroom': {'color': 'blue'}, 'basement': {'color': 'green'}}",2024-10-03 05:46:33.888458,2024-10-03 05:46:33.888458
4,city,"{'bedroom': {'color': 'green'}, 'basement': {'color': 'green'}}",2024-10-03 05:46:33.888458,2024-10-03 05:46:33.888458
5,country,{'basement': {'color': 'green'}},2024-10-03 05:46:33.888458,2024-10-03 05:46:33.888458


### Filter `bedroom.color == yellow`

In [11]:
from sqlalchemy.orm import Session

# rooms -> 'bedroom' ->> 'color', extracted as text so it compares to a plain string.
bedroom_color = Listing.rooms['bedroom']["color"].astext
query = select(Listing).where(bedroom_color == "yellow")
print("QUERY::: ", render_query(query))
with Session(engine) as session:
    res = session.scalars(query)
    print("RESULT::: ", res.all())

QUERY:::  SELECT listings.id, listings.name, listings.rooms, listings.created_at, listings.updated_at 
FROM listings 
WHERE (((listings.rooms -> 'bedroom')) ->> 'color') = 'yellow'
RESULT:::  [<Listing(id='2' name='away')>]


### Filter `bedroom == {"color": "yellow"}` (whole-object match)

In [12]:
from sqlalchemy.orm import Session

# Compare the whole 'bedroom' sub-document against a dict value.
bedroom = Listing.rooms['bedroom']
query = select(Listing).where(bedroom == {"color": "yellow"})
# str(query) compiles without the PostgreSQL dialect, so bind placeholders are shown.
print("QUERY::: ", str(query))
with Session(engine) as session:
    res = session.scalars(query)
    print("RESULT::: ", res.all())

QUERY:::  SELECT listings.id, listings.name, listings.rooms, listings.created_at, listings.updated_at 
FROM listings 
WHERE listings.rooms[:rooms_1] = :param_1
RESULT:::  [<Listing(id='2' name='away')>]


### Filter `bedroom is Not Null`

In [13]:
from sqlalchemy.orm import Session

# Text value at path 'bedroom'; NULL when the key is absent.
bedroom_text = func.jsonb_extract_path_text(Listing.rooms, 'bedroom')
query = select(Listing).where(bedroom_text.is_not(None))
print("QUERY::: ", render_query(query))
with Session(engine) as session:
    res = session.scalars(query)
    print("RESULT::: ", res.all())

QUERY:::  SELECT listings.id, listings.name, listings.rooms, listings.created_at, listings.updated_at 
FROM listings 
WHERE jsonb_extract_path_text(listings.rooms, 'bedroom') IS NOT NULL
RESULT:::  [<Listing(id='2' name='away')>, <Listing(id='3' name='far')>, <Listing(id='4' name='city')>]


### Filter listings whose `kitchen` (or `bathroom`) has a `color` key

In [14]:
from sqlalchemy.orm import Session

# (rooms -> 'kitchen') ? 'color' : does the kitchen object contain a 'color' key?
kitchen = Listing.rooms.op('->')('kitchen')
kitchen_has_color = kitchen.op('?')('color')
query = select(Listing).where(kitchen_has_color)
print("QUERY::: ", render_query(query))
with Session(engine) as session:
    res = session.scalars(query)
    print("RESULT::: ", res.all())

QUERY:::  SELECT listings.id, listings.name, listings.rooms, listings.created_at, listings.updated_at 
FROM listings 
WHERE (listings.rooms -> 'kitchen') ? 'color'
RESULT:::  [<Listing(id='1' name='home')>, <Listing(id='2' name='away')>]


In [15]:
from sqlalchemy.orm import Session

# (rooms -> 'bathroom') ? 'color' : does the bathroom object contain a 'color' key?
bathroom = Listing.rooms.op('->')('bathroom')
bathroom_has_color = bathroom.op('?')('color')
query = select(Listing).where(bathroom_has_color)
print("QUERY::: ", render_query(query))
with Session(engine) as session:
    res = session.scalars(query)
    print("RESULT::: ", res.all())

QUERY:::  SELECT listings.id, listings.name, listings.rooms, listings.created_at, listings.updated_at 
FROM listings 
WHERE (listings.rooms -> 'bathroom') ? 'color'
RESULT:::  []


### Find all listings that have a room with a green color

In [16]:
# jsonpath: any top-level member of rooms whose "color" equals "green".
green_room_path = text("""'$.* ? (@.color == "green")'::jsonpath""")
has_green_room = func.jsonb_path_exists(Listing.rooms, green_room_path)
query = select(Listing).filter(has_green_room)
print("QUERY::: ", render_query(query))
with Session(engine) as session:
    res = session.scalars(query)
    print("RESULT::: ", res.all())

QUERY:::  SELECT listings.id, listings.name, listings.rooms, listings.created_at, listings.updated_at 
FROM listings 
WHERE jsonb_path_exists(listings.rooms, '$.* ? (@.color == "green")'::jsonpath)
RESULT:::  [<Listing(id='3' name='far')>, <Listing(id='4' name='city')>, <Listing(id='5' name='country')>]


In [17]:
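# https://www.postgresql.org/docs/current/functions-json.html
# (the jsonpath functions such as jsonb_path_exists need PostgreSQL 12+; the 9.6 page does not cover them)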

In [18]:
# https://github.com/json-path/JsonPath

In [19]:
from sqlalchemy import func, select
from sqlalchemy.orm import Session
from contextlib import contextmanager
from dataclasses import dataclass

@dataclass
class Lock:
    """Outcome of the lock attempt that ``advisory_lock`` yields to its caller."""
    # True when the try-lock call did NOT obtain the lock
    # (advisory_lock sets it to the negation of the try-lock result).
    locked: bool
    # Integer key produced by hashtext() from the caller's lock value.
    lock_key: int


@contextmanager
def advisory_lock(session: Session, lock_value: str, commit: bool = True):
    """
    Try to take a transaction-scoped PostgreSQL advisory lock keyed by a hash
    of ``lock_value`` and yield the outcome to the caller.

    The attempt is non-blocking (``pg_try_advisory_xact_lock``), so the body of
    the ``with`` statement always runs; check ``lock.locked`` to find out
    whether the lock was obtained. PostgreSQL releases the lock when the
    transaction ends, so no explicit unlock is issued.

    Args:
        session (Session): SQLAlchemy session to interact with the database.
        lock_value (str): The value (string) to hash and generate the lock key.
        commit (bool): When True (default) the transaction is ended on exit:
            committed if the block finished normally, rolled back if it raised.
            When False the transaction is left for the caller to finish.

    Yields:
        Lock: ``locked`` is True when the lock could NOT be obtained (it is
        held elsewhere); ``lock_key`` is the integer key that was used.

    Usage:
        with advisory_lock(session, "some_unique_value") as lock:
            if not lock.locked:
                ...  # Critical section where the lock is held
    """
    try:
        # Hash the string into the integer key the advisory-lock functions take.
        lock_key = session.execute(select(func.hashtext(lock_value))).scalar()

        # Non-blocking attempt: true when the lock was obtained, false otherwise.
        acquired = session.execute(
            select(func.pg_try_advisory_xact_lock(lock_key))
        ).scalar()

        # Yield control back to the caller inside the context
        yield Lock(locked=not acquired, lock_key=lock_key)
    except BaseException:
        # Do not persist partial work from a failed block; ending the
        # transaction also releases the advisory lock.
        if commit:
            session.rollback()
        raise
    else:
        # The lock is automatically released when the transaction ends
        # (no explicit release needed).
        if commit:
            session.commit()

In [22]:
from sqlalchemy.orm import sessionmaker
import time

# Give the factory its own name: rebinding `Session` would shadow the
# sqlalchemy.orm.Session class, and the earlier cells that call
# Session(engine) would then fail when re-run (a sessionmaker accepts
# keyword arguments only).
SessionFactory = sessionmaker(autocommit=False, autoflush=False, bind=engine)

lock_value = "some_unique_string_value"  # This string will be hashed to generate the lock key

with SessionFactory() as session:
    with advisory_lock(session, lock_value, commit=True) as lock:
        if lock.locked:
            # Another transaction already holds the lock.
            print("Skip because locked")
        else:
            # Critical section where the advisory lock is held
            # Perform operations that need the lock
            session.execute(text('select 1'))
            # Keep the transaction (and therefore the lock) open for a while so
            # a second run of this cell elsewhere can observe the lock.
            time.sleep(100)
            print(lock.locked, lock.lock_key)


Skip because locked
