-
Notifications
You must be signed in to change notification settings - Fork 645
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
MQTT streaming: Express the carry type #1293
Conversation
This ensures that the client code usage is nicer, and tighter.
We recently got this contribution to the docs: Pass through |
Thanks @ennru. I’ve long wanted to see Akka Streams itself provide for this scenario. Our situation though is that the MQTT connector already caters for carrying these elements. Any advice on how to use BroadcastHub with generic types from Java? |
I should add that the PassThroughFlow example wouldn’t work for us because we have two flows: a command flow and an event flow. For example a publish command needs to provide a carry that is emitted with its correlating publish ack event. As well as there being two flows, there’s not guaranteed order in which event flow elements are received eg you may get a publish ack for a later message and then get one for an earlier one, or it may be that a ping req is received before you get a publish ack etc. |
We need to retain the generic parameter in the returned type - Java doesn’t do this, which highlights a difference between Java’s class method vs Scala’s classOf.
@ennru I've now fixed the Java API and updated the description of the PR accordingly. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM.
@@ -149,7 +149,7 @@ public void establishServerBidirectionalConnectionAndSubscribeToATopic() | |||
run = | |||
Source.<Command<Object>>queue(2, OverflowStrategy.dropHead()) | |||
.via(mqttFlow) | |||
.toMat(BroadcastHub.of(DecodeErrorOrEvent.class), Keep.both()) | |||
.toMat(BroadcastHub.of(DecodeErrorOrEvent.classOf()), Keep.both()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
JFYI BroadcastHub.of
doesn't even use the class value. It's only there to help define the type parameter. So you can actually pass null
and let type inference define type parameter. 😄
This ensures that the client code usage is nicer, and tighter, when working with the carry object. The Java API is more affected that the Scala API. The Scala API is hardly affected at all, and where it is, it requires substituting
Command[_]
andEvent[_]
withCommand[Nothing]
andEvent[Nothing]
respectively (unless you have a carry, where then you provide its type instead ofNothing
)(see the Scala API changes via our tests).An interesting point to note is that Java's
class
method does not return the type of a generic e.g.:...will return a
Class<DecodeErrorOrEvent>
. However, we wantClass<DecodeErrorOrEvent<A>>
whereA
is the type of the element to carry. In Scala,classOf
will return this i.e.classOf[DecodeErrorOrEvent[A]]
returns what we want. So, I've created a Java APIclassOf
function forDecodeErrorOrEvent
. Thus, we can correctly express situations like:If there's a better way to provide the
classOf
outcome then I'm all ears. However, I've dwelled on this for a couple of days. I think the solution here works well.