Skip to content

Commit

Permalink
added execute-as-user feature. refer to documentation on usage details
Browse files Browse the repository at this point in the history
  • Loading branch information
John Yu committed Oct 9, 2015
1 parent a796975 commit 84dde5b
Show file tree
Hide file tree
Showing 19 changed files with 187 additions and 133 deletions.
Expand Up @@ -101,7 +101,7 @@ public class ExecutorManager extends EventHandler implements
private ExecutingManagerUpdaterThread executingManager;
// 12 weeks
private static final long DEFAULT_EXECUTION_LOGS_RETENTION_MS = 3 * 4 * 7
* 24 * 60 * 60 * 1000L;
* 24 * 60 * 60 * 1000l;
private long lastCleanerThreadCheckTime = -1;

private long lastThreadCheckTime = -1;
Expand Down Expand Up @@ -1982,4 +1982,4 @@ private void handleNoExecutorSelectedCase(ExecutionReference reference,
queuedFlows.enqueue(exflow, reference);
}
}
}
}
Expand Up @@ -104,7 +104,7 @@ private synchronized void uploadExecutableFlow(Connection connection,
runner.query(connection, LastInsertID.LAST_INSERT_ID,
new LastInsertID());

if (id == -1L) {
if (id == -1l) {
throw new ExecutorManagerException(
"Execution id is not properly created.");
}
Expand Down Expand Up @@ -1061,7 +1061,7 @@ private static class LastInsertID implements ResultSetHandler<Long> {
@Override
public Long handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
return -1L;
return -1l;
}
long id = rs.getLong(1);
return id;
Expand Down
67 changes: 47 additions & 20 deletions azkaban-common/src/main/java/azkaban/jobExecutor/ProcessJob.java
Expand Up @@ -48,9 +48,11 @@ public class ProcessJob extends AbstractProcessJob {
"memCheck.freeMemDecrAmt";

public static final String AZKABAN_MEMORY_CHECK = "azkaban.memory.check";

public static final String NATIVE_LIB_FOLDER = "azkaban.native.lib";
public static final String EXECUTE_AS_USER = "execute.as.user";
public static final String EXECUTE_AS_USER_OVERRIDE =
"execute.as.user.override";
public static final String USER_TO_PROXY = "user.to.proxy";
public static final String KRB5CCNAME = "KRB5CCNAME";

Expand Down Expand Up @@ -86,7 +88,7 @@ public void run() throws Exception {
memPair.getFirst(), memPair.getSecond(), getId()));
}
}

List<String> commands = null;
try {
commands = getCommandList();
Expand All @@ -102,29 +104,45 @@ public void run() throws Exception {

info(commands.size() + " commands to execute.");
File[] propFiles = initPropsFiles();

// change krb5ccname env var so that each job execution gets its own cache
Map<String, String> envVars = getEnvironmentVariables();
envVars.put(KRB5CCNAME, getKrb5ccname(jobProps));

// determine whether users should be running their jobs as proxyUser/submit user or
// if everybody will run as Azkaban
String executeAsUserBinary = null;
boolean isExecuteAsUser = sysProps.getBoolean(EXECUTE_AS_USER, false);
if(isExecuteAsUser){
String nativeLibFolder = sysProps.getString(NATIVE_LIB_FOLDER);
executeAsUserBinary = String.format("%s/%s", nativeLibFolder, "execute-as-user");

// determine whether to run as Azkaban or run as effectiveUser
String executeAsUserBinary = null;
String effectiveUser = null;
boolean isExecuteAsUser = determineExecuteAsUser(sysProps, jobProps);

if (isExecuteAsUser) {
String nativeLibFolder = sysProps.getString(NATIVE_LIB_FOLDER);
executeAsUserBinary =
String.format("%s/%s", nativeLibFolder, "execute-as-user");
effectiveUser = getEffectiveUser(jobProps);
if ("root".equals(effectiveUser)) {
throw new RuntimeException(
"Not permitted to proxy as root through Azkaban");
}
}

for (String command : commands) {
if(isExecuteAsUser){
command = String.format("%s %s %s", executeAsUserBinary, getEffectiveUser(jobProps), command);
}

info("Command: " + command);
AzkabanProcessBuilder builder =
new AzkabanProcessBuilder(partitionCommandLine(command))
.setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
AzkabanProcessBuilder builder = null;
if (isExecuteAsUser) {
command =
String.format("%s %s %s", executeAsUserBinary, effectiveUser,
command);
info("Command: " + command);
builder =
new AzkabanProcessBuilder(partitionCommandLine(command))
.setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog())
.setExecuteAsUser().setExecuteAsUserBinary(executeAsUserBinary)
.setEffectiveUser(effectiveUser);
} else {
info("Command: " + command);
builder =
new AzkabanProcessBuilder(partitionCommandLine(command))
.setEnv(envVars).setWorkingDir(getCwd()).setLogger(getLog());
}

if (builder.getEnv().size() > 0) {
info("Environment variables: " + builder.getEnv());
Expand Down Expand Up @@ -157,6 +175,15 @@ public void run() throws Exception {
generateProperties(propFiles[1]);
}

private boolean determineExecuteAsUser(Props sysProps, Props jobProps) {
boolean isExecuteAsUser = sysProps.getBoolean(EXECUTE_AS_USER, false);
// putting an override in case user needs to override. A temporary opening
if (jobProps.containsKey(EXECUTE_AS_USER_OVERRIDE))
isExecuteAsUser = jobProps.getBoolean(EXECUTE_AS_USER_OVERRIDE, false);

return isExecuteAsUser;
}

/**
* <pre>
* This method extracts the kerberos ticket cache file name from the jobprops.
Expand Down
Expand Up @@ -50,6 +50,10 @@ public class AzkabanProcess {
private volatile int processId;
private volatile Process process;

private boolean isExecuteAsUser = false;
private String executeAsUserBinary = null;
private String effectiveUser = null;

public AzkabanProcess(final List<String> cmd, final Map<String, String> env,
final String workingDir, final Logger logger) {
this.cmd = cmd;
Expand All @@ -61,6 +65,15 @@ public AzkabanProcess(final List<String> cmd, final Map<String, String> env,
this.logger = logger;
}

public AzkabanProcess(List<String> cmd, Map<String, String> env,
String workingDir, Logger logger, String executeAsUserBinary,
String effectiveUser) {
this(cmd, env, workingDir, logger);
this.isExecuteAsUser = true;
this.executeAsUserBinary = executeAsUserBinary;
this.effectiveUser = effectiveUser;
}

/**
* Execute this process, blocking until it has completed.
*/
Expand Down Expand Up @@ -101,16 +114,15 @@ public void run() throws IOException {
}

completeLatch.countDown();

// try to wait for everything to get logged out before exiting
outputGobbler.awaitCompletion(5000);
errorGobbler.awaitCompletion(5000);

if (exitCode != 0) {
throw new ProcessFailureException(exitCode, errorGobbler.getRecentLog());
}


} finally {
IOUtils.closeQuietly(process.getInputStream());
IOUtils.closeQuietly(process.getOutputStream());
Expand Down Expand Up @@ -158,7 +170,14 @@ public boolean softKill(final long time, final TimeUnit unit)
checkStarted();
if (processId != 0 && isStarted()) {
try {
Runtime.getRuntime().exec("kill " + processId);
if (isExecuteAsUser) {
String cmd =
String.format("%s %s kill %d", executeAsUserBinary,
effectiveUser, processId);
Runtime.getRuntime().exec(cmd);
} else {
Runtime.getRuntime().exec("kill " + processId);
}
return completeLatch.await(time, unit);
} catch (IOException e) {
logger.error("Kill attempt failed.", e);
Expand All @@ -176,7 +195,14 @@ public void hardKill() {
if (isRunning()) {
if (processId != 0) {
try {
Runtime.getRuntime().exec("kill -9 " + processId);
if (isExecuteAsUser) {
String cmd =
String.format("%s %s kill -9 %d", executeAsUserBinary,
effectiveUser, processId);
Runtime.getRuntime().exec(cmd);
} else {
Runtime.getRuntime().exec("kill -9 " + processId);
}
} catch (IOException e) {
logger.error("Kill attempt failed.", e);
}
Expand Down Expand Up @@ -237,4 +263,12 @@ public String toString() {
return "Process(cmd = " + Joiner.on(" ").join(cmd) + ", env = " + env
+ ", cwd = " + workingDir + ")";
}

public boolean isExecuteAsUser() {
return isExecuteAsUser;
}

public String getEffectiveUser() {
return effectiveUser;
}
}
Expand Up @@ -35,6 +35,9 @@ public class AzkabanProcessBuilder {
private Map<String, String> env = new HashMap<String, String>();
private String workingDir = System.getProperty("user.dir");
private Logger logger = Logger.getLogger(AzkabanProcess.class);
private boolean isExecuteAsUser = false;
private String executeAsUserBinary = null;
private String effectiveUser = null;

private int stdErrSnippetSize = 30;
private int stdOutSnippetSize = 30;
Expand Down Expand Up @@ -100,7 +103,11 @@ public AzkabanProcessBuilder setLogger(Logger logger) {
}

public AzkabanProcess build() {
return new AzkabanProcess(cmd, env, workingDir, logger);
if (isExecuteAsUser)
return new AzkabanProcess(cmd, env, workingDir, logger,
executeAsUserBinary, effectiveUser);
else
return new AzkabanProcess(cmd, env, workingDir, logger);
}

public List<String> getCommand() {
Expand All @@ -116,4 +123,19 @@ public String toString() {
return "ProcessBuilder(cmd = " + Joiner.on(" ").join(cmd) + ", env = "
+ env + ", cwd = " + workingDir + ")";
}

public AzkabanProcessBuilder setExecuteAsUser() {
this.isExecuteAsUser = true;
return this;
}

public AzkabanProcessBuilder setExecuteAsUserBinary(String executeAsUserBinary) {
this.executeAsUserBinary = executeAsUserBinary;
return this;
}

public AzkabanProcessBuilder setEffectiveUser(String effectiveUser) {
this.effectiveUser = effectiveUser;
return this;
}
}
Expand Up @@ -269,7 +269,7 @@ public Project createProject(String projectName, String description,
"Project names must start with a letter, followed by any number of letters, digits, '-' or '_'.");
}

if (projectsByName.containsKey(projectName)) {
if (projectsByName.contains(projectName)) {
throw new ProjectManagerException("Project already exists.");
}

Expand Down
Expand Up @@ -58,7 +58,7 @@ public SessionCache(Props props) {
* @return
*/
public Session getSession(String sessionId) {
Session elem = cache.get(Session.class, sessionId);
Session elem = cache.<Session> get(sessionId);

return elem;
}
Expand Down
Expand Up @@ -178,7 +178,7 @@ private synchronized void addTrigger(Connection connection, Trigger t,
runner.query(connection, LastInsertID.LAST_INSERT_ID,
new LastInsertID());

if (id == -1L) {
if (id == -1l) {
logger.error("trigger id is not properly created.");
throw new TriggerLoaderException("trigger id is not properly created.");
}
Expand Down Expand Up @@ -257,7 +257,7 @@ private static class LastInsertID implements ResultSetHandler<Long> {
@Override
public Long handle(ResultSet rs) throws SQLException {
if (!rs.next()) {
return -1L;
return -1l;
}

long id = rs.getLong(1);
Expand Down
Expand Up @@ -71,7 +71,7 @@ public Integer getInt(K key, Integer defaultVal) {
}

public Long getLong(K key) {
return getLong(key, -1L);
return getLong(key, -1l);
}

public Long getLong(K key, Long defaultVal) {
Expand Down
7 changes: 4 additions & 3 deletions azkaban-common/src/main/java/azkaban/utils/cache/Cache.java
Expand Up @@ -38,16 +38,17 @@ public enum EjectionPolicy {
LRU, FIFO
}

/* package */ Cache(CacheManager manager) {
/* package */Cache(CacheManager manager) {
this.manager = manager;
}

public <T> T get(Class<T> clazz, Object key) {
@SuppressWarnings("unchecked")
public <T> T get(Object key) {
Element<?> element = elementMap.get(key);
if (element == null) {
return null;
}
return clazz.cast(element.getElement());
return (T) element.getElement();
}

public <T> void put(Object key, T item) {
Expand Down
Expand Up @@ -196,7 +196,7 @@ private void outputGeneratedProperties(Props outputProperties) {
}
writer.write("}".getBytes());
} catch (Exception e) {
throw new RuntimeException("Unable to store output properties to: "
new RuntimeException("Unable to store output properties to: "
+ outputFileStr);
} finally {
try {
Expand Down
Expand Up @@ -24,7 +24,6 @@
import org.junit.AfterClass;
import org.junit.Assert;
import org.junit.BeforeClass;
import org.junit.Ignore;
import org.junit.Test;

import azkaban.utils.Props;
Expand Down Expand Up @@ -83,7 +82,6 @@ public static void cleanup() {
Utils.removeFile(scriptFile);
}

@Ignore("Test appears to hang.")
@Test
public void testPythonJob() {

Expand Down
4 changes: 2 additions & 2 deletions azkaban-common/src/test/java/azkaban/project/ProjectTest.java
Expand Up @@ -28,8 +28,8 @@ public class ProjectTest {
@Test
public void testToAndFromObject() throws Exception {
Project project = new Project(1, "tesTing");
project.setCreateTimestamp(1L);
project.setLastModifiedTimestamp(2L);
project.setCreateTimestamp(1l);
project.setLastModifiedTimestamp(2l);
project.setDescription("I am a test");
project.setUserPermission("user1", new Permission(new Type[] { Type.ADMIN,
Type.EXECUTE }));
Expand Down

0 comments on commit 84dde5b

Please sign in to comment.