This repository has been archived by the owner. It is now read-only.
Permalink
Browse files

Closes OOZIE-10 add user-retry in workflow action

  • Loading branch information...
Angelo Kaichen Huang Mohammad Kamrul Islam
Angelo Kaichen Huang authored and Mohammad Kamrul Islam committed Aug 5, 2011
1 parent 65baec9 commit e7b4d9d0a2415e32148b3d8d9522709d57b13b14
Showing with 1,722 additions and 90 deletions.
  1. +22 −0 client/src/main/java/org/apache/oozie/client/WorkflowAction.java
  2. +285 −0 client/src/main/resources/oozie-workflow-0.3.xsd
  3. +3 −2 core/src/main/java/org/apache/oozie/ErrorCode.java
  4. +15 −0 core/src/main/java/org/apache/oozie/WorkflowActionBean.java
  5. +5 −1 core/src/main/java/org/apache/oozie/action/hadoop/JavaActionExecutor.java
  6. +40 −0 core/src/main/java/org/apache/oozie/client/rest/JsonWorkflowAction.java
  7. +14 −7 core/src/main/java/org/apache/oozie/command/wf/ActionCheckXCommand.java
  8. +17 −9 core/src/main/java/org/apache/oozie/command/wf/ActionEndXCommand.java
  9. +2 −1 core/src/main/java/org/apache/oozie/command/wf/ActionKillXCommand.java
  10. +22 −3 core/src/main/java/org/apache/oozie/command/wf/ActionStartXCommand.java
  11. +63 −24 core/src/main/java/org/apache/oozie/command/wf/ActionXCommand.java
  12. +7 −3 core/src/main/java/org/apache/oozie/command/wf/SignalXCommand.java
  13. +3 −0 core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionGetJPAExecutor.java
  14. +3 −0 core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsGetForJobJPAExecutor.java
  15. +3 −0 core/src/main/java/org/apache/oozie/executor/jpa/WorkflowActionsRunningGetJPAExecutor.java
  16. +3 −0 core/src/main/java/org/apache/oozie/executor/jpa/WorkflowJobGetActionsJPAExecutor.java
  17. +99 −3 core/src/main/java/org/apache/oozie/service/LiteWorkflowStoreService.java
  18. +3 −0 core/src/main/java/org/apache/oozie/service/RecoveryService.java
  19. +5 −2 core/src/main/java/org/apache/oozie/service/SchemaService.java
  20. +3 −0 core/src/main/java/org/apache/oozie/store/WorkflowStore.java
  21. +12 −3 core/src/main/java/org/apache/oozie/workflow/lite/ActionNodeDef.java
  22. +9 −1 core/src/main/java/org/apache/oozie/workflow/lite/LiteWorkflowAppParser.java
  23. +150 −24 core/src/main/java/org/apache/oozie/workflow/lite/NodeDef.java
  24. +53 −1 core/src/main/resources/oozie-default.xml
  25. +1 −1 core/src/test/java/org/apache/oozie/command/wf/TestActionCheckXCommand.java
  26. +68 −1 core/src/test/java/org/apache/oozie/command/wf/TestActionErrors.java
  27. +1 −1 core/src/test/java/org/apache/oozie/command/wf/TestActionStartXCommand.java
  28. +75 −1 core/src/test/java/org/apache/oozie/command/wf/TestWorkflowKillXCommand.java
  29. +125 −0 core/src/test/java/org/apache/oozie/service/TestRecoveryService.java
  30. +38 −0 core/src/test/resources/wf-ext-schema-valid-user-retry.xml
  31. +572 −2 docs/src/site/twiki/WorkflowFunctionalSpec.twiki
  32. +1 −0 release-log.txt
@@ -29,6 +29,7 @@
RUNNING,
OK,
ERROR,
+ USER_RETRY,
START_RETRY,
START_MANUAL,
DONE,
@@ -86,6 +87,27 @@
* @return the number of retries of the action.
*/
int getRetries();
+
+ /**
+ * Return the user retry count of the action.
+ *
+ * @return the user retry count of the action.
+ */
+ int getUserRetryCount();
+
+ /**
+ * Return the maximum number of user retries of the action.
+ *
+ * @return the maximum number of user retries of the action.
+ */
+ int getUserRetryMax();
+
+ /**
+ * Return the user retry interval of the action, in minutes.
+ *
+ * @return the user retry interval of the action, in minutes.
+ */
+ int getUserRetryInterval();
/**
* Return the start time of the action.
@@ -0,0 +1,285 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Copyright (c) 2010 Yahoo! Inc. All rights reserved.
+ Licensed under the Apache License, Version 2.0 (the "License");
+ you may not use this file except in compliance with the License.
+ You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License. See accompanying LICENSE file.
+-->
+<xs:schema xmlns:xs="http://www.w3.org/2001/XMLSchema" xmlns:workflow="uri:oozie:workflow:0.3"
+ elementFormDefault="qualified" targetNamespace="uri:oozie:workflow:0.3">
+
+ <xs:element name="workflow-app" type="workflow:WORKFLOW-APP"/>
+
+ <xs:simpleType name="IDENTIFIER"> <!-- a letter or underscore, then letters, digits, '-' or '_' -->
+ <xs:restriction base="xs:string">
+ <xs:pattern value="([a-zA-Z_]([\-_a-zA-Z0-9])*){1,39}"/>
+ </xs:restriction>
+ </xs:simpleType>
+
+ <xs:complexType name="WORKFLOW-APP">
+ <xs:sequence>
+ <xs:element name="credentials" type="workflow:CREDENTIALS" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="start" type="workflow:START" minOccurs="1" maxOccurs="1"/>
+ <xs:choice minOccurs="0" maxOccurs="unbounded">
+ <xs:element name="decision" type="workflow:DECISION" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="fork" type="workflow:FORK" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="join" type="workflow:JOIN" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="kill" type="workflow:KILL" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="action" type="workflow:ACTION" minOccurs="1" maxOccurs="1"/>
+ </xs:choice>
+ <xs:element name="end" type="workflow:END" minOccurs="1" maxOccurs="1"/>
+ <xs:any namespace="uri:oozie:sla:0.1" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="START">
+ <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="END">
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="DECISION">
+ <xs:sequence>
+ <xs:element name="switch" type="workflow:SWITCH" minOccurs="1" maxOccurs="1"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:element name="switch" type="workflow:SWITCH"/>
+
+ <xs:complexType name="SWITCH">
+ <xs:sequence>
+ <xs:sequence>
+ <xs:element name="case" type="workflow:CASE" minOccurs="1" maxOccurs="unbounded"/>
+ <xs:element name="default" type="workflow:DEFAULT" minOccurs="1" maxOccurs="1"/>
+ </xs:sequence>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="CASE">
+ <xs:simpleContent>
+ <xs:extension base="xs:string">
+ <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
+ </xs:extension>
+ </xs:simpleContent>
+ </xs:complexType>
+
+ <xs:complexType name="DEFAULT">
+ <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="FORK_TRANSITION">
+ <xs:attribute name="start" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="FORK">
+ <xs:sequence>
+ <xs:element name="path" type="workflow:FORK_TRANSITION" minOccurs="2" maxOccurs="unbounded"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="JOIN">
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:element name="kill" type="workflow:KILL"/>
+
+ <xs:complexType name="KILL">
+ <xs:sequence>
+ <xs:element name="message" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="ACTION_TRANSITION">
+ <xs:attribute name="to" type="workflow:IDENTIFIER" use="required"/>
+ </xs:complexType>
+
+ <xs:element name="map-reduce" type="workflow:MAP-REDUCE"/>
+ <xs:element name="pig" type="workflow:PIG"/>
+ <xs:element name="sub-workflow" type="workflow:SUB-WORKFLOW"/>
+ <xs:element name="fs" type="workflow:FS"/>
+ <xs:element name="java" type="workflow:JAVA"/>
+
+ <xs:complexType name="ACTION">
+ <xs:sequence>
+ <xs:choice minOccurs="1" maxOccurs="1">
+ <xs:element name="map-reduce" type="workflow:MAP-REDUCE" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="pig" type="workflow:PIG" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="sub-workflow" type="workflow:SUB-WORKFLOW" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="fs" type="workflow:FS" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="java" type="workflow:JAVA" minOccurs="1" maxOccurs="1"/>
+ <xs:any namespace="##other" minOccurs="1" maxOccurs="1"/>
+ </xs:choice>
+ <xs:element name="ok" type="workflow:ACTION_TRANSITION" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="error" type="workflow:ACTION_TRANSITION" minOccurs="1" maxOccurs="1"/>
+ <xs:any namespace="uri:oozie:sla:0.1" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ <xs:attribute name="name" type="workflow:IDENTIFIER" use="required"/>
+ <xs:attribute name="cred" type="xs:string"/>
+ <xs:attribute name="retry-max" type="xs:string"/>
+ <xs:attribute name="retry-interval" type="xs:string"/>
+ </xs:complexType>
+
+ <xs:complexType name="MAP-REDUCE">
+ <xs:sequence>
+ <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="prepare" type="workflow:PREPARE" minOccurs="0" maxOccurs="1"/>
+ <xs:choice minOccurs="0" maxOccurs="1">
+ <xs:element name="streaming" type="workflow:STREAMING" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="pipes" type="workflow:PIPES" minOccurs="0" maxOccurs="1"/>
+ </xs:choice>
+ <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="PIG">
+ <xs:sequence>
+ <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="prepare" type="workflow:PREPARE" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="script" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="param" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="argument" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="SUB-WORKFLOW">
+ <xs:sequence>
+ <xs:element name="app-path" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="propagate-configuration" type="workflow:FLAG" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="FS">
+ <xs:sequence>
+ <xs:element name="delete" type="workflow:DELETE" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="mkdir" type="workflow:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="move" type="workflow:MOVE" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="chmod" type="workflow:CHMOD" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="JAVA">
+ <xs:sequence>
+ <xs:element name="job-tracker" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="name-node" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="prepare" type="workflow:PREPARE" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="job-xml" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="configuration" type="workflow:CONFIGURATION" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="main-class" type="xs:string" minOccurs="1" maxOccurs="1"/>
+ <xs:element name="java-opts" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="arg" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="file" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="archive" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="capture-output" type="workflow:FLAG" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="FLAG"/>
+
+ <xs:complexType name="CONFIGURATION">
+ <xs:sequence>
+ <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="STREAMING">
+ <xs:sequence>
+ <xs:element name="mapper" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="reducer" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="record-reader" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="record-reader-mapping" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="env" type="xs:string" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="PIPES">
+ <xs:sequence>
+ <xs:element name="map" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="reduce" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="inputformat" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="partitioner" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="writer" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ <xs:element name="program" type="xs:string" minOccurs="0" maxOccurs="1"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="PREPARE">
+ <xs:sequence>
+ <xs:element name="delete" type="workflow:DELETE" minOccurs="0" maxOccurs="unbounded"/>
+ <xs:element name="mkdir" type="workflow:MKDIR" minOccurs="0" maxOccurs="unbounded"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="DELETE">
+ <xs:attribute name="path" type="xs:string" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="MKDIR">
+ <xs:attribute name="path" type="xs:string" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="MOVE">
+ <xs:attribute name="source" type="xs:string" use="required"/>
+ <xs:attribute name="target" type="xs:string" use="required"/>
+ </xs:complexType>
+
+ <xs:complexType name="CHMOD">
+ <xs:attribute name="path" type="xs:string" use="required"/>
+ <xs:attribute name="permissions" type="xs:string" use="required"/>
+ <xs:attribute name="dir-files" type="xs:string"/>
+ </xs:complexType>
+
+ <xs:complexType name="CREDENTIALS">
+ <xs:sequence minOccurs="0" maxOccurs="unbounded">
+ <xs:element name="credential" type="workflow:CREDENTIAL"/>
+ </xs:sequence>
+ </xs:complexType>
+
+ <xs:complexType name="CREDENTIAL">
+ <xs:sequence minOccurs="0" maxOccurs="unbounded" >
+ <xs:element name="property" minOccurs="1" maxOccurs="unbounded">
+ <xs:complexType>
+ <xs:sequence>
+ <xs:element name="name" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="value" minOccurs="1" maxOccurs="1" type="xs:string"/>
+ <xs:element name="description" minOccurs="0" maxOccurs="1" type="xs:string"/>
+ </xs:sequence>
+ </xs:complexType>
+ </xs:element>
+ </xs:sequence>
+ <xs:attribute name="name" type="xs:string" use="required"/>
+ <xs:attribute name="type" type="xs:string" use="required"/>
+ </xs:complexType>
+</xs:schema>
@@ -143,7 +143,8 @@
E0816(XLog.STD, "Action pending=[{0}], status=[{1}]. Skipping ActionStart Execution"),
E0817(XLog.STD, "The wf action [{0}] has been udated recently. Ignoring ActionCheck."),
E0818(XLog.STD, "Action [{0}] status is running but WF Job [{1}] status is [{2}]. Expected status is RUNNING."),
- E0819(XLog.STD, "Unable to delete the temp dir of job WF Job [{1}]."),
+ E0819(XLog.STD, "Unable to delete the temp dir of job WF Job [{0}]."),
+ E0820(XLog.STD, "Action user retry max [{0}] is over system defined max [{1}], re-assign to use system max."),
E0900(XLog.OPS, "Jobtracker [{0}] not allowed, not in Oozie's whitelist"),
E0901(XLog.OPS, "Namenode [{0}] not allowed, not in Oozie's whitelist"),
@@ -171,7 +172,7 @@
E1020(XLog.STD, "Could not kill coord job, this job either finished successfully or does not exist , [{0}]"),
E1021(XLog.STD, "Coord Action Input Check Error: {0}"),
- E1100(XLog.STD, "Command precondition does not hold before execution"),
+ E1100(XLog.STD, "Command precondition does not hold before execution, [{0}]"),
E1101(XLog.STD, "SLA Nominal time is required."),
E1102(XLog.STD, "SLA should-start can't be empty."),
@@ -158,6 +158,9 @@ public void write(DataOutput dataOutput) throws IOException {
dataOutput.writeLong((pendingAge != null) ? pendingAge.getTime() : -1);
WritableUtils.writeStr(dataOutput, signalValue);
WritableUtils.writeStr(dataOutput, logToken);
+ dataOutput.writeInt(getUserRetryCount());
+ dataOutput.writeInt(getUserRetryInterval());
+ dataOutput.writeInt(getUserRetryMax());
}
/**
@@ -203,6 +206,9 @@ public void readFields(DataInput dataInput) throws IOException {
}
signalValue = WritableUtils.readStr(dataInput);
logToken = WritableUtils.readStr(dataInput);
+ setUserRetryCount(dataInput.readInt());
+ setUserRetryInterval(dataInput.readInt());
+ setUserRetryMax(dataInput.readInt());
}
/**
@@ -225,6 +231,15 @@ public boolean isRetryOrManual() {
return (getStatus() == WorkflowAction.Status.START_RETRY || getStatus() == WorkflowAction.Status.START_MANUAL
|| getStatus() == WorkflowAction.Status.END_RETRY || getStatus() == WorkflowAction.Status.END_MANUAL);
}
+
+ /**
+ * Return whether the action status is USER_RETRY.
+ *
+ * @return true if the action status is USER_RETRY, false otherwise.
+ */
+ public boolean isUserRetry() {
+ return (getStatus() == WorkflowAction.Status.USER_RETRY);
+ }
/**
* Return if the action is complete.
@@ -43,6 +43,7 @@
import org.apache.hadoop.mapred.JobID;
import org.apache.hadoop.mapred.RunningJob;
import org.apache.hadoop.util.DiskChecker;
+import org.apache.oozie.WorkflowActionBean;
import org.apache.oozie.WorkflowJobBean;
import org.apache.oozie.action.ActionExecutor;
import org.apache.oozie.action.ActionExecutorException;
@@ -536,7 +537,10 @@ public void submitLauncher(FileSystem actionFs, Context context, WorkflowAction
boolean alreadyRunning = launcherId != null;
RunningJob runningJob;
- if (alreadyRunning) {
+ // if user-retry is on, always submit new launcher
+ boolean isUserRetry = ((WorkflowActionBean)action).isUserRetry();
+
+ if (alreadyRunning && !isUserRetry) {
runningJob = jobClient.getJob(JobID.forName(launcherId));
if (runningJob == null) {
String jobTracker = launcherJobConf.get("mapred.job.tracker");
Oops, something went wrong.

0 comments on commit e7b4d9d

Please sign in to comment.