Skip to content

Commit

Permalink
Merge 4fc5af6 into 75cfed7
Browse files Browse the repository at this point in the history
  • Loading branch information
mprimi committed Sep 16, 2020
2 parents 75cfed7 + 4fc5af6 commit 3f5c8fa
Show file tree
Hide file tree
Showing 42 changed files with 1,399 additions and 256 deletions.
25 changes: 25 additions & 0 deletions genie-docs/src/docs/asciidoc/_metrics.adoc
Expand Up @@ -197,6 +197,31 @@ It will have to be added if one is desired otherwise metrics are just published
|ArchiveStatusCleanupTask
|status, exceptionClass

|genie.jobs.attachments.s3.count.distribution
|Distribution summary of the number of files attached
|count
|S3AttachmentServiceImpl
|

|genie.jobs.attachments.s3.largestSize.distribution
|Distribution summary of the size of the largest file attached
|bytes
|S3AttachmentServiceImpl
|

|genie.jobs.attachments.s3.totalSize.distribution
|Distribution summary of the total size of the files attached
|bytes
|S3AttachmentServiceImpl
|

|genie.jobs.attachments.s3.upload.timer
|genie.jobs.attachments.s3.upload.timer
|Time taken to upload job attachments to S3 (only measured for jobs with attachments)
|nanoseconds
|S3AttachmentServiceImpl
|status, exceptionClass

|genie.jobs.clusters.selectors.script.select.timer
|Time taken by the loaded script to select a cluster among the one passed as input
|nanoseconds
Expand Down
27 changes: 26 additions & 1 deletion genie-docs/src/docs/asciidoc/_properties.adoc
Expand Up @@ -17,6 +17,16 @@ Whereas static properties values are bound during application startup and do not
|===
|Property |Description |Default Value |Dynamic

|genie.agent.connection-tracking.cleanup-interval
|Interval at which the cleanup task runs
|2s
|no

|genie.agent.connection-tracking.connection-expiration-period
|How long after the last heartbeat an agent connection is marked expired
|10s
|no

|genie.agent.configuration.agent-properties-filter-pattern
|Regular expression applied to filter server properties that are forwarded to the agent
|^genie\.agent\.runtime\..*
Expand Down Expand Up @@ -230,6 +240,21 @@ Health of the system is marked unhealthy if the CPU load of a system goes beyond
|null
|yes

|genie.jobs.attachments.location-prefix
|Common prefix where attachments are stored
|s3://genie/attachments
|no

|genie.jobs.attachments.max-size
|Maximum size of an attachment
|100MB
|no

|genie.jobs.attachments.max-total-size
|Maximum size of all attachments combined (Spring and Tomcat may also independently limit the size of upload)
|150MB
|no

|genie.jobs.cleanup.deleteDependencies
|Whether or not to delete the dependencies directories for applications, cluster, command to save disk space after job completion
|true
Expand Down Expand Up @@ -293,7 +318,7 @@ cases is specified as part of the `Command` entity for a particular job.

|genie.jobs.locations.attachments
|The default root location where job attachments will be temporarily stored. Scheme should be included. Created if
doesn't exist.
doesn't exist (deprecated, see genie.jobs.attachments.* properties)
|file://${java.io.tmpdir}genie/attachments/
|no

Expand Down
Expand Up @@ -64,7 +64,6 @@
import org.assertj.core.api.Assertions;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;
import org.mockito.Mockito;
import org.springframework.core.io.FileSystemResource;
import org.springframework.core.io.Resource;
import org.springframework.data.domain.Page;
Expand Down Expand Up @@ -373,13 +372,9 @@ void canSaveAndVerifyJobSubmissionWithAttachments(@TempDir final Path tempDir) t
totalAttachmentSize
);

Mockito
.when(this.attachmentService.saveAttachments(Mockito.anyString(), Mockito.anySet()))
.thenReturn(attachmentURIs);

// Save the job submission
final String id = this.service.saveJobSubmission(
new JobSubmission.Builder(jobRequest, jobRequestMetadata).withAttachments(attachments).build()
new JobSubmission.Builder(jobRequest, jobRequestMetadata).withAttachments(attachmentURIs).build()
);

// Going to assume that most other verification of parameters other than attachments is done in
Expand Down
Expand Up @@ -27,7 +27,7 @@
import com.netflix.genie.web.data.services.impl.jpa.repositories.JpaFileRepository;
import com.netflix.genie.web.data.services.impl.jpa.repositories.JpaJobRepository;
import com.netflix.genie.web.data.services.impl.jpa.repositories.JpaTagRepository;
import com.netflix.genie.web.services.AttachmentService;
import com.netflix.genie.web.services.LegacyAttachmentService;
import com.netflix.genie.web.spring.autoconfigure.ValidationAutoConfiguration;
import com.netflix.genie.web.spring.autoconfigure.data.DataAutoConfiguration;
import org.junit.jupiter.api.AfterEach;
Expand Down Expand Up @@ -61,7 +61,7 @@
)
@MockBean(
{
AttachmentService.class,
LegacyAttachmentService.class,
PersistedJobStatusObserver.class //TODO: Needed for JobEntityListener but should be in DataAutoConfiguration
}
)
Expand Down Expand Up @@ -105,14 +105,14 @@ class JpaPersistenceServiceIntegrationTestBase {
protected PersistedJobStatusObserver persistedJobStatusObserver;

@Autowired
protected AttachmentService attachmentService;
protected LegacyAttachmentService legacyAttachmentService;

@Autowired
protected TestEntityManager entityManager;

@AfterEach
void resetMocks() {
// Could use @DirtiesContext but seems excessive
Mockito.reset(this.persistedJobStatusObserver, this.attachmentService);
Mockito.reset(this.persistedJobStatusObserver, this.legacyAttachmentService);
}
}
Expand Up @@ -23,6 +23,7 @@
import com.google.common.collect.Sets;
import com.netflix.genie.web.agent.services.AgentConnectionTrackingService;
import com.netflix.genie.web.agent.services.AgentRoutingService;
import com.netflix.genie.web.properties.AgentConnectionTrackingServiceProperties;
import lombok.extern.slf4j.Slf4j;
import org.springframework.boot.actuate.info.Info;
import org.springframework.boot.actuate.info.InfoContributor;
Expand All @@ -46,38 +47,40 @@
@Slf4j
public class AgentConnectionTrackingServiceImpl implements AgentConnectionTrackingService, InfoContributor {

// Minimum time from the last heartbeat before a stream is considered expired
protected static final int STREAM_EXPIRATION_PERIOD = 10_000; // TODO: Make configurable
protected static final int CLEANUP_TASK_PERIOD = 2_000; // TODO: Make configurable
private final AgentRoutingService agentRoutingService;
private final TaskScheduler taskScheduler;
private final ConcurrentMap<String, JobStreamsRecord> jobStreamRecordsMap = Maps.newConcurrentMap();
private final AgentConnectionTrackingServiceProperties serviceProperties;
private final Supplier<Instant> timeSupplier;

/**
* Constructor.
*
* @param agentRoutingService the agent routing service
* @param taskScheduler the task scheduler
* @param serviceProperties the service properties
*/
public AgentConnectionTrackingServiceImpl(
final AgentRoutingService agentRoutingService,
final TaskScheduler taskScheduler
final TaskScheduler taskScheduler,
final AgentConnectionTrackingServiceProperties serviceProperties
) {
this(agentRoutingService, taskScheduler, Instant::now);
this(agentRoutingService, taskScheduler, serviceProperties, Instant::now);
}

@VisibleForTesting
AgentConnectionTrackingServiceImpl(
final AgentRoutingService agentRoutingService,
final TaskScheduler taskScheduler,
final AgentConnectionTrackingServiceProperties serviceProperties,
final Supplier<Instant> timeSupplier
) {
this.agentRoutingService = agentRoutingService;
this.taskScheduler = taskScheduler;
this.serviceProperties = serviceProperties;
this.timeSupplier = timeSupplier;

this.taskScheduler.scheduleAtFixedRate(this::cleanupTask, CLEANUP_TASK_PERIOD);
this.taskScheduler.scheduleAtFixedRate(this::cleanupTask, this.serviceProperties.getCleanupInterval());
}

/**
Expand Down Expand Up @@ -179,7 +182,7 @@ private void removeExpiredAgents() {
}

private void removeExpiredStreams() {
final Instant cutoff = this.timeSupplier.get().minusMillis(STREAM_EXPIRATION_PERIOD);
final Instant cutoff = this.timeSupplier.get().minus(serviceProperties.getConnectionExpirationPeriod());
// Purge streams that did not send a heartbeat more recently than cutoff
this.jobStreamRecordsMap.forEach(
(jobId, record) -> record.expungeExpiredStreams(cutoff)
Expand Down
Expand Up @@ -29,7 +29,6 @@
import com.netflix.genie.common.internal.exceptions.unchecked.GenieIdAlreadyExistsException;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieJobNotFoundException;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieJobSpecificationNotFoundException;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieRuntimeException;
import com.netflix.genie.web.agent.inspectors.InspectionReport;
import com.netflix.genie.web.agent.services.AgentConfigurationService;
import com.netflix.genie.web.agent.services.AgentFilterService;
Expand All @@ -40,7 +39,6 @@
import com.netflix.genie.web.dtos.ResolvedJob;
import com.netflix.genie.web.exceptions.checked.IdAlreadyExistsException;
import com.netflix.genie.web.exceptions.checked.NotFoundException;
import com.netflix.genie.web.exceptions.checked.SaveAttachmentException;
import com.netflix.genie.web.services.JobResolverService;
import com.netflix.genie.web.util.MetricsUtils;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -169,9 +167,6 @@ public String reserveJobId(
} catch (final IdAlreadyExistsException e) {
// TODO: How to handle this?
throw new GenieIdAlreadyExistsException(e);
} catch (final SaveAttachmentException e) {
// this really shouldn't happen as there are no attachments with an agent cli job
throw new GenieRuntimeException(e);
}
}

Expand Down
Expand Up @@ -30,6 +30,7 @@
import com.netflix.genie.common.internal.exceptions.unchecked.GenieJobNotFoundException;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieJobSpecificationNotFoundException;
import com.netflix.genie.common.internal.exceptions.unchecked.GenieRuntimeException;
import com.netflix.genie.web.exceptions.checked.AttachmentTooLargeException;
import com.netflix.genie.web.exceptions.checked.IdAlreadyExistsException;
import com.netflix.genie.web.exceptions.checked.JobNotFoundException;
import com.netflix.genie.web.exceptions.checked.NotFoundException;
Expand Down Expand Up @@ -135,6 +136,8 @@ public ResponseEntity<GenieCheckedException> handleGenieCheckedException(final G
return new ResponseEntity<>(e, HttpStatus.NOT_FOUND);
} else if (e instanceof PreconditionFailedException) {
return new ResponseEntity<>(e, HttpStatus.BAD_REQUEST);
} else if (e instanceof AttachmentTooLargeException) {
return new ResponseEntity<>(e, HttpStatus.PAYLOAD_TOO_LARGE);
} else {
return new ResponseEntity<>(e, HttpStatus.INTERNAL_SERVER_ERROR);
}
Expand Down
Expand Up @@ -61,6 +61,7 @@
import com.netflix.genie.web.services.JobCoordinatorService;
import com.netflix.genie.web.services.JobDirectoryServerService;
import com.netflix.genie.web.services.JobLaunchService;
import com.netflix.genie.web.services.LegacyAttachmentService;
import com.netflix.genie.web.util.JobExecutionModeSelector;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.MeterRegistry;
Expand Down Expand Up @@ -157,9 +158,10 @@ public class JobRestController {
private final AgentRoutingService agentRoutingService;
private final PersistenceService persistenceService;
private final Environment environment;
private final AttachmentService attachmentService;

// TODO: V3 Execution only
private final AttachmentService attachmentService;
private final LegacyAttachmentService legacyAttachmentService;
private final JobExecutionModeSelector jobExecutionModeSelector;

// Metrics
Expand All @@ -168,7 +170,6 @@ public class JobRestController {

/**
* Constructor.
*
* @param jobLaunchService The {@link JobLaunchService} implementation to use
* @param dataServices The {@link DataServices} instance to use
* @param jobCoordinatorService The job coordinator service to use.
Expand All @@ -181,6 +182,7 @@ public class JobRestController {
* @param agentRoutingService Agent routing service
* @param environment The application environment to pull dynamic properties from
* @param attachmentService The attachment service to use to save attachments.
* @param legacyAttachmentService The attachment service to use to save attachments.
* @param jobExecutionModeSelector The execution mode (agent vs. embedded) mode selector
*/
@Autowired
Expand All @@ -198,6 +200,7 @@ public JobRestController(
final AgentRoutingService agentRoutingService,
final Environment environment,
final AttachmentService attachmentService,
final LegacyAttachmentService legacyAttachmentService,
final JobExecutionModeSelector jobExecutionModeSelector
) {
this.jobLaunchService = jobLaunchService;
Expand All @@ -217,9 +220,10 @@ public JobRestController(
this.agentRoutingService = agentRoutingService;
this.persistenceService = dataServices.getPersistenceService();
this.environment = environment;
this.attachmentService = attachmentService;

// TODO: V3 Only. Remove.
this.attachmentService = attachmentService;
this.legacyAttachmentService = legacyAttachmentService;
this.jobExecutionModeSelector = jobExecutionModeSelector;

// Set up the metrics
Expand Down Expand Up @@ -897,7 +901,7 @@ private String embeddedExecution(
if (originalFilename == null) {
originalFilename = UUID.randomUUID().toString();
}
this.attachmentService.save(jobId, originalFilename, attachment.getInputStream());
this.legacyAttachmentService.save(jobId, originalFilename, attachment.getInputStream());
} catch (final IOException ioe) {
throw new GenieServerException("Failed to save job attachment", ioe);
}
Expand Down Expand Up @@ -946,10 +950,13 @@ private String agentExecution(

if (attachments != null) {
jobSubmissionBuilder.withAttachments(
Arrays
.stream(attachments)
.map(MultipartFile::getResource)
.collect(Collectors.toSet())
this.attachmentService.saveAttachments(
jobRequest.getId().orElse(null),
Arrays
.stream(attachments)
.map(MultipartFile::getResource)
.collect(Collectors.toSet())
)
);
}

Expand Down
Expand Up @@ -48,7 +48,7 @@
import com.netflix.genie.web.exceptions.checked.IdAlreadyExistsException;
import com.netflix.genie.web.exceptions.checked.NotFoundException;
import com.netflix.genie.web.exceptions.checked.PreconditionFailedException;
import com.netflix.genie.web.exceptions.checked.SaveAttachmentException;
import com.netflix.genie.web.services.LegacyAttachmentService;
import org.springframework.data.domain.Page;
import org.springframework.data.domain.Pageable;
import org.springframework.validation.annotation.Validated;
Expand Down Expand Up @@ -719,20 +719,15 @@ long deleteJobsCreatedBefore(
* The underlying attachment storage system must be accessible by the agent process configured by the system. For
* example if the server is set up to write attachments to local disk but the agent is not running locally but
* instead on the remote system it will not be able to access those attachments (as dependencies) and fail.
* See {@link com.netflix.genie.web.services.AttachmentService} for more information.
* See {@link LegacyAttachmentService} for more information.
*
* @param jobSubmission All the information the system has gathered regarding the job submission from the user
* either via the API or via the agent CLI
* @return The unique id of the job within the Genie ecosystem
* @throws IdAlreadyExistsException If the id the user requested already exists in the system for another job
* @throws SaveAttachmentException If attachments were sent in with the job submission and they were unable to be
* persisted to an underlying storage mechanism meant to share the data with an
* agent process
*/
@Nonnull
String saveJobSubmission(@Valid JobSubmission jobSubmission) throws
IdAlreadyExistsException,
SaveAttachmentException;
String saveJobSubmission(@Valid JobSubmission jobSubmission) throws IdAlreadyExistsException;

/**
* Get the original request for a job.
Expand Down

0 comments on commit 3f5c8fa

Please sign in to comment.