diff --git a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java index 616fac5a..bdced0b2 100644 --- a/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java +++ b/client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java @@ -160,10 +160,13 @@ public void startAndBlock() { String output = null; TaskFailureDetails failureDetails = null; try { + String instanceId = activityRequest.getOrchestrationInstance().getInstanceId(); + String taskName = activityRequest.getName(); + String taskExecutionKey = instanceId + "-" + taskName; output = taskActivityExecutor.execute( activityRequest.getName(), activityRequest.getInput().getValue(), - activityRequest.getTaskId()); + taskExecutionKey); } catch (Throwable e) { failureDetails = TaskFailureDetails.newBuilder() .setErrorType(e.getClass().getName()) diff --git a/client/src/main/java/io/dapr/durabletask/TaskActivityContext.java b/client/src/main/java/io/dapr/durabletask/TaskActivityContext.java index 5db3ba5e..6f53532e 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskActivityContext.java +++ b/client/src/main/java/io/dapr/durabletask/TaskActivityContext.java @@ -7,11 +7,18 @@ * its input. */ public interface TaskActivityContext { - /** - * Gets the name of the current task activity. - * @return the name of the current task activity - */ - String getName(); + /** + * Gets the name of the current task activity. + * @return the name of the current task activity + */ + String getName(); + + /** + * Gets the task execution key of the current task activity. + * This key is used to identify the task execution and is unique for each task execution. + * @return the task execution key of the current task activity + */ + String getTaskExecutionKey(); /** * Gets the deserialized activity input. diff --git a/client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java b/client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java index 1d394545..0d860b42 100644 --- a/client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java +++ b/client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java @@ -19,7 +19,7 @@ public TaskActivityExecutor( this.logger = logger; } - public String execute(String taskName, String input, int taskId) throws Throwable { + public String execute(String taskName, String input, String taskExecutionKey) throws Throwable { TaskActivityFactory factory = this.activityFactories.get(taskName); if (factory == null) { throw new IllegalStateException( @@ -32,7 +32,7 @@ public String execute(String taskName, String input, int taskId) throws Throwabl String.format("The task factory '%s' returned a null TaskActivity object.", taskName)); } - TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input); + TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionKey); // Unhandled exceptions are allowed to escape Object output = activity.run(context); @@ -44,14 +44,17 @@ public String execute(String taskName, String input, int taskId) throws Throwabl } private class TaskActivityContextImpl implements TaskActivityContext { + private final String taskExecutionKey; private final String name; private final String rawInput; + private final DataConverter dataConverter = TaskActivityExecutor.this.dataConverter; - public TaskActivityContextImpl(String activityName, String rawInput) { + public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionKey) { this.name = activityName; this.rawInput = rawInput; + this.taskExecutionKey = taskExecutionKey; } @Override @@ -59,6 +62,11 @@ public String getName() { return this.name; } + @Override + public String getTaskExecutionKey() { + return this.taskExecutionKey; + } + @Override public T getInput(Class targetType) { if (this.rawInput == null) {