Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

also grab containerId when grabbing directory #1344

Merged
merged 4 commits into from Nov 21, 2016
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -4,6 +4,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;

public class MesosExecutorObject {

Expand Down Expand Up @@ -54,4 +55,16 @@ public String getId() {
return id;
}

@Override
public String toString() {
return Objects.toStringHelper(this)
.add("directory", directory)
.add("id", id)
.add("container", container)
.add("name", name)
.add("resources", resources)
.add("tasks", tasks)
.add("completedTasks", completedTasks)
.toString();
}
}
Expand Up @@ -4,6 +4,7 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.hubspot.mesos.SingularityMesosTaskLabel;

public class MesosTaskObject {
Expand Down Expand Up @@ -55,4 +56,17 @@ public String getFrameworkId() {
public String getExecutorId() {
return executorId;
}

@Override
public String toString() {
return Objects.toStringHelper(this)
.add("resources", resources)
.add("state", state)
.add("id", id)
.add("name", name)
.add("slaveId", slaveId)
.add("frameworkId", frameworkId)
.add("executorId", executorId)
.toString();
}
}
Expand Up @@ -5,26 +5,29 @@
import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonIgnore;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Objects;
import com.google.common.base.Optional;
import com.hubspot.mesos.JavaUtils;

public class SingularityTaskHistory {

private final List<SingularityTaskHistoryUpdate> taskUpdates;
private final Optional<String> directory;
private final Optional<String> containerId;
private final SingularityTask task;
private final List<SingularityTaskHealthcheckResult> healthcheckResults;
private final List<SingularityLoadBalancerUpdate> loadBalancerUpdates;
private final List<SingularityTaskShellCommandHistory> shellCommandHistory;
private final List<SingularityTaskMetadata> taskMetadata;

@JsonCreator
public SingularityTaskHistory(@JsonProperty("taskUpdates") List<SingularityTaskHistoryUpdate> taskUpdates, @JsonProperty("directory") Optional<String> directory,
@JsonProperty("healthcheckResults") List<SingularityTaskHealthcheckResult> healthcheckResults, @JsonProperty("task") SingularityTask task,
public SingularityTaskHistory(@JsonProperty("taskUpdates") List<SingularityTaskHistoryUpdate> taskUpdates, @JsonProperty("directory") Optional<String> directory, @JsonProperty("containerId") Optional<String> containerId,
@JsonProperty("healthcheckResults") List<SingularityTaskHealthcheckResult> healthcheckResults, @JsonProperty("task") SingularityTask task,
@JsonProperty("loadBalancerUpdates") List<SingularityLoadBalancerUpdate> loadBalancerUpdates,
@JsonProperty("shellCommandHistory") List<SingularityTaskShellCommandHistory> shellCommandHistory,
@JsonProperty("taskMetadata") List<SingularityTaskMetadata> taskMetadata) {
this.directory = directory;
this.containerId = containerId;
this.task = task;
this.taskUpdates = JavaUtils.nonNullImmutable(taskUpdates);
this.healthcheckResults = JavaUtils.nonNullImmutable(healthcheckResults);
Expand All @@ -41,6 +44,10 @@ public Optional<String> getDirectory() {
return directory;
}

public Optional<String> getContainerId() {
return containerId;
}

public SingularityTask getTask() {
return task;
}
Expand Down Expand Up @@ -68,8 +75,16 @@ public Optional<SingularityTaskHistoryUpdate> getLastTaskUpdate() {

@Override
public String toString() {
return "SingularityTaskHistory [taskUpdates=" + taskUpdates + ", directory=" + directory + ", task=" + task + ", healthcheckResults=" + healthcheckResults + ", loadBalancerUpdates="
+ loadBalancerUpdates + ", shellCommandHistory=" + shellCommandHistory + ", taskMetadata=" + taskMetadata + "]";
return Objects.toStringHelper(this)
.add("taskUpdates", taskUpdates)
.add("directory", directory)
.add("containerId", containerId)
.add("task", task)
.add("healthcheckResults", healthcheckResults)
.add("loadBalancerUpdates", loadBalancerUpdates)
.add("shellCommandHistory", shellCommandHistory)
.add("taskMetadata", taskMetadata)
.add("lastTaskUpdate", getLastTaskUpdate())
.toString();
}

}
Expand Up @@ -82,6 +82,7 @@ public class TaskManager extends CuratorAsyncManager {

private static final String LAST_HEALTHCHECK_KEY = "LAST_HEALTHCHECK";
private static final String DIRECTORY_KEY = "DIRECTORY";
private static final String CONTAINER_ID_KEY = "CONTAINER_ID";
private static final String TASK_KEY = "TASK";
private static final String NOTIFIED_OVERDUE_TO_FINISH_KEY = "NOTIFIED_OVERDUE_TO_FINISH";

Expand Down Expand Up @@ -214,6 +215,10 @@ private String getDirectoryPath(SingularityTaskId taskId) {
return ZKPaths.makePath(getHistoryPath(taskId), DIRECTORY_KEY);
}

private String getContainerIdPath(SingularityTaskId taskId) {
return ZKPaths.makePath(getHistoryPath(taskId), CONTAINER_ID_KEY);
}

private String getNotifiedOverduePath(SingularityTaskId taskId) {
return ZKPaths.makePath(getHistoryPath(taskId), NOTIFIED_OVERDUE_TO_FINISH_KEY);
}
Expand Down Expand Up @@ -272,6 +277,10 @@ public void saveTaskDirectory(SingularityTaskId taskId, String directory) {
save(getDirectoryPath(taskId), Optional.of(directory.getBytes(UTF_8)));
}

public void saveContainerId(SingularityTaskId taskId, String containerId) {
save(getContainerIdPath(taskId), Optional.of(containerId.getBytes(UTF_8)));
}

@Timed
public void saveLastActiveTaskStatus(SingularityTaskStatusHolder taskStatus) {
save(getLastActiveTaskStatusPath(taskStatus.getTaskId()), taskStatus, taskStatusTranscoder);
Expand All @@ -281,6 +290,10 @@ public Optional<String> getDirectory(SingularityTaskId taskId) {
return getData(getDirectoryPath(taskId), StringTranscoder.INSTANCE);
}

public Optional<String> getContainerId(SingularityTaskId taskId) {
return getData(getContainerIdPath(taskId), StringTranscoder.INSTANCE);
}

public void saveHealthcheckResult(SingularityTaskHealthcheckResult healthcheckResult) {
final Optional<byte[]> bytes = Optional.of(healthcheckResultTranscoder.toBytes(healthcheckResult));

Expand Down Expand Up @@ -547,6 +560,7 @@ public Optional<SingularityTaskHistory> getTaskHistory(SingularityTaskId taskId)

List<SingularityTaskHistoryUpdate> taskUpdates = getTaskHistoryUpdates(taskId);
Optional<String> directory = getDirectory(taskId);
Optional<String> containerId = getContainerId(taskId);
List<SingularityTaskHealthcheckResult> healthchecks = getHealthcheckResults(taskId);

List<SingularityLoadBalancerUpdate> loadBalancerUpdates = Lists.newArrayListWithCapacity(2);
Expand All @@ -558,7 +572,7 @@ public Optional<SingularityTaskHistory> getTaskHistory(SingularityTaskId taskId)

List<SingularityTaskMetadata> taskMetadata = getTaskMetadata(taskId);

return Optional.of(new SingularityTaskHistory(taskUpdates, directory, healthchecks, task.get(), loadBalancerUpdates, shellCommandHistory, taskMetadata));
return Optional.of(new SingularityTaskHistory(taskUpdates, directory, containerId, healthchecks, task.get(), loadBalancerUpdates, shellCommandHistory, taskMetadata));
}

private List<SingularityTaskShellCommandHistory> getTaskShellCommandHistory(SingularityTaskId taskId) {
Expand Down
Expand Up @@ -27,17 +27,17 @@
import io.dropwizard.lifecycle.Managed;

@Singleton
public class SingularityLogSupport implements Managed {
public class SingularityMesosExecutorInfoSupport implements Managed {

private static final Logger LOG = LoggerFactory.getLogger(SingularityLogSupport.class);
private static final Logger LOG = LoggerFactory.getLogger(SingularityMesosExecutorInfoSupport.class);

private final MesosClient mesosClient;
private final TaskManager taskManager;

private final ThreadPoolExecutor logLookupExecutorService;

@Inject
public SingularityLogSupport(SingularityConfiguration configuration, MesosClient mesosClient, TaskManager taskManager) {
public SingularityMesosExecutorInfoSupport(SingularityConfiguration configuration, MesosClient mesosClient, TaskManager taskManager) {
this.mesosClient = mesosClient;
this.taskManager = taskManager;

Expand All @@ -53,77 +53,88 @@ public void stop() {
MoreExecutors.shutdownAndAwaitTermination(logLookupExecutorService, 1, TimeUnit.SECONDS);
}

private Optional<String> findDirectory(SingularityTaskId taskId, List<MesosExecutorObject> executors) {
private Optional<MesosExecutorObject> findExecutor(SingularityTaskId taskId, List<MesosExecutorObject> executors) {
for (MesosExecutorObject executor : executors) {
for (MesosTaskObject executorTask : executor.getTasks()) {
if (taskId.getId().equals(executorTask.getId())) {
return Optional.of(executor.getDirectory());
return Optional.of(executor);
}
}
for (MesosTaskObject executorTask : executor.getCompletedTasks()) {
if (taskId.getId().equals(executorTask.getId())) {
return Optional.of(executor.getDirectory());
return Optional.of(executor);
}
}
}

return Optional.absent();
}

private void loadDirectory(SingularityTask task) {
private void loadDirectoryAndContainer(SingularityTask task) {
final long start = System.currentTimeMillis();

final String slaveUri = mesosClient.getSlaveUri(task.getOffer().getHostname());

LOG.info("Fetching slave data to find log directory for task {} from uri {}", task.getTaskId(), slaveUri);
LOG.info("Fetching slave data to find log directory and container id for task {} from uri {}", task.getTaskId(), slaveUri);

MesosSlaveStateObject slaveState = mesosClient.getSlaveState(slaveUri);

Optional<String> directory = Optional.absent();
Optional<String> containerId = Optional.absent();

for (MesosSlaveFrameworkObject slaveFramework : slaveState.getFrameworks()) {
directory = findDirectory(task.getTaskId(), slaveFramework.getExecutors());
if (directory.isPresent()) {
Optional<MesosExecutorObject> maybeExecutor = findExecutor(task.getTaskId(), slaveFramework.getExecutors());
if (maybeExecutor.isPresent()) {
directory = Optional.of(maybeExecutor.get().getDirectory());
containerId = Optional.of(maybeExecutor.get().getContainer());
break;
}

directory = findDirectory(task.getTaskId(), slaveFramework.getCompletedExecutors());
if (directory.isPresent()) {
maybeExecutor = findExecutor(task.getTaskId(), slaveFramework.getCompletedExecutors());
if (maybeExecutor.isPresent()) {
directory = Optional.of(maybeExecutor.get().getDirectory());
containerId = Optional.of(maybeExecutor.get().getContainer());
break;
}
}

if (!directory.isPresent()) {
if (!directory.isPresent() && !containerId.isPresent()) {
LOG.warn("Couldn't find matching executor for task {}", task.getTaskId());
return;
}

LOG.debug("Found a directory {} for task {}", directory.get(), task.getTaskId());
LOG.debug("Found a directory {} and container id {} for task {}", directory.or(""), containerId.or(""), task.getTaskId());

taskManager.saveTaskDirectory(task.getTaskId(), directory.get());
if (directory.isPresent()) {
taskManager.saveTaskDirectory(task.getTaskId(), directory.get());
}
if (containerId.isPresent()) {
taskManager.saveContainerId(task.getTaskId(), containerId.get());
}

LOG.trace("Updated task {} directory in {}", task.getTaskId(), JavaUtils.duration(start));
LOG.trace("Updated task {} directory and container id in {}", task.getTaskId(), JavaUtils.duration(start));
}

@Timed
public void checkDirectory(final SingularityTaskId taskId) {
public void checkDirectoryAndContainerId(final SingularityTaskId taskId) {
final Optional<String> maybeDirectory = taskManager.getDirectory(taskId);
final Optional<String> maybeContainerId = taskManager.getContainerId(taskId);

if (maybeDirectory.isPresent()) {
LOG.debug("Already had a directory for task {}, skipping lookup", taskId);
if (maybeDirectory.isPresent() && maybeContainerId.isPresent()) {
LOG.debug("Already had a directory and container id for task {}, skipping lookup", taskId);
return;
}

final Optional<SingularityTask> task = taskManager.getTask(taskId);

if (!task.isPresent()) {
LOG.warn("No task found available for task {}, can't locate directory", taskId);
LOG.warn("No task found available for task {}, can't locate directory or container id", taskId);
return;
}

Runnable cmd = generateLookupCommand(task.get());

LOG.trace("Enqueing a request to fetch directory for task: {}, current queue size: {}", taskId, logLookupExecutorService.getQueue().size());
LOG.trace("Enqueing a request to fetch directory and container id for task: {}, current queue size: {}", taskId, logLookupExecutorService.getQueue().size());

logLookupExecutorService.submit(cmd);
}
Expand All @@ -134,9 +145,9 @@ private Runnable generateLookupCommand(final SingularityTask task) {
@Override
public void run() {
try {
loadDirectory(task);
loadDirectoryAndContainer(task);
} catch (Throwable t) {
LOG.error("While fetching directory for task: {}", task.getTaskId(), t);
LOG.error("While fetching directory and container id for task: {}", task.getTaskId(), t);
}
}
};
Expand Down
Expand Up @@ -23,7 +23,7 @@ public class SingularityMesosModule extends AbstractModule {
@Override
public void configure() {
bind(SingularityDriver.class).in(Scopes.SINGLETON);
bind(SingularityLogSupport.class).in(Scopes.SINGLETON);
bind(SingularityMesosExecutorInfoSupport.class).in(Scopes.SINGLETON);
bind(SingularityMesosScheduler.class).in(Scopes.SINGLETON);
bind(SingularityMesosFrameworkMessageHandler.class).in(Scopes.SINGLETON);
bind(SingularityMesosSchedulerDelegator.class).in(Scopes.SINGLETON);
Expand Down
Expand Up @@ -61,7 +61,7 @@ public class SingularityMesosStatusUpdateHandler implements Managed {
private final SingularityHealthchecker healthchecker;
private final SingularityNewTaskChecker newTaskChecker;
private final SingularitySlaveAndRackManager slaveAndRackManager;
private final SingularityLogSupport logSupport;
private final SingularityMesosExecutorInfoSupport logSupport;
private final SingularityScheduler scheduler;
private final Provider<SingularitySchedulerStateCache> stateCacheProvider;
private final String serverId;
Expand All @@ -80,7 +80,7 @@ public class SingularityMesosStatusUpdateHandler implements Managed {
@Inject
public SingularityMesosStatusUpdateHandler(TaskManager taskManager, DeployManager deployManager, RequestManager requestManager,
IdTranscoder<SingularityTaskId> taskIdTranscoder, SingularityExceptionNotifier exceptionNotifier, SingularityHealthchecker healthchecker,
SingularityNewTaskChecker newTaskChecker, SingularitySlaveAndRackManager slaveAndRackManager, SingularityLogSupport logSupport, SingularityScheduler scheduler,
SingularityNewTaskChecker newTaskChecker, SingularitySlaveAndRackManager slaveAndRackManager, SingularityMesosExecutorInfoSupport logSupport, SingularityScheduler scheduler,
Provider<SingularitySchedulerStateCache> stateCacheProvider, @Named(SingularityMainModule.SERVER_ID_PROPERTY) String serverId,
SchedulerDriverSupplier schedulerDriverSupplier,
@Named(SingularityMesosModule.SCHEDULER_LOCK_NAME) final Lock schedulerLock,
Expand Down Expand Up @@ -252,7 +252,7 @@ private void unsafeProcessStatusUpdate(Protos.TaskStatus status) {
new SingularityTaskHistoryUpdate(taskIdObj, timestamp, taskState, statusMessage, status.hasReason() ? Optional.of(status.getReason().name()) : Optional.<String>absent());
final SingularityCreateResult taskHistoryUpdateCreateResult = taskManager.saveTaskHistoryUpdate(taskUpdate);

logSupport.checkDirectory(taskIdObj);
logSupport.checkDirectoryAndContainerId(taskIdObj);

if (taskState.isDone()) {
healthchecker.cancelHealthcheck(taskId);
Expand Down
Expand Up @@ -39,7 +39,7 @@
import com.hubspot.singularity.data.SandboxManager.SlaveNotFoundException;
import com.hubspot.singularity.data.TaskManager;
import com.hubspot.singularity.data.history.HistoryManager;
import com.hubspot.singularity.mesos.SingularityLogSupport;
import com.hubspot.singularity.mesos.SingularityMesosExecutorInfoSupport;
import com.wordnik.swagger.annotations.Api;
import com.wordnik.swagger.annotations.ApiOperation;
import com.wordnik.swagger.annotations.ApiParam;
Expand All @@ -51,11 +51,11 @@ public class SandboxResource extends AbstractHistoryResource {
public static final String PATH = SingularityService.API_BASE_PATH + "/sandbox";

private final SandboxManager sandboxManager;
private final SingularityLogSupport logSupport;
private final SingularityMesosExecutorInfoSupport logSupport;
private final SingularityConfiguration configuration;

@Inject
public SandboxResource(HistoryManager historyManager, TaskManager taskManager, SandboxManager sandboxManager, DeployManager deployManager, SingularityLogSupport logSupport,
public SandboxResource(HistoryManager historyManager, TaskManager taskManager, SandboxManager sandboxManager, DeployManager deployManager, SingularityMesosExecutorInfoSupport logSupport,
SingularityConfiguration configuration, SingularityAuthorizationHelper authorizationHelper, Optional<SingularityUser> user) {
super(historyManager, taskManager, deployManager, authorizationHelper, user);

Expand All @@ -69,7 +69,7 @@ private SingularityTaskHistory checkHistory(String taskId) {
final SingularityTaskHistory taskHistory = getTaskHistoryRequired(taskIdObj);

if (!taskHistory.getDirectory().isPresent()) {
logSupport.checkDirectory(taskIdObj);
logSupport.checkDirectoryAndContainerId(taskIdObj);

throw badRequest("Task %s does not have a directory yet - check again soon (enqueued request to refetch)", taskId);
}
Expand Down
Expand Up @@ -93,9 +93,7 @@ public void blowDBAway() {
private SingularityTaskHistory buildTask(long launchTime) {
SingularityTask task = prepTask(request, firstDeploy, launchTime, 1);

SingularityTaskHistory taskHistory = new SingularityTaskHistory(null, Optional.<String> absent(), null, task, null, null, null);

return taskHistory;
return new SingularityTaskHistory(null, Optional.<String> absent(), Optional.<String>absent(), null, task, null, null, null);
}

private void saveTasks(int num, long launchTime) {
Expand Down