Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

KAFKA-10199: Add remove operation with future to state updater #15852

Merged
merged 2 commits into from
May 6, 2024

Conversation

cadonna
Copy link
Contributor

@cadonna cadonna commented May 3, 2024

Adds a remove operation to the state updater that returns a future instead of adding the removed tasks to an output queue. Code that uses the state updater can then wait on the future.

Committer Checklist (excluded from commit message)

  • Verify design and implementation
  • Verify test coverage and CI build status
  • Verify documentation (including upgrade notes)

Adds a remove operation to the state updater that returns a future
instead of adding the removed tasks to an output queue. Code that
uses the state updater can then wait on the future.
@cadonna cadonna requested a review from lucasbru May 3, 2024 11:10
Copy link
Contributor Author

@cadonna cadonna May 3, 2024

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I did not care to keep the unit tests for the remove without future since that method will be removed in a future PR.

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for the PR! Left some comments

break;
case REMOVE:
removeTask(taskAndAction.getTaskId());
if (taskAndAction.futureForRemove() == null) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Do we need the variant without the future?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, we do not but for the sake of breaking down my gigantic PR, I need to temporary keep it until one of my future PRs makes it obsolete. It will be simply called remove() after that.

private void removeTask(final TaskId taskId, final CompletableFuture<RemovedTaskResult> future) {
try {
if (updatingTasks.containsKey(taskId)) {
removeUpdatingTask(taskId, future);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could all four remove...Task methods just return a boolean if successful?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I will try that.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done! Code looks better now. Thanks a lot!

*
* @param taskId ID of the task to remove
*/
CompletableFuture<RemovedTaskResult> removeWithFuture(final TaskId taskId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

removeAsync?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about remove()? 🙂
See my comment above.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Sounds good

Copy link
Member

@lucasbru lucasbru left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, thanks, one nit

future.complete(null);
log.warn("Task " + taskId + " could not be removed from the state updater because "
+ "the state updater does not own this task.");
if (!removeUpdatingTask(taskId, future)) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: Used short-circuiting to avoid the nesting

if (!removeUpdatingTask(taskId, future)
 	&& !removePausedTask(taskId, future)
	&& .. 

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, thanks! I am going to change that in one of my future PRs.

@cadonna cadonna merged commit 366aeab into apache:trunk May 6, 2024
1 check failed
gongxuanzhang pushed a commit to gongxuanzhang/kafka that referenced this pull request Jun 12, 2024
…e#15852)

Adds a remove operation to the state updater that returns a future
instead of adding the removed tasks to an output queue. Code that
uses the state updater can then wait on the future.

Reviewer: Lucas Brutschy <lbrutschy@confluent.io>
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

2 participants