Skip to content

Commit

Permalink
ACCUMULO-829 Apply patch from Sean Hickey, with modification of movin…
Browse files Browse the repository at this point in the history
…g utility class for creating tokens to core instead of server

git-svn-id: https://svn.apache.org/repos/asf/accumulo/trunk@1485798 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information
ctubbsii committed May 23, 2013
1 parent e945873 commit 38e0913
Show file tree
Hide file tree
Showing 12 changed files with 697 additions and 9 deletions.
Expand Up @@ -28,13 +28,22 @@ public class ClientOnRequiredTable extends ClientOpts {
@Parameter(names = {"-t", "--table"}, required = true, description = "table to use")
public String tableName = null;

@Parameter(names = {"-tf", "--tokenFile"}, description = "File in hdfs containing the user's authentication token create with \"bin/accumulo create-token\"")
public String tokenFile = "";

@Override
public void setAccumuloConfigs(Job job) throws AccumuloSecurityException {
super.setAccumuloConfigs(job);
AccumuloInputFormat.setConnectorInfo(job, principal, getToken());

if (tokenFile.isEmpty()) {
AccumuloInputFormat.setConnectorInfo(job, principal, getToken());
AccumuloOutputFormat.setConnectorInfo(job, principal, getToken());
} else {
AccumuloInputFormat.setConnectorInfo(job, principal, tokenFile);
AccumuloOutputFormat.setConnectorInfo(job, principal, tokenFile);
}
AccumuloInputFormat.setInputTableName(job, tableName);
AccumuloInputFormat.setScanAuthorizations(job, auths);
AccumuloOutputFormat.setConnectorInfo(job, principal, getToken());
AccumuloOutputFormat.setCreateTables(job, true);
AccumuloOutputFormat.setDefaultTableName(job, tableName);
}
Expand Down
Expand Up @@ -38,6 +38,7 @@
*
* <ul>
* <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)}
* <li>{@link AccumuloInputFormat#setConnectorInfo(JobConf, String, String)}
* <li>{@link AccumuloInputFormat#setScanAuthorizations(JobConf, Authorizations)}
* <li>{@link AccumuloInputFormat#setZooKeeperInstance(JobConf, String, String)} OR {@link AccumuloInputFormat#setMockInstance(JobConf, String)}
* </ul>
Expand Down
Expand Up @@ -60,6 +60,7 @@
*
* <ul>
* <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, String, AuthenticationToken)}
* <li>{@link AccumuloOutputFormat#setConnectorInfo(JobConf, String)}
* <li>{@link AccumuloOutputFormat#setZooKeeperInstance(JobConf, String, String)} OR {@link AccumuloOutputFormat#setMockInstance(JobConf, String)}
* </ul>
*
Expand Down Expand Up @@ -90,6 +91,25 @@ public static void setConnectorInfo(JobConf job, String principal, Authenticatio
OutputConfigurator.setConnectorInfo(CLASS, job, principal, token);
}

/**
* Sets the connector information needed to communicate with Accumulo in this job.
*
* <p>
* Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt to be more secure than storing it in the Configuration.
*
* @param job
* the Hadoop job instance to be configured
* @param principal
* a valid Accumulo user name (user must have Table.CREATE permission if {@link #setCreateTables(JobConf, boolean)} is set to true)
* @param tokenFile
* the path to the password file
* @throws AccumuloSecurityException
* @since 1.6.0
*/
public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException {
OutputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile);
}

/**
* Determines if the connector has been configured.
*
Expand Down
Expand Up @@ -103,6 +103,25 @@ public static void setConnectorInfo(JobConf job, String principal, Authenticatio
InputConfigurator.setConnectorInfo(CLASS, job, principal, token);
}

/**
* Sets the connector information needed to communicate with Accumulo in this job.
*
* <p>
* Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt to be more secure than storing it in the Configuration.
*
* @param job
* the Hadoop job instance to be configured
* @param principal
* a valid Accumulo user name (user must have Table.CREATE permission)
* @param tokenFile
* the path to the token file
* @throws AccumuloSecurityException
* @since 1.6.0
*/
public static void setConnectorInfo(JobConf job, String principal, String tokenFile) throws AccumuloSecurityException {
InputConfigurator.setConnectorInfo(CLASS, job, principal, tokenFile);
}

/**
* Determines if the connector has been configured.
*
Expand Down Expand Up @@ -156,6 +175,20 @@ protected static byte[] getToken(JobConf job) {
return InputConfigurator.getToken(CLASS, job);
}

/**
* Gets the password file from the configuration. It is BASE64 encoded to provide a charset safe conversion to a string, and is not intended to be secure. If
* specified, the password will be stored in a file rather than in the Configuration.
*
* @param job
* the Hadoop context for the configured job
* @return path to the password file as a String
* @since 1.6.0
* @see #setConnectorInfo(JobConf, String, AuthenticationToken)
*/
protected static String getTokenFile(JobConf job) {
return InputConfigurator.getTokenFile(CLASS, job);
}

/**
* Configures a {@link ZooKeeperInstance} for this job.
*
Expand Down
Expand Up @@ -38,6 +38,7 @@
*
* <ul>
* <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, AuthenticationToken)}
* <li>{@link AccumuloInputFormat#setConnectorInfo(Job, String, String)}
* <li>{@link AccumuloInputFormat#setInputTableName(Job, String)}
* <li>{@link AccumuloInputFormat#setScanAuthorizations(Job, Authorizations)}
* <li>{@link AccumuloInputFormat#setZooKeeperInstance(Job, String, String)} OR {@link AccumuloInputFormat#setMockInstance(Job, String)}
Expand Down
Expand Up @@ -94,6 +94,25 @@ public static void setConnectorInfo(Job job, String principal, AuthenticationTok
OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token);
}

/**
* Sets the connector information needed to communicate with Accumulo in this job.
*
* <p>
* Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt to be more secure than storing it in the Configuration.
*
* @param job
* the Hadoop job instance to be configured
* @param principal
* a valid Accumulo user name (user must have Table.CREATE permission if {@link #setCreateTables(Job, boolean)} is set to true)
* @param tokenFile
* the path to the token file
* @throws AccumuloSecurityException
* @since 1.6.0
*/
public static void setConnectorInfo(Job job, String principal, String tokenFile) throws AccumuloSecurityException {
OutputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile);
}

/**
* Determines if the connector has been configured.
*
Expand Down
Expand Up @@ -73,6 +73,7 @@
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapreduce.InputFormat;
import org.apache.hadoop.mapreduce.InputSplit;
import org.apache.hadoop.mapreduce.Job;
Expand Down Expand Up @@ -117,6 +118,25 @@ public static void setConnectorInfo(Job job, String principal, AuthenticationTok
InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, token);
}

/**
* Sets the connector information needed to communicate with Accumulo in this job.
*
* <p>
* Stores the password in a file in HDFS and pulls that into the Distributed Cache in an attempt to be more secure than storing it in the Configuration.
*
* @param job
* the Hadoop job instance to be configured
* @param principal
* a valid Accumulo user name (user must have Table.CREATE permission)
* @param tokenFile
* the path to the token file
* @throws AccumuloSecurityException
* @since 1.6.0
*/
public static void setConnectorInfo(Job job, String principal, String tokenFile) throws AccumuloSecurityException {
InputConfigurator.setConnectorInfo(CLASS, job.getConfiguration(), principal, tokenFile);
}

/**
* Determines if the connector has been configured.
*
Expand Down Expand Up @@ -170,6 +190,19 @@ protected static byte[] getToken(JobContext context) {
return InputConfigurator.getToken(CLASS, getConfiguration(context));
}

/**
* Gets the password file from the configuration.
*
* @param job
* the Hadoop context for the configured job
* @return path to the password file as a String
* @since 1.6.0
* @see #setConnectorInfo(JobConf, String, AuthenticationToken)
*/
protected static String getTokenFile(JobContext context) {
return InputConfigurator.getTokenFile(CLASS, getConfiguration(context));
}

/**
* Configures a {@link ZooKeeperInstance} for this job.
*
Expand Down
Expand Up @@ -16,7 +16,10 @@
*/
package org.apache.accumulo.core.client.mapreduce.lib.util;

import java.net.URI;
import java.net.URISyntaxException;
import java.nio.charset.Charset;
import java.io.IOException;

import org.apache.accumulo.core.client.AccumuloSecurityException;
import org.apache.accumulo.core.client.Instance;
Expand All @@ -28,6 +31,11 @@
import org.apache.commons.codec.binary.Base64;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.util.StringUtils;
import org.apache.hadoop.filecache.DistributedCache;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.mapred.JobConf;
import org.apache.log4j.Level;
import org.apache.log4j.Logger;

Expand All @@ -42,7 +50,7 @@ public class ConfiguratorBase {
* @since 1.5.0
*/
public static enum ConnectorInfo {
IS_CONFIGURED, PRINCIPAL, TOKEN, TOKEN_CLASS
IS_CONFIGURED, PRINCIPAL, TOKEN, TOKEN_CLASS, TOKEN_FILE
}

/**
Expand Down Expand Up @@ -107,6 +115,41 @@ public static void setConnectorInfo(Class<?> implementingClass, Configuration co
conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), CredentialHelper.tokenAsBase64(token));
}

/**
* Sets the connector information needed to communicate with Accumulo in this job.
*
* <p>
* Pulls a token file into the Distributed Cache that contains the authentication token in an attempt to be more secure than storing the password in the
* Configuration. Token file created with "bin/accumulo create-token".
*
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
* @param conf
* the Hadoop configuration object to configure
* @param principal
* a valid Accumulo user name
* @param tokenFile
* the path to the token file
* @throws AccumuloSecurityException
* @since 1.6.0
*/
public static void setConnectorInfo(Class<?> implementingClass, Configuration conf, String principal, String tokenFile) throws AccumuloSecurityException {
if (isConnectorInfoSet(implementingClass, conf))
throw new IllegalStateException("Connector info for " + implementingClass.getSimpleName() + " can only be set once per job");

ArgumentChecker.notNull(principal, tokenFile);

try {
DistributedCache.addCacheFile(new URI(tokenFile), conf);
} catch (URISyntaxException e) {
throw new IllegalStateException("Unable to add tokenFile \"" + tokenFile + "\" to distributed cache.");
}

conf.setBoolean(enumToConfKey(implementingClass, ConnectorInfo.IS_CONFIGURED), true);
conf.set(enumToConfKey(implementingClass, ConnectorInfo.TOKEN_FILE), tokenFile);
conf.set(enumToConfKey(implementingClass, ConnectorInfo.PRINCIPAL), principal);
}

/**
* Determines if the connector info has already been set for this instance.
*
Expand Down Expand Up @@ -138,7 +181,7 @@ public static String getPrincipal(Class<?> implementingClass, Configuration conf
}

/**
* Gets the serialized token class from the configuration.
* Gets the serialized token class from either the configuration or the token file.
*
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
Expand All @@ -149,12 +192,17 @@ public static String getPrincipal(Class<?> implementingClass, Configuration conf
* @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
*/
public static String getTokenClass(Class<?> implementingClass, Configuration conf) {
return conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN_CLASS));
String tokenFile = getTokenFile(implementingClass, conf);
if (tokenFile.isEmpty()) {
return conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN_CLASS));
} else {
return readTokenFile(implementingClass, conf).split(":")[1];
}
}

/**
* Gets the password from the configuration. WARNING: The password is stored in the Configuration and shared with all MapReduce tasks; It is BASE64 encoded to
* provide a charset safe conversion to a string, and is not intended to be secure.
* Gets the password from either the configuration or the token file. WARNING: If no token file is specified, the password is stored in the Configuration and
* shared with all MapReduce tasks; It is BASE64 encoded to provide a charset safe conversion to a string, and is not intended to be secure.
*
* @param implementingClass
* the class whose name will be used as a prefix for the property configuration key
Expand All @@ -165,7 +213,79 @@ public static String getTokenClass(Class<?> implementingClass, Configuration con
* @see #setConnectorInfo(Class, Configuration, String, AuthenticationToken)
*/
public static byte[] getToken(Class<?> implementingClass, Configuration conf) {
return Base64.decodeBase64(conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN), "").getBytes(Charset.forName("UTF-8")));
String tokenFile = getTokenFile(implementingClass, conf);
String token = null;
if (tokenFile.isEmpty()) {
token = conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN));
} else {
token = readTokenFile(implementingClass, conf).split(":")[2];
}
return Base64.decodeBase64(token.getBytes(Charset.forName("UTF-8")));
}

/**
* Grabs the token file's path out of the Configuration.
*
* @param job
* the Hadoop context for the configured job
* @return path to the token file as a String
* @since 1.6.0
* @see #setConnectorInfo(JobConf, String, AuthenticationToken)
*/
public static String getTokenFile(Class<?> implementingClass, Configuration conf) {
return conf.get(enumToConfKey(implementingClass, ConnectorInfo.TOKEN_FILE), "");
}

/**
* Reads from the token file in distributed cache. Currently, the token file stores data separated by colons e.g. principal:token_class:token
*
* @param job
* the Hadoop context for the configured job
* @return path to the token file as a String
* @since 1.6.0
* @see #setConnectorInfo(JobConf, String, AuthenticationToken)
*/
public static String readTokenFile(Class<?> implementingClass, Configuration conf) {
String tokenFile = getTokenFile(implementingClass, conf);
FSDataInputStream in = null;
try {
URI[] uris = DistributedCache.getCacheFiles(conf);
Path path = null;
for (URI u : uris) {
if (u.toString().equals(tokenFile)) {
path = new Path(u);
}
}
if (path == null) {
throw new IllegalArgumentException("Couldn't find password file called \"" + tokenFile + "\" in cache.");
}
FileSystem fs = FileSystem.get(conf);
in = fs.open(path);
} catch (IOException e) {
throw new IllegalArgumentException("Couldn't open password file called \"" + tokenFile + "\".");
}
java.util.Scanner fileScanner = new java.util.Scanner(in);
try {
String line = null;
boolean found = false;
String principal = getPrincipal(implementingClass, conf);
while (!found && fileScanner.hasNextLine()) {
line = fileScanner.nextLine();
if (line.startsWith(principal + ":")) {
found = true;
break;
}
}
if (found)
return line;
else
throw new IllegalArgumentException("Couldn't find token for user \"" + principal + "\" in file \"" + tokenFile + "\"");
} finally {
if (fileScanner != null && fileScanner.ioException() == null)
fileScanner.close();
else if (fileScanner.ioException() != null)
throw new RuntimeException(fileScanner.ioException());
}
}

/**
Expand Down

0 comments on commit 38e0913

Please sign in to comment.