Skip to content

Commit

Permalink
[BUGFIX] Backward Incompatible with some JobTypes (#2228)
Browse files Browse the repository at this point in the history
  • Loading branch information
kxu1026 authored and edwinalu committed May 2, 2019
1 parent 2e37113 commit 1c61d83
Show file tree
Hide file tree
Showing 15 changed files with 165 additions and 37 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,9 +26,11 @@
*/
public abstract class AbstractHadoopJavaProcessJob extends JavaProcessJob implements IHadoopJob {

private final HadoopProxy hadoopProxy;

public AbstractHadoopJavaProcessJob(String jobid, Props sysProps, Props jobProps, Logger logger) {
super(jobid, sysProps, jobProps, logger);
hadoopProxy.init(sysProps, jobProps, logger);
this.hadoopProxy = new HadoopProxy(sysProps, jobProps, logger);
}

@Override
Expand Down Expand Up @@ -66,4 +68,9 @@ public Props appendExtraProps(Props props) {
HadoopJobUtils.addAdditionalNamenodesToPropsFromMRJob(props, getLog());
return props;
}

// Accessor required by IHadoopJob; returns the proxy created in the constructor.
@Override
public HadoopProxy getHadoopProxy() {
return hadoopProxy;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ public HadoopHiveJob(String jobid, Props sysProps, Props jobProps, Logger log) {
public void run() throws Exception {
setupHadoopJobProperties();
HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(), getWorkingDirectory());
hadoopProxy.setupPropsForProxy(getAllProps(), getJobProps(), getLog());
getHadoopProxy().setupPropsForProxy(getAllProps(), getJobProps(), getLog());
super.run();
}

Expand Down Expand Up @@ -174,6 +174,6 @@ public void cancel() throws InterruptedException {

info("Cancel called. Killing the Hive launched MR jobs on the cluster");

hadoopProxy.killAllSpawnedHadoopJobs(getJobProps(), getLog());
getHadoopProxy().killAllSpawnedHadoopJobs(getJobProps(), getLog());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -115,7 +115,7 @@ protected List<String> getClassPaths() {
public void run() throws Exception {
setupHadoopJobProperties();
HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(), getWorkingDirectory());
hadoopProxy.setupPropsForProxy(getAllProps(), getJobProps(), getLog());
getHadoopProxy().setupPropsForProxy(getAllProps(), getJobProps(), getLog());
super.run();
}

Expand All @@ -142,6 +142,6 @@ public void cancel() throws InterruptedException {

info("Cancel called. Killing the launched MR jobs on the cluster");

hadoopProxy.killAllSpawnedHadoopJobs(getJobProps(), getLog());
getHadoopProxy().killAllSpawnedHadoopJobs(getJobProps(), getLog());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -69,7 +69,7 @@ public HadoopPigJob(String jobid, Props sysProps, Props jobProps, Logger log) {
public void run() throws Exception {
setupHadoopJobProperties();
HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(), getWorkingDirectory());
hadoopProxy.setupPropsForProxy(getAllProps(), getJobProps(), getLog());
getHadoopProxy().setupPropsForProxy(getAllProps(), getJobProps(), getLog());
super.run();
}

Expand Down Expand Up @@ -262,6 +262,6 @@ public void cancel() throws InterruptedException {

info("Cancel called. Killing the Pig launched MR jobs on the cluster");

hadoopProxy.killAllSpawnedHadoopJobs(getJobProps(), getLog());
getHadoopProxy().killAllSpawnedHadoopJobs(getJobProps(), getLog());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -36,7 +36,8 @@ public class HadoopProxy {
private String userToProxy = null;
private File tokenFile = null;

public HadoopProxy() {
public HadoopProxy(Props sysProps, Props jobProps, final Logger logger) {
init(sysProps, jobProps, logger);
}

public boolean isProxyEnabled() {
Expand All @@ -50,7 +51,7 @@ public boolean isProxyEnabled() {
* @param jobProps job properties
* @param logger logger handler
*/
public void init(Props sysProps, Props jobProps, final Logger logger) {
private void init(Props sysProps, Props jobProps, final Logger logger) {
shouldProxy = sysProps.getBoolean(HadoopSecurityManager.ENABLE_PROXYING, false);
jobProps.put(HadoopSecurityManager.ENABLE_PROXYING, Boolean.toString(shouldProxy));
obtainTokens = sysProps.getBoolean(HadoopSecurityManager.OBTAIN_BINARY_TOKEN, false);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -34,10 +34,12 @@ public class HadoopShell extends ProcessJob implements IHadoopJob {
public static final String WHITELIST_REGEX = "command.whitelist.regex";
public static final String BLACKLIST_REGEX = "command.blacklist.regex";

private final HadoopProxy hadoopProxy;

public HadoopShell(String jobid, Props sysProps, Props jobProps, Logger logger)
throws RuntimeException {
super(jobid, sysProps, jobProps, logger);
hadoopProxy.init(sysProps, jobProps, logger);
this.hadoopProxy = new HadoopProxy(sysProps, jobProps, logger);
}

@Override
Expand All @@ -57,6 +59,11 @@ public void run() throws Exception {
}
}

// Accessor required by IHadoopJob; returns the proxy created in the constructor.
@Override
public HadoopProxy getHadoopProxy() {
return this.hadoopProxy;
}

/**
* Append HADOOP_GLOBAL_OPTS with HADOOP_OPTS in the given props
**/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@ public Props appendExtraProps(Props props) {
@Override
public void run() throws Exception {
HadoopConfigurationInjector.prepareResourcesToInject(getJobProps(), getWorkingDirectory());
hadoopProxy.setupPropsForProxy(getAllProps(), getJobProps(), getLog());
getHadoopProxy().setupPropsForProxy(getAllProps(), getJobProps(), getLog());
setupHadoopJobProperties();
super.run();
}
Expand Down Expand Up @@ -592,6 +592,6 @@ public void cancel() throws InterruptedException {

info("Cancel called. Killing the launched Spark jobs on the cluster");

hadoopProxy.killAllSpawnedHadoopJobs(getJobProps(), getLog());
getHadoopProxy().killAllSpawnedHadoopJobs(getJobProps(), getLog());
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,9 @@
public interface IHadoopJob {

/**
* Hadoop Proxy Wrapper Object
* Get HadoopProxy Instance
*/
HadoopProxy hadoopProxy = new HadoopProxy();
HadoopProxy getHadoopProxy();

/**
* Setup Hadoop-related Job Properties
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,10 +45,13 @@ public abstract class AbstractProcessJob extends AbstractJob {
private static final String SENSITIVE_JOB_PROP_VALUE_PLACEHOLDER = "[MASKED]";
private static final String JOB_DUMP_PROPERTIES_IN_LOG = "job.dump.properties";

private final String jobPath;
protected String cwd;
private volatile Props jobProps;
private volatile Props sysProps;
// Note: these variables are accessed directly by inheriting classes,
// which are outside this package. Please keep them protected.
protected final String _jobPath;
protected String _cwd;
protected volatile Props jobProps;
protected volatile Props sysProps;

private volatile Props generatedProperties;

protected AbstractProcessJob(final String jobId, final Props sysProps, final Props jobProps,
Expand All @@ -57,11 +60,16 @@ protected AbstractProcessJob(final String jobId, final Props sysProps, final Pro

this.jobProps = jobProps;
this.sysProps = sysProps;
this.cwd = getWorkingDirectory();
this.jobPath = this.cwd;
this._cwd = getWorkingDirectory();
this._jobPath = this._cwd;
}

private File createOutputPropsFile(final String id, final String workingDir) {
/**
* This public method is deprecated because it is utility functionality.
* Please use azkaban.utils.FileIOUtils.createOutputPropsFile(String, String, String) instead.
*/
@Deprecated
public File createOutputPropsFile(final String id, final String workingDir) {
this.info("cwd=" + workingDir);

try {
Expand All @@ -82,6 +90,22 @@ public Props getSysProps() {
return this.sysProps;
}

/**
* Replaces the job-level properties used by this job.
*
* @param props the new job properties; stored as-is (no defensive copy is made)
*/
public void setJobProps(Props props) {
this.jobProps = props;
}

/**
* Replaces the system-level properties used by this job.
*
* @param props the new system properties; stored as-is (no defensive copy is made)
*/
public void setSysProps(Props props) {
this.sysProps = props;
}

public Props getAllProps() {
Props props = new Props();
props.putAll(jobProps);
Expand All @@ -94,7 +118,7 @@ public Props appendExtraProps(Props props) {
}

public String getJobPath() {
return this.jobPath;
return this._jobPath;
}

protected void resolveProps() {
Expand Down Expand Up @@ -139,18 +163,18 @@ public Props getJobGeneratedProperties() {
public File[] initPropsFiles() {
// Create properties file with additionally all input generated properties.
final File[] files = new File[2];
files[0] = createFlattenedPropsFile(this.cwd);
files[0] = createFlattenedPropsFile(this._cwd);

this.jobProps.put(ENV_PREFIX + JOB_PROP_ENV, files[0].getAbsolutePath());
this.jobProps.put(ENV_PREFIX + JOB_NAME_ENV, getId());

files[1] = createOutputPropsFile(getId(), this.cwd);
files[1] = createOutputPropsFile(getId(), this._cwd);
this.jobProps.put(ENV_PREFIX + JOB_OUTPUT_PROP_FILE, files[1].getAbsolutePath());
return files;
}

public String getCwd() {
return this.cwd;
return this._cwd;
}

/**
Expand All @@ -171,11 +195,16 @@ public Map<String, String> getEnvironmentVariables() {
* @return working directory property
*/
public String getWorkingDirectory() {
final String workingDir = getJobProps().getString(WORKING_DIR, this.jobPath);
final String workingDir = getJobProps().getString(WORKING_DIR, this._jobPath);
return Utils.ifNull(workingDir, "");
}

private Props loadOutputFileProps(final File outputPropertiesFile) {
/**
* This public method is deprecated because it is utility functionality.
* Please use azkaban.utils.FileIOUtils.loadOutputFileProps(String file) instead.
*/
@Deprecated
public Props loadOutputFileProps(final File outputPropertiesFile) {
InputStream reader = null;
try {
this.info("output properties file=" + outputPropertiesFile.getAbsolutePath());
Expand Down Expand Up @@ -208,7 +237,12 @@ private Props loadOutputFileProps(final File outputPropertiesFile) {
}
}

private File createFlattenedPropsFile(final String workingDir) {
/**
* This public method is deprecated because it is utility functionality.
* Please use azkaban.utils.FileIOUtils.createOutputPropsFile(String, String, String) instead.
*/
@Deprecated
public File createFlattenedPropsFile(final String workingDir) {
try {
final File directory = new File(workingDir);
// The temp file prefix must be at least 3 characters.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,10 @@
*/
public interface IMBeanRegistrable {

MBeanRegistrationManager mbeanRegistrationManager = new MBeanRegistrationManager();
/**
* Get MBeanRegistrationManager Instance
*/
MBeanRegistrationManager getMBeanRegistrationManager();

/**
* Function to configure MBean Server
Expand Down
61 changes: 60 additions & 1 deletion azkaban-common/src/main/java/azkaban/utils/FileIOUtils.java
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import java.io.File;
import java.io.FileFilter;
import java.io.FileInputStream;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
Expand All @@ -39,6 +40,7 @@
import java.util.Map;
import java.util.Set;
import java.util.StringTokenizer;
import org.apache.commons.fileupload.util.Streams;
import org.apache.commons.io.FileUtils;
import org.apache.commons.io.IOUtils;
import org.slf4j.Logger;
Expand Down Expand Up @@ -124,7 +126,8 @@ public static long readNumberFromFile(final Path filePath)
}

public static String getSourcePathFromClass(final Class<?> containedClass) {
final String containedClassPath = containedClass.getProtectionDomain().getCodeSource().getLocation().getPath();
final String containedClassPath = containedClass.getProtectionDomain().getCodeSource()
.getLocation().getPath();

File file = new File(containedClassPath);

Expand All @@ -141,6 +144,62 @@ public static String getSourcePathFromClass(final Class<?> containedClass) {
}
}

/**
 * Loads a job output properties file (a JSON map of key/value pairs) into a
 * {@link Props} object.
 *
 * <p>This method never throws: a missing, empty, or unparseable file yields an
 * empty {@code Props}, with the problem logged.
 *
 * @param file output properties file expected to contain a JSON object
 * @return the parsed properties, or an empty Props on any failure
 */
public static Props loadOutputFileProps(final File file) {
  log.info("output properties file=" + file.getAbsolutePath());
  // try-with-resources replaces the former manual null-check/finally close
  try (InputStream reader = new BufferedInputStream(new FileInputStream(file))) {
    final Props outputProps = new Props();
    final String content = Streams.asString(reader).trim();

    if (!content.isEmpty()) {
      // JSONUtils returns Object; the content is expected to be a JSON map.
      @SuppressWarnings("unchecked")
      final Map<String, Object> propMap =
          (Map<String, Object>) JSONUtils.parseJSONFromString(content);

      for (final Map.Entry<String, Object> entry : propMap.entrySet()) {
        outputProps.put(entry.getKey(), entry.getValue().toString());
      }
    }
    return outputProps;
  } catch (final FileNotFoundException e) {
    log.info(
        String.format("File[%s] wasn't found, returning empty props.", file)
    );
    return new Props();
  } catch (final Exception e) {
    log.error(
        "Exception thrown when trying to load output file props. Returning empty Props instead of failing. Is this really the best thing to do?",
        e);
    return new Props();
  }
}

/**
 * Creates a fresh temporary file named with the given prefix and suffix inside
 * the supplied working directory.
 *
 * @param prefix file name prefix (File.createTempFile requires at least 3 chars)
 * @param suffix file name suffix
 * @param workingDir directory in which the file is created
 * @return handle to the newly created file
 * @throws RuntimeException wrapping the IOException if creation fails
 */
public static File createOutputPropsFile(final String prefix,
    final String suffix, final String workingDir) {
  try {
    return File.createTempFile(prefix, suffix, new File(workingDir));
  } catch (final IOException e) {
    throw new RuntimeException("Failed to create temp output property file ", e);
  }
}

/**
* Hard link files and recurse into directories.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,7 @@
import azkaban.metrics.MetricsManager;
import azkaban.server.IMBeanRegistrable;
import azkaban.server.AzkabanServer;
import azkaban.server.MBeanRegistrationManager;
import azkaban.utils.FileIOUtils;
import azkaban.utils.Props;
import azkaban.utils.StdOutErrRedirect;
Expand Down Expand Up @@ -85,6 +86,7 @@ public class AzkabanExecutorServer implements IMBeanRegistrable {

private static AzkabanExecutorServer app;

private final MBeanRegistrationManager mbeanRegistrationManager = new MBeanRegistrationManager();
private final ExecutorLoader executionLoader;
private final FlowRunnerManager runnerManager;
private final MetricsManager metricsManager;
Expand Down Expand Up @@ -491,4 +493,9 @@ public void configureMBeanServer() {
.registerMBean("jobCallbackJMXMBean", jobCallbackMgr.getJmxJobCallbackMBean());
}
}

// Accessor required by IMBeanRegistrable; returns the manager created at field init.
@Override
public MBeanRegistrationManager getMBeanRegistrationManager() {
return this.mbeanRegistrationManager;
}
}
Loading

0 comments on commit 1c61d83

Please sign in to comment.