In [None]:
#| default_exp core

# fastasyncpg

`fastasyncpg` is a simple wrapper for asyncpg. We'll explain how it works and build up the module in a "literate" nbdev style.

## Installation

On macOS, the recommended way to install PostgreSQL is via **Homebrew**. Other options include Postgres.app (a menubar app) and the EDB installer, but Homebrew integrates best with command-line workflows and makes updates simple.

To install PostgreSQL 18 (the current latest stable release):

```bash
brew install postgresql@18
```

To check if you already have PostgreSQL installed via Homebrew, run `brew list | grep postgres`. You can also check which version is in your PATH with `psql --version`.

Let's verify the installation is working:

In [None]:
!brew list | grep postgres

In [None]:
!psql --version

psql (PostgreSQL) 18.1


After installation, run `brew info postgresql@18` to see setup instructions. PostgreSQL 18 is "keg-only", meaning it's not automatically symlinked into your PATH.

You'll see something like:

```
This formula has created a default database cluster with:
  initdb --locale=en_US.UTF-8 -E UTF-8 /opt/homebrew/var/postgresql@18

When uninstalling, some dead symlinks are left behind so you may want to run:
  brew cleanup --prune-prefix

postgresql@18 is keg-only, which means it was not symlinked into /opt/homebrew,
because this is an alternate version of another formula.

If you need to have postgresql@18 first in your PATH, run:
  echo 'export PATH="/opt/homebrew/opt/postgresql@18/bin:$PATH"' >> /Users/jhoward/.bash_profile

To start postgresql@18 now and restart at login:
  brew services start postgresql@18
```

The `brew info` output (above) tells you exactly what to do:

1. **Add to PATH** (for bash): `echo 'export PATH="/opt/homebrew/opt/postgresql@18/bin:$PATH"' >> ~/.bash_profile && source ~/.bash_profile`. If you use zsh (the default shell on recent macOS), append the same line to `~/.zshrc` instead.
2. **Start the service**: `brew services start postgresql@18`

This registers PostgreSQL to start automatically at login.

To run non-interactive queries from a shell, use `-c` to pass a command directly:

In [None]:
!psql -d postgres -c "SELECT version();"

                                        version                                        
---------------------------------------------------------------------------------------
 PostgreSQL 18.1 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 15.2.1 20251112, 64-bit
(1 row)



You can confirm that the service is registered with `brew services list`, which shows all Homebrew-managed services and their status.

In [None]:
!brew services list

To control auto-start behavior:
- `brew services stop postgresql@18` — stop and disable auto-start
- `brew services start postgresql@18` — start and enable auto-start  
- `brew services run postgresql@18` — run once without enabling auto-start

On Ubuntu, the standard way to get the latest PostgreSQL is through the **official PostgreSQL APT repository** (PGDG), since Ubuntu's default repos often have older versions.

## Connecting with Python

The most popular Python libraries for PostgreSQL are **psycopg2** (synchronous), **psycopg 3** (which offers both synchronous and async APIs), and **asyncpg** (async only). asyncpg's own benchmarks report it as about 5x faster than psycopg3 on average, which makes it a strong choice for async work.

We'll use **asyncpg** for this wrapper.

In [None]:
#| export
from fastcore.utils import *
import asyncio
import asyncpg
from asyncpg import connection,protocol

In [None]:
import os

In [None]:
user = os.environ['USER']; user

'natedawg'

asyncpg uses `await` for all database operations. The `connect` function returns a connection object, and `fetchval` is a convenience method that returns a single value (by default, the first column of the first row):

In [None]:
conn = await asyncpg.connect(user=user, database='postgres', host='127.0.0.1')
await conn.fetchval('SELECT version()')

'PostgreSQL 18.1 on x86_64-pc-linux-gnu, compiled by gcc (GCC) 15.2.1 20251112, 64-bit'

Let's create a simple test table to explore basic operations:

In [None]:
await conn.execute('''DROP TABLE IF EXISTS users''')
await conn.execute('''CREATE TABLE users ( id SERIAL PRIMARY KEY, name TEXT NOT NULL, age INTEGER )''')

'CREATE TABLE'

Great! Now let's insert some data:

In [None]:
await conn.execute("INSERT INTO users (name, age) VALUES ($1, $2)", 'Alice', 30)

'INSERT 0 1'

PostgreSQL uses `$1`, `$2`, etc. for parameterized queries, not `?` like SQLite. This syntax allows you to reference the same parameter multiple times and makes the order explicit.
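
Because placeholders are numbered, building a statement for an arbitrary set of columns just means counting them. Here is a minimal sketch; `insert_sql` is a hypothetical helper (not part of asyncpg or fastasyncpg), and it assumes table and column names are trusted, since the double-quoting does not escape embedded `"` characters:

```python
def insert_sql(table: str, row: dict) -> tuple[str, list]:
    "Build a parameterized INSERT for `row` using PostgreSQL's $n placeholders"
    cols = ", ".join(f'"{k}"' for k in row)                      # quoted column names
    params = ", ".join(f"${i}" for i in range(1, len(row) + 1))  # $1, $2, ...
    sql = f'INSERT INTO "{table}" ({cols}) VALUES ({params})'
    return sql, list(row.values())                               # values in placeholder order

sql, args = insert_sql("users", {"name": "Alice", "age": 30})
print(sql)   # INSERT INTO "users" ("name", "age") VALUES ($1, $2)
print(args)  # ['Alice', 30]
```

You would then run it with `await conn.execute(sql, *args)`. The values never appear in the SQL text, so they are sent separately from the query just like the hand-written example above.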

`fetch` returns a list of `Record` objects. Each record supports dict-like access by column name or index:

In [None]:
rs = await conn.fetch("SELECT * FROM users")
rs

[<Record id=1 name='Alice' age=30>]

In [None]:
r = rs[0]
type(rs),type(r)

(list, asyncpg.protocol.record.Record)

`asyncpg.Record` objects use dict-like access (`r['name']` or `r[0]`), not attribute access. You can use `dict2obj` if you want the latter.

In [None]:
ro = dict2obj(dict(r))
ro.name

'Alice'

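Unlike psycopg2, asyncpg doesn't make you go through a DB-API cursor object: you call `execute`, `fetch`, and friends on the connection directly. When you do want to stream a large result set, `conn.cursor(...)` creates a server-side cursor that you iterate with `async for`. However, cursors in asyncpg require an explicit transaction: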

In [None]:
# raises "NoActiveSQLTransactionError: cursor cannot be created outside of a transaction"
# async for record in conn.cursor("SELECT * FROM users"): print(record)

In [None]:
async with conn.transaction():
    async for record in conn.cursor("SELECT * FROM users"): print(record)

<Record id=1 name='Alice' age=30>


By default, asyncpg operates in **auto-commit mode** — changes are applied immediately when not in an explicit transaction block. Regular queries (`execute`, `fetch`, etc.) don't need transactions, but cursors do. This is a direct reflection of how PostgreSQL itself handles "portals" (the underlying mechanism for cursors).

In PostgreSQL, a portal is generally only valid for the duration of a transaction. If a transaction isn't explicitly started, PostgreSQL runs each command in its own "one-shot" transaction. For a cursor to stay open so you can fetch multiple batches of rows, the transaction it belongs to must remain open.

Other libraries hide this. psycopg2, for example, is not in autocommit mode by default, so it implicitly opens a transaction before the first command you execute; `asyncpg` chooses to be more explicit about the underlying database state.

## Chinook

For testing with a real dataset, we'll use the **Chinook** sample database, which has tables for artists, albums, tracks, etc. The PostgreSQL version is available on GitHub:

```bash
curl -L -O https://github.com/lerocha/chinook-database/raw/master/ChinookDatabase/DataSources/Chinook_PostgreSql.sql
```

Now we need to create a database and run that script. First, let's create a database called `chinook`:

```bash
createdb chinook
```

Then we can load the SQL file into it:

```bash
psql -d chinook -f Chinook_PostgreSql.sql
```

Always close connections when you're done with them; closing ends the server-side session and frees its resources:

In [None]:
await conn.close()

Now let's connect to the Chinook database to work with more realistic data:

In [None]:
conn = await asyncpg.connect(user=user, database='chinook', host='127.0.0.1')
await conn.fetchval("SELECT count(*) FROM artist")

275

## Metadata

`Results` is a simple list subclass that renders as an HTML table in notebooks. It displays all rows with column headers, making query results easy to read:

In [None]:
#| export
class Results(list):
    def _repr_html_(self):
        if not self: return ""
        ks = list(self[0].keys())
        ths = "".join(f"<th>{k}</th>" for k in ks)
        trs = "".join(f"<tr>{''.join(f'<td>{v}</td>' for v in r.values())}</tr>" for r in self)
        return f'<table class="prose"><thead><tr>{ths}</tr></thead><tbody>{trs}</tbody></table>'
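
`Results` interpolates values straight into the markup, so a value containing `<` or `&` is rendered as HTML rather than shown as text. A minimal variant using the standard library's `html.escape` is sketched below; `EscapedResults` is a hypothetical name, and plain dicts stand in for records (anything with `keys()` and `values()` works):

```python
import html

class EscapedResults(list):
    "Like `Results`, but HTML-escapes headers and cell values before rendering"
    def _repr_html_(self):
        if not self: return ""
        ks = list(self[0].keys())
        ths = "".join(f"<th>{html.escape(str(k))}</th>" for k in ks)
        trs = "".join(
            "<tr>" + "".join(f"<td>{html.escape(str(v))}</td>" for v in r.values()) + "</tr>"
            for r in self)
        return f'<table class="prose"><thead><tr>{ths}</tr></thead><tbody>{trs}</tbody></table>'

rs = EscapedResults([{"artist_id": 1, "name": "AC/DC"}, {"artist_id": 2, "name": "<script>"}])
out = rs._repr_html_()  # the second name renders as the literal text "<script>"
```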

`%%sql` is a quick cell magic: it runs the cell's contents as SQL on `conn` and wraps the returned records in `Results`:

In [None]:
from IPython.core.magic import register_cell_magic

In [None]:
@register_cell_magic
async def sql(l,c): return Results(await conn.fetch(c))

In [None]:
%%sql
SELECT column_name, data_type, is_nullable
FROM information_schema.columns
WHERE table_name = 'artist' 

column_name,data_type,is_nullable
artist_id,integer,NO
name,character varying,YES


In PostgreSQL, every table belongs to a **schema**, which you can think of as a folder or namespace. The default schema is `public`: when you create a table without specifying a schema, it lands there. The query above reads column details from `information_schema.columns`; the `information_schema` views expose this kind of structural metadata using SQL-standard names.

To list all tables in the `public` schema, you can query the `information_schema.tables` view:

In [None]:
await conn.fetch("SELECT table_name FROM information_schema.tables WHERE table_schema = 'public'")

[<Record table_name='artist'>,
 <Record table_name='album'>,
 <Record table_name='employee'>,
 <Record table_name='customer'>,
 <Record table_name='invoice'>,
 <Record table_name='invoice_line'>,
 <Record table_name='track'>,
 <Record table_name='playlist'>,
 <Record table_name='playlist_track'>,
 <Record table_name='genre'>,
 <Record table_name='media_type'>,
 <Record table_name='cat'>,
 <Record table_name='dog'>,
 <Record table_name='toy'>]

To customize how records behave, we need access to the underlying `Record` class:

In [None]:
#| export
from asyncpg.protocol.record import Record

`FRecord` extends asyncpg's `Record` with two conveniences: attribute access (`r.name` instead of `r['name']`) and HTML rendering for notebooks:

In [None]:
#| export
class FRecord(Record):
    def __getattr__(self, k):
        if k.startswith('_'): raise AttributeError(k)
        return self[k]

    def _repr_html_(self):
        rows = "".join(f"<tr><td>{k}</td><td>{v}</td></tr>" for k,v in self.items())
        return f'<table class="prose"><thead><tr><th>Field</th><th>Value</th></tr></thead><tbody>{rows}</tbody></table>'
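
`FRecord` deliberately raises `AttributeError` for `_`-prefixed names, but a missing column still surfaces as whatever `self[k]` raises (a `KeyError`, as dict-style lookup normally does). Python's `hasattr()` and three-argument `getattr()` only treat `AttributeError` as "missing", so converting the error keeps attribute probing working. The pattern is sketched below on a plain `dict` subclass, since asyncpg records can't be built without a connection; `AttrRecord` is a hypothetical, illustration-only class:

```python
class AttrRecord(dict):
    "dict with FRecord-style attribute access, for illustration only"
    def __getattr__(self, k):
        # Private/dunder names: report "no such attribute" instead of looking up a column
        if k.startswith('_'): raise AttributeError(k)
        # Translate a missing column into AttributeError so hasattr()/getattr(o, k, d) behave
        try: return self[k]
        except KeyError: raise AttributeError(k) from None

r = AttrRecord(artist_id=1, name='AC/DC')
```

One caveat applies to `FRecord` as well: `__getattr__` only runs after normal lookup fails, so a column named like an existing method (`keys`, `values`, `items`, `get`) is shadowed by that method; use `r['keys']` for those.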

We can use `record_class` to auto-wrap with `FRecord`:

In [None]:
await conn.close()

conn = await asyncpg.connect(user=user, database='chinook', host='127.0.0.1', record_class=FRecord)

`table_names` and `view_names` query PostgreSQL's system catalogs to list tables and views in a schema. We use `pg_class` and `pg_namespace` rather than `information_schema` for better performance:

In [None]:
#| export
async def table_names(conn, schema='public'):
    "List of table names in `schema`"
    res = await conn.fetch("""
        SELECT c.relname FROM pg_class c
        JOIN pg_namespace n ON n.oid = c.relnamespace
        WHERE n.nspname = $1 AND c.relkind = 'r' AND NOT c.relname LIKE 'pg_%'""", schema)
    return [r['relname'] for r in res]

async def view_names(conn, schema='public'):
    "List of view names in `schema`"
    res = await conn.fetch("""
        SELECT c.relname FROM pg_class c
        JOIN pg_namespace n ON n.oid = c.relnamespace
        WHERE n.nspname = $1 AND c.relkind = 'v'""", schema)
    return [r['relname'] for r in res]

In [None]:
print(' '.join(await table_names(conn)))

artist album employee customer invoice invoice_line track playlist playlist_track genre media_type cat dog toy


In [None]:
await view_names(conn)

[]

`columns_info` returns a dict mapping column names to their PostgreSQL data types. It queries `pg_attribute` directly for efficiency:

In [None]:
#| export
async def columns_info(conn, table, schema='public'):
    "Dict mapping column names to data types for `table`"
    res = await conn.fetch("""
        SELECT a.attname, format_type(a.atttypid, a.atttypmod) as data_type
        FROM pg_attribute a
        JOIN pg_class c ON c.oid = a.attrelid
        JOIN pg_namespace n ON n.oid = c.relnamespace
        WHERE n.nspname = $1 AND c.relname = $2 AND a.attnum > 0 AND NOT a.attisdropped
        ORDER BY a.attnum""", schema, table)
    return {r['attname']: r['data_type'] for r in res}

In [None]:
list(await columns_info(conn, 'artist'))

['artist_id', 'name']

We'll need to know each table's primary key. PostgreSQL stores index information in `pg_index`, and the `indisprimary` flag identifies the primary key index. The `::regclass` cast is idiomatic PostgreSQL: it converts a table name string to its internal object ID, resolving unqualified names via the `search_path`. Because the query has no `ORDER BY`, the columns of a composite key are not guaranteed to come back in key order.

In [None]:
#| export
async def pk_cols(conn, table):
    "Get primary key column(s) for `table`"
    res = await conn.fetch("""
        SELECT a.attname FROM pg_index i
        JOIN pg_attribute a ON a.attrelid = i.indrelid AND a.attnum = ANY(i.indkey)
        WHERE i.indrelid = $1::regclass AND i.indisprimary""", table)
    return [r['attname'] for r in res]

In [None]:
await pk_cols(conn, 'artist')

['artist_id']

`Database` wraps an asyncpg connection (or pool) and provides table/view metadata caching. It delegates unknown attributes to the underlying connection via `__getattr__`, so you can call `db.fetch(...)` directly. The `t` property returns a `_TablesGetter` for convenient table access.

In [None]:
#| export
class Database:
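    def __init__(self, conn, refresh=True):
        self.conn = conn
        self._tnames,self._vnames = [],[]
        self._cols,self._pks = {},{}  # empty until `refresh` runs, so `Table.cols`/`Table.pks` work before then
        self._tables = {}
        # Keep a reference so the background task isn't garbage-collected mid-run (needs a running event loop)
        if refresh: self._refresh_task = asyncio.create_task(self.refresh())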

    @property
    def t(self):
        if not hasattr(self, '_t'): self._t = _TablesGetter(self)
        return self._t

    def __getattr__(self, k): return getattr(self.conn, k)

    def table(self, name):
        if name not in self._tables: self._tables[name] = Table(self, name)
        return self._tables[name]

    async def refresh(self):
        "Refresh all metadata"
        self._tnames,self._vnames = await table_names(self),await view_names(self)
        self._cols = {o: (await columns_info(self, o)) for o in self._tnames+self._vnames}
        self._pks = {o: (await pk_cols(self, o)) for o in self._tnames}
    
    def __str__(self):
        if isinstance(self.conn, asyncpg.pool.Pool):
            kw = self.conn._connect_kwargs
            u,h,d,p = kw.get('user','postgres'), kw.get('host','localhost'), kw.get('database','postgres'), kw.get('port',5432)
        else:
            pr,a = self.conn._params, self.conn._addr
            u,h,d,p = pr.user, a[0], pr.database, a[1]
        return f"postgresql://{u}@{h}:{p}/{d}"
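
`Database.__getattr__` is only consulted when normal attribute lookup fails, which is what lets `db.fetch(...)` reach the connection. One edge case worth guarding: if an instance ever lacks `self.conn` (for example, one created via `__new__` without `__init__`, as some copying and unpickling paths do), evaluating `self.conn` inside `__getattr__` re-enters `__getattr__` and recurses until `RecursionError`. A minimal sketch with a guard; `StubConn` and `Delegating` are hypothetical stand-ins, not fastasyncpg classes:

```python
import asyncio

class StubConn:
    "Hypothetical stand-in for an asyncpg connection"
    async def fetchval(self, sql, *args): return (sql, args)

class Delegating:
    "Minimal version of Database's delegation, with a guard for a missing `conn`"
    def __init__(self, conn): self.conn = conn
    def __getattr__(self, k):
        # Only reached when normal lookup fails. If `conn` itself is missing,
        # fail fast with AttributeError instead of recursing forever.
        if k == 'conn': raise AttributeError(k)
        return getattr(self.conn, k)

db = Delegating(StubConn())
res = asyncio.run(db.fetchval('SELECT $1::int', 1))  # forwarded to StubConn.fetchval
bare = Delegating.__new__(Delegating)                # skips __init__, so no `conn` attribute
```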


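`Table` represents a database table with metadata like columns and primary keys. The `xtra` method lets you set persistent row filters (useful for multi-tenancy). Tables stringify as double-quoted identifiers, so mixed-case names or names that clash with reserved words still work when interpolated into SQL (embedded double quotes are not escaped, so this assumes trusted table names).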

In [None]:
#| export
class Table:
    def __init__(self, db, name):
        store_attr()
        self.xtra_id = {}

    @property
    def cols(self): return self.db._cols.get(self.name, {})
    def __repr__(self): return f'Table "{self.name}"'
    def __str__(self): return f'"{self.name}"'

    def xtra(self, **kwargs):
        "Set extra constraints for queries/inserts"
        self.xtra_id = kwargs
        return self

    @property
    def pks(self): return self.db._pks.get(self.name, [])

`_Getter` is a base class for "magic accessor" objects that provide multiple ways to access items — by attribute (`dt.artist`), by index (`dt['artist']`), or by iteration (`for t in dt`). It implements `__dir__` so tab-completion works in notebooks, `__repr__` for nice display, `__contains__` for `in` checks, and both `__getattr__` and `__getitem__` for flexible access.

`_TablesGetter` specializes this for tables, reading from the database's `_tnames` list. The `db.t` property returns one of these, giving you a clean API: `db.t.artist` instead of `db.table('artist')`.

In [None]:
#| export
class _Getter:
    def __init__(self, db, attr): self.db,self.attr = db,attr

    def __repr__(self): return ", ".join(getattr(self.db, self.attr))
    def __dir__(self): return getattr(self.db, self.attr)
    def __iter__(self): return iter(self[dir(self)])
    def __contains__(self, s): return (s if isinstance(s,str) else s.name) in dir(self)
    def __getitem__(self, idxs):
        if isinstance(idxs,str): return self.db.table(idxs)
        return [self.db.table(o) for o in idxs]
    def __getattr__(self, k):
        if k.startswith('_'): raise AttributeError
        return self.db.table(k)

class _TablesGetter(_Getter):
    def __init__(self, db): super().__init__(db,'_tnames')

`connect` is our main entry point: it creates an asyncpg connection with `FRecord` as the default record class and returns a `Database` wrapper with metadata already loaded:

In [None]:
async def connect(*args, **kwargs):
    kwargs.setdefault('record_class', FRecord)
    conn = await asyncpg.connect(*args, **kwargs)
    res = Database(conn, refresh=False)
    await res.refresh()
    return res

In [None]:
await conn.close()

db = await connect(user=user, database='chinook', host='127.0.0.1'); str(db)

'postgresql://natedawg@127.0.0.1:5432/chinook'

In [None]:
dt = db.t
artist = dt.artist
artist

Table "artist"

In [None]:
artist.pks

['artist_id']

In [None]:
dt['album','artist']

[Table "album", Table "artist"]

In [None]:
for tbl in dt:
    if tbl.name[0]=='a': print(tbl)

"album"
"artist"
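
Note that the loop printed `album` before `artist`, even though `table_names` listed `artist` first. That's because `_Getter.__iter__` goes through the built-in `dir()`, which always sorts whatever `__dir__` returns. A trimmed-down, self-contained reproduction; `StubDB` and `Getter` are hypothetical stand-ins for `Database` and `_Getter`:

```python
class StubDB:
    "Hypothetical stand-in for Database: just table names and a table() factory"
    def __init__(self, names): self._tnames = names
    def table(self, name): return f"Table {name!r}"

class Getter:
    "Trimmed-down copy of _Getter, enough to show iteration order"
    def __init__(self, db, attr): self.db,self.attr = db,attr
    def __dir__(self): return getattr(self.db, self.attr)
    # dir(self) calls __dir__ and then sorts the result alphabetically
    def __iter__(self): return iter([self.db.table(o) for o in dir(self)])

db = StubDB(['artist', 'album', 'track'])
names = list(Getter(db, '_tnames'))
```

If you need catalog order instead, iterate over `db._tnames` directly.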


In [None]:
assert 'artist' in dt
assert artist in dt
assert 'foo' not in dt

In [None]:
artist.cols

{'artist_id': 'integer', 'name': 'character varying(120)'}

`_Col` represents a single column, with `__str__` returning fully-qualified SQL (`"table"."column"`). `_ColsGetter` follows the same pattern as `_TablesGetter` — it's a magic accessor that lets you write `artist.c.name` to get a column reference. The `__call__` method returns all columns as `_Col` objects, useful for building queries programmatically.

In [None]:
#| export
class _Col:
    def __init__(self, t, c): self.t,self.c = t,c
    def __str__(self):  return f'"{self.t}"."{self.c}"'
    def __repr__(self):  return self.c
    def __iter__(self): return iter(self.c)

class _ColsGetter:
    def __init__(self, tbl): self.tbl = tbl
    def __dir__(self): return list(self.tbl.cols)
    def __repr__(self): return ", ".join(dir(self))
    def __call__(self): return [_Col(self.tbl.name, o) for o in self.tbl.cols]
    def __contains__(self, s): return (s if isinstance(s,str) else s.c) in self.tbl.cols
    def __getattr__(self, k):
        if k[0]=='_': raise AttributeError
        return _Col(self.tbl.name, k)

@patch(as_prop=True)
def c(self:Table): return _ColsGetter(self)

In [None]:
ac = artist.c
ac

artist_id, name

Columns stringify in a format suitable for including in SQL statements.

In [None]:
print(f"select {ac.name} ...")

select "artist"."name" ...


Tables and views do the same.

In [None]:
print(f"select {ac.name} from {artist}")

select "artist"."name" from "artist"


In [None]:
assert 'name' in ac
assert ac.name in ac
assert 'foo' not in ac

## Queries and views

`db.q` is a convenience method that runs a SQL query and wraps results in a `Results` list for nice HTML rendering:

In [None]:
#| export
@patch
async def q(self:Database, sql, *args): return Results(await self.fetch(sql, *args))

In [None]:
await db.q(f"select * from {artist} limit 2")

artist_id,name
1,AC/DC
2,Accept


In [None]:
acdc = await db.q(f"select * from {artist} where {ac.name} like 'AC/%'")
acdc

artist_id,name
1,AC/DC


## Dataclasses

PostgreSQL has many data types that map to Python equivalents. We'll import the Python types we need:

In [None]:
#| export
from datetime import datetime, date, time, timedelta
from decimal import Decimal
from uuid import UUID
from ipaddress import IPv4Network, IPv6Network, IPv4Interface, IPv6Interface, IPv4Address, IPv6Address
from asyncpg.types import Range, BitString, Box, Circle, Line, LineSegment, Path, Point, Polygon

`get_typ` extracts the base PostgreSQL type (stripping size specifiers like `(120)`) and maps it to the corresponding Python type:

In [None]:
#| export
def get_typ(pg_type):
    "Get Python type for PostgreSQL type string"
    return pg_to_py[pg_type.split('(')[0].strip()]

The `pg_to_py` dict maps PostgreSQL type names to Python types. This covers the most common types — numeric, string, temporal, JSON, network, and geometric:

In [None]:
#| export
pg_to_py = {
    'smallint': int, 'integer': int, 'bigint': int,
    'real': float, 'float': float, 'double precision': float,
    'numeric': Decimal, 'decimal': Decimal,
    
    'char': str, 'character': str, 'name': str, 'varchar': str, 'character varying': str, 'text': str, 'xml': str,
    'bytea': bytes,
    'boolean': bool, 'bool': bool,
    
    'date': date,
    'time': time, 'time without time zone': time, 'time with time zone': time,
    'timestamp': datetime, 'timestamp without time zone': datetime, 'timestamp with time zone': datetime,
    'interval': timedelta,
    
    'uuid': UUID,
    'json': dict, 'jsonb': dict,
    'money': str,
    'macaddr': str,
    
    'cidr': IPv4Network, 'inet': IPv4Interface,
    
    'bit': BitString, 'varbit': BitString,
    'box': Box, 'circle': Circle, 'line': Line, 'lseg': LineSegment,
    'path': Path, 'point': Point, 'polygon': Polygon,
    
    'anyarray': list, 'ARRAY': list,
    'anyrange': Range,
    'record': tuple, 'tid': tuple,
}
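
`get_typ` raises `KeyError` for any type not in `pg_to_py`, including array columns, which `format_type` reports as e.g. `integer[]`. A more forgiving lookup might look like the sketch below; `get_typ_arr` and its `default` fallback are hypothetical, and the small mapping is just a subset of `pg_to_py` so the block runs on its own:

```python
from decimal import Decimal

def get_typ_arr(pg_type: str, mapping: dict, default=object):
    "Like get_typ, but maps `T[]` to list and falls back to `default` for unknown types"
    base = pg_type.strip()
    if base.endswith('[]'): return list        # e.g. 'integer[]', 'character varying(20)[]'
    base = base.split('(')[0].strip()          # 'character varying(120)' -> 'character varying'
    return mapping.get(base, default)

pg_subset = {'integer': int, 'character varying': str, 'numeric': Decimal, 'text': str}
```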

asyncpg doesn't automatically decode JSON columns — we need to register custom codecs. The `setup_json` function configures both `json` and `jsonb` types to use Python's `json` module:

In [None]:
#| export
import json

In [None]:
#| export
async def setup_json(conn):
    await conn.set_type_codec('json', encoder=json.dumps, decoder=json.loads, schema='pg_catalog')
    await conn.set_type_codec('jsonb', encoder=json.dumps, decoder=json.loads, schema='pg_catalog')
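
With `encoder=json.dumps`, writing a dict that contains a `Decimal` or `date` (both common in rows read back from PostgreSQL) raises `TypeError`. One option, sketched here as an assumption rather than part of fastasyncpg, is an encoder that falls back to `str()`:

```python
import json
from datetime import date
from decimal import Decimal
from functools import partial

# Hypothetical encoder: use str() for values the json module can't serialize natively
encode = partial(json.dumps, default=str)

payload = {"price": Decimal("0.99"), "released": date(2024, 1, 1), "tags": ["rock"]}
text = encode(payload)
back = json.loads(text)  # Decimal and date come back as strings
```

You would register it with `await conn.set_type_codec('jsonb', encoder=encode, decoder=json.loads, schema='pg_catalog')`. The trade-off is that the round trip is lossy: decoded values are plain strings, not the original Python types.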

We'll redefine `connect` so that it also registers the JSON codecs:

In [None]:
#| export
async def connect(*args, **kwargs):
    kwargs.setdefault('record_class', FRecord)
    conn = await asyncpg.connect(*args, **kwargs)
    await setup_json(conn)
    res = Database(conn, refresh=False)
    await res.refresh()
    return res

In [None]:
await db.close()

In [None]:
db = await connect(user=user, database='chinook', host='127.0.0.1')

We'll use Python's `dataclasses` module to auto-generate typed classes from table schemas:

In [None]:
#| export
from dataclasses import dataclass, field, make_dataclass, fields, Field, is_dataclass, MISSING

With the type mapping in place, we can auto-generate Python dataclasses from table schemas. The `_get_flds` helper extracts field definitions, and `dataclass()` creates a dataclass matching the table structure. We use `flexiclass` from fastcore to make the dataclass more flexible (allowing partial instantiation).

In [None]:
#| export
def _get_flds(tbl): 
    return [(k, get_typ(v)|None, field(default=UNSET))
            for k,v in tbl.cols.items()]

def _dataclass(self:Table, store=True, suf='')->type:
    "Create a `dataclass` with the types and defaults of this table"
    res = make_dataclass(self.name.title()+suf, _get_flds(self))
    flexiclass(res)
    if store: self.cls = res
    return res

Table.dataclass = _dataclass

In [None]:
dt = db.t
artist = dt.artist

Artist = artist.dataclass()
art1_obj = Artist(**acdc[0])
art1_obj

Artist(artist_id=1, name='AC/DC')

In [None]:
artist.cls

__main__.Artist

You can get the definition of the dataclass using fastcore's `dataclass_src`:

In [None]:
src = dataclass_src(Artist)
hl_md(src, 'python')

```python
@dataclass
class Artist:
    artist_id: int | None = UNSET
    name: str | None = UNSET

```

`all_dcs` generates dataclasses for every table (and optionally views) in the database. This is useful for type-checking and IDE autocompletion:

In [None]:
#| export
def all_dcs(db, with_views=False, store=True, suf=''):
    "dataclasses for all objects in `db`"
    return [o.dataclass(store=store, suf=suf) for o in list(db.t) + ([db.table(o) for o in db._vnames] if with_views else [])]

## get

The `xtra` method (defined earlier) lets you set persistent filters on a table. The `_add_xtra` helper injects these constraints into WHERE clauses. This is useful for multi-tenant apps or any situation where you want automatic row filtering — e.g., `album.xtra(artist_id=1)` ensures all subsequent queries only see albums by artist 1.

In [None]:
#| export
def _add_xtra(tbl, where, args, offset=0):
    if not tbl.xtra_id: return where, args
    args = list(args)
    xw = ' AND '.join(f'"{k}"=${len(args)+offset+i+1}' for i,k in enumerate(tbl.xtra_id))
    where = f'({where}) AND {xw}' if where else xw
    args.extend(tbl.xtra_id.values())
    return where, args

`__getitem__` provides dict-style access by primary key (only the first primary-key column is used, so composite keys aren't supported). It raises `NotFoundError` if the row doesn't exist (or doesn't match `xtra` constraints). If a dataclass has been generated for the table, results are automatically converted to that type.

In [None]:
#| export
class NotFoundError(Exception): pass

@patch
async def __getitem__(self:Table, pk):
    "Get row by primary key, raising NotFoundError if missing"
    if not self.pks: raise ValueError(f"No primary key for {self.name}")
    where, args = _add_xtra(self, f'"{self.pks[0]}" = $1', [pk])
    res = await self.db.fetch(f'SELECT * FROM {self} WHERE {where}', *args)
    if not res: raise NotFoundError(f"{self.name}[{pk}]")
    return self.cls(**res[0]) if hasattr(self, 'cls') else res[0]

In [None]:
await artist[1]

Artist(artist_id=1, name='AC/DC')

In [None]:
album = dt.album
Album = album.dataclass()

print("Album 1:", await album[1])
print("Artist ID of album 1:", (await album[1]).artist_id)

album.xtra(artist_id=1)
print("\nWith xtra(artist_id=1):")
print("Album 1:", await album[1])  # Should work - album 1 is by artist 1

try: await album[2]  # Album 2 is by a different artist
except NotFoundError as e: print('Error correctly raised:', type(e))

Album 1: Album(album_id=1, title='For Those About To Rock We Salute You', artist_id=1)
Artist ID of album 1: 1

With xtra(artist_id=1):
Album 1: Album(album_id=1, title='For Those About To Rock We Salute You', artist_id=1)
Error correctly raised: <class '__main__.NotFoundError'>


`get` is the "safe" version of `__getitem__` — it returns `None` instead of raising an exception when a row isn't found. This mirrors the pattern in fastlite and Python's `dict.get()`.

In [None]:
#| export
@patch
async def get(self:Table, pk):
    "Get row by primary key, or None if missing"
    try: return await self[pk]
    except NotFoundError: return None

In [None]:
await artist.get(1)

Artist(artist_id=1, name='AC/DC')

In [None]:
await artist.get(99999)  # Should return None

## call/select

`rows_where` is the core query method. It builds SQL from its parameters, applies `xtra` constraints, and optionally converts results to the table's dataclass. Unlike sqlite3 (`?`) or psycopg2 (`%s`), asyncpg passes PostgreSQL's native `$1, $2` positional parameters straight through.

In [None]:
#| export
@patch
async def rows_where(self:Table, where=None, where_args=None, order_by=None, select='*', limit=None, offset=None,
    as_cls=True, debug=False):
    "List of rows matching where clause"
    where, args = _add_xtra(self, where, where_args or [])
    sql = f'SELECT {select} FROM {self}'
    if where: sql += f' WHERE {where}'
    if order_by: sql += f' ORDER BY {order_by}'
    if limit: sql += f' LIMIT {limit}'
    if offset: sql += f' OFFSET {offset}'
    if debug: print(sql)
    res = await self.db.fetch(sql, *args)
    if as_cls and hasattr(self, 'cls'): res = [self.cls(**r) for r in res]
    return res
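
`rows_where` interpolates `limit` and `offset` directly into the SQL, and because it checks them for truthiness, `limit=0` is silently ignored. If you'd rather bind them, PostgreSQL accepts `$n` placeholders in `LIMIT` and `OFFSET`. A sketch of that alternative; `build_select` is a hypothetical helper, and it omits `xtra` handling, which would need to run before `LIMIT`/`OFFSET` are appended so the numbering stays consistent:

```python
def build_select(table, where=None, args=(), order_by=None, select='*', limit=None, offset=None):
    "Hypothetical variant of rows_where's SQL building that binds LIMIT/OFFSET as parameters"
    args = list(args)
    sql = f'SELECT {select} FROM "{table}"'
    if where: sql += f' WHERE {where}'
    if order_by: sql += f' ORDER BY {order_by}'
    if limit is not None:                  # `is not None` so limit=0 is honoured
        args.append(limit); sql += f' LIMIT ${len(args)}'
    if offset is not None:
        args.append(offset); sql += f' OFFSET ${len(args)}'
    return sql, args

sql, args = build_select('artist', where='name LIKE $1', args=['A%'], limit=3, offset=10)
# sql == 'SELECT * FROM "artist" WHERE name LIKE $1 LIMIT $2 OFFSET $3', args == ['A%', 3, 10]
```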

In [None]:
await artist.rows_where(limit=3)

[Artist(artist_id=1, name='AC/DC'),
 Artist(artist_id=2, name='Accept'),
 Artist(artist_id=3, name='Aerosmith')]

In [None]:
album.xtra(artist_id=1)
await album.rows_where(limit=5)

[Album(album_id=1, title='For Those About To Rock We Salute You', artist_id=1),
 Album(album_id=4, title='Let There Be Rock', artist_id=1)]

`count` is an async property that returns the number of rows in a table. It respects `xtra` constraints, so if you've set filters, only matching rows are counted:

In [None]:
#| export
@patch(as_prop=True)
async def count(self:Table):
    where, args = _add_xtra(self, None, [])
    sql = f'SELECT COUNT(*) FROM {self}'
    if where: sql += f' WHERE {where}'
    return await self.db.fetchval(sql, *args)

In [None]:
album.xtra(artist_id=1)
await album.count

2

In [None]:
album.xtra()
await album.count

347

In [None]:
#| export
from collections.abc import Mapping

`get_field` extracts a value from either a dict-like object (using `[k]`) or a dataclass/object (using `getattr`). This lets us handle both `Record` and dataclass results uniformly:

In [None]:
#| export
def get_field(r, k):
    return r[k] if isinstance(r, Mapping) else getattr(r, k)

`pks_and_rows_where` wraps `rows_where` but returns `(pk, row)` tuples — useful when you need to know which primary key each row has without inspecting the row itself.

In [None]:
#| export
@patch
async def pks_and_rows_where(self:Table, **kwargs):
    "Like rows_where but returns (pk, row) tuples"
    rows = await self.rows_where(**kwargs)
    pk = self.pks[0] if self.pks else None
    return [(get_field(r, pk) if pk else None, r) for r in rows]

In [None]:
await artist.pks_and_rows_where(limit=3)

[(1, Artist(artist_id=1, name='AC/DC')),
 (2, Artist(artist_id=2, name='Accept')),
 (3, Artist(artist_id=3, name='Aerosmith'))]

`__call__` makes tables callable, providing a convenient shorthand for queries. `await artist(limit=3)` is equivalent to `await artist.rows_where(limit=3)`. The `with_pk` parameter switches to returning `(pk, row)` tuples.

In [None]:
#| export
@patch
async def __call__(self:Table, where=None, where_args=None, order_by=None, limit=None, offset=None, select='*', with_pk=False,
    as_cls=True, debug=False):
    "Query table rows"
    f = self.pks_and_rows_where if with_pk else self.rows_where
    return await f(where=where, where_args=where_args, order_by=order_by, limit=limit, offset=offset, select=select,
        as_cls=as_cls, debug=debug)

In [None]:
await artist(limit=3)

[Artist(artist_id=1, name='AC/DC'),
 Artist(artist_id=2, name='Accept'),
 Artist(artist_id=3, name='Aerosmith')]

In [None]:
await artist(limit=3, with_pk=True)

[(1, Artist(artist_id=1, name='AC/DC')),
 (2, Artist(artist_id=2, name='Accept')),
 (3, Artist(artist_id=3, name='Aerosmith'))]

In [None]:
album.xtra(artist_id=1)
await album(limit=5)

[Album(album_id=1, title='For Those About To Rock We Salute You', artist_id=1),
 Album(album_id=4, title='Let There Be Rock', artist_id=1)]

`selectone` returns exactly one row matching the query, raising `NotFoundError` if none found or `ValueError` if multiple found. It passes `limit=2` internally so it can detect the "not unique" case without fetching the entire table.

In [None]:
#| export
@patch
async def selectone(
    self:Table,
    where:str|None=None,  # SQL where fragment to use, for example `id > $1`
    where_args:Iterable|None=None, # Parameters for `where`, in `$1`, `$2`, ... order
    select:str = "*", # Comma-separated list of columns to select
    as_cls:bool=True, # Convert returned record to stored dataclass?
    debug:bool=False
):
    "Shortcut for `__call__` that returns exactly one item"
    res = await self(where=where, where_args=where_args, select=select, as_cls=as_cls, limit=2, debug=debug)
    if len(res)==0: raise NotFoundError
    elif len(res) > 1: raise ValueError(f"Not unique: {len(res)} results")
    return res[0]

In [None]:
await artist.selectone('Name=$1', ('AC/DC',), debug=True)

SELECT * FROM "artist" WHERE Name=$1 LIMIT 2


Artist(artist_id=1, name='AC/DC')

In [None]:
try: await artist.selectone('Name like $1', ('%a%',))
except ValueError: pass
else: raise Exception("Failed to get non unique exception")

In [None]:
try: await artist.selectone('Name=$1', ('i do not exist',))
except NotFoundError: pass
else: raise Exception("Failed to get NotFoundError")

`db.item` is for scalar queries — it returns a single field from a single row. Useful for things like `SELECT count(*)` or `SELECT max(price)`.

In [None]:
#| export
@patch
async def item(self:Database, sql, args=None):
    "Execute sql and return a single field from a single row"
    res = await self.fetch(sql, *(args or []))
    if len(res)==0: raise NotFoundError
    elif len(res) > 1: raise ValueError(f"Not unique: {len(res)} results")
    row = res[0]
    if len(row) > 1: raise ValueError(f"Too many fields: {len(row)} fields")
    return row[0]

In [None]:
await db.item('select artist_id from artist where name=$1', ('AC/DC',))

1

## create_mod

`create_mod` generates a Python module file containing dataclass definitions for all tables in the database (plus views, with `with_views=True`). This lets you import the dataclasses directly, with their field types visible to editors and type checkers, rather than regenerating them each time. The generated file includes the imports it needs and gives every field an `UNSET` default, so any subset of fields can be passed when instantiating.
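
For a sense of what such a file looks like, here is a hypothetical `module_source` helper that writes a comparable module. It is a sketch, not the library's `dataclass_src`: it defines a local `UNSET` placeholder so the output runs without fastcore, and it handles only simple builtin field types:

```python
import types
from dataclasses import dataclass, fields

def module_source(classes):
    "Build source for a module with one `UNSET`-defaulted dataclass per class in `classes`"
    names = ', '.join(f'"{c.__name__}"' for c in classes)
    lines = [f'__all__ = [{names}]',
             'from dataclasses import dataclass',
             'UNSET = object()  # placeholder; the real module imports UNSET from fastcore']
    for c in classes:
        lines += ['', '@dataclass', f'class {c.__name__}:']
        # Simple builtin types only: their __name__ is valid source text
        lines += [f'    {f.name}: {getattr(f.type, "__name__", f.type)} = UNSET' for f in fields(c)]
    return '\n'.join(lines) + '\n'

@dataclass
class Artist:
    artist_id: int
    name: str

src = module_source([Artist])
print(src)

# Load the generated source as a module, much as `import db_dc` would
mod = types.ModuleType('db_dc_sketch')
exec(src, mod.__dict__)
```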

In [None]:
#| export
def create_mod(db, mod_fn, with_views=False, store=True, suf=''):
    "Create module for dataclasses for `db`"
    mod_fn = str(mod_fn)
    if not mod_fn.endswith('.py'): mod_fn+='.py'
    dcs = all_dcs(db, with_views, store=store, suf=suf)
    strlist = ', '.join([f'"{o.__name__}"' for o in dcs])
    with open(mod_fn, 'w') as f:
        print(f'__all__ = [{strlist}]', file=f)
        print('from dataclasses import dataclass', file=f)
        print('import datetime,decimal', file=f)
        print('from uuid import UUID', file=f)
        print('from fastcore.utils import UNSET', file=f)
        for o in dcs: print(dataclass_src(o), file=f)

In [None]:
create_mod(db, 'db_dc')

`link_dcs` reconnects a database's tables to dataclasses from a previously generated module. This is useful when you've imported dataclasses from a file created by `create_mod` and want the ORM to use them.
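
The linking step is just a name lookup. Below is a sketch with stand-in objects (`FakeTable` and the module contents are hypothetical). It assumes that generated class names are title-cased table names such as `Media_Type`, so that `.lower()` gives back the table name:

```python
import types
from dataclasses import dataclass

@dataclass
class Artist:
    artist_id: int
    name: str

@dataclass
class Media_Type:
    media_type_id: int
    name: str

# Stand-in for a module produced by `create_mod` (hypothetical contents)
mod = types.ModuleType('db_dc')
mod.Artist, mod.Media_Type = Artist, Media_Type
mod.__all__ = ['Artist', 'Media_Type']

class FakeTable:
    "Minimal stand-in for a table object; only the `cls` attribute matters here"
    cls = None

tables = {'artist': FakeTable(), 'media_type': FakeTable()}

# The loop `link_dcs` runs: exported class name -> lowercased table key -> set `.cls`
for o in mod.__all__: tables[o.lower()].cls = getattr(mod, o)
```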

In [None]:
#| export
@patch
def link_dcs(self:Database, mod):
    "Set the internal dataclass type links for tables using `mod` (created via `create_mod`)"
    for o in mod.__all__: self.t[o.lower()].cls = getattr(mod, o)

In [None]:
#| eval: false
from db_dc import *
await dt.track[1]

Track(track_id=1, name='For Those About To Rock (We Salute You)', album_id=1, media_type_id=1, genre_id=1, composer='Angus Young, Malcolm Young, Brian Johnson', milliseconds=343719, bytes=11170334, unit_price=Decimal('0.99'))

`set_classes` is a convenience method that links all table dataclasses from a namespace (typically `globals()`). It expects dataclass names to be title-cased versions of table names (e.g., `Artist` for table `artist`).
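
The naming convention is plain `str.title()`, which treats the underscore as a word boundary. The quick check below shows how it maps the table names used in this notebook and how `link_dcs`'s `.lower()` maps them back. It assumes table names are all lowercase, which is what PostgreSQL produces for unquoted identifiers:

```python
names = ['artist', 'invoice_line', 'media_type', 'playlist_track']
class_names = [n.title() for n in names]
print(class_names)  # ['Artist', 'Invoice_Line', 'Media_Type', 'Playlist_Track']

# `link_dcs` goes the other way with `.lower()`, which recovers each name...
assert all(c.lower() == n for c, n in zip(class_names, names))

# ...but a quoted mixed-case table name would not round-trip
print('MyTable'.title(), 'MyTable'.title().lower())  # Mytable mytable
```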

In [None]:
#| export
@patch
def set_classes(self:Database, glb):
    "Set all table dataclasses using types in namespace `glb`"
    for tbl in self.t: tbl.cls = glb[tbl.name.title()]

In [None]:
db.t

artist, album, employee, customer, invoice, invoice_line, track, playlist, playlist_track, genre, media_type, cat, dog, toy

`get_tables` injects table objects into a namespace with pluralized names — so `db.t.album` becomes available as `albums`. Combined with `set_classes`, this gives you a clean API: `await albums(limit=3)` returns a list of `Album` dataclass instances.
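
The pluralization is a plain suffix, not English grammar. Here is a one-line sketch of the rule (`plural_name` is a hypothetical helper, not part of the module):

```python
def plural_name(table_name):
    "Mimic the naming used by `get_tables`: lowercase, then append 's'"
    return table_name.lower() + 's'

print([plural_name(t) for t in ['album', 'employee', 'invoice_line']])
# ['albums', 'employees', 'invoice_lines']
print(plural_name('category'))  # categorys: no English pluralization rules are applied
```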

In [None]:
#| export
@patch
def get_tables(self:Database, glb):
    "Add all table objects to namespace `glb`, under pluralized names"
    for tbl in self.t: glb[tbl.name.lower()+'s'] = tbl

In [None]:
#| eval: false
db.set_classes(globals())
db.get_tables(globals())

await albums(limit=1)

[Album(album_id=1, title='For Those About To Rock We Salute You', artist_id=1)]

In [None]:
await employees(limit=1)

[Employee(employee_id=1, last_name='Adams', first_name='Andrew', title='General Manager', reports_to=None, birth_date=datetime.datetime(1962, 2, 18, 0, 0), hire_date=datetime.datetime(2002, 8, 14, 0, 0), address='11120 Jasper Ave NW', city='Edmonton', state='AB', country='Canada', postal_code='T5K 2N1', phone='+1 (780) 428-9482', fax='+1 (780) 428-3457', email='andrew@chinookcorp.com')]

## insert

To support both dataclasses and dicts as input, and to handle `Enum` values properly, we need these imports:

In [None]:
#| export
from dataclasses import asdict
from enum import Enum

`_process_row` converts a dataclass (or dict) to a plain dict, filtering out `UNSET` values and extracting `.value` from Enum fields. This lets you pass partially-filled dataclasses to `insert`/`update`.
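
To see the filtering in isolation, here is a self-contained sketch that copies `_process_row`'s logic; `Cat`, `Color`, and the local `UNSET` are hypothetical stand-ins. One subtlety matters for the sentinel: `asdict` deep-copies field values, so a sentinel created with `object()` would come back as a *different* object and slip past the `is not UNSET` test. `copy.deepcopy` returns an `Enum` member as itself, which is why the stand-in here is one:

```python
from dataclasses import dataclass, asdict, is_dataclass
from enum import Enum

class _Unset(Enum):
    UNSET = 'unset'
UNSET = _Unset.UNSET  # local stand-in sentinel; Enum members survive asdict's deepcopy

class Color(Enum):
    BLACK = 'black'
    GINGER = 'ginger'

@dataclass
class Cat:
    id: int = UNSET
    name: str = UNSET
    color: Color = UNSET

def process_row(row, kwargs):
    "Same logic as `_process_row`, but using the local `UNSET`"
    if row is None: d = {}
    elif not is_dataclass(row): d = dict(row) if hasattr(row, 'items') else {}
    else: d = {k:(v.value if isinstance(v, Enum) else v) for k,v in asdict(row).items() if v is not UNSET}
    return d|kwargs

print(process_row(Cat(name='meow', color=Color.GINGER), {'uid': 1}))
# {'name': 'meow', 'color': 'ginger', 'uid': 1}
```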

In [None]:
#| export
def _process_row(row, kwargs):
    if row is None: d = {}
    elif not is_dataclass(row): d = dict(row) if hasattr(row, 'items') else {}
    else: d = {k:(v.value if isinstance(v, Enum) else v) for k,v in asdict(row).items() if v is not UNSET}
    return d|kwargs

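`insert` adds a row to the table. It accepts a dataclass or dict as `record`, keyword arguments, or both: `kwargs` override `record` fields, and the `xtra` constraints are merged in last, overriding both. PostgreSQL's `RETURNING *` clause returns the inserted row in the same query, including any generated values such as `SERIAL` primary keys. Two helpers do the shared work: `_prep_row` builds the quoted column list and `$n` placeholders, and `_exec_returning` runs the statement and converts the returned row to the table's dataclass, if one is set.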
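
The statement that `insert` sends is easy to reproduce. Below is a standalone sketch of the SQL it builds (`insert_sql` is hypothetical, and the sketch assumes the table renders as a double-quoted name). It also shows the merge order: `kwargs` override the record, and `xtra` values override both:

```python
def insert_sql(table, row):
    "Build a parameterized INSERT ... RETURNING * for dict `row`; returns (sql, args)"
    cols = ', '.join(f'"{k}"' for k in row)
    vals = ', '.join(f'${i+1}' for i in range(len(row)))  # asyncpg uses $1, $2, ...
    return f'INSERT INTO "{table}" ({cols}) VALUES ({vals}) RETURNING *', list(row.values())

record = {'name': 'meow', 'weight': 6}
kwargs = {'weight': 7}
xtra = {'uid': 1}
sql, args = insert_sql('cat', record | kwargs | xtra)
print(sql)   # INSERT INTO "cat" ("name", "weight", "uid") VALUES ($1, $2, $3) RETURNING *
print(args)  # ['meow', 7, 1]
```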

In [None]:
#| export
def _prep_row(record, kwargs):
    row = _process_row(record, kwargs)
    if not row: return None, None, None  # callers unpack three values
    cols = ', '.join(f'"{k}"' for k in row)
    vals = ', '.join(f'${i+1}' for i in range(len(row)))
    return row, cols, vals

@patch
async def _exec_returning(self:Table, sql, *args):
    res = await self.db.fetch(sql, *args)
    return self.cls(**res[0]) if hasattr(self, 'cls') else res[0]

@patch
async def insert(self:Table, record=None, **kwargs):
    "Insert a row and return it"
    row, cols, vals = _prep_row(record, {**kwargs, **self.xtra_id})
    if not row: return None
    sql = f'INSERT INTO {self} ({cols}) VALUES ({vals}) RETURNING *'
    return await self._exec_returning(sql, *row.values())

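For DDL statements like `CREATE TABLE`, use `execute` rather than `fetch`/`q`. DDL statements don't return rows; instead, `execute` returns a status string such as `'CREATE TABLE'`. When no query arguments are given, asyncpg's `execute` can also run several semicolon-separated statements in one call and returns the status of the last one, which is why the cell below can drop and recreate the table at once. PostgreSQL uses `SERIAL` for auto-incrementing integer keys (where SQLite uses `INTEGER PRIMARY KEY`). `REAL` is PostgreSQL's 4-byte floating-point type; a bare `FLOAT` means the 8-byte `double precision`.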

In [None]:
await db.execute('''
DROP TABLE IF EXISTS cat;
CREATE TABLE cat (
    id SERIAL PRIMARY KEY,
    name TEXT,
    weight REAL,
    uid INTEGER
)''')

'CREATE TABLE'

`_retr_tbl` is a helper that refreshes the database metadata and returns the table object for a given name. This ensures you're working with up-to-date schema information after creating or modifying tables.

In [None]:
#| export
@patch
async def _retr_tbl(self:Database, name):
    await self.refresh()
    return self.t[name]

`table2glb` is a convenience method that refreshes metadata and creates the dataclass. It then injects both the table object (under the table's own name, e.g. `cat`) and the dataclass (e.g. `Cat`) into a namespace, defaulting to the caller's globals. This is handy after creating a new table.
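
`table2glb` finds the caller's namespace through the frame stack. Below is a synchronous sketch of the same trick (`inject` is a hypothetical helper). Note that the `inspect` docs describe `currentframe()` as a CPython implementation detail that may return `None` on other interpreters, which is one reason to pass `glb` explicitly in library code:

```python
import inspect

def inject(name, value, glb=None):
    "Bind `name` to `value` in `glb`, defaulting to the caller's module globals"
    if glb is None: glb = inspect.currentframe().f_back.f_globals  # the calling frame's globals
    glb[name] = value

inject('answer', 42)     # defines a global named `answer` in this module
ns = {}
inject('answer', 0, ns)  # an explicit namespace is used as-is
```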

In [None]:
#| export
import inspect

In [None]:
#| export
@patch
async def table2glb(self:Database, name, glb=None):
    "Get table by name, refreshing metadata and creating dataclass, adding to glb"
    if glb is None: glb = inspect.currentframe().f_back.f_globals
    tbl = await self._retr_tbl(name)
    cls = tbl.dataclass()
    glb[name],glb[cls.__name__] = tbl,cls

In [None]:
await db.table2glb('cat')

In [None]:
cat

Table "cat"

In [None]:
c = await cat.insert(name='meow', weight=6, uid=2)
c

Cat(id=1, name='meow', weight=6.0, uid=2)

In [None]:
await cat()

[Cat(id=1, name='meow', weight=6.0, uid=2)]

With `xtra` set, `insert` automatically includes those constraints. Here we set `uid=1`, so the inserted cat gets that value even though we didn't pass it explicitly.

In [None]:
cat.xtra(uid=1)
c2 = await cat.insert(name='purr', weight=4)
c2

Cat(id=2, name='purr', weight=4.0, uid=1)

In [None]:
await cat()

[Cat(id=2, name='purr', weight=4.0, uid=1)]

Calling `xtra()` with no arguments clears the filter by setting `xtra_id = {}`. Now queries return all rows again.

In [None]:
cat.xtra()

Table "cat"

In [None]:
await cat()  # Should now return all cats, not just uid=1

[Cat(id=1, name='meow', weight=6.0, uid=2),
 Cat(id=2, name='purr', weight=4.0, uid=1)]

## update

In [None]:
c.name = "moo"
c.uid = 1
c

Cat(id=1, name='moo', weight=6.0, uid=1)

`_pk_where` builds a WHERE clause for primary key matching, using PostgreSQL's `$1, $2` placeholders with an offset to account for preceding parameters in the query.

In [None]:
#| export
def _pk_where(pks, offset=0):
    return ' AND '.join(f'"{pk}"=${i+offset+1}' for i,pk in enumerate(pks))

`update` modifies an existing row by primary key, building an `UPDATE ... SET ... WHERE pk = $n RETURNING *` statement. By default, the primary-key values are taken from the record itself. Whereas `insert` writes the `xtra` values into the new row, `update` uses them as an extra filter. If you try to update a row that doesn't match the `xtra` filter, you'll get `NotFoundError`.
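
Placeholder numbering is the subtle part here. The `SET` list uses `$1..$n`, so the primary-key condition must continue from `$n+1`; the `offset` argument to `_pk_where` handles this. Below is a standalone sketch (`update_sql` is hypothetical, and it leaves out the extra `xtra` conditions that the real method adds):

```python
def pk_where(pks, offset=0):
    "Copy of `_pk_where`: number placeholders after `offset` earlier ones"
    return ' AND '.join(f'"{pk}"=${i+offset+1}' for i,pk in enumerate(pks))

def update_sql(table, row, pks, pk_values):
    "Build UPDATE ... RETURNING * for dict `row` (no `xtra` handling); returns (sql, args)"
    sets = ', '.join(f'"{k}"=${i+1}' for i,k in enumerate(row))  # $1..$n
    where = pk_where(pks, offset=len(row))                        # continues at $n+1
    return f'UPDATE "{table}" SET {sets} WHERE {where} RETURNING *', [*row.values(), *pk_values]

sql, args = update_sql('cat', {'id': 1, 'name': 'moo', 'uid': 1}, ['id'], [1])
print(sql)   # UPDATE "cat" SET "id"=$1, "name"=$2, "uid"=$3 WHERE "id"=$4 RETURNING *
print(args)  # [1, 'moo', 1, 1]
```

Because the record includes `id`, the `SET` list also assigns the key its own value. This is harmless, though a variant could skip primary-key columns in `SET`.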

In [None]:
#| export
@patch
async def update(self:Table, record=None, pk_values=None, **kwargs):
    "Update a row and return it"
    row = _process_row(record, kwargs)
    if not row: return None
    pk_values = [row[o] for o in self.pks] if pk_values is None else listify(pk_values)
    sets = ', '.join(f'"{k}"=${i+1}' for i,k in enumerate(row))
    xwhere = _pk_where(self.pks, len(row))
    pk_where, args = _add_xtra(self, xwhere, pk_values, len(row))
    sql = f'UPDATE {self} SET {sets} WHERE {pk_where} RETURNING *'
    res = await self.db.fetch(sql, *row.values(), *args)
    if not res: raise NotFoundError(f"{self.name}[{pk_values}]")
    return self.cls(**res[0]) if hasattr(self, 'cls') else res[0]

In [None]:
await cat.update(c)

Cat(id=1, name='moo', weight=6.0, uid=1)

In [None]:
await cat()

[Cat(id=2, name='purr', weight=4.0, uid=1),
 Cat(id=1, name='moo', weight=6.0, uid=1)]

In [None]:
cat.xtra(uid=2)
c.uid = 2
try: await cat.update(c)  # Should fail - c has id=1 which has uid=1, not uid=2
except NotFoundError as e: print('Correctly blocked:', e)

Correctly blocked: cat[[1]]


In [None]:
cat.xtra()

Table "cat"

## delete

`delete` removes a row by primary key, returning the deleted row (using `RETURNING *`). Like the other methods, it respects `xtra` constraints — attempting to delete a row that doesn't match the filter raises `NotFoundError`.
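
The `_add_xtra` helper (not shown in this section) folds the `xtra` filter into these statements. As an illustration only, here is a hypothetical `and_xtra` that does the same kind of job. It ANDs each `xtra` column onto the `WHERE` clause and numbers the new placeholders after the existing ones. Its signature and output are assumptions, not `_add_xtra`'s:

```python
def and_xtra(where, args, xtra, offset=0):
    "AND `xtra` equality conditions onto `where`; returns (where, args)"
    args = list(args)
    clauses = [f'({where})'] if where else []  # parentheses keep any OR in `where` contained
    for col, val in xtra.items():
        args.append(val)
        clauses.append(f'"{col}"=${offset+len(args)}')  # continue after existing placeholders
    return ' AND '.join(clauses), args

print(and_xtra('"id"=$1', [2], {'uid': 99}))
# ('("id"=$1) AND "uid"=$2', [2, 99])
```

The parentheses matter because `AND` binds more tightly than `OR`. Without them, a filter like `a=$1 OR b=$2` combined with `uid=$3` would parse as `a=$1 OR (b=$2 AND uid=$3)`.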

In [None]:
#| export
@patch
async def delete(self:Table, pk_values):
    "Delete row by primary key, returning the deleted row"
    pk_where, args = _add_xtra(self, _pk_where(self.pks), listify(pk_values))
    sql = f'DELETE FROM {self} WHERE {pk_where} RETURNING *'
    res = await self.db.fetch(sql, *args)
    if not res: raise NotFoundError(f"{self.name}[{pk_values}]")
    return self.cls(**res[0]) if hasattr(self, 'cls') else res[0]

Let's verify delete works — first check what cats we have:

In [None]:
await cat()

[Cat(id=2, name='purr', weight=4.0, uid=1),
 Cat(id=1, name='moo', weight=6.0, uid=1)]

Delete returns the deleted row, so you can see exactly what was removed:

In [None]:
await cat.delete(c.id)

Cat(id=1, name='moo', weight=6.0, uid=1)

In [None]:
await cat()

[Cat(id=2, name='purr', weight=4.0, uid=1)]

The `xtra` filter also applies to deletes. If you try to delete a row that doesn't match the constraint, you get `NotFoundError`:

In [None]:
cat.xtra(uid=99)
try: await cat.delete(2)  # Should fail - cat 2 has uid=1, not uid=99
except NotFoundError as e: print('Correctly blocked:', e)

Correctly blocked: cat[2]


In [None]:
cat.xtra()

Table "cat"

`delete_where` is the bulk version of `delete` — it removes all rows matching a WHERE clause (or all rows if none given), returning the deleted rows as a list. Like `delete`, it respects `xtra` constraints and uses `RETURNING *` to give back what was removed. This is useful for cleanup operations like removing all rows above a threshold.

In [None]:
#| export
@patch
async def delete_where(self:Table, where=None, where_args=None):
    "Delete rows matching `where` and any `xtra` filter, returning the deleted rows"
    where, args = _add_xtra(self, where, where_args or [])
    sql = f'DELETE FROM {self}' + (f' WHERE {where}' if where else '') + ' RETURNING *'
    res = await self.db.fetch(sql, *args)
    if hasattr(self, 'cls'): res = [self.cls(**r) for r in res]
    return res

In [None]:
await cat.insert(name='Cat McCat Face', weight=9)
await cat.insert(name='Skitter', weight=2)

Cat(id=4, name='Skitter', weight=2.0, uid=None)

In [None]:
await cat.delete_where('weight > $1', [3])

[Cat(id=2, name='purr', weight=4.0, uid=1),
 Cat(id=3, name='Cat McCat Face', weight=9.0, uid=None)]

In [None]:
await cat()

[Cat(id=4, name='Skitter', weight=2.0, uid=None)]

## Create

To create tables from Python classes, we need a reverse mapping from Python types to PostgreSQL types. We generate it from `pg_to_py` and override some entries for cleaner defaults (e.g., `TEXT` instead of `character varying`).
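
The overrides are needed because inverting a many-to-one mapping silently keeps only the last key for each value, so the default for `int` or `str` would depend on dictionary order. Here is a toy illustration (`pg_to_py_toy` is made up for the example and is not the module's table):

```python
# Toy forward mapping (the real `pg_to_py` is defined earlier and is larger)
pg_to_py_toy = {'integer': int, 'bigint': int, 'text': str, 'character varying': str, 'real': float}

# When several keys share a value, inverting keeps the *last* one seen
inverted = {v: k for k, v in pg_to_py_toy.items()}
print(inverted)
# {<class 'int'>: 'bigint', <class 'str'>: 'character varying', <class 'float'>: 'real'}

# Explicit overrides then choose the preferred default for each Python type
py_to_pg_toy = inverted | {int: 'INTEGER', str: 'TEXT'}
```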

In [None]:
#| export
py_to_pg = {v: k for k, v in pg_to_py.items() if v not in (list, tuple, Range)}
# Override some for cleaner defaults
py_to_pg.update({int: 'INTEGER', str: 'TEXT', float: 'REAL', bool: 'BOOLEAN'})

`col_def` builds a column definition for `CREATE TABLE`. If the column is the primary key and has type `int`, it becomes `SERIAL PRIMARY KEY` (PostgreSQL's auto-incrementing integer). Otherwise it maps the Python type to PostgreSQL and adds `NOT NULL` if specified.

In [None]:
#| export
def col_def(name, typ, pk, not_null):
    "Build column definition SQL for CREATE TABLE"
    pg_typ = py_to_pg.get(typ, 'TEXT')
    if name == pk and typ == int: pg_typ = 'SERIAL'
    parts = [f'"{name}"', pg_typ]
    if name == pk: parts.append('PRIMARY KEY')
    elif not_null and name in not_null: parts.append('NOT NULL')
    return ' '.join(parts)

In [None]:
col_def('id', int, 'id', None)  # 'id' is pk and int -> SERIAL PRIMARY KEY

'"id" SERIAL PRIMARY KEY'

In [None]:
col_def('name', str, 'id', {'name'})  # not pk, in not_null -> TEXT NOT NULL

'"name" TEXT NOT NULL'

In [None]:
col_def('age', int, 'id', None)  # not pk -> INTEGER

'"age" INTEGER'

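`db.create` creates a table from a Python class (or dataclass). It first converts the class in place to a dataclass with fastcore's `flexiclass`. Unless `name` is given, it derives the table name from the class name with `camel2snake` (so `InvoiceLine` becomes `invoice_line`). It then builds a column definition for each field with `col_def`, appends any foreign-key constraints, and executes the `CREATE TABLE` statement. The `replace=True` option drops any existing table first (with `CASCADE` to handle dependencies).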
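
Stripped of defaults and foreign keys, the core of `create` is small. Below is a self-contained sketch of the same steps with hypothetical names. `snake` is a simplified stand-in for fastcore's `camel2snake`, `dataclass(cls)` stands in for `flexiclass`, and the type map is cut down to four entries:

```python
import re
from dataclasses import dataclass, fields, is_dataclass

# Cut-down Python -> PostgreSQL type map (the module's `py_to_pg` is larger)
PY_TO_PG = {int: 'INTEGER', str: 'TEXT', float: 'REAL', bool: 'BOOLEAN'}

def snake(name):
    "CamelCase -> snake_case; a simplified stand-in for fastcore's `camel2snake`"
    return re.sub(r'(?<!^)(?=[A-Z])', '_', name).lower()

def create_sql(cls, pk='id', not_null=()):
    "Build CREATE TABLE SQL from an annotated class, mirroring `col_def`'s rules"
    if not is_dataclass(cls): cls = dataclass(cls)  # plain annotated class -> dataclass
    cols = []
    for f in fields(cls):
        typ = 'SERIAL' if f.name == pk and f.type is int else PY_TO_PG.get(f.type, 'TEXT')
        col = f'"{f.name}" {typ}'
        if f.name == pk: col += ' PRIMARY KEY'
        elif f.name in not_null: col += ' NOT NULL'
        cols.append(col)
    return f'CREATE TABLE "{snake(cls.__name__)}" ( {", ".join(cols)} )'

class Dog: id:int; name:str; age:int
class InvoiceLine: id:int; unit_price:float

print(create_sql(Dog))
# CREATE TABLE "dog" ( "id" SERIAL PRIMARY KEY, "name" TEXT, "age" INTEGER )
print(create_sql(InvoiceLine, not_null={'unit_price'}))
# CREATE TABLE "invoice_line" ( "id" SERIAL PRIMARY KEY, "unit_price" REAL NOT NULL )
```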

In [None]:
#| export
@patch
async def create(self:Database, cls=None, name=None, pk='id', foreign_keys=None, defaults=None, 
                 column_order=None, not_null=None, if_not_exists=False, replace=False):
    "Create table from `cls`"
    flexiclass(cls)
    if name is None: name = camel2snake(cls.__name__)
    typs = {o.name: o.type for o in fields(cls)}
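    if column_order:  # put listed columns first, keeping the rest in their original order
        typs = {k: typs[k] for k in column_order if k in typs} | {k: v for k, v in typs.items() if k not in column_order}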
    
    cols = [col_def(k, v, pk, not_null) for k, v in typs.items()]
    if defaults:
        for i, (k, v) in enumerate(typs.items()):
            if k in defaults: cols[i] += f' DEFAULT {defaults[k]!r}'
    if foreign_keys:
        for col, (ref_tbl, ref_col) in foreign_keys.items():
            cols.append(f'FOREIGN KEY ("{col}") REFERENCES "{ref_tbl}" ("{ref_col}")')
    
    col_sql = ', '.join(cols)
    exists = 'IF NOT EXISTS ' if if_not_exists else ''
    if replace: await self.execute(f'DROP TABLE IF EXISTS "{name}" CASCADE')
    await self.execute(f'CREATE TABLE {exists}"{name}" ( {col_sql} )')
    tbl = await self._retr_tbl(name)
    tbl.cls = cls
    return tbl

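`drop` removes a table from the database and then refreshes the metadata. It uses `IF EXISTS`, so dropping a missing table is not an error. The `cascade=True` option also drops dependent objects, such as views on the table or foreign-key constraints in other tables that reference it.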

In [None]:
#| export
@patch
async def drop(self:Table, cascade=False):
    "Drop this table"
    casc = ' CASCADE' if cascade else ''
    await self.db.execute(f'DROP TABLE IF EXISTS {self}{casc}')
    await self.db.refresh()

In [None]:
await db.t.dog.drop(cascade=True)

Now let's test `create` with a simple `Dog` class:

In [None]:
class Dog: id:int; name:str; age:int

dogs = await db.create(Dog, replace=True)
dogs.cols

{'id': 'integer', 'name': 'text', 'age': 'integer'}

Because `id` is a `SERIAL` primary key, PostgreSQL assigns it automatically, so we don't pass it to `insert`:

In [None]:
d = await dogs.insert(name='Rex', age=5)
d

Dog(id=1, name='Rex', age=5)

Foreign keys are specified as a dict mapping column names to `(table, column)` tuples:

In [None]:
class Toy: id:int; name:str; dog_id:int

toys = await db.create(Toy, replace=True, foreign_keys={'dog_id': ('dog', 'id')})
toys.cols

{'id': 'integer', 'name': 'text', 'dog_id': 'integer'}

PostgreSQL enforces the foreign key constraint: inserting a toy whose `dog_id` has no matching row in `dog` raises asyncpg's `ForeignKeyViolationError`. A valid reference inserts normally:

In [None]:
t = await toys.insert(name='Ball', dog_id=d.id)
t

Toy(id=1, name='Ball', dog_id=1)

## Upsert

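`upsert` performs an "insert or update" operation using PostgreSQL's `INSERT ... ON CONFLICT ... DO UPDATE` clause. If a row with the same primary key exists, it is updated; otherwise a new row is inserted. Conflicts are detected on the first primary-key column only (`self.pks[0]`). Like `insert`, it uses `_prep_row` to process the row and `_exec_returning` to handle the result, and it merges `xtra` values into the row. However, `xtra` is not applied as a filter here, so an existing row outside the `xtra` filter can still be overwritten.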
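
Here is a standalone sketch of the statement `upsert` builds (`upsert_sql` is hypothetical and assumes a single key column and a double-quoted table name):

```python
def upsert_sql(table, row, pk):
    "Build INSERT ... ON CONFLICT (pk) DO UPDATE ... RETURNING * for dict `row`"
    cols = ', '.join(f'"{k}"' for k in row)
    vals = ', '.join(f'${i+1}' for i in range(len(row)))
    # EXCLUDED is the row proposed for insertion; copy its non-key columns onto the existing row
    updates = ', '.join(f'"{k}"=EXCLUDED."{k}"' for k in row if k != pk)
    sql = (f'INSERT INTO "{table}" ({cols}) VALUES ({vals}) '
           f'ON CONFLICT ("{pk}") DO UPDATE SET {updates} RETURNING *')
    return sql, list(row.values())

sql, args = upsert_sql('dog', {'id': 1, 'name': 'Rex', 'age': 6}, 'id')
print(sql)
```

There is one edge case. If `row` holds only the key column, `updates` is empty and the statement is invalid SQL. `DO NOTHING` would avoid the syntax error, but then `RETURNING *` produces no row when a conflict occurs, so the caller would need to handle an empty result.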

In [None]:
#| export
@patch
async def upsert(self:Table, record=None, **kwargs):
    "Insert or update a row and return it"
    row, cols, vals = _prep_row(record, {**kwargs, **self.xtra_id})
    if not row: return None
    pk = self.pks[0]
    updates = ', '.join(f'"{k}"=EXCLUDED."{k}"' for k in row if k != pk)
    sql = f'INSERT INTO {self} ({cols}) VALUES ({vals}) ON CONFLICT ("{pk}") DO UPDATE SET {updates} RETURNING *'
    return await self._exec_returning(sql, *row.values())

Let's test upsert — first check current state:

In [None]:
await dogs()

[Dog(id=1, name='Rex', age=5)]

Updating an existing row — change Rex's age from 5 to 6:

In [None]:
d.age = 6
await dogs.upsert(d)

Dog(id=1, name='Rex', age=6)

In [None]:
await dogs()

[Dog(id=1, name='Rex', age=6)]

Inserting a new row: when no `id` is given, there is nothing to conflict with, so `upsert` simply inserts a new record:

In [None]:
await dogs.upsert(name='Spot', age=3)

Dog(id=2, name='Spot', age=3)

In [None]:
await dogs()

[Dog(id=1, name='Rex', age=6), Dog(id=2, name='Spot', age=3)]

With `xtra` set, upsert merges those constraints into the row:

In [None]:
dogs.xtra(age=6)
d.name = 'Rexy'
await dogs.upsert(d)  # Should set age=6 from xtra

Dog(id=1, name='Rexy', age=6)

## Connection pool

For production use, you'll want a connection pool instead of a single connection. `asyncpg.Pool` has the same query methods (`fetch`, `execute`, etc.) as a connection, so our `Database` wrapper works with both. The key difference is that JSON codecs must be registered via the `init` callback (which runs on each new connection in the pool).

In [None]:
#| export
async def create_pool(*args, **kwargs):
    "Create an asyncpg connection pool and wrap it in a `Database`"
    kwargs.setdefault('record_class', FRecord)
    kwargs.setdefault('init', setup_json)
    pool = await asyncpg.create_pool(*args, **kwargs)
    res = Database(pool, refresh=False)
    await res.refresh()
    return res

In [None]:
await db.close()

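Let's test that the pool works the same as a single connection. Rows come back as plain `FRecord`s here because this new `Database` object has no dataclasses linked to its tables yet: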

In [None]:
db = await create_pool(user=user, database='chinook', host='127.0.0.1')
await db.t.artist(limit=3)

[<FRecord artist_id=1 name='AC/DC'>,
 <FRecord artist_id=2 name='Accept'>,
 <FRecord artist_id=3 name='Aerosmith'>]

In [None]:
str(db)

'postgresql://natedawg@127.0.0.1:5432/chinook'