Skip to content
This repository has been archived by the owner on Jun 7, 2021. It is now read-only.

Commit

Permalink
APEXCORE-515 Providing principal for token refresh
Browse files Browse the repository at this point in the history
  • Loading branch information
pramodin committed Sep 1, 2016
1 parent d651edc commit dd5e95a
Show file tree
Hide file tree
Showing 6 changed files with 54 additions and 31 deletions.
Expand Up @@ -676,6 +676,7 @@ private void execute() throws YarnException, IOException
long tokenLifeTime = (long)(dag.getValue(LogicalPlan.TOKEN_REFRESH_ANTICIPATORY_FACTOR) * Math.min(dag.getValue(LogicalPlan.HDFS_TOKEN_LIFE_TIME), dag.getValue(LogicalPlan.RM_TOKEN_LIFE_TIME)));
long expiryTime = System.currentTimeMillis() + tokenLifeTime;
LOG.debug(" expiry token time {}", tokenLifeTime);
String principal = dag.getValue(LogicalPlan.PRINCIPAL);
String hdfsKeyTabFile = dag.getValue(LogicalPlan.KEY_TAB_FILE);

// Register self with ResourceManager
Expand Down Expand Up @@ -753,7 +754,7 @@ private void execute() throws YarnException, IOException

if (UserGroupInformation.isSecurityEnabled() && currentTimeMillis >= expiryTime && hdfsKeyTabFile != null) {
String applicationId = appAttemptID.getApplicationId().toString();
expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, hdfsKeyTabFile, credentials, rmAddress, true);
expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), applicationId, conf, principal, hdfsKeyTabFile, credentials, rmAddress, true);
}

if (currentTimeMillis > nodeReportUpdateTime) {
Expand Down
Expand Up @@ -560,14 +560,12 @@ public URLClassLoader loadDependencies()
return cl;
}

private void setTokenRefreshKeytab(LogicalPlan dag, Configuration conf) throws IOException
private void setTokenRefreshCredentials(LogicalPlan dag, Configuration conf) throws IOException
{
String keytabPath;
if ((keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE)) == null) {
String keytab;
if ((keytab = StramUserLogin.getKeytab()) == null) {
keytab = conf.get(StramUserLogin.DT_AUTH_KEYTAB);
}
String principal = StramUserLogin.getPrincipal();
String keytabPath = conf.get(StramClientUtils.KEY_TAB_FILE);
if (keytabPath == null) {
String keytab = StramUserLogin.getKeytab();
if (keytab != null) {
Path localKeyTabPath = new Path(keytab);
try (FileSystem fs = StramClientUtils.newFileSystemInstance(conf)) {
Expand All @@ -579,10 +577,11 @@ private void setTokenRefreshKeytab(LogicalPlan dag, Configuration conf) throws I
}
}
}
if (keytabPath != null) {
if ((principal != null) && (keytabPath != null)) {
dag.setAttribute(LogicalPlan.PRINCIPAL, principal);
dag.setAttribute(LogicalPlan.KEY_TAB_FILE, keytabPath);
} else {
LOG.warn("No keytab specified for refreshing tokens, application may not be able to run indefinitely");
LOG.warn("Credentials for refreshing tokens not available, application may not be able to run indefinitely");
}
}

Expand All @@ -600,13 +599,12 @@ public ApplicationId launchApp(AppFactory appConfig) throws Exception
Configuration conf = propertiesBuilder.conf;
conf.setEnum(StreamingApplication.ENVIRONMENT, StreamingApplication.Environment.CLUSTER);
LogicalPlan dag = appConfig.createApp(propertiesBuilder);
long hdfsTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_MAX_LIFE_TIME, conf.getLong(StramClientUtils.HDFS_TOKEN_MAX_LIFE_TIME, StramClientUtils.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
dag.setAttribute(LogicalPlan.HDFS_TOKEN_LIFE_TIME, hdfsTokenMaxLifeTime);
long rmTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY, YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
dag.setAttribute(LogicalPlan.RM_TOKEN_LIFE_TIME, rmTokenMaxLifeTime);
// TODO:- Need to see if other token refresh attributes are needed if security is not enabled
if (UserGroupInformation.isSecurityEnabled()) {
setTokenRefreshKeytab(dag, conf);
long hdfsTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_HDFS_TOKEN_MAX_LIFE_TIME, conf.getLong(StramClientUtils.HDFS_TOKEN_MAX_LIFE_TIME, StramClientUtils.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
dag.setAttribute(LogicalPlan.HDFS_TOKEN_LIFE_TIME, hdfsTokenMaxLifeTime);
long rmTokenMaxLifeTime = conf.getLong(StramClientUtils.DT_RM_TOKEN_MAX_LIFE_TIME, conf.getLong(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_KEY, YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT));
dag.setAttribute(LogicalPlan.RM_TOKEN_LIFE_TIME, rmTokenMaxLifeTime);
setTokenRefreshCredentials(dag, conf);
}
String tokenRefreshFactor = conf.get(StramClientUtils.TOKEN_ANTICIPATORY_REFRESH_FACTOR);
if (tokenRefreshFactor != null && tokenRefreshFactor.trim().length() > 0) {
Expand Down
Expand Up @@ -616,11 +616,12 @@ public void heartbeatLoop() throws Exception
Token<?> token = iter.next();
logger.debug("token: {}", token);
}
String principal = containerContext.getValue(LogicalPlan.PRINCIPAL);
String hdfsKeyTabFile = containerContext.getValue(LogicalPlan.KEY_TAB_FILE);
while (!exitHeartbeatLoop) {

if (UserGroupInformation.isSecurityEnabled() && System.currentTimeMillis() >= expiryTime && hdfsKeyTabFile != null) {
expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), containerId, conf, hdfsKeyTabFile, credentials, null, false);
expiryTime = StramUserLogin.refreshTokens(tokenLifeTime, FileUtils.getTempDirectoryPath(), containerId, conf, principal, hdfsKeyTabFile, credentials, null, false);
}
synchronized (this.heartbeatTrigger) {
try {
Expand Down
Expand Up @@ -157,6 +157,7 @@ public class LogicalPlan implements Serializable, DAG
public static Attribute<Boolean> FAST_PUBLISHER_SUBSCRIBER = new Attribute<>(false);
public static Attribute<Long> HDFS_TOKEN_LIFE_TIME = new Attribute<>(604800000L);
public static Attribute<Long> RM_TOKEN_LIFE_TIME = new Attribute<>(YarnConfiguration.DELEGATION_TOKEN_MAX_LIFETIME_DEFAULT);
public static Attribute<String> PRINCIPAL = new Attribute<String>(null, new StringCodec.String2String());
public static Attribute<String> KEY_TAB_FILE = new Attribute<>((String)null, new StringCodec.String2String());
public static Attribute<Double> TOKEN_REFRESH_ANTICIPATORY_FACTOR = new Attribute<>(0.7);
/**
Expand Down
Expand Up @@ -49,20 +49,25 @@ public class StramUserLogin
{
private static final Logger LOG = LoggerFactory.getLogger(StramUserLogin.class);
public static final String DT_AUTH_PREFIX = StreamingApplication.DT_PREFIX + "authentication.";
private static final String DT_AUTH_PRINCIPAL = DT_AUTH_PREFIX + "principal";
public static final String DT_AUTH_PRINCIPAL = DT_AUTH_PREFIX + "principal";
public static final String DT_AUTH_KEYTAB = DT_AUTH_PREFIX + "keytab";
private static String principal;
private static String keytab;

public static void attemptAuthentication(Configuration conf) throws IOException
{
if (UserGroupInformation.isSecurityEnabled()) {
String userPrincipal = conf.get(DT_AUTH_PRINCIPAL);
String userKeytab = conf.get(DT_AUTH_KEYTAB);
authenticate(userPrincipal, userKeytab);
authenticate(conf);
}
}

public static void authenticate(Configuration conf) throws IOException
{
String userPrincipal = conf.get(DT_AUTH_PRINCIPAL);
String userKeytab = conf.get(DT_AUTH_KEYTAB);
authenticate(userPrincipal, userKeytab);
}

public static void authenticate(String principal, String keytab) throws IOException
{
if ((principal != null) && !principal.isEmpty()
Expand All @@ -79,7 +84,7 @@ public static void authenticate(String principal, String keytab) throws IOExcept
}
}

public static long refreshTokens(long tokenLifeTime, String destinationDir, String destinationFile, final Configuration conf, String hdfsKeyTabFile, final Credentials credentials, final InetSocketAddress rmAddress, final boolean renewRMToken) throws IOException
public static long refreshTokens(long tokenLifeTime, String destinationDir, String destinationFile, final Configuration conf, String principal, String hdfsKeyTabFile, final Credentials credentials, final InetSocketAddress rmAddress, final boolean renewRMToken) throws IOException
{
long expiryTime = System.currentTimeMillis() + tokenLifeTime;
//renew tokens
Expand All @@ -93,7 +98,10 @@ public static long refreshTokens(long tokenLifeTime, String destinationDir, Stri
keyTabFile = FSUtil.copyToLocalFileSystem(fs, destinationDir, destinationFile, hdfsKeyTabFile, conf);
}

UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(UserGroupInformation.getCurrentUser().getUserName(), keyTabFile.getAbsolutePath());
if (principal == null) {
principal = UserGroupInformation.getCurrentUser().getUserName();
}
UserGroupInformation ugi = UserGroupInformation.loginUserFromKeytabAndReturnUGI(principal, keyTabFile.getAbsolutePath());
try {
ugi.doAs(new PrivilegedExceptionAction<Object>()
{
Expand Down
Expand Up @@ -42,22 +42,26 @@
import com.datatorrent.stram.security.StramUserLogin;

import static org.powermock.api.mockito.PowerMockito.method;
import static org.powermock.api.mockito.PowerMockito.spy;
import static org.powermock.api.mockito.PowerMockito.suppress;
import static org.powermock.api.mockito.PowerMockito.when;

/**
* StramAppLauncher Test
*/
@RunWith(Enclosed.class)
public class StramAppLauncherTest
{
@PrepareForTest({StramAppLauncher.class, StramUserLogin.class})

private static final String SET_TOKEN_REFRESH_CREDENTIALS_METHOD = "setTokenRefreshCredentials";

@PrepareForTest({StramAppLauncher.class})
@PowerMockIgnore({"javax.xml.*", "org.w3c.*", "org.apache.hadoop.*", "org.apache.log4j.*"})
public static class RefreshTokenTests
{
File workspace;
File sourceKeytab;
File dfsDir;

static final String principal = "username/group@domain";

@Rule
public PowerMockRule rule = new PowerMockRule();
Expand All @@ -77,6 +81,7 @@ protected void starting(Description description)
} catch (IOException e) {
throw new RuntimeException(e);
}
dfsDir = new File(workspace, "dst");
suppress(method(StramAppLauncher.class, "init"));
}

Expand All @@ -92,43 +97,52 @@ protected void finished(Description description)
public void testGetTokenRefreshKeytab() throws Exception
{
Configuration conf = new Configuration(false);
conf.set(StramClientUtils.KEY_TAB_FILE, sourceKeytab.getPath());
File storeKeytab = new File(dfsDir, "keytab2");
conf.set(StramClientUtils.KEY_TAB_FILE, storeKeytab.getPath());
StramUserLogin.authenticate(principal, sourceKeytab.getPath());
LogicalPlan dag = applyTokenRefreshKeytab(FileSystem.newInstance(conf), conf);
Assert.assertEquals("Token refresh keytab path", sourceKeytab.getPath(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
Assert.assertEquals("Token refresh principal", principal, dag.getValue(LogicalPlan.PRINCIPAL));
Assert.assertEquals("Token refresh keytab path", storeKeytab.getPath(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
}

@Test
public void testUserLoginTokenRefreshKeytab() throws Exception
{
Configuration conf = new Configuration(false);
/*
spy(StramUserLogin.class);
when(StramUserLogin.getPrincipal()).thenReturn(principal);
when(StramUserLogin.getKeytab()).thenReturn(sourceKeytab.getPath());
*/
StramUserLogin.authenticate(principal, sourceKeytab.getPath());
testDFSTokenPath(conf);
}

@Test
public void testAuthPropTokenRefreshKeytab() throws Exception
{
Configuration conf = new Configuration(false);
conf.set(StramUserLogin.DT_AUTH_PRINCIPAL, principal);
conf.set(StramUserLogin.DT_AUTH_KEYTAB, sourceKeytab.getPath());
StramUserLogin.authenticate(conf);
testDFSTokenPath(conf);
}

private void testDFSTokenPath(Configuration conf) throws Exception
{
FileSystem fs = FileSystem.newInstance(conf);
File dfsDir = new File(workspace, "dst");
conf.set(StramClientUtils.DT_DFS_ROOT_DIR, dfsDir.getAbsolutePath());
LogicalPlan dag = applyTokenRefreshKeytab(fs, conf);
Assert.assertEquals("Token refresh principal", principal, dag.getValue(LogicalPlan.PRINCIPAL));
Assert.assertEquals("Token refresh keytab path", new Path(fs.getUri().getScheme(), fs.getUri().getAuthority(),
new File(dfsDir, "keytab").getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
new File(dfsDir, sourceKeytab.getName()).getAbsolutePath()).toString(), dag.getValue(LogicalPlan.KEY_TAB_FILE));
}

private LogicalPlan applyTokenRefreshKeytab(FileSystem fs, Configuration conf) throws Exception
{
LogicalPlan dag = new LogicalPlan();
StramAppLauncher appLauncher = new StramAppLauncher(fs, conf);
Whitebox.invokeMethod(appLauncher, "setTokenRefreshKeytab", dag, conf);
Whitebox.invokeMethod(appLauncher, SET_TOKEN_REFRESH_CREDENTIALS_METHOD, dag, conf);
return dag;
}
}
Expand Down

0 comments on commit dd5e95a

Please sign in to comment.