Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

OOZIE-2816 Strip out the first command word from Sqoop action if its … #26

Closed
wants to merge 1 commit into from
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Expand Up @@ -44,9 +44,10 @@ public class SqoopActionExecutor extends JavaActionExecutor {
public static final String OOZIE_ACTION_EXTERNAL_STATS_WRITE = "oozie.action.external.stats.write";
private static final String SQOOP_MAIN_CLASS_NAME = "org.apache.oozie.action.hadoop.SqoopMain";
static final String SQOOP_ARGS = "oozie.sqoop.args";
private static final String SQOOP = "sqoop";

public SqoopActionExecutor() {
super("sqoop");
super(SQOOP);
}

@Override
Expand Down Expand Up @@ -85,25 +86,35 @@ Configuration setupActionConf(Configuration actionConf, Context context, Element
throw convertException(ex);
}

String[] args;
final List<String> argList = new ArrayList<>();
// Build a list of arguments from either a tokenized <command> string or a list of <arg>
if (actionXml.getChild("command", ns) != null) {
String command = actionXml.getChild("command", ns).getTextTrim();
StringTokenizer st = new StringTokenizer(command, " ");
List<String> l = new ArrayList<String>();
while (st.hasMoreTokens()) {
l.add(st.nextToken());
argList.add(st.nextToken());
}
args = l.toArray(new String[l.size()]);
}
else {
List<Element> eArgs = (List<Element>) actionXml.getChildren("arg", ns);
args = new String[eArgs.size()];
for (int i = 0; i < eArgs.size(); i++) {
args[i] = eArgs.get(i).getTextTrim();
for (Element elem : eArgs) {
argList.add(elem.getTextTrim());
}
}
// If the command is given accidentally as "sqoop import --option"
// instead of "import --option" we can make a user's life easier
// by removing away the unnecessary "sqoop" token.
// However, we do not do this if the command looks like
// "sqoop --option", as that's entirely invalid.
if (argList.size() > 1 &&
argList.get(0).equalsIgnoreCase(SQOOP) &&
!argList.get(1).startsWith("-")) {
XLog.getLog(getClass()).info(
"Found a redundant 'sqoop' prefixing the command. Removing it.");
argList.remove(0);
}

setSqoopCommand(actionConf, args);
setSqoopCommand(actionConf, argList.toArray(new String[argList.size()]));
return actionConf;
}

Expand Down
Expand Up @@ -52,11 +52,11 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Properties;

public class TestSqoopActionExecutor extends ActionExecutorTestCase {

private static final String SQOOP_COMMAND = "import --connect {0} --table TT --target-dir {1} -m 1";
private static final String SQOOP_IMPORT_COMMAND =
"import --connect {0} --table TT --target-dir {1} -m 1";

private static final String SQOOP_ACTION_COMMAND_XML =
"<sqoop xmlns=\"uri:oozie:sqoop-action:0.1\">" +
Expand All @@ -81,18 +81,18 @@ public class TestSqoopActionExecutor extends ActionExecutorTestCase {
"<value>INFO</value>" +
"</property>" +
"</configuration>" +
"<arg>import</arg>" +
"{2}" +
"<arg>--connect</arg>" +
"<arg>{2}</arg>" +
"<arg>{3}</arg>" +
"<arg>--username</arg>" +
"<arg>sa</arg>" +
"<arg>--password</arg>" +
"<arg></arg>" +
"<arg>--verbose</arg>" +
"<arg>--query</arg>" +
"<arg>{3}</arg>" +
"<arg>--target-dir</arg>" +
"<arg>{4}</arg>" +
"<arg>--target-dir</arg>" +
"<arg>{5}</arg>" +
"<arg>--split-by</arg>" +
"<arg>I</arg>" +
"</sqoop>";
Expand Down Expand Up @@ -146,23 +146,45 @@ private String getSqoopOutputDir() {
}

private String getActionXml() {
String command = MessageFormat.format(SQOOP_COMMAND, getActionJdbcUri(), getSqoopOutputDir());
String command = MessageFormat.format(SQOOP_IMPORT_COMMAND, getActionJdbcUri(), getSqoopOutputDir());
return MessageFormat.format(SQOOP_ACTION_COMMAND_XML, getJobTrackerUri(), getNameNodeUri(),
"dummy", "dummyValue", command);
}

private String getRedundantCommandActionXml() {
String command = "sqoop " +
MessageFormat.format(SQOOP_IMPORT_COMMAND, getActionJdbcUri(), getSqoopOutputDir());
return MessageFormat.format(SQOOP_ACTION_COMMAND_XML, getJobTrackerUri(), getNameNodeUri(),
"dummy", "dummyValue", command);
}

private String getBadCommandActionXml() {
String command = MessageFormat.format(SQOOP_IMPORT_COMMAND,
getActionJdbcUri(), getSqoopOutputDir()).replace("import", "sqoop");
return MessageFormat.format(SQOOP_ACTION_COMMAND_XML, getJobTrackerUri(), getNameNodeUri(),
"dummy", "dummyValue", command);
}

private String getActionXmlEval() {
String query = "select TT.I, TT.S from TT";
return MessageFormat.format(SQOOP_ACTION_EVAL_XML, getJobTrackerUri(), getNameNodeUri(),
getActionJdbcUri(), query);
}

private String getActionXmlFreeFromQuery() {
private String getArgsActionXmlFreeFromQuery(boolean redundant) {
String query = "select TT.I, TT.S from TT where $CONDITIONS";
return MessageFormat.format(SQOOP_ACTION_ARGS_XML, getJobTrackerUri(), getNameNodeUri(),
(redundant ? "<arg>sqoop</arg>" : "") + "<arg>import</arg>",
getActionJdbcUri(), query, getSqoopOutputDir());
}

private String getBadArgsActionXml() {
String query = "select TT.I, TT.S from TT where $CONDITIONS";
return MessageFormat.format(SQOOP_ACTION_ARGS_XML, getJobTrackerUri(), getNameNodeUri(),
"<arg>sqoop</arg>",
getActionJdbcUri(), query, getSqoopOutputDir());
}

private void createDB() throws Exception {
Class.forName("org.hsqldb.jdbcDriver");
Connection conn = DriverManager.getConnection(getLocalJdbcUri(), "sa", "");
Expand All @@ -175,10 +197,57 @@ private void createDB() throws Exception {
conn.close();
}

/**
* Tests a bad command of 'sqoop --username ...' style.
* Test asserts that the job will fail.
*/
public void testSqoopActionWithBadCommand() throws Exception {
runSqoopActionWithBadCommand(getBadCommandActionXml());
}

private void runSqoopActionWithBadCommand(String actionXml) throws Exception {
createDB();

Context context = createContext(actionXml);
final RunningJob launcherJob = submitAction(context);
String launcherId = context.getAction().getExternalId();
waitFor(120 * 1000, new Predicate() {
public boolean evaluate() throws Exception {
return launcherJob.isComplete();
}
});
assertTrue(launcherJob.isSuccessful());
Map<String, String> actionData = LauncherMapperHelper.getActionData(getFileSystem(), context.getActionDir(),
context.getProtoActionConf());
assertFalse(LauncherMapperHelper.hasIdSwap(actionData));

SqoopActionExecutor ae = new SqoopActionExecutor();
ae.check(context, context.getAction());
assertTrue(launcherId.equals(context.getAction().getExternalId()));
assertEquals("FAILED/KILLED", context.getAction().getExternalStatus());
ae.end(context, context.getAction());
assertEquals(WorkflowAction.Status.ERROR, context.getAction().getStatus());
}

/**
* Tests a normal command of 'import --username ...'.
*/
public void testSqoopAction() throws Exception {
runSqoopAction(getActionXml());
}

/**
* Tests a redundant command of 'sqoop import --username ...'.
* The test guarantees a success, since the redundant 'sqoop' must get removed.
*/
public void testSqoopActionWithRedundantPrefix() throws Exception {
runSqoopAction(getRedundantCommandActionXml());
}

private void runSqoopAction(String actionXml) throws Exception {
createDB();

Context context = createContext(getActionXml());
Context context = createContext(actionXml);
final RunningJob launcherJob = submitAction(context);
String launcherId = context.getAction().getExternalId();
waitFor(120 * 1000, new Predicate() {
Expand Down Expand Up @@ -245,10 +314,33 @@ public boolean evaluate() throws Exception {
assertTrue(hadoopCounters.isEmpty());
}

public void testSqoopActionFreeFormQuery() throws Exception {
/**
* Runs a job with arg-style command of 'sqoop --username ...' form that's invalid.
* The test ensures it fails.
*/
public void testSqoopActionWithBadRedundantArgsAndFreeFormQuery() throws Exception {
runSqoopActionWithBadCommand(getBadArgsActionXml());
}

/**
* Runs a job with the arg-style command of 'sqoop import --username ...'.
* The test guarantees that the redundant 'sqoop' is auto-removed (job passes).
*/
public void testSqoopActionWithRedundantArgsAndFreeFormQuery() throws Exception {
runSqoopActionFreeFormQuery(getArgsActionXmlFreeFromQuery(true));
}

/**
* Runs a job with the normal arg-style command of 'import --username ...'.
*/
public void testSqoopActionWithArgsAndFreeFormQuery() throws Exception {
runSqoopActionFreeFormQuery(getArgsActionXmlFreeFromQuery(false));
}

private void runSqoopActionFreeFormQuery(String actionXml) throws Exception {
createDB();

Context context = createContext(getActionXmlFreeFromQuery());
Context context = createContext(actionXml);
final RunningJob launcherJob = submitAction(context);
String launcherId = context.getAction().getExternalId();
waitFor(120 * 1000, new Predicate() {
Expand Down