Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
FALCON-2324 Retry, Latererun and JobLog mover services not working wi…
…th kerberos enabled

Author: sandeep <sandysmdl@gmail.com>
Author: sandeep.samudrala <sandeep.samudrala@inmobi.com>
Author: Sandeep Samudrala <sandysmdl@gmail.com>

Reviewers: @pallavi-rao

Closes #399 from sandeepSamudrala/master and squashes the following commits:

974dbbe [Sandeep Samudrala] FALCON-2324 Retry, Latererun and JobLog mover services not working with kerberos enabled
678df54 [Sandeep Samudrala] Merge branch 'master' of https://github.com/apache/falcon
9c907ef [sandeep.samudrala] FALCON-2319. Falcon Build failure fix for enunciate
b5fb578 [sandeep.samudrala] git applyMerge branch 'master' of https://github.com/apache/falcon
575e768 [sandeep.samudrala] Merge branch 'master' of https://github.com/apache/falcon
e0ad358 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
f96a084 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
9cf36e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
bbca081 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
48f6afa [sandeep] Merge branch 'master' of https://github.com/apache/falcon
250cc46 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d0393e9 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
a178805 [sandeep] Merge branch 'master' of https://github.com/apache/falcon
d6dc8bf [sandeep] Merge branch 'master' of https://github.com/apache/falcon
1bb8d3c [sandeep] Merge branch 'master' of https://github.com/apache/falcon
c065566 [sandeep] reverting last line changes made
1a4dcd2 [sandeep] rebased and resolved the conflicts from master
271318b [sandeep] FALCON-2097. Adding UT to the new method for getting next instance time with Delay.
a94d4fe [sandeep] rebasing from master
9e68a57 [sandeep] FALCON-298. Feed update with replication delay creates holes
  • Loading branch information
sandeepSamudrala authored and pallavi-rao committed Feb 26, 2018
1 parent dc47088 commit d7a4eeb4ea19f731a984f40a80cb081995e57521
Showing 10 changed files with 43 additions and 19 deletions.
@@ -72,10 +72,8 @@ public static void authenticate(final String user) {
*
* @param doAsUser doAs user
* @param proxyHost proxy host
* @throws IOException
*/
public static void proxyDoAsUser(final String doAsUser,
final String proxyHost) throws IOException {
public static void proxyDoAsUser(final String doAsUser, final String proxyHost) {
if (!isAuthenticated()) {
throw new IllegalStateException("Authentication not done");
}
@@ -106,10 +104,8 @@ public static void proxyDoAsUser(final String doAsUser,
*
* @param aclOwner entity acl owner
* @param aclGroup entity acl group
* @throws IOException
*/
public static void proxy(final String aclOwner,
final String aclGroup) throws IOException {
public static void proxy(final String aclOwner, final String aclGroup) {
if (!isAuthenticated() || StringUtils.isEmpty(aclOwner)) {
throw new IllegalStateException("Authentication not done or Bad user name");
}
@@ -23,13 +23,14 @@
import org.apache.falcon.entity.v0.process.EngineType;
import org.apache.falcon.hadoop.HadoopClientFactory;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.HostnameFilter;
import org.apache.falcon.workflow.WorkflowExecutionContext;
import org.apache.falcon.workflow.engine.OozieClientFactory;
import org.apache.falcon.workflow.util.OozieActionConfigurationHelper;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
@@ -72,10 +73,6 @@ private Configuration getConf() {
}

public void moveLog(WorkflowExecutionContext context){
if (UserGroupInformation.isSecurityEnabled()) {
LOG.info("Unable to move logs as security is enabled.");
return;
}
try {
run(context);
} catch (Exception ignored) {
@@ -95,10 +92,11 @@ public int run(WorkflowExecutionContext context) {
String instanceOwner = context.getWorkflowUser();
if (StringUtils.isNotBlank(instanceOwner)) {
CurrentUser.authenticate(instanceOwner);
CurrentUser.proxyDoAsUser(instanceOwner, HostnameFilter.get());
} else {
CurrentUser.authenticate(System.getProperty("user.name"));
}
OozieClient client = new OozieClient(engineUrl);
OozieClient client = OozieClientFactory.getClientRef(engineUrl);
WorkflowJob jobInfo;
try {
jobInfo = client.getJobInfo(context.getWorkflowId());
@@ -55,7 +55,7 @@ public static OozieClient get(String clusterName) throws FalconException {
return get((Cluster) ConfigurationStore.get().get(EntityType.CLUSTER, clusterName));
}

private static OozieClient getClientRef(String oozieUrl)
public static OozieClient getClientRef(String oozieUrl)
throws FalconException {

if (OozieConstants.LOCAL_OOZIE.equals(oozieUrl)) {
@@ -488,6 +488,26 @@ public List<CoordinatorAction> call() throws Exception {
}
}

@Override
public List<CoordinatorAction> reRunCoord(final String jobId, final String rerunType, final String scope,
final boolean refresh, final boolean noCleanup,
final boolean failed, final Properties props)
throws OozieClientException {
try {
return doAs(CurrentUser.getUser(), new Callable<List<CoordinatorAction>>() {

public List<CoordinatorAction> call() throws Exception {
return ProxyOozieClient.super.reRunCoord(jobId, rerunType, scope, refresh, noCleanup, failed,
props);
}
});
} catch (OozieClientException e) {
throw e;
} catch (Exception e) {
throw new OozieClientException(e.toString(), e);
}
}

@Override
public Void reRunBundle(final String jobId, final String coordScope, final String dateScope,
final boolean refresh, final boolean noCleanup)
@@ -77,7 +77,7 @@ public void run() {
// Login the user to access WfEngine as this user
CurrentUser.authenticate(message.getWorkflowUser());
AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(),
message.getEntityName());
message.getEntityName(), message.getWorkflowUser());
String jobStatus = wfEngine.getWorkflowStatus(
message.getClusterName(), message.getWfId());
handleRerun(message.getClusterName(), jobStatus, message,
@@ -25,6 +25,8 @@
import org.apache.falcon.entity.v0.process.Retry;
import org.apache.falcon.rerun.event.RerunEvent;
import org.apache.falcon.rerun.queue.DelayedQueue;
import org.apache.falcon.security.CurrentUser;
import org.apache.falcon.security.HostnameFilter;
import org.apache.falcon.workflow.WorkflowEngineFactory;
import org.apache.falcon.workflow.WorkflowExecutionListener;
import org.apache.falcon.workflow.engine.AbstractWorkflowEngine;
@@ -60,7 +62,12 @@ public abstract void handleRerun(String clusterName, String entityType,
String wfId, String parentId, String workflowUser, long msgReceivedTime);
//RESUME CHECKSTYLE CHECK ParameterNumberCheck

public AbstractWorkflowEngine getWfEngine(String entityType, String entityName) throws FalconException {
public AbstractWorkflowEngine getWfEngine(String entityType, String entityName, String doAsUser)
throws FalconException {
if (StringUtils.isNotBlank(doAsUser)) {
CurrentUser.authenticate(doAsUser);
CurrentUser.proxyDoAsUser(doAsUser, HostnameFilter.get());
}
if (StringUtils.isBlank(entityType) || StringUtils.isBlank(entityName)) {
return wfEngine;
}
@@ -85,7 +85,8 @@ protected void handleRerun(String clusterName, String jobStatus,
if (StringUtils.isBlank(id)) {
id = message.getWfId();
}
handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), id, null, true);
handler.getWfEngine(entityType, entityName, message.getWorkflowUser())
.reRun(message.getClusterName(), id, null, true);
LOG.info("Scheduled late rerun for wf-id: {} on cluster: {}",
message.getWfId(), message.getClusterName());
} catch (Exception e) {
@@ -106,7 +107,7 @@ protected void handleRerun(String clusterName, String jobStatus,
public String detectLate(LaterunEvent message) throws Exception {
LateDataHandler late = new LateDataHandler();
AbstractWorkflowEngine wfEngine = handler.getWfEngine(message.getEntityType(),
message.getEntityName());
message.getEntityName(), message.getWorkflowUser());
Properties properties = wfEngine.getWorkflowProperties(message.getClusterName(), message.getWfId());
String falconInputs = properties.getProperty(WorkflowExecutionArgs.INPUT_NAMES.getName());
String falconInPaths = properties.getProperty(WorkflowExecutionArgs.INPUT_FEED_PATHS.getName());
@@ -66,7 +66,7 @@ public void handleRerun(String cluster, String entityType, String entityName, St
Long wait = getEventDelay(entity, nominalTime);
if (wait == -1) {
LOG.info("Late rerun expired for entity: {} ({})", entityType, entityName);
AbstractWorkflowEngine wfEngine = this.getWfEngine(entityType, entityName);
AbstractWorkflowEngine wfEngine = this.getWfEngine(entityType, entityName, entity.getACL().getOwner());
java.util.Properties properties = wfEngine.getWorkflowProperties(cluster, wfId);
String logDir = properties.getProperty("logDir");
String srcClusterName = properties.getProperty("srcClusterName");
@@ -63,7 +63,9 @@ protected void handleRerun(String clusterName, String jobStatus,
if (!id.contains("-C@") && StringUtils.isNotBlank(message.getParentId())) {
id = message.getParentId();
}
handler.getWfEngine(entityType, entityName).reRun(message.getClusterName(), id, null, false);

handler.getWfEngine(entityType, entityName, message.getWorkflowUser())
.reRun(message.getClusterName(), id, null, false);
} catch (Exception e) {
if (e instanceof EntityNotRegisteredException) {
LOG.warn("Entity {} of type {} doesn't exist in config store. So retry "

0 comments on commit d7a4eeb

Please sign in to comment.