Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

str feat: RestartSourceWithContext #32178

Merged
merged 5 commits into from
Dec 5, 2023

Conversation

leviramsey
Copy link
Contributor

SourceWithContext cannot be wrapped into a RestartSource.

val sourceWithContext = SourceWithContext.fromTuples(Source(Seq("A", "B", "C").zipWithIndex))

RestartSource.onFailuresWithBackoff(RestartSettings(1.milli, 10.millis, 0.2)) { () => sourceWithContext }

       error: type mismatch;
        found   : akka.stream.scaladsl.SourceWithContext[String,Int,akka.NotUsed]
        required: akka.stream.scaladsl.Source[?, _]

I would assume that similar limitations apply to SinkWithContext/FlowWithContext, but it's not clear how well their respective limitations with respect to dropping on restart mesh with the intended use of the ...WithContext variations for carrying something like a Kafka or Projection offset, which tend to be where an at-least-once guarantee is desired.

@leviramsey
Copy link
Contributor Author

leviramsey commented Oct 16, 2023

Weird, I can't even get Test / compile to work for me with Scala 3.3 due to an akka-(actor|cluster)-typed weirdness (so not brought in by this change)

akka > +~ 3.3 Test/compile
[error] /home2/levi/code/akka/akka-actor-typed/src/main/scala-3/akka/actor/typed/internal/receptionist/Platform.scala:16: error: ; expected but match found
[error]   type Service[K <: Aux[_]] = K match {
[error]                                 ^: /home2/levi/code/akka/akka-actor-typed/src/main/scala-3/akka/actor/typed/internal/receptionist/Platform.scala
[error] /home2/levi/code/akka/akka-cluster-typed/src/main/scala-3/akka/cluster/typed/internal/receptionist/ClusterReceptionistProtocol.scala:18: error: ; expected but match found
[error]   type SubscriptionsKV[K <: Aux[_]] = K match {
[error]                                         ^: /home2/levi/code/akka/akka-cluster-typed/src/main/scala-3/akka/cluster/typed/internal/receptionist/ClusterReceptionistProtocol.scala
[error] (akka-actor-typed / Compile / scalafmt) /home2/levi/code/akka/akka-actor-typed/src/main/scala-3/akka/actor/typed/internal/receptionist/Platform.scala:16: error: ; expected but match found
[error]   type Service[K <: Aux[_]] = K match {
[error]                                 ^: /home2/levi/code/akka/akka-actor-typed/src/main/scala-3/akka/actor/typed/internal/receptionist/Platform.scala
[error] (akka-cluster-typed / Compile / scalafmt) /home2/levi/code/akka/akka-cluster-typed/src/main/scala-3/akka/cluster/typed/internal/receptionist/ClusterReceptionistProtocol.scala:18: error: ; expected but match found
[error]   type SubscriptionsKV[K <: Aux[_]] = K match {
[error]                                         ^: /home2/levi/code/akka/akka-cluster-typed/src/main/scala-3/akka/cluster/typed/internal/receptionist/ClusterReceptionistProtocol.scala

EDIT: scalafmt (see #32186 & #32187)

@leviramsey
Copy link
Contributor Author

Close/reopening for CI

Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks good, needs Java APIs

leviramsey and others added 3 commits November 29, 2023 10:50
…ithContext.scala

Co-authored-by: Johan Andrén <johan@markatta.com>
Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One more thing then I think this is ready to go.

*/
def withBackoff[T, C](
settings: RestartSettings,
sourceFactory: Creator[SourceWithContext[T, C, _]]): SourceWithContext[T, C, NotUsed] = {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The akka.japi.function interfaces are parallel with the JDK stdlib ones but add the capability to throw checked, for example for usage in actor message handling. That's not important here, so let's go with java.util.function.Supplier instead.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Only used Creator to mirror the other Restart* factories... suppose there should be an issue to move those to Supplier (which would be binary incompat, but source compat) in 2.10?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ah, yes, let's keep it aligned then 👍

Copy link
Member

@johanandren johanandren left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM

@johanandren johanandren merged commit 6e080c0 into akka:main Dec 5, 2023
5 checks passed
@johanandren johanandren added this to the 2.9.1 milestone Dec 5, 2023
He-Pin pushed a commit to He-Pin/akka that referenced this pull request Jan 7, 2024
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants