From 91effc9792ac24278971de266d6d6abd44da6e6e Mon Sep 17 00:00:00 2001 From: Sanjay Pujare Date: Sun, 11 Jun 2017 22:14:33 -0700 Subject: [PATCH] APEXCORE-733 implement the new logic using apex.dfsRootDirectory and impersonated user flag --- .../com/datatorrent/stram/StramClient.java | 2 +- .../datatorrent/stram/client/StramAgent.java | 2 +- .../stram/client/StramAppLauncher.java | 4 +- .../stram/client/StramClientUtils.java | 103 +++++-- .../stram/security/StramUserLogin.java | 1 + .../stram/client/StramAppLauncherTest.java | 37 +++ .../stram/client/StramClientUtilsTest.java | 270 ++++++++++++++++++ 7 files changed, 397 insertions(+), 22 deletions(-) diff --git a/engine/src/main/java/com/datatorrent/stram/StramClient.java b/engine/src/main/java/com/datatorrent/stram/StramClient.java index 96f9daaca3..da613dbb11 100644 --- a/engine/src/main/java/com/datatorrent/stram/StramClient.java +++ b/engine/src/main/java/com/datatorrent/stram/StramClient.java @@ -446,7 +446,7 @@ public void startApplication() throws YarnException, IOException // copy required jar files to dfs, to be localized for containers try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) { - Path appsBasePath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS); + Path appsBasePath = new Path(StramClientUtils.getApexDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS); Path appPath; String configuredAppPath = dag.getValue(LogicalPlan.APPLICATION_PATH); if (configuredAppPath == null) { diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java b/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java index 669a445b52..a1ac8ca42a 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramAgent.java @@ -312,7 +312,7 @@ public JSONObject issueStramWebGetRequest(WebServicesClient webServiceClient, St public String getAppsRoot() { - return (defaultStramRoot == null) ? (StramClientUtils.getDTDFSRootDir(fileSystem, conf) + "/" + StramClientUtils.SUBDIR_APPS) : defaultStramRoot; + return (defaultStramRoot == null) ? (StramClientUtils.getApexDFSRootDir(fileSystem, conf) + "/" + StramClientUtils.SUBDIR_APPS) : defaultStramRoot; } public String getAppPath(String appId) diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java index 97420de52b..d4f0170f3c 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramAppLauncher.java @@ -273,7 +273,7 @@ private void init() throws Exception if (originalAppId == null) { throw new AssertionError("Need original app id if launching without apa or appjar"); } - Path appsBasePath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS); + Path appsBasePath = new Path(StramClientUtils.getApexDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS); Path origAppPath = new Path(appsBasePath, originalAppId); StringWriter writer = new StringWriter(); try (FSDataInputStream in = fs.open(new Path(origAppPath, "meta.json"))) { @@ -550,7 +550,7 @@ private void setTokenRefreshCredentials(LogicalPlan dag, Configuration conf) thr if (keytab != null) { Path localKeyTabPath = new Path(keytab); try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) { - Path destPath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), localKeyTabPath.getName()); + Path destPath = new Path(StramClientUtils.getApexDFSRootDir(fs, conf), localKeyTabPath.getName()); if (!fs.exists(destPath)) { fs.copyFromLocalFile(false, false, localKeyTabPath, destPath); } diff --git a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java index d9032e5b0c..d8caa1edf6 100644 --- a/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java +++ b/engine/src/main/java/com/datatorrent/stram/client/StramClientUtils.java @@ -103,6 +103,7 @@ public class StramClientUtils { public static final String DT_VERSION = StreamingApplication.DT_PREFIX + "version"; public static final String DT_DFS_ROOT_DIR = StreamingApplication.DT_PREFIX + "dfsRootDirectory"; + public static final String APEX_APP_DFS_ROOT_DIR = StreamingApplication.APEX_PREFIX + "app.dfsRootDirectory"; public static final String DT_DFS_USER_NAME = "%USER_NAME%"; public static final String DT_CONFIG_STATUS = StreamingApplication.DT_PREFIX + "configStatus"; public static final String SUBDIR_APPS = "apps"; @@ -519,28 +520,94 @@ public static FileSystem newFileSystemInstance(Configuration conf) throws IOExce } } - public static Path getDTDFSRootDir(FileSystem fs, Configuration conf) + /** + * Helper function used by both getApexDFSRootDir and getDTDFSRootDir to process dfsRootDir + * + * @param fs FileSystem object for HDFS file system + * @param conf Configuration object + * @param dfsRootDir value of dt.dfsRootDir or apex.app.dfsRootDir + * @param userShortName current user short name (either login user or current user depending on impersonation settings) + * @param prependHomeDir prepend user's home dir if dfsRootDir is relative path + + * @return + */ + private static Path evalDFSRootDir(FileSystem fs, Configuration conf, String dfsRootDir, String userShortName, + boolean prependHomeDir) { - String dfsRootDir = conf.get(DT_DFS_ROOT_DIR); - if (StringUtils.isBlank(dfsRootDir)) { - return new Path(fs.getHomeDirectory(), "datatorrent"); - } else { + try { + if (userShortName != null && dfsRootDir.contains(DT_DFS_USER_NAME)) { + dfsRootDir = dfsRootDir.replace(DT_DFS_USER_NAME, userShortName); + conf.set(DT_DFS_ROOT_DIR, dfsRootDir); + } + URI uri = new URI(dfsRootDir); + if (uri.isAbsolute()) { + return new Path(uri); + } + if (userShortName != null && prependHomeDir && dfsRootDir.startsWith("/") == false) { + dfsRootDir = "/user/" + userShortName + "/" + dfsRootDir; + } + } catch (URISyntaxException ex) { + LOG.warn("{} is not a valid URI. Using the default filesystem to construct the path", dfsRootDir, ex); + } + return new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), dfsRootDir); + } + + private static String getDefaultRootFolder() + { + return "datatorrent"; + } + + /** + * This gets the DFS Root dir to be used at runtime by Apex applications as per the following logic: + * Value of apex.app.dfsRootDirectory is referred to as Apex-root-dir below. + * A "user" refers to either impersonating or impersonated user: + * If apex.application.path.impersonated is true then use impersonated user else impersonating user. + * + * + * + * @param fs FileSystem object for HDFS file system + * @param conf Configuration object + * @return + * @throws IOException + */ + public static Path getApexDFSRootDir(FileSystem fs, Configuration conf) + { + String apexDfsRootDir = conf.get(APEX_APP_DFS_ROOT_DIR); + boolean useImpersonated = conf.getBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false); + String userShortName = null; + if (useImpersonated) { try { - if (dfsRootDir.contains(DT_DFS_USER_NAME)) { - dfsRootDir = dfsRootDir.replace(DT_DFS_USER_NAME, UserGroupInformation.getLoginUser().getShortUserName()); - conf.set(DT_DFS_ROOT_DIR, dfsRootDir); - } - URI uri = new URI(dfsRootDir); - if (uri.isAbsolute()) { - return new Path(uri); - } + userShortName = UserGroupInformation.getCurrentUser().getShortUserName(); } catch (IOException ex) { - LOG.warn("Error getting user login name {}", dfsRootDir, ex); - } catch (URISyntaxException ex) { - LOG.warn("{} is not a valid URI. Using the default filesystem to construct the path", dfsRootDir, ex); + LOG.warn("Error getting current/login user name {}", apexDfsRootDir, ex); } - return new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), dfsRootDir); } + if (!useImpersonated || userShortName == null) { + return getDTDFSRootDir(fs, conf); + } + if (StringUtils.isBlank(apexDfsRootDir)) { + apexDfsRootDir = getDefaultRootFolder(); + } + return evalDFSRootDir(fs, conf, apexDfsRootDir, userShortName, true); + } + + public static Path getDTDFSRootDir(FileSystem fs, Configuration conf) + { + String dfsRootDir = conf.get(DT_DFS_ROOT_DIR); + if (StringUtils.isBlank(dfsRootDir)) { + return new Path(fs.getHomeDirectory(), getDefaultRootFolder()); + } + String userShortName = null; + try { + userShortName = UserGroupInformation.getLoginUser().getShortUserName(); + } catch (IOException ex) { + LOG.warn("Error getting user login name {}", dfsRootDir, ex); + } + return evalDFSRootDir(fs, conf, dfsRootDir, userShortName, false); } public static Path getDTDFSConfigDir(FileSystem fs, Configuration conf) @@ -828,7 +895,7 @@ public static List cleanAppDirectories(YarnClient clientRMSer List result = new ArrayList<>(); List applications = clientRMService.getApplications(Sets.newHashSet(StramClient.YARN_APPLICATION_TYPE, StramClient.YARN_APPLICATION_TYPE_DEPRECATED), EnumSet.of(YarnApplicationState.FAILED, YarnApplicationState.FINISHED, YarnApplicationState.KILLED)); - Path appsBasePath = new Path(StramClientUtils.getDTDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS); + Path appsBasePath = new Path(StramClientUtils.getApexDFSRootDir(fs, conf), StramClientUtils.SUBDIR_APPS); for (ApplicationReport ar : applications) { long finishTime = ar.getFinishTime(); if (finishTime < finishedBefore) { diff --git a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java index 0c1d0c93b9..83aa78181d 100644 --- a/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java +++ b/engine/src/main/java/com/datatorrent/stram/security/StramUserLogin.java @@ -51,6 +51,7 @@ public class StramUserLogin public static final String DT_AUTH_PREFIX = StreamingApplication.DT_PREFIX + "authentication."; public static final String DT_AUTH_PRINCIPAL = DT_AUTH_PREFIX + "principal"; public static final String DT_AUTH_KEYTAB = DT_AUTH_PREFIX + "keytab"; + public static final String DT_APP_PATH_IMPERSONATED = DT_AUTH_PREFIX + "impersonation.path.enable"; private static String principal; private static String keytab; diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java index 5d4c84b769..2069bab553 100644 --- a/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java +++ b/engine/src/test/java/com/datatorrent/stram/client/StramAppLauncherTest.java @@ -60,6 +60,7 @@ public static class RefreshTokenTests File workspace; File sourceKeytab; File dfsDir; + File apexDfsDir; static final String principal = "username/group@domain"; @@ -82,6 +83,7 @@ protected void starting(Description description) throw new RuntimeException(e); } dfsDir = new File(workspace, "dst"); + apexDfsDir = new File(workspace, "adst"); suppress(method(StramAppLauncher.class, "init")); } @@ -138,6 +140,41 @@ private void testDFSTokenPath(Configuration conf) throws Exception new File(dfsDir, sourceKeytab.getName()).getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE)); } + @Test + public void testUserLoginTokenRefreshKeytabWithApexDFS() throws Exception + { + Configuration conf = new Configuration(false); + /* + spy(StramUserLogin.class); + when(StramUserLogin.getPrincipal()).thenReturn(principal); + when(StramUserLogin.getKeytab()).thenReturn(sourceKeytab.getPath()); + */ + StramUserLogin.authenticate(principal, sourceKeytab.getPath()); + testDFSTokenPathWithApexDFS(conf); + } + + @Test + public void testAuthPropTokenRefreshKeytabWithApexDFS() throws Exception + { + Configuration conf = new Configuration(false); + conf.set(StramUserLogin.DT_AUTH_PRINCIPAL, principal); + conf.set(StramUserLogin.DT_AUTH_KEYTAB, sourceKeytab.getPath()); + StramUserLogin.authenticate(conf); + testDFSTokenPathWithApexDFS(conf); + } + + private void testDFSTokenPathWithApexDFS(Configuration conf) throws Exception + { + FileSystem fs = FileSystem.newInstance(conf); + conf.set(StramClientUtils.DT_DFS_ROOT_DIR, dfsDir.getAbsolutePath()); + conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, apexDfsDir.getAbsolutePath()); + conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true); // needs to be true for APEX_APP_DFS_ROOT_DIR to be honored + LogicalPlan dag = applyTokenRefreshKeytab(fs, conf); + Assert.assertEquals("Token refresh principal", principal, dag.getValue(LogicalPlan.PRINCIPAL)); + Assert.assertEquals("Token refresh keytab path", new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(), + new File(apexDfsDir, sourceKeytab.getName()).getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE)); + } + private LogicalPlan applyTokenRefreshKeytab(FileSystem fs, Configuration conf) throws Exception { LogicalPlan dag = new LogicalPlan(); diff --git a/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java b/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java index 55b41d0930..ed61efb03a 100644 --- a/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java +++ b/engine/src/test/java/com/datatorrent/stram/client/StramClientUtilsTest.java @@ -18,9 +18,11 @@ */ package com.datatorrent.stram.client; +import java.io.IOException; import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.security.PrivilegedExceptionAction; import java.util.List; import java.util.Properties; @@ -29,8 +31,12 @@ import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.CommonConfigurationKeysPublic; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.security.UserGroupInformation; import org.apache.hadoop.yarn.conf.YarnConfiguration; +import com.datatorrent.stram.security.StramUserLogin; import com.datatorrent.stram.util.ConfigUtils; @@ -154,4 +160,268 @@ public InetSocketAddress getSocketAddr(String name, String defaultAddress, int d Assert.assertEquals(getHostString("192.168.1.2") + ":8032", StramClientUtils.getSocketConnectString(addresses.get(1))); } + /** + * apex.dfsRootDirectory not set: legacy behavior of getDTDFSRootDir() + * @throws IOException + * + */ + @Test + public void getApexDFSRootDirLegacy() throws IOException + { + Configuration conf = new YarnConfiguration(new Configuration(false)); + conf.set(StramClientUtils.DT_DFS_ROOT_DIR, "/a/b/c"); + conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false); + + FileSystem fs = FileSystem.newInstance(conf); + Path path = StramClientUtils.getApexDFSRootDir(fs, conf); + Assert.assertEquals("file:/a/b/c", path.toString()); + } + + /** + * apex.dfsRootDirectory set: absolute path e.g. /x/y/z + * @throws IOException + * + */ + @Test + public void getApexDFSRootDirAbsPath() throws IOException + { + Configuration conf = new YarnConfiguration(new Configuration(false)); + conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "/x/y/z"); + conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false); + + FileSystem fs = FileSystem.newInstance(conf); + UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""}); + UserGroupInformation.setLoginUser(testUser); + Path path = StramClientUtils.getApexDFSRootDir(fs, conf); + Assert.assertEquals(fs.getHomeDirectory() + "/datatorrent", path.toString()); + } + + /** + * apex.dfsRootDirectory set: absolute path with scheme e.g. file:/p/q/r + * @throws IOException + * + */ + @Test + public void getApexDFSRootDirScheme() throws IOException + { + Configuration conf = new YarnConfiguration(new Configuration(false)); + conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "file:/p/q/r"); + conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false); + + FileSystem fs = FileSystem.newInstance(conf); + UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""}); + UserGroupInformation.setLoginUser(testUser); + Path path = StramClientUtils.getApexDFSRootDir(fs, conf); + Assert.assertEquals(fs.getHomeDirectory() + "/datatorrent", path.toString()); + } + + /** + * apex.dfsRootDirectory set: absolute path with variable %USER_NAME% + * @throws IOException + * @throws InterruptedException + * + */ + @Test + public void getApexDFSRootDirWithVar() throws IOException, InterruptedException + { + final Configuration conf = new YarnConfiguration(new Configuration(false)); + conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "/x/%USER_NAME%/z"); + conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false); + + final FileSystem fs = FileSystem.newInstance(conf); + UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""}); + UserGroupInformation.setLoginUser(testUser); + UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("impersonated", new String[]{""}); + + doAsUser.doAs(new PrivilegedExceptionAction() + { + @Override + public Void run() throws Exception + { + Path path = StramClientUtils.getApexDFSRootDir(fs, conf); + Assert.assertEquals(fs.getHomeDirectory() + "/datatorrent", path.toString()); + return null; + } + }); + } + + /** + * apex.dfsRootDirectory set: absolute path with %USER_NAME% and scheme e.g. file:/x/%USER_NAME%/z + * @throws IOException + * @throws InterruptedException + * + */ + @Test + public void getApexDFSRootDirWithSchemeAndVar() throws IOException, InterruptedException + { + final Configuration conf = new YarnConfiguration(new Configuration(false)); + conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "file:/x/%USER_NAME%/z"); + conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true); + + final FileSystem fs = FileSystem.newInstance(conf); + UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""}); + UserGroupInformation.setLoginUser(testUser); + UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("impersonated", new String[]{""}); + + doAsUser.doAs(new PrivilegedExceptionAction() + { + @Override + public Void run() throws Exception + { + Path path = StramClientUtils.getApexDFSRootDir(fs, conf); + Assert.assertEquals("file:/x/impersonated/z", path.toString()); + return null; + } + }); + } + + /** + * apex.dfsRootDirectory set: relative path + * @throws IOException + * @throws InterruptedException + * + */ + @Test + public void getApexDFSRootDirRelPath() throws IOException, InterruptedException + { + final Configuration conf = new YarnConfiguration(new Configuration(false)); + conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "apex"); + conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, false); + + final FileSystem fs = FileSystem.newInstance(conf); + UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""}); + UserGroupInformation.setLoginUser(testUser); + UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("impersonated", new String[]{""}); + + doAsUser.doAs(new PrivilegedExceptionAction() + { + @Override + public Void run() throws Exception + { + Path path = StramClientUtils.getApexDFSRootDir(fs, conf); + Assert.assertEquals(fs.getHomeDirectory() + "/datatorrent", path.toString()); + return null; + } + }); + } + + /** + * apex.dfsRootDirectory set: absolute path with %USER_NAME% and impersonation enabled + * @throws IOException + * @throws InterruptedException + * + */ + @Test + public void getApexDFSRootDirAbsPathAndVar() throws IOException, InterruptedException + { + final Configuration conf = new YarnConfiguration(new Configuration(false)); + conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "/x/%USER_NAME%/z"); + conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true); + + final FileSystem fs = FileSystem.newInstance(conf); + UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""}); + UserGroupInformation.setLoginUser(testUser); + UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("impersonated", new String[]{""}); + + doAsUser.doAs(new PrivilegedExceptionAction() + { + @Override + public Void run() throws Exception + { + Path path = StramClientUtils.getApexDFSRootDir(fs, conf); + Assert.assertEquals("file:/x/impersonated/z", path.toString()); + return null; + } + }); + } + + /** + * apex.dfsRootDirectory set: relative path and impersonation enabled and doAS + * @throws IOException + * @throws InterruptedException + * + */ + @Test + public void getApexDFSRootDirRelPathAndImpersonation() throws IOException, InterruptedException + { + final Configuration conf = new YarnConfiguration(new Configuration(false)); + conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "apex"); + conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true); + + final FileSystem fs = FileSystem.newInstance(conf); + UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""}); + UserGroupInformation.setLoginUser(testUser); + UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("testUser2", new String[]{""}); + + doAsUser.doAs(new PrivilegedExceptionAction() + { + @Override + public Void run() throws Exception + { + Path path = StramClientUtils.getApexDFSRootDir(fs, conf); + Assert.assertEquals("file:/user/testUser2/apex", path.toString()); + return null; + } + }); + } + + /** + * apex.dfsRootDirectory set: relative path blank and impersonation enabled and doAS + * @throws IOException + * @throws InterruptedException + * + */ + @Test + public void getApexDFSRootDirBlankPathAndImpersonation() throws IOException, InterruptedException + { + final Configuration conf = new YarnConfiguration(new Configuration(false)); + conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true); + + final FileSystem fs = FileSystem.newInstance(conf); + UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""}); + UserGroupInformation.setLoginUser(testUser); + UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("testUser2", new String[]{""}); + + doAsUser.doAs(new PrivilegedExceptionAction() + { + @Override + public Void run() throws Exception + { + Path path = StramClientUtils.getApexDFSRootDir(fs, conf); + Assert.assertEquals("file:/user/testUser2/datatorrent", path.toString()); + return null; + } + }); + } + + /** + * apex.dfsRootDirectory set: relative path having %USER_NAME% and impersonation enabled and doAS + * Make sure currentUser appears twice + * @throws IOException + * @throws InterruptedException + * + */ + @Test + public void getApexDFSRootDirRelPathVarAndImpersonation() throws IOException, InterruptedException + { + final Configuration conf = new YarnConfiguration(new Configuration(false)); + conf.set(StramClientUtils.APEX_APP_DFS_ROOT_DIR, "apex/%USER_NAME%/xyz"); + conf.setBoolean(StramUserLogin.DT_APP_PATH_IMPERSONATED, true); + + final FileSystem fs = FileSystem.newInstance(conf); + UserGroupInformation testUser = UserGroupInformation.createUserForTesting("testUser1", new String[]{""}); + UserGroupInformation.setLoginUser(testUser); + UserGroupInformation doAsUser = UserGroupInformation.createUserForTesting("testUser2", new String[]{""}); + + doAsUser.doAs(new PrivilegedExceptionAction() + { + @Override + public Void run() throws Exception + { + Path path = StramClientUtils.getApexDFSRootDir(fs, conf); + Assert.assertEquals("file:/user/testUser2/apex/testUser2/xyz", path.toString()); + return null; + } + }); + } }