Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Wait for materialized value before completing in chunk upload #504

Merged
merged 1 commit into from
Nov 14, 2023

Conversation

mdedetrich
Copy link
Contributor

@mdedetrich mdedetrich commented Nov 14, 2023

About this change - What it does

Currently in Pekko we ignore the result when a chunk upload sink is uploaded, this PR fixes that.

Why this way

Originally when I wrote the kafkaBatchSink I wasn't that familiar with the graphDSL and in its original implementation I didn't pass in the successSink/failureGraph into the graph. What this meant is that while the core business logic would run, it wouldn't wait on completion of either the successSink or failureSink.

This may also help in the current flakiness of guardian tests.

Also make sure to hide whitespace when viewing the PR, also the PR requires a snapshot version of Pekko connectors due to needing this change apache/pekko-connectors#280

.contramap[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])] {
Sink.fromGraph(
GraphDSL.createGraph(
successSink.contramap[(UploadPartResponse, immutable.Iterable[kafkaClientInterface.CursorContext])] {
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is the major difference which is that we pass in both the successSink and failureSink explicitly, this gives us access to the materialized values (explained below).

case (response, value) => (response.asInstanceOf[FailedUploadPart], value)
}
SinkShape(partition.in)
})
) { (successSinkMat, failureSinkMat) =>
Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since we are passing in the sinks explicitly now, we have to supply an extra argument to GraphDSL.create that defines how to combine the materialization values of the sinks. In Pekko terminology, materialization is the result value of an activation of a sink (i.e. for a Kafka publisher sink, whenever the we push a value into the Kafka publisher then the materialized value is a result of that action which with the Kafka example would typically be a Future[NotUsed] or a Future[Done])

For this specific case of successSinkMat/failureSinkMat, they are both Future's that contain values we don't really care about. The only thing we do care about is if at least one of the Future's have failed and also that we only complete when both Future's are completed (this can be done asynchronously, hence the Future.sequence), we also we don't care about ordering.

@mdedetrich mdedetrich merged commit 406fcda into main Nov 14, 2023
4 checks passed
@mdedetrich mdedetrich deleted the wait-on-mat-results-for-upload-chunk branch November 14, 2023 17:31
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

2 participants