Standardizing the Kerberos login phase: explicitly specify the Hadoop config and keytab path. Also assumes that extra Kerberos-related config parameters are passed to the Java process.
Chinmay Soman committed Dec 6, 2012
1 parent 577378e commit 44c4667
Showing 1 changed file with 100 additions and 75 deletions.
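
For context, the login sequence this commit standardizes reduces to the sketch below. It is not part of the commit: the helper name is made up, and hadoopConfigPath, keytabPath, and kerberosPrincipal are placeholders that the real code takes from VoldemortConfig, the constructor arguments, or main()'s command-line arguments.

import java.io.IOException;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;

/** Illustrative only: the standardized Kerberos login flow. */
static FileSystem openKerberizedFileSystem(String sourceFileUrl,
                                           String hadoopConfigPath,
                                           String kerberosPrincipal,
                                           String keytabPath) throws IOException {
    // Load the target cluster's config explicitly instead of relying on
    // whatever core-site.xml happens to be on the classpath.
    Configuration config = new Configuration();
    config.addResource(new Path(hadoopConfigPath + "/core-site.xml"));
    config.addResource(new Path(hadoopConfigPath + "/hdfs-site.xml"));

    // Refuse to continue unless the cluster config actually enables Kerberos.
    String auth = config.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);
    if(auth == null || !auth.equals("kerberos")) {
        throw new IOException("hadoop.security.authentication is not set to kerberos");
    }

    // Log in from the explicitly supplied keytab, then open the filesystem
    // as the authenticated principal.
    UserGroupInformation.setConfiguration(config);
    UserGroupInformation.loginUserFromKeytab(kerberosPrincipal, keytabPath);
    return new Path(sourceFileUrl).getFileSystem(config);
}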
@@ -25,7 +25,6 @@
import java.net.URI;
import java.net.URL;
import java.net.URLClassLoader;
import java.security.PrivilegedExceptionAction;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Comparator;
@@ -37,6 +36,7 @@
import org.apache.commons.codec.binary.Hex;
import org.apache.commons.io.IOUtils;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.CommonConfigurationKeys;
import org.apache.hadoop.fs.FSDataInputStream;
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
@@ -67,6 +67,9 @@ public class HdfsFetcher implements FileFetcher {

private static final Logger logger = Logger.getLogger(HdfsFetcher.class);

private static String keytabPath = "";
private static String kerberosPrincipal = "voldemrt";

private final Long maxBytesPerSecond, reportingIntervalBytes;
private final int bufferSize;
private static final AtomicInteger copyCount = new AtomicInteger(0);
@@ -79,17 +82,7 @@ public class HdfsFetcher implements FileFetcher {
private String kerberosUser = "voldemrt";
private VoldemortConfig voldemortConfig = null;

public HdfsFetcher(VoldemortConfig config) {
this(config.getMaxBytesPerSecond(),
config.getReportingIntervalBytes(),
config.getFetcherBufferSize());

this.voldemortConfig = config;

logger.info("Created hdfs fetcher with throttle rate " + maxBytesPerSecond
+ ", buffer size " + bufferSize + ", reporting interval bytes "
+ reportingIntervalBytes);
}
public static final String FS_DEFAULT_NAME = "fs.default.name";

public HdfsFetcher(VoldemortConfig config, DynamicThrottleLimit dynThrottleLimit) {
this(dynThrottleLimit,
@@ -139,11 +132,16 @@ public HdfsFetcher(DynamicThrottleLimit dynThrottleLimit,
this.bufferSize = bufferSize;
this.status = null;
this.minBytesPerSecond = minBytesPerSecond;
this.keytabLocation = keytabLocation;
this.kerberosUser = kerberosUser;
HdfsFetcher.kerberosPrincipal = kerberosUser;
HdfsFetcher.keytabPath = keytabLocation;
}

public File fetch(String sourceFileUrl, String destinationFile) throws IOException {
return fetch(sourceFileUrl, destinationFile, this.voldemortConfig.getHadoopConfigPath());
}

public File fetch(String sourceFileUrl, String destinationFile, String hadoopConfigPath)
throws IOException {
if(this.globalThrottleLimit != null) {
if(this.globalThrottleLimit.getSpeculativeRate() < this.minBytesPerSecond)
throw new VoldemortException("Too many push jobs.");
@@ -153,52 +151,45 @@ public File fetch(String sourceFileUrl, String destinationFile) throws IOException {
ObjectName jmxName = null;
try {

final Path path = new Path(sourceFileUrl);
final Configuration config = new Configuration();
config.setInt("io.socket.receive.buffer", bufferSize);
config.set("hadoop.rpc.socket.factory.class.ClientProtocol",
ConfigurableSocketFactory.class.getName());
config.set("hadoop.security.group.mapping",
"org.apache.hadoop.security.ShellBasedUnixGroupsMapping");
config.addResource(new Path(hadoopConfigPath + "/core-site.xml"));
config.addResource(new Path(hadoopConfigPath + "/hdfs-site.xml"));

FileSystem fs = null;

HdfsFetcher.addPath(this.voldemortConfig.getHadoopConfigPath());

/*
* Get the filesystem object in a secured (authenticated) block in
* case this server is talking to a Kerberized Hadoop cluster.
*
* Otherwise get the default filesystem object.
*/
synchronized(this) {
if(this.keytabLocation.length() > 0) {
logger.info("keytab path = " + keytabLocation + " and Kerberos user = "
+ kerberosUser);
UserGroupInformation.loginUserFromKeytab(kerberosUser, keytabLocation);
logger.info("I've logged in and am now Doasing as "
+ UserGroupInformation.getCurrentUser().getUserName());
try {
fs = UserGroupInformation.getCurrentUser()
.doAs(new PrivilegedExceptionAction<FileSystem>() {

public FileSystem run() throws Exception {
FileSystem fs = path.getFileSystem(config);
return fs;
}
});
} catch(InterruptedException e) {
logger.error(e.getMessage());
return null;
} catch(Exception e) {
logger.error("Got an exception while getting the filesystem object: ");
logger.error("Exception class : " + e.getClass());
e.printStackTrace();
for(StackTraceElement et: e.getStackTrace()) {
logger.error(et.toString());
}
}
final Path path = new Path(sourceFileUrl);
logger.info("Using path : " + path);

String security = config.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);

if(security == null || !security.equals("kerberos")) {
logger.info("Security isn't turned on in the conf: "
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + " = "
+ config.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION));
logger.info("Fix that. Exiting.");
return null;
} else {
logger.info("Security is turned on in the conf. That's good");

} else {
fs = path.getFileSystem(config);
}
}

System.err.println(HdfsFetcher.keytabPath);

try {
UserGroupInformation.setConfiguration(config);
UserGroupInformation.loginUserFromKeytab(HdfsFetcher.kerberosPrincipal,
HdfsFetcher.keytabPath);
fs = path.getFileSystem(config);
// fs = FileSystem.get(config);
} catch(IOException e) {
e.printStackTrace();
System.err.println("Error !!! Exiting !!!");
System.exit(-1);
}

CopyStats stats = new CopyStats(sourceFileUrl, sizeOfPath(fs, path));
@@ -536,7 +527,7 @@ public static void addPath(String s) throws Exception {
* Main method for testing fetching
*/
public static void main(String[] args) throws Exception {
if(args.length < 1)
if(args.length < 4)
Utils.croak("USAGE: java " + HdfsFetcher.class.getName()
+ " url [keytab location] [kerberos username] [hadoop-config-path]");
String url = args[0];
@@ -559,46 +550,80 @@ public static void main(String[] args) throws Exception {
config.set("hadoop.rpc.socket.factory.class.ClientProtocol",
ConfigurableSocketFactory.class.getName());
config.setInt("io.socket.receive.buffer", 1 * 1024 * 1024 - 10000);
config.set("hadoop.security.group.mapping",
"org.apache.hadoop.security.ShellBasedUnixGroupsMapping");
config.addResource(new Path(hadoopPath + "/core-site.xml"));
config.addResource(new Path(hadoopPath + "/hdfs-site.xml"));

FileSystem fs = null;

p = new Path(url);
System.err.println("Using path : " + p);

String security = config.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION);

if(security == null || !security.equals("kerberos")) {
logger.info("Security isn't turned on in the conf: "
+ CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION + " = "
+ config.get(CommonConfigurationKeys.HADOOP_SECURITY_AUTHENTICATION));
logger.info("Fix that. Exiting.");
return;
} else {
logger.info("Security is turned on in the conf. That's good");

}

// Add the Hadoop config to classpath
HdfsFetcher.addPath(hadoopPath);
// HdfsFetcher.addPath(hadoopPath);

HdfsFetcher.keytabPath = keytabLocation;
HdfsFetcher.kerberosPrincipal = kerberosUser;

/*
* Get the filesystem object in a secured (authenticated) block in case
* this server is talking to a Kerberized Hadoop cluster.
*
* Otherwise get the default filesystem object.
*/
if(keytabLocation.length() > 0) {
logger.debug("keytab path = " + keytabLocation + " and Kerberos user = " + kerberosUser);
/*
* if(keytabLocation.length() > 0) { logger.debug("keytab path = " +
* keytabLocation + " and Kerberos user = " + kerberosUser);
* UserGroupInformation.loginUserFromKeytab(kerberosUser,
* keytabLocation); logger.debug("I've logged in and am now Doasing as "
* + UserGroupInformation.getCurrentUser().getUserName()); try { fs =
* UserGroupInformation.getCurrentUser() .doAs(new
* PrivilegedExceptionAction<FileSystem>() {
*
* public FileSystem run() throws Exception { FileSystem fs =
* FileSystem.get(uri, config); return fs; } }); }
* catch(InterruptedException e) { logger.error(e.getMessage()); } }
* else { fs = FileSystem.get(uri, config); }
*/

try {
UserGroupInformation.setConfiguration(config);
UserGroupInformation.loginUserFromKeytab(kerberosUser, keytabLocation);
logger.debug("I've logged in and am now Doasing as "
+ UserGroupInformation.getCurrentUser().getUserName());
try {
fs = UserGroupInformation.getCurrentUser()
.doAs(new PrivilegedExceptionAction<FileSystem>() {

public FileSystem run() throws Exception {
FileSystem fs = FileSystem.get(uri, config);
return fs;
}
});
} catch(InterruptedException e) {
logger.error(e.getMessage());
}
} else {
fs = FileSystem.get(uri, config);
// fs = FileSystem.get(uri, config);
fs = p.getFileSystem(config);
} catch(IOException e) {
e.printStackTrace();
System.err.println("Error !!! Exiting !!!");
System.exit(-1);
}

FileStatus status = fs.getFileStatus(p);
// FileStatus status = fs.getFileStatus(p);
FileStatus status = fs.listStatus(p)[0];
long size = status.getLen();
HdfsFetcher fetcher = new HdfsFetcher(maxBytesPerSec,
VoldemortConfig.REPORTING_INTERVAL_BYTES,
VoldemortConfig.DEFAULT_BUFFER_SIZE);
long start = System.currentTimeMillis();
// File location = fetcher.fetch(url,
// System.getProperty("java.io.tmpdir") + File.separator
// + start);
File location = fetcher.fetch(url, System.getProperty("java.io.tmpdir") + File.separator
+ start);
+ start, hadoopPath);

double rate = size * Time.MS_PER_SECOND / (double) (System.currentTimeMillis() - start);
NumberFormat nf = NumberFormat.getInstance();
nf.setMaximumFractionDigits(2);
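
The updated main() exercises this path end to end; a trimmed, illustrative sketch of the same steps follows. It assumes it runs inside HdfsFetcher (as main() does), since keytabPath and kerberosPrincipal are the class's own static fields; the URL, keytab, principal, and config directory are placeholders, and the surrounding method is assumed to declare throws Exception.

// Mirrors the updated main(): set the static login parameters, then fetch
// with an explicit Hadoop config path.
HdfsFetcher.keytabPath = "/etc/security/keytabs/voldemort.keytab";
HdfsFetcher.kerberosPrincipal = "voldemort/host@EXAMPLE.COM";

Long maxBytesPerSec = null; // no throttling, as in main() when no rate is given
HdfsFetcher fetcher = new HdfsFetcher(maxBytesPerSec,
                                      VoldemortConfig.REPORTING_INTERVAL_BYTES,
                                      VoldemortConfig.DEFAULT_BUFFER_SIZE);
File fetched = fetcher.fetch("hdfs://namenode:8020/path/to/store/version-0",
                             System.getProperty("java.io.tmpdir") + File.separator + "fetch-test",
                             "/etc/hadoop/conf");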
