Skip to content

Commit

Permalink
Add support for kerberized grids in the job by supporting protocols
Browse files Browse the repository at this point in the history
  • Loading branch information
abh1nay committed Dec 6, 2012
1 parent 374d02a commit daa49bf
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 16 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -98,6 +98,11 @@ public class VoldemortBuildAndPushJob extends AbstractJob {

private static final String AVRO_GENERIC_VERSIONED_TYPE_NAME = "avro-generic-versioned";

// new properties for the push job

private final String hdfsFetcherPort;
private final String hdfsFetcherProtocol;

/* Informed stuff */
private final String informedURL = "http://informed.corp.linkedin.com/_post";
private final List<Future> informedResults;
Expand Down Expand Up @@ -134,6 +139,12 @@ public VoldemortBuildAndPushJob(String name, Props props) {
this.informedResults = Lists.newArrayList();
this.informedExecutor = Executors.newFixedThreadPool(2);

this.hdfsFetcherProtocol = props.getString("voldemort.fetcher.protocol", "hftp");
this.hdfsFetcherPort = props.getString("voldemort.fetcher.port", "50070");

log.info("voldemort.fetcher.protocol is set to : " + hdfsFetcherProtocol);
log.info("voldemort.fetcher.port is set to : " + hdfsFetcherPort);

isAvroJob = props.getBoolean("build.type.avro", false);

// Set default to false
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -43,16 +43,23 @@ public class VoldemortSwapJob extends AbstractJob {

private final Props _props;
private VoldemortSwapConf swapConf;
private String hdfsFetcherProtocol;
private String hdfsFetcherPort;

public VoldemortSwapJob(String id, Props props) throws IOException {
super(id);
_props = props;

this.hdfsFetcherProtocol = props.getString("voldemort.fetcher.protocol", "hftp");
this.hdfsFetcherPort = props.getString("voldemort.fetcher.port", "50070");
swapConf = new VoldemortSwapConf(_props);
}

public VoldemortSwapJob(String id, Props props, VoldemortSwapConf conf) throws IOException {
super(id);
_props = props;
this.hdfsFetcherProtocol = props.getString("voldemort.fetcher.protocol", "hftp");
this.hdfsFetcherPort = props.getString("voldemort.fetcher.port", "50070");
swapConf = conf;
}

Expand Down Expand Up @@ -149,17 +156,6 @@ public void run() throws Exception {
Path dataPath = new Path(dataDir);
dataDir = dataPath.makeQualified(FileSystem.get(conf)).toString();

/*
* Set the protocol according to config: webhdfs if its enabled
* Otherwise use hftp.
*/
Configuration hadoopConfig = new Configuration();
String protocolName = hadoopConfig.get("dfs.webhdfs.enabled");
String protocolPort = "";
if(hadoopConfig.get("dfs.http.address").split(":").length >= 2)
protocolPort = hadoopConfig.get("dfs.http.address").split(":")[1];
protocolName = (protocolName == null) ? "hftp" : "webhdfs";

/*
* Replace the default protocol and port with the one derived as above
*/
Expand All @@ -171,12 +167,10 @@ public void run() throws Exception {
existingPort = pathComponents[2].split("/")[0];
}
info("Existing protocol = " + existingProtocol + " and port = " + existingPort);
if(protocolName.length() > 0 && protocolPort.length() > 0) {
dataDir = dataDir.replaceFirst(existingProtocol, protocolName);
dataDir = dataDir.replaceFirst(existingPort, protocolPort);
if(hdfsFetcherProtocol.length() > 0 && hdfsFetcherPort.length() > 0) {
dataDir = dataDir.replaceFirst(existingProtocol, this.hdfsFetcherProtocol);
dataDir = dataDir.replaceFirst(existingPort, this.hdfsFetcherPort);
}
info("dfs.webhdfs.enabled = " + hadoopConfig.get("dfs.webhdfs.enabled")
+ " and new protocol = " + protocolName + " and port = " + protocolPort);

// Create admin client
AdminClient client = new AdminClient(cluster,
Expand Down

0 comments on commit daa49bf

Please sign in to comment.