Support request/reply pattern #94

Closed
adrianhopebailie opened this issue Oct 11, 2018 · 65 comments
Labels
keep-open Prevents stale bot from closing this issue or PR

Comments

@adrianhopebailie

Somewhat related to:

It would be useful if it was possible to describe messages that are explicitly requests and responses and for the auto-generated code to deal with creating the appropriate ephemeral queues and performing matching on the correlationid.

The pattern that seems most common when using a pub-sub message broker is for the requestor to create a single use topic and provide this address as the 'reply-to' header in the request message. The requestor also provides a correlationId which is echoed back to help match requests and replies.

However, when using a transport like WebSockets it would be necessary to do additional work in the generated code to match requests and responses and also deal with message state and expiry.
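The matching, state and expiry work described above can be sketched as a small pending-request table. This is only an illustration under stated assumptions: the class name `PendingRequests`, the `ttl_seconds` parameter and the `correlationId` field name are hypothetical and not part of any AsyncAPI tooling.

```python
import time
import uuid


class PendingRequests:
    """Tracks in-flight requests on a single connection (hypothetical helper)."""

    def __init__(self, ttl_seconds=30.0, clock=time.monotonic):
        self._ttl = ttl_seconds
        self._clock = clock
        self._pending = {}  # correlation id -> (deadline, original request)

    def register(self, request):
        """Attach a fresh correlation id to the request and remember it."""
        correlation_id = str(uuid.uuid4())
        self._pending[correlation_id] = (self._clock() + self._ttl, request)
        return {**request, "correlationId": correlation_id}

    def match(self, reply):
        """Return the request a reply belongs to, or None if unknown or expired."""
        entry = self._pending.pop(reply.get("correlationId"), None)
        if entry is None:
            return None
        deadline, request = entry
        return request if self._clock() <= deadline else None

    def expire(self):
        """Drop requests whose deadline has passed; return how many were dropped."""
        now = self._clock()
        stale = [cid for cid, (deadline, _) in self._pending.items() if deadline < now]
        for cid in stale:
            del self._pending[cid]
        return len(stale)


pending = PendingRequests(ttl_seconds=5.0)
outgoing = pending.register({"action": "UPDATEARTICLE"})
reply = {"correlationId": outgoing["correlationId"], "type": "success"}
original = pending.match(reply)  # the request that this reply answers
```

Generated code for a WebSocket transport would need something of this shape regardless of how the spec ends up describing the pattern.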

Ideally this should be abstracted away from API designers who may prefer to define their API in a manner similar to Open API as follows (see the Responses Object - https://github.com/OAI/OpenAPI-Specification/blob/master/versions/3.0.2.md#responsesObject) :

topics:
  UPDATEARTICLE:
    request:
      headers:
        type: object
        properties:
          ...
      payload:
        type: object
        properties:
          ...
    reply:
      match-on: '{$response.headers.type}'
      responses:
        'success':
          headers:
            type: object
            properties:
              ...
          payload:
            type: object
            properties:
              ...
        'fail':
          headers:
            type: object
            properties:
              ...
          payload:
            type: object
            properties:
              ...

I've used the topics object but maybe a new operations object would be more appropriate?

One of the challenges here is the flexibility of having multiple possible responses but also defining the logic for identifying what response has actually been received. (Open API matches on HTTP response code so that's pretty simple).

In my example I just provide a matching rule but this could probably be a lot more flexible.
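To make the matching rule concrete, here is a rough sketch of how generated code might evaluate it. The helper names and the tiny expression syntax (only `$response.<path>`) are assumptions made for illustration; the proposal itself does not define an expression language.

```python
def resolve_path(message, expression):
    """Follow a dotted expression such as '{$response.headers.type}' into a dict."""
    parts = expression.strip("{}").split(".")
    if parts[0] != "$response":
        raise ValueError(f"unsupported expression: {expression}")
    value = message
    for key in parts[1:]:
        value = value[key]
    return value


def identify_response(reply_spec, message):
    """Return the name of the declared response that the message matches."""
    key = resolve_path(message, reply_spec["match-on"])
    if key not in reply_spec["responses"]:
        raise LookupError(f"undeclared response type: {key!r}")
    return key


reply_spec = {
    "match-on": "{$response.headers.type}",
    "responses": {"success": {}, "fail": {}},
}
incoming = {"headers": {"type": "fail"}, "payload": {"reason": "not found"}}
print(identify_response(reply_spec, incoming))  # fail
```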

@fmvilas
Member

fmvilas commented Oct 24, 2018

Thanks for the suggestion @adrianhopebailie. I think we'll need to somehow support the request/response pattern in the future. We need to figure out how to properly design it so it ages well in the spec. Sorry for the short answer; this feature can't be added quickly without adding a lot of complexity to the spec in its current state. I'll have a look in detail as soon as I have time.

@SensibleWood
Contributor

+1 to this from me. Common pattern on messaging platforms. Done many such implementations on WebSphere MQ in a former life. As mentioned above, #78 would be fundamental to this. Something in the style of a Callback Object would be a good approach.

@mpe85

mpe85 commented Feb 26, 2019

Also +1
We are interested in support for the new request/response pattern introduced in MQTT 5.

@fmvilas
Member

fmvilas commented Feb 26, 2019

Interesting, I haven't heard about MQTT 5. Going to take a look. Thanks for commenting!

@SensibleWood
Contributor

Doing the investigation on this for a post v2.0.0 solution

@SensibleWood SensibleWood self-assigned this Mar 7, 2019
@prdatur

prdatur commented Jun 20, 2019

Hi there, we are starting to use async communication more and more, and we have both variants in use. One is the basic request/response pattern, where we send a responseEvent parameter (like the mqtt corrId); on incoming data, the receiver does its work and sends the requested information back to the responseEvent.

We also use the publish/subscribe pattern where we need it.

So we are very interested in getting this into the definition.

For me the most flexible way would be just a combination of

subscribe:
publish:

I think a flexible and good way would be to rename subscribe, as mentioned, to "request".
Within the request there would be a response:, which is also defined as an operation object

https://www.asyncapi.com/docs/specifications/2.0.0-rc1/#operationObject

For example:

channels:
  get-last-messages:
    request:
      summary: Get the latest messages from the user
      message:
        payload:
          responseEvent:
            type: string
      response:
        message:
          payload:
            messages:
              type: array
              description: A list of messages
              items:
                type: string
                description: The message.
    

If we assume error/success statuses, we could just create a template from our own definitions, which we can $ref in a property called, for example, "status":

status:
 $ref: '#/components/schemas/responseStatus'
content:
  type: object
  properties:
    resultData:
      type: string

In my opinion, using the operation object would not get us into trouble. The definition should cover all feature variants of communication, so the operation object will be changed and extended in future versions of asyncapi, and that will also extend the response type.

What do you think?

@jdall

jdall commented Sep 12, 2019

The suggestion of @prdatur matches pretty well with our thought (except we would expect the response/reply object on the same structural level as the request object).
One thing that we think is also required, is to state whether the application should act as a request/reply producer or consumer - i.e. whether the application would send requests, or would receive requests and send replies (such detail could maybe be specified in the binding object for the channel)

@ig0rsky

ig0rsky commented Nov 6, 2019

hey, any update on this?

@fmvilas
Member

fmvilas commented Nov 8, 2019

It's on the list for the next minor versions. Any research on how this could be done is appreciated.

@basickarl

Interested party here also for request/response pattern!

@basickarl

@fmvilas Any draft available for a sneak peek?

@fmvilas
Member

fmvilas commented Dec 15, 2019

Hey @basickarl! There's no literature on this yet. Anything you can provide as a starting point would be greatly appreciated.

@jruizaranguren

Request-Response in the context of MQTT 5.

  • There are request and response topics.
  • Both sides are subscribed to those topics.
  • Requests carry information about the response topics.
  • You can add correlation data so response can be bound to request.
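The four points above can be illustrated with a toy in-memory broker. This is a sketch only: `ToyBroker` is a stand-in rather than an MQTT client, the topic names are made up, and the `response_topic` and `correlation_data` keys merely mirror the MQTT 5 "Response Topic" and "Correlation Data" properties.

```python
from collections import defaultdict


class ToyBroker:
    """In-memory stand-in for a broker; not an MQTT client."""

    def __init__(self):
        self._subscribers = defaultdict(list)

    def subscribe(self, topic, handler):
        self._subscribers[topic].append(handler)

    def publish(self, topic, payload, properties=None):
        for handler in list(self._subscribers[topic]):
            handler(payload, properties or {})


broker = ToyBroker()
replies = {}


def responder(payload, properties):
    # Echo the correlation data and publish to the topic named by the request.
    broker.publish(
        properties["response_topic"],
        {"sum": sum(payload["numbers"])},
        {"correlation_data": properties["correlation_data"]},
    )


def requester_inbox(payload, properties):
    # Bind the response to its request through the echoed correlation data.
    replies[properties["correlation_data"]] = payload


broker.subscribe("calc/request", responder)
broker.subscribe("calc/response/client-1", requester_inbox)

broker.publish(
    "calc/request",
    {"numbers": [4, 3]},
    {"response_topic": "calc/response/client-1", "correlation_data": b"req-1"},
)
print(replies[b"req-1"])  # {'sum': 7}
```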

@basickarl

basickarl commented Jan 8, 2020

@jruizaranguren

Slightly different scenarios. The MQTT one has a broker: Client <-> Broker <-> Client. I'm specifically interested in the Client <-> Client scenario, e.g. for WebSocket connections from web clients to servers, and also for public-facing WebSocket APIs.

@prdatur

prdatur commented Jan 10, 2020

@fmvilas, your first example looks like a way we can go:
https://github.com/asyncapi/asyncapi/blob/b21cf1d854994a8488c6575ae0dda461b51eb1d4/examples/2.0.0/rpc-server.yml

I have one question, or rather one change request.
What is meant by '{queue}'? I know that this is the response queue, which in amqp is defined by the replyTo header, but in the example we would only have one response queue because the string '{queue}' can only occur once.

We should just name the reply queue within the request definition, for example by adding a new field before "correlationId" named "replyTo" of type string. This string can hold an existing channel which defines the response data.

Another way would be to use a variable as the channel. In that case a generator or user would not be confused about whether the channel is just a response or actually a channel which the client can subscribe to.
For example, defining it the same way as in your example:

channels:
   '{$rpc_queue.publish.message.header#/replyTo}':
      subscribe:
         ...

So this channel would define the response channel, whose name is located in the rpc_queue message header field "replyTo". The definition will be a subscribe operation.

A third option is to define that if publish and subscribe both exist within the same channel, the channel is a request/response operation.

A self-subscribing channel to which the software itself publishes data would be a bit confusing :) So we can use this for the request/response definition.
Additionally, we could create the "replyTo" field to let generators know where to find the reply queue name.

What do you think?

@fmvilas
Member

fmvilas commented Jan 20, 2020

I think that opening the door for expressions in the channel name would open a door for potential complexity but we will explore it.

...because the string '{queue}' can only occur once.

{queue} is a channel item variable, meaning that it can be anything. It doesn't mean that the queue should be exactly {queue}.

Does it make sense?

@prdatur

prdatur commented Jan 20, 2020

{queue} is a channel item variable, meaning that it can be anything. It doesn't mean that the queue should be exactly {queue}.

I understood that this is a variable. While I was writing an example of what I didn't understand, I got your point.
For example, we can just write multiple '{queue}' entries, or better, give each one a name like {sendSumResponseQueue}, which will then have mostly the same definition as the one in your example.

However, with this we currently have two problems.

First, the response is not linked to any operation.

After I wrote the word "operation", I may have found a solution: why not link the operationIds? They must be unique by definition, so we could use something like responseOf: {operationId}

Second, when linking such a response to a publish operation, we are not able to know which message the response is for.

Let me explain.
In issue #303 I made an example. This is exactly what I mean here.
Many systems just connect to one queue/exchange and publish their own structure, mostly something like:

{
   "action": "do-something",
   "data": "...."
}

This means, a schema will look like:

publish:
  message:
    oneOf:
    - $ref: '#/components/messages/message1'
    - $ref: '#/components/messages/message2'
    - $ref: '#/components/messages/message3'

If we have only one response channel, we will not know whether the defined response is for message 1, 2 or 3. Also, if we define 3 response messages, we will not know whether response message 1 will be published for message 1, 2 or 3.

So, with a simple change, instead of linking the response channel to the publish operation, we should link a response to a message.
That way we never run into the situation where something is published and we do not know what we will get in return.
If we link it to a message, we really know that this published message will return message XY.

So the solution with a channel expression would be one way.
Other solutions:

  • Add a messageId in the same way as the operationId; then it would be possible to define within the message object something like responseTo: {messageId}
  • Just add a "response" keyword to the message object, which is itself another message object. Infinite loops must be addressed. This would not be the ideal solution because we would have no definition for the response channel.

I would prefer the expression, because it will not be a big change within the specification, but yes, it will be more complex, also for generators.
Next would be the messageId and responseTo or responseOf solution. Mostly there will be just one channel we need to describe, because a system will mostly respond in the same way as before.
So there will be just a bunch of $refs within message.oneOf...

Example of messageId solution:

components:
  messages:
    responseSum:
      payload:
        type: object
        properties:
          result:
            type: number
            examples:
              - 7
    responseDevide:
      payload:
        type: object
        properties:
          result:
            type: number
            format: float
            examples:
              - 7.5
    operationSum:
      bindings:
        amqp:
          replyTo:
            type: string
      responseTo: '#/components/messages/responseSum'
      payload:
        type: object
        properties:
          operation:
            type: string
            enum:
            - sum
          numbers:
            type: array
            items:
              type: number
            examples:
              - [4,3]
    operationDevide:
      bindings:
        amqp:
          replyTo:
            type: string
      responseTo: '#/components/messages/responseDevide'
      payload:
        type: object
        properties:
          operation:
            type: string
            enum:
            - devide
          numbers:
            type: array
            items:
              type: number
            examples:
              - [15,2]

channels:
  '{responseQueue}':
    parameters:
      responseQueue:
        schema:
          type: string
          pattern: '^amq\.gen\-.+$'
    bindings:
      amqp:
        is: queue
        queue:
          exclusive: true
    subscribe:
      bindings:
        amqp:
          ack: true
      message:
        oneOf:
        - $ref: '#/components/messages/responseSum'
        - $ref: '#/components/messages/responseDevide'

  rpc_queue:
    bindings:
      amqp:
        is: queue
        queue:
          durable: false
    publish:
      message:
        oneOf:
        - $ref: '#/components/messages/operationSum'
        - $ref: '#/components/messages/operationDevide' 
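
A generator or documentation tool could follow the responseTo links in the example above roughly as follows. This is a sketch, assuming the document has already been parsed into plain dictionaries; `resolve_ref` and `response_links` are hypothetical helpers, only local `#/...` references are handled, and JSON Pointer escaping is ignored.

```python
def resolve_ref(spec, ref):
    """Resolve a local reference such as '#/components/messages/x'."""
    if not ref.startswith("#/"):
        raise ValueError(f"only local references are handled here: {ref}")
    node = spec
    for part in ref[2:].split("/"):
        node = node[part]
    return node


def response_links(spec):
    """Map each request message name to the name of its declared response."""
    links = {}
    for name, message in spec["components"]["messages"].items():
        ref = message.get("responseTo")
        if ref is not None:
            resolve_ref(spec, ref)  # raises KeyError if the target is missing
            links[name] = ref.rsplit("/", 1)[-1]
    return links


spec = {
    "components": {
        "messages": {
            "responseSum": {"payload": {"type": "object"}},
            "responseDevide": {"payload": {"type": "object"}},
            "operationSum": {"responseTo": "#/components/messages/responseSum"},
            "operationDevide": {"responseTo": "#/components/messages/responseDevide"},
        }
    }
}
print(response_links(spec))
# {'operationSum': 'responseSum', 'operationDevide': 'responseDevide'}
```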

Example for using just response within the message

components:
  messages:
    responseSum:
      payload:
        type: object
        properties:
          result:
            type: number
            examples:
              - 7
    responseDevide:
      payload:
        type: object
        properties:
          result:
            type: number
            format: float
            examples:
              - 7.5

    operationSum:
      bindings:
        amqp:
          replyTo:
            type: string
      response:
         $ref: '#/components/messages/responseSum'
      payload:
        type: object
        properties:
          operation:
            type: string
            enum:
            - sum
          numbers:
            type: array
            items:
              type: number
            examples:
              - [4,3]
    operationDevide:
      bindings:
        amqp:
          replyTo:
            type: string
      response:
         $ref: '#/components/messages/responseDevide'
      payload:
        type: object
        properties:
          operation:
            type: string
            enum:
            - devide
          numbers:
            type: array
            items:
              type: number
            examples:
              - [15,2]

channels:
  rpc_queue:
    bindings:
      amqp:
        is: queue
        queue:
          durable: false
    publish:
      message:
        oneOf:
        - $ref: '#/components/messages/operationSum'
        - $ref: '#/components/messages/operationDevide' 

The problem: we have not defined the queue for the response.

The thing is that we really need request/response now, so it would be nice if we could get this finished. :)

Anyway, as a real quick answer: yes, your explanation makes sense, but we have no information about which message will be sent as a response if someone publishes to rpc_queue :)

@prdatur

prdatur commented Jan 21, 2020

After a discussion with a co-worker, we will just use an x-responses property within the publish.message section, which will include one reference, or a oneOf list of references, to the response message (not the channel). Example:

components:
  messages:
    responseMessage1:
      ...
    responseMessage2:
      ...
channels:
  '{rQueue}':
    ...
    subscribe:
      message:
        oneOf:
        - $ref: '#/components/messages/responseMessage1'
        - $ref: '#/components/messages/responseMessage2'
  pubQueue:
    ...
    publish:
      ...
      message:
        ...
        x-responses:
          $ref: '#/components/messages/responseMessage1'

Or if we have multiple responses like the example with ack, status, response

  pubQueue:
    ...
    publish:
      ...
      message:
        ...
        x-responses:
          oneOf:
          - $ref: '#/components/messages/responseMessage1'

This allows us to define all the required things (channel, messages) and just link the message to the response message, so that a documentation generator can, for example, place a link to the response message below each of the accepted messages.

Maybe we can use this idea as the solution and just remove the x- part.

@basickarl

basickarl commented Mar 4, 2020

Me and a co-worker had a chat and we would like to support several different actions on a "channel". For the channel mychannel/{myChannelId}, in our case, we would like publish/subscribe (with unsubscribe, and acks when subscribing and unsubscribing; we had a look at mqtt 5.0 and followed their example), request/response, and command (a command doesn't expect a response).

If you guys head over to https://crossbar.io/ (responsible for the WAMP protocol) and look at their fancy gif on the home page a little further down, you'll see a chronological scenario of what we want to accomplish via asyncapi.

An example of how asyncapi could be updated to support it:

channels:

  mychannel/{myChannelId}/:

    command: ...
    request: ...
    response: ...
    unsubscribe: ...
    subscribe: ...
    publish: ...

And we can take it a step further and include ACKs (inspired by mqtt 5.0):

channels:

  mychannel/{myChannelId}/:

    command: ...
    command ack: ...
    request: ...
    request ack: ...
    response: ...
    response ack: ...
    unsubscribe: ...
    unsubscribe ack: ...
    subscribe: ...
    subscribe ack: ...
    publish: ...
    publish ack: ...

Again, take a look at the gif at https://crossbar.io/.

The mqtt 5.0 inspiration comes from the following:

https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901121
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901171
https://docs.oasis-open.org/mqtt/mqtt/v5.0/os/mqtt-v5.0-os.html#_Toc3901187

(It feels like asyncapi would have to include these properties if it wants to help support mqtt 5.0?)

I also understand @prdatur's approach, but, as @fmvilas says, it would let people do their own thing and increase complexity. The suggestion I proposed would build on how asyncapi is already written, add structure, and also help conform to mqtt 5.0.

@jonaslagoni
Sponsor Member

jonaslagoni commented Mar 4, 2020

{queue} problem

@prdatur I can't figure out if this is a current problem you have or a suggested solution for request/reply 😄

If it is a current problem, is this not something that can be solved at the implementation level, i.e. is there a use case for this property to be present in the specification? If we take template clients, they would look at your AsyncAPI file and (depending on the implementation, of course) create a method for your channel `'{responseQueue}'` where you can subscribe a callback (or whatever) and get notified when a response ticks in. The generated client then tries parsing the received data into one of the `oneOf` messages and calls your callback with the given message. Then, with the use of `correlationId` (or something else), you can cross-check and figure out which response has returned?
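
A rough sketch of that implementation-level approach, with many assumptions: `ResponseChannel` is a hypothetical generated wrapper, `responseError` is an invented message name, and `matches` is a tiny stand-in for real schema validation. It picks the first candidate that fits, whereas strict `oneOf` semantics would require exactly one match.

```python
def matches(schema, payload):
    """Very small stand-in for schema validation: required keys and Python types."""
    return isinstance(payload, dict) and all(
        key in payload and isinstance(payload[key], expected)
        for key, expected in schema.items()
    )


class ResponseChannel:
    """Hypothetical generated wrapper around a '{responseQueue}' subscription."""

    def __init__(self, candidates):
        self._candidates = candidates  # message name -> {field: type}
        self._callbacks = []

    def subscribe(self, callback):
        self._callbacks.append(callback)

    def on_raw_message(self, payload):
        for name, schema in self._candidates.items():
            if matches(schema, payload):
                for callback in self._callbacks:
                    callback(name, payload)
                return name
        return None  # no oneOf candidate matched


channel = ResponseChannel({
    "responseSum": {"correlationId": str, "result": (int, float)},
    "responseError": {"correlationId": str, "error": str},
})
waiting = {"req-1": "sum of 4 and 3"}
seen = []


def handle(name, payload):
    # Cross-check against our own bookkeeping using the correlation id.
    request = waiting.pop(payload["correlationId"], None)
    seen.append((name, request, payload))


channel.subscribe(handle)
channel.on_raw_message({"correlationId": "req-1", "result": 7})
```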

If it is a suggestion for request/reply, then I gotta say I am more a fan of doing something like what @basickarl suggests, at the channel level, which would provide complete clarity about what is defined as request and reply in a given channel without having to go through the request operation first 😄

Request reply suggestion

channels:

  mychannel/{myChannelId}/:

    command: ...
    request: ...
    response: ...
    unsubscribe: ...
    subscribe: ...
    publish: ...

@basickarl isn't this a bit too specific for mqtt for it to be in a specification? Many other protocols just have request/reply and pubsub, which is what I see as the commonality between the protocols I have encountered.

However I also like the idea of adding request and reply properties to a channel i.e.

channels:

  mychannel/{myChannelId}/:

    request: ...
    response: ...
    subscribe: ...
    publish: ...

One question I'd like to raise: if request/reply is not a feature we foresee as standard for all protocols, should it really be included in the specification, rather than staying in the bindings as a boolean or a request/reply property for those protocols which do support it?

@basickarl

@jonaslagoni As stated every protocol implements their own terminology for these things so I guess AsyncAPI will either have to add ALL of these terms to make everyone happy or put their foot down and define terms that everyone should adopt. These terms should however be able to satisfy what you wish to do with each protocol.

MQTT was just an example. One of the things specified in MQTT 5.0 is that when you send a subscribe message, a subscribe acknowledgement is sent back to whoever sent the subscribe message. If we wished to implement this today and specify it in a document, AsyncAPI would fall short.

And answering your last question @jonaslagoni: I don't see the harm in adding these things. You don't need to add them to your own AsyncAPI documentation; if you don't use the request/response pattern in your API, just don't include it!

@fmvilas fmvilas added this to the AsyncAPI specification 2.1.0 milestone Mar 13, 2020
@GreenRover
Collaborator

+1

@basickarl

basickarl commented Mar 31, 2020

Would like to say we have forked 4 of the repos from https://github.com/asyncapi and have implemented the following:

command: ...
request: ...
response: ...
unsubscribe: ...
unsubscribe ack: ...
subscribe: ...
subscribe ack: ...
publish: ...
publish ack: ...

It's fulfilling our needs pretty well.

@basickarl

@basickarl Hi, could you update us, a year after you introduced different operations, on how it works for you and how it evolved? I'm super interested, as here and there people tend to say that the spec should have more operations, but nobody ever did it in practice

Unfortunately I left the company as I was there on a project basis! I do still support this though!

@fmvilas fmvilas added this to the 3.0.0 Release milestone Sep 14, 2021
@Hassen-BENNOUR

Hi folks,
Unfortunately, it seems to take a long time to find the right solution.

Like many people today, we have a lot of amqp request/reply communication between services.
We do it with Spring Integration and Spring Cloud Stream.
Without support for request/reply, I think there is no reason or possibility for us to use asyncapi now 😔

Let's keep in touch; I hope support for this comes soon.

Thank you for your amazing work.

@autodidaddict

Just dropping in to point out that this issue started 4 years ago. If AsyncAPI is never going to support request/reply, I'd rather know that now than invest in a format that will ultimately never suit my needs.

@derberg
Member

derberg commented Apr 26, 2022

@autodidaddict thanks for taking the time to share your feedback, not many do.

AsyncAPI Initiative is community-driven, with all the mechanisms in place that enable anyone to contribute and work on features openly in a transparent environment. Some changes in the spec require a driver (champion), and nobody has driven this one to the end yet. There was a great attempt by @smarek (#594), but it looks like he discontinued the work, and we simply need someone else to champion this topic.

The problem is that many people come and say "it is needed" and make some generic statements on "need for request/reply", but nobody really takes time to share in detail their use cases. Without clear and documented use cases, individual maintainers of the spec are unable to pick it up and work on the topic, thus we look forward to contributors.

Some time ago I recorded this simple guide for spec contributors -> https://www.youtube.com/watch?v=QQYyGlMzJCc
Maybe you will find it interesting and feel encouraged to contribute. I would be super happy to onboard anyone who would like to contribute to the spec. Now is a great opportunity, as the community has started working on the 3.0 release of the spec. Feel free to join the next call -> asyncapi/community#330

@GreenRover
Collaborator

@derberg There seems not to be much progress / discussion in #594.
Maybe we should have a dedicated call to make progress here? (But please not at 4PM UTC; at least -1 hour/+2 hours would match my time zone (CEST) much better.)

@jonaslagoni
Sponsor Member

jonaslagoni commented May 3, 2022

@derberg There seems not to be much progress / discussion in #594.
Maybe we should have a dedicated call to make progress here? (But please not at 4PM UTC; at least -1 hour/+2 hours would match my time zone (CEST) much better.)

Of course! What time and date would you suggest? That way we can set up the meeting as all the other meetings for everyone to join.

@jonaslagoni
Sponsor Member

Meeting scheduled: asyncapi/community#352

cc @smarek in case you still want to champion it 🙂

@GreenRover
Collaborator

GreenRover commented May 16, 2022

collected use cases:

A: Topic based messaging using a broker: Well defined response topic + "correlationId".

There you have a well-defined response topic, which can be found in the spec,
as well as a "correlationId" in the request message (header/body).
The correlationId is a random id generated by the requestor.
It has to be copied into the response messages by the responder,
so that the requestor can map each response to its request.

B: Topic based messaging using a broker: Per process individual inbox aka. "replyTopic" + "correlationId".

It works mostly the same as option A,
but the response topic is not well known.
The response topic is chosen by the requestor and put as "replyTopic" into the message header/body.
The response topic/queue can either be totally random or may have to follow rules specified by the schema.

C: Topic based messaging using a broker: Temporary reply topic for an individual response. Only "replyTopic"

For each request there will be a dedicated response topic. Otherwise it is the same as option B.
This method is mostly used for the request-to-multi-response pattern, where the requestor doesn't know the expected number of responses.
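
For the multi-response case, the requestor typically has to decide on its own when to stop waiting. A minimal sketch, assuming the temporary reply topic is represented by an in-process `queue.Queue` and that "no reply within `idle_timeout` seconds" means the responders are done; both are assumptions made for illustration, not part of the use case description.

```python
import queue


def collect_replies(inbox, idle_timeout=0.5, max_replies=None):
    """Drain replies from a per-request inbox until it stays quiet for idle_timeout seconds."""
    replies = []
    while max_replies is None or len(replies) < max_replies:
        try:
            replies.append(inbox.get(timeout=idle_timeout))
        except queue.Empty:
            break  # no further reply arrived in time; assume the responders are done
    return replies


# The inbox stands in for a temporary reply topic created for one request.
inbox = queue.Queue()
for worker in ("a", "b", "c"):
    inbox.put({"worker": worker, "status": "done"})

replies = collect_replies(inbox, idle_timeout=0.05)
print(len(replies))  # 3
```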

D: WebSocket without topics: The channel is a TCP connection over which messages flow. Only "correlationId"

It works the same as option A,
but there are no topics at all. In this case the channel is just the dedicated TCP connection of the WebSocket.

target to be reached by the spec

  • a human should understand the correlation between request and reply
  • a code generator should be able to generate method templates

results of the call on 16 May

Sample for use case A, D

channels:
  user.creation.channel:
    message:
      $ref: 'sample.yaml#/components/messages/createUser'

  user.creation.response.channel:
    message:
      $ref: 'sample.yaml#/components/messages/creationSucessfull'

operations:
  createUser:
    action: send
    channel: user.creation.channel
    description: Creates a user and expects a response in the same channel.
    reply:
      channel: user.creation.response.channel
        
components:
  messages:
    createUser:
      type: request
      name: CreateUser
      summary: Represents an explicit request to the service xyz
      contentType: application/json
      headers:
        type: object
        required:
         - correlationId
        properties:
          correlationId:
            type: string
            description: This header has to be copied to the response message.
      payload:
        $ref: 'sample.yaml#/components/schemas/createUser'
         
    creationSucessfull:
      type: response
      name: CreationWasSucessfull
      summary: an entity was created
      contentType: application/json
      headers:
        type: object
        required:
         - correlationId
        properties:
          correlationId:
            type: string
      payload:
        $ref: 'sample.yaml#/components/schemas/creationSucessfull'

  • The request will be sent to topic/channel "user.creation.channel"
  • The request message needs to have a "correlationId" header
  • The response will be sent to topic/channel "user.creation.response.channel"
  • The response message needs to have a "correlationId" header
  • The request and response messages have independent, well-defined schemas.

Sample for use case B, C

channels:
  user.creation.channel:
    message:
      $ref: 'sample.yaml#/components/messages/createUser'

operations:
  createUser:
    action: send
    channel: user.creation.channel
    description: Creates a user and expects a response in the same channel.
    reply:
      replyTopic:
       source: header
       field: replyTo
      message:
        oneOf:
          - $ref: 'sample.yaml#/components/messages/creationFailed'
          - $ref: 'sample.yaml#/components/messages/creationSucessfull'
        
components:
  messages:
    createUser:
      type: request
      name: CreateUser
      summary: Represents an explicit request to the service xyz
      contentType: application/json
      headers:
        type: object
        required:
         - correlationId
         - replyTo
        properties:
          correlationId:
            type: string
          replyTo: 
            type: string
            pattern: 'user\.creation\.response\.([^.]+)'
      payload:
        $ref: 'sample.yaml#/components/schemas/createUser'
         
    creationSucessfull:
      type: response
      name: CreationWasSucessfull
      summary: an entity was created
      contentType: application/json
      headers:
        type: object
        required:
         - correlationId
        properties:
          correlationId:
            type: string
      payload:
        $ref: 'sample.yaml#/components/schemas/creationSucessfull'

  • The request will be sent to topic/channel "user.creation.channel"
  • The request message needs to have a "correlationId" header
  • The request message needs to have a "replyTo" header
  • The response will be sent to the topic given in the "replyTo" header of the request message
  • The response message needs to have a "correlationId" header
  • The request and response messages have independent, well-defined schemas.

New defined schema elements

operations.*.action

type: string
enum:
 - send
 - receive

This replaces AsyncAPI v2 channels.*.publish and channels.*.subscribe.

components.messages.*.type

type: string
default: generic
enum:
 - generic
 - request
 - response

Specifies a message as a request or a response, to give the code generator a hint.

components.messages.*.reply

Complex type.

type: object
properties:
  channel:
    type: string
    description: referring to a defined element in channels section
  message:
    type: object
    description: Either message or channel has to be set to define the response message schema. If both are given, the message object overrides the message of the channel; all other options from the channel still apply.
  replyTopic:
    type: object
    description: |
      Either replyTopic or channel has to be set to define the response topic. If both are given, the replyTopic overrides the topic of the channel; all other options from the channel still apply.
      The replyTopic describes where in the request message the reply topic can be found.
    required:
     - source
     - field
    properties:
      source:
        type: string
        enum:
         - header
         - body
      field:
        type: string
        description: The field name where the response topic string can be found. In case of the body, the replyTopic needs to be in a root-level field.

Defines a request/reply pair.

components.messages.*.reply.channel

This field is optional.

It should be set if:

  • The reply channel options don't match the sending channel, for example if the response goes via a different broker.
  • You want to specify a static, well-defined reply topic

components.messages.*.reply.message

This field is optional.

It should be set if:

  • The components.messages.*.reply.channel attribute is not set
  • You want to overwrite the message schema from the given channel

components.messages.*.reply.replyTopic

This field is optional.
If the reply topic is defined in the request message header, you need to define where in the request message it can be found.

It should be set if:

  • The components.messages.*.reply.channel attribute is not set
  • You want to overwrite the topic from the given channel

components.messages.*.reply.replyTopic.source

The request message contains the reply topic either in the message header or in the body.
For example, MQTT 3 and WebSocket don't support message headers. In this case the replyTopic needs to be part of the message body.

components.messages.*.reply.replyTopic.field

The field name where the response topic string can be found. In case of the body, the replyTopic needs to be in a root-level field. Sub-level paths cannot be defined.
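
A minimal Python sketch of how generated code might apply the source/field pair described above. The function name `resolve_reply_topic` and the split of a request into `headers` and `body` dictionaries are assumptions made for illustration; they are not part of the proposal.

```python
from typing import Any, Mapping


def resolve_reply_topic(reply_topic: Mapping[str, str],
                        headers: Mapping[str, Any],
                        body: Mapping[str, Any]) -> str:
    """Return the reply topic named by a {source, field} descriptor."""
    source = reply_topic["source"]
    field = reply_topic["field"]
    if source == "header":
        container = headers
    elif source == "body":
        container = body
    else:
        raise ValueError(f"unsupported source: {source!r}")
    # Only root-level fields are allowed, so a plain key lookup is enough.
    if field not in container:
        raise KeyError(f"reply topic field {field!r} missing in {source}")
    topic = container[field]
    if not isinstance(topic, str):
        raise TypeError("reply topic must be a string")
    return topic


# MQTT 3 style request: no headers, so the topic travels in the body.
topic = resolve_reply_topic(
    {"source": "body", "field": "replyTopic"},
    headers={},
    body={"replyTopic": "user.creation.response.abc", "username": "bob"},
)
```

Because nested paths are excluded, the lookup stays a single dictionary access for both header and body.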

@fmvilas
Member

fmvilas commented May 17, 2022

Thanks a lot for this breakdown @GreenRover 👏 This is definitely what we need in order to move forward. IIRC, we should also look into JSON RPC as a use case so it can be covered as well.

@GreenRover
Collaborator

Is there someone out in the wild who can provide input for JSON RPC? I have never used it before, so if there is no one I will have to start reading.

@fmvilas
Member

fmvilas commented May 17, 2022

Not myself, but there was a person in the call who said they were using it. Maybe they'll join the next call 🤔

@natcl

natcl commented May 17, 2022

Yes that was me, I'll describe our use case ASAP (probably tomorrow)

@natcl

natcl commented May 17, 2022

@GreenRover in the meantime, if you're curious, the spec is here and it's fairly simple:
https://www.jsonrpc.org/specification

@natcl

natcl commented May 19, 2022

So here is how we currently use JSON RPC 2.0:

We have servers that can handle multiple methods. For sake of example let's say our server supports the following 2 methods:

  • addUser
  • deleteUser

Our server listens over MQTT for JSON RPC requests at the following topic:
serverName/request

For MQTT 5, the server will reply to the topic specified by the client in the responseTopic property. This can be anything as the client is responsible for choosing that topic. Therefore it doesn't really make sense to document that topic in the AsyncAPI definition of the server.

For MQTT 3 the server will reply to the serverName/response topic. All clients listen to that topic and will filter out the correct reply by checking the JSON RPC id.
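
As a rough illustration of the MQTT 3 case, here is a Python sketch of a client that shares serverName/response with other clients and keeps only the replies whose JSON RPC id it issued. The `PendingRequests` class is hypothetical, and the sketch assumes ids are unique across clients (for example by prefixing them with a client name), which is not stated above.

```python
import json
from typing import Any, Dict, Optional


class PendingRequests:
    """Tracks outstanding JSON-RPC ids for one client (hypothetical helper)."""

    def __init__(self) -> None:
        self._pending: Dict[Any, str] = {}

    def register(self, request_id: Any, method: str) -> None:
        # Remember which method this id belongs to until the reply arrives.
        self._pending[request_id] = method

    def match(self, raw_message: str) -> Optional[dict]:
        """Return the decoded reply if it answers one of our requests."""
        message = json.loads(raw_message)
        request_id = message.get("id")
        if request_id not in self._pending:
            return None  # reply for another client on the shared topic
        method = self._pending.pop(request_id)
        return {"method": method, "reply": message}


pending = PendingRequests()
pending.register("client-a-1", "addUser")
own = pending.match(
    '{"jsonrpc": "2.0", "id": "client-a-1", "result": {"status": "ok"}}')
other = pending.match(
    '{"jsonrpc": "2.0", "id": "client-b-7", "result": {"status": "ok"}}')
```

Keeping the method name next to the id is what lets the client pick the right reply schema later, since the reply itself does not repeat the method.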

When used with MQTT, the correlationData field of MQTT 5 is not really important, since JSON RPC already has a correlation ID in the body of the message. So it's either not used or could be set to the JSON RPC id.

So in a nutshell what we need is a way to describe multiple methods that are all received on the same topic and also describe the schema of the reply (that needs to be linked with the method)

JSON RPC also has the concept of notifications. A notification is a message sent without an ID by the client. When a notification is received, the server must not reply, so in that case a response schema is not necessary.
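
To make the request/notification distinction concrete, here is a small Python sketch of a server-side dispatcher. `handle_message` and the `handlers` table are illustrative names, not part of any real server; the error code -32601 is the "Method not found" code from the JSON RPC 2.0 specification.

```python
from typing import Any, Callable, Dict, Optional

Handler = Callable[[dict], Any]


def handle_message(message: dict,
                   handlers: Dict[str, Handler]) -> Optional[dict]:
    """Dispatch one JSON-RPC 2.0 message; return None for notifications."""
    is_notification = "id" not in message
    handler = handlers.get(message.get("method"))
    if handler is None:
        if is_notification:
            return None  # never reply to a notification, even on error
        return {"jsonrpc": "2.0", "id": message["id"],
                "error": {"code": -32601, "message": "Method not found"}}
    result = handler(message.get("params", {}))
    if is_notification:
        return None
    # The id is echoed back so the client can correlate the reply.
    return {"jsonrpc": "2.0", "id": message["id"], "result": result}


handlers = {
    "addUser": lambda params: {"status": "ok"},
    "deleteUser": lambda params: {"status": "ok"},
}
reply = handle_message(
    {"jsonrpc": "2.0", "id": 1, "method": "addUser",
     "params": {"username": "bob", "age": 33}},
    handlers,
)
silent = handle_message(
    {"jsonrpc": "2.0", "method": "deleteUser", "params": {"username": "bob"}},
    handlers,
)
```

Both methods arrive on the same topic, so the method name is the only thing that selects the handler and, by extension, the reply schema.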

Of some interest is also the fact the server can support other transports than MQTT for the JSON RPC aspect. We usually support UDP/TCP/WebSockets as well. Could be nice if there was a way to represent this in AsyncAPI (although it might be out of scope).

Here's an example of how addUser and deleteUser would work:

request -->

{
  "jsonrpc": "2.0",
  "id": 1,
  "method": "addUser",
  "params": {
    "username": "bob",
    "age": 33
  }
}

response <--

{
  "jsonrpc": "2.0",
  "id": 1,
  "result": {
    "status": "ok"
  }
}

The server could also respond with an error:

{
  "jsonrpc": "2.0",
  "id": 1,
  "error" : {
    "code": -31000,
    "message": "Can't create user, sorry :("
   }
}

What we're mostly interested in documenting via a schema is the params, result and error properties, although a schema for the full message would be ok too.

Let me know if this is clear or if you need me to elaborate further !

Thanks :)

@fmvilas
Member

fmvilas commented May 23, 2022

Hey folks, leaving the examples I shared on my screen during the last call:

# channels.yaml (probably defined and maintained at company level)

asyncapi: 3.0.0
channels:
  userCreationChannel:
    address: user.creation.channel
    message:
      - $ref: '#/components/messages/createUser'
  userCreationChannelWithType2:
    address: user.creation.channel
    message:
      - $ref: '#/components/messages/createUser2'
  userCreationReply:
    address: null
    message:
      - $ref: '#/components/messages/creationFailed'
      - $ref: '#/components/messages/creationSuccessful'
  userCreationSuccessfulReply:
    address: null
    message:
      - $ref: '#/components/messages/creationSuccessful'
components:
  messages:
    createUser:
      ...
    createUser2:
      ...
    creationFailed:
      ...
    creationSuccessful:
      ...
# myapplication.yaml (owned by my team)

asyncapi: 3.0.0
operations:
  createUserWithSucessfulReply:
    action: send
    channel: 'channels.yaml#/channels/userCreationChannelWithType2'
    description: Creates a user and expects a response in the same channel.
    reply:
      channel: 'channels.yaml#/channels/userCreationSuccessfulReply'

@GreenRover
Collaborator

Results of the call from May 23

Sample for use case A, D

channels:
  user.creation.channel:
    message:
      $ref: 'sample.yaml#/components/messages/createUser'

  user.creation.response.channel:
    message:
      $ref: 'sample.yaml#/components/messages/creationSucessfull'

operations:
  createUser:
    action: send
    channel: user.creation.channel
    description: Creates a user and expects a response in the same channel.
    reply:
      channel: user.creation.response.channel
        
components:
  messages:
    createUser:
      type: request
      name: CreateUser
      summary: Represents an explicit request to the service xyz
      contentType: application/json
      correlationId:
        description: Default Correlation ID
        location: $message.header#/correlationId
      headers:
        type: object
        required:
         - correlationId
        properties:
          correlationId:
            type: string
            description: This header has to be copied to the response message.
      payload:
        $ref: 'sample.yaml#/components/schemas/createUser'
         
    creationSucessfull:
      type: response
      name: CreationWasSucessfull
      summary: an entity was created
      contentType: application/json
      correlationId:
        description: Default Correlation ID
        location: $message.header#/correlationId
      headers:
        type: object
        required:
         - correlationId
        properties:
          correlationId:
            type: string
      payload:
        $ref: 'sample.yaml#/components/schemas/creationSucessfull'
  • The request will be sent to topic/channel "user.creation.channel"
  • The request message needs to have a "correlationId" header
  • The response will be sent to topic/channel "user.creation.response.channel"
  • The response message needs to have a "correlationId" header
  • The request and response messages have independent, well-defined schemas.

Sample for use case B, C

channels:
  user.creation.channel:
    message:
      $ref: 'sample.yaml#/components/messages/createUser'

operations:
  createUser:
    action: send
    channel: user.creation.channel
    description: Creates a user and expects a response in the same channel.
    reply:
      message:
        oneOf:
          - $ref: 'sample.yaml#/components/messages/creationFailed'
          - $ref: 'sample.yaml#/components/messages/creationSucessfull'
        
components:
  messages:
    createUser:
      type: request
      name: CreateUser
      summary: Represents an explicit request to the service xyz
      contentType: application/json
      correlationId:
        description: Default Correlation ID
        location: $message.header#/correlationId
      replyTopic:
        description: Default topic to answer to
        location: $message.header#/replyTo
      headers:
        type: object
        required:
         - correlationId
         - replyTo
        properties:
          correlationId:
            type: string
          replyTo: 
            type: string
            pattern: user.creation.response.([^.]+) 
      payload:
        $ref: 'sample.yaml#/components/schemas/createUser'
         
    creationSucessfull:
      type: response
      name: CreationWasSucessfull
      summary: an entity was created
      contentType: application/json
      correlationId:
        description: Default Correlation ID
        location: $message.header#/correlationId
      headers:
        type: object
        required:
         - correlationId
        properties:
          correlationId:
            type: string
      payload:
        $ref: 'sample.yaml#/components/schemas/creationSucessfull'
  • The request will be sent to topic/channel "user.creation.channel"
  • The request message needs to have a "correlationId" header
  • The request message needs to have a "replyTopic" header
  • The response will be sent to the topic from the "replyTopic" header of the request message
  • The response message needs to have a "correlationId" header
  • The request and response messages have independent, well-defined schemas.
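
The responder side of this sample could look roughly like the Python sketch below. `build_reply` and the message layout (a dict with `headers` and `payload`) are assumptions for illustration. The header names and the pattern are taken from the sample; the sketch uses a full match, which is stricter than the unanchored matching JSON Schema applies to `pattern`.

```python
import re
from typing import Any, Dict, Tuple

# Pattern copied from the replyTo header schema in the sample.
REPLY_TO_PATTERN = re.compile(r"user.creation.response.([^.]+)")


def build_reply(request_headers: Dict[str, str],
                payload: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
    """Return (topic, message) for the response to one request."""
    correlation_id = request_headers["correlationId"]
    reply_to = request_headers["replyTo"]
    if REPLY_TO_PATTERN.fullmatch(reply_to) is None:
        raise ValueError(
            f"replyTo {reply_to!r} does not match the declared pattern")
    message = {
        "headers": {"correlationId": correlation_id},  # echoed unchanged
        "payload": payload,
    }
    return reply_to, message


topic, message = build_reply(
    {"correlationId": "42", "replyTo": "user.creation.response.client1"},
    {"created": True},
)
```

The reply topic never appears in the document itself; it is only known once a request arrives, which is why the channel address has to stay open.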

New defined schema elements

components.messages.*.type

type: string
default: generic
enum:
 - generic
 - request
 - response

Specifies a message as a request or response, to give the code generator a hint.

components.messages.*.replyTopic

This field is optional.
If the reply topic is defined in the request message header, you need to define where in the request message it can be found.

For specifying and computing the location of a Correlation ID, a runtime expression is used.
https://www.asyncapi.com/docs/specifications/v2.2.0#runtimeExpression
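
A minimal Python sketch of evaluating such a runtime expression against a message, so that the same mechanism can serve correlationId and replyTopic. The `evaluate` function and the message layout (`headers` and `payload` keys) are assumptions; only the `$message.header#/...` and `$message.payload#/...` forms are handled, with the fragment read as a JSON Pointer.

```python
from typing import Any, Mapping


def evaluate(expression: str, message: Mapping[str, Any]) -> Any:
    """Evaluate expressions like '$message.header#/correlationId'."""
    prefix = "$message."
    if not expression.startswith(prefix):
        raise ValueError(f"unsupported expression: {expression!r}")
    source, _, pointer = expression[len(prefix):].partition("#")
    if source not in ("header", "payload"):
        raise ValueError(f"unsupported source: {source!r}")
    # Assumed message layout: {'headers': {...}, 'payload': {...}}.
    value: Any = message["headers" if source == "header" else "payload"]
    if not pointer:
        return value
    # Walk the JSON Pointer (RFC 6901) one reference token at a time.
    for token in pointer.split("/")[1:]:
        token = token.replace("~1", "/").replace("~0", "~")
        value = value[int(token)] if isinstance(value, list) else value[token]
    return value


request = {
    "headers": {"correlationId": "abc",
                "replyTo": "user.creation.response.x"},
    "payload": {"user": {"name": "bob"}},
}
correlation_id = evaluate("$message.header#/correlationId", request)
reply_topic = evaluate("$message.header#/replyTo", request)
```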

operations.*.action

type: string
enum:
 - send
 - receive

This replaces AsyncAPI v2 channels.*.publish and channels.*.subscribe.

operations.*.reply

Complex type.

type: object
properties:
  channel:
    type: string
    description: Refers to a defined element in the channels section
  message:
    type: object
    description: Either message or channel has to be set to define the response message schema. If both are given, the message object overrides the message of the channel; all other options from the channel still apply.

Defines a request/reply pair.

operations.*.reply.channel

This field is optional.

It should be set if:

  • The reply channel options don't match the sending channel, for example if the response goes via a different broker.
  • You want to specify a static, well-defined reply topic

operations.*.reply.message

This field is optional.

It should be set if:

  • The operations.*.reply.channel attribute is not set
  • You want to overwrite the message schema from the given channel
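
The precedence rule between reply.channel and reply.message can be sketched in Python as follows. `resolve_reply` is a hypothetical helper that works on already parsed documents, with channels keyed by name as in the samples; the `description` field on the channel only stands in for "all other options".

```python
from typing import Any, Dict


def resolve_reply(reply: Dict[str, Any],
                  channels: Dict[str, Dict[str, Any]]) -> Dict[str, Any]:
    """Merge reply.channel and reply.message into one effective definition."""
    if "channel" not in reply and "message" not in reply:
        raise ValueError("reply needs at least one of 'channel' or 'message'")
    # Start from a copy of the referenced channel so its other options survive.
    effective = dict(channels[reply["channel"]]) if "channel" in reply else {}
    if "message" in reply:
        effective["message"] = reply["message"]  # explicit message wins
    return effective


channels = {
    "user.creation.response.channel": {
        "description": "Replies to user creation",
        "message": {
            "$ref": "sample.yaml#/components/messages/creationSucessfull"},
    },
}
from_channel = resolve_reply(
    {"channel": "user.creation.response.channel"}, channels)
overridden = resolve_reply(
    {"channel": "user.creation.response.channel",
     "message": {"$ref": "sample.yaml#/components/messages/creationFailed"}},
    channels,
)
```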

@GreenRover
Collaborator

Results of the call from May 23, off topic: message / channel

From AsyncAPI v2 to v3, the relation between channel, message, and operation will change.

In v3: channel

A channel references the messages it carries and can transport multiple messages.

asyncapi: 3.0.0
channels:
  userCreationReply:
    address: null
    message:
      - $ref: 'channels.yaml#/components/messages/creationFailed'
      - $ref: 'channels.yaml#/components/messages/creationSuccessful'

From v2 to v3 there is a bigger change:

the channel name userCreationChannel and the topic user.creation.channel
are no longer the same field.

asyncapi: 3.0.0
channels:
  userCreationChannel:
    address: user.creation.channel
    message:
      - $ref: 'channels.yaml#/components/messages/createUser'
    

For request/reply where the reply address is part of the request message, the address needs to be optional.

Ongoing discussion:

How to express that the address is not set for request/reply:

  • Just don't have this field
  • It needs to be defined as NULL

Opinion from the meeting: having it defined as NULL makes it clearer and harder to overlook.

Ongoing discussion:

A channel can transport multiple message schemas.
Therefore it should be an array of messages.

--> See: "Assuming the operations[*].message/operations[*].reply.message are limiting"

In v3: operation

An operation has a reference to a channel and/or message.

asyncapi: 3.0.0
operations:
  createUserWithSucessfulReply:
    action: send
    channel: 'channels.yaml#/channels/userCreation'
    message: '/components/messages/createUser'
    description: Creates a user and expects a response in the same channel.
    reply:
        channel: 'channels.yaml#/channels/replies'
        message: '/components/messages/createSuccesfull'

Opinions:

  • The message could overwrite the channels[*].message
  • channels[*].message is an array, and operations[*].message/operations[*].reply.message acts as a limit: it allows only a subset of the messages defined at channel level for this operation.
    • Hint: A use case here could be a company-wide definition of channels and messages, with the application defined in a separate YAML file that only glues them together via operations.
    • My question: Are components/messages really defined at company level? For channel and topic structure I agree.

Assuming the operations[*].message/operations[*].reply.message are limiting

Now we have the need to identify messages to be able to limit those.

Option A:

asyncapi: 3.0.0
channels:
  userCreationReply:
    address: null
    message:
      - $ref: 'channels.yaml#/components/messages/creationFailed'
      - $ref: 'channels.yaml#/components/messages/creationSuccessful'
operations:
  createUserWithSucessfulReply:
    action: send
    channel: 'channels.yaml#/channels/userCreation'
    reply:
        channel: 'channels.yaml#/channels/userCreationReply'
        message: 'channels.yaml#/components/messages/creationFailed'

Using the default $ref syntax. Like: channels.yaml#/components/messages/creationFailed

Benefit:
  • All standard editors that understand JSON Schema are able to handle this.
  • It is totally clear what is being referenced.
Down sides:
  • Validating the schema is a little harder, but with JS object comparison it should still work.
  • Anonymous schemas for messages are no longer supported.
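
One possible shape for the Option A validation, sketched in Python. `check_reply_message` is a hypothetical helper that compares `$ref` strings textually; two different relative paths pointing at the same message would not match, so a real validator would probably dereference both sides first.

```python
from typing import Any, Dict, List


def check_reply_message(channel: Dict[str, Any],
                        reply_message_ref: str) -> None:
    """Raise if the operation narrows to a message the channel does not list."""
    allowed: List[str] = [entry["$ref"] for entry in channel["message"]]
    if reply_message_ref not in allowed:
        raise ValueError(f"{reply_message_ref!r} is not one of {allowed}")


user_creation_reply = {
    "address": None,
    "message": [
        {"$ref": "channels.yaml#/components/messages/creationFailed"},
        {"$ref": "channels.yaml#/components/messages/creationSuccessful"},
    ],
}
# Passes silently: the reply message is one of the channel's messages.
check_reply_message(
    user_creation_reply,
    "channels.yaml#/components/messages/creationFailed",
)
```

The list comprehension also shows why anonymous message schemas drop out under this option: an inline schema has no `$ref` to compare against.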

Option B:

Using only message and channel ids.

# channels.yaml
asyncapi: 3.0.0
channels:
  userCreationReply:
    address: null
    message:
      - $ref: 'creationFailed'
      - $ref: 'creationSuccessful'
components:
  messages:
    createUser:
      ...
    createUser2:
      ...
    creationFailed:
      ...
    creationSuccessful:
      ...
      
# myapplication.yaml (owned by my team)
operations:
  createUserWithSucessfulReply:
    action: send
    channel: 'userCreation'
    reply:
        channel: 'userCreationReply'
        message: 'creationFailed'
Benefit:
  • Very easy to parse
Down sides:
  • Standard editors not made for AsyncAPI will not be able to handle it.
  • All IDs have to be unique over all files.
  • It is not clear where those ids are defined. This will make it unclear/confusing when using multiple files.

@GreenRover
Collaborator

GreenRover commented Jun 9, 2022

Example:

Converted the kraken 2.0.0 spec sample made by @fmvilas to 3.0.0

to kraken async api 3.0.0 example

asyncapi: 3.0.0

info:
  title: Kraken Websockets API
  version: '1.8.0'
  description: |
    WebSockets API offers real-time market data updates. WebSockets is a bidirectional protocol offering fastest real-time data, helping you build real-time applications. The public message types presented below do not require authentication. Private-data messages can be subscribed on a separate authenticated endpoint. 

    ### General Considerations

    - TLS with SNI (Server Name Indication) is required in order to establish a Kraken WebSockets API connection. See Cloudflare's [What is SNI?](https://www.cloudflare.com/learning/ssl/what-is-sni/) guide for more details.
    - All messages sent and received via WebSockets are encoded in JSON format
    - All decimal fields (including timestamps) are quoted to preserve precision.
    - Timestamps should not be considered unique and not be considered as aliases for transaction IDs. Also, the granularity of timestamps is not representative of transaction rates.
    - At least one private message should be subscribed to keep the authenticated client connection open.
    - Please use REST API endpoint [AssetPairs](https://www.kraken.com/features/api#get-tradable-pairs) to fetch the list of pairs which can be subscribed via WebSockets API. For example, field 'wsname' gives the supported pairs name which can be used to subscribe.
    - Cloudflare imposes a connection/re-connection rate limit (per IP address) of approximately 150 attempts per rolling 10 minutes. If this is exceeded, the IP is banned for 10 minutes.
    - Recommended reconnection behaviour is to (1) attempt reconnection instantly up to a handful of times if the websocket is dropped randomly during normal operation but (2) after maintenance or extended downtime, attempt to reconnect no more quickly than once every 5 seconds. There is no advantage to reconnecting more rapidly after maintenance during cancel_only mode.

servers:
 # I don't know what this section will look like in 3.0.0

channels:
  ping:
    address: /
    message:
      - $ref: 'channels.yaml#/components/messages/ping'
  pong:
    address: /
    message:
      - $ref: 'channels.yaml#/components/messages/pong'

  heartbeat:
    address: /
    message:
      - $ref: 'channels.yaml#/components/messages/heartbeat'
      
  systemStatus:
    address: /
    message:
      - $ref: 'channels.yaml#/components/messages/systemStatus'
      
  subscriptionStatus:
    address: /
    message:
      - $ref: 'channels.yaml#/components/messages/subscriptionStatus'
  subscribe:
    address: /
    message:
      - $ref: 'channels.yaml#/components/messages/subscribe'
  unsubscribe:
    address: /
    message:
      - $ref: 'channels.yaml#/components/messages/unsubscribe'

      
operations:
  pingPong:
    action: send
    channel: 'channels.yaml#/channels/ping'
    reply:
        channel: 'channels.yaml#/channels/pong'
  heartbeat:
    action: receive
    channel: 'channels.yaml#/channels/heartbeat' 
  systemStatus:
    action: receive
    channel: 'channels.yaml#/channels/systemStatus' 
  subscribe:
    action: send
    channel: 'channels.yaml#/channels/subscribe'
    reply:
        channel: 'channels.yaml#/channels/subscriptionStatus'
  unsubscribe:
    action: send
    channel: 'channels.yaml#/channels/unsubscribe'
    reply:
        channel: 'channels.yaml#/channels/subscriptionStatus'   
       
       
components:
  messages:
    ping:
      summary: Ping server to determine whether connection is alive
      description: Client can ping server to determine whether connection is alive, server responds with pong. This is an application level ping as opposed to default ping in websockets standard which is server initiated
      payload:
        $ref: '#/components/schemas/ping'
      headers:
        type: object
        properties:
          correlationId:
            type: string
      correlationId:
        location: $message.header#/correlationId
        
    pong:
      summary: Pong is a response to ping message
      description: Server pong response to a ping to determine whether connection is alive. This is an application level pong as opposed to default pong in websockets standard which is sent by client in response to a ping
      payload:
        $ref: '#/components/schemas/pong' 
      headers:
        type: object
        properties:
          correlationId:
            type: string
      correlationId:
        location: $message.header#/correlationId

    subscribe:
      description: Subscribe to a topic on a single or multiple currency pairs.
      payload:
        $ref: '#/components/schemas/subscribe'
      headers:
        type: object
        properties:
          correlationId:
            type: string
      correlationId:
        location: $message.header#/correlationId
    unsubscribe:
      description: Unsubscribe, can specify a channelID or multiple currency pairs.
      payload:
        $ref: '#/components/schemas/unsubscribe'
      headers:
        type: object
        properties:
          correlationId:
            type: string
      correlationId:
        location: $message.header#/correlationId
    subscriptionStatus:
      description: Subscription status response to subscribe, unsubscribe or exchange initiated unsubscribe.
      payload:
        $ref: '#/components/schemas/subscriptionStatus'
      examples:
        - payload:
            channelID: 10001
            channelName: ohlc-5
            event: subscriptionStatus
            pair: XBT/EUR
            reqid: 42
            status: unsubscribed
            subscription:
              interval: 5
              name: ohlc
        - payload:
            errorMessage: Subscription depth not supported
            event: subscriptionStatus
            pair: XBT/USD
            status: error
            subscription:
              depth: 42
              name: book

    systemStatus:
      description: Status sent on connection or system status changes.
      payload:
        $ref: '#/components/schemas/systemStatus' 
        
    heartbeat:
      description: Server heartbeat sent if no subscription traffic within 1 second (approximately)
      payload:
        $ref: '#/components/schemas/heartbeat' 
        
        
  schemas:
    ping:
      type: object
      properties:
        event:
          type: string
          const: ping
        reqid:
          $ref: '#/components/schemas/reqid'
      required:
        - event
    heartbeat:
      type: object
      properties:
        event:
          type: string
          const: heartbeat
    pong:
      type: object
      properties:
        event:
          type: string
          const: pong
        reqid:
          $ref: '#/components/schemas/reqid'
    systemStatus:
      type: object
      properties:
        event:
          type: string
          const: systemStatus
        connectionID:
          type: integer
          description: The ID of the connection
        status:
          $ref: '#/components/schemas/status'
        version:
          type: string
    status:
      type: string
      enum:
        - online
        - maintenance
        - cancel_only
        - limit_only
        - post_only
    subscribe:
      type: object
      properties:
        event:
          type: string
          const: subscribe
        reqid:
          $ref: '#/components/schemas/reqid'
        pair:
          $ref: '#/components/schemas/pair'
        subscription:
          type: object
          properties:
            depth:
              $ref: '#/components/schemas/depth'
            interval:
              $ref: '#/components/schemas/interval'
            name:
              $ref: '#/components/schemas/name'
            ratecounter:
              $ref: '#/components/schemas/ratecounter'
            snapshot:
              $ref: '#/components/schemas/snapshot'
            token:
              $ref: '#/components/schemas/token'
          required:
            - name
      required:
        - event
    unsubscribe:
      type: object
      properties:
        event:
          type: string
          const: unsubscribe
        reqid:
          $ref: '#/components/schemas/reqid'
        pair:
          $ref: '#/components/schemas/pair'
        subscription:
          type: object
          properties:
            depth:
              $ref: '#/components/schemas/depth'
            interval:
              $ref: '#/components/schemas/interval'
            name:
              $ref: '#/components/schemas/name'
            token:
              $ref: '#/components/schemas/token'
          required:
            - name
      required:
        - event
    subscriptionStatus:
      type: object
      oneOf:
        - $ref: '#/components/schemas/subscriptionStatusError'
        - $ref: '#/components/schemas/subscriptionStatusSuccess'
    subscriptionStatusError:
      allOf:
        - properties:
            errorMessage:
              type: string
          required:
            - errorMessage
        - $ref: '#/components/schemas/subscriptionStatusCommon'
    subscriptionStatusSuccess:
      allOf:
        - properties:
            channelID:
              type: integer
              description: ChannelID on successful subscription, applicable to public messages only.
            channelName:
              type: string
              description: Channel Name on successful subscription. For payloads 'ohlc' and 'book', respective interval or depth will be added as suffix.
          required:
            - channelID
            - channelName
        - $ref: '#/components/schemas/subscriptionStatusCommon'
    subscriptionStatusCommon:
      type: object
      required:
         - event
      properties:
        event:
          type: string
          const: subscriptionStatus
        reqid:
          $ref: '#/components/schemas/reqid'
        pair:
          $ref: '#/components/schemas/pair'
        status:
          $ref: '#/components/schemas/status'
        subscription:
          required:
            - name
          type: object
          properties:
            depth:
              $ref: '#/components/schemas/depth'
            interval:
              $ref: '#/components/schemas/interval'
            maxratecount:
              $ref: '#/components/schemas/maxratecount'
            name:
              $ref: '#/components/schemas/name'
            token:
              $ref: '#/components/schemas/token'
    interval:
      type: integer
      description: Time interval associated with ohlc subscription in minutes.
      default: 1
      enum:
        - 1
        - 5
        - 15
        - 30
        - 60
        - 240
        - 1440
        - 10080
        - 21600
    name:
      type: string
      description: The name of the channel you subscribe to.
      enum:
        - book
        - ohlc
        - openOrders
        - ownTrades
        - spread
        - ticker
        - trade
    token:
      type: string
      description: base64-encoded authentication token for private-data endpoints.
    depth:
      type: integer
      default: 10
      enum:
        - 10
        - 25
        - 100
        - 500
        - 1000
      description: Depth associated with book subscription in number of levels each side.
    maxratecount:
      type: integer
      description: Max rate-limit budget. Compare to the ratecounter field in the openOrders updates to check whether you are approaching the rate limit.
    ratecounter:
      type: boolean
      default: false
      description: Whether to send rate-limit counter in updates (supported only for openOrders subscriptions)
    snapshot:
      type: boolean
      default: true
      description: Whether to send historical feed data snapshot upon subscription (supported only for ownTrades subscriptions)
    reqid:
      type: integer
      description: client originated ID reflected in response message.
    pair:
      type: array
      description: Array of currency pairs.
      items:
        type: string
        description: Format of each pair is "A/B", where A and B are ISO 4217-A3 for standardized assets and popular unique symbol if not standardized.
        pattern: '[A-Z\s]+\/[A-Z\s]+'

GreenRover added a commit to GreenRover/asyncapi-spec that referenced this issue Oct 4, 2022
@derberg
Member

derberg commented Nov 16, 2022

Whoever listens to notifications from this issue, I encourage you to have a look at #847. It is pretty advanced, with a few cards on the table; we need different opinions to figure out which one is the best.

If ya need guidance on where to look at, feel free to ask for help
