Skip to content

Commit

Permalink
Merge 597799a into 91ee78c
Browse files Browse the repository at this point in the history
  • Loading branch information
mprimi committed Feb 20, 2020
2 parents 91ee78c + 597799a commit 2904d18
Show file tree
Hide file tree
Showing 35 changed files with 1,108 additions and 1,657 deletions.
1 change: 1 addition & 0 deletions genie-agent/build.gradle
Expand Up @@ -4,6 +4,7 @@ license {
exclude "*.yml"
exclude "*.xml"
exclude "*.txt"
exclude "*.test.sh"
exclude "META-INF/spring.factories"
}

Expand Down
Expand Up @@ -17,6 +17,7 @@
*/
package com.netflix.genie.agent.cli;

import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.netflix.genie.common.external.dtos.v4.AgentConfigRequest;
import com.netflix.genie.common.external.dtos.v4.AgentJobRequest;
Expand All @@ -32,6 +33,8 @@
import javax.validation.constraints.NotEmpty;
import java.util.List;
import java.util.Set;
import java.util.regex.Matcher;
import java.util.stream.Collectors;

/**
* Convert job request arguments delegate into an AgentJobRequest.
Expand All @@ -41,6 +44,9 @@
*/
public class JobRequestConverter {

private static final String SINGLE_QUOTE = "'";
private static final String ESCAPED_SINGLE_QUOTE = "'\\''";
private static final String SPACE = " ";
private final Validator validator;

JobRequestConverter(final Validator validator) {
Expand Down Expand Up @@ -107,13 +113,17 @@ public AgentJobRequest agentJobRequestArgsToDTO(
jobRequestArguments.getJobSetup()
);

// Convert split, parsed arguments received via command-line back to the same single-string, unparsed
// format that comes through the V3 API.
final List<String> commandArguments = getV3ArgumentsString(jobRequestArguments.getCommandArguments());

final AgentJobRequest agentJobRequest = new AgentJobRequest.Builder(
jobMetadataBuilder.build(),
criteria,
requestedAgentConfig,
jobArchivalDataRequest
)
.withCommandArgs(jobRequestArguments.getCommandArguments())
.withCommandArgs(commandArguments)
.withRequestedId(jobRequestArguments.getJobId())
.withResources(jobExecutionResources)
.build();
Expand All @@ -127,6 +137,23 @@ public AgentJobRequest agentJobRequestArgsToDTO(
return agentJobRequest;
}

// In order to make all command arguments look the same in a job specification object, transform the
// split and unwrapped arguments back into a quoted string.
// This allows the downstream agent code to make no special case.
// All arguments are coming in as they would from the V3 API.
// This means:
// - Escape single-quotes
// - Wrap each token in single quotes
// - Join everything into a single string and send it as a list with just one element
private List<String> getV3ArgumentsString(final List<String> commandArguments) {
return Lists.newArrayList(
commandArguments.stream()
.map(s -> s.replaceAll(SINGLE_QUOTE, Matcher.quoteReplacement(ESCAPED_SINGLE_QUOTE)))
.map(s -> SINGLE_QUOTE + s + SINGLE_QUOTE)
.collect(Collectors.joining(SPACE))
);
}

/**
* Exception thrown in case of conversion error due to resulting object failing validation.
*/
Expand Down
Expand Up @@ -28,7 +28,6 @@
import javax.validation.constraints.NotBlank;
import java.io.File;
import java.util.List;
import java.util.Map;
import java.util.Optional;

/**
Expand Down Expand Up @@ -68,20 +67,6 @@ public interface ExecutionContext {
*/
void setJobSpecification(JobSpecification jobSpecification);

/**
* Get the environment variables map for the job process.
*
* @return a map of environment variables and values if one was set, or empty
*/
Optional<Map<String, String>> getJobEnvironment();

/**
* Set the job environment variables map.
*
* @param jobEnvironment a map of environment variables and their value to be passed to the job process at launch
*/
void setJobEnvironment(Map<String, String> jobEnvironment);

/**
* Enqueue cleanup for a state action.
*
Expand Down Expand Up @@ -165,4 +150,18 @@ void addStateActionError(
* @param jobId the job id
*/
void setClaimedJobId(@NotBlank String jobId);

/**
* Set the job script (a.k.a. run file).
*
* @param jobScript the job script file
*/
void setJobScript(File jobScript);

/**
* Get the job script (a.k.a. run file) if one was generated.
*
* @return a file handle to the script or empty
*/
Optional<File> getJobScript();
}
Expand Up @@ -31,7 +31,6 @@
import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.atomic.AtomicReference;

Expand All @@ -47,12 +46,12 @@ class ExecutionContextImpl implements ExecutionContext {

private final AtomicReference<File> jobDirectoryRef = new AtomicReference<>();
private final AtomicReference<JobSpecification> jobSpecRef = new AtomicReference<>();
private final AtomicReference<Map<String, String>> jobEnvironmentRef = new AtomicReference<>();
private final AtomicReference<JobStatus> finalJobStatusRef = new AtomicReference<>();
private final AtomicReference<JobStatus> currentJobStatusRef = new AtomicReference<>();
private final AtomicReference<String> claimedJobIdRef = new AtomicReference<>();
private final List<StateAction> cleanupActions = Lists.newArrayList();
private final List<Triple<States, Class<? extends Action>, Exception>> stateActionErrors = Lists.newArrayList();
private final AtomicReference<File> jobScriptRef = new AtomicReference<>();

ExecutionContextImpl() {
}
Expand Down Expand Up @@ -95,22 +94,6 @@ public void setJobSpecification(final JobSpecification jobSpecification) {
setIfNullOrThrow(jobSpecification, jobSpecRef);
}

/**
* {@inheritDoc}
*/
@Override
public Optional<Map<String, String>> getJobEnvironment() {
return Optional.ofNullable(jobEnvironmentRef.get());
}

/**
* {@inheritDoc}
*/
@Override
public void setJobEnvironment(final Map<String, String> jobEnvironment) {
setIfNullOrThrow(jobEnvironment, jobEnvironmentRef);
}

/**
* {@inheritDoc}
*/
Expand Down Expand Up @@ -215,4 +198,20 @@ public Optional<String> getClaimedJobId() {
public void setClaimedJobId(@NotBlank final String jobId) {
setIfNullOrThrow(jobId, claimedJobIdRef);
}

/**
* {@inheritDoc}
*/
@Override
public void setJobScript(final File jobScript) {
setIfNullOrThrow(jobScript, jobScriptRef);
}

/**
* {@inheritDoc}
*/
@Override
public Optional<File> getJobScript() {
return Optional.ofNullable(jobScriptRef.get());
}
}
Expand Up @@ -23,8 +23,6 @@

import javax.annotation.Nullable;
import java.io.File;
import java.util.List;
import java.util.Map;

/**
* Singleton to manage the subprocess for the actual user job this Agent instance is managing.
Expand All @@ -39,9 +37,7 @@ public interface JobProcessManager extends ApplicationListener<KillService.KillE
* Launch the job process (unless launch was aborted by previous a {@code kill} call).
*
* @param jobDirectory Job directory
* @param environmentVariables additional environment variables (to merge on top of inherited environment)
* @param commandArguments command-line executable and its fixed arguments arguments
* @param jobArguments job-specific arguments
* @param jobScript job script (a.k.a. run file)
* @param interactive launch in interactive mode (inherit I/O) or batch (no input, write outputs to files)
* @param timeout The optional number of seconds this job is allowed to run before the system will
* kill it
Expand All @@ -50,9 +46,7 @@ public interface JobProcessManager extends ApplicationListener<KillService.KillE
*/
void launchProcess(
File jobDirectory,
Map<String, String> environmentVariables,
List<String> commandArguments,
List<String> jobArguments,
File jobScript,
boolean interactive,
@Nullable Integer timeout,
boolean launchInJobDirectory
Expand Down
Expand Up @@ -17,18 +17,15 @@
*/
package com.netflix.genie.agent.execution.process.impl;

import com.google.common.collect.Lists;
import com.netflix.genie.agent.cli.UserConsole;
import com.netflix.genie.agent.execution.exceptions.JobLaunchException;
import com.netflix.genie.agent.execution.process.JobProcessManager;
import com.netflix.genie.agent.execution.process.JobProcessResult;
import com.netflix.genie.agent.execution.services.KillService;
import com.netflix.genie.agent.utils.EnvUtils;
import com.netflix.genie.agent.utils.PathUtils;
import com.netflix.genie.common.dto.JobStatusMessages;
import com.netflix.genie.common.external.dtos.v4.JobStatus;
import lombok.extern.slf4j.Slf4j;
import org.apache.commons.lang3.StringUtils;
import org.springframework.scheduling.TaskScheduler;

import javax.annotation.Nullable;
Expand All @@ -37,11 +34,6 @@
import java.lang.reflect.Field;
import java.nio.file.Paths;
import java.time.Instant;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicReference;
Expand Down Expand Up @@ -79,9 +71,7 @@ public JobProcessManagerImpl(final TaskScheduler taskScheduler) {
@Override
public void launchProcess(
final File jobDirectory,
final Map<String, String> environmentVariablesMap,
final List<String> commandArguments,
final List<String> jobArguments,
final File jobScript,
final boolean interactive,
@Nullable final Integer timeout,
final boolean launchInJobDirectory
Expand All @@ -103,66 +93,21 @@ public void launchProcess(
throw new JobLaunchException("Job directory is not writable: " + jobDirectory);
}

final Map<String, String> currentEnvironmentVariables = processBuilder.environment();

if (environmentVariablesMap == null) {
throw new JobLaunchException("Job environment variables map is null");
}

// Merge job environment variables into process inherited environment
environmentVariablesMap.forEach((key, value) -> {
final String replacedValue = currentEnvironmentVariables.put(key, value);
if (StringUtils.isBlank(replacedValue)) {
log.debug(
"Added job environment variable: {}={}",
key,
value
);
} else if (!replacedValue.equals(value)) {
log.debug(
"Set job environment variable: {}={} (previous value: {})",
key,
value,
replacedValue
);
}
});

// Validate arguments
if (commandArguments == null) {
throw new JobLaunchException("Job command-line arguments is null");
} else if (commandArguments.isEmpty()) {
throw new JobLaunchException("Job command-line arguments are empty");
if (jobScript == null) {
throw new JobLaunchException("Job script is null");
} else if (!jobScript.exists() || !jobScript.isFile()) {
throw new JobLaunchException("Job script is not a valid file");
} else if (!jobScript.canExecute()) {
throw new JobLaunchException("Job script is not executable");
}

final List<String> commandLine = Lists.newArrayList(commandArguments);
commandLine.addAll(jobArguments);

log.info(
"Job command-line: {} arguments: {} (working directory: {})",
Arrays.toString(commandArguments.toArray()),
Arrays.toString(jobArguments.toArray()),
"Executing job script: {} (working directory: {})",
jobScript.getAbsolutePath(),
launchInJobDirectory ? jobDirectory : Paths.get("").toAbsolutePath().normalize().toString()
);

// Configure arguments
final List<String> expandedCommandArguments;
try {
expandedCommandArguments = expandCommandLineVariables(
commandArguments,
Collections.unmodifiableMap(currentEnvironmentVariables)
);
} catch (final EnvUtils.VariableSubstitutionException e) {
throw new JobLaunchException("Command executable and arguments variables could not be expanded", e);
}

final List<String> expandedCommandLine = Lists.newArrayList();
expandedCommandLine.addAll(expandedCommandArguments);
expandedCommandLine.addAll(jobArguments);

log.info("Command-line after expansion: {}", expandedCommandLine);

processBuilder.command(expandedCommandLine);
processBuilder.command(jobScript.getAbsolutePath());

if (launchInJobDirectory) {
processBuilder.directory(jobDirectory);
Expand Down Expand Up @@ -292,21 +237,6 @@ public void onApplicationEvent(final KillService.KillEvent event) {
this.kill(source);
}

private List<String> expandCommandLineVariables(
final List<String> commandLine,
final Map<String, String> environmentVariables
) throws EnvUtils.VariableSubstitutionException {
final ArrayList<String> expandedCommandLine = new ArrayList<>(commandLine.size());

for (final String argument : commandLine) {
expandedCommandLine.add(
EnvUtils.expandShellVariables(argument, environmentVariables)
);
}

return Collections.unmodifiableList(expandedCommandLine);
}

/* TODO: HACK, Process does not expose PID in Java 8 API */
private long getPid(final Process process) {
long pid = -1;
Expand Down

0 comments on commit 2904d18

Please sign in to comment.