# SQLAlchemy asynchroní loadery

## Notebook support

In [1]:
!pip install asyncpg



SQLAlchemy je knihovnou / frameworkem, který umožňuje odstínit konkrétní typ databázového serveru. Díky této knihovně IT specialista modeluje datové entity bez ohledu na konkrétní úložiště. Podobných knihoven existuje celá řada, ale SQLAlchemy je pravděpodobně nejpoužívanější.

Z hlediska modelování datových struktur existují dva základní přístupy:
- Database First
- Code First

Database First je způsob, kdy vznikají popisy přímo v databázi. Alternativně lze existující databázi vzít jako základ a dále ji rozšiřovat. Toto souvisí s tzv. migracemi, které mají specifický význam při upgrade informačního systému.

Code First předpokládá, že popis datových struktur je definován kódem a z tohoto kódu je následně odvozena posloupnost příkazů, které musí být nad databází provedeny, aby vznikly tabulky s jejich strukturou a vzájemným propojením (Foreign Keys).

SQLAlchemy podporuje oba přístupy, lze tedy z existující databáze odvodit modely nebo na základě modelů vytvořit strukturu databáze.

https://github.com/LeeBergstrand/Jupyter-SQLAlchemy-Tutorial/blob/master/Jupyter-SQLAlchemy.ipynb

In [2]:
#https://docs.sqlalchemy.org/en/13/orm/tutorial.html
#https://docs.sqlalchemy.org/en/14/orm/basic_relationships.html
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, BigInteger, Sequence, Table, ForeignKey, DateTime
from sqlalchemy.orm import relationship

### Engine

Engine "Stroj" je prvek, přes který jsou posílány SQL příkazy na server. V případě, kdy dochází k prvotní inicializaci (instalace), je nutné detekovat a případně vytvořit databázi a její strukturu.

In [3]:
!pip install sqlalchemy_utils



Testování a prvotní vytvoření databáze pomocí `connectionstring`u, který představuje úplnou definici propojení se serverem. Connecion string obsahuje definici driveru, jména uživatele, heslo uživatele, jméno serveru (počítače, tzv. hostname) a jméno databáze.

In [4]:
from sqlalchemy_utils.functions import database_exists, create_database

connectionstring = 'postgresql+psycopg2://postgres:example@postgres/newdatabase'
if not database_exists(connectionstring):  #=> False
    try:
        create_database(connectionstring)
        doCreateAll = True
        print('Database created')
    except Exception as e:
        print('Database does not exists and cannot be created')
        raise
else:
    print('Database already exists')

Database already exists


In [5]:
from sqlalchemy import create_engine

#engine = create_engine('sqlite:///:memory:', echo=True)
#engine = create_engine('postgresql+psycopg2://user:password@hostname/database_name')

In [6]:
engine = create_engine(connectionstring) 

### Models

Modely prezentují struktury uložené v tabulkách. Představují tak proces transformace z výsledku dotazu do struktur jazyka Python a ze struktur jazyka do prvků SQL dotazů.

V SQLAlchemy je zebezpečeno provázání modelů (mimo jiné relace) pomocí dědičnosti, kdy existuje třída, ze které jsou odvozeny všechny modely. Jsou využity specifické funkce jazyka Python k tomu, aby při deklaraci modelů vznikl registr těchto modelů. Tento přístup umožňuje řešit specifické problémy. 

In [7]:
from sqlalchemy.ext.declarative import declarative_base

BaseModel = declarative_base()

  BaseModel = declarative_base()


`BaseModel` je třídou, která musí být použita při deklaraci modelů. Všimněte si, že tato třída je návratovou hodnotou funkce. Tuto třídu lze vytvořit různými způsoby, zde si ukazujeme nejčastěji používaný.

V následující části jsou deklarovány tři modely `UserModel`, `GroupModel` a `GroupTypeModel`. Protože mezi `UserModel` a `GroupModel` je relace M:N, je nutné mít zprostředkující tabulku a tedy i model. Tímto modelem je `UserGroupModel`, který není definovaný jako třída, ale je vytvořen pomocí funkce `Table`.

In [8]:
import datetime
from sqlalchemy import Column, String, BigInteger, Integer, DateTime, ForeignKey, Sequence, Table
from sqlalchemy.orm import relationship

unitedSequence = Sequence('all_id_seq')

UserGroupModel = Table('users_groups', BaseModel.metadata,
        Column('id', BigInteger, Sequence('all_id_seq'), primary_key=True),
        Column('user_id', ForeignKey('users.id'), primary_key=True),
        Column('group_id', ForeignKey('groups.id'), primary_key=True)
)

class UserModel(BaseModel):
    __tablename__ = 'users'
    
    id = Column(BigInteger, Sequence('all_id_seq'), primary_key=True)
    name = Column(String)
    surname = Column(String)
    email = Column(String)
    
    lastchange = Column(DateTime, default=datetime.datetime.now)
    externalId = Column(BigInteger, index=True)

    groups = relationship('GroupModel', secondary=UserGroupModel, back_populates='users')
        
class GroupModel(BaseModel):
    __tablename__ = 'groups'
    
    id = Column(BigInteger, Sequence('all_id_seq'), primary_key=True)
    name = Column(String)
    
    lastchange = Column(DateTime, default=datetime.datetime.now)
    entryYearId = Column(Integer)

    externalId = Column(String, index=True)

    grouptype_id = Column(ForeignKey('grouptypes.id'))
    grouptype = relationship('GroupTypeModel', back_populates='groups')

    users = relationship('UserModel', secondary=UserGroupModel, back_populates='groups')

class GroupTypeModel(BaseModel):
    __tablename__ = 'grouptypes'
    
    id = Column(BigInteger, Sequence('all_id_seq'), primary_key=True)
    name = Column(String)

    groups = relationship('GroupModel', back_populates='grouptype')

### Inicializace struktur v databázi

Existují dva základní přístupy, které jsou v praxi kombinovány. Jedná se o

- database first
- code first

V tomto případě využíváme přístup code first, kdy budoucí strukturu tabulek v databázi je definována třídami. Tato definice poslouží k vytvoření struktury databáze a jejich tabulek.

In [9]:
#BaseModel.metadata.drop_all(engine)
BaseModel.metadata.create_all(engine)

`drop_all` všechny tabulky odstraní. Pozor, není to prosté a destruktivní odstranění. Pokud dosud definovaná struktura (třídami) neodpovídá struktuře relací v databázi, může dojít k chybě.

`create_all` vytvoří všechny tabulky a relace mezi nimi.

In [10]:
!pip install asyncpg



SQLAlchemy je knihovnou / frameworkem, který umožňuje odstínit konkrétní typ databázového serveru. Díky této knihovně IT specialista modeluje datové entity bez ohledu na konkrétní úložiště. Podobných knihoven existuje celá řada, ale SQLAlchemy je pravděpodobně nejpoužívanější.

Z hlediska modelování datových struktur existují dva základní přístupy:
- Database First
- Code First

Database First je způsob, kdy vznikají popisy přímo v databázi. Alternativně lze existující databázi vzít jako základ a dále ji rozšiřovat. Toto souvisí s tzv. migracemi, které mají specifický význam při upgrade informačního systému.

Code First předpokládá, že popis datových struktur je definován kódem a z tohoto kódu je následně odvozena posloupnost příkazů, které musí být nad databází provedeny, aby vznikly tabulky s jejich strukturou a vzájemným propojením (Foreign Keys).

SQLAlchemy podporuje oba přístupy, lze tedy z existující databáze odvodit modely nebo na základě modelů vytvořit strukturu databáze.

https://github.com/LeeBergstrand/Jupyter-SQLAlchemy-Tutorial/blob/master/Jupyter-SQLAlchemy.ipynb

In [11]:
#https://docs.sqlalchemy.org/en/13/orm/tutorial.html
#https://docs.sqlalchemy.org/en/14/orm/basic_relationships.html
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy import Column, Integer, String, BigInteger, Sequence, Table, ForeignKey, DateTime
from sqlalchemy.orm import relationship

In [12]:
!pip install asyncpg



Knihovna `asyncpg` umožňuje zpracovat connection string uvedený níže, který reprezentuje asynchronní připojení k serveru.

In [13]:
connectionstring = "postgresql+asyncpg://postgres:example@postgres/newdatabase"

In [14]:
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine

In [15]:
asyncEngine = create_async_engine(connectionstring, echo=True) 

In [16]:
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.orm import sessionmaker

async_sessionMaker = sessionmaker(
        asyncEngine, expire_on_commit=False, class_=AsyncSession
    )

# Dataloaders

In [17]:
!pip install aiodataloader

Collecting aiodataloader
  Downloading aiodataloader-0.4.0-py3-none-any.whl (10 kB)
Installing collected packages: aiodataloader
Successfully installed aiodataloader-0.4.0


In [18]:
from sqlalchemy import select

from aiodataloader import DataLoader
import asyncio

asyncSessionMaker = async_sessionMaker
dbModel = UserModel
filtermethod = dbModel.id.in_

class Loader(DataLoader):
    async def batch_load_fn(self, keys):
        print('batch_load_fn', keys, flush=True)
        mainstmt = select(dbModel)
        async with asyncSessionMaker() as session:
            statement = mainstmt.filter(filtermethod(keys))
            rows = await session.execute(statement)
            rows = rows.scalars()
            #return rows
            datamap = {}
            for row in rows:
                datamap[row.id] = row
            result = [datamap.get(id, None) for id in keys]
            return result
        
dataLoader = Loader()
keys = [1, 2, 3, 4, 4]
promises = [dataLoader.load(key) for key in keys]
values = await asyncio.gather(*promises)
emails = [(value.id, value.email) for value in values]
print(emails)
print(30*"-")
promises = [dataLoader.load(key) for key in keys]
values = await asyncio.gather(*promises)
emails = [(value.id, value.email) for value in values]
print(emails)

batch_load_fn [1, 2, 3, 4]
2023-10-17 10:47:54,174 INFO sqlalchemy.engine.Engine select pg_catalog.version()
2023-10-17 10:47:54,176 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-10-17 10:47:54,180 INFO sqlalchemy.engine.Engine select current_schema()
2023-10-17 10:47:54,184 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-10-17 10:47:54,188 INFO sqlalchemy.engine.Engine show standard_conforming_strings
2023-10-17 10:47:54,189 INFO sqlalchemy.engine.Engine [raw sql] ()
2023-10-17 10:47:54,193 INFO sqlalchemy.engine.Engine BEGIN (implicit)
2023-10-17 10:47:54,220 INFO sqlalchemy.engine.Engine SELECT users.id, users.name, users.surname, users.email, users.lastchange, users."externalId" 
FROM users 
WHERE users.id IN ($1::BIGINT, $2::BIGINT, $3::BIGINT, $4::BIGINT)
2023-10-17 10:47:54,222 INFO sqlalchemy.engine.Engine [generated in 0.00183s] (1, 2, 3, 4)
2023-10-17 10:47:54,238 INFO sqlalchemy.engine.Engine ROLLBACK
[(1, 'leslie.cannon@university.world'), (2, 'cathy.hickman@university.wo

In [None]:
#from sqlalchemy.future import select
from sqlalchemy import select
from sqlalchemy import delete

In [None]:
def updateModel(destinationModel, sourceModel=None, extraValues={}):
    """Updates destination's attributes with source's attributes.
    Attributes with value None are not updated."""
    if source is not None:
        for name in dir(sourceModel):
            if name.startswith("_"):
                continue
            value = getattr(sourceModel, name)
            if value is not None:
                setattr(destinationModel, name, value)

    for name, value in extraValues.items():
        setattr(destinationModel, name, value)

    return destinationModel


In [None]:
import datetime

from aiodataloader import DataLoader
update = updateModel

def createIdLoader(asyncSessionMaker, dbModel):

    mainstmt = select(dbModel)
    filtermethod = dbModel.id.in_
    class Loader(DataLoader):
        async def batch_load_fn(self, keys):
            #print('batch_load_fn', keys, flush=True)
            async with asyncSessionMaker() as session:
                statement = mainstmt.filter(filtermethod(keys))
                rows = await session.execute(statement)
                rows = rows.scalars()
                #return rows
                datamap = {}
                for row in rows:
                    datamap[row.id] = row
                result = [datamap.get(id, None) for id in keys]
                return result

        async def insert(self, entity, extraAttributes={}):
            newdbrow = dbModel()
            #print("insert", newdbrow, newdbrow.id, newdbrow.name, flush=True)
            newdbrow = update(newdbrow, entity, extraAttributes)
            async with asyncSessionMaker() as session:
                #print("insert", newdbrow, newdbrow.id, newdbrow.name, flush=True)
                session.add(newdbrow)
                await session.commit()
            #self.clear(newdbrow.id)
            #self.prime(newdbrow.id, newdbrow)
            #print("insert", newdbrow, newdbrow.id, newdbrow.name, flush=True)
            return newdbrow

        async def update(self, entity, extraValues={}):
            async with asyncSessionMaker() as session:
                statement = mainstmt.filter_by(id=entity.id)
                rows = await session.execute(statement)
                rows = rows.scalars()
                rowToUpdate = next(rows, None)

                if rowToUpdate is None:
                    return None

                dochecks = hasattr(rowToUpdate, 'lastchange')             
                checkpassed = True  
                #print('loaded', rowToUpdate)
                #print('loaded', rowToUpdate.id, rowToUpdate.name)
                if (dochecks):
                    #print('checking', flush=True)
                    if (entity.lastchange != rowToUpdate.lastchange):
                        #print('checking failed', flush=True)
                        result = None
                        checkpassed = False                        
                    else:
                        entity.lastchange = datetime.datetime.now()
                        #print(entity)           
                if checkpassed:
                    rowToUpdate = update(rowToUpdate, entity, extraValues=extraValues)
                    #print('updated', rowToUpdate.id, rowToUpdate.name, rowToUpdate.lastchange)
                    await session.commit()
                    #print('after commit', rowToUpdate.id, rowToUpdate.name, rowToUpdate.lastchange)
                    #print('after commit', row.id, row.name, row.lastchange)
                    result = rowToUpdate
                    self.registerResult(result)
                
                #self.clear_all()
            # cacherow = await self.load(result.id)
            # print("cacherow", cacherow, flush=True)
            # print("cacherow", cacherow.name, cacherow.id, flush=True)
            # print("cacherow", list(self._cache.keys()), flush=True)
            # cachevalue = await self._cache.get(entity.id)
            # print("cacherow", cachevalue.id, cachevalue.name, flush=True)
            return result

        async def delete(self, id):
            statement = delete(dbModel).where(dbModel.id==id)
            async with asyncSessionMaker() as session:
                result = await session.execute(statement)
                await session.commit()
                self.clear(id)
                return result

        def registerResult(self, result):
            self.clear(result.id)
            self.prime(result.id, result)
            return result

        def getSelectStatement(self):
            return select(dbModel)
        
        def getModel(self):
            return dbModel
        
        def getAsyncSessionMaker(self):
            return asyncSessionMaker
        
        async def execute_select(self, statement):
            async with asyncSessionMaker() as session:
                rows = await session.execute(statement)
                return (
                    self.registerResult(row)
                    for row in rows.scalars()
                )
            
        async def filter_by(self, **filters):
            statement = mainstmt.filter_by(**filters)
            async with asyncSessionMaker() as session:
                rows = await session.execute(statement)
                return (
                    self.registerResult(row)
                    for row in rows.scalars()
                )

        async def page(self, skip=0, limit=10):
            statement = mainstmt.offset(skip).limit(limit)
            async with asyncSessionMaker() as session:
                rows = await session.execute(statement)
                return (
                    self.registerResult(row)
                    for row in rows.scalars()
                )
            
        def set_cache(self, cache_object):
            self.cache = True
            self._cache = cache_object


    return Loader(cache=True)