# Strawberry

## Libraries dependency and imports

In [1]:
!pip install strawberry-graphql
!pip install uvicorn[standard]
!pip install fastapi
!pip install psycopg2-binary

Collecting strawberry-graphql
  Downloading strawberry_graphql-0.209.5-py3-none-any.whl (273 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m273.3/273.3 kB[0m [31m107.0 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting graphql-core<3.3.0,>=3.2.0
  Downloading graphql_core-3.2.3-py3-none-any.whl (202 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m202.9/202.9 kB[0m [31m260.8 kB/s[0m eta [36m0:00:00[0m
Installing collected packages: graphql-core, strawberry-graphql
Successfully installed graphql-core-3.2.3 strawberry-graphql-0.209.5
Collecting websockets>=10.4
  Downloading websockets-11.0.3-cp310-cp310-manylinux_2_5_x86_64.manylinux1_x86_64.manylinux_2_17_x86_64.manylinux2014_x86_64.whl (129 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m129.9/129.9 kB[0m [31m49.0 kB/s[0m eta [36m0:00:00[0ma [36m0:00:01[0m
[?25hCollecting uvloop!=0.15.0,!=0.15.1,>=0.14.0
  Downloading uvloop-0.17.0-cp310-cp310-manylinux_2

## Helper Func for App in Notebook

V ukázkách dále bude použit kód, který je specifický pro prostředí jupyter a který tak umožňuje spouštět ukázky přímo v notebooku. Fakticky je kódem vytvořen subproces, který zabezpečuje běh serveru. Identifikace subprocesu je uložena v datové struktuře `servers`. Díky tomu lze identifikovat, zda na požadovaném portu již nějaký server běží a v případě potřeby jej zastavit a spustit nový server.

Po ukončení experimentů se serverem (kódem) je nutné tento server zastavit, aby došlo k uvolnění portu. V případě problémů je možné, že bude nezbytné restartovat jupyter, aby byly porty uvolněny. Je-li spuštěn nový server, aniž by byl server běžící na stejném portu ukončen, dojde k chybovému stavu.

```python
assert port in [9991, 9992, 9993, 9994]
```
Slouží k ověření, že požadovaný port je dostupný i z prostředí mimo jupyter. Vzpomeňte si na konfiguraci docker stacku a mapování portů mimo jupyter kontejner.

In [2]:
# Code in this cell is just for (re)starting the API on a Process, and other compatibility stuff with Jupyter cells.
# Just ignore it!
import uvicorn
from multiprocessing import Process

# Registry of the server processes started from this notebook, keyed by port.
servers = {}

def start_api(app=None, port=9992, runNew=True):
    """Stop the server registered for ``port`` (if any) and optionally start a new one.

    Parameters
    ----------
    app : ASGI application, optional
        Application passed to ``uvicorn.run``; must not be None when ``runNew`` is True.
    port : int
        Port to serve on; must be one of 9991, 9992, 9993, 9994.
    runNew : bool
        When falsy, the function only stops the registered server.

    Raises
    ------
    AssertionError
        If ``port`` is not an allowed value, or ``app`` is None while ``runNew`` is set.

    Notes
    -----
    The call returns as soon as the child process has been started; it does
    not wait for the port to become reachable.
    """
    allowed_ports = [9991, 9992, 9993, 9994]
    assert port in allowed_ports, f'port has unexpected value {port}'

    def serve_forever():
        # Executed in the child process; uvicorn.run blocks there.
        uvicorn.run(app, port=port, host='0.0.0.0', root_path='')

    running = servers.get(port, None)
    if running:
        # Terminate the previous server and wait for it before reusing the port.
        running.terminate()
        running.join()
        del servers[port]

    if not runNew:
        return

    assert app is not None, 'app is None'
    child = Process(target=serve_forever, daemon=True)
    child.start()
    servers[port] = child

In [3]:
import os
import asyncio
import multiprocessing
# Registry of the server processes started from this notebook, keyed by port.
# An already existing registry is reused so that re-running this cell does not
# forget servers that are still running (and still occupy their ports).
servers = globals().get('servers', {})

def start_api(app=None, port=9992, runNew=True):
    """Stop the server registered for ``port`` (if any) and optionally start a new one.

    The new server runs uvicorn on a fresh asyncio event loop inside a child
    process. The call returns right after the child has been started; it does
    not wait for the port to become reachable.

    Parameters
    ----------
    app : ASGI application, optional
        Application to serve; must not be None when ``runNew`` is True.
    port : int
        Port to serve on; must be one of 9991, 9992, 9993, 9994.
    runNew : bool
        When falsy, the function only stops the registered server.

    Raises
    ------
    AssertionError
        If ``port`` is not an allowed value, or ``app`` is None while ``runNew`` is set.
    """
    assert port in [9991, 9992, 9993, 9994], f'port has unexpected value {port}'

    async def runAsync():
        config = uvicorn.Config(app, port=port, host='0.0.0.0', log_level="info")
        server = uvicorn.Server(config)
        await server.serve()

    def withLoop():
        # Entry point of the child process.
        process_name = "[Process %s]" % (os.getpid())
        print("%s Started " % process_name)

        loop = asyncio.new_event_loop()
        try:
            loop.run_until_complete(runAsync())
        except KeyboardInterrupt:
            print("%s Loop interrupted" % process_name)
            loop.stop()
        finally:
            # Release the loop's resources on every exit path.
            loop.close()

        print("%s terminating" % process_name)

    _api_process = servers.get(port, None)
    if _api_process:
        # Terminate the previous server and wait for it before reusing the port.
        _api_process.terminate()
        _api_process.join()
        del servers[port]

    if runNew:
        assert app is not None, 'app is None'
        _api_process = multiprocessing.Process(target=withLoop)
        _api_process.start()
        servers[port] = _api_process

In [4]:
!pip install gunicorn

Collecting gunicorn
  Downloading gunicorn-21.2.0-py3-none-any.whl (80 kB)
[2K     [90m━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━━[0m [32m80.2/80.2 kB[0m [31m83.3 kB/s[0m eta [36m0:00:00[0m [36m0:00:01[0m
Installing collected packages: gunicorn
Successfully installed gunicorn-21.2.0


? # Code in this cell is just for (re)starting the API on a Process, and other compatibility stuff with Jupyter cells.
? # Just ignore it!
import uvicorn
from multiprocessing import Process

async def start_api(app=None, port=9992, runNew=True):
    """Serve ``app`` with uvicorn on the current event loop.

    Unlike the process-based variants, this coroutine runs the server in the
    caller's event loop: awaiting it does not finish until ``server.serve()``
    returns.

    Parameters
    ----------
    app : ASGI application, optional
        Application to serve; must not be None when ``runNew`` is True.
    port : int
        Port to serve on; must be one of 9991, 9992, 9993, 9994.
    runNew : bool
        When falsy, nothing is started and the coroutine returns immediately
        (this variant keeps no registry of servers, so there is nothing to stop).

    Raises
    ------
    AssertionError
        If ``port`` is not an allowed value, or ``app`` is None while ``runNew`` is set.
    """
    assert port in [9991, 9992, 9993, 9994], f'port has unexpected value {port}'

    if not runNew:
        # Honour the flag instead of silently starting a blocking server.
        return

    assert app is not None, 'app is None'

    config = uvicorn.Config(app, port=port, host='0.0.0.0', log_level="info")
    server = uvicorn.Server(config)
    await server.serve()

# Awaits the coroutine variant of start_api defined in this cell.
# NOTE(review): `app` is only created in a later cell of this notebook - run that cell first.
await start_api(app, port=9992, runNew=True)

In [31]:
#start_api(app, port=9992, runNew=False)

## Hello World in Strawberry

In [5]:
import strawberry
import uuid

@strawberry.type(description="""Type for query root""")
class Query:
    # Root query type of the hello-world schema; it exposes a single field.

    @strawberry.field(description="""Returns a hello""")
    async def say_hello(self, info: strawberry.types.Info, id: strawberry.ID) -> str:
        # The greeting simply echoes the supplied id.
        return f'Hello {id}'
    
from strawberry.asgi import GraphQL

# ASGI application exposing the federation schema built from Query.
graphql_app = GraphQL(
    strawberry.federation.Schema(Query), 
    graphiql = True,
    allow_queries_via_get = True
)

from fastapi import FastAPI
# The FastAPI app hosts the GraphQL endpoint under the /gql path.
app = FastAPI()
app.mount("/gql", graphql_app)

# (Re)start the server for this app on port 9992.
start_api(app, port=9992, runNew=True)

[Process 23563] Started 


INFO:     Started server process [23563]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:9992 (Press CTRL+C to quit)


INFO:     172.20.0.1:45804 - "GET /docs HTTP/1.1" 200 OK
INFO:     172.20.0.1:45804 - "GET /openapi.json HTTP/1.1" 200 OK
INFO:     172.20.0.1:45824 - "GET /gql HTTP/1.1" 307 Temporary Redirect
INFO:     172.20.0.1:45824 - "GET / HTTP/1.1" 200 OK
INFO:     172.20.0.1:45824 - "POST / HTTP/1.1" 200 OK
INFO:     172.20.0.1:45866 - "POST / HTTP/1.1" 200 OK
INFO:     172.20.0.1:45878 - "POST / HTTP/1.1" 200 OK
INFO:     172.20.0.1:45918 - "POST / HTTP/1.1" 200 OK
INFO:     172.20.0.1:45938 - "POST / HTTP/1.1" 200 OK
INFO:     172.20.0.1:45938 - "POST / HTTP/1.1" 200 OK


INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [23563]


[Process 23563] terminating


In [6]:
# Stop the server registered on port 9992; runNew=False means no new server is started.
start_api(app, port=9992, runNew=False)

## DB with SQLAlchemy

### Models

In [34]:
import sqlalchemy
import datetime

from sqlalchemy import Column, String, BigInteger, Integer, DateTime, ForeignKey, Sequence, Table, Boolean
from sqlalchemy.dialects.postgresql import UUID

from sqlalchemy.orm import relationship
from sqlalchemy.ext.declarative import declarative_base

# Declarative base class shared by all ORM models below; its metadata is used to create/drop the tables.
BaseModel = declarative_base()

def UUIDColumn(name=None):
    """Create a UUID primary-key column whose value is generated by the database.

    Parameters
    ----------
    name : str, optional
        Explicit column name; when None, no name is passed to ``Column``.

    Returns
    -------
    Column
        Unique primary-key column with server default ``gen_random_uuid()``.
    """
    column_type = UUID(as_uuid=True)
    # The optional name, if given, goes first among the positional arguments.
    positional = (column_type,) if name is None else (name, column_type)
    return Column(
        *positional,
        primary_key=True,
        server_default=sqlalchemy.text("gen_random_uuid()"),
        unique=True,
    )
    
class MembershipModel(BaseModel):
    """Links a User to a Group when the User is a member of the Group.

    Makes it possible to keep a history of such links.
    """

    __tablename__ = 'memberships'

    id = UUIDColumn()
    # NOTE(review): both foreign keys are flagged primary_key=True in addition to `id`,
    # which yields a composite primary key - confirm this is intended.
    user_id = Column(ForeignKey('users.id'), primary_key=True)
    group_id = Column(ForeignKey('groups.id'), primary_key=True)

    # Both relationships are paired with the `memberships` attribute on the other model.
    user = relationship('UserModel', back_populates='memberships')
    group = relationship('GroupModel', back_populates='memberships')
    
class UserModel(BaseModel):
    """Holds the data associated with a user."""
    __tablename__ = 'users'

    id = UUIDColumn()
    name = Column(String)
    surname = Column(String)
    email = Column(String)

    # Membership rows referencing this user (paired with MembershipModel.user).
    memberships = relationship('MembershipModel', back_populates='user')

class GroupModel(BaseModel):
    """Holds the data associated with a group."""
    __tablename__ = 'groups'
    
    id = UUIDColumn()
    name = Column(String)
    
    # Membership rows referencing this group (paired with MembershipModel.group).
    memberships = relationship('MembershipModel', back_populates='group')


### Connectionstring

In [35]:
import os
def ComposeConnectionString():
    """Build the database connection string from environment variables.

    Reads ``POSTGRES_USER``, ``POSTGRES_PASSWORD``, ``POSTGRES_DB`` and
    ``POSTGRES_HOST`` (host, optionally with ``:port``). The function could be
    changed to read e.g. a configuration file instead.

    The fallback values reproduce the connection string that used to be
    hard-coded here, so the result is unchanged when none of the variables is
    set. They are meant for the local demo stack only.

    Returns
    -------
    str
        Connection string using the ``postgresql+asyncpg`` driver.
    """
    user = os.environ.get("POSTGRES_USER", "postgres")
    password = os.environ.get("POSTGRES_PASSWORD", "example")
    database = os.environ.get("POSTGRES_DB", "demo")
    hostWithPort = os.environ.get("POSTGRES_HOST", "postgres")

    driver = "postgresql+asyncpg" #"postgresql+psycopg2"
    # No hard-coded override any more: the environment is actually honoured.
    return f"{driver}://{user}:{password}@{hostWithPort}/{database}"

### Async Engine

In [36]:
from sqlalchemy.ext.asyncio import AsyncSession
from sqlalchemy.ext.asyncio import create_async_engine
from sqlalchemy.orm import sessionmaker

async def startEngine(connectionstring, makeDrop=False, makeUp=True):
    """Create the async engine, optionally drop/create the tables, and return a session factory.

    Parameters
    ----------
    connectionstring : str
        Connection string passed to ``create_async_engine``.
    makeDrop : bool
        When set, all tables known to ``BaseModel.metadata`` are dropped first.
    makeUp : bool
        When set, all tables known to ``BaseModel.metadata`` are created.

    Returns
    -------
    sessionmaker
        Factory producing ``AsyncSession`` objects with ``expire_on_commit=False``.
    """
    engine = create_async_engine(connectionstring)
    metadata = BaseModel.metadata

    # The metadata operations are synchronous, hence run_sync on the async connection.
    async with engine.begin() as connection:
        if makeDrop:
            await connection.run_sync(metadata.drop_all)
            print('BaseModel.metadata.drop_all finished')
        if makeUp:
            await connection.run_sync(metadata.create_all)
            print('BaseModel.metadata.create_all finished')

    return sessionmaker(engine, expire_on_commit=False, class_=AsyncSession)


### Populate Database

In [37]:
import uuid
def newUUID():
    """Return a freshly generated version-1 UUID in its string form."""
    generated = uuid.uuid1()
    return str(generated)

# Seed data: plain dicts whose keys match the column names of the ORM models.
users = [
    {'id': newUUID(), 'name': 'John', 'surname': 'Newbie'},
    {'id': newUUID(), 'name': 'Julia', 'surname': 'Green'},
]

groups = [
    {'id': newUUID(), 'name': 'UIT'},
    {'id': newUUID(), 'name': 'FVG'},
    {'id': newUUID(), 'name': 'K401'},
]

# The first user belongs to all three groups, the second user only to the first group.
memberships = [
    {'id': newUUID(), 'user_id': users[0]['id'], 'group_id': groups[0]['id']},
    {'id': newUUID(), 'user_id': users[0]['id'], 'group_id': groups[1]['id']},
    {'id': newUUID(), 'user_id': users[0]['id'], 'group_id': groups[2]['id']},
    {'id': newUUID(), 'user_id': users[1]['id'], 'group_id': groups[0]['id']},
]

# ORM instances in the order users, groups, memberships.
entitiesToAdd = [UserModel(**row) for row in users] + \
    [GroupModel(**row) for row in groups] + [MembershipModel(**row) for row in memberships]

In [38]:
# Create the tables (nothing is dropped) and obtain the async session factory.
sessionMaker = await startEngine(ComposeConnectionString(), makeDrop=False, makeUp=True)

# Insert the seed entities within one transaction.
async with sessionMaker() as session:
    async with session.begin():
        session.add_all(entitiesToAdd)
    # NOTE(review): the session.begin() block presumably commits on exit already,
    # which would make this explicit commit redundant - confirm.
    await session.commit()

BaseModel.metadata.create_all finished


## Strawberry + SQLAlchemy Synchronous

### Strawberry synchronous resolvers

In [52]:
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker

def startEngine(connectionstring, makeDrop=False, makeUp=True):
    """Create the synchronous engine, optionally drop/create the tables, and return a session factory.

    Parameters
    ----------
    connectionstring : str
        Connection string; the ``postgresql+asyncpg`` driver prefix is swapped
        for ``postgresql+psycopg2`` before the engine is created.
    makeDrop : bool
        When set, all tables known to ``BaseModel.metadata`` are dropped first.
    makeUp : bool
        When set, all tables known to ``BaseModel.metadata`` are created.

    Returns
    -------
    sessionmaker
        Factory producing synchronous sessions with ``expire_on_commit=False``.
    """
    sync_url = connectionstring.replace("postgresql+asyncpg", "postgresql+psycopg2")
    engine = create_engine(sync_url)
    metadata = BaseModel.metadata

    if makeDrop:
        metadata.drop_all(bind=engine)
        print('BaseModel.metadata.drop_all finished')
    if makeUp:
        metadata.create_all(bind=engine)
        print('BaseModel.metadata.create_all finished')

    return sessionmaker(engine, expire_on_commit=False)

# Rebinds `sessionMaker` to the synchronous session factory returned by the startEngine defined in this cell.
sessionMaker = startEngine(ComposeConnectionString())

BaseModel.metadata.create_all finished


In [61]:
def createEntityGetterSync(DBModel: BaseModel):
    """Preconfigure a synchronous paged query over all rows of ``DBModel``.

    Parameters
    ----------
    DBModel : BaseModel
        class representing SQLAlchemy model - table where records will be found

    Returns
    -------
    Callable[[Session, int, int], ScalarResult]
        synchronous function ``(session, skip, limit)`` querying the database
    """
    # Imported locally: the notebook imports `select` only in a later cell, so
    # relying on the global name fails on a fresh top-to-bottom run.
    from sqlalchemy.future import select

    stmt = select(DBModel)

    def resultedFunction(session, skip, limit):
        """Preconfigured query without a filter; returns one page of entities."""
        stmtWithFilter = stmt.offset(skip).limit(limit)

        dbSet = session.execute(stmtWithFilter)
        return dbSet.scalars()

    return resultedFunction

In [54]:
def createEntityByIdGetterSync(DBModel: BaseModel):
    """Preconfigure a synchronous query for a single ``DBModel`` entity by its id.

    Parameters
    ----------
    DBModel : BaseModel
        class representing SQLAlchemy model - table where record will be found

    Returns
    -------
    Callable[[Session, uuid.UUID], Optional[DBModel]]
        synchronous function ``(session, id)``; yields None when no row matches
    """
    # Imported locally: the notebook imports `select` only in a later cell, so
    # relying on the global name fails on a fresh top-to-bottom run.
    from sqlalchemy.future import select

    stmt = select(DBModel)

    def resultedFunction(session, id):
        """Preconfigured query filtered by primary key ``id``."""
        stmtWithFilter = stmt.filter_by(id=id)

        dbSet = session.execute(stmtWithFilter)
        # First matching entity, or None when the result is empty.
        return next(dbSet.scalars(), None)

    return resultedFunction

In [55]:
def create1NGetterSync(ResultedDBModel: BaseModel, foreignKeyName):
    """Create a synchronous resolver for a 1:N (M:N) relation.

    The foreign entity that holds the foreign key with the given value is
    queried; the expected result is a vector of entities.

    Parameters
    ----------
    ResultedDBModel : BaseModel
        class representing a model (SQLAlchemy) for result
    foreignKeyName : str
        name of foreignkey used for filtering entities

    Returns
    -------
    Callable[[Session, uuid.UUID], ScalarResult]
        synchronous function representing the resolver for 1:N (or N:M) relations on particular entity
    """
    # Imported locally: the notebook imports `select` only in a later cell, so
    # relying on the global name fails on a fresh top-to-bottom run.
    from sqlalchemy.future import select
    from sqlalchemy.orm import Session

    stmt = select(ResultedDBModel)

    def resultedFunction(session: Session, id: uuid.UUID):
        """Preconfigured query filtered by the foreign key.

        Parameters
        ----------
        session : Session
            synchronous session for DB (taken from SQLAlchemy)
        id: uuid.UUID
            key value used for foreign key

        Returns
        -------
        ScalarResult
            iterable of ResultedDBModel entities (1:N or M:N)
        """
        filterQuery = {foreignKeyName: id}
        stmtWithFilter = stmt.filter_by(**filterQuery)
        dbSet = session.execute(stmtWithFilter)
        return dbSet.scalars()

    return resultedFunction
    

In [76]:
# Synchronous resolver functions, one set per model, built by the factories above.
resolveMembershipByIdSync = createEntityByIdGetterSync(MembershipModel)
resolveMembershipPageSync = createEntityGetterSync(MembershipModel)

resolveUserByIdSync = createEntityByIdGetterSync(UserModel)
resolveUserPageSync = createEntityGetterSync(UserModel)
# Memberships whose user_id equals the given id.
resolveMembershipForUserSync = create1NGetterSync(MembershipModel, foreignKeyName='user_id')

resolveGroupByIdSync = createEntityByIdGetterSync(GroupModel)
resolveGroupPageSync = createEntityGetterSync(GroupModel)
# Memberships whose group_id equals the given id.
resolveMembershipForGroupSync = create1NGetterSync(MembershipModel, foreignKeyName='group_id')

In [77]:
# Smoke test of the synchronous resolvers: list users and the ids of their groups.
syncSessionMaker = sessionMaker

with syncSessionMaker() as session:
    page = resolveUserPageSync(session, skip=0, limit=10)
    print(page)
    # Copy the needed attributes into plain dicts.
    page = [{'id': item.id, 'name': item.name, 'surname': item.surname} for item in page]
    for item in page:
        print('=' * 30)
        print(item['id'], item['name'], item['surname'])
        # Use the synchronous resolver here (the async one returns a coroutine);
        # the result gets its own name so the seed list `memberships` is not overwritten.
        userMemberships = resolveMembershipForUserSync(session, item['id'])
        for m in userMemberships:
            print(m.group_id)


<sqlalchemy.engine.result.ScalarResult object at 0x7efc8660d420>
cc900fb8-6fdb-11ed-9d68-0242ac140007 John Newbie
cc901742-6fdb-11ed-9d68-0242ac140007
cc9017b0-6fdb-11ed-9d68-0242ac140007
cc9017e2-6fdb-11ed-9d68-0242ac140007
cc90127e-6fdb-11ed-9d68-0242ac140007 Julia Green
cc901742-6fdb-11ed-9d68-0242ac140007
82282580-6fdd-11ed-9d68-0242ac140007 John Newbie
82282cd8-6fdd-11ed-9d68-0242ac140007
82282d64-6fdd-11ed-9d68-0242ac140007
82282d96-6fdd-11ed-9d68-0242ac140007
8228294a-6fdd-11ed-9d68-0242ac140007 Julia Green
82282cd8-6fdd-11ed-9d68-0242ac140007
be347056-6fdd-11ed-98ba-0242ac140007 John Newbie
be3475f6-6fdd-11ed-98ba-0242ac140007
be34766e-6fdd-11ed-98ba-0242ac140007
be3476b4-6fdd-11ed-98ba-0242ac140007
be347290-6fdd-11ed-98ba-0242ac140007 Julia Green
be3475f6-6fdd-11ed-98ba-0242ac140007
f10d8b7a-6fdd-11ed-98ba-0242ac140007 John Newbie
f10d911a-6fdd-11ed-98ba-0242ac140007
f10d9318-6fdd-11ed-98ba-0242ac140007
f10d9372-6fdd-11ed-98ba-0242ac140007
f10d8ee0-6fdd-11ed-98ba-0242ac140007 

### Strawberry Models

In [78]:
import strawberry
import typing
from typing import List, Union, Optional
import uuid

def SessionFromInfo(info):
    """Return the database session stored under ``'session'`` in the request context."""
    context = info.context
    return context['session']

@strawberry.federation.type(keys=["id"], description="""Entity representing a relation between an user and a group""")
class MembershipGQLModel:
    # GraphQL type over MembershipModel rows (synchronous variant). In the field
    # resolvers `self` is the database entity returned by the resolver functions.

    @classmethod
    def resolve_reference(cls, info: strawberry.types.Info, id: strawberry.ID):
        # Federation entry point: load the entity by its key.
        result = resolveMembershipByIdSync(SessionFromInfo(info), id)
        if result is not None:
            # Tag only an entity that was actually found; an unknown id yields None.
            result._type_definition = cls._type_definition # little hack :)
        return result

    @strawberry.field(description="""primary key""")
    def id(self) -> strawberry.ID:
        return self.id

    @strawberry.field(description="""user""")
    def user(self, info: strawberry.types.Info) -> 'UserGQLModel':
        result = resolveUserByIdSync(SessionFromInfo(info), self.user_id)
        return result

    @strawberry.field(description="""group""")
    def group(self, info: strawberry.types.Info) -> 'GroupGQLModel':
        result = resolveGroupByIdSync(SessionFromInfo(info), self.group_id)
        return result

@strawberry.federation.type(keys=["id"], description="""Entity representing a user""")
class UserGQLModel:
    # GraphQL type over UserModel rows (synchronous variant). In the field
    # resolvers `self` is the database entity returned by the resolver functions.

    @classmethod
    def resolve_reference(cls, info: strawberry.types.Info, id: strawberry.ID):
        # Federation entry point: load the entity by its key.
        result = resolveUserByIdSync(SessionFromInfo(info), id)
        if result is not None:
            # Tag only an entity that was actually found; an unknown id yields None.
            result._type_definition = cls._type_definition # little hack :)
        return result

    @strawberry.field(description="""Entity primary key""")
    def id(self, info: strawberry.types.Info) -> strawberry.ID:
        return self.id

    @strawberry.field(description="""User's name (like John)""")
    def name(self) -> str:
        return self.name

    @strawberry.field(description="""User's family name (like Obama)""")
    def surname(self) -> str:
        return self.surname

    @strawberry.field(description="""List of groups, where the user is member""")
    def membership(self, info: strawberry.types.Info) -> typing.List['MembershipGQLModel']:
        result = resolveMembershipForUserSync(SessionFromInfo(info), self.id)
        return result

@strawberry.federation.type(keys=["id"], description="""Entity representing a group""")
class GroupGQLModel:
    # GraphQL type over GroupModel rows (synchronous variant). In the field
    # resolvers `self` is the database entity returned by the resolver functions.

    @classmethod
    def resolve_reference(cls, info: strawberry.types.Info, id: strawberry.ID):
        # Federation entry point: load the entity by its key.
        result = resolveGroupByIdSync(SessionFromInfo(info), id)
        if result is not None:
            # Tag only an entity that was actually found; an unknown id yields None.
            result._type_definition = cls._type_definition # little hack :)
        return result

    @strawberry.field(description="""Entity primary key""")
    def id(self) -> strawberry.ID:
        return self.id

    @strawberry.field(description="""Group's name (like Department of Intelligent Control)""")
    def name(self) -> str:
        return self.name

    @strawberry.field(description="""List of users who are member of the group""")
    def memberships(self, info: strawberry.types.Info) -> typing.List['MembershipGQLModel']:
        # Plain `def`, like the other resolvers of the synchronous variant:
        # the underlying resolver function is synchronous.
        result = resolveMembershipForGroupSync(SessionFromInfo(info), self.id)
        return result

@strawberry.type(description="""Type for query root""")
class Query:
    # Root query type (synchronous variant): each field takes the session from
    # the request context and delegates to a preconfigured resolver function.

    @strawberry.field(description="""Returns a hello""")
    def say_hello(self, info: strawberry.types.Info, id: strawberry.ID) -> str:
        return f'Hello {id}'

    @strawberry.field(description="""Returns a list of users (paged)""")
    def user_page(self, info: strawberry.types.Info, skip: int = 0, limit: int = 10) -> List[UserGQLModel]:
        db_session = SessionFromInfo(info)
        return resolveUserPageSync(db_session, skip, limit)

    @strawberry.field(description="""Finds an user by their id""")
    def user_by_id(self, info: strawberry.types.Info, id: uuid.UUID) -> Optional[UserGQLModel]:
        db_session = SessionFromInfo(info)
        return resolveUserByIdSync(db_session, id)

    @strawberry.field(description="""Returns a list of groups (paged)""")
    def group_page(self, info: strawberry.types.Info, skip: int = 0, limit: int = 10) -> List[GroupGQLModel]:
        db_session = SessionFromInfo(info)
        return resolveGroupPageSync(db_session, skip, limit)

    @strawberry.field(description="""Finds a group by its id""")
    def group_by_id(self, info: strawberry.types.Info, id: uuid.UUID) -> Optional[GroupGQLModel]:
        db_session = SessionFromInfo(info)
        return resolveGroupByIdSync(db_session, id)
    


### Strawberry Session Management (class)

In [79]:
from strawberry.asgi import GraphQL

class MyGraphQL(GraphQL):
    """Extended GraphQL ASGI app that provides a database session to each request."""

    # Key under which the per-request state is stored in the ASGI scope.
    _SCOPE_KEY = 'gql_request_state'

    async def __call__(self, scope, receive, send):
        syncSessionMaker = sessionMaker
        with syncSessionMaker() as session:
            # Per-request state lives in the request's own scope instead of on
            # the shared app instance, so concurrent requests cannot overwrite
            # each other's session.
            scope[self._SCOPE_KEY] = {'session': session, 'user': {'id': '?'}}
            return await GraphQL.__call__(self, scope, receive, send)

    async def get_context(self, request, response):
        # Extend the parent's context with the state stored for this request.
        parentResult = await GraphQL.get_context(self, request, response)
        state = request.scope[self._SCOPE_KEY]
        return {**parentResult,
            'session': state['session'],
            'asyncSessionMaker': sessionMaker,
            'user': state['user']
            }

In [80]:
# ASGI application exposing the federation schema built from Query, with session handling from MyGraphQL.
graphql_app = MyGraphQL(
    strawberry.federation.Schema(Query), 
    graphiql = True,
    allow_queries_via_get = True
)

from fastapi import FastAPI
# The FastAPI app hosts the GraphQL endpoint under the /gql path.
app = FastAPI()
app.mount("/gql", graphql_app)

# start_api(app, port=9992, runNew=True)
# await start_api(app, port=9992, runNew=True)
# (Re)start the server for this app on port 9992.
start_api(app, port=9992, runNew=True)

[Process 31603] Started 


INFO:     Started server process [31603]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:9992 (Press CTRL+C to quit)


INFO:     172.20.0.1:39376 - "POST / HTTP/1.1" 200 OK
INFO:     172.20.0.1:39380 - "POST / HTTP/1.1" 200 OK
INFO:     172.20.0.1:39388 - "POST / HTTP/1.1" 200 OK


## Strawberry Asynchronous

### Strawberry (SQLAlchemy) Resolvers I

In [39]:
from sqlalchemy.future import select

In [40]:
def createEntityGetter(DBModel: BaseModel):
    """Build an asynchronous function that fetches one page of ``DBModel`` entities.

    Parameters
    ----------
    DBModel : BaseModel
        class representing SQLAlchemy model - table where records will be found

    Returns
    -------
    Callable[[AsyncSession, int, int], Awaitable]
        asynchronous function ``(session, skip, limit)``; awaiting it gives the
        scalar result of the paged query
    """
    base_query = select(DBModel)

    async def resultedFunction(session, skip, limit):
        """Preconfigured query without a filter, limited to one page."""
        page_query = base_query.offset(skip).limit(limit)
        rows = await session.execute(page_query)
        return rows.scalars()

    return resultedFunction

In [41]:
def createEntityByIdGetter(DBModel: BaseModel):
    """Build an asynchronous function that fetches a single ``DBModel`` entity by id.

    Parameters
    ----------
    DBModel : BaseModel
        class representing SQLAlchemy model - table where record will be found

    Returns
    -------
    Callable[[AsyncSession, uuid.UUID], Awaitable[Optional[DBModel]]]
        asynchronous function ``(session, id)``; awaiting it gives the entity,
        or None when no row matches
    """
    base_query = select(DBModel)

    async def resultedFunction(session, id):
        """Preconfigured query filtered by primary key ``id``."""
        rows = await session.execute(base_query.filter_by(id=id))
        # First matching entity, or None when the result is empty.
        return next(rows.scalars(), None)

    return resultedFunction

In [42]:
def create1NGetter(ResultedDBModel: BaseModel, foreignKeyName):
    """Create an asynchronous resolver for a 1:N (M:N) relation.

    The foreign entity holding the foreign key with the given value is
    queried; the expected result is a vector of entities.

    Parameters
    ----------
    ResultedDBModel : BaseModel
        class representing a model (SQLAlchemy) for result
    foreignKeyName : str
        name of foreignkey used for filtering entities

    Returns
    -------
    Callable[[AsyncSession, uuid.UUID], Awaitable]
        asynchronous function representing the resolver for 1:N (or N:M) relations on particular entity
    """
    base_query = select(ResultedDBModel)

    async def resultedFunction(session: AsyncSession, id: uuid.UUID):
        """Preconfigured query filtered by the foreign key.

        Parameters
        ----------
        session : AsyncSession
            session for DB (taken from SQLAlchemy)
        id: uuid.UUID
            key value used for foreign key

        Returns
        -------
        scalar result
            iterable of ResultedDBModel entities (1:N or M:N)
        """
        filtered_query = base_query.filter_by(**{foreignKeyName: id})
        rows = await session.execute(filtered_query)
        return rows.scalars()

    return resultedFunction
    

### Strawberry (SQLAlchemy) Resolvers II

In [43]:
# Asynchronous resolver functions, one set per model, built by the factories above.
resolveMembershipById = createEntityByIdGetter(MembershipModel)
resolveMembershipPage = createEntityGetter(MembershipModel)

resolveUserById = createEntityByIdGetter(UserModel)
resolveUserPage = createEntityGetter(UserModel)
# Memberships whose user_id equals the given id.
resolveMembershipForUser = create1NGetter(MembershipModel, foreignKeyName='user_id')

resolveGroupById = createEntityByIdGetter(GroupModel)
resolveGroupPage = createEntityGetter(GroupModel)
# Memberships whose group_id equals the given id.
resolveMembershipForGroup = create1NGetter(MembershipModel, foreignKeyName='group_id')

In [44]:
# Smoke test of the asynchronous resolvers: list users and the ids of their groups.
# NOTE(review): this needs `sessionMaker` to be the async factory; an earlier cell in
# the file rebinds it to the synchronous one - confirm the intended execution order.
asyncSessionMaker = sessionMaker
async with asyncSessionMaker() as session:
    page = await resolveUserPage(session, skip=0, limit=10)
    # Copy the needed attributes into plain dicts.
    page = list(map(lambda item: {'id': item.id, 'name': item.name, 'surname': item.surname }, page))
    for item in page:
        print('=' * 30)
        print(item['id'], item['name'], item['surname'])
        # NOTE(review): this rebinds the module-level name `memberships` used for the seed data.
        memberships = await resolveMembershipForUser(session, item['id'])
        for m in memberships:
            print(m.group_id)
            #print(m.group.id)
    await session.commit()


cc900fb8-6fdb-11ed-9d68-0242ac140007 John Newbie
cc901742-6fdb-11ed-9d68-0242ac140007
cc9017b0-6fdb-11ed-9d68-0242ac140007
cc9017e2-6fdb-11ed-9d68-0242ac140007
cc90127e-6fdb-11ed-9d68-0242ac140007 Julia Green
cc901742-6fdb-11ed-9d68-0242ac140007
82282580-6fdd-11ed-9d68-0242ac140007 John Newbie
82282cd8-6fdd-11ed-9d68-0242ac140007
82282d64-6fdd-11ed-9d68-0242ac140007
82282d96-6fdd-11ed-9d68-0242ac140007
8228294a-6fdd-11ed-9d68-0242ac140007 Julia Green
82282cd8-6fdd-11ed-9d68-0242ac140007
be347056-6fdd-11ed-98ba-0242ac140007 John Newbie
be3475f6-6fdd-11ed-98ba-0242ac140007
be34766e-6fdd-11ed-98ba-0242ac140007
be3476b4-6fdd-11ed-98ba-0242ac140007
be347290-6fdd-11ed-98ba-0242ac140007 Julia Green
be3475f6-6fdd-11ed-98ba-0242ac140007
f10d8b7a-6fdd-11ed-98ba-0242ac140007 John Newbie
f10d911a-6fdd-11ed-98ba-0242ac140007
f10d9318-6fdd-11ed-98ba-0242ac140007
f10d9372-6fdd-11ed-98ba-0242ac140007
f10d8ee0-6fdd-11ed-98ba-0242ac140007 Julia Green
f10d911a-6fdd-11ed-98ba-0242ac140007
0f5e5f8c-6fde-11

### Strawberry Models

In [45]:
import strawberry
import typing
from typing import List, Union, Optional
import uuid

def AsyncSessionFromInfo(info):
    """Return the database session stored under ``'session'`` in the request context."""
    context = info.context
    return context['session']

@strawberry.federation.type(keys=["id"], description="""Entity representing a relation between an user and a group""")
class MembershipGQLModel:
    # GraphQL type over MembershipModel rows (asynchronous variant). In the field
    # resolvers `self` is the database entity returned by the resolver functions.

    @classmethod
    async def resolve_reference(cls, info: strawberry.types.Info, id: strawberry.ID):
        # Federation entry point: load the entity by its key.
        result = await resolveMembershipById(AsyncSessionFromInfo(info), id)
        if result is not None:
            # Tag only an entity that was actually found; an unknown id yields None.
            result._type_definition = cls._type_definition # little hack :)
        return result

    @strawberry.field(description="""primary key""")
    def id(self) -> strawberry.ID:
        return self.id

    @strawberry.field(description="""user""")
    async def user(self, info: strawberry.types.Info) -> 'UserGQLModel':
        result = await resolveUserById(AsyncSessionFromInfo(info), self.user_id)
        return result

    @strawberry.field(description="""group""")
    async def group(self, info: strawberry.types.Info) -> 'GroupGQLModel':
        result = await resolveGroupById(AsyncSessionFromInfo(info), self.group_id)
        return result

@strawberry.federation.type(keys=["id"], description="""Entity representing a user""")
class UserGQLModel:
    # GraphQL type over UserModel rows (asynchronous variant). In the field
    # resolvers `self` is the database entity returned by the resolver functions.

    @classmethod
    async def resolve_reference(cls, info: strawberry.types.Info, id: strawberry.ID):
        # Federation entry point: load the entity by its key.
        result = await resolveUserById(AsyncSessionFromInfo(info), id)
        if result is not None:
            # Tag only an entity that was actually found; an unknown id yields None.
            result._type_definition = cls._type_definition # little hack :)
        return result

    @strawberry.field(description="""Entity primary key""")
    def id(self, info: strawberry.types.Info) -> strawberry.ID:
        return self.id

    @strawberry.field(description="""User's name (like John)""")
    def name(self) -> str:
        return self.name

    @strawberry.field(description="""User's family name (like Obama)""")
    def surname(self) -> str:
        return self.surname

    @strawberry.field(description="""List of groups, where the user is member""")
    async def membership(self, info: strawberry.types.Info) -> typing.List['MembershipGQLModel']:
        result = await resolveMembershipForUser(AsyncSessionFromInfo(info), self.id)
        return result

@strawberry.federation.type(keys=["id"], description="""Entity representing a group""")
class GroupGQLModel:
    # GraphQL type over GroupModel rows (asynchronous variant). In the field
    # resolvers `self` is the database entity returned by the resolver functions.

    @classmethod
    async def resolve_reference(cls, info: strawberry.types.Info, id: strawberry.ID):
        # Federation entry point: load the entity by its key.
        result = await resolveGroupById(AsyncSessionFromInfo(info), id)
        if result is not None:
            # Tag only an entity that was actually found; an unknown id yields None.
            result._type_definition = cls._type_definition # little hack :)
        return result

    @strawberry.field(description="""Entity primary key""")
    def id(self) -> strawberry.ID:
        return self.id

    @strawberry.field(description="""Group's name (like Department of Intelligent Control)""")
    def name(self) -> str:
        return self.name

    @strawberry.field(description="""List of users who are member of the group""")
    async def memberships(self, info: strawberry.types.Info) -> typing.List['MembershipGQLModel']:
        result = await resolveMembershipForGroup(AsyncSessionFromInfo(info), self.id)
        return result

@strawberry.type(description="""Type for query root""")
class Query:
    # Root query type (asynchronous variant): each field takes the session from
    # the request context and awaits a preconfigured resolver function.

    @strawberry.field(description="""Returns a hello""")
    async def say_hello(self, info: strawberry.types.Info, id: strawberry.ID) -> str:
        return f'Hello {id}'

    @strawberry.field(description="""Returns a list of users (paged)""")
    async def user_page(self, info: strawberry.types.Info, skip: int = 0, limit: int = 10) -> List[UserGQLModel]:
        db_session = AsyncSessionFromInfo(info)
        return await resolveUserPage(db_session, skip, limit)

    @strawberry.field(description="""Finds an user by their id""")
    async def user_by_id(self, info: strawberry.types.Info, id: uuid.UUID) -> Optional[UserGQLModel]:
        db_session = AsyncSessionFromInfo(info)
        return await resolveUserById(db_session, id)

    @strawberry.field(description="""Returns a list of groups (paged)""")
    async def group_page(self, info: strawberry.types.Info, skip: int = 0, limit: int = 10) -> List[GroupGQLModel]:
        db_session = AsyncSessionFromInfo(info)
        return await resolveGroupPage(db_session, skip, limit)

    @strawberry.field(description="""Finds a group by its id""")
    async def group_by_id(self, info: strawberry.types.Info, id: uuid.UUID) -> Optional[GroupGQLModel]:
        db_session = AsyncSessionFromInfo(info)
        return await resolveGroupById(db_session, id)
    


### Strawberry Session Management (class)

In [46]:
from strawberry.asgi import GraphQL

class MyGraphQL(GraphQL):
    """Extended GraphQL ASGI app that provides a database session to each request."""

    # Key under which the per-request state is stored in the ASGI scope.
    _SCOPE_KEY = 'gql_request_state'

    async def __call__(self, scope, receive, send):
        asyncSessionMaker = sessionMaker
        async with asyncSessionMaker() as session:
            # Per-request state lives in the request's own scope instead of on
            # the shared app instance, so concurrent requests cannot overwrite
            # each other's session.
            scope[self._SCOPE_KEY] = {'session': session, 'user': {'id': '?'}}
            print('in')
            try:
                # Errors propagate unchanged; previously they were swallowed and
                # then masked by an UnboundLocalError on the return statement.
                return await GraphQL.__call__(self, scope, receive, send)
            finally:
                print('out')

    async def get_context(self, request, response):
        # Extend the parent's context with the state stored for this request.
        parentResult = await GraphQL.get_context(self, request, response)
        state = request.scope[self._SCOPE_KEY]
        return {**parentResult,
            'session': state['session'],
            'asyncSessionMaker': sessionMaker,
            'user': state['user']
            }

In [48]:
# ASGI application exposing the federation schema built from Query, with session handling from MyGraphQL.
graphql_app = MyGraphQL(
    strawberry.federation.Schema(Query), 
    graphiql = True,
    allow_queries_via_get = True
)

from fastapi import FastAPI
# The FastAPI app hosts the GraphQL endpoint under the /gql path.
app = FastAPI()
app.mount("/gql", graphql_app)

# start_api(app, port=9992, runNew=True)
# await start_api(app, port=9992, runNew=True)
# (Re)start the server for this app on port 9992.
start_api(app, port=9992, runNew=True)

[Process 26111] Started 


INFO:     Started server process [26111]
INFO:     Waiting for application startup.
INFO:     Application startup complete.
INFO:     Uvicorn running on http://0.0.0.0:9992 (Press CTRL+C to quit)
INFO:     Shutting down
INFO:     Waiting for application shutdown.
INFO:     Application shutdown complete.
INFO:     Finished server process [26111]


[Process 26111] terminating


In [49]:
# Stop the server registered on port 9992; runNew=False means no new server is started.
start_api(app, port=9992, runNew=False)