Skip to content

Commit

Permalink
Merge 93ba086 into 80e3cd9
Browse files Browse the repository at this point in the history
  • Loading branch information
mprimi committed Sep 18, 2019
2 parents 80e3cd9 + 93ba086 commit 523dfab
Show file tree
Hide file tree
Showing 33 changed files with 440 additions and 196 deletions.
Expand Up @@ -40,7 +40,8 @@ public interface JobProcessManager extends ApplicationListener<KillService.KillE
*
* @param jobDirectory Job directory
* @param environmentVariables additional environment variables (to merge on top of inherited environment)
* @param commandLine command-line executable and arguments
* @param commandArguments command-line executable and its fixed arguments arguments
* @param jobArguments job-specific arguments
* @param interactive launch in interactive mode (inherit I/O) or batch (no input, write outputs to files)
* @param timeout The optional number of seconds this job is allowed to run before the system will
* kill it
Expand All @@ -49,7 +50,8 @@ public interface JobProcessManager extends ApplicationListener<KillService.KillE
void launchProcess(
File jobDirectory,
Map<String, String> environmentVariables,
List<String> commandLine,
List<String> commandArguments,
List<String> jobArguments,
boolean interactive,
@Nullable Integer timeout
) throws JobLaunchException;
Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package com.netflix.genie.agent.execution.process.impl;

import com.google.common.collect.Lists;
import com.netflix.genie.agent.cli.UserConsole;
import com.netflix.genie.agent.execution.exceptions.JobLaunchException;
import com.netflix.genie.agent.execution.process.JobProcessManager;
Expand Down Expand Up @@ -78,7 +79,8 @@ public JobProcessManagerImpl(final TaskScheduler taskScheduler) {
public void launchProcess(
final File jobDirectory,
final Map<String, String> environmentVariablesMap,
final List<String> commandLine,
final List<String> commandArguments,
final List<String> jobArguments,
final boolean interactive,
@Nullable final Integer timeout
) throws JobLaunchException {
Expand Down Expand Up @@ -125,30 +127,38 @@ public void launchProcess(
});

// Validate arguments
if (commandLine == null) {
if (commandArguments == null) {
throw new JobLaunchException("Job command-line arguments is null");
} else if (commandLine.isEmpty()) {
} else if (commandArguments.isEmpty()) {
throw new JobLaunchException("Job command-line arguments are empty");
}

// Configure arguments
log.info("Job command-line: {}", Arrays.toString(commandLine.toArray()));
log.info(
"Job executable: {} and arguments: {}",
Arrays.toString(commandArguments.toArray()),
Arrays.toString(jobArguments.toArray())
);

final List<String> expandedCommandLine;
final List<String> expandedExecutableAndArguments;
try {
expandedCommandLine = expandCommandLineVariables(
commandLine,
expandedExecutableAndArguments = expandCommandLineVariables(
commandArguments,
Collections.unmodifiableMap(currentEnvironmentVariables)
);
} catch (final EnvUtils.VariableSubstitutionException e) {
throw new JobLaunchException("Job command-line arguments variables could not be expanded");
}

if (!commandLine.equals(expandedCommandLine)) {
log.info("Job command-line with variables expanded: {}", Arrays.toString(expandedCommandLine.toArray()));
final List<String> commandLine = Lists.newArrayList();
commandLine.addAll(expandedExecutableAndArguments);
commandLine.addAll(jobArguments);

if (!commandArguments.equals(expandedExecutableAndArguments)) {
log.info("Job command-line after expansion: {}", Arrays.toString(commandLine.toArray()));
}

processBuilder.command(expandedCommandLine);
processBuilder.command(commandLine);

if (interactive) {
processBuilder.inheritIO();
Expand Down
Expand Up @@ -30,7 +30,6 @@
import lombok.extern.slf4j.Slf4j;

import java.io.File;
import java.util.List;
import java.util.Map;

/**
Expand Down Expand Up @@ -74,17 +73,16 @@ protected Events executeStateAction(final ExecutionContext executionContext) {
final JobSpecification jobSpec = executionContext.getJobSpecification().get();
final File jobDirectory = executionContext.getJobDirectory().get();
final Map<String, String> jobEnvironment = executionContext.getJobEnvironment().get();
final List<String> jobCommandLine = jobSpec.getCommandArgs();
final boolean interactive = jobSpec.isInteractive();

try {
this.jobProcessManager.launchProcess(
jobDirectory,
jobEnvironment,
jobCommandLine,
jobSpec.getExecutableArgs(),
jobSpec.getJobArgs(),
interactive,
jobSpec.getTimeout().orElse(null)
);
jobSpec.getTimeout().orElse(null));
} catch (final JobLaunchException e) {
throw new RuntimeException("Failed to launch job", e);
}
Expand Down
Expand Up @@ -172,7 +172,7 @@ class ResolveJobSpecCommandSpec extends Specification {
1 * jobRequestConverter.agentJobRequestArgsToDTO(jobArgs) >> jobRequest
1 * service.resolveJobSpecificationDryRun(jobRequest) >> spec
1 * commandArgs.isPrintRequestDisabled() >> false
1 * spec.getCommandArgs() >> { throw new IOException("") }
1 * spec.getExecutableArgs() >> { throw new IOException("") }
def e = thrown(RuntimeException)
e.getCause() instanceof JsonProcessingException
}
Expand Down
Expand Up @@ -70,7 +70,8 @@ class JobProcessManagerImplSpec extends Specification {
this.manager.launchProcess(
this.temporaryFolder.getRoot(),
this.envMap,
["touch", expectedFile.getAbsolutePath()],
["touch"],
[expectedFile.getAbsolutePath()],
true,
null
)
Expand Down Expand Up @@ -101,7 +102,8 @@ class JobProcessManagerImplSpec extends Specification {
this.manager.launchProcess(
this.temporaryFolder.getRoot(),
this.envMap,
["\${ECHO_COMMAND}", helloWorld],
["\${ECHO_COMMAND}"],
[helloWorld],
false,
null
)
Expand Down Expand Up @@ -134,6 +136,7 @@ class JobProcessManagerImplSpec extends Specification {
this.temporaryFolder.getRoot(),
this.envMap,
["env"],
[],
false,
null
)
Expand Down Expand Up @@ -163,7 +166,8 @@ class JobProcessManagerImplSpec extends Specification {
this.manager.launchProcess(
this.temporaryFolder.getRoot(),
this.envMap,
["rm", nonExistentFile.absolutePath],
["rm"],
[nonExistentFile.absolutePath],
false,
null
)
Expand Down Expand Up @@ -194,6 +198,7 @@ class JobProcessManagerImplSpec extends Specification {
this.temporaryFolder.getRoot(),
this.envMap,
[uuid],
[],
false,
null
)
Expand All @@ -209,6 +214,7 @@ class JobProcessManagerImplSpec extends Specification {
this.temporaryFolder.getRoot(),
this.envMap,
["\$COMMAND"],
[],
false,
null
)
Expand All @@ -224,6 +230,7 @@ class JobProcessManagerImplSpec extends Specification {
null,
this.envMap,
["echo"],
[],
false,
null
)
Expand All @@ -239,6 +246,7 @@ class JobProcessManagerImplSpec extends Specification {
this.temporaryFolder.newFile("foo"),
this.envMap,
["echo"],
[],
false,
null
)
Expand All @@ -254,6 +262,7 @@ class JobProcessManagerImplSpec extends Specification {
new File(this.temporaryFolder.getRoot(), "foo"),
this.envMap,
["echo"],
[],
false,
null
)
Expand All @@ -269,6 +278,7 @@ class JobProcessManagerImplSpec extends Specification {
this.temporaryFolder.getRoot(),
null,
["echo"],
[],
false,
null
)
Expand All @@ -284,6 +294,7 @@ class JobProcessManagerImplSpec extends Specification {
this.temporaryFolder.getRoot(),
this.envMap,
null,
null,
false,
null
)
Expand All @@ -299,6 +310,7 @@ class JobProcessManagerImplSpec extends Specification {
this.temporaryFolder.getRoot(),
this.envMap,
[],
[],
false,
null
)
Expand All @@ -315,7 +327,8 @@ class JobProcessManagerImplSpec extends Specification {
this.manager.launchProcess(
this.temporaryFolder.getRoot(),
this.envMap,
["sleep", "60"],
["sleep"],
["60"],
true,
59
)
Expand Down Expand Up @@ -349,7 +362,8 @@ class JobProcessManagerImplSpec extends Specification {
this.manager.launchProcess(
this.temporaryFolder.getRoot(),
this.envMap,
["sleep", "60"],
["sleep"],
["60"],
true,
null
)
Expand Down Expand Up @@ -385,7 +399,8 @@ class JobProcessManagerImplSpec extends Specification {
this.manager.launchProcess(
this.temporaryFolder.getRoot(),
this.envMap,
["echo", "foo"],
["echo"],
["foo"],
true,
null
)
Expand Down Expand Up @@ -425,7 +440,8 @@ class JobProcessManagerImplSpec extends Specification {
this.manager.launchProcess(
this.temporaryFolder.getRoot(),
this.envMap,
["echo", "foo"],
["echo"],
["foo"],
true,
10
)
Expand All @@ -452,7 +468,8 @@ class JobProcessManagerImplSpec extends Specification {
this.manager.launchProcess(
this.temporaryFolder.getRoot(),
this.envMap,
["echo", "foo"],
["echo"],
["foo"],
true,
null
)
Expand All @@ -465,7 +482,8 @@ class JobProcessManagerImplSpec extends Specification {
this.manager.launchProcess(
this.temporaryFolder.getRoot(),
this.envMap,
["echo", "foo"],
["echo"],
["foo"],
true,
null
)
Expand Down Expand Up @@ -496,7 +514,8 @@ class JobProcessManagerImplSpec extends Specification {
realManager.launchProcess(
this.temporaryFolder.getRoot(),
this.envMap,
["sleep", "60"],
["sleep"],
["60"],
true,
1
)
Expand Down
Expand Up @@ -36,7 +36,8 @@ class LaunchJobActionSpec extends Specification {
JobProcessManager jobProcessManager
AgentJobService agentJobService
File jobDirectory
List<String> jobCommandLine
List<String> commandArgs
List<String> jobArgs
boolean interactive

void setup() {
Expand All @@ -45,7 +46,8 @@ class LaunchJobActionSpec extends Specification {
this.jobSpec = Mock(JobSpecification)
this.jobDirectory = Mock(File)
this.jobEnvironment = Mock(Map)
this.jobCommandLine = Mock(List)
this.commandArgs = Mock(List)
this.jobArgs = Mock(List)
this.interactive = true
this.jobProcessManager = Mock(JobProcessManager)
this.agentJobService = Mock(AgentJobService)
Expand All @@ -63,10 +65,11 @@ class LaunchJobActionSpec extends Specification {
1 * executionContext.getJobSpecification() >> Optional.of(jobSpec)
1 * executionContext.getJobDirectory() >> Optional.of(jobDirectory)
1 * executionContext.getJobEnvironment() >> Optional.of(jobEnvironment)
1 * jobSpec.getCommandArgs() >> jobCommandLine
1 * jobSpec.getExecutableArgs() >> commandArgs
1 * jobSpec.getJobArgs() >> jobArgs
1 * jobSpec.isInteractive() >> interactive
1 * jobSpec.getTimeout() >> Optional.ofNullable(10)
1 * jobProcessManager.launchProcess(jobDirectory, jobEnvironment, jobCommandLine, interactive, 10)
1 * jobProcessManager.launchProcess(jobDirectory, jobEnvironment, commandArgs, jobArgs, interactive, 10)
1 * executionContext.getClaimedJobId() >> Optional.of(id)
1 * agentJobService.changeJobStatus(id, JobStatus.INIT, JobStatus.RUNNING, _ as String)
1 * executionContext.setCurrentJobStatus(JobStatus.RUNNING)
Expand All @@ -85,10 +88,11 @@ class LaunchJobActionSpec extends Specification {
1 * executionContext.getJobSpecification() >> Optional.of(jobSpec)
1 * executionContext.getJobDirectory() >> Optional.of(jobDirectory)
1 * executionContext.getJobEnvironment() >> Optional.of(jobEnvironment)
1 * jobSpec.getCommandArgs() >> jobCommandLine
1 * jobSpec.getExecutableArgs() >> commandArgs
1 * jobSpec.getJobArgs() >> jobArgs
1 * jobSpec.isInteractive() >> interactive
1 * jobSpec.getTimeout() >> Optional.ofNullable(null)
1 * jobProcessManager.launchProcess(jobDirectory, jobEnvironment, jobCommandLine, interactive, null) >> {
1 * jobProcessManager.launchProcess(jobDirectory, jobEnvironment, commandArgs, jobArgs, interactive, null) >> {
throw exception
}
def e = thrown(RuntimeException)
Expand All @@ -105,10 +109,11 @@ class LaunchJobActionSpec extends Specification {
1 * executionContext.getJobSpecification() >> Optional.of(jobSpec)
1 * executionContext.getJobDirectory() >> Optional.of(jobDirectory)
1 * executionContext.getJobEnvironment() >> Optional.of(jobEnvironment)
1 * jobSpec.getCommandArgs() >> jobCommandLine
1 * jobSpec.getExecutableArgs() >> commandArgs
1 * jobSpec.getJobArgs() >> jobArgs
1 * jobSpec.isInteractive() >> interactive
1 * jobSpec.getTimeout() >> Optional.ofNullable(null)
1 * jobProcessManager.launchProcess(jobDirectory, jobEnvironment, jobCommandLine, interactive, null)
1 * jobProcessManager.launchProcess(jobDirectory, jobEnvironment, commandArgs, jobArgs, interactive, null)
1 * executionContext.getClaimedJobId() >> Optional.of(id)
1 * agentJobService.changeJobStatus(id, JobStatus.INIT, JobStatus.RUNNING, _ as String) >> { throw exception }
0 * executionContext.setCurrentJobStatus(_)
Expand Down
Expand Up @@ -429,12 +429,14 @@ public void testClusterDependenciesMethods() throws Exception {
@Test
public void testClusterCommandsMethods() throws Exception {

final List<String> executableAndArgs = Lists.newArrayList("exec");

final Command foo = new Command.Builder(
"name",
"user",
"version",
CommandStatus.ACTIVE,
"exec",
executableAndArgs,
5
).withId("foo")
.build();
Expand All @@ -446,7 +448,7 @@ public void testClusterCommandsMethods() throws Exception {
"user",
"version",
CommandStatus.ACTIVE,
"exec",
executableAndArgs,
5
).withId("bar")
.build();
Expand All @@ -458,7 +460,7 @@ public void testClusterCommandsMethods() throws Exception {
"user",
"version",
CommandStatus.ACTIVE,
"exec",
executableAndArgs,
5
).withId("pi")
.build();
Expand Down

0 comments on commit 523dfab

Please sign in to comment.