Skip to content

Commit

Permalink
Add remove/get job artifacts to cache endpoints.
Browse files Browse the repository at this point in the history
  • Loading branch information
fdc-ntflx committed Jun 9, 2023
1 parent b119679 commit a2c1084
Show file tree
Hide file tree
Showing 11 changed files with 180 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,12 @@ public interface ResourceCluster extends ResourceClusterGateway {

/**
 * Registers the given artifacts as job artifacts to cache for the given cluster.
 *
 * @param clusterID cluster the artifacts are registered for.
 * @param artifacts artifacts to add.
 * @return future completed with an {@link Ack} once the request has been handled.
 */
CompletableFuture<Ack> addNewJobArtifactsToCache(ClusterID clusterID, List<ArtifactID> artifacts);

/**
 * Removes the given artifacts from the job artifacts to cache for the given cluster.
 *
 * @param clusterID cluster the artifacts were registered for.
 * @param artifacts artifacts to remove.
 * @return future completed with an {@link Ack} once the request has been handled.
 */
CompletableFuture<Ack> removeJobArtifactsToCache(ClusterID clusterID, List<ArtifactID> artifacts);

/**
 * Returns the job artifacts currently registered to be cached for this resource cluster.
 *
 * @return future completed with the list of registered artifact ids.
 */
CompletableFuture<List<ArtifactID>> getJobArtifactsToCache();

/**
 * Lists the job artifacts reported by a single task executor of this resource cluster.
 *
 * @param taskExecutorID task executor to query.
 * @return future completed with the artifact ids reported by that task executor.
 */
CompletableFuture<List<ArtifactID>> listJobArtifactsOnTaskExecutor(TaskExecutorID taskExecutorID);

/**
* Can throw {@link NoResourceAvailableException} wrapped within the CompletableFuture in case there
* are no task executors.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
import io.mantisrx.server.core.CacheJobArtifactsRequest;
import io.mantisrx.server.core.ExecuteStageRequest;
import io.mantisrx.server.core.domain.WorkerId;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import org.apache.flink.runtime.rpc.RpcGateway;

Expand All @@ -38,13 +39,21 @@ public interface TaskExecutorGateway extends RpcGateway {
CompletableFuture<Ack> submitTask(ExecuteStageRequest request);

/**
* instruct the task executor on which job artifact to cache in order to speed up job initialization time.
* instruct the task executor on which job artifacts to cache in order to speed up job initialization time.
*
* @param request List of job artifacts that need to be cached.
* @return Ack in any case (this task is best effort).
*/
CompletableFuture<Ack> cacheJobArtifacts(CacheJobArtifactsRequest request);

/**
* enumerate the job artifacts stored by the task executor.
*
* @return identifiers of the job artifacts known to the task executor, as strings; the resource
*     cluster actor converts each entry to an ArtifactID.
*/
CompletableFuture<List<String>> listJobArtifactsRequest();

/**
* cancel the currently running task and get rid of all of the associated resources.
*
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,7 @@
import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.CreateResourceClusterScaleRuleRequest;
import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.GetResourceClusterScaleRulesRequest;
import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.GetResourceClusterScaleRulesResponse;
import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.JobArtifactToCacheRequest;
import io.mantisrx.master.resourcecluster.proto.ResourceClusterScaleRuleProto.JobArtifactsToCacheRequest;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceRequest;
import io.mantisrx.master.resourcecluster.proto.ScaleResourceResponse;
import io.mantisrx.master.resourcecluster.proto.SetResourceClusterScalerStatusRequest;
Expand Down Expand Up @@ -86,6 +86,7 @@
* /api/v1/resourceClusters/{}/scaleRules (GET, POST)
* <p>
* /api/v1/resourceClusters/{}/taskExecutors/{}/getTaskExecutorState (GET)
* /api/v1/resourceClusters/{}/taskExecutors/{}/listJobArtifacts (GET)
* <p>
* /api/v1/resourceClusters/{}/cacheJobArtifacts (GET, POST, DELETE)
*
Expand Down Expand Up @@ -239,19 +240,33 @@ protected Route constructRoutes() {
path(
PathMatchers.segment().slash("cacheJobArtifacts"),
(clusterName) -> pathEndOrSingleSlash(() -> concat(
// GET
get(() -> withFuture(gateway.getClusterFor(getClusterID(clusterName)).getJobArtifactsToCache())),

// POST
post(() -> cacheJobArtifacts(clusterName))
post(() -> cacheJobArtifacts(clusterName)),

// DELETE
delete(() -> removeJobArtifactsToCache(clusterName))
))
),

// /api/v1/resourceClusters/{}/taskExecutors/{}/getTaskExecutorState
// /api/v1/resourceClusters/{}/taskExecutors/{}
pathPrefix(
PathMatchers.segment().slash("taskExecutors"),
(clusterName) ->
(clusterName) -> concat (
// /getTaskExecutorState
path(
PathMatchers.segment().slash("getTaskExecutorState"),
(taskExecutorId) ->
pathEndOrSingleSlash(() -> concat(get(() -> getTaskExecutorState(getClusterID(clusterName), getTaskExecutorID(taskExecutorId))))))
pathEndOrSingleSlash(() -> concat(get(() -> getTaskExecutorState(getClusterID(clusterName), getTaskExecutorID(taskExecutorId)))))),

// /listJobArtifacts
path(
PathMatchers.segment().slash("listJobArtifacts"),
(taskExecutorId) ->
pathEndOrSingleSlash(() -> concat(get(() -> withFuture(gateway.getClusterFor(getClusterID(clusterName)).listJobArtifactsOnTaskExecutor(getTaskExecutorID(taskExecutorId)))))))
)
)
));

Expand Down Expand Up @@ -458,7 +473,7 @@ private Route getScaleRules(String clusterId) {
}

private Route cacheJobArtifacts(String clusterId) {
return entity(Jackson.unmarshaller(JobArtifactToCacheRequest.class), request -> {
return entity(Jackson.unmarshaller(JobArtifactsToCacheRequest.class), request -> {
log.info("POST /api/v1/resourceClusters/{}/cacheJobArtifacts {}", clusterId, request);
final CompletionStage<Ack> response =
gateway.getClusterFor(getClusterID(clusterId)).addNewJobArtifactsToCache(request.getClusterID(), request.getArtifacts());
Expand All @@ -474,4 +489,23 @@ private Route cacheJobArtifacts(String clusterId) {
);
});
}

/**
 * Handles DELETE /api/v1/resourceClusters/{}/cacheJobArtifacts.
 *
 * <p>Unmarshals the request body into a {@link JobArtifactsToCacheRequest}, asks the resource cluster
 * resolved from the path segment to remove the listed artifacts, and on success answers with
 * HTTP 200 and the artifacts that were submitted in the request.
 *
 * @param clusterId cluster name taken from the request path.
 * @return the route handling the removal.
 */
private Route removeJobArtifactsToCache(String clusterId) {
    return entity(Jackson.unmarshaller(JobArtifactsToCacheRequest.class), removalRequest -> {
        log.info("DELETE /api/v1/resourceClusters/{}/cacheJobArtifacts {}", clusterId, removalRequest);

        final CompletionStage<Ack> removal =
            gateway
                .getClusterFor(getClusterID(clusterId))
                .removeJobArtifactsToCache(removalRequest.getClusterID(), removalRequest.getArtifacts());

        // The BaseResponse only drives completeAsync; the HTTP payload is the list of artifacts
        // echoed back from the request.
        final CompletionStage<BaseResponse> outcome =
            removal.thenApply(
                ack ->
                    new BaseResponse(
                        removalRequest.requestId,
                        BaseResponse.ResponseCode.SUCCESS,
                        "job artifacts removed successfully"));

        return completeAsync(
            outcome,
            ignored -> complete(StatusCodes.OK, removalRequest.getArtifacts(), Jackson.marshaller()),
            Endpoints.RESOURCE_CLUSTERS,
            HttpRequestMetrics.HttpVerb.DELETE);
    });
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,7 @@
import java.time.Clock;
import java.time.Duration;
import java.time.Instant;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
Expand Down Expand Up @@ -197,18 +198,32 @@ public Receive createReceive() {
.match(PublishResourceOverviewMetricsRequest.class, this::onPublishResourceOverviewMetricsRequest)
.match(CacheJobArtifactsOnTaskExecutorRequest.class, this::onCacheJobArtifactsOnTaskExecutorRequest)
.match(AddNewJobArtifactsToCacheRequest.class, this::onAddNewJobArtifactsToCacheRequest)
.match(RemoveJobArtifactsToCacheRequest.class, this::onRemoveJobArtifactsToCacheRequest)
.match(GetJobArtifactsToCacheRequest.class, req -> sender().tell(new ArtifactList(new ArrayList<>(jobArtifactsToCache)), self()))
.match(ListJobArtifactsOnTaskExecutorRequest.class, this::onListJobArtifactsOnTaskExecutorRequest)
.build();
}

/**
 * Persists the requested artifacts through the job store, adds them to the in-memory
 * {@code jobArtifactsToCache} collection and acknowledges the sender.
 *
 * <p>If persisting fails, the in-memory collection is left untouched and the sender receives a
 * {@code Status.Failure} carrying the {@link IOException}, so that a caller using the ask pattern
 * fails immediately instead of waiting for the ask timeout.
 *
 * @param req cluster id and artifacts to add.
 */
private void onAddNewJobArtifactsToCacheRequest(AddNewJobArtifactsToCacheRequest req) {
    try {
        mantisJobStore.addNewJobArtifactsToCache(req.getClusterID(), req.getArtifacts());
        jobArtifactsToCache.addAll(req.getArtifacts());
        sender().tell(Ack.getInstance(), self());
    } catch (IOException e) {
        log.warn("Cannot add new job artifacts {} to cache in cluster: {}", req.getArtifacts(), req.getClusterID(), e);
        // Always answer the sender: without a reply the asker only sees a timeout.
        sender().tell(new akka.actor.Status.Failure(e), self());
    }
}

/**
 * Removes the requested artifacts through the job store, drops them from the in-memory
 * {@code jobArtifactsToCache} collection and acknowledges the sender.
 *
 * <p>If the store operation fails, the in-memory collection is left untouched and the sender
 * receives a {@code Status.Failure} carrying the {@link IOException}, so that a caller using the
 * ask pattern fails immediately instead of waiting for the ask timeout.
 *
 * @param req cluster id and artifacts to remove.
 */
private void onRemoveJobArtifactsToCacheRequest(RemoveJobArtifactsToCacheRequest req) {
    try {
        mantisJobStore.removeJobArtifactsToCache(req.getClusterID(), req.getArtifacts());
        req.getArtifacts().forEach(jobArtifactsToCache::remove);
        sender().tell(Ack.getInstance(), self());
    } catch (IOException e) {
        log.warn("Cannot remove job artifacts {} to cache in cluster: {}", req.getArtifacts(), req.getClusterID(), e);
        // Always answer the sender: without a reply the asker only sees a timeout.
        sender().tell(new akka.actor.Status.Failure(e), self());
    }
}

private void fetchJobArtifactsToCache() {
try {
mantisJobStore.getJobArtifactsToCache(clusterID)
Expand Down Expand Up @@ -666,6 +681,18 @@ private void onCacheJobArtifactsOnTaskExecutorRequest(CacheJobArtifactsOnTaskExe
}
}

/**
 * Asks a task executor for the job artifacts it holds and replies to the sender with an
 * {@link ArtifactList}.
 *
 * <p>The gateway lookup and the listing call are chained asynchronously, so the actor is never
 * blocked while waiting for the task executor. The sender always gets an answer: an
 * {@link ArtifactList} on success, or a {@code Status.Failure} when the task executor is unknown
 * or when obtaining the gateway / listing the artifacts fails.
 *
 * @param request identifies the task executor to query.
 */
private void onListJobArtifactsOnTaskExecutorRequest(ListJobArtifactsOnTaskExecutorRequest request) {
    // Capture actor references up front: the completion callback below may run outside of the
    // actor's message handling, where getSender() must not be consulted.
    final ActorRef sender = getSender();
    final ActorRef self = self();

    final TaskExecutorState state = this.executorStateManager.get(request.getTaskExecutorID());
    if (state == null) {
        sender.tell(
            new akka.actor.Status.Failure(
                new IllegalArgumentException("Unknown task executor: " + request.getTaskExecutorID())),
            self);
        return;
    }

    state.getGateway()
        .thenCompose(TaskExecutorGateway::listJobArtifactsRequest)
        .whenComplete((artifacts, failure) -> {
            if (failure != null) {
                sender.tell(new akka.actor.Status.Failure(failure), self);
            } else {
                sender.tell(
                    new ArtifactList(artifacts.stream().map(ArtifactID::of).collect(Collectors.toList())),
                    self);
            }
        });
}

@Value
private static class HeartbeatTimeout {

Expand Down Expand Up @@ -775,6 +802,11 @@ static class TaskExecutorsList {
List<TaskExecutorID> taskExecutors;
}

/**
 * Reply message wrapping a list of artifact ids; sent back for both the
 * GetJobArtifactsToCacheRequest and ListJobArtifactsOnTaskExecutorRequest messages.
 */
@Value
static class ArtifactList {
List<ArtifactID> artifacts;
}

@Value
static class GetClusterUsageRequest {
ClusterID clusterID;
Expand All @@ -801,13 +833,32 @@ static class CacheJobArtifactsOnTaskExecutorRequest {
ClusterID clusterID;
}

/**
 * Request to list the job artifacts held by one task executor.
 * The manager actor uses clusterID to forward the message to the matching cluster actor.
 */
@Value
static class ListJobArtifactsOnTaskExecutorRequest {
// task executor whose artifacts should be listed
TaskExecutorID taskExecutorID;
// cluster the task executor belongs to; used for routing the request
ClusterID clusterID;
}

/**
 * Request to add artifacts to the job artifacts to cache for a cluster.
 * Answered with an Ack once the artifacts have been stored.
 */
@Value
@Builder
static class AddNewJobArtifactsToCacheRequest {
// cluster the artifacts are registered for; also used for routing the request
ClusterID clusterID;
// artifacts to add
List<ArtifactID> artifacts;
}

/**
 * Request to remove artifacts from the job artifacts to cache for a cluster.
 * Answered with an Ack once the artifacts have been removed.
 */
@Value
@Builder
static class RemoveJobArtifactsToCacheRequest {
// cluster the artifacts were registered for; also used for routing the request
ClusterID clusterID;
// artifacts to remove
List<ArtifactID> artifacts;
}

/**
 * Request for the job artifacts currently registered to be cached for a cluster.
 * Answered with an ArtifactList.
 */
@Value
@Builder
static class GetJobArtifactsToCacheRequest {
// cluster to query; used for routing the request
ClusterID clusterID;
}

/**
* Represents the Availability of a given node in the resource cluster.
* Can go from PENDING -> ASSIGNED(workerId) -> RUNNING(workerId) -> PENDING
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,15 +20,19 @@
import akka.pattern.Patterns;
import io.mantisrx.common.Ack;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.AddNewJobArtifactsToCacheRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.ArtifactList;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetActiveJobsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAssignedTaskExecutorRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAvailableTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetBusyTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetJobArtifactsToCacheRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetRegisteredTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetTaskExecutorStatusRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetTaskExecutorWorkerMappingRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetUnregisteredTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.InitializeTaskExecutorRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.ListJobArtifactsOnTaskExecutorRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest;
Expand Down Expand Up @@ -147,6 +151,41 @@ public CompletableFuture<Ack> addNewJobArtifactsToCache(ClusterID clusterID, Lis
.toCompletableFuture();
}

/**
 * Asks the resource cluster manager actor to remove the given artifacts from the job artifacts to
 * cache for {@code clusterID}.
 *
 * @return future completed with the actor's {@link Ack} reply; fails if no reply arrives within
 *     the ask timeout or if the reply is not an {@link Ack}.
 */
@Override
public CompletableFuture<Ack> removeJobArtifactsToCache(ClusterID clusterID, List<ArtifactID> artifacts) {
    final RemoveJobArtifactsToCacheRequest removal =
        new RemoveJobArtifactsToCacheRequest(clusterID, artifacts);

    return Patterns
        .ask(resourceClusterManagerActor, removal, askTimeout)
        .thenApply(reply -> Ack.class.cast(reply))
        .toCompletableFuture();
}

/**
 * Asks the resource cluster manager actor for the job artifacts registered to be cached for this
 * cluster.
 *
 * @return future completed with the artifacts carried by the actor's {@link ArtifactList} reply.
 */
@Override
public CompletableFuture<List<ArtifactID>> getJobArtifactsToCache() {
    final GetJobArtifactsToCacheRequest lookup = new GetJobArtifactsToCacheRequest(clusterID);

    final CompletableFuture<ArtifactList> reply =
        Patterns
            .ask(resourceClusterManagerActor, lookup, askTimeout)
            .thenApply(answer -> ArtifactList.class.cast(answer))
            .toCompletableFuture();

    return reply.thenApply(list -> list.getArtifacts());
}

/**
 * Asks the resource cluster manager actor for the job artifacts held by the given task executor
 * of this cluster.
 *
 * @param taskExecutorID task executor to query.
 * @return future completed with the artifacts carried by the actor's {@link ArtifactList} reply.
 */
@Override
public CompletableFuture<List<ArtifactID>> listJobArtifactsOnTaskExecutor(TaskExecutorID taskExecutorID) {
    final ListJobArtifactsOnTaskExecutorRequest listing =
        new ListJobArtifactsOnTaskExecutorRequest(taskExecutorID, clusterID);

    final CompletableFuture<ArtifactList> reply =
        Patterns
            .ask(resourceClusterManagerActor, listing, askTimeout)
            .thenApply(answer -> ArtifactList.class.cast(answer))
            .toCompletableFuture();

    return reply.thenApply(list -> list.getArtifacts());
}

@Override
public CompletableFuture<TaskExecutorID> getTaskExecutorFor(TaskExecutorAllocationRequest allocationRequest) {
return
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,9 +27,12 @@
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAssignedTaskExecutorRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetAvailableTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetBusyTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetJobArtifactsToCacheRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetRegisteredTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetTaskExecutorStatusRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.GetUnregisteredTaskExecutorsRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.ListJobArtifactsOnTaskExecutorRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.RemoveJobArtifactsToCacheRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.ResourceOverviewRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorAssignmentRequest;
import io.mantisrx.master.resourcecluster.ResourceClusterActor.TaskExecutorGatewayRequest;
Expand Down Expand Up @@ -145,6 +148,12 @@ public Receive createReceive() {
getRCActor(req.getClusterID()).forward(req, context()))
.match(AddNewJobArtifactsToCacheRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(RemoveJobArtifactsToCacheRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(GetJobArtifactsToCacheRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(ListJobArtifactsOnTaskExecutorRequest.class, req ->
getRCActor(req.getClusterID()).forward(req, context()))
.match(TriggerClusterRuleRefreshRequest.class, req ->
getRCScalerActor(req.getClusterID()).forward(req, context()))
.match(SetResourceClusterScalerStatusRequest.class, req ->
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -95,14 +95,14 @@ public static class ResourceClusterScaleRule {

@EqualsAndHashCode(callSuper = true)
@Value
public static class JobArtifactToCacheRequest extends BaseRequest {
public static class JobArtifactsToCacheRequest extends BaseRequest {
ClusterID clusterID;

@Singular
@NonNull
List<ArtifactID> artifacts;

public JobArtifactToCacheRequest(@JsonProperty("clusterID") ClusterID clusterID, @JsonProperty("artifacts") List<ArtifactID> artifacts) {
public JobArtifactsToCacheRequest(@JsonProperty("clusterID") ClusterID clusterID, @JsonProperty("artifacts") List<ArtifactID> artifacts) {
super();
Preconditions.checkNotNull(clusterID, "clusterID cannot be null");
Preconditions.checkNotNull(artifacts, "artifacts cannot be null");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -168,6 +168,8 @@ void storeAndUpdateWorkers(final IMantisWorkerMetadata existingWorker, final IMa

void addNewJobArtifactsToCache(ClusterID clusterID, List<ArtifactID> artifacts) throws IOException;

/**
 * Removes the given artifacts from the stored job artifacts to cache for the given cluster.
 *
 * @param clusterID cluster the artifacts were stored for.
 * @param artifacts artifacts to remove.
 * @throws IOException if the underlying storage operation fails.
 */
void removeJobArtifactsToCache(ClusterID clusterID, List<ArtifactID> artifacts) throws IOException;

List<String> listJobArtifactsToCache(ClusterID clusterID) throws IOException;

List<String> listJobArtifactsByName(String prefix, String contains) throws IOException;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -702,6 +702,16 @@ public void addNewJobArtifactsToCache(ClusterID clusterID, List<ArtifactID> arti
}
}

/**
 * Deletes every given artifact from the per-cluster "job artifacts to cache" namespace of the
 * key-value store, one delete call per artifact in list order.
 *
 * <p>A failing delete aborts the loop; artifacts deleted before the failure stay deleted.
 *
 * @param clusterID cluster whose entries are deleted.
 * @param artifacts artifacts to delete.
 * @throws IOException propagated from the key-value store.
 */
@Override
public void removeJobArtifactsToCache(ClusterID clusterID, List<ArtifactID> artifacts) throws IOException {
    for (final ArtifactID toRemove : artifacts) {
        kvStore.delete(JOB_ARTIFACTS_TO_CACHE_PER_CLUSTER_ID_NS, clusterID.getResourceID(), toRemove.getResourceID());
    }
}

@Override
public List<String> listJobArtifactsToCache(ClusterID clusterID) throws IOException {
return new ArrayList<>(kvStore.getAll(JOB_ARTIFACTS_TO_CACHE_PER_CLUSTER_ID_NS, clusterID.getResourceID())
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -267,6 +267,10 @@ public void addNewJobArtifactsToCache(ClusterID clusterID, List<ArtifactID> arti
storageProvider.addNewJobArtifactsToCache(clusterID, artifacts);
}

/**
 * Removes the given artifacts from the job artifacts to cache for a cluster by delegating to the
 * storage provider.
 *
 * @param clusterID cluster the artifacts were stored for.
 * @param artifacts artifacts to remove.
 * @throws IOException propagated from the storage provider.
 */
public void removeJobArtifactsToCache(ClusterID clusterID, List<ArtifactID> artifacts) throws IOException {
storageProvider.removeJobArtifactsToCache(clusterID, artifacts);
}

/**
 * Returns the job artifacts to cache for a cluster, as listed by the storage provider.
 *
 * @param clusterID cluster to query.
 * @return the strings returned by the storage provider's listJobArtifactsToCache call.
 * @throws IOException propagated from the storage provider.
 */
public List<String> getJobArtifactsToCache(ClusterID clusterID) throws IOException {
return storageProvider.listJobArtifactsToCache(clusterID);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,8 @@
import io.mantisrx.shaded.com.google.common.util.concurrent.Service;
import io.mantisrx.shaded.com.google.common.util.concurrent.Service.State;
import io.mantisrx.shaded.org.apache.curator.shaded.com.google.common.annotations.VisibleForTesting;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.Callable;
Expand Down Expand Up @@ -479,6 +481,11 @@ public CompletableFuture<Ack> cacheJobArtifacts(CacheJobArtifactsRequest request
return CompletableFuture.completedFuture(Ack.getInstance());
}

/**
 * Returns the job artifacts held by this task executor.
 *
 * <p>NOTE(review): this currently returns a fixed placeholder list ("Hello", "World", "Java")
 * rather than real artifact data, so callers of the listJobArtifacts endpoint receive dummy
 * values. TODO: replace with the actual set of cached artifacts.
 *
 * @return an already-completed future holding the placeholder list.
 */
@Override
public CompletableFuture<List<String>> listJobArtifactsRequest() {
return CompletableFuture.completedFuture(Arrays.asList("Hello", "World", "Java"));
}

private void prepareTask(WrappedExecuteStageRequest wrappedRequest) {
try {
UserCodeClassLoader userCodeClassLoader = this.taskFactory.getUserCodeClassLoader(
Expand Down

0 comments on commit a2c1084

Please sign in to comment.