Skip to content
This repository has been archived by the owner on Apr 4, 2021. It is now read-only.

Commit

Permalink
rebased and resolved the conflicts from master
Browse files Browse the repository at this point in the history
  • Loading branch information
sandeepSamudrala committed Aug 5, 2016
2 parents 271318b + 7d9687b commit 1a4dcd2
Show file tree
Hide file tree
Showing 32 changed files with 336 additions and 128 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -136,13 +136,6 @@ private void init(String[] args) throws Exception {
inputOptions = parseOptions(args);
LOG.info("Input Options: {}", inputOptions);

// Update the source staging path
inputOptions.setSourceStagingPath();
inputOptions.setTargetStagingPath();

LOG.info("srcStaginPath: {}", inputOptions.getSourceStagingPath());
LOG.info("tgtStaginPath: {}", inputOptions.getTargetStagingPath());

Configuration sourceConf = FileUtils.getConfiguration(getConf(), inputOptions.getSourceWriteEP(),
inputOptions.getSourceNNKerberosPrincipal());
sourceClusterFS = FileSystem.get(sourceConf);
Expand All @@ -155,6 +148,14 @@ private void init(String[] args) throws Exception {

// init DR status store
drStore = new HiveDRStatusStore(targetClusterFS);

// Update the source staging path after initing DR status store
inputOptions.setSourceStagingPath();
inputOptions.setTargetStagingPath();

LOG.info("srcStaginPath: {}", inputOptions.getSourceStagingPath());
LOG.info("tgtStaginPath: {}", inputOptions.getTargetStagingPath());

eventSoucerUtil = new EventSourcerUtils(jobConf, inputOptions.shouldKeepHistory(), inputOptions.getJobName());
}

Expand Down Expand Up @@ -310,7 +311,7 @@ private String getLastEvents(Configuration conf) throws Exception {
}

private Map<String, Long> getLastDBTableEvents(Path lastEventIdFile) throws Exception {
Map<String, Long> lastEventsIdMap = new HashMap<String, Long>();
Map<String, Long> lastEventsIdMap = new HashMap<>();
BufferedReader in = new BufferedReader(new InputStreamReader(jobFS.open(lastEventIdFile)));
try {
String line;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -30,7 +30,7 @@
*/
public abstract class DRStatusStore {

public static final String BASE_DEFAULT_STORE_PATH = "/apps/data-mirroring/";
public static final String BASE_DEFAULT_STORE_PATH = "/apps/falcon/extensions/mirroring";
public static final FsPermission DEFAULT_STORE_PERMISSION =
new FsPermission(FsAction.ALL, FsAction.ALL, FsAction.NONE);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@
import java.util.Properties;

/**
* Utility class to handle Hive events for data-mirroring.
* Utility class to handle Hive events for hive-mirroring.
*/
public class EventUtils {
private static final String DRIVER_NAME = "org.apache.hive.jdbc.HiveDriver";
Expand Down
2 changes: 1 addition & 1 deletion common/src/main/resources/runtime.properties
Original file line number Diff line number Diff line change
Expand Up @@ -51,4 +51,4 @@

*.falcon.service.ProxyUserService.proxyuser.#USER#.groups=*

######### Proxyuser Configuration End #########
######### Proxyuser Configuration End #########
7 changes: 7 additions & 0 deletions common/src/main/resources/startup.properties
Original file line number Diff line number Diff line change
Expand Up @@ -83,6 +83,7 @@

##### Workflow Job Execution Completion listeners #####
*.workflow.execution.listeners=
#org.apache.falcon.service.LogMoverService

######### Implementation classes #########

Expand Down Expand Up @@ -336,3 +337,9 @@ it.workflow.execution.listeners=org.apache.falcon.catalog.CatalogPartitionHandle
# Backlog Metric Properties
#*.falcon.backlog.metricservice.emit.interval.millisecs=60000
#*.falcon.backlog.metricservice.recheck.interval.millisecs=600000

# Property to enable/disable Falcon post-processing (user notifications and log moving)
*.falcon.postprocessing.enable=true

### LogMoveService Thread count
*.falcon.logMoveService.threadCount=50
14 changes: 14 additions & 0 deletions docs/src/site/twiki/Configuration.twiki
Original file line number Diff line number Diff line change
Expand Up @@ -317,6 +317,20 @@ su - $OOZIE_USER
Where $OOZIE_USER is the Oozie user. For example, oozie.
</verbatim>

---+++Disabling Falcon Post Processing
Falcon post processing performs two tasks:
It sends user notifications to ActiveMQ.
It moves Oozie executor logs once the workflow finishes.

If post processing fails for any reason, the user might end up with a backlog in the pipeline; that is why it has been made optional.

To disable post processing set the following property to false in startup.properties :
<verbatim>
*.falcon.postprocessing.enable=false
*.workflow.execution.listeners=org.apache.falcon.service.LogMoverService
</verbatim>
*NOTE: Please make sure Oozie JMS Notifications are enabled, as LogMoverService depends on Oozie JMS Notifications.*


---+++Enabling Falcon Native Scheduler
You can either choose to schedule entities using Oozie's coordinator or using Falcon's native scheduler. To be able to
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,16 +151,14 @@ public String getExtensionLibPath(final String extensionName) throws StoreAccess

for (FileStatus fileStatus : files) {
if (fileStatus.getPath().getName().equalsIgnoreCase(LIBS_DIR)) {
if (fileStatus.getLen() != 0) {
libsPath = Path.getPathWithoutSchemeAndAuthority(fileStatus.getPath());
}
break;
}
}

if (libsPath == null) {
LOG.info("For extension " + extensionName + " there is no "
+ LIBS_DIR + "at the extension store path " + storePath);
+ LIBS_DIR + "at the extension store path " + extensionPath);
return null;
} else {
return libsPath.toString();
Expand Down
4 changes: 2 additions & 2 deletions extensions/src/test/resources/hive-mirroring-template.xml
Original file line number Diff line number Diff line change
Expand Up @@ -37,8 +37,8 @@
<property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
</properties>

<workflow name="##workflow.name##" engine="oozie"
path="/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml" lib="##workflow.lib.path##"/>
<workflow name="##workflow.name##" engine="oozie" path="/apps/falcon/extensions/hive-mirroring/resources/runtime/hive-mirroring-workflow.xml"
lib="##workflow.lib.path##"/>
<retry policy="##retry.policy##" delay="##retry.delay##" attempts="3"/>
<notification type="##notification.type##" to="##notification.receivers##"/>
<ACL/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
<property name="oozie.wf.subworkflow.classpath.inheritance" value="true"/>
</properties>

<workflow name="##falcon.recipe.workflow.name##" engine="oozie" path="/apps/data-mirroring/workflows/hdfs-replication-workflow.xml" lib="##workflow.lib.path##"/>
<workflow name="##falcon.recipe.workflow.name##" engine="oozie" path="/apps/falcon/extensions/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml" lib="##workflow.lib.path##"/>
<retry policy="##falcon.recipe.retry.policy##" delay="##falcon.recipe.retry.delay##" attempts="3"/>
<notification type="##falcon.recipe.notification.type##" to="##falcon.recipe.notification.receivers##"/>
<ACL/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ falcon.recipe.name=hdfs-replication-monthly
##### Workflow properties
falcon.recipe.workflow.name=hdfs-dr-workflow
# Provide Wf absolute path. This can be HDFS or local FS path. If WF is on local FS it will be copied to HDFS
falcon.recipe.workflow.path=/apps/data-mirroring/workflows/hdfs-replication-workflow.xml
falcon.recipe.workflow.path=/apps/falcon/extensions/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml

##### Cluster properties

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
</properties>

<workflow name="##workflow.name##" engine="oozie"
path="/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml" lib="##workflow.lib.path##"/>
path="/apps/falcon/extensions/hive-mirroring/resources/runtime/hive-mirroring-workflow.xml" lib="##workflow.lib.path##"/>
<retry policy="##retry.policy##" delay="##retry.delay##" attempts="3"/>
<notification type="##falcon.recipe.notification.type##" to="##falcon.recipe.notification.receivers##"/>
<ACL/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@
</properties>

<workflow name="##workflow.name##" engine="oozie"
path="/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml" lib="##workflow.lib.path##"/>
path="/apps/falcon/extensions/hive-mirroring/resources/runtime/hive-mirroring-workflow.xml" lib="##workflow.lib.path##"/>
<retry policy="##retry.policy##" delay="##retry.delay##" attempts="3"/>
<notification type="##falcon.recipe.notification.type##" to="##falcon.recipe.notification.receivers##"/>
<ACL/>
Expand Down
4 changes: 2 additions & 2 deletions falcon-ui/app/js/services/entity/entity-model.js
Original file line number Diff line number Diff line change
Expand Up @@ -302,7 +302,7 @@
workflow: {
_name: "hdfs-dr-workflow",
_engine: "oozie",
_path: "/apps/data-mirroring/workflows/hdfs-replication-workflow.xml",
_path: "/apps/falcon/extensions/hdfs-mirroring/resources/runtime/hdfs-mirroring-workflow.xml",
_lib: ""
},
retry: {
Expand Down Expand Up @@ -434,7 +434,7 @@
workflow: {
_name: "falcon-dr-hive-workflow",
_engine: "oozie",
_path: "/apps/data-mirroring/workflows/hive-disaster-recovery-workflow.xml",
_path: "/apps/falcon/extensions/hive-mirroring/resources/runtime/hive-mirroring-workflow.xml",
_lib: ""
},
retry: {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,19 +59,27 @@ public static Properties build(Cluster cluster, Path basePath, Feed feed) throws

//Add eviction action
ACTION eviction = OozieBuilderUtils.unmarshalAction(EVICTION_ACTION_TEMPLATE);
OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.SUCCESS_POSTPROCESS_ACTION_NAME,
OozieBuilderUtils.FAIL_POSTPROCESS_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(eviction);

//Add post-processing actions
ACTION success = OozieBuilderUtils.getSuccessPostProcessAction();
OozieBuilderUtils.addTransition(success, OozieBuilderUtils.OK_ACTION_NAME, OozieBuilderUtils.FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(success);

ACTION fail = OozieBuilderUtils.getFailPostProcessAction();
OozieBuilderUtils.addTransition(fail, OozieBuilderUtils.FAIL_ACTION_NAME, OozieBuilderUtils.FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(fail);

if (!Boolean.parseBoolean(OozieBuilderUtils.ENABLE_POSTPROCESSING)){
OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.OK_ACTION_NAME,
OozieBuilderUtils.FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(eviction);
} else {
OozieBuilderUtils.addTransition(eviction, OozieBuilderUtils.SUCCESS_POSTPROCESS_ACTION_NAME,
OozieBuilderUtils.FAIL_POSTPROCESS_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(eviction);

//Add post-processing actions
ACTION success = OozieBuilderUtils.getSuccessPostProcessAction();
OozieBuilderUtils.addTransition(success, OozieBuilderUtils.OK_ACTION_NAME,
OozieBuilderUtils.FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(success);

ACTION fail = OozieBuilderUtils.getFailPostProcessAction();
OozieBuilderUtils.addTransition(fail, OozieBuilderUtils.FAIL_ACTION_NAME,
OozieBuilderUtils.FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(fail);
}
OozieBuilderUtils.decorateWorkflow(workflow, wfName, EVICTION_ACTION_NAME);
OozieBuilderUtils.addLibExtensionsToWorkflow(cluster, workflow, Tag.RETENTION, EntityType.FEED);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,8 @@ public final class OozieBuilderUtils {
public static final String ENTITY_PATH = "ENTITY_PATH";
public static final String ENTITY_NAME = "ENTITY_NAME";
public static final String IGNORE = "IGNORE";
public static final String ENABLE_POSTPROCESSING = StartupProperties.get().
getProperty("falcon.postprocessing.enable");


static {
Expand Down
1 change: 0 additions & 1 deletion oozie/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -105,7 +105,6 @@
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
</dependency>

</dependencies>

<build>
Expand Down
14 changes: 14 additions & 0 deletions oozie/src/main/java/org/apache/falcon/logging/JobLogMover.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.IOUtils;
import org.apache.hadoop.security.UserGroupInformation;
import org.apache.hadoop.util.ReflectionUtils;
import org.apache.oozie.client.OozieClient;
import org.apache.oozie.client.OozieClientException;
Expand Down Expand Up @@ -68,6 +69,19 @@ private Configuration getConf() {
return conf == null ? new Configuration(): conf;
}

/**
 * Best-effort entry point to move Oozie job logs for the given workflow execution.
 *
 * <p>Skips the move entirely when Hadoop security (Kerberos) is enabled. Any failure
 * from the underlying {@link #run} is logged and suppressed so that a failed log move
 * never fails the user's workflow.
 *
 * @param context the workflow execution whose logs should be moved
 */
public void moveLog(WorkflowExecutionContext context){
    if (!UserGroupInformation.isSecurityEnabled()) {
        try {
            run(context);
        } catch (Exception ignored) {
            // Intentionally swallowed: a failed log mover must not fail the user workflow.
            LOG.error("Exception in job log mover:", ignored);
        }
    } else {
        LOG.info("Unable to move logs as security is enabled.");
    }
}

public int run(WorkflowExecutionContext context) {
try {
OozieClient client = new OozieClient(context.getWorkflowEngineUrl());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,18 +67,7 @@ protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path bui
ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath, sqoopExport);
OozieUtils.marshalSqoopAction(action, actionJaxbElement);

addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(action);

//Add post-processing actions
ACTION success = getSuccessPostProcessAction();
addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(success);

ACTION fail = getFailPostProcessAction();
addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(fail);

addPostProcessing(workflow, action);
decorateWorkflow(workflow, workflow.getName(), EXPORT_ACTION_NAME);
addLibExtensionsToWorkflow(cluster, workflow, Tag.EXPORT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,18 +69,7 @@ protected Properties getWorkflow(Cluster cluster, WORKFLOWAPP workflow, Path bui
ImportExportCommon.addHCatalogProperties(props, entity, cluster, workflow, this, buildPath, sqoopImport);
OozieUtils.marshalSqoopAction(action, actionJaxbElement);

addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(action);

//Add post-processing actions
ACTION success = getSuccessPostProcessAction();
addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(success);

ACTION fail = getFailPostProcessAction();
addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(fail);

addPostProcessing(workflow, action);
decorateWorkflow(workflow, workflow.getName(), IMPORT_ACTION_NAME);
addLibExtensionsToWorkflow(cluster, workflow, Tag.IMPORT);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,13 @@
public abstract class OozieOrchestrationWorkflowBuilder<T extends Entity> extends OozieEntityBuilder<T> {
public static final String HIVE_CREDENTIAL_NAME = "falconHiveAuth";

/**
 * Returns the raw value of the {@code falcon.postprocessing.enable} startup property
 * as loaded into {@code enablePostprocessing}; may be {@code null} if the property is unset.
 */
public String getEnablePostProcessing() {
    return enablePostprocessing;
}

private String enablePostprocessing = StartupProperties.get().
getProperty("falcon.postprocessing.enable");

protected static final String USER_ACTION_NAME = "user-action";
protected static final String PREPROCESS_ACTION_NAME = "pre-processing";
protected static final String SUCCESS_POSTPROCESS_ACTION_NAME = "succeeded-post-processing";
Expand Down Expand Up @@ -130,6 +137,10 @@ public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster clust
return get(entity, cluster, lifecycle, Scheduler.OOZIE);
}

/**
 * Whether Falcon post-processing is enabled for generated workflows.
 *
 * @return {@code Boolean.TRUE} only when the configured property value is
 *         (case-insensitively) the string "true"; {@code Boolean.FALSE} otherwise,
 *         including when the property is unset.
 */
public Boolean isPostProcessingEnabled(){
    // Boolean.valueOf(String) has exactly the semantics of
    // Boolean.parseBoolean(String) followed by boxing.
    return Boolean.valueOf(getEnablePostProcessing());
}

public static OozieOrchestrationWorkflowBuilder get(Entity entity, Cluster cluster, Tag lifecycle,
Scheduler scheduler)
throws FalconException {
Expand Down Expand Up @@ -219,6 +230,25 @@ protected void decorateWorkflow(WORKFLOWAPP wf, String name, String startAction)
wf.getDecisionOrForkOrJoin().add(kill);
}

/**
 * Wires the given action into the workflow, optionally routing it through the
 * Falcon post-processing actions.
 *
 * <p>When post-processing is enabled, the action transitions to the
 * succeeded/failed post-processing actions, which are also appended to the
 * workflow; otherwise the action transitions directly to the ok/fail end states.
 *
 * @param workflow workflow being assembled
 * @param action   the user action to wire in
 * @throws FalconException if a post-processing action template cannot be built
 */
protected void addPostProcessing(WORKFLOWAPP workflow, ACTION action) throws FalconException {
    if (isPostProcessingEnabled()) {
        addTransition(action, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
        workflow.getDecisionOrForkOrJoin().add(action);

        // Append the post-processing actions the transitions above point at.
        ACTION success = getSuccessPostProcessAction();
        addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
        workflow.getDecisionOrForkOrJoin().add(success);

        ACTION fail = getFailPostProcessAction();
        addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
        workflow.getDecisionOrForkOrJoin().add(fail);
    } else {
        // Post-processing disabled: go straight to the terminal states.
        addTransition(action, OK_ACTION_NAME, FAIL_ACTION_NAME);
        workflow.getDecisionOrForkOrJoin().add(action);
    }
}

protected ACTION getSuccessPostProcessAction() throws FalconException {
ACTION action = unmarshalAction(POSTPROCESS_TEMPLATE);
decorateWithOozieRetries(action);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,20 +59,7 @@ public FSReplicationWorkflowBuilder(Feed entity) {
addAdditionalReplicationProperties(replication);
enableCounters(replication);
enableTDE(replication);
addTransition(replication, SUCCESS_POSTPROCESS_ACTION_NAME, FAIL_POSTPROCESS_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(replication);

//Add post-processing actions
ACTION success = getSuccessPostProcessAction();
addHDFSServersConfig(success, src, target);
addTransition(success, OK_ACTION_NAME, FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(success);

ACTION fail = getFailPostProcessAction();
addHDFSServersConfig(fail, src, target);
addTransition(fail, FAIL_ACTION_NAME, FAIL_ACTION_NAME);
workflow.getDecisionOrForkOrJoin().add(fail);

addPostProcessing(workflow, replication);
decorateWorkflow(workflow, wfName, start);
return workflow;
}
Expand Down
Loading

0 comments on commit 1a4dcd2

Please sign in to comment.