Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Block on dl only #1371

Merged
merged 5 commits into from
Jan 23, 2017
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@

import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.google.common.base.Optional;
import com.google.common.base.Throwables;
import com.google.common.collect.ImmutableList;
import com.google.common.collect.Lists;
Expand Down Expand Up @@ -66,7 +67,7 @@ public class SingularityExecutorTaskArtifactFetcher {
private final ExecutorData executorData;
private final SingularityExecutorTask task;

public SingularityExecutorTaskArtifactFetcher(ArtifactManager artifactManager, ExecutorData executorData, SingularityExecutorTask task) {
private SingularityExecutorTaskArtifactFetcher(ArtifactManager artifactManager, ExecutorData executorData, SingularityExecutorTask task) {
this.artifactManager = artifactManager;
this.executorData = executorData;
this.task = task;
Expand Down Expand Up @@ -152,7 +153,8 @@ private void downloadFilesFromLocalDownloadService(List<? extends S3Artifact> s3

for (S3Artifact s3Artifact : s3Artifacts) {
String destination = task.getArtifactPath(s3Artifact, task.getTaskDefinition().getTaskDirectoryPath()).toString();
ArtifactDownloadRequest artifactDownloadRequest = new ArtifactDownloadRequest(destination, s3Artifact);
ArtifactDownloadRequest artifactDownloadRequest = new ArtifactDownloadRequest(destination, s3Artifact,
Optional.of(SingularityExecutorArtifactFetcher.this.executorConfiguration.getLocalDownloadServiceTimeoutMillis()));

task.getLog().debug("Requesting {} from {}", artifactDownloadRequest, localDownloadUri);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,21 +4,25 @@

import com.fasterxml.jackson.annotation.JsonCreator;
import com.fasterxml.jackson.annotation.JsonProperty;
import com.google.common.base.Optional;
import com.google.common.base.Preconditions;
import com.hubspot.deploy.S3Artifact;

public class ArtifactDownloadRequest {

private final String targetDirectory;
private final S3Artifact s3Artifact;
private final Optional<Long> timeoutMillis;

@JsonCreator
public ArtifactDownloadRequest(@JsonProperty("targetDirectory") String targetDirectory, @JsonProperty("s3Artifact") S3Artifact s3Artifact) {
public ArtifactDownloadRequest(@JsonProperty("targetDirectory") String targetDirectory, @JsonProperty("s3Artifact") S3Artifact s3Artifact,
@JsonProperty("timeoutMillis") Optional<Long> timeoutMillis) {
Preconditions.checkNotNull(targetDirectory);
Preconditions.checkNotNull(s3Artifact);

this.targetDirectory = targetDirectory;
this.s3Artifact = s3Artifact;
this.timeoutMillis = timeoutMillis;

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

checkNotNull?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The JSON serializer should make any null value an Optional.

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oh word 👍

}

public String getTargetDirectory() {
Expand All @@ -29,6 +33,10 @@ public S3Artifact getS3Artifact() {
return s3Artifact;
}

public Optional<Long> getTimeoutMillis() {
return timeoutMillis;
}

@Override
public int hashCode() {
return Objects.hash(targetDirectory, s3Artifact);
Expand All @@ -50,7 +58,7 @@ public boolean equals(Object o) {

@Override
public String toString() {
return "ArtifactDownloadRequest [targetDirectory=" + targetDirectory + ", s3Artifact=" + s3Artifact + "]";
return "ArtifactDownloadRequest [targetDirectory=" + targetDirectory + ", s3Artifact=" + s3Artifact + ", timeoutMillis=" + timeoutMillis + "]";
}

}
Original file line number Diff line number Diff line change
@@ -0,0 +1,7 @@
package com.hubspot.singularity.s3downloader.server;

public interface DownloadListener {

public void notifyDownloadFinished(SingularityS3DownloaderAsyncHandler handler);

}
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@

import com.codahale.metrics.Timer.Context;
import com.google.common.collect.ImmutableMap;
import com.hubspot.deploy.S3Artifact;
import com.hubspot.mesos.JavaUtils;
import com.hubspot.singularity.runner.base.sentry.SingularityRunnerExceptionNotifier;
import com.hubspot.singularity.s3.base.ArtifactDownloadRequest;
Expand All @@ -28,30 +29,40 @@ public class SingularityS3DownloaderAsyncHandler implements Runnable {
private final long start;
private final SingularityS3DownloaderMetrics metrics;
private final SingularityRunnerExceptionNotifier exceptionNotifier;
private final DownloadListener downloadListener;

public SingularityS3DownloaderAsyncHandler(ArtifactManager artifactManager, ArtifactDownloadRequest artifactDownloadRequest, Continuation continuation, SingularityS3DownloaderMetrics metrics, SingularityRunnerExceptionNotifier exceptionNotifier) {
public SingularityS3DownloaderAsyncHandler(ArtifactManager artifactManager, ArtifactDownloadRequest artifactDownloadRequest, Continuation continuation, SingularityS3DownloaderMetrics metrics,
SingularityRunnerExceptionNotifier exceptionNotifier, DownloadListener downloadListener) {
this.artifactManager = artifactManager;
this.artifactDownloadRequest = artifactDownloadRequest;
this.continuation = continuation;
this.metrics = metrics;
this.start = System.currentTimeMillis();
this.exceptionNotifier = exceptionNotifier;
this.downloadListener = downloadListener;
}

private void download() throws Exception {
public S3Artifact getS3Artifact() {
return artifactDownloadRequest.getS3Artifact();
}

private boolean download() throws Exception {
LOG.info("Beginning download {} after {}", artifactDownloadRequest, JavaUtils.duration(start));

if (continuation.isExpired()) {
LOG.info("Continuation expired for {}, aborting...", artifactDownloadRequest.getTargetDirectory());
return;
return false;
}

final Path fetched = artifactManager.fetch(artifactDownloadRequest.getS3Artifact());

downloadListener.notifyDownloadFinished(this);

final Path targetDirectory = Paths.get(artifactDownloadRequest.getTargetDirectory());

if (continuation.isExpired()) {
LOG.info("Continuation expired for {} after download, aborting...", artifactDownloadRequest.getTargetDirectory());
return;
return false;
}

if (Objects.toString(fetched.getFileName()).endsWith(".tar.gz")) {
Expand All @@ -63,6 +74,8 @@ private void download() throws Exception {
LOG.info("Finishing request {} after {}", artifactDownloadRequest.getTargetDirectory(), JavaUtils.duration(start));

getResponse().getOutputStream().close();

return true;
}

private HttpServletResponse getResponse() {
Expand All @@ -71,8 +84,13 @@ private HttpServletResponse getResponse() {

@Override
public void run() {
boolean success = false;
try (final Context context = metrics.getDownloadTimer().time()) {
download();
success = download();
if (!success) {
metrics.getServerErrorsMeter().mark();
getResponse().sendError(500, "Hit client timeout");
}
} catch (Throwable t) {
metrics.getServerErrorsMeter().mark();
LOG.error("While handling {}", artifactDownloadRequest.getTargetDirectory(), t);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,7 @@
import com.hubspot.singularity.s3downloader.config.SingularityS3DownloaderConfiguration;
import com.hubspot.singularity.s3downloader.config.SingularityS3DownloaderModule;

public class SingularityS3DownloaderCoordinator {
public class SingularityS3DownloaderCoordinator implements DownloadListener {

private static final Logger LOG = LoggerFactory.getLogger(SingularityS3DownloaderCoordinator.class);

Expand Down Expand Up @@ -66,21 +66,31 @@ public void register(final Continuation continuation, final ArtifactDownloadRequ
downloadJoinerService.submit(downloadJoiner);
}

@Override
public void notifyDownloadFinished(SingularityS3DownloaderAsyncHandler handler) {
boolean removed = downloadRequestToHandler.remove(handler.getS3Artifact(), handler);
LOG.debug("Handler for artifact {} finished download - removed {}", handler.getS3Artifact(), removed);
}

private class DownloadJoiner implements Runnable {

private final long start;
private final Continuation continuation;
private final ArtifactDownloadRequest artifactDownloadRequest;

public DownloadJoiner(Continuation continuation, ArtifactDownloadRequest artifactDownloadRequest) {
private DownloadJoiner(Continuation continuation, ArtifactDownloadRequest artifactDownloadRequest) {
this.continuation = continuation;
this.artifactDownloadRequest = artifactDownloadRequest;
this.start = System.currentTimeMillis();
}

private void reEnqueue() {
LOG.debug("Re-enqueueing request for {}, waiting {}, ({} active, {} queue, {} max), total time {}", artifactDownloadRequest.getTargetDirectory(), JavaUtils.durationFromMillis(configuration.getMillisToWaitForReEnqueue()),
downloadJoinerService.getActiveCount(), downloadJoinerService.getQueue().size(), configuration.getNumEnqueueThreads(), JavaUtils.duration(start));
LOG.debug("Re-enqueueing request for {}, waiting {}, ({} active, {} queue, {} max), total time {}", artifactDownloadRequest.getTargetDirectory(),
JavaUtils.durationFromMillis(configuration.getMillisToWaitForReEnqueue()),
downloadJoinerService.getActiveCount(),
downloadJoinerService.getQueue().size(),
configuration.getNumEnqueueThreads(),
JavaUtils.duration(start));

downloadJoinerService.schedule(this, configuration.getMillisToWaitForReEnqueue(), TimeUnit.MILLISECONDS);
}
Expand All @@ -92,7 +102,8 @@ private boolean addDownloadRequest() {
return false;
}

SingularityS3DownloaderAsyncHandler newHandler = new SingularityS3DownloaderAsyncHandler(artifactManagerProvider.get(), artifactDownloadRequest, continuation, metrics, exceptionNotifier);
final SingularityS3DownloaderAsyncHandler newHandler = new SingularityS3DownloaderAsyncHandler(artifactManagerProvider.get(), artifactDownloadRequest, continuation, metrics, exceptionNotifier,
SingularityS3DownloaderCoordinator.this);

existingHandler = downloadRequestToHandler.putIfAbsent(artifactDownloadRequest.getS3Artifact(), newHandler);

Expand All @@ -108,17 +119,32 @@ private boolean addDownloadRequest() {
future.addListener(new Runnable() {
@Override
public void run() {
downloadRequestToHandler.remove(artifactDownloadRequest.getS3Artifact());
notifyDownloadFinished(newHandler);
}
}, listeningResponseExecutorService);

return true;
}

private void handleContinuationExpired() {
try {
LOG.info("Continuation expired for {} after {} - returning 500", artifactDownloadRequest, JavaUtils.duration(start));
((HttpServletResponse) continuation.getServletResponse()).sendError(500, "Hit client timeout");
} catch (Throwable t) {
LOG.warn("{} while sending error after continuation for {}", t.getClass().getSimpleName(), artifactDownloadRequest.getTargetDirectory());
} finally {
continuation.complete();
}
}

@Override
public void run() {
try {
if (!addDownloadRequest()) {
if (continuation.isExpired()) {
handleContinuationExpired();
return;
}
reEnqueue();
}
} catch (Throwable t) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -63,6 +63,9 @@ public void handle(String target, Request baseRequest, HttpServletRequest reques

Continuation continuation = ContinuationSupport.getContinuation(request);
continuation.suspend(response);
if (artifactOptional.get().getTimeoutMillis().isPresent()) {
continuation.setTimeout(artifactOptional.get().getTimeoutMillis().get());
}

downloaderCoordinator.register(continuation, artifactOptional.get());
}
Expand Down