deserialization of Record objects incomplete when using rpc calls #151

Closed
tarbaig opened this issue May 17, 2021 · 0 comments · Fixed by #152

tarbaig commented May 17, 2021

Version: current master

Steps to reproduce

# adding.py
# based on examples/agent.py
import faust
from faust import Topic

class Add(faust.Record, serializer='json'):
    a: int
    b: int


class Result(faust.Record, serializer='json'):
    r: int
    summands: Add

class Settings:
    FAUST_KAFKA_BROKER = 'kafka://localhost:9093'
    KAFKA_BROKERS = ['localhost:9093']


class EcoTopic(Topic):
    def __init__(self, *args, **kwargs) -> None:
        """
        The default replication for Faust topics is broken. This class changes the default
        replication from 0 to 1 and should be used as a replacement for Faust's own topics
        until this is fixed in the library.
        """
        if kwargs.get("replicas") == 0:
            kwargs["replicas"] = 1
        super().__init__(*args, **kwargs)


app = faust.App(
    'add_app',
    broker=Settings.FAUST_KAFKA_BROKER,
    Topic=EcoTopic,
)

topic = app.topic('adding', value_type=Add)

@app.agent(topic)
async def adding(stream):
    async for value in stream:
        # Reply with a Record that nests the original request.
        yield Result(r=value.a + value.b, summands=value)
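
# send_to_agent.py
import asyncio

import faust
from adding import Add, Result, adding  # the adding.py module above


class Settings:
    FAUST_KAFKA_BROKER = 'kafka://localhost:9093'
    KAFKA_BROKERS = ['localhost:9093']


async def sum_ask(a: int, b: int) -> Result:
    # Send an Add request to the agent and wait for its reply (rpc call).
    add_object = Add(a=a, b=b)
    result = await adding.ask(add_object)
    print(f"result={result}")
    return result


if __name__ == '__main__':
    # sum_ask is a coroutine, so it has to be driven by an event loop.
    loop = asyncio.get_event_loop()
    r = loop.run_until_complete(sum_ask(0, 0))
    # Fails on current master: r is a plain dict rather than a Result.
    assert isinstance(r, Result)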

Expected behavior

The return value of adding.ask() should be an instance of Result.

Actual behavior

The return value is a plain dict (the decoded JSON), not a Result.

Fix

When a reply promise is fulfilled, the return value should be wrapped in a call to maybe_model before it is handed to the caller.

Details

Faust keeps an internal registry of every Record subclass that has been created. During deserialization, special fields in the decoded data are used to detect that the value should actually be converted to a Record. The corresponding logic is in maybe_model, which is currently not applied to the replies of rpc calls.
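
To make the mechanism concrete, here is a minimal, self-contained sketch of a registry lookup applied at the moment a reply promise is fulfilled. It is not Faust's implementation: plain dataclasses stand in for faust.Record, and REGISTRY, MARKER, register, to_payload and fulfill are hypothetical names invented for illustration. Only maybe_model is named in this issue, and the version below is a simplified stand-in for it. The marker key "__faust" with an "ns" entry is an assumption about what the special fields look like.

```python
import asyncio
from dataclasses import dataclass
from typing import Any, Dict

# Hypothetical registry: namespace string -> model class.
REGISTRY: Dict[str, type] = {}
# Assumed name of the special field that tags serialized models.
MARKER = "__faust"


def register(cls: type) -> type:
    """Remember the class under its name so payloads can be mapped back to it."""
    REGISTRY[cls.__name__] = cls
    return cls


@register
@dataclass
class Add:
    a: int
    b: int


@register
@dataclass
class Result:
    r: int
    summands: Add


def to_payload(obj: Any) -> dict:
    """Turn a registered model into a plain dict tagged with its namespace."""
    data = {
        name: to_payload(value) if type(value) in REGISTRY.values() else value
        for name, value in vars(obj).items()
    }
    data[MARKER] = {"ns": type(obj).__name__}
    return data


def maybe_model(value: Any) -> Any:
    """Return a model if value carries a known marker, else value unchanged."""
    try:
        cls = REGISTRY[value[MARKER]["ns"]]
    except (KeyError, TypeError):
        # No marker, unknown namespace, or not a dict at all.
        return value
    fields = {k: maybe_model(v) for k, v in value.items() if k != MARKER}
    return cls(**fields)


def fulfill(promise: asyncio.Future, raw_value: Any) -> None:
    """Resolve the reply promise with the converted value, not the raw dict."""
    promise.set_result(maybe_model(raw_value))


async def demo() -> Any:
    promise = asyncio.get_running_loop().create_future()
    # Stand-in for the reply as it looks after JSON decoding.
    raw = to_payload(Result(r=3, summands=Add(a=1, b=2)))
    fulfill(promise, raw)
    return await promise


if __name__ == "__main__":
    print(asyncio.run(demo()))
```

Without the maybe_model call inside fulfill, the awaiting caller would receive the tagged dict unchanged, which matches the behavior described in this issue.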
