MQTT streaming: SourceQueue backpressure #1577
case (_, other) =>
  waitForQueueOfferCompleted(behavior, stash :+ other)
}
.orElse(behavior) // handle signals immediately
One thing I'm not clear on is the semantics here. Note that on L43 we catch all events that flow; thus, the intention is that all signals go to `behavior` but nothing else does (until we hit L34 in the future).

If `behavior`'s signal handler (i.e. the behavior being run via `orElse`) returns `Behavior.same`, I'd like to clarify that it doesn't change the behavior of the actor to itself, i.e. that `waitForQueueOfferCompleted` (L26-L46) remains the behavior. Hopefully my question makes sense.
I think the behaviour would indeed remain the same, i.e. stay as per the behaviour of `waitForQueueOfferCompleted`. But I've not yet proved this.

However, given L46 with the `orElse`, I'm not seeing the signal handler of `behavior` being applied. When I run the tests I see a `DeathPactException`, which means that there is no signal handler for `Terminated`, even though `behavior` does provide one...
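For reference, here is a rough, hypothetical reconstruction of the pattern under discussion (not the connector's actual code): stash every event until the queue offer completes, replay the stash, and let signals fall through to the wrapped behavior via `orElse`.

```scala
import akka.actor.typed.Behavior
import akka.actor.typed.scaladsl.Behaviors
import akka.stream.QueueOfferResult
import scala.util.Try

sealed trait Event
// Hypothetical message signalling that the SourceQueue offer finished.
final case class QueueOfferCompleted(result: Try[QueueOfferResult]) extends Event

// Hypothetical reconstruction of waitForQueueOfferCompleted: everything
// except the completion message is stashed; once the offer completes,
// the stash is replayed and the wrapped behavior takes over again.
def waitForQueueOfferCompleted(behavior: Behavior[Event], stash: Seq[Event]): Behavior[Event] =
  Behaviors
    .receivePartial[Event] {
      case (context, QueueOfferCompleted(_)) =>
        stash.foreach(context.self ! _)
        behavior
      case (_, other) =>
        waitForQueueOfferCompleted(behavior, stash :+ other)
    }
    .orElse(behavior) // handle signals immediately, as in the diff above
```

If the wrapped behavior's signal handler returns `Behaviors.same`, that would keep the currently installed behavior (still `waitForQueueOfferCompleted`) in place, matching the tentative conclusion above, though as noted it wasn't proven here.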
I've been looking at this in some detail and ended up posing a question to the Lightbend forum. I've also tidied up a couple of things around the edges, but nothing specifically to do with this PR. @patriknw, hoping you may be able to enlighten me re my question. :-)
.run()

val connect = Connect("some-client-id", ConnectFlags.None)

client.offer(Command(connect))

result.failed.futureValue shouldBe a[WatchedActorTerminatedException]
This exhibits the core of my change here - this termination exception isn't something that the caller should be concerned with. It is internal to the session.
@@ -1271,7 +1271,7 @@ class MqttSessionSpec
       case Right(Event(cp: Unsubscribe, _)) if cp.topicFilters == unsubscribe.topicFilters =>
         unsubscribeReceived.success(Done)
     })
-    .toMat(Sink.seq)(Keep.both)
+    .toMat(Sink.ignore)(Keep.left)
.run() |
The `result` wasn't being used, so I removed it.
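As an aside, a tiny illustrative sketch of what the `Keep` combinators select at materialization, which is why `Keep.left` suffices once the sink's value is unused:

```scala
import akka.stream.scaladsl.{Keep, Sink, Source}

// Keep.both materializes both operators' values as a tuple; since the
// test never used the Sink.seq future, Keep.left with Sink.ignore
// keeps only the source's materialized value.
val both = Source(1 to 3).toMat(Sink.seq)(Keep.both)    // RunnableGraph[(NotUsed, Future[Seq[Int]])]
val left = Source(1 to 3).toMat(Sink.ignore)(Keep.left) // RunnableGraph[NotUsed]
```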
Great to get the backpressure all the way.
@@ -538,6 +544,9 @@ final class ActorMqttServerSession(settings: MqttSessionSettings)(implicit mat:
       case c: Command[A] => throw new IllegalStateException(c + " is not a server command")
     }
   )
+  .recover {
+    case _: WatchedActorTerminatedException => ByteString.empty
Great to clear those from the logs when they shouldn't concern anyone.
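To illustrate the effect of `recover` in isolation (a standalone sketch, with a generic exception standing in for `WatchedActorTerminatedException`): the matched failure is replaced by one final element and the stream then completes normally, so downstream never observes a failure.

```scala
import akka.stream.scaladsl.Source
import akka.util.ByteString

// A stream that emits one element and then fails...
val failing = Source
  .single(ByteString("data"))
  .concat(Source.failed[ByteString](new RuntimeException("session terminated")))

// ...recover swallows the matching failure, emits a last element in its
// place, and completes the stream, so callers see completion, not error.
val recovered = failing.recover {
  case _: RuntimeException => ByteString.empty
}
```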
Force-pushed 6511de7 to fc2d365.
FYI - This is waiting on an Akka release with akka/akka#26524 given its reliance on …
A number of commits here that improve the resilience of the mqtt-streaming connector. #1577 remains important as well, but will require an Akka release.

## Require a connection id to be specified for client flows

This allows the MQTT session to better track which events are relevant, given that there can be races when old connections are torn down and new ones are established.

## Fix a bug where actor state was closed over

This could cause duplicate publications to never be sent to new connections.

## Republish messages on reconnect only by default

Previously, messages for QoS 1/2 were only republished on an interval after not receiving an ack. It is more conventional to instead republish everything only on connect, and indeed to be compliant with MQTT 5, that is the only time this is allowed. To accommodate this, the timeouts default to 0, but the previous behavior can still be restored by changing the default producer timeout settings.
Force-pushed 1ac2674 to e683d5b.
Rebased this to …
Apparently, there may be an Akka release next week.
I know you are keen to get this in now that Akka 2.5.22 is available.
Force-pushed e683d5b to 1994542.
This is still a draft; anything more we need to solve?
Ah, that was an oversight! This is ready for review/merge.
Some minor comments. Needs a rebase.
QueueOfferState.waitForQueueOfferCompleted(
  serverConnected(data),
  Seq.empty
Might be good to add the explicit parameter name (more of these further down):

- Seq.empty
+ stash = Seq.empty
done
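As a self-contained illustration of the suggestion (with a hypothetical signature, not the real one), the named argument documents the role of an otherwise opaque `Seq.empty`:

```scala
// Hypothetical stand-in for QueueOfferState.waitForQueueOfferCompleted.
def waitForQueueOfferCompleted(behavior: String, stash: Seq[String]): Unit = ()

waitForQueueOfferCompleted("serverConnected", Seq.empty)         // role of Seq.empty is unclear
waitForQueueOfferCompleted("serverConnected", stash = Seq.empty) // self-documenting call site
```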
project/Dependencies.scala
}

val AwsSdkVersion = "1.11.476"
val AwsSdk2Version = "2.5.20"
val AkkaHttpVersion = "10.1.7"
This is not connected to this PR, is it?
thanks - removed
Thanks @ennru - I'll get this updated tomorrow. FWIW, we've been running it in production since Akka 2.5.22 was released. It's been pretty stable; I think we're down to one remaining known issue with the connector (…)
Force-pushed 23546ee to 767e4d9.
Okay, this is all set.
Or not - MiMa failures. I think it's related to the new field …
Yes, that is why we require the constructors to be private.
I took the liberty of adding the MiMa rule.
LGTM. Renamed the MiMa filters file, as we had one Alpakka release since then.
@longshorej @huntc Just to be sure, as I guess you've run this code for a while now -- are we good to make it part of Alpakka 1.1.0 next week?
Yeah, let's get this in! Thanks
When offering commands to the internal queue, a backpressure strategy is now used to ensure that elements are not dropped. Instead, we wait for consumption downstream before continuing.
I noticed that a test wasn't quite right - the client should only complete after we've received all elements. We now use `take` to ensure that the stream completes.
We want WatchedActorTerminatedException to complete stream processing as before, but not with a failure. A session shutting down isn’t in itself a failure.
Rebased from the latest master, as the MiMa previous artifact version was derived to be …
Thank you for this important improvement.
* mqtt streaming - introduce a mechanism to use SourceQueue backpressure
This adds a mechanism to `mqtt-streaming` to use `Source.queue` with `OverflowStrategy.backpressure` and wait for the backpressure signal before handling more messages. I think we could probably increase the buffer size from 1 to, say, 4 or 8, reducing latency at the expense of the memory required to buffer those events, but I'm curious to get opinions on that.
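A minimal sketch of the mechanism (illustrative only: `Command` here is a hypothetical stand-in for the session's command type, and the `ActorMaterializer` setup reflects the Akka 2.5-era API in use at the time):

```scala
import akka.actor.ActorSystem
import akka.stream.{ActorMaterializer, OverflowStrategy, QueueOfferResult}
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Future

object BackpressuredQueueExample extends App {
  implicit val system: ActorSystem = ActorSystem("example")
  implicit val mat: ActorMaterializer = ActorMaterializer()
  import system.dispatcher

  final case class Command(name: String) // hypothetical stand-in

  // Buffer size 1 with OverflowStrategy.backpressure: offer() returns a
  // Future that completes only once the element is accepted downstream,
  // so producers wait for demand instead of having elements dropped.
  val queue = Source
    .queue[Command](bufferSize = 1, overflowStrategy = OverflowStrategy.backpressure)
    .to(Sink.foreach(println))
    .run()

  val offered: Future[QueueOfferResult] = queue.offer(Command("connect"))
  offered.foreach {
    case QueueOfferResult.Enqueued => () // safe to offer the next command now
    case other                     => println(s"offer was not enqueued: $other")
  }
}
```

Increasing `bufferSize` from 1 to, say, 4 or 8 would let that many offers complete immediately before backpressure kicks in, which is the latency/memory trade-off mentioned above.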
While we're at it, it also removes `BroadcastHub` usage when there's only a single consumer, which should be a bit more performant. There are some funky things with the tests, in particular the doc ones, so this is a draft for now.
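On the `BroadcastHub` point, a small illustrative sketch of the trade-off (assuming the same implicit system and materializer as the sketch above): the hub lets one running upstream feed many materializations, which is unnecessary machinery when there will only ever be one consumer.

```scala
import akka.stream.scaladsl.{BroadcastHub, Sink, Source}

// Multi-consumer case: run the upstream once through a hub and hand out
// a Source that can be materialized many times.
val hubSource = Source(1 to 100).runWith(BroadcastHub.sink(bufferSize = 8))
hubSource.runWith(Sink.foreach(println)) // each run attaches a new consumer

// Single-consumer case: no hub needed, just attach the sink directly.
Source(1 to 100).runWith(Sink.foreach(println))
```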
/cc @huntc who vowed to pick up where I've left off 😃