Skip to content

Conversation

@siri-varma
Copy link

@siri-varma siri-varma commented May 7, 2025

Issue describing the changes in this PR

resolves #
github.com/dapr/java-sdk/issues/1331

Pull request checklist

  • My changes do not require documentation changes
    • Otherwise: Documentation issue linked to PR
  • My changes are added to the CHANGELOG.md
  • I have added all required tests (Unit tests, E2E tests)

Additional information

Additional PR information

@siri-varma
Copy link
Author

@yaron2 , @cgillum I plan to test this PR out tomorrow. Can you folks tell me your initial thoughts on this please ? This is the github issue: github.com/dapr/java-sdk/issues/1331

@siri-varma
Copy link
Author

I looked at dotnet durable task implementation. It just uses a simple run https://github.com/microsoft/durabletask-dotnet/blob/main/src/Worker/Grpc/GrpcDurableTaskWorker.Processor.cs#L333

So followed a similar implementation

this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval : DEFAULT_MAXIMUM_TIMER_INTERVAL;
this.workerPool = Executors.newCachedThreadPool();

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please allow client to pass the workerPool, so that client can make use of the VirtualThread pool if and when required.
2. Await for the executor to shutdown before closing the connection to mitigate the risk on inflight requests being processed by threadpool

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

For the second point, we need to use the this.workerPoolshutdown() method in the public void stop() {} function.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Provided the ability to pass in the worker poll too and added the workerPoolShutdown

@salaboy
Copy link
Collaborator

salaboy commented May 7, 2025

@siri-varma @rishi-dev89 ok.. I am looking at this PR and I am trying to understand a bit the expected behaviour that is implemented.
If we use a thread pool then tasks will be executed as scheduled, so the behavior might not be the same if we have concurrent tasks. If we are ok with that change in behaviour this PR looks like ok to me, but I might be lacking some context.

@salaboy
Copy link
Collaborator

salaboy commented May 7, 2025

Also the TODO validated that this was the original intention from the author:
//TODO: Run this on a worker pool thread: https://www.baeldung.com/thread-pool-java-and-guava

@siri-varma siri-varma marked this pull request as ready for review May 7, 2025 16:06
@siri-varma
Copy link
Author

image
Validated that it is running on different threads

siri-varma added 3 commits May 7, 2025 09:20
Signed-off-by: siri-varma <siri.varma@outlook.com>
Signed-off-by: siri-varma <siri.varma@outlook.com>
Signed-off-by: siri-varma <siri.varma@outlook.com>
@siri-varma siri-varma force-pushed the users/svegiraju/add-thread branch from 7ecfa96 to 5b2db59 Compare May 7, 2025 16:20
Signed-off-by: siri-varma <siri.varma@outlook.com>
Copy link

@cgillum cgillum left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall LGTM. I especially appreciate the change to allow the user code to provide the executor pool. Just one main concern about the management of the worker's while loop.


// TODO: How do we interrupt manually?
while (true) {
while (!this.workerPool.isShutdown()) {
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm wondering what the side effects of this change might be. For sure we'll need to update the logging code to account for this new exit condition. But does this mean that user code could also accidentally shut down this loop by shutting down the worker pool? Is this change critical?

Copy link
Author

@siri-varma siri-varma May 7, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Once the workerpool is closed they cannot submit new activities anyway so I thought we could safely come out of the while loop.

Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

If that's the case, we should at least log a warning.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

is this resolved? @siri-varma

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. Took care of it.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@siri-varma
Copy link
Author

@cgillum, @JoshVanL, @rishi-dev89, @cicoyle, @salaboy Addressed all the feedback on this PR. Thank you for the reviews

Signed-off-by: siri-varma <siri.varma@outlook.com>
@cicoyle cicoyle merged commit fb9c30d into dapr:main May 7, 2025
1 check passed
.setCustomStatus(StringValue.of(taskOrchestratorResult.getCustomStatus()))
.build();

this.sidecarClient.completeOrchestratorTask(response);

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please use a try catch here as well as it can throw StatusRuntimeException

catch (StatusRuntimeException e) {
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
logger.log(Level.INFO, "The sidecar at address {0} is unavailable. Will continue retrying.", this.getSidecarAddress());
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress());
logger.log(Level.INFO, "Durable Task worker has disconnected from {0}.", this.getSidecarAddress());
} else {
logger.log(Level.WARNING, "Unexpected failure connecting to {0}.", this.getSidecarAddress());
}

responseBuilder.setFailureDetails(failureDetails);
}

this.sidecarClient.completeActivityTask(responseBuilder.build());

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Try catch block would be required here as well, otherwise the exception will never be logged and could be difficult to debbug

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good idea. I will do a quick small PR to address this

}

private void shutDownWorkerPool() {
logger.log(Level.WARNING, "ExecutorService shutdown initiated. No new tasks will be accepted");
Copy link

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This is not correct. Now this warning will be logged every time a user shuts down their worker, but we only want to log the warning when the worker pool was shut down unexpectedly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I opened a PR here. I think this addresses your comment.

@cicoyle cicoyle mentioned this pull request May 7, 2025
JoshVanL pushed a commit to JoshVanL/durabletask-java that referenced this pull request May 7, 2025
@siri-varma siri-varma deleted the users/svegiraju/add-thread branch May 7, 2025 23:17
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

6 participants