Skip to content

Commit

Permalink
Added configurable Kerberos support to HdfsFetcher and upgraded hadoo…
Browse files Browse the repository at this point in the history
…p jars to 1.0.2
  • Loading branch information
Chinmay Soman committed Dec 6, 2012
1 parent daa49bf commit c745f80
Show file tree
Hide file tree
Showing 9 changed files with 116 additions and 20 deletions.
5 changes: 3 additions & 2 deletions .classpath
Expand Up @@ -15,11 +15,12 @@
<classpathentry kind="src" path="contrib/collections/src/java"/>
<classpathentry kind="src" path="contrib/collections/test"/>
<classpathentry kind="lib" path="lib/catalina-ant.jar"/>
<classpathentry kind="lib" path="lib/commons-codec-1.3.jar"/>
<classpathentry kind="lib" path="lib/commons-codec-1.4.jar"/>
<classpathentry kind="lib" path="lib/commons-dbcp-1.2.2.jar"/>
<classpathentry kind="lib" path="lib/colt-1.2.0.jar"/>
<classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/commons-cli-2.0-SNAPSHOT.jar"/>
<classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/hadoop-0.20.2-core.jar"/>
<classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/commons-configuration-1.6.jar"/>
<classpathentry kind="lib" path="contrib/hadoop-store-builder/lib/hadoop-core-1.0.2-p1.jar"/>
<classpathentry kind="lib" path="lib/junit-4.6.jar"/>
<classpathentry kind="lib" path="lib/log4j-1.2.15.jar"/>
<classpathentry kind="lib" path="lib/jetty-6.1.18.jar"/>
Expand Down
Binary file not shown.
Binary file not shown.
Binary file not shown.
Expand Up @@ -21,6 +21,8 @@
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.net.URI;
import java.security.PrivilegedExceptionAction;
import java.text.NumberFormat;
import java.util.Arrays;
import java.util.Comparator;
Expand All @@ -36,6 +38,7 @@
import org.apache.hadoop.fs.FileStatus;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.log4j.Logger;

import voldemort.VoldemortException;
Expand Down Expand Up @@ -69,6 +72,8 @@ public class HdfsFetcher implements FileFetcher {
private long minBytesPerSecond = 0;
private DynamicThrottleLimit globalThrottleLimit = null;
private static final int NUM_RETRIES = 3;
private String keytabLocation = "";
private String proxyUser = "voldemrt";

public HdfsFetcher(VoldemortConfig config) {
this(config.getMaxBytesPerSecond(),
Expand All @@ -82,9 +87,12 @@ public HdfsFetcher(VoldemortConfig config) {

public HdfsFetcher(VoldemortConfig config, DynamicThrottleLimit dynThrottleLimit) {
this(dynThrottleLimit,
null,
config.getReportingIntervalBytes(),
config.getFetcherBufferSize(),
config.getMinBytesPerSecond());
config.getMinBytesPerSecond(),
config.getReadOnlyKeytabPath(),
config.getReadOnlyKerberosProxyUser());

logger.info("Created hdfs fetcher with throttle rate " + dynThrottleLimit.getRate()
+ ", buffer size " + bufferSize + ", reporting interval bytes "
Expand All @@ -98,21 +106,16 @@ public HdfsFetcher() {
}

public HdfsFetcher(Long maxBytesPerSecond, Long reportingIntervalBytes, int bufferSize) {
this(null, maxBytesPerSecond, reportingIntervalBytes, bufferSize, 0);
}

public HdfsFetcher(DynamicThrottleLimit dynThrottleLimit,
Long reportingIntervalBytes,
int bufferSize,
long minBytesPerSecond) {
this(dynThrottleLimit, null, reportingIntervalBytes, bufferSize, minBytesPerSecond);
this(null, maxBytesPerSecond, reportingIntervalBytes, bufferSize, 0, "", "");
}

public HdfsFetcher(DynamicThrottleLimit dynThrottleLimit,
Long maxBytesPerSecond,
Long reportingIntervalBytes,
int bufferSize,
long minBytesPerSecond) {
long minBytesPerSecond,
String keytabLocation,
String proxyUser) {
if(maxBytesPerSecond != null) {
this.maxBytesPerSecond = maxBytesPerSecond;
this.throttler = new EventThrottler(this.maxBytesPerSecond);
Expand All @@ -128,6 +131,8 @@ public HdfsFetcher(DynamicThrottleLimit dynThrottleLimit,
this.bufferSize = bufferSize;
this.status = null;
this.minBytesPerSecond = minBytesPerSecond;
this.keytabLocation = keytabLocation;
this.proxyUser = proxyUser;
}

public File fetch(String sourceFileUrl, String destinationFile) throws IOException {
Expand All @@ -140,12 +145,40 @@ public File fetch(String sourceFileUrl, String destinationFile) throws IOExcepti
ObjectName jmxName = null;
try {

Path path = new Path(sourceFileUrl);
Configuration config = new Configuration();
final Path path = new Path(sourceFileUrl);
final Configuration config = new Configuration();
config.setInt("io.socket.receive.buffer", bufferSize);
config.set("hadoop.rpc.socket.factory.class.ClientProtocol",
ConfigurableSocketFactory.class.getName());
FileSystem fs = path.getFileSystem(config);
FileSystem fs = null;

/*
* Get the filesystem object in a secured (authenticated) block in
* case this server is talking to a Kerberized Hadoop cluster.
*
* Otherwise get the default filesystem object.
*/
if(this.keytabLocation.length() > 0) {
logger.debug("keytab path = " + keytabLocation + " and proxy user = " + proxyUser);
UserGroupInformation.loginUserFromKeytab(proxyUser, keytabLocation);
logger.debug("I've logged in and am now Doasing as "
+ UserGroupInformation.getCurrentUser().getUserName());
try {
fs = UserGroupInformation.getCurrentUser()
.doAs(new PrivilegedExceptionAction<FileSystem>() {

public FileSystem run() throws Exception {
FileSystem fs = path.getFileSystem(config);
return fs;
}
});
} catch(InterruptedException e) {
logger.error(e.getMessage());
return null;
}
} else {
fs = path.getFileSystem(config);
}

CopyStats stats = new CopyStats(sourceFileUrl, sizeOfPath(fs, path));
jmxName = JmxUtils.registerMbean("hdfs-copy-" + copyCount.getAndIncrement(), stats);
Expand All @@ -158,6 +191,9 @@ public File fetch(String sourceFileUrl, String destinationFile) throws IOExcepti

boolean result = fetch(fs, path, destination, stats);

// Close the filesystem
fs.close();

if(result) {
return destination;
} else {
Expand Down Expand Up @@ -460,16 +496,50 @@ public void setAsyncOperationStatus(AsyncOperationStatus status) {
*/
public static void main(String[] args) throws Exception {
if(args.length != 1)
Utils.croak("USAGE: java " + HdfsFetcher.class.getName() + " url");
Utils.croak("USAGE: java " + HdfsFetcher.class.getName()
+ " url [keytab location] [kerberos username]");
String url = args[0];
String keytabLocation = args[1];
String proxyUser = args[2];
long maxBytesPerSec = 1024 * 1024 * 1024;
Path p = new Path(url);
Configuration config = new Configuration();

final Configuration config = new Configuration();
final URI uri = new URI(url);
config.setInt("io.file.buffer.size", VoldemortConfig.DEFAULT_BUFFER_SIZE);
config.set("hadoop.rpc.socket.factory.class.ClientProtocol",
ConfigurableSocketFactory.class.getName());
config.setInt("io.socket.receive.buffer", 1 * 1024 * 1024 - 10000);
FileStatus status = p.getFileSystem(config).getFileStatus(p);
FileSystem fs = null;

/*
* Get the filesystem object in a secured (authenticated) block in case
* this server is talking to a Kerberized Hadoop cluster.
*
* Otherwise get the default filesystem object.
*/
if(keytabLocation.length() > 0) {
logger.debug("keytab path = " + keytabLocation + " and proxy user = " + proxyUser);
UserGroupInformation.loginUserFromKeytab(proxyUser, keytabLocation);
logger.debug("I've logged in and am now Doasing as "
+ UserGroupInformation.getCurrentUser().getUserName());
try {
fs = UserGroupInformation.getCurrentUser()
.doAs(new PrivilegedExceptionAction<FileSystem>() {

public FileSystem run() throws Exception {
FileSystem fs = FileSystem.get(uri, config);
return fs;
}
});
} catch(InterruptedException e) {
logger.error(e.getMessage());
}
} else {
fs = FileSystem.get(uri, config);
}

FileStatus status = fs.getFileStatus(p);
long size = status.getLen();
HdfsFetcher fetcher = new HdfsFetcher(maxBytesPerSec,
VoldemortConfig.REPORTING_INTERVAL_BYTES,
Expand All @@ -482,5 +552,6 @@ public static void main(String[] args) throws Exception {
nf.setMaximumFractionDigits(2);
System.out.println("Fetch to " + location + " completed: "
+ nf.format(rate / (1024.0 * 1024.0)) + " MB/sec.");
fs.close();
}
}
Binary file removed lib/commons-codec-1.3.jar
Binary file not shown.
Binary file added lib/commons-codec-1.4.jar
Binary file not shown.
8 changes: 6 additions & 2 deletions src/java/voldemort/client/protocol/admin/AdminClient.java
Expand Up @@ -1490,11 +1490,15 @@ public String waitForCompletion(int nodeId,
long waitUntil = System.currentTimeMillis() + timeUnit.toMillis(maxWait);

String description = null;
String oldStatus = "";
while(System.currentTimeMillis() < waitUntil) {
try {
AsyncOperationStatus status = getAsyncRequestStatus(nodeId, requestId);
logger.info("Status from node " + nodeId + " (" + status.getDescription() + ") - "
+ status.getStatus());
if(!status.getStatus().equalsIgnoreCase(oldStatus))
logger.info("Status from node " + nodeId + " (" + status.getDescription()
+ ") - " + status.getStatus());
oldStatus = status.getStatus();

if(higherStatus != null) {
higherStatus.setStatus("Status from node " + nodeId + " ("
+ status.getDescription() + ") - " + status.getStatus());
Expand Down
20 changes: 20 additions & 0 deletions src/java/voldemort/server/VoldemortConfig.java
Expand Up @@ -108,6 +108,8 @@ public class VoldemortConfig implements Serializable {
private long minBytesPerSecond;
private long reportingIntervalBytes;
private int fetcherBufferSize;
private String readOnlyKeytabPath;
private String readOnlyKerberosProxyUser;

private OpTimeMap testingSlowQueueingDelays;
private OpTimeMap testingSlowConcurrentDelays;
Expand Down Expand Up @@ -269,6 +271,8 @@ public VoldemortConfig(Props props) {
REPORTING_INTERVAL_BYTES);
this.fetcherBufferSize = (int) props.getBytes("hdfs.fetcher.buffer.size",
DEFAULT_BUFFER_SIZE);
this.readOnlyKeytabPath = props.getString("readonly.keytab.path", "");
this.readOnlyKerberosProxyUser = props.getString("readonly.kerberos.proxyuser", "voldemrt");

// TODO probably turn to false by default?
this.setUseMlock(props.getBoolean("readonly.mlock.index", true));
Expand Down Expand Up @@ -1582,6 +1586,22 @@ public void setReadOnlyDeleteBackupMs(int readOnlyDeleteBackupTimeMs) {
this.readOnlyDeleteBackupTimeMs = readOnlyDeleteBackupTimeMs;
}

public String getReadOnlyKeytabPath() {
return readOnlyKeytabPath;
}

public void setReadOnlyKeytabPath(String readOnlyKeytabPath) {
this.readOnlyKeytabPath = readOnlyKeytabPath;
}

public String getReadOnlyKerberosProxyUser() {
return readOnlyKerberosProxyUser;
}

public void setReadOnlyKerberosProxyUser(String readOnlyKerberosProxyUser) {
this.readOnlyKerberosProxyUser = readOnlyKerberosProxyUser;
}

public int getSocketBufferSize() {
return socketBufferSize;
}
Expand Down

0 comments on commit c745f80

Please sign in to comment.