Skip to content

Commit

Permalink
Merge pull request #16 from Avivsalem/staging
Browse files Browse the repository at this point in the history
Staging
  • Loading branch information
Avivsalem committed Dec 11, 2023
2 parents fedf3e7 + 98b46ea commit 8f73407
Show file tree
Hide file tree
Showing 7 changed files with 127 additions and 32 deletions.
19 changes: 9 additions & 10 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,17 +26,17 @@ $ pip install fastmessage
## Examples

```python
from fastmessage import FastMessage
from fastmessage import FastMessage, OtherMethodOutput
from messageflux.iodevices.rabbitmq import RabbitMQInputDeviceManager, RabbitMQOutputDeviceManager

fm = FastMessage()


@fm.map(output_device='next_year') # this sends its outputs to 'next_year' method
@fm.map()
def hello(name: str, birthYear: int):
age = 2023 - birthYear
print(f'Hello {name}. your age is {age}')
return dict(age=age)
return OtherMethodOutput(next_year, age=age) # this sends its output to 'next_year' method


@fm.map()
Expand All @@ -52,22 +52,20 @@ if __name__ == "__main__":
output_device_manager = RabbitMQOutputDeviceManager(hosts='my.rabbit.host',
user='username',
password='password')

service = fm.create_service(input_device_manager=input_device_manager,
output_device_manager=output_device_manager)
output_device_manager=output_device_manager)
service.start() # this runs the PipelineService and blocks
```

This example shows two methods: ```hello``` and ```next_year```, each listening on its own queue
This example shows two methods: ```hello``` and ```next_year```, each listening on its own queue
(with the same name)

the ```hello``` method is decorated with ```output_device='next_year'``` which means its output is directed to the
```next_year``` device (and the corrosponding method)

the ```__main__``` creates an input and output device managers (```RabbitMQ``` in this case), and starts the service
the ```__main__``` creates an input and output device managers (```RabbitMQ``` in this case), and starts the service
with these devices.

every message that is sent to the ```hello``` queue should have the following format:

```json
{
"name": "john",
Expand All @@ -76,6 +74,7 @@ every message that is sent to the ```hello``` queue should have the following fo
```

in that case the process will print (in 2023...):

```
Hello john. your age is 24
next year you will be 25
Expand Down
61 changes: 44 additions & 17 deletions docs/index.md
Original file line number Diff line number Diff line change
Expand Up @@ -26,22 +26,22 @@ fm = FastMessage(default_output_device='output')

@fm.map()
def do_something(x: int, y: str):
pass # do something with x and y
pass # do something with x and y


@fm.map()
async def do_something_async(x: int, y: str):
pass # do something with x and y asynchronously
pass # do something with x and y asynchronously


class SomeModel(BaseModel):
x: int
y: str
x: int
y: str


@fm.map()
def do_something_else(m: SomeModel, a: int):
return "some_value" # do somthing with m and a
return "some_value" # do somthing with m and a

```

Expand Down Expand Up @@ -104,8 +104,9 @@ There are special types which you can annotate the arguments for the callback wi
from. useful for registering the same callback for several input devices
* ```Message``` - arguments annotated with this type will receive the raw message which came from the device
* ```MessageBundle``` - arguments annotated with this type will receive the complete MessageBundle (with device headers)
* ```MethodValidator``` - argument of this type, will receive an object that can help validate return values for other methods
Notice that arguments annotated with these types MUST NOT have default values (Since they always have values).
* ```MethodValidator``` - argument of this type, will receive an object that can help validate return values for other
methods
Notice that arguments annotated with these types MUST NOT have default values (Since they always have values).

```python

Expand All @@ -118,20 +119,22 @@ fm = FastMessage()

@fm.map(input_device='some_queue')
def do_something(i: InputDeviceName, m: Message, mb: MessageBundle, x: int):
# i will be 'some_queue'
# m will be the message that arrived
# mb will be the MessageBundle that arrived
# x will be the serialized value of the message
pass # do something
# i will be 'some_queue'
# m will be the message that arrived
# mb will be the MessageBundle that arrived
# x will be the serialized value of the message
pass # do something


@fm.map()
def func1(mv: MethodValidator):
yield mv.validate_and_return(func2, x=3, y="hello") # this will succeed
yield mv.validate_and_return(func2, x=4) # this will raise MethodValidationError because y param is required but missing
yield mv.validate_and_return(func2, x=3, y="hello") # this will succeed
yield mv.validate_and_return(func2,
x=4) # this will raise MethodValidationError because y param is required but missing


@fm.map()
def func2(x:int, y:str):
def func2(x: int, y: str):
pass
```

Expand Down Expand Up @@ -178,7 +181,31 @@ fm = FastMessage()

@fm.map(input_device='some_queue', output_device='default_output_device')
def do_something(x: int):
return CustomOutput(value=1,
output_device='other_output_device') # this will send the value 1 to 'other_output_device' instead of the default
return CustomOutput(value=1,
output_device='other_output_device') # this will send the value 1 to 'other_output_device' instead of the default
```

### Returning Result to another method

You can make the function return a result to another mapped method, while validating its values BEFORE sending the
output to the destination queue

you do this by returning ```OtherMethodOutput``` class, that receives the callable as its first parameter, and the
arguments as kwargs:

```python
from fastmessage import FastMessage, OtherMethodOutput

fm = FastMessage()


@fm.map()
def func1():
yield OtherMethodOutput(func2, x=3, y="hello") # this will succeed
yield OtherMethodOutput(func2, x=4) # this will raise MethodValidationError because y param is required but missing


@fm.map()
def func2(x: int, y: str):
pass
```
1 change: 1 addition & 0 deletions fastmessage/__init__.py
Original file line number Diff line number Diff line change
@@ -1,5 +1,6 @@
from .common import (
CustomOutput,
OtherMethodOutput,
InputDeviceName,
MultipleReturnValues,
)
Expand Down
10 changes: 8 additions & 2 deletions fastmessage/callable_wrapper.py
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@
from pydantic.config import get_config
from pydantic.typing import get_all_type_hints

from fastmessage.common import CustomOutput, InputDeviceName, MultipleReturnValues
from fastmessage.common import CustomOutput, InputDeviceName, MultipleReturnValues, OtherMethodOutput
from fastmessage.common import _CALLABLE_TYPE, get_callable_name, _logger
from fastmessage.exceptions import NotAllowedParamKindException, SpecialDefaultValueException
from fastmessage.method_validator import MethodValidator
Expand Down Expand Up @@ -56,6 +56,7 @@ def __init__(self, *,
self._callable = wrapped_callable
self._input_device_name = input_device_name
self._output_device_name = output_device_name
self._method_validator = MethodValidator(self._fastmessage_handler)

self._callable_analysis = self._analyze_callable(self._callable)
self._model: Type[BaseModel] = self._create_model(model_name=self._get_model_name(),
Expand Down Expand Up @@ -179,7 +180,7 @@ def __call__(self,
elif param_info.annotation is Message:
kwargs[param_name] = message_bundle.message
elif param_info.annotation is MethodValidator:
kwargs[param_name] = MethodValidator(self._fastmessage_handler)
kwargs[param_name] = self._method_validator

model: BaseModel = self._model.parse_raw(message_bundle.message.bytes)
kwargs.update(dict(model))
Expand Down Expand Up @@ -211,6 +212,11 @@ def _get_pipeline_results(self,
elif isinstance(value, CustomOutput):
return self._get_pipeline_results(value=value.value,
default_output_device=value.output_device)
elif isinstance(value, OtherMethodOutput):
custom_output = self._method_validator.validate_and_return(value.method, **value.kwargs)

return self._get_pipeline_results(value=custom_output.value,
default_output_device=custom_output.output_device)
else:
pipeline_result = self._get_single_pipeline_result(value=value,
output_device=default_output_device)
Expand Down
12 changes: 11 additions & 1 deletion fastmessage/common.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import logging
from dataclasses import dataclass
from typing import Callable, Any, TypeVar
from typing import Callable, Any, TypeVar, Union

from fastmessage.exceptions import UnnamedCallableException

Expand Down Expand Up @@ -44,3 +44,13 @@ class CustomOutput:
"""
output_device: str
value: Any


class OtherMethodOutput:
"""
a result that contains the other method to send the result to
"""

def __init__(self, method: Union[str, Callable], **kwargs):
self.method = method
self.kwargs = kwargs
2 changes: 1 addition & 1 deletion fastmessage/fastmessage_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ def __init__(self,
default_output_device: Optional[str] = None,
validation_error_handler: Optional[Callable[
[InputDevice, MessageBundle, ValidationError],
Optional[Union[PipelineResult, List[PipelineResult]]]]] = None):
Optional[Union[PipelineResult, Iterable[PipelineResult]]]]] = None):
"""
:param default_output_device: an optional default output device to send callback results to,
Expand Down
54 changes: 53 additions & 1 deletion tests/method_validation_test.py
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
import pytest

from fastmessage import FastMessage, MissingCallbackException
from fastmessage import FastMessage, MissingCallbackException, OtherMethodOutput
from fastmessage.exceptions import MethodValidationError
from fastmessage.method_validator import MethodValidator
from messageflux.iodevices.base.common import MessageBundle, Message
Expand Down Expand Up @@ -31,6 +31,30 @@ def func_output(x: int, y: str):
assert result[0].message_bundle.message.bytes == b'"Success: x=3, y=hello"'


def test_by_othermethodoutput():
fm: FastMessage = FastMessage()

@fm.map()
def func_input():
return OtherMethodOutput(func_output, x=3, y="hello")

@fm.map(input_device='func2_device', output_device='output')
def func_output(x: int, y: str):
return f"Success: x={x}, y={y}"

result = fm.handle_message(FakeInputDevice('func_input'), MessageBundle(message=Message(data=b'{"y": 10}')))
assert result is not None
result = list(result)
assert len(result) == 1
assert result[0].output_device_name == "func2_device"

result = fm.handle_message(FakeInputDevice(result[0].output_device_name), result[0].message_bundle)
assert result is not None
result = list(result)
assert len(result) == 1
assert result[0].message_bundle.message.bytes == b'"Success: x=3, y=hello"'


def test_by_input_device_name():
fm: FastMessage = FastMessage()

Expand Down Expand Up @@ -70,15 +94,43 @@ def func_output(x: int, y: str):
_ = fm.handle_message(FakeInputDevice('func_input'), MessageBundle(message=Message(data=b'{"y": 10}')))


def test_validation_error_by_output():
fm: FastMessage = FastMessage()

@fm.map()
def func_input():
return OtherMethodOutput(func_output, x=3)

@fm.map(input_device='func2_device', output_device='output')
def func_output(x: int, y: str):
return f"Success: x={x}, y={y}"

with pytest.raises(MethodValidationError):
_ = fm.handle_message(FakeInputDevice('func_input'), MessageBundle(message=Message(data=b'{"y": 10}')))


def test_missing_callback():
fm: FastMessage = FastMessage()

@fm.map()
def func_input(method_validator: MethodValidator):
return method_validator.validate_and_return(func_output, x=3)

# notice there's no mapping decorator here...
def func_output(x: int, y: str):
return f"Success: x={x}, y={y}"

with pytest.raises(MissingCallbackException):
_ = fm.handle_message(FakeInputDevice('func_input'), MessageBundle(message=Message(data=b'{"y": 10}')))


def test_missing_callback_by_output():
fm: FastMessage = FastMessage()

@fm.map()
def func_input():
return OtherMethodOutput(func_output, x=3)

# notice there's no mapping decorator here...
def func_output(x: int, y: str):
return f"Success: x={x}, y={y}"

0 comments on commit 8f73407

Please sign in to comment.