FALCON-30 Enable embedding pig scripts directly in a process. Contributed by Venkatesh Seetharam
sriksun committed Jul 20, 2013
1 parent 689d52a commit 09971e66be57d6c0884415240feed85cb2a444a6
Showing 20 changed files with 1,392 additions and 170 deletions.
@@ -8,6 +8,9 @@ Trunk (Unreleased)

IMPROVEMENTS

FALCON-30 Enable embedding pig scripts directly in a process. (Venkatesh
Seetharam via Srikanth Sundarrajan)

FALCON-47 Falcon Replication should support configurable delays in feed,
parallel, timeout and bulk transfer with variable frequency (Shaik Idris
Ali via Srikanth Sundarrajan)
@@ -283,14 +283,15 @@
</xs:complexType>

<xs:complexType name="workflow">
<xs:attribute type="engine-type" name="engine" use="optional"/>
<xs:attribute type="engine-type" name="engine" use="optional" default="oozie"/>
<xs:attribute type="xs:string" name="path" use="required"/>
<xs:attribute type="xs:string" name="lib" use="optional"/>
</xs:complexType>

<xs:simpleType name="engine-type">
<xs:restriction base="xs:string">
<xs:enumeration value="oozie"/>
<xs:enumeration value="pig"/>
</xs:restriction>
</xs:simpleType>

@@ -394,12 +394,25 @@ queueName and jobPriority are special properties, which when present are used by
<property name="queueName" value="hadoopQueue"/>
<property name="jobPriority" value="VERY_HIGH"/>
</verbatim>

---++++ Workflow
The workflow defines the workflow engine that should be used and the path to the workflow on hdfs.
The workflow definition on hdfs contains the actual job that should run and it should conform to
the workflow specification of the engine specified. The libraries required by the workflow should
be in the lib folder inside the workflow path.

The properties defined in the cluster and cluster properties (nameNode and jobTracker) will also
be available for the workflow.
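
In an oozie workflow these can then be referenced as parameters; a hypothetical fragment from a
workflow action might look like:
<verbatim>
<job-tracker>${jobTracker}</job-tracker>
<name-node>${nameNode}</name-node>
</verbatim>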

There are two engines supported today.

---+++++ Oozie

As part of oozie workflow engine support, users can embed an oozie workflow.
Refer to oozie [[http://incubator.apache.org/oozie/overview.html][workflow overview]] and
[[http://incubator.apache.org/oozie/docs/3.1.3/docs/WorkflowFunctionalSpec.html][workflow specification]] for details.

Syntax:
<verbatim>
<process name="[process name]">
@@ -417,7 +430,25 @@ Example:
...
</process>
</verbatim>

This defines the workflow engine to be oozie and the workflow xml is defined at
/projects/bootcamp/workflow/workflow.xml. The libraries are at /projects/bootcamp/workflow/lib.
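
For illustration, a minimal oozie workflow definition at that path might look like the sketch
below; the action and directory names are hypothetical, and a real definition should follow the
oozie workflow specification linked above.
<verbatim>
<workflow-app xmlns="uri:oozie:workflow:0.2" name="bootcamp-wf">
    <start to="mkdir-node"/>
    <action name="mkdir-node">
        <fs>
            <mkdir path="${nameNode}/projects/bootcamp/out"/>
        </fs>
        <ok to="end"/>
        <error to="fail"/>
    </action>
    <kill name="fail">
        <message>Workflow failed</message>
    </kill>
    <end name="end"/>
</workflow-app>
</verbatim>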

---+++++ Pig

Falcon also adds the Pig engine, which enables users to embed a Pig script as a process.

Example:
<verbatim>
<process name="sample-process">
...
<workflow engine="pig" path="/projects/bootcamp/pig.script"/>
...
</process>
</verbatim>

This defines the workflow engine to be pig and the pig script is defined at
/projects/bootcamp/pig.script.
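
For illustration, a minimal pig script at that path might look like the following sketch; the
input and output paths and the filter logic are hypothetical.
<verbatim>
A = LOAD '/projects/bootcamp/data' USING PigStorage(',');
B = FILTER A BY $0 IS NOT NULL;
STORE B INTO '/projects/bootcamp/output';
</verbatim>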

---++++ Retry
Retry policy defines how workflow failures should be handled. Two retry policies are defined: backoff and exp-backoff (exponential backoff). Depending on the delay and number of attempts, the workflow is retried after specific intervals.
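
A sketch of what a retry declaration might look like, assuming the attribute names used
elsewhere in the process definition (the delay and attempt count here are hypothetical):
<verbatim>
<retry policy="backoff" delay="minutes(10)" attempts="3"/>
</verbatim>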
@@ -23,6 +23,7 @@
import org.apache.commons.cli.Options;
import org.apache.commons.cli.ParseException;
import org.apache.falcon.entity.v0.EntityType;
import org.apache.falcon.entity.v0.process.EngineType;
import org.apache.hadoop.conf.Configured;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
@@ -64,6 +65,7 @@ private static class ARGS {
private String runId;
private String logDir;
private String entityType;
private String userWorkflowEngine;
}

public static void main(String[] args) throws Exception {
@@ -75,6 +77,7 @@ public int run(String[] arguments) throws Exception {
try {
ARGS args = new ARGS();
setupArgs(arguments, args);

OozieClient client = new OozieClient(args.oozieUrl);
WorkflowJob jobInfo;
try {
@@ -83,17 +86,18 @@ public int run(String[] arguments) throws Exception {
LOG.error("Error getting jobinfo for: " + args.subflowId, e);
return 0;
}

// logs for each run are copied under a zero-padded run-id directory, e.g. <logDir>/001
Path path = new Path(args.logDir + "/"
+ String.format("%03d", Integer.parseInt(args.runId)));

FileSystem fs = path.getFileSystem(getConf());

if (args.entityType.equalsIgnoreCase(EntityType.FEED.name())
|| notUserWorkflowEngineIsOozie(args.userWorkflowEngine)) {
// if replication wf or PIG Process
copyOozieLog(client, fs, path, jobInfo.getId());
copyTTlogs(fs, path, jobInfo.getActions().get(2));
} else {
// if process wf with oozie engine
String subflowId = jobInfo.getExternalId();
copyOozieLog(client, fs, path, subflowId);
WorkflowJob subflowInfo = client.getJobInfo(subflowId);
@@ -107,7 +111,6 @@ public int run(String[] arguments) throws Exception {
+ action.getName());
}
}

}

} catch (Exception e) {
@@ -116,6 +119,11 @@ public int run(String[] arguments) throws Exception {
return 0;
}

private boolean notUserWorkflowEngineIsOozie(String userWorkflowEngine) {
// userWorkflowEngine will be null for replication and "pig" for pig
return userWorkflowEngine != null && EngineType.fromValue(userWorkflowEngine) != EngineType.OOZIE;
}

private void copyOozieLog(OozieClient client, FileSystem fs, Path path,
String id) throws OozieClientException, IOException {
InputStream in = new ByteArrayInputStream(client.getJobLog(id).getBytes());
@@ -150,23 +158,31 @@ private String getMappedStatus(WorkflowAction.Status status) {

private void setupArgs(String[] arguments, ARGS args) throws ParseException {
Options options = new Options();

Option opt = new Option("workflowEngineUrl", true, "url of workflow engine, ex:oozie");
opt.setRequired(true);
options.addOption(opt);

opt = new Option("subflowId", true, "external id of userworkflow");
opt.setRequired(true);
options.addOption(opt);

opt = new Option("userWorkflowEngine", true, "user workflow engine type");
opt.setRequired(false); // replication will NOT have this arg sent
options.addOption(opt);

opt = new Option("runId", true, "current workflow's runid");
opt.setRequired(true);
options.addOption(opt);

opt = new Option("logDir", true, "log dir where job logs are stored");
opt.setRequired(true);
options.addOption(opt);

opt = new Option("status", true, "user workflow status");
opt.setRequired(true);
options.addOption(opt);

opt = new Option("entityType", true, "entity type feed or process");
opt.setRequired(true);
options.addOption(opt);
@@ -175,6 +191,7 @@ private void setupArgs(String[] arguments, ARGS args) throws ParseException {

args.oozieUrl = cmd.getOptionValue("workflowEngineUrl");
args.subflowId = cmd.getOptionValue("subflowId");
args.userWorkflowEngine = cmd.getOptionValue("userWorkflowEngine");
args.runId = cmd.getOptionValue("runId");
args.logDir = cmd.getOptionValue("logDir");
args.entityType = cmd.getOptionValue("entityType");
@@ -59,6 +59,7 @@ public enum Arg {
LOG_FILE("logFile", "log file path where feeds to be deleted are recorded"),
WF_ENGINE_URL("workflowEngineUrl", "url of workflow engine server, ex:oozie"),
USER_SUBFLOW_ID("subflowId", "external id of user workflow"),
USER_WORKFLOW_ENGINE("userWorkflowEngine", "user workflow engine type"),
LOG_DIR("logDir", "log dir where job logs are copied");

private String name;
@@ -80,7 +81,6 @@ public String getOptionName() {
public String getOptionValue(CommandLine cmd) {
return cmd.getOptionValue(this.name);
}

}

public static void main(String[] args) throws Exception {
@@ -94,9 +94,11 @@ public int run(String[] args) throws Exception {

LOG.info("Sending user message " + cmd);
invokeUserMessageProducer(cmd);

// LogMover doesn't throw exceptions; a failed log mover will not fail the user workflow
LOG.info("Moving logs " + cmd);
invokeLogProducer(cmd);

LOG.info("Sending falcon message " + cmd);
invokeFalconMessageProducer(cmd);

@@ -154,20 +156,19 @@ private void invokeFalconMessageProducer(CommandLine cmd) throws Exception {
addArg(args, cmd, Arg.LOG_FILE);

MessageProducer.main(args.toArray(new String[0]));

}

private void invokeLogProducer(CommandLine cmd) throws Exception {
List<String> args = new ArrayList<String>();
addArg(args, cmd, Arg.WF_ENGINE_URL);
addArg(args, cmd, Arg.ENTITY_TYPE);
addArg(args, cmd, Arg.USER_SUBFLOW_ID);
addArg(args, cmd, Arg.USER_WORKFLOW_ENGINE);
addArg(args, cmd, Arg.RUN_ID);
addArg(args, cmd, Arg.LOG_DIR);
addArg(args, cmd, Arg.STATUS);

LogMover.main(args.toArray(new String[0]));

}

private void addArg(List<String> args, CommandLine cmd, Arg arg) {
@@ -198,15 +199,18 @@ private static CommandLine getCommand(String[] arguments)
addOption(options, Arg.LOG_FILE);
addOption(options, Arg.WF_ENGINE_URL);
addOption(options, Arg.USER_SUBFLOW_ID);
addOption(options, Arg.USER_WORKFLOW_ENGINE, false);
addOption(options, Arg.LOG_DIR);
return new GnuParser().parse(options, arguments);
}

private static void addOption(Options options, Arg arg) {
addOption(options, arg, true);
}

private static void addOption(Options options, Arg arg, boolean isRequired) {
Option option = arg.getOption();
option.setRequired(isRequired);
options.addOption(option);
}


}
@@ -68,7 +68,8 @@ public void setup() throws Exception {
"-" + Arg.WF_ENGINE_URL.getOptionName(),
"http://localhost:11000/oozie/",
"-" + Arg.LOG_DIR.getOptionName(), "target/log",
"-" + Arg.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id" + "test", };
"-" + Arg.USER_SUBFLOW_ID.getOptionName(), "userflow@wf-id" + "test",
"-" + Arg.USER_WORKFLOW_ENGINE.getOptionName(), "oozie", };
broker = new BrokerService();
broker.addConnector(BROKER_URL);
broker.setDataDirectory("target/activemq");
@@ -823,6 +823,7 @@
<exclude>**/.project</exclude>
<exclude>**/.settings/**</exclude>
<exclude>**/test-output/**</exclude>
<exclude>**/data.txt</exclude>
</excludes>
</configuration>
<executions>
