Skip to content

Commit

Permalink
Implement S3 attachment service
Browse files Browse the repository at this point in the history
 - Attachment service uploads attachments received via API to S3 so they are available for agent execution everywhere
 - Move use of attachment service out of persistence service
 - Update JobSubmission to include URIs of attachments
 - Provide a local filesystem variant of the service for integration tests
  • Loading branch information
mprimi committed Sep 15, 2020
1 parent 563f05e commit dfcbc2b
Show file tree
Hide file tree
Showing 31 changed files with 1,093 additions and 117 deletions.
25 changes: 25 additions & 0 deletions genie-docs/src/docs/asciidoc/_metrics.adoc
Expand Up @@ -197,6 +197,31 @@ It will have to be added if one is desired otherwise metrics are just published
|ArchiveStatusCleanupTask
|status, exceptionClass

|genie.jobs.attachments.s3.count.distribution
|Distribution summary of the number of files attached
|count
|S3AttachmentServiceImpl
|

|genie.jobs.attachments.s3.largestSize.distribution
|Distribution summary of the size of the largest file attached
|bytes
|S3AttachmentServiceImpl
|

|genie.jobs.attachments.s3.totalSize.distribution
|Distribution summary of the total size of the files attached
|bytes
|S3AttachmentServiceImpl
|

|genie.jobs.attachments.s3.upload.timer
|Time taken to upload job attachments to S3 (only measured for jobs with attachments)
|nanoseconds
|S3AttachmentServiceImpl
|status, exceptionClass

|genie.jobs.clusters.selectors.script.select.timer
|Time taken by the loaded script to select a cluster among the ones passed as input
|nanoseconds
Expand Down
17 changes: 16 additions & 1 deletion genie-docs/src/docs/asciidoc/_properties.adoc
Expand Up @@ -230,6 +230,21 @@ Health of the system is marked unhealthy if the CPU load of a system goes beyond
|null
|yes

|genie.jobs.attachments.location-prefix
|Common prefix where attachments are stored
|s3://genie/attachments
|no

|genie.jobs.attachments.max-size
|Maximum size of an attachment
|100MB
|no

|genie.jobs.attachments.max-total-size
|Maximum size of all attachments combined (Spring and Tomcat may also independently limit the size of uploads)
|150MB
|no

|genie.jobs.cleanup.deleteDependencies
|Whether or not to delete the dependency directories for applications, clusters, and commands to save disk space after job completion
|true
Expand Down Expand Up @@ -293,7 +308,7 @@ cases is specified as part of the `Command` entity for a particular job.

|genie.jobs.locations.attachments
|The default root location where job attachments will be temporarily stored. Scheme should be included. Created if
doesn't exist.
doesn't exist (deprecated; see the genie.jobs.attachments.* properties).
|file://${java.io.tmpdir}genie/attachments/
|no

Expand Down
Expand Up @@ -64,7 +64,6 @@
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.data.domain.Page;
Expand Down Expand Up @@ -373,13 +372,9 @@ void canSaveAndVerifyJobSubmissionWithAttachments(@TempDir final Path tempDir) t
totalAttachmentSize
);

Mockito
.when(this.legacyAttachmentService.saveAttachments(Mockito.anyString(), Mockito.anySet()))
.thenReturn(attachmentURIs);

// Save the job submission
final String id = this.service.saveJobSubmission(
new JobSubmission.Builder(jobRequest, jobRequestMetadata).withAttachments(attachments).build()
new JobSubmission.Builder(jobRequest, jobRequestMetadata).withAttachments(attachmentURIs).build()
);

// Going to assume that most other verification of parameters other than attachments is done in
Expand Down
Expand Up @@ -29,7 +29,6 @@
import com.netflix.genie.common.internal.exceptions.unchecked.GenieIdAlreadyExistsException;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieJobNotFoundException;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieJobSpecificationNotFoundException;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieRuntimeException;
import com.netflix.genie.web.agent.inspectors.InspectionReport;
import com.netflix.genie.web.agent.services.AgentConfigurationService;
import com.netflix.genie.web.agent.services.AgentFilterService;
Expand All @@ -40,7 +39,6 @@
import com.netflix.genie.web.dtos.ResolvedJob;
import com.netflix.genie.web.exceptions.checked.IdAlreadyExistsException;
import com.netflix.genie.web.exceptions.checked.NotFoundException;
import com.netflix.genie.web.exceptions.checked.SaveAttachmentException;
import com.netflix.genie.web.services.JobResolverService;
import com.netflix.genie.web.util.MetricsUtils;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -169,9 +167,6 @@ public String reserveJobId(
} catch (final IdAlreadyExistsException e) {
// TODO: How to handle this?
throw new GenieIdAlreadyExistsException(e);
} catch (final SaveAttachmentException e) {
// this really shouldn't happen as there are no attachments with an agent cli job
throw new GenieRuntimeException(e);
}
}

Expand Down
Expand Up @@ -30,6 +30,7 @@
import com.netflix.genie.common.internal.exceptions.unchecked.GenieJobNotFoundException;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieJobSpecificationNotFoundException;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieRuntimeException;
import com.netflix.genie.web.exceptions.checked.AttachmentTooLargeException;
import com.netflix.genie.web.exceptions.checked.IdAlreadyExistsException;
import com.netflix.genie.web.exceptions.checked.JobNotFoundException;
import com.netflix.genie.web.exceptions.checked.NotFoundException;
Expand Down Expand Up @@ -135,6 +136,8 @@ public ResponseEntity<GenieCheckedException> handleGenieCheckedException(final G
return new ResponseEntity<>(e, HttpStatus.NOT_FOUND);
} else if (e instanceof PreconditionFailedException) {
return new ResponseEntity<>(e, HttpStatus.BAD_REQUEST);
} else if (e instanceof AttachmentTooLargeException) {
return new ResponseEntity<>(e, HttpStatus.PAYLOAD_TOO_LARGE);
} else {
return new ResponseEntity<>(e, HttpStatus.INTERNAL_SERVER_ERROR);
}
Expand Down
Expand Up @@ -57,10 +57,11 @@
import com.netflix.genie.web.dtos.JobSubmission;
import com.netflix.genie.web.exceptions.checked.NotFoundException;
import com.netflix.genie.web.properties.JobsProperties;
import com.netflix.genie.web.services.LegacyAttachmentService;
import com.netflix.genie.web.services.AttachmentService;
import com.netflix.genie.web.services.JobCoordinatorService;
import com.netflix.genie.web.services.JobDirectoryServerService;
import com.netflix.genie.web.services.JobLaunchService;
import com.netflix.genie.web.services.LegacyAttachmentService;
import com.netflix.genie.web.util.JobExecutionModeSelector;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -157,6 +158,7 @@ public class JobRestController {
private final AgentRoutingService agentRoutingService;
private final PersistenceService persistenceService;
private final Environment environment;
private final AttachmentService attachmentService;

// TODO: V3 Execution only
private final LegacyAttachmentService legacyAttachmentService;
Expand All @@ -168,7 +170,6 @@ public class JobRestController {

/**
* Constructor.
*
* @param jobLaunchService The {@link JobLaunchService} implementation to use
* @param dataServices The {@link DataServices} instance to use
* @param jobCoordinatorService The job coordinator service to use.
Expand All @@ -180,6 +181,7 @@ public class JobRestController {
* @param registry The metrics registry to use
* @param agentRoutingService Agent routing service
* @param environment The application environment to pull dynamic properties from
* @param attachmentService The attachment service to use to save attachments.
* @param legacyAttachmentService The legacy (V3 execution only) attachment service to use to save attachments.
* @param jobExecutionModeSelector The execution mode (agent vs. embedded) mode selector
*/
Expand All @@ -197,6 +199,7 @@ public JobRestController(
final MeterRegistry registry,
final AgentRoutingService agentRoutingService,
final Environment environment,
final AttachmentService attachmentService,
final LegacyAttachmentService legacyAttachmentService,
final JobExecutionModeSelector jobExecutionModeSelector
) {
Expand All @@ -217,6 +220,7 @@ public JobRestController(
this.agentRoutingService = agentRoutingService;
this.persistenceService = dataServices.getPersistenceService();
this.environment = environment;
this.attachmentService = attachmentService;

// TODO: V3 Only. Remove.
this.legacyAttachmentService = legacyAttachmentService;
Expand Down Expand Up @@ -946,10 +950,13 @@ private String agentExecution(

if (attachments != null) {
jobSubmissionBuilder.withAttachments(
Arrays
.stream(attachments)
.map(MultipartFile::getResource)
.collect(Collectors.toSet())
this.attachmentService.saveAttachments(
jobRequest.getId().orElse(null),
Arrays
.stream(attachments)
.map(MultipartFile::getResource)
.collect(Collectors.toSet())
)
);
}

Expand Down
Expand Up @@ -48,7 +48,6 @@
import com.netflix.genie.web.exceptions.checked.IdAlreadyExistsException;
import com.netflix.genie.web.exceptions.checked.NotFoundException;
import com.netflix.genie.web.exceptions.checked.PreconditionFailedException;
import com.netflix.genie.web.exceptions.checked.SaveAttachmentException;
import com.netflix.genie.web.services.LegacyAttachmentService;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
Expand Down Expand Up @@ -726,14 +725,9 @@ long deleteJobsCreatedBefore(
* either via the API or via the agent CLI
* @return The unique id of the job within the Genie ecosystem
* @throws IdAlreadyExistsException If the id the user requested already exists in the system for another job
* @throws SaveAttachmentException If attachments were sent in with the job submission and they were unable to be
* persisted to an underlying storage mechanism meant to share the data with an
* agent process
*/
@Nonnull
String saveJobSubmission(@Valid JobSubmission jobSubmission) throws
IdAlreadyExistsException,
SaveAttachmentException;
String saveJobSubmission(@Valid JobSubmission jobSubmission) throws IdAlreadyExistsException;

/**
* Get the original request for a job.
Expand Down
Expand Up @@ -105,8 +105,6 @@
import com.netflix.genie.web.exceptions.checked.IdAlreadyExistsException;
import com.netflix.genie.web.exceptions.checked.NotFoundException;
import com.netflix.genie.web.exceptions.checked.PreconditionFailedException;
import com.netflix.genie.web.exceptions.checked.SaveAttachmentException;
import com.netflix.genie.web.services.LegacyAttachmentService;
import edu.umd.cs.findbugs.annotations.SuppressFBWarnings;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
Expand Down Expand Up @@ -208,21 +206,14 @@ public class JpaPersistenceServiceImpl implements PersistenceService {
private final JpaJobRepository jobRepository;
private final JpaTagRepository tagRepository;

// TODO: Maybe this should be moved to a higher place, job resolver? Not sure persistence tier is proper place for
// saving attachments?
private final LegacyAttachmentService legacyAttachmentService;

/**
* Constructor.
*
* @param entityManager The {@link EntityManager} to use
* @param entityManager The {@link EntityManager} to use
* @param jpaRepositories All the repositories in the Genie application
* @param legacyAttachmentService The {@link LegacyAttachmentService} implementation to use
*/
public JpaPersistenceServiceImpl(
final EntityManager entityManager,
final JpaRepositories jpaRepositories,
final LegacyAttachmentService legacyAttachmentService
final JpaRepositories jpaRepositories
) {
this.entityManager = entityManager;
this.agentConnectionRepository = jpaRepositories.getAgentConnectionRepository();
Expand All @@ -233,7 +224,6 @@ public JpaPersistenceServiceImpl(
this.fileRepository = jpaRepositories.getFileRepository();
this.jobRepository = jpaRepositories.getJobRepository();
this.tagRepository = jpaRepositories.getTagRepository();
this.legacyAttachmentService = legacyAttachmentService;
}

//region Application APIs
Expand Down Expand Up @@ -1686,7 +1676,7 @@ public long deleteJobsCreatedBefore(
@Nonnull
public String saveJobSubmission(
@Valid final JobSubmission jobSubmission
) throws IdAlreadyExistsException, SaveAttachmentException {
) throws IdAlreadyExistsException {
log.debug("[saveJobSubmission] Attempting to save job submission {}", jobSubmission);
// TODO: Metrics
final JobEntity jobEntity = new JobEntity();
Expand All @@ -1698,20 +1688,14 @@ public String saveJobSubmission(
// Create the unique id if one doesn't already exist
this.setUniqueId(jobEntity, jobRequest.getRequestedId().orElse(null));

// Do we have attachments? Save them so the agent can access them later.
final Set<URI> attachmentURIs = this.legacyAttachmentService.saveAttachments(
jobEntity.getUniqueId(),
jobSubmission.getAttachments()
);

jobEntity.setCommandArgs(jobRequest.getCommandArgs());

this.setJobMetadataFields(
jobEntity,
jobRequest.getMetadata(),
jobRequest.getResources().getSetupFile().orElse(null)
);
this.setJobExecutionEnvironmentFields(jobEntity, jobRequest.getResources(), attachmentURIs);
this.setJobExecutionEnvironmentFields(jobEntity, jobRequest.getResources(), jobSubmission.getAttachments());
this.setExecutionResourceCriteriaFields(jobEntity, jobRequest.getCriteria());
this.setRequestedJobEnvironmentFields(jobEntity, jobRequest.getRequestedJobEnvironment());
this.setRequestedAgentConfigFields(jobEntity, jobRequest.getRequestedAgentConfig());
Expand Down
Expand Up @@ -24,11 +24,11 @@
import lombok.EqualsAndHashCode;
import lombok.Getter;
import lombok.ToString;
import org.springframework.core.io.Resource;

import javax.annotation.Nullable;
import javax.validation.Valid;
import javax.validation.constraints.NotNull;
import java.net.URI;
import java.util.Arrays;
import java.util.Collection;
import java.util.Set;
Expand All @@ -40,22 +40,8 @@
* @since 4.0.0
*/
@Getter
@EqualsAndHashCode(
doNotUseGetters = true,
of = {
// Exclude the attachments to save on scanning bytes
"jobRequest",
"jobRequestMetadata"
}
)
@ToString(
doNotUseGetters = true,
of = {
// Exclude the attachments to save on scanning bytes
"jobRequest",
"jobRequestMetadata"
}
)
@EqualsAndHashCode(doNotUseGetters = true)
@ToString(doNotUseGetters = true)
@SuppressWarnings("FinalClass")
public class JobSubmission {

Expand All @@ -66,7 +52,7 @@ public class JobSubmission {
@Valid
private final JobRequestMetadata jobRequestMetadata;
@NotNull
private final Set<Resource> attachments;
private final Set<URI> attachments;

private JobSubmission(final Builder builder) {
this.jobRequest = builder.bJobRequest;
Expand All @@ -83,7 +69,7 @@ private JobSubmission(final Builder builder) {
public static class Builder {
private final JobRequest bJobRequest;
private final JobRequestMetadata bJobRequestMetadata;
private final Set<Resource> bAttachments;
private final Set<URI> bAttachments;

/**
* Constructor with required parameters.
Expand All @@ -100,21 +86,21 @@ public Builder(final JobRequest jobRequest, final JobRequestMetadata jobRequestM
/**
* Set the attachments associated with this submission if there were any.
*
* @param attachments The attachments as {@link Resource} instances
* @param attachments The attachments as {@link URI}s
* @return the builder
*/
public Builder withAttachments(@Nullable final Set<Resource> attachments) {
public Builder withAttachments(@Nullable final Set<URI> attachments) {
this.setAttachments(attachments);
return this;
}

/**
* Set the attachments associated with this submission.
*
* @param attachments The attachments as {@link Resource} instances
* @param attachments The attachments as {@link URI}s
* @return the builder
*/
public Builder withAttachments(final Resource... attachments) {
public Builder withAttachments(final URI... attachments) {
this.setAttachments(Arrays.asList(attachments));
return this;
}
Expand All @@ -128,7 +114,7 @@ public JobSubmission build() {
return new JobSubmission(this);
}

private void setAttachments(@Nullable final Collection<Resource> attachments) {
private void setAttachments(@Nullable final Collection<URI> attachments) {
this.bAttachments.clear();
if (attachments != null) {
this.bAttachments.addAll(attachments);
Expand Down

0 comments on commit dfcbc2b

Please sign in to comment.