Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
12 changes: 12 additions & 0 deletions .teamcity/test/platform_tests/PlatformDotnetTestsLinux.kt
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,18 @@ object PlatformDotnetTestsLinux : BuildType({
failureMessage = "NullReferenceException in log"
reverse = false
}
failOnText {
conditionType = BuildFailureOnText.ConditionType.CONTAINS
pattern = "Unexpected notification ID"
failureMessage = "Unexpected notification ID in log"
reverse = false
}
failOnText {
conditionType = BuildFailureOnText.ConditionType.CONTAINS
pattern = "Unexpected response ID"
failureMessage = "Unexpected response ID in log"
reverse = false
}
}

requirements {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -82,18 +82,21 @@ public static CompletableFuture<ResponseWriter> process(
.clientAddress(clientContext.remoteAddress().toString());

TaskExecution<Object> execution = compute.submitMapReduceInternal(taskDescriptor, metadataBuilder, arg, null);
sendTaskResult(execution, notificationSender);

var idsAsync = execution.idsAsync()
.handle((ids, ex) -> {
// empty ids in case of split exception to properly respond with task id and failed status
return ex == null ? ids : Collections.<UUID>emptyList();
});

return execution.idAsync().thenCompose(id -> idsAsync.thenApply(ids -> out -> {
//noinspection DataFlowIssue
out.packUuid(id);
packJobIds(out, ids);
return execution.idAsync().thenCompose(id -> idsAsync.thenApply(ids -> {
sendTaskResult(execution, notificationSender);

return out -> {
//noinspection DataFlowIssue
out.packUuid(id);
packJobIds(out, ids);
};
}));
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,6 @@
import java.util.Set;
import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import org.apache.ignite.client.handler.ClientContext;
import org.apache.ignite.client.handler.NotificationSender;
import org.apache.ignite.client.handler.ResponseWriter;
Expand Down Expand Up @@ -131,12 +130,8 @@ static CompletableFuture<ComputeJobDataHolder> sendResultAndState(
CompletableFuture<JobExecution<ComputeJobDataHolder>> executionFut,
NotificationSender notificationSender
) {
return executionFut.handle((execution, throwable) -> {
if (throwable != null) {
notificationSender.sendNotification(null, throwable, NULL_HYBRID_TIMESTAMP);
return CompletableFuture.<ComputeJobDataHolder>failedFuture(throwable);
} else {
return execution.resultAsync().whenComplete((val, err) ->
return executionFut.thenCompose(execution ->
execution.resultAsync().whenComplete((val, err) ->
execution.stateAsync().whenComplete((state, errState) -> {
try {
notificationSender.sendNotification(
Expand All @@ -150,9 +145,7 @@ static CompletableFuture<ComputeJobDataHolder> sendResultAndState(
} catch (Throwable e) {
LOG.error("Failed to send job result notification: " + e.getMessage(), e);
}
}));
}
}).thenCompose(Function.identity());
})));
}

static void packSubmitResult(ClientMessagePacker out, UUID jobId, ClusterNode node) {
Expand Down