Skip to content

Commit

Permalink
OOZIE-3586 Oozie spark actions using --keytab fail due to duplicate d…
Browse files Browse the repository at this point in the history
…ist. cache (jmakai via asalamon74)
  • Loading branch information
asalamon74 committed Mar 11, 2020
1 parent 417305c commit 56870ca
Show file tree
Hide file tree
Showing 5 changed files with 184 additions and 13 deletions.
1 change: 1 addition & 0 deletions release-log.txt
@@ -1,5 +1,6 @@
-- Oozie 5.3.0 release (trunk - unreleased)

OOZIE-3586 Oozie spark actions using --keytab fail due to duplicate dist. cache (jmakai via asalamon74)
OOZIE-3592 Do not print misleading SecurityException for successful jobs (matijhs via asalamon74)
OOZIE-3584 Fork-join action issue when action param cannot be resolved (jmakai via asalamon74)
OOZIE-3589 Avoid calling copyActionData method multiple times in ReRunXCommand (zuston via asalamon74)
Expand Down
Expand Up @@ -69,6 +69,7 @@ class SparkArgsExtractor {
private static final String JOB_NAME_OPTION = "--name";
private static final String CLASS_NAME_OPTION = "--class";
private static final String VERBOSE_OPTION = "--verbose";
private static final String KEYTAB_OPTION = "--keytab";
private static final String DRIVER_CLASSPATH_OPTION = "--driver-class-path";
private static final String EXECUTOR_CLASSPATH = "spark.executor.extraClassPath=";
private static final String DRIVER_CLASSPATH = "spark.driver.extraClassPath=";
Expand All @@ -92,6 +93,10 @@ class SparkArgsExtractor {
static final String SPARK_DEFAULTS_GENERATED_PROPERTIES = "spark-defaults-oozie-generated.properties";

private boolean pySpark = false;
boolean isKeytabPresentInSparkArgs = false;
boolean isKeytabsFullPathPresentInSparkArgs = false;
String keytabSymlinkNameInSparkArgs;
String keytabFileNameInSparkArgs;
private final Configuration actionConf;

SparkArgsExtractor(final Configuration actionConf) {
Expand Down Expand Up @@ -253,6 +258,16 @@ List<String> extract(final String[] mainArgs) throws OozieActionConfiguratorExce
userArchives.append(userArchive);
addToSparkArgs = false;
}
if (opt.startsWith(KEYTAB_OPTION)) {
isKeytabPresentInSparkArgs = true;
Path keytabValueInSparkArgs = new Path(sparkOptions.get(i + 1));
if (keytabValueInSparkArgs.isAbsolute()) {
isKeytabsFullPathPresentInSparkArgs = true;
keytabFileNameInSparkArgs = keytabValueInSparkArgs.getName();
} else {
keytabSymlinkNameInSparkArgs = keytabValueInSparkArgs.toString();
}
}
if (addToSparkArgs) {
sparkArgs.add(opt);
}
Expand Down Expand Up @@ -326,8 +341,16 @@ else if (sparkArgs.get(sparkArgs.size() - 1).equals(CONF_OPTION)) {
mergeAndAddPropertiesFile(sparkArgs, propertiesFile);

if ((yarnClusterMode || yarnClientMode)) {
final Map<String, URI> fixedFileUrisMap =
SparkMain.fixFsDefaultUrisAndFilterDuplicates(DistributedCache.getCacheFiles(actionConf));
final Map<String, URI> fixedFileUrisMap;
if (isKeytabPresentInSparkArgs) {
fixedFileUrisMap =
SparkMain.fixFsDefaultUrisAndFilterDuplicates
(DistributedCache.getCacheFiles(actionConf), geKeytabNotToAdd());
} else {
fixedFileUrisMap =
SparkMain.fixFsDefaultUrisAndFilterDuplicates
(DistributedCache.getCacheFiles(actionConf));
}
fixedFileUrisMap.put(SparkMain.SPARK_LOG4J_PROPS, new Path(SparkMain.SPARK_LOG4J_PROPS).toUri());
fixedFileUrisMap.put(SparkMain.HIVE_SITE_CONF, new Path(SparkMain.HIVE_SITE_CONF).toUri());
addUserDefined(userFiles.toString(), fixedFileUrisMap);
Expand Down Expand Up @@ -554,4 +577,17 @@ private void setSparkYarnJarsConf(final List<String> sparkArgs, final String spa
sparkArgs.add(SPARK_YARN_JARS + OPT_SEPARATOR + sparkYarnJar);
}
}

/**
* Gets the keytab string which is either the name of the keytab when full path is given or the symlink if not.
*/
private String geKeytabNotToAdd(){
String keytabNotToAdd;
if (isKeytabsFullPathPresentInSparkArgs) {
keytabNotToAdd = keytabFileNameInSparkArgs;
} else {
keytabNotToAdd = keytabSymlinkNameInSparkArgs;
}
return keytabNotToAdd;
}
}
Expand Up @@ -26,6 +26,7 @@
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.regex.Pattern;

import org.apache.commons.io.FileUtils;
Expand Down Expand Up @@ -219,23 +220,50 @@ private String setUpSparkLog4J(final Configuration actionConf) throws IOExceptio
}

/**
* Convert URIs into the default format which Spark expects
* Also filters out duplicate entries
* @param files
* Convert URIs into the default format which Spark expects.
* Filters out duplicate entries.
* @param uris
* @return
* @throws IOException
* @throws URISyntaxException
*/
static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(final URI[] files) throws IOException, URISyntaxException {
final Map<String, URI> map= new LinkedHashMap<>();
if (files == null) {
static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(final URI[] uris)
throws IOException, URISyntaxException {
final Map<String, URI> map = new LinkedHashMap<>();
if (uris == null) {
return map;
}
final FileSystem fs = FileSystem.get(new Configuration(true));
for (int i = 0; i < files.length; i++) {
final URI fileUri = files[i];
final Path p = new Path(fileUri);
map.put(p.getName(), HadoopUriFinder.getFixedUri(fs, fileUri));
for (URI uri : uris) {
final Path p = new Path(uri);
map.put(p.getName(), HadoopUriFinder.getFixedUri(fs, uri));
}
return map;
}

/**
* Convert URIs into the default format which Spark expects.
* Filters out duplicate entries and checks if the file was added earlier with --keytab option.
* @param uris
* @param elementNotToAdd
* @return
* @throws IOException
* @throws URISyntaxException
*/
static Map<String, URI> fixFsDefaultUrisAndFilterDuplicates(final URI[] uris, String elementNotToAdd)
throws IOException, URISyntaxException {
final Map<String, URI> map = new LinkedHashMap<>();
if (uris == null) {
return map;
}
final FileSystem fs = FileSystem.get(new Configuration(true));
for (URI uri : uris) {
final Path p = new Path(uri);
URI fullFileUri = HadoopUriFinder.getFixedUri(fs, uri);
String symlinkInFile = fullFileUri.getFragment();
if (!Objects.equals(symlinkInFile, elementNotToAdd) && !p.getName().equals(elementNotToAdd)) {
map.put(p.getName(), fullFileUri);
}
}
return map;
}
Expand Down
Expand Up @@ -492,6 +492,52 @@ private Configuration createSparkActionConfWithCustomSparkOpts(final String spar
return actionConf;
}

@Test
public void testKeytabDuplicateWithFileName()
throws OozieActionConfiguratorException, IOException, URISyntaxException {
final Configuration actionConf = new Configuration();

actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
actionConf.set(SparkActionExecutor.SPARK_DEFAULT_OPTS, "defaultProperty=1\ndefaultProperty2=2\ndefaultProperty3=3");
actionConf.set(SparkActionExecutor.SPARK_OPTS,
"--principal foobar --keytab /foo/bar.keytab");
actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");

final String[] mainArgs = {"arg0", "arg1"};
SparkArgsExtractor sparkArgsExtractor = new SparkArgsExtractor(actionConf);
sparkArgsExtractor.extract(mainArgs);
String expectedFileName = "bar.keytab";
assertEquals("Error happened while setting keytab presence.", true, sparkArgsExtractor.isKeytabPresentInSparkArgs);
assertEquals("Error happened while deciding if keytab full path given or not.",
true, sparkArgsExtractor.isKeytabsFullPathPresentInSparkArgs);
assertEquals("File name wrongly set.", expectedFileName, sparkArgsExtractor.keytabFileNameInSparkArgs);
}

@Test
public void testKeytabDuplicateWithSymlink() throws OozieActionConfiguratorException, IOException, URISyntaxException {
final Configuration actionConf = new Configuration();

actionConf.set(SparkActionExecutor.SPARK_MASTER, "yarn");
actionConf.set(SparkActionExecutor.SPARK_MODE, "client");
actionConf.set(SparkActionExecutor.SPARK_CLASS, "org.apache.oozie.example.SparkFileCopy");
actionConf.set(SparkActionExecutor.SPARK_JOB_NAME, "Spark Copy File");
actionConf.set(SparkActionExecutor.SPARK_DEFAULT_OPTS, "defaultProperty=1\ndefaultProperty2=2\ndefaultProperty3=3");
actionConf.set(SparkActionExecutor.SPARK_OPTS,
"--principal foobar --keytab foo");
actionConf.set(SparkActionExecutor.SPARK_JAR, "/lib/test.jar");
final String[] mainArgs = {"arg0", "arg1"};
SparkArgsExtractor sparkArgsExtractor = new SparkArgsExtractor(actionConf);
sparkArgsExtractor.extract(mainArgs);
String expectedSymlink = "foo";
assertEquals("Error happened while setting keytab presence.", true, sparkArgsExtractor.isKeytabPresentInSparkArgs);
assertEquals("Error happened while deciding if keytab full path given or not.",
false, sparkArgsExtractor.isKeytabsFullPathPresentInSparkArgs);
assertEquals("Symlink wrongly set.", expectedSymlink, sparkArgsExtractor.keytabSymlinkNameInSparkArgs);
}

private void assertContainsSublist(final List<String> expected, final List<String> actual) {
final int sublistSize = expected.size();
assertTrue("actual size is below expected size", actual.size() >= sublistSize);
Expand Down
Expand Up @@ -25,13 +25,16 @@
import java.io.OutputStream;
import java.io.OutputStreamWriter;
import java.io.Writer;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;
import java.util.Map;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.regex.Pattern;

import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.oozie.util.IOUtils;
Expand Down Expand Up @@ -148,4 +151,61 @@ protected List<File> getFilesToDelete() {
filesToDelete.add(new File(SparkMain.HIVE_SITE_CONF));
return filesToDelete;
}

public void testFixFsDefaultUrisAndFilterDuplicates() throws URISyntaxException, IOException {
URI[] uris = new URI[2];
URI uri1 = new URI("/foo/bar.keytab#foo.bar");
URI uri2 = new URI("/foo/bar.keytab#bar.foo");

uris[0] = uri1;
uris[1] = uri2;

Map<String, URI> result1 = SparkMain.fixFsDefaultUrisAndFilterDuplicates(uris);
assertEquals("Duplication elimination was not successful. " +
"Reason: Keytab added twice, but the result map contained more or less.",
1, result1.size());
}

public void testFixFsDefaultUrisAndFilterDuplicatesNoDuplication() throws URISyntaxException, IOException {
URI[] uris = new URI[2];
URI uri1 = new URI("/bar/foo.keytab#foo.bar");
URI uri2 = new URI("/foo/bar.keytab#bar.foo");

uris[0] = uri1;
uris[1] = uri2;

Map<String, URI> result = SparkMain.fixFsDefaultUrisAndFilterDuplicates(uris);
assertEquals("Duplication elimination was not successful. " +
"Reason: Two different keytabs were added, but the result map contained more or less.",
2, result.size());
}

public void testFixFsDefaultUrisAndFilterDuplicatesWithKeytabSymNotToAdd() throws URISyntaxException, IOException {
URI[] uris = new URI[2];
URI uri1 = new URI("/bar/foo.keytab#foo.bar");
URI uri2 = new URI("/foo/bar.keytab#bar.foo");

uris[0] = uri1;
uris[1] = uri2;

Map<String, URI> result = SparkMain.fixFsDefaultUrisAndFilterDuplicates(uris, "foo.bar");
assertEquals("Duplication elimination was not successful. " +
"Reason: foo.bar exists in the URI array, but the deletion did not happen.",
1, result.size());
}

public void testFixFsDefaultUrisAndFilterDuplicatesWithKeytabNameNotToAdd() throws URISyntaxException, IOException {
URI[] uris = new URI[2];

URI uri1 = new URI("/bar/foo.keytab#foo.bar");
URI uri2 = new URI("/foo/bar.keytab#bar.foo");

uris[0] = uri1;
uris[1] = uri2;

Map<String, URI> result = SparkMain.fixFsDefaultUrisAndFilterDuplicates(uris, "foo.keytab");
assertEquals("Duplication elimination was not successful. " +
"Reason: foo.keytab exists in the URI array, but the deletion did not happen.",
1, result.size());
}
}

0 comments on commit 56870ca

Please sign in to comment.