Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
@@ -1,9 +1,12 @@
package io.mantisrx.master.resourcecluster;

import static akka.pattern.Patterns.pipe;

import akka.actor.AbstractActorWithTimers;
import akka.actor.ActorRef;
import akka.actor.Props;
import akka.actor.Status;
import akka.dispatch.MessageDispatcher;
import com.netflix.spectator.api.Tag;
import com.netflix.spectator.api.TagList;
import io.mantisrx.common.Ack;
Expand Down Expand Up @@ -109,6 +112,27 @@
@Slf4j
public class ExecutorStateManagerActor extends AbstractActorWithTimers {

// Self-message: successful result of the asynchronous load of persisted
// DisableTaskExecutorsRequests from the job store (see startAsyncLoadDisabledTaskExecutors).
@Value
static class LoadDisabledTaskExecutorsResponse {
// The disable requests loaded from persistence for this cluster.
List<DisableTaskExecutorsRequest> requests;
// Zero-based attempt number of the load that produced this response.
int attempt;
}

// Self-message: the asynchronous load of persisted disable requests failed.
// Handled by onLoadDisabledTaskExecutorsFailure, which may schedule a retry.
@Value
static class LoadDisabledTaskExecutorsFailure {
// Zero-based attempt number of the failed load.
int attempt;
// The error that caused the load to fail.
Throwable cause;
}

// Self-message scheduled by a single timer to re-trigger the async initial load.
@Value
static class RetryLoadDisabledTaskExecutors {
// Zero-based attempt number to use for the retried load.
int attempt;
}

// Maximum number of retries for the initial async load of disable requests.
static final int MAX_INIT_LOAD_RETRIES = 5;
// Delay between successive retry attempts of the initial load.
static final Duration INIT_LOAD_RETRY_DELAY = Duration.ofSeconds(5);
// Akka dispatcher used for blocking store I/O so the actor's own dispatcher stays free.
static final String BLOCKING_IO_DISPATCHER = "akka.actor.default-blocking-io-dispatcher";

@Value
static class ExpireDisableTaskExecutorsRequest {
DisableTaskExecutorsRequest request;
Expand Down Expand Up @@ -337,18 +361,35 @@ public static Props props(
@Override
public void preStart() throws Exception {
super.preStart();
List<DisableTaskExecutorsRequest> activeRequests =
mantisJobStore.loadAllDisableTaskExecutorsRequests(clusterID);
for (DisableTaskExecutorsRequest request : activeRequests) {
onNewDisableTaskExecutorsRequest(request);
}
// Load disabled task executors asynchronously on the blocking-io-dispatcher
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do we need pre-load all the disabled task executors into memory? can we load on demand?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this is to load existing ones during leader switch so applied disable reqs can remain effective.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as below

// to avoid blocking both preStart() and the actor thread.
// A blocking call in preStart() throws ActorInitializationException on timeout,
// causing permanent actor death with no restart.
startAsyncLoadDisabledTaskExecutors(0);

timers().startTimerWithFixedDelay(
String.format("periodic-disabled-task-executors-test-for-%s", clusterID.getResourceID()),
new CheckDisabledTaskExecutors("periodic"),
disabledTaskExecutorsCheckInterval);
}

private void startAsyncLoadDisabledTaskExecutors(int attempt) {
MessageDispatcher ioDispatcher = getContext().getSystem().dispatchers().lookup(BLOCKING_IO_DISPATCHER);
CompletableFuture<Object> future = CompletableFuture.supplyAsync(
() -> {
try {
List<DisableTaskExecutorsRequest> requests =
Copy link
Collaborator

@hellolittlej hellolittlej Mar 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

does DisableTaskExecutorsRequest table have ttl?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

TTL is set at data store level

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i meant we should not load expired disableTaskexecutorRequest into memory during start up, and we should set expiration for this req if it doesn't have this field

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

bulk read (+ ttl on data store side) should be efficient enough here? the actual logic level check will exclude expired ones.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

if we are going to exclude expired ones in later logic anyway, why do we need keep them in memory during start up?

Copy link
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's to simplify the IO logic on data store scan/load. it can be improved too but i prefer not to touch that component for now.

mantisJobStore.loadAllDisableTaskExecutorsRequests(clusterID);
return (Object) new LoadDisabledTaskExecutorsResponse(requests, attempt);
} catch (IOException e) {
throw new java.util.concurrent.CompletionException(e);
}
},
ioDispatcher)
.exceptionally(t -> new LoadDisabledTaskExecutorsFailure(attempt, t));
pipe(future, getContext().getDispatcher()).to(self());
}

@Override
public Receive createReceive() {
return receiveBuilder()
Expand Down Expand Up @@ -387,6 +428,9 @@ public Receive createReceive() {
.match(ExpireDisableTaskExecutorsRequest.class, this::onDisableTaskExecutorsRequestExpiry)
.match(UpdateJobArtifactsToCache.class, this::onUpdateJobArtifactsToCache)
.match(DisableTaskExecutorsRequest.class, this::onNewDisableTaskExecutorsRequest)
.match(LoadDisabledTaskExecutorsResponse.class, this::onLoadDisabledTaskExecutorsSuccess)
.match(LoadDisabledTaskExecutorsFailure.class, this::onLoadDisabledTaskExecutorsFailure)
.match(RetryLoadDisabledTaskExecutors.class, req -> startAsyncLoadDisabledTaskExecutors(req.getAttempt()))
.match(Ack.class, ack -> log.debug("Received ack from {}", sender()))
.build();
}
Expand Down Expand Up @@ -1003,6 +1047,34 @@ private void onUpdateJobArtifactsToCache(UpdateJobArtifactsToCache update) {
this.jobArtifactsToCache.addAll(update.getArtifacts());
}

/**
 * Handles the successful result of the async initial load by restoring every
 * persisted disable request into in-memory state (store writes are skipped).
 */
private void onLoadDisabledTaskExecutorsSuccess(LoadDisabledTaskExecutorsResponse response) {
    log.info("Loaded {} disabled task executor requests for cluster {} (attempt {})",
        response.getRequests().size(), clusterID, response.getAttempt() + 1);
    response.getRequests().forEach(this::restoreDisableTaskExecutorsRequest);
}

/**
 * Handles a failed async initial load: records a failure metric, then either
 * schedules another attempt (up to MAX_INIT_LOAD_RETRIES) or gives up and relies
 * on the periodic CheckDisabledTaskExecutors timer for eventual consistency.
 */
private void onLoadDisabledTaskExecutorsFailure(LoadDisabledTaskExecutorsFailure failure) {
    metrics.incrementCounter(
        ResourceClusterActorMetrics.EXECUTOR_STATE_MANAGER_INIT_FAILURE,
        TagList.create(ImmutableMap.of("resourceCluster", clusterID.getResourceID())));
    final int nextAttempt = failure.getAttempt() + 1;
    if (nextAttempt > MAX_INIT_LOAD_RETRIES) {
        log.error("Exhausted retries loading disabled task executors for cluster {} after {} attempts. "
            + "Relying on periodic CheckDisabledTaskExecutors for eventual consistency.",
            clusterID, MAX_INIT_LOAD_RETRIES, failure.getCause());
        return;
    }
    log.error("Failed to load disabled task executors for cluster {} (attempt {}/{}), scheduling retry",
        clusterID, nextAttempt, MAX_INIT_LOAD_RETRIES, failure.getCause());
    // A single timer keyed per cluster re-triggers the async load after the delay.
    timers().startSingleTimer(
        "load-disabled-te-retry-" + clusterID.getResourceID(),
        new RetryLoadDisabledTaskExecutors(nextAttempt),
        INIT_LOAD_RETRY_DELAY);
}

// custom equals function to check if the existing set already has the request under consideration.
private boolean addNewDisableTaskExecutorsRequest(DisableTaskExecutorsRequest newRequest) {
if (newRequest.isRequestByAttributes()) {
Expand All @@ -1016,19 +1088,34 @@ private boolean addNewDisableTaskExecutorsRequest(DisableTaskExecutorsRequest ne
Preconditions.checkState(activeDisableTaskExecutorsByAttributesRequests.add(newRequest), "activeDisableTaskExecutorRequests cannot contain %s", newRequest);
return true;
} else if (newRequest.getTaskExecutorID().isPresent() && !disabledTaskExecutors.contains(newRequest.getTaskExecutorID().get())) {
log.info("Req with id {}", newRequest);
log.debug("DisableTaskExecutorsRequest with id {}", newRequest);
disabledTaskExecutors.add(newRequest.getTaskExecutorID().get());
return true;
}
log.info("No Req {}", newRequest);
log.debug("skip DisableTaskExecutorsRequest Req {}", newRequest);
return false;
}

/**
 * Restore a disable request loaded from the store into in-memory state.
 * Skips the store write since the request already exists in persistence.
 */
private void restoreDisableTaskExecutorsRequest(DisableTaskExecutorsRequest request) {
    if (!addNewDisableTaskExecutorsRequest(request)) {
        return;
    }
    // Clamp to zero so requests that expired while this actor was down fire immediately.
    final Duration remaining =
        Comparators.max(Duration.between(clock.instant(), request.getExpiry()), Duration.ZERO);
    getTimers().startSingleTimer(
        getExpiryKeyFor(request),
        new ExpireDisableTaskExecutorsRequest(request),
        remaining);
    findAndMarkDisabledTaskExecutorsFor(request);
}

private void onNewDisableTaskExecutorsRequest(DisableTaskExecutorsRequest request) {
ActorRef sender = sender();
if (addNewDisableTaskExecutorsRequest(request)) {
try {
log.info("New req to add {}", request);
log.info("New DisableTaskExecutorsRequest to add {}", request);
// store the request in a persistent store in order to retrieve it if the node goes down
mantisJobStore.storeNewDisabledTaskExecutorsRequest(request);
// figure out the time to expire the current request
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
import akka.actor.Props;
import akka.actor.Status;
import akka.actor.SupervisorStrategy;
import akka.actor.Terminated;
import akka.japi.pf.ReceiveBuilder;
import com.netflix.spectator.api.TagList;
import io.mantisrx.common.Ack;
Expand Down Expand Up @@ -99,6 +100,13 @@ public SupervisorStrategy supervisorStrategy() {
return MantisActorSupervisorStrategy.getInstance().create();
}

// Delay before attempting to re-create a terminated child actor.
private static final Duration CHILD_RECREATE_DELAY = Duration.ofSeconds(10);

// Self-message scheduled by a timer to re-create a terminated child actor by name.
@Value
static class RecreateChildActor {
// Name of the child actor to re-create; matched against the known child actor names.
String actorName;
}

private final Duration heartbeatTimeout;
private final Duration assignmentTimeout;
private final Duration disabledTaskExecutorsCheckInterval;
Expand Down Expand Up @@ -268,6 +276,7 @@ public void preStart() throws Exception {
Props registryProps = ReservationRegistryActor.props(this.clusterID, clock, null, null, null, metrics);
reservationRegistryActor = getContext().actorOf(registryProps, reservationRegistryActorName);
}
getContext().watch(reservationRegistryActor);

Option<ActorRef> existingExecutorStateManager = getContext().child(executorStateManagerActorName);
if (existingExecutorStateManager.isDefined()) {
Expand All @@ -293,6 +302,7 @@ public void preStart() throws Exception {
reservationSchedulingEnabled);
executorStateManagerActor = getContext().actorOf(esmProps, executorStateManagerActorName);
}
getContext().watch(executorStateManagerActor);

syncExecutorJobArtifactsCache();

Expand Down Expand Up @@ -370,6 +380,8 @@ public Receive createReceive() {
.match(AddNewJobArtifactsToCacheRequest.class, this::onAddNewJobArtifactsToCacheRequest)
.match(RemoveJobArtifactsToCacheRequest.class, this::onRemoveJobArtifactsToCacheRequest)
.match(GetJobArtifactsToCacheRequest.class, req -> sender().tell(new ArtifactList(new ArrayList<>(jobArtifactsToCache)), self()))
.match(Terminated.class, this::onChildTerminated)
.match(RecreateChildActor.class, this::onRecreateChildActor)
.build();
}

Expand Down Expand Up @@ -451,6 +463,103 @@ private void onGetReservationAwareClusterUsage(GetReservationAwareClusterUsageRe
});
}

/**
 * Reacts to a watched child actor terminating: clears the stale reference,
 * schedules a delayed re-creation, and emits a CHILD_ACTOR_TERMINATED metric
 * tagged with the child's type. Unknown actors are only logged and counted.
 */
private void onChildTerminated(Terminated terminated) {
    final ActorRef dead = terminated.getActor();
    final String actorType;
    final String timerKey;
    final String childName;
    if (dead.equals(executorStateManagerActor)) {
        actorType = "executorStateManager";
        timerKey = "recreate-esm-" + clusterID.getResourceID();
        childName = executorStateManagerActorName;
        executorStateManagerActor = null;
    } else if (dead.equals(reservationRegistryActor)) {
        actorType = "reservationRegistry";
        timerKey = "recreate-rr-" + clusterID.getResourceID();
        childName = reservationRegistryActorName;
        reservationRegistryActor = null;
    } else {
        actorType = "unknown";
        timerKey = null;
        childName = null;
    }
    if (childName == null) {
        log.error("Received Terminated for unknown actor {} in cluster {}", dead, clusterID);
    } else {
        log.error("{} terminated for cluster {}. Nulling ref and scheduling re-creation.",
            actorType, clusterID);
        timers().startSingleTimer(timerKey, new RecreateChildActor(childName), CHILD_RECREATE_DELAY);
    }
    metrics.incrementCounter(
        ResourceClusterActorMetrics.CHILD_ACTOR_TERMINATED,
        TagList.create(ImmutableMap.of(
            "resourceCluster", clusterID.getResourceID(),
            "actorType", actorType)));
}

/**
 * Re-creates a previously terminated child actor identified by name. If the child
 * already exists (e.g. from a race with a supervisor restart), the existing instance
 * is adopted and re-watched instead of creating a duplicate. On creation failure,
 * another attempt is scheduled after CHILD_RECREATE_DELAY.
 *
 * @param request carries the name of the child actor to re-create.
 */
private void onRecreateChildActor(RecreateChildActor request) {
    String actorName = request.getActorName();
    // Check if actor already exists (e.g. from a race with restart)
    Option<ActorRef> existing = getContext().child(actorName);
    if (existing.isDefined()) {
        log.info("Child actor {} already exists for cluster {}, skipping re-creation", actorName, clusterID);
        if (actorName.equals(executorStateManagerActorName)) {
            executorStateManagerActor = existing.get();
            getContext().watch(executorStateManagerActor);
        } else if (actorName.equals(reservationRegistryActorName)) {
            reservationRegistryActor = existing.get();
            getContext().watch(reservationRegistryActor);
        }
        return;
    }

    try {
        if (actorName.equals(executorStateManagerActorName)) {
            if (!(executorStateManager instanceof ExecutorStateManagerImpl)) {
                throw new IllegalStateException("ExecutorStateManager is not an instance of ExecutorStateManagerImpl");
            }
            Props esmProps = ExecutorStateManagerActor.props(
                (ExecutorStateManagerImpl) executorStateManager,
                clock,
                rpcService,
                jobMessageRouter,
                mantisJobStore,
                heartbeatTimeout,
                assignmentTimeout,
                disabledTaskExecutorsCheckInterval,
                clusterID,
                isJobArtifactCachingEnabled,
                jobClustersWithArtifactCachingEnabled,
                metrics,
                executeStageRequestFactory,
                reservationSchedulingEnabled);
            executorStateManagerActor = getContext().actorOf(esmProps, executorStateManagerActorName);
            getContext().watch(executorStateManagerActor);
            // The new child starts with an empty artifact cache; replay the parent's view.
            syncExecutorJobArtifactsCache();
            log.info("Successfully recreated ExecutorStateManagerActor for cluster {}", clusterID);
            metrics.incrementCounter(
                ResourceClusterActorMetrics.CHILD_ACTOR_RECREATED,
                TagList.create(ImmutableMap.of(
                    "resourceCluster", clusterID.getResourceID(),
                    "actorType", "executorStateManager")));
        } else if (actorName.equals(reservationRegistryActorName)) {
            Props registryProps = ReservationRegistryActor.props(this.clusterID, clock, null, null, null, metrics);
            reservationRegistryActor = getContext().actorOf(registryProps, reservationRegistryActorName);
            getContext().watch(reservationRegistryActor);
            log.info("Successfully recreated ReservationRegistryActor for cluster {}", clusterID);
            metrics.incrementCounter(
                ResourceClusterActorMetrics.CHILD_ACTOR_RECREATED,
                TagList.create(ImmutableMap.of(
                    "resourceCluster", clusterID.getResourceID(),
                    "actorType", "reservationRegistry")));
        } else {
            // Previously an unrecognized name fell through both branches silently;
            // surface it so a stale timer key or rename doesn't become an invisible no-op.
            log.error("Cannot recreate unknown child actor {} for cluster {}", actorName, clusterID);
        }
    } catch (Exception e) {
        log.error("Failed to recreate child actor {} for cluster {}, scheduling retry", actorName, clusterID, e);
        timers().startSingleTimer(
            "recreate-retry-" + actorName,
            new RecreateChildActor(actorName),
            CHILD_RECREATE_DELAY);
    }
}

private void syncExecutorJobArtifactsCache() {
if (executorStateManagerActor == null) {
return;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,10 @@ class ResourceClusterActorMetrics {
public static final String NUM_PENDING_RESERVATIONS = "numPendingReservations";
public static final String RESERVATION_FULFILLMENT_LATENCY = "reservationFulfillmentLatency";

public static final String EXECUTOR_STATE_MANAGER_INIT_FAILURE = "executorStateManagerInitFailure";
public static final String CHILD_ACTOR_TERMINATED = "childActorTerminated";
public static final String CHILD_ACTOR_RECREATED = "childActorRecreated";

private final Registry registry;
private final ConcurrentHashMap<String, Tuple2<Counter, Timer>> messageMetrics;
private final Tuple2<Counter, Timer> unknownMessageMetrics;
Expand Down
Loading
Loading