-
Notifications
You must be signed in to change notification settings - Fork 387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
"With context" support (for use with FlowWithContext) #780
Conversation
The implementation of the Java |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
The Java DSL is not stable, yet.
core/src/main/scala/akka/kafka/internal/TransactionalSource.scala
Outdated
Show resolved
Hide resolved
@RayRoestenburg Would be great if you'd have a look at this. |
@ennru sorry I got my mail filters setup wrong somehow, will have a look. |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Super minor comments, LGTM
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Some questions regarding API naming.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Couple of more comments.
* Batches offsets from context and commits them to Kafka. | ||
*/ | ||
@ApiMayChange | ||
def sinkWithOffsetContext[E](settings: CommitterSettings): Sink[(E, Committable), Future[Done]] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We still need to fix the name here. sinkWithCommitableContext
? Same for the flow above?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
But it is the same as the flow?! withOffsetContext
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes. I think that one should also be renamed to flowWithCommittableContext
as that is what the context type is.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is a hard one. The API is actually lying, it can't handle "any" Committable
as everything but offsets will fail here
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I changed those new withOffsetContext
methods to require CommittableOffset
other Committable
s won't do anyway.
/** | ||
* Batches offsets and commits them to Kafka, emits `CommittableOffsetBatch` for every committed batch. | ||
*/ | ||
def batchFlow[C <: Committable](settings: CommitterSettings): Flow[C, CommittableOffsetBatch, NotUsed] = |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This was lacking in the Java DSL, I'm tempted to require CommittableOffset
here as well, but that would differ from the Scala DSL.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Yes, I think it is good to keep them the same.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
LGTM. After the small fix in docs, this can be merged in.
docs/src/main/paradox/consumer.md
Outdated
| Factory method | Stream element type | Emits | | ||
|-------------------------|--------------------------------|--------------| | ||
| `sink` | `Committable` | N/A | | ||
| `sinkWithOffsetContext` | Any (`Committable` in context) | N/A | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| `sinkWithOffsetContext` | Any (`Committable` in context) | N/A | | |
| `sinkWithOffsetContext` | Any (`CommittableOffset` in context) | N/A | |
docs/src/main/paradox/consumer.md
Outdated
| `sinkWithOffsetContext` | Any (`Committable` in context) | N/A | | ||
| `flow` | `Committable` | `Done` | | ||
| `batchFlow` | `Committable` | `CommittableOffsetBatch` | | ||
| `flowWithOffsetContext` | Any (`Committable` in context) | `NotUsed` (`CommittableOffsetBatch` in context) | |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
| `flowWithOffsetContext` | Any (`Committable` in context) | `NotUsed` (`CommittableOffsetBatch` in context) | | |
| `flowWithOffsetContext` | Any (`CommittableOffset` in context) | `NotUsed` (`CommittableOffsetBatch` in context) | |
Purpose
This adds
Producer.withContext
to support Akka's newFlowWithContext
out of the box.It works the same as
flexiFlow
with the difference that thePassThrough
is unavailable to the user as it is used to carry the context along internally.Adds
Consumer.sourceWithOffsetContext
which createsSourceWithContext[ConsumerRecord[K, V], CommittableOffset, Control]
.References
Source.asSourceWithContext
Flow.asFlowWithContext
Changes
Envelope
to set the pass-through valueProducer
flowWithContext
methodsConsumer
sourceWithOffsetContext
Transactional
sourceWithOffsetContext
sinkWithOffsetContext
flowWithOffsetContext
Committer
flowWithOffsetContext
sinkWithOffsetContext