Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -160,10 +160,13 @@ public void startAndBlock() {
String output = null;
TaskFailureDetails failureDetails = null;
try {
String instanceId = activityRequest.getOrchestrationInstance().getInstanceId();
String taskName = activityRequest.getName();
String taskExecutionKey = instanceId + "-" + taskName;
output = taskActivityExecutor.execute(
activityRequest.getName(),
activityRequest.getInput().getValue(),
activityRequest.getTaskId());
taskExecutionKey);
} catch (Throwable e) {
failureDetails = TaskFailureDetails.newBuilder()
.setErrorType(e.getClass().getName())
Expand Down
17 changes: 12 additions & 5 deletions client/src/main/java/io/dapr/durabletask/TaskActivityContext.java
Original file line number Diff line number Diff line change
Expand Up @@ -7,11 +7,18 @@
* its input.
*/
public interface TaskActivityContext {
/**
* Gets the name of the current task activity.
* @return the name of the current task activity
*/
String getName();
/**
* Gets the name of the current task activity.
* @return the name of the current task activity
*/
String getName();

/**
* Gets the task execution key of the current task activity.
* This key is used to identify the task execution and is unique for each task execution.
* @return the task execution key of the current task activity
*/
String getTaskExecutionKey();

/**
* Gets the deserialized activity input.
Expand Down
14 changes: 11 additions & 3 deletions client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java
Original file line number Diff line number Diff line change
Expand Up @@ -19,7 +19,7 @@ public TaskActivityExecutor(
this.logger = logger;
}

public String execute(String taskName, String input, int taskId) throws Throwable {
public String execute(String taskName, String input, String taskExecutionKey) throws Throwable {
TaskActivityFactory factory = this.activityFactories.get(taskName);
if (factory == null) {
throw new IllegalStateException(
Expand All @@ -32,7 +32,7 @@ public String execute(String taskName, String input, int taskId) throws Throwabl
String.format("The task factory '%s' returned a null TaskActivity object.", taskName));
}

TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input);
TaskActivityContextImpl context = new TaskActivityContextImpl(taskName, input, taskExecutionKey);

// Unhandled exceptions are allowed to escape
Object output = activity.run(context);
Expand All @@ -44,21 +44,29 @@ public String execute(String taskName, String input, int taskId) throws Throwabl
}

private class TaskActivityContextImpl implements TaskActivityContext {
private final String taskExecutionKey;
private final String name;
private final String rawInput;


private final DataConverter dataConverter = TaskActivityExecutor.this.dataConverter;

public TaskActivityContextImpl(String activityName, String rawInput) {
public TaskActivityContextImpl(String activityName, String rawInput, String taskExecutionKey) {
this.name = activityName;
this.rawInput = rawInput;
this.taskExecutionKey = taskExecutionKey;
}

@Override
public String getName() {
return this.name;
}

@Override
public String getTaskExecutionKey() {
return this.taskExecutionKey;
}

@Override
public <T> T getInput(Class<T> targetType) {
if (this.rawInput == null) {
Expand Down
Loading