# Imports

In [1]:
from uuid import UUID
from datetime import datetime

In [2]:
from pydantic import BaseModel
from sqlalchemy_utils.models import Timestamp, generic_repr
from sqlalchemy.orm import Mapped
from sqlalchemy import insert, select

In [3]:
from matter_persistence.sql.base import CustomBase, Base
from matter_persistence.sql.manager import DatabaseManager
from matter_persistence.sql.utils import get, find

# Postgres config

In [4]:
# test database settings
POSTGRES_PORT = 5432
POSTGRES_USER = "postgres"
POSTGRES_PASSWORD = "postgres"
POSTGRES_DB = "postgres"

CONNECTION_URI = f"postgresql+asyncpg://{POSTGRES_USER}:{POSTGRES_PASSWORD}@localhost:{POSTGRES_PORT}"

# DatabaseManager

In [5]:
database_manager = DatabaseManager(host=CONNECTION_URI) # make sure postgres instance is available

# Example ORM

In [6]:
# Test ORM
@generic_repr
class PersonORM(CustomBase):
    __tablename__ = 'persons'

    name: Mapped[str]

In [7]:
person = PersonORM(name="Adam")

## from pydantic type

In [8]:
# the fields between the orm and pydantic must match

In [9]:
class Person(BaseModel):
    name: str

In [10]:
PersonORM.parse_obj(Person(name="Eve"))  # __repr__ is provided by sqlalchemy_utils.models.generic_repr

PersonORM(name='Eve', id=<not loaded>, deleted=<not loaded>, created=<not loaded>, updated=<not loaded>)

# Insert test data

In [11]:
# create table in db
async with database_manager.connect() as conn:
        print("creating test table")
        await conn.run_sync(Base.metadata.drop_all)
        await conn.run_sync(Base.metadata.create_all)

creating test table


In [12]:
# insert a row into the table using Connection
async with database_manager.connect() as conn:
    await conn.execute(insert(PersonORM), [{'name': "John"}])
    await conn.commit()

In [13]:
# insert a row into the table using Session
async with database_manager.session() as session:
    session.add(PersonORM(name="Jane"))
    await session.commit()

# Utility functions

## find

In [14]:
# use find
async with database_manager.session() as session:
    print(await find(session, PersonORM))

[PersonORM(name='John', id=UUID('6194191c-9f37-4d74-9f88-a130762ef3a0'), deleted=None, created=datetime.datetime(2024, 4, 30, 7, 12, 21, 572201), updated=datetime.datetime(2024, 4, 30, 7, 12, 21, 572201)), PersonORM(name='Jane', id=UUID('4a9b31de-84a6-4bcd-adb8-feaf80b1d009'), deleted=None, created=datetime.datetime(2024, 4, 30, 7, 12, 21, 588278), updated=datetime.datetime(2024, 4, 30, 7, 12, 21, 588280))]


## get

In [15]:
# use get
async with database_manager.session() as session:
    print(await get(session, select(PersonORM), PersonORM))

PersonORM(name='John', id=UUID('6194191c-9f37-4d74-9f88-a130762ef3a0'), deleted=None, created=datetime.datetime(2024, 4, 30, 7, 12, 21, 572201), updated=datetime.datetime(2024, 4, 30, 7, 12, 21, 572201))


# Close connection pool

In [16]:
await database_manager.close()