Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
FALCON-2090 HDFS Snapshot failed with UnknownHostException in HA mode
(fix: pass the oozie-action Configuration through when creating file systems, so NameNode HA nameservices resolve during scheduling in HA mode)

Author: bvellanki <bvellanki@hortonworks.com>

Reviewers: "Peeyush B <pbishnoi@hortonworks.com>"

Closes #237 from bvellanki/FALCON-2090

(cherry picked from commit 3ed804e)
Signed-off-by: bvellanki <bvellanki@hortonworks.com>
  • Loading branch information
bvellanki committed Jul 21, 2016
1 parent 3dc75dc commit 91dc204b60c0403d16899a44d43ce3e22e9c9960
Showing 5 changed files with 28 additions and 18 deletions.
@@ -73,8 +73,12 @@ public int run(String[] args) throws FalconException {
String sourceStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_NN.getName());
String targetStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_NN.getName());

DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd);
DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd);
// Always add to getConf() so that configuration set by oozie action is
// available when creating DistributedFileSystem.
DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd,
new Configuration(getConf()));
DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd,
new Configuration(getConf()));

String currentSnapshotName = HdfsSnapshotUtil.SNAPSHOT_PREFIX
+ cmd.getOptionValue(HdfsSnapshotMirrorProperties.SNAPSHOT_JOB_NAME.getName())
@@ -60,8 +60,10 @@ public static void main(String[] args) throws Exception {
@Override
public int run(String[] args) throws Exception {
CommandLine cmd = getCommand(args);
DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd);
DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd);
DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd,
new Configuration(getConf()));
DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd,
new Configuration(getConf()));

String sourceDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_SNAPSHOT_DIR.getName());
String targetDir = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_SNAPSHOT_DIR.getName());
@@ -19,6 +19,7 @@
package org.apache.falcon.snapshots.util;

import org.apache.commons.cli.CommandLine;
import org.apache.commons.lang3.StringUtils;
import org.apache.falcon.FalconException;
import org.apache.falcon.entity.ClusterHelper;
import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
@@ -37,29 +38,33 @@ public final class HdfsSnapshotUtil {

private HdfsSnapshotUtil() {}

public static DistributedFileSystem getSourceFileSystem(CommandLine cmd) throws FalconException {
public static DistributedFileSystem getSourceFileSystem(CommandLine cmd,
Configuration conf) throws FalconException {
String sourceStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_NN.getName());
String sourceExecuteEndpoint = cmd.getOptionValue(HdfsSnapshotMirrorProperties.SOURCE_EXEC_URL.getName());
String sourcePrincipal = parseKerberosPrincipal(cmd.getOptionValue(
HdfsSnapshotMirrorProperties.SOURCE_NN_KERBEROS_PRINCIPAL.getName()));
Configuration sourceConf = ClusterHelper.getConfiguration(sourceStorageUrl,

Configuration sourceConf = ClusterHelper.getConfiguration(conf, sourceStorageUrl,
sourceExecuteEndpoint, sourcePrincipal);
return HadoopClientFactory.get().createDistributedProxiedFileSystem(sourceConf);
}

public static DistributedFileSystem getTargetFileSystem(CommandLine cmd) throws FalconException {
public static DistributedFileSystem getTargetFileSystem(CommandLine cmd,
Configuration conf) throws FalconException {
String targetStorageUrl = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_NN.getName());
String taregtExecuteEndpoint = cmd.getOptionValue(HdfsSnapshotMirrorProperties.TARGET_EXEC_URL.getName());
String targetPrincipal = parseKerberosPrincipal(cmd.getOptionValue(
HdfsSnapshotMirrorProperties.TARGET_NN_KERBEROS_PRINCIPAL.getName()));

Configuration targetConf = ClusterHelper.getConfiguration(targetStorageUrl,
Configuration targetConf = ClusterHelper.getConfiguration(conf, targetStorageUrl,
taregtExecuteEndpoint, targetPrincipal);
return HadoopClientFactory.get().createDistributedProxiedFileSystem(targetConf);
}

public static String parseKerberosPrincipal(String principal) {
if (principal.equals(HdfsSnapshotMirroringExtension.EMPTY_KERBEROS_PRINCIPAL)) {
if (StringUtils.isEmpty(principal)
|| principal.equals(HdfsSnapshotMirroringExtension.EMPTY_KERBEROS_PRINCIPAL)) {
return null;
}
return principal;
@@ -25,7 +25,7 @@
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.cluster.Cluster;
import org.apache.falcon.extensions.mirroring.hdfsSnapshot.HdfsSnapshotMirrorProperties;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.snapshots.util.HdfsSnapshotUtil;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.permission.FsAction;
@@ -72,6 +72,7 @@ public class HdfsSnapshotReplicatorTest extends HdfsSnapshotReplicator {

@BeforeClass
public void init() throws Exception {
this.setConf(new Configuration());
baseDir = Files.createTempDirectory("test_snapshot-replication").toFile().getAbsoluteFile();
miniDFSCluster = MiniHdfsClusterUtil.initMiniDfs(MiniHdfsClusterUtil.SNAPSHOT_REPL_TEST_PORT, baseDir);
miniDfs = miniDFSCluster.getFileSystem();
@@ -100,14 +101,13 @@ private Cluster initCluster(String clusterName) throws Exception {

@Test
public void replicationTest() throws Exception {
Configuration sourceConf = ClusterHelper.getConfiguration(sourceCluster);
this.setConf(sourceConf);
Configuration targetConf = ClusterHelper.getConfiguration(targetCluster);
sourceStorageUrl = ClusterHelper.getStorageUrl(sourceCluster);
targetStorageUrl = ClusterHelper.getStorageUrl(targetCluster);

DistributedFileSystem sourceFs = HadoopClientFactory.get().createDistributedProxiedFileSystem(sourceConf);
DistributedFileSystem targetFs = HadoopClientFactory.get().createDistributedProxiedFileSystem(targetConf);
DistributedFileSystem sourceFs = HdfsSnapshotUtil.getSourceFileSystem(cmd,
new Configuration(getConf()));
DistributedFileSystem targetFs = HdfsSnapshotUtil.getTargetFileSystem(cmd,
new Configuration(getConf()));

// create dir1, create snapshot, invoke copy, check file in target, create snapshot on target
Path dir1 = new Path(sourceDir, "dir1");
@@ -75,9 +75,8 @@ public static Configuration getConfiguration(Cluster cluster) {
return conf;
}

public static Configuration getConfiguration(String storageUrl, String executeEndPoint,
String kerberosPrincipal) {
Configuration conf = new Configuration();
public static Configuration getConfiguration(Configuration conf, String storageUrl,
String executeEndPoint, String kerberosPrincipal) {
conf.set(HadoopClientFactory.FS_DEFAULT_NAME_KEY, storageUrl);
conf.set(HadoopClientFactory.MR_JT_ADDRESS_KEY, executeEndPoint);
conf.set(HadoopClientFactory.YARN_RM_ADDRESS_KEY, executeEndPoint);

0 comments on commit 91dc204

Please sign in to comment.