FALCON-2084 HCatReplicationTest are failing in secure mode
Removed the dependency on job-xml, whose priority changed because of backward-incompatible changes in Oozie; this patch works with Oozie 4.2.0 and the upcoming 4.3.0.

Author: Venkat Ranganathan <venkat@hortonworks.com>

Reviewers: Balu Vellanki <balu@apache.org>, Ying Zheng <yzheng@hortonworks.com>

Closes #231 from vrangan/FALCON-2084

(cherry picked from commit ec3e49e)
Signed-off-by: bvellanki <bvellanki@hortonworks.com>
Venkat Ranganathan authored and bvellanki committed Jul 20, 2016
1 parent ef00c3e commit 003adddc10b7d2ec13876ee3a7668346b7589478
Showing 6 changed files with 34 additions and 22 deletions.
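
In short, the patch stops persisting falcon-source-hive-site.xml / falcon-target-hive-site.xml into the workflow staging directory and referencing them through <job-xml>; the hive credential properties are instead copied directly into each action's inline configuration, which behaves the same way regardless of the job-xml precedence change between Oozie releases. A simplified sketch of the pattern the diff applies (srcCluster and action stand in for whatever cluster and preprocessing ACTION are in scope; HiveUtil and the CONFIGURATION JAXB type are the Falcon classes visible in the hunks below):

    // Before (simplified): the action pointed at a persisted copy of
    // hive-site.xml, whose precedence changed across Oozie releases:
    //     action.getJava().setJobXml("${wf:appPath()}/conf/falcon-source-hive-site.xml");
    //
    // After (simplified): the same properties are injected inline, so
    // job-xml precedence no longer matters for these values:
    Properties hiveConf = HiveUtil.getHiveCredentials(srcCluster);
    for (Map.Entry<Object, Object> e : hiveConf.entrySet()) {
        CONFIGURATION.Property prop = new CONFIGURATION.Property();
        prop.setName((String) e.getKey());
        prop.setValue((String) e.getValue());
        action.getJava().getConfiguration().getProperty().add(prop);
    }
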
@@ -275,11 +275,6 @@ private void setupHiveConfiguration(Cluster srcCluster, Cluster trgCluster,
             Path scriptPath = new Path(buildPath, "scripts");
             copyHiveScript(fs, scriptPath, IMPORT_HQL);
             copyHiveScript(fs, scriptPath, EXPORT_HQL);
-
-            // create hive conf to stagingDir
-            Path confPath = new Path(buildPath + "/conf");
-            persistHiveConfiguration(fs, confPath, srcCluster, "falcon-source-");
-            persistHiveConfiguration(fs, confPath, trgCluster, "falcon-target-");
         } catch (IOException e) {
             throw new FalconException("Unable to create hive conf files", e);
         }
@@ -24,7 +24,6 @@
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
 import org.apache.falcon.entity.FeedHelper;
-import org.apache.falcon.entity.HiveUtil;
 import org.apache.falcon.entity.store.ConfigurationStore;
 import org.apache.falcon.entity.v0.EntityType;
 import org.apache.falcon.entity.v0.cluster.Cluster;
@@ -79,11 +78,7 @@ public boolean isCounterEnabled() throws FalconException {
         marshal(cluster, workflow, buildPath);
         Properties props = getProperties(buildPath, wfName);
         props.putAll(createDefaultConfiguration(cluster));
-        if (EntityUtil.isTableStorageType(cluster, entity)) {
-            // todo: kludge send source hcat creds for coord dependency check to pass
-            props.putAll(HiveUtil.getHiveCredentials(srcCluster));
-            props.putAll(HiveUtil.getHiveCredentials(cluster));
-        }
+
         props.putAll(getWorkflowProperties(entity));
         props.putAll(FeedHelper.getUserWorkflowProperties(getLifecycle()));
         // Write out the config to config-default.xml
@@ -22,15 +22,18 @@
 import org.apache.falcon.Tag;
 import org.apache.falcon.entity.ClusterHelper;
 import org.apache.falcon.entity.EntityUtil;
+import org.apache.falcon.entity.HiveUtil;
 import org.apache.falcon.entity.v0.cluster.Cluster;
 import org.apache.falcon.entity.v0.feed.Feed;
 import org.apache.falcon.oozie.workflow.ACTION;
 import org.apache.falcon.oozie.workflow.WORKFLOWAPP;
+import org.apache.falcon.oozie.workflow.CONFIGURATION;
 import org.apache.falcon.util.OozieUtils;
 import org.apache.falcon.workflow.WorkflowExecutionArgs;

 import javax.xml.bind.JAXBElement;
 import java.util.Arrays;
+import java.util.Map;
 import java.util.Properties;

 /**
@@ -60,6 +63,15 @@ public HCatReplicationWorkflowBuilder(Feed entity) {
         //Add pre-processing
         if (shouldPreProcess()) {
             ACTION action = getPreProcessingAction(false, Tag.REPLICATION);
+            Properties hiveConf = HiveUtil.getHiveCredentials(src);
+            for (Map.Entry<Object, Object> e : hiveConf.entrySet()) {
+                CONFIGURATION.Property prop = new CONFIGURATION.Property();
+                prop.setName((String) e.getKey());
+                prop.setValue((String) e.getValue());
+                LOG.info("Adding config to replication hive preprocessing action : key = {} value = {}",
+                        e.getKey(), e.getValue());
+                action.getJava().getConfiguration().getProperty().add(prop);
+            }
             addHDFSServersConfig(action, src, target);
             addTransition(action, EXPORT_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
             workflow.getDecisionOrForkOrJoin().add(action);
@@ -72,6 +84,16 @@ public HCatReplicationWorkflowBuilder(Feed entity) {
                 OozieUtils.unMarshalHiveAction(export);
         org.apache.falcon.oozie.hive.ACTION hiveExportAction = exportActionJaxbElement.getValue();
         addHDFSServersConfig(hiveExportAction, src, target);
+        Properties hiveConf = HiveUtil.getHiveCredentials(src);
+        for (Map.Entry<Object, Object> e : hiveConf.entrySet()) {
+            org.apache.falcon.oozie.hive.CONFIGURATION.Property prop =
+                    new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
+            prop.setName((String) e.getKey());
+            prop.setValue((String) e.getValue());
+            LOG.info("Adding config to replication hive export action : key = {} value = {}",
+                    e.getKey(), e.getValue());
+            hiveExportAction.getConfiguration().getProperty().add(prop);
+        }
         OozieUtils.marshalHiveAction(export, exportActionJaxbElement);
         addTransition(export, REPLICATION_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(export);
@@ -89,6 +111,16 @@ public HCatReplicationWorkflowBuilder(Feed entity) {
                 OozieUtils.unMarshalHiveAction(importAction);
         org.apache.falcon.oozie.hive.ACTION hiveImportAction = importActionJaxbElement.getValue();
         addHDFSServersConfig(hiveImportAction, src, target);
+        Properties hiveConf2 = HiveUtil.getHiveCredentials(target);
+        for (Map.Entry<Object, Object> e : hiveConf2.entrySet()) {
+            org.apache.falcon.oozie.hive.CONFIGURATION.Property prop =
+                    new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
+            prop.setName((String) e.getKey());
+            prop.setValue((String) e.getValue());
+            LOG.info("Adding config to replication hive import action : key = {} value = {}",
+                    e.getKey(), e.getValue());
+            hiveImportAction.getConfiguration().getProperty().add(prop);
+        }
         OozieUtils.marshalHiveAction(importAction, importActionJaxbElement);
         addTransition(importAction, CLEANUP_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
         workflow.getDecisionOrForkOrJoin().add(importAction);
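
The same property-copy loop appears three times above: once for the preprocessing java action and once each for the hive export and import actions, differing only in which cluster's credentials are read and which action receives them. A hypothetical helper, not part of this patch, could factor out the two hive-action cases; the preprocessing action would still need its own loop, since it populates the workflow-level CONFIGURATION type rather than the hive one:

    // Hypothetical refactoring sketch (addHiveCredentials is not in the patch):
    // copy hive credential properties into a hive action's inline configuration.
    private static void addHiveCredentials(org.apache.falcon.oozie.hive.ACTION hiveAction,
                                           Properties hiveConf) {
        for (Map.Entry<Object, Object> e : hiveConf.entrySet()) {
            org.apache.falcon.oozie.hive.CONFIGURATION.Property prop =
                    new org.apache.falcon.oozie.hive.CONFIGURATION.Property();
            prop.setName((String) e.getKey());
            prop.setValue((String) e.getValue());
            hiveAction.getConfiguration().getProperty().add(prop);
        }
    }

With such a helper, the export hunk would reduce to addHiveCredentials(hiveExportAction, HiveUtil.getHiveCredentials(src)) and the import hunk to addHiveCredentials(hiveImportAction, HiveUtil.getHiveCredentials(target)).
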
@@ -133,8 +165,8 @@ private void setupHiveCredentials(Cluster sourceCluster, Cluster targetCluster,
                         (org.apache.falcon.oozie.workflow.ACTION) object;
                 String actionName = action.getName();
                 if (PREPROCESS_ACTION_NAME.equals(actionName)) {
+
                     // add reference to hive-site conf to each action
-                    action.getJava().setJobXml("${wf:appPath()}/conf/falcon-source-hive-site.xml");

                     if (isSecurityEnabled) { // add a reference to credential in the action
                         action.setCred(SOURCE_HIVE_CREDENTIAL_NAME);
@@ -26,7 +26,6 @@
         <prepare>
             <delete path="${distcpSourcePaths}"/>
         </prepare>
-        <job-xml>${wf:appPath()}/conf/falcon-source-hive-site.xml</job-xml>
         <configuration>
             <property>
                 <name>mapred.job.queue.name</name>
@@ -20,7 +20,6 @@
 <hive xmlns="uri:oozie:hive-action:0.2">
     <job-tracker>${falconTargetJobTracker}</job-tracker>
     <name-node>${falconTargetNameNode}</name-node>
-    <job-xml>${wf:appPath()}/conf/falcon-target-hive-site.xml</job-xml>
     <configuration>
         <property>
             <name>mapred.job.queue.name</name>
@@ -561,9 +561,6 @@ public void testReplicationCoordsForTableStorage(String secureOption) throws Exc
         Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-export.hql")));
         Assert.assertTrue(fs.exists(new Path(wfPath + "/scripts/falcon-table-import.hql")));

-        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf")));
-        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-source-hive-site.xml")));
-        Assert.assertTrue(fs.exists(new Path(wfPath + "/conf/falcon-target-hive-site.xml")));

         HashMap<String, String> props = getCoordProperties(coord);

@@ -626,11 +623,7 @@ public void testReplicationCoordsForTableStorage(String secureOption) throws Exc
     private void assertReplicationHCatCredentials(WORKFLOWAPP wf, String wfPath) throws IOException {
         FileSystem fs = trgMiniDFS.getFileSystem();

-        Path hiveConfPath = new Path(wfPath, "conf/falcon-source-hive-site.xml");
-        Assert.assertTrue(fs.exists(hiveConfPath));

-        hiveConfPath = new Path(wfPath, "conf/falcon-target-hive-site.xml");
-        Assert.assertTrue(fs.exists(hiveConfPath));

         boolean isSecurityEnabled = SecurityUtil.isSecurityEnabled();
         if (isSecurityEnabled) {
@@ -651,7 +644,6 @@ private void assertReplicationHCatCredentials(WORKFLOWAPP wf, String wfPath) thr
             }

             if ("recordsize".equals(actionName)) {
-                Assert.assertEquals(action.getJava().getJobXml(), "${wf:appPath()}/conf/falcon-source-hive-site.xml");
                 if (isSecurityEnabled) {
                     Assert.assertNotNull(action.getCred());
                     Assert.assertEquals(action.getCred(), "falconSourceHiveAuth");