In [1]:
from pydiatr.handler import AbstractHandler, AbstractRequest, AbstractResponse
from pydiatr.registry import Registry

In [8]:
# One registry shared by the whole notebook: handler classes are registered on it
# with ``@registry.decorate_handler`` and requests are sent via ``registry.dispatch``.
registry = Registry()

In [9]:
class CreateUserRequest(AbstractRequest):
    """Request describing the user to create.

    Instances are built with keyword arguments and passed to
    ``registry.dispatch`` elsewhere in this notebook.
    """

    username: str  # user name; interpolated into the message the handlers print
    email: str  # e-mail address; interpolated into the message the handlers print

class CreateUserResponse(AbstractResponse):
    """Response returned by the handlers for ``CreateUserRequest``."""

    success: bool  # the handlers in this notebook always construct it with True

In [10]:
@registry.decorate_handler
class CreateUserHandler(AbstractHandler[CreateUserRequest, CreateUserResponse]):
    """Minimal handler: receives only the request, no extra dispatch arguments."""

    async def handle(self, request: CreateUserRequest) -> CreateUserResponse:
        """Print the requested user's name and e-mail, then report success.

        Args:
            request: The user-creation request being handled.

        Returns:
            A ``CreateUserResponse`` with ``success=True``.
        """
        message = f"Creating user: {request.username} with email: {request.email}"
        print(message)
        outcome = CreateUserResponse(success=True)
        return outcome
    
response = await registry.dispatch(
    CreateUserRequest(
        username="test", 
        email="test@test.com"
    )
)
print(response)

Creating user: test with email: test@test.com
success=True


In [11]:
@registry.decorate_handler
class CreateUserWithArgs(AbstractHandler[CreateUserRequest, CreateUserResponse]):
    """Handler whose ``handle`` takes one extra parameter besides the request.

    The dispatch call in this cell supplies that extra value positionally.

    NOTE(review): this is registered for the same ``CreateUserRequest`` type as
    other handlers on the shared registry; presumably the most recent
    registration is the one dispatched to -- confirm against pydiatr's Registry.
    """

    async def handle(self, request: CreateUserRequest, arg1: str) -> CreateUserResponse:
        """Print the request fields together with ``arg1``, then report success.

        Args:
            request: The user-creation request being handled.
            arg1: Extra value supplied by the caller of ``registry.dispatch``.

        Returns:
            A ``CreateUserResponse`` with ``success=True``.
        """
        message = f"Creating user: {request.username} with email: {request.email} and arg1: {arg1}"
        print(message)
        outcome = CreateUserResponse(success=True)
        return outcome
    
response = await registry.dispatch(
    CreateUserRequest(
        username="test", 
        email="test@test.com"
    ),
    "ARG1"
)
print(response)

Creating user: test with email: test@test.com and arg1: ARG1
success=True


In [13]:
@registry.decorate_handler
class CreateUserWithKwargs(AbstractHandler[CreateUserRequest, CreateUserResponse]):
    """Handler whose ``handle`` takes one extra parameter besides the request.

    The dispatch call in this cell supplies that extra value by keyword
    (``arg1=...``); the handler signature itself is an ordinary parameter.

    NOTE(review): this is registered for the same ``CreateUserRequest`` type as
    other handlers on the shared registry; presumably the most recent
    registration is the one dispatched to -- confirm against pydiatr's Registry.
    """

    async def handle(self, request: CreateUserRequest, arg1: str) -> CreateUserResponse:
        """Print the request fields together with ``arg1``, then report success.

        Args:
            request: The user-creation request being handled.
            arg1: Extra value supplied by the caller of ``registry.dispatch``.

        Returns:
            A ``CreateUserResponse`` with ``success=True``.
        """
        message = f"Creating user: {request.username} with email: {request.email} and arg1: {arg1}"
        print(message)
        outcome = CreateUserResponse(success=True)
        return outcome
    
response = await registry.dispatch(
    CreateUserRequest(
        username="test", 
        email="test@test.com"
    ),
    arg1="KWARG1"
)
print(response)

Creating user: test with email: test@test.com and arg1: KWARG1
success=True
