Skip to content

Commit

Permalink
Adding metrics
Browse files Browse the repository at this point in the history
  • Loading branch information
sundargates committed May 16, 2023
1 parent 5034b7b commit dabf29f
Showing 1 changed file with 25 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,7 @@ class ResourceClusterAwareSchedulerActor extends AbstractActorWithTimers {
private final Duration intervalBetweenRetries;
private final Timer schedulingLatency;
private final Counter schedulingFailures;
private final Counter connectionFailures;

public static Props props(
int maxScheduleRetries,
Expand Down Expand Up @@ -90,10 +91,13 @@ public ResourceClusterAwareSchedulerActor(
.id(metricsGroup, Tag.of("resourceCluster", resourceCluster.getName()))
.addTimer("schedulingLatency")
.addCounter("schedulingFailures")
.addCounter("connectionFailures")
.build();
metricsRegistry.registerAndGet(metrics);
this.schedulingLatency = metrics.getTimer("schedulingLatency");
this.schedulingFailures = metrics.getCounter("schedulingFailures");
this.connectionFailures = metrics.getCounter("connectionFailures");

}

@Override
Expand Down Expand Up @@ -135,26 +139,37 @@ private void onInitializeRunningWorkerRequest(InitializeRunningWorkerRequestEven
}

private void onAssignedScheduleRequestEvent(AssignedScheduleRequestEvent event) {
TaskExecutorGateway gateway = null;
TaskExecutorRegistration info = null;
try {
TaskExecutorGateway gateway =
resourceCluster.getTaskExecutorGateway(event.getTaskExecutorID()).join();

TaskExecutorRegistration info =
resourceCluster.getTaskExecutorInfo(event.getTaskExecutorID()).join();
gateway = resourceCluster.getTaskExecutorGateway(event.getTaskExecutorID()).join();
info = resourceCluster.getTaskExecutorInfo(event.getTaskExecutorID()).join();
} catch (Exception e) {
// we are not able to get the gateway, which either means the node is not great or some transient network issue
// we will retry the request
log.error(
"Failed to establish connection with the task executor {}; Resubmitting the request",
event.getTaskExecutorID(), e);
connectionFailures.increment();
self().tell(event.getScheduleRequestEvent().onFailure(e), self());
}

if (gateway != null && info != null) {
CompletableFuture<Object> ackFuture =
gateway
.submitTask(executeStageRequestFactory.of(event.getScheduleRequestEvent().getRequest(), info))
.submitTask(
executeStageRequestFactory.of(event.getScheduleRequestEvent().getRequest(),
info))
.<Object>thenApply(
dontCare -> new SubmittedScheduleRequestEvent(event.getScheduleRequestEvent(),
dontCare -> new SubmittedScheduleRequestEvent(
event.getScheduleRequestEvent(),
event.getTaskExecutorID()))
.exceptionally(
throwable -> new FailedToSubmitScheduleRequestEvent(event.getScheduleRequestEvent(),
throwable -> new FailedToSubmitScheduleRequestEvent(
event.getScheduleRequestEvent(),
event.getTaskExecutorID(), throwable));

pipe(ackFuture, getContext().getDispatcher()).to(self());
} catch (Exception e) {
log.error("Failed here", e);
}
}

Expand Down

0 comments on commit dabf29f

Please sign in to comment.