diff --git a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java index 3da39a80c..fd0775ec7 100644 --- a/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java +++ b/mantis-control-plane/mantis-control-plane-server/src/main/java/io/mantisrx/server/master/scheduler/ResourceClusterAwareSchedulerActor.java @@ -57,6 +57,7 @@ class ResourceClusterAwareSchedulerActor extends AbstractActorWithTimers { private final Duration intervalBetweenRetries; private final Timer schedulingLatency; private final Counter schedulingFailures; + private final Counter connectionFailures; public static Props props( int maxScheduleRetries, @@ -90,10 +91,13 @@ public ResourceClusterAwareSchedulerActor( .id(metricsGroup, Tag.of("resourceCluster", resourceCluster.getName())) .addTimer("schedulingLatency") .addCounter("schedulingFailures") + .addCounter("connectionFailures") .build(); metricsRegistry.registerAndGet(metrics); this.schedulingLatency = metrics.getTimer("schedulingLatency"); this.schedulingFailures = metrics.getCounter("schedulingFailures"); + this.connectionFailures = metrics.getCounter("connectionFailures"); + } @Override @@ -135,26 +139,37 @@ private void onInitializeRunningWorkerRequest(InitializeRunningWorkerRequestEven } private void onAssignedScheduleRequestEvent(AssignedScheduleRequestEvent event) { + TaskExecutorGateway gateway = null; + TaskExecutorRegistration info = null; try { - TaskExecutorGateway gateway = - resourceCluster.getTaskExecutorGateway(event.getTaskExecutorID()).join(); - - TaskExecutorRegistration info = - resourceCluster.getTaskExecutorInfo(event.getTaskExecutorID()).join(); + gateway = resourceCluster.getTaskExecutorGateway(event.getTaskExecutorID()).join(); + info = resourceCluster.getTaskExecutorInfo(event.getTaskExecutorID()).join(); + } catch (Exception e) { + // we are not able to get the gateway, which either means the node is not great or some transient network issue + // we will retry the request + log.error( + "Failed to establish connection with the task executor {}; Resubmitting the request", + event.getTaskExecutorID(), e); + connectionFailures.increment(); + self().tell(event.getScheduleRequestEvent().onFailure(e), self()); + } + if (gateway != null && info != null) { CompletableFuture ackFuture = gateway - .submitTask(executeStageRequestFactory.of(event.getScheduleRequestEvent().getRequest(), info)) + .submitTask( + executeStageRequestFactory.of(event.getScheduleRequestEvent().getRequest(), + info)) .thenApply( - dontCare -> new SubmittedScheduleRequestEvent(event.getScheduleRequestEvent(), + dontCare -> new SubmittedScheduleRequestEvent( + event.getScheduleRequestEvent(), event.getTaskExecutorID())) .exceptionally( - throwable -> new FailedToSubmitScheduleRequestEvent(event.getScheduleRequestEvent(), + throwable -> new FailedToSubmitScheduleRequestEvent( + event.getScheduleRequestEvent(), event.getTaskExecutorID(), throwable)); pipe(ackFuture, getContext().getDispatcher()).to(self()); - } catch (Exception e) { - log.error("Failed here", e); } }