Skip to content
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -199,7 +199,7 @@ private Collection<String> getS3PrefixesForDeploy(S3Configuration s3Configuratio
return prefixes;
}

private List<SingularityS3Log> getS3LogsWithExecutorService(S3Configuration s3Configuration, Optional<String> group, ListeningExecutorService executorService, Collection<String> prefixes) throws InterruptedException, ExecutionException, TimeoutException {
private List<SingularityS3Log> getS3LogsWithExecutorService(S3Configuration s3Configuration, Optional<String> group, ListeningExecutorService executorService, Collection<String> prefixes, final boolean excludeMetadata) throws InterruptedException, ExecutionException, TimeoutException {
List<ListenableFuture<S3Object[]>> futures = Lists.newArrayListWithCapacity(prefixes.size());

final String s3Bucket = (group.isPresent() && s3Configuration.getGroupOverrides().containsKey(group.get())) ? s3Configuration.getGroupOverrides().get(group.get()).getS3Bucket() : s3Configuration.getS3Bucket();
Expand Down Expand Up @@ -240,9 +240,13 @@ public SingularityS3Log call() throws Exception {
String getUrl = s3Service.createSignedGetUrl(s3Bucket, s3Object.getKey(), expireAt);
String downloadUrl = s3Service.createSignedUrl("GET", s3Bucket, s3Object.getKey(), FORCE_DOWNLOAD_S3_PARAMS, null, expireAt.getTime() / 1000, false);

Map<String, Object> objectMetadata = s3Service.getObjectDetails(s3Bucket, s3Object.getKey()).getMetadataMap();
Optional<Long> maybeStartTime = getMetadataAsLong(objectMetadata, SingularityS3Log.LOG_START_S3_ATTR);
Optional<Long> maybeEndTime = getMetadataAsLong(objectMetadata, SingularityS3Log.LOG_END_S3_ATTR);
Optional<Long> maybeStartTime = Optional.absent();
Optional<Long> maybeEndTime = Optional.absent();
if (!excludeMetadata) {
Map<String, Object> objectMetadata = s3Service.getObjectDetails(s3Bucket, s3Object.getKey()).getMetadataMap();
maybeStartTime = getMetadataAsLong(objectMetadata, SingularityS3Log.LOG_START_S3_ATTR);
maybeEndTime = getMetadataAsLong(objectMetadata, SingularityS3Log.LOG_END_S3_ATTR);
}

return new SingularityS3Log(getUrl, s3Object.getKey(), s3Object.getLastModifiedDate().getTime(), s3Object.getContentLength(), downloadUrl, maybeStartTime, maybeEndTime);
}
Expand Down Expand Up @@ -270,7 +274,7 @@ private Optional<Long> getMetadataAsLong(Map<String, Object> objectMetadata, Str
}
}

private List<SingularityS3Log> getS3Logs(S3Configuration s3Configuration, Optional<String> group, Collection<String> prefixes) throws InterruptedException, ExecutionException, TimeoutException {
private List<SingularityS3Log> getS3Logs(S3Configuration s3Configuration, Optional<String> group, Collection<String> prefixes, boolean excldueMetadata) throws InterruptedException, ExecutionException, TimeoutException {
if (prefixes.isEmpty()) {
return Collections.emptyList();
}
Expand All @@ -279,7 +283,7 @@ private List<SingularityS3Log> getS3Logs(S3Configuration s3Configuration, Option
new ThreadFactoryBuilder().setNameFormat("S3LogFetcher-%d").build()));

try {
List<SingularityS3Log> logs = Lists.newArrayList(getS3LogsWithExecutorService(s3Configuration, group, executorService, prefixes));
List<SingularityS3Log> logs = Lists.newArrayList(getS3LogsWithExecutorService(s3Configuration, group, executorService, prefixes, excldueMetadata));
Collections.sort(logs, LOG_COMPARATOR);
return logs;
} finally {
Expand Down Expand Up @@ -327,13 +331,14 @@ private Optional<String> getRequestGroup(final String requestId) {
public List<SingularityS3Log> getS3LogsForTask(
@ApiParam("The task ID to search for") @PathParam("taskId") String taskId,
@ApiParam("Start timestamp (millis, 13 digit)") @QueryParam("start") Optional<Long> start,
@ApiParam("End timestamp (mills, 13 digit)") @QueryParam("end") Optional<Long> end) throws Exception {
@ApiParam("End timestamp (mills, 13 digit)") @QueryParam("end") Optional<Long> end,
@ApiParam("Exclude custom object metadata") @QueryParam("excludeMetadata") Optional<Boolean> excludeMetadata) throws Exception {
checkS3();

SingularityTaskId taskIdObject = getTaskIdObject(taskId);

try {
return getS3Logs(configuration.get(), getRequestGroupForTask(taskIdObject), getS3PrefixesForTask(configuration.get(), taskIdObject, start, end));
return getS3Logs(configuration.get(), getRequestGroupForTask(taskIdObject), getS3PrefixesForTask(configuration.get(), taskIdObject, start, end), excludeMetadata.or(false));
} catch (TimeoutException te) {
throw timeout("Timed out waiting for response from S3 for %s", taskId);
} catch (Throwable t) {
Expand All @@ -347,11 +352,12 @@ public List<SingularityS3Log> getS3LogsForTask(
public List<SingularityS3Log> getS3LogsForRequest(
@ApiParam("The request ID to search for") @PathParam("requestId") String requestId,
@ApiParam("Start timestamp (millis, 13 digit)") @QueryParam("start") Optional<Long> start,
@ApiParam("End timestamp (mills, 13 digit)") @QueryParam("end") Optional<Long> end) throws Exception {
@ApiParam("End timestamp (mills, 13 digit)") @QueryParam("end") Optional<Long> end,
@ApiParam("Exclude custom object metadata") @QueryParam("excludeMetadata") Optional<Boolean> excludeMetadata) throws Exception {
checkS3();

try {
return getS3Logs(configuration.get(), getRequestGroup(requestId), getS3PrefixesForRequest(configuration.get(), requestId, start, end));
return getS3Logs(configuration.get(), getRequestGroup(requestId), getS3PrefixesForRequest(configuration.get(), requestId, start, end), excludeMetadata.or(false));
} catch (TimeoutException te) {
throw timeout("Timed out waiting for response from S3 for %s", requestId);
} catch (Throwable t) {
Expand All @@ -366,11 +372,12 @@ public List<SingularityS3Log> getS3LogsForDeploy(
@ApiParam("The request ID to search for") @PathParam("requestId") String requestId,
@ApiParam("The deploy ID to search for") @PathParam("deployId") String deployId,
@ApiParam("Start timestamp (millis, 13 digit)") @QueryParam("start") Optional<Long> start,
@ApiParam("End timestamp (mills, 13 digit)") @QueryParam("end") Optional<Long> end) throws Exception {
@ApiParam("End timestamp (mills, 13 digit)") @QueryParam("end") Optional<Long> end,
@ApiParam("Exclude custom object metadata") @QueryParam("excludeMetadata") Optional<Boolean> excludeMetadata) throws Exception {
checkS3();

try {
return getS3Logs(configuration.get(), getRequestGroup(requestId), getS3PrefixesForDeploy(configuration.get(), requestId, deployId, start, end));
return getS3Logs(configuration.get(), getRequestGroup(requestId), getS3PrefixesForDeploy(configuration.get(), requestId, deployId, start, end), excludeMetadata.or(false));
} catch (TimeoutException te) {
throw timeout("Timed out waiting for response from S3 for %s-%s", requestId, deployId);
} catch (Throwable t) {
Expand Down