Skip to content

Commit

Permalink
Enforce job directory limits for agent jobs
Browse files Browse the repository at this point in the history
* Add a service that monitors the job directory's total size, largest single file size, and number of files, and kills the job if any limit is exceeded.
* Introduce new properties allowing server runtime override
* Handle job killed by limit exceeded
  • Loading branch information
mprimi committed Jun 25, 2020
1 parent 5910a83 commit fca239d
Show file tree
Hide file tree
Showing 18 changed files with 492 additions and 13 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@
import com.netflix.genie.agent.execution.services.AgentHeartBeatService;
import com.netflix.genie.agent.execution.services.AgentJobKillService;
import com.netflix.genie.agent.execution.services.AgentJobService;
import com.netflix.genie.agent.execution.services.JobMonitorService;
import com.netflix.genie.agent.execution.services.JobSetupService;
import com.netflix.genie.agent.execution.statemachine.ExecutionContext;
import com.netflix.genie.agent.execution.statemachine.ExecutionStage;
Expand Down Expand Up @@ -165,7 +166,7 @@ HandshakeStage handshakeStage(final AgentJobService agentJobService) {
/**
* Create a {@link ConfigureAgentStage} bean if one is not already defined.
*
* @param agentJobService the agent job service
* @param agentJobService the agent job service
*/
@Bean
@Lazy
Expand Down Expand Up @@ -404,13 +405,17 @@ SetJobStatusRunning setJobStatusRunning(final AgentJobService agentJobService) {
* Create a {@link WaitJobCompletionStage} bean if one is not already defined.
*
* @param jobProcessManager the job process manager
* @param jobMonitorService the job monitor service
*/
@Bean
@Lazy
@Order(170)
@ConditionalOnMissingBean(WaitJobCompletionStage.class)
WaitJobCompletionStage waitJobCompletionStage(final JobProcessManager jobProcessManager) {
return new WaitJobCompletionStage(jobProcessManager);
WaitJobCompletionStage waitJobCompletionStage(
final JobProcessManager jobProcessManager,
final JobMonitorService jobMonitorService
) {
return new WaitJobCompletionStage(jobProcessManager, jobMonitorService);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -212,6 +212,10 @@ public JobProcessResult waitFor() throws InterruptedException {
return new JobProcessResult
.Builder(JobStatus.KILLED, JobStatusMessages.JOB_EXCEEDED_TIMEOUT, exitCode)
.build();
case FILES_LIMIT:
return new JobProcessResult
.Builder(JobStatus.KILLED, JobStatusMessages.JOB_EXCEEDED_FILES_LIMIT, exitCode)
.build();
case API_KILL_REQUEST:
case SYSTEM_SIGNAL:
default:
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
/*
*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.genie.agent.execution.services;

import java.nio.file.Path;

/**
* Service that monitors the job directory and may decide to kill the job if some limit is exceeded.
* For example, if a file grows larger than some amount.
* <p>
* The expected usage is to call {@link #start(Path)} once the job directory to watch is known and
* {@link #stop()} when monitoring is no longer needed.
*
* @author mprimi
* @since 4.0.0
*/
public interface JobMonitorService {

/**
* Starts the service, which begins monitoring the given job directory.
*
* @param jobDirectory the job directory to monitor
*/
void start(Path jobDirectory);

/**
* Stops the service, ending the monitoring of the job directory.
*/
void stop();
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,10 @@ enum KillSource {
* The job has exceeded its max execution duration.
*/
TIMEOUT,
/**
* The job has exceeded one of the file limits (file count, file size, etc.).
*/
FILES_LIMIT,
}

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,125 @@
/*
*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.genie.agent.execution.services.impl;

import com.netflix.genie.agent.execution.services.JobMonitorService;
import com.netflix.genie.agent.execution.services.KillService;
import com.netflix.genie.agent.properties.AgentProperties;
import com.netflix.genie.agent.properties.JobMonitorServiceProperties;
import com.netflix.genie.common.internal.dtos.DirectoryManifest;
import com.netflix.genie.common.internal.services.JobDirectoryManifestCreatorService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.scheduling.TaskScheduler;
import org.springframework.util.unit.DataSize;

import java.io.IOException;
import java.nio.file.Path;
import java.util.Comparator;
import java.util.Optional;
import java.util.concurrent.ScheduledFuture;

/**
 * Implementation of {@link JobMonitorService} that periodically checks on the size and number of files
 * using the manifest creator, rather than looking at the actual files.
 * <p>
 * On each check the following limits are evaluated, in order: number of files, total size of all files,
 * size of the largest single file. The first limit found to be exceeded triggers a kill request with
 * source {@link KillService.KillSource#FILES_LIMIT}.
 * <p>
 * This implementation is not thread safe.
 *
 * @author mprimi
 * @since 4.0.0
 */
@Slf4j
class JobMonitorServiceImpl implements JobMonitorService {
    private final KillService killService;
    private final JobDirectoryManifestCreatorService manifestCreatorService;
    private final TaskScheduler taskScheduler;
    private final JobMonitorServiceProperties properties;
    // Handle of the currently scheduled periodic check, or null if none is scheduled
    private ScheduledFuture<?> scheduledCheck;

    /**
     * Constructor.
     *
     * @param killService            the service used to request the job to be killed
     * @param manifestCreatorService the service used to obtain the job directory manifest
     * @param taskScheduler          the scheduler used to run the periodic check
     * @param agentProperties        the agent properties, from which the monitor properties are read
     */
    JobMonitorServiceImpl(
        final KillService killService,
        final JobDirectoryManifestCreatorService manifestCreatorService,
        final TaskScheduler taskScheduler,
        final AgentProperties agentProperties
    ) {
        this.killService = killService;
        this.manifestCreatorService = manifestCreatorService;
        this.taskScheduler = taskScheduler;
        this.properties = agentProperties.getJobMonitorService();
    }

    /**
     * Schedules the periodic limits check for the given job directory at the configured interval.
     * If a check is already scheduled, it is cancelled first so that its handle is not lost.
     *
     * @param jobDirectory the job directory
     */
    @Override
    public void start(final Path jobDirectory) {
        // Without this, a second start() would overwrite the handle and leave the previous
        // periodic task running with no way to cancel it.
        this.cancelScheduledCheck();
        this.scheduledCheck = this.taskScheduler.scheduleAtFixedRate(
            () -> this.checkFilesSize(jobDirectory),
            this.properties.getCheckInterval()
        );
    }

    /**
     * Cancels the periodic check, if one is scheduled. Does nothing otherwise.
     */
    @Override
    public void stop() {
        this.cancelScheduledCheck();
    }

    /**
     * Cancels the scheduled check (interrupting it if running) and clears the stored handle.
     */
    private void cancelScheduledCheck() {
        if (this.scheduledCheck != null) {
            this.scheduledCheck.cancel(true);
            this.scheduledCheck = null;
        }
    }

    /**
     * Obtains the manifest of the job directory and requests a job kill if any limit is exceeded.
     * If the manifest cannot be obtained, the check is skipped (a warning is logged).
     *
     * @param jobDirectory the job directory
     */
    private void checkFilesSize(final Path jobDirectory) {
        final DirectoryManifest manifest;
        try {
            manifest = this.manifestCreatorService.getDirectoryManifest(jobDirectory);
        } catch (IOException e) {
            // Message goes through the placeholder; the exception itself is passed last so the
            // logging framework can record the stack trace.
            log.warn("Failed to obtain manifest: {}", e.getMessage(), e);
            return;
        }

        final int files = manifest.getNumFiles();
        final int maxFiles = this.properties.getMaxFiles();
        if (files > maxFiles) {
            log.error("Limit exceeded, too many files: {}/{}", files, maxFiles);
            this.killService.kill(KillService.KillSource.FILES_LIMIT);
            return;
        }

        final DataSize totalSize = DataSize.ofBytes(manifest.getTotalSizeOfFiles());
        final DataSize maxTotalSize = this.properties.getMaxTotalSize();
        if (totalSize.toBytes() > maxTotalSize.toBytes()) {
            log.error("Limit exceeded, job directory too large: {}/{}", totalSize, maxTotalSize);
            this.killService.kill(KillService.KillSource.FILES_LIMIT);
            return;
        }

        // Only the largest file needs to be compared against the per-file limit
        final Optional<DirectoryManifest.ManifestEntry> largestFile = manifest.getFiles()
            .stream()
            .max(Comparator.comparing(DirectoryManifest.ManifestEntry::getSize));

        if (largestFile.isPresent()) {
            final DataSize largestFileSize = DataSize.ofBytes(largestFile.get().getSize());
            final DataSize maxFileSize = this.properties.getMaxFileSize();
            if (largestFileSize.toBytes() > maxFileSize.toBytes()) {
                log.error(
                    "Limit exceeded, file too large: {}/{} ({})",
                    largestFileSize,
                    maxFileSize,
                    largestFile.get().getPath()
                );
                this.killService.kill(KillService.KillSource.FILES_LIMIT);
                return;
            }
        }

        log.debug("No files limit exceeded");
    }
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,11 +20,13 @@
import com.netflix.genie.agent.cli.ArgumentDelegates;
import com.netflix.genie.agent.execution.services.DownloadService;
import com.netflix.genie.agent.execution.services.FetchingCacheService;
import com.netflix.genie.agent.execution.services.JobMonitorService;
import com.netflix.genie.agent.execution.services.JobSetupService;
import com.netflix.genie.agent.execution.services.KillService;
import com.netflix.genie.agent.properties.AgentProperties;
import com.netflix.genie.agent.utils.locks.impl.FileLockFactory;
import com.netflix.genie.common.internal.configs.AwsAutoConfiguration;
import com.netflix.genie.common.internal.services.JobDirectoryManifestCreatorService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.boot.autoconfigure.AutoConfigureAfter;
Expand All @@ -35,6 +37,7 @@
import org.springframework.context.annotation.Lazy;
import org.springframework.core.io.ResourceLoader;
import org.springframework.core.task.TaskExecutor;
import org.springframework.scheduling.TaskScheduler;

import java.io.IOException;

Expand Down Expand Up @@ -121,4 +124,25 @@ public JobSetupService jobSetupService(
) {
return new JobSetupServiceImpl(downloadService);
}

/**
 * Provide a lazy {@link JobMonitorService} bean, unless a bean of that type is already defined.
 *
 * @param killService            the kill service
 * @param manifestCreatorService the manifest creator service
 * @param taskScheduler          the task scheduler (the bean qualified as "sharedAgentTaskScheduler")
 * @param agentProperties        the agent properties
 * @return A {@link JobMonitorServiceImpl} instance
 */
@Bean
@Lazy
@ConditionalOnMissingBean(JobMonitorService.class)
public JobMonitorServiceImpl jobMonitorService(
    final KillService killService,
    final JobDirectoryManifestCreatorService manifestCreatorService,
    @Qualifier("sharedAgentTaskScheduler") final TaskScheduler taskScheduler,
    final AgentProperties agentProperties
) {
    final JobMonitorServiceImpl monitorService = new JobMonitorServiceImpl(
        killService,
        manifestCreatorService,
        taskScheduler,
        agentProperties
    );
    return monitorService;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import com.netflix.genie.agent.cli.logging.ConsoleLog;
import com.netflix.genie.agent.execution.process.JobProcessManager;
import com.netflix.genie.agent.execution.process.JobProcessResult;
import com.netflix.genie.agent.execution.services.JobMonitorService;
import com.netflix.genie.agent.execution.statemachine.ExecutionContext;
import com.netflix.genie.agent.execution.statemachine.ExecutionStage;
import com.netflix.genie.agent.execution.statemachine.FatalJobExecutionException;
Expand All @@ -28,6 +29,7 @@
import com.netflix.genie.common.external.dtos.v4.JobStatus;
import lombok.extern.slf4j.Slf4j;


/**
* Wait for job process to exit.
*
Expand All @@ -37,15 +39,21 @@
@Slf4j
public class WaitJobCompletionStage extends ExecutionStage {
private final JobProcessManager jobProcessManager;
private final JobMonitorService jobMonitorService;

/**
* Constructor.
*
* @param jobProcessManager the job process manager
* @param jobMonitorService the job monitor service
*/
public WaitJobCompletionStage(final JobProcessManager jobProcessManager) {
public WaitJobCompletionStage(
final JobProcessManager jobProcessManager,
final JobMonitorService jobMonitorService
) {
super(States.WAIT_JOB_COMPLETION);
this.jobProcessManager = jobProcessManager;
this.jobMonitorService = jobMonitorService;
}

@Override
Expand All @@ -56,11 +64,14 @@ protected void attemptStageAction(
// In case of abort, this state may be reached even if there was no attempt to launch the process.
if (executionContext.isJobLaunched()) {
log.info("Monitoring job process");
this.jobMonitorService.start(executionContext.getJobDirectory().toPath());
final JobProcessResult jobProcessResult;
try {
jobProcessResult = this.jobProcessManager.waitFor();
} catch (final InterruptedException e) {
throw createFatalException(e);
} finally {
this.jobMonitorService.stop();
}

executionContext.setJobProcessResult(jobProcessResult);
Expand All @@ -70,6 +81,5 @@ protected void attemptStageAction(
} else {
log.debug("Job not launched, skipping");
}

}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,4 +43,5 @@ public class AgentProperties {
private FileStreamServiceProperties fileStreamService = new FileStreamServiceProperties();
private HeartBeatServiceProperties heartBeatService = new HeartBeatServiceProperties();
private JobKillServiceProperties jobKillService = new JobKillServiceProperties();
private JobMonitorServiceProperties jobMonitorService = new JobMonitorServiceProperties();
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
/*
*
* Copyright 2020 Netflix, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*
*/
package com.netflix.genie.agent.properties;

import com.netflix.genie.agent.execution.services.JobMonitorService;
import lombok.Getter;
import lombok.Setter;
import org.springframework.util.unit.DataSize;

import java.time.Duration;

/**
* Properties of {@link JobMonitorService}.
* <p>
* Each field is initialized with a default value; accessors are generated by Lombok.
*
* @author mprimi
* @since 4.0.0
*/
@Getter
@Setter
public class JobMonitorServiceProperties {
// Interval between consecutive checks of the job directory (default: 1 minute)
private Duration checkInterval = Duration.ofMinutes(1);
// Maximum number of files allowed in the job directory (default: 64,000)
private int maxFiles = 64_000;
// Maximum total size of all files in the job directory (default: 16 GB)
private DataSize maxTotalSize = DataSize.ofGigabytes(16);
// Maximum size of any single file in the job directory (default: 8 GB)
private DataSize maxFileSize = DataSize.ofGigabytes(8);
}
5 changes: 5 additions & 0 deletions genie-agent/src/main/resources/genie-agent-defaults.yml
Original file line number Diff line number Diff line change
Expand Up @@ -59,6 +59,11 @@ genie:
min-delay: 500ms
max-delay: 5s
factor: 1.2
job-monitor-service:
check-interval: 1m
max-files: 64000
max-file-size: 8GB
max-total-size: 16GB

spring:
banner:
Expand Down
Loading

0 comments on commit fca239d

Please sign in to comment.