Skip to content

Commit

Permalink
Merge pull request #15 from satish-mittal/branch-4.0
Browse files Browse the repository at this point in the history
Merge OOZIE-1663, OOZIE-1664, OOZIE-1314, OOZIE-1577
  • Loading branch information
Shwetha G S committed Mar 20, 2014
2 parents 2ed72fd + ca8a130 commit 24fbea8
Show file tree
Hide file tree
Showing 12 changed files with 83 additions and 19 deletions.
8 changes: 8 additions & 0 deletions core/src/main/java/org/apache/oozie/command/XCommand.java
Original file line number Diff line number Diff line change
Expand Up @@ -521,4 +521,12 @@ public XLog getLog() {
return LOG;
}

/**
* String for the command - key
* @return String
*/
@Override
public String toString() {
return getKey();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -217,8 +217,8 @@ private void updateAction(CoordinatorJobBean coordJob, CoordinatorActionBean coo
coordAction.setCreatedTime(new Date());
}
coordAction.setStatus(CoordinatorAction.Status.WAITING);
coordAction.setExternalId("");
coordAction.setExternalStatus("");
coordAction.setExternalId(null);
coordAction.setExternalStatus(null);
coordAction.setRerunTime(new Date());
coordAction.setLastModifiedTime(new Date());
updateList.add(coordAction);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -308,8 +308,8 @@ protected String submit() throws CommandException {
*/
private void validateCoordinatorJob() {
// check if startTime < endTime
if (coordJob.getStartTime().after(coordJob.getEndTime())) {
throw new IllegalArgumentException("Coordinator Start Time cannot be greater than End Time.");
if (!coordJob.getStartTime().before(coordJob.getEndTime())) {
throw new IllegalArgumentException("Coordinator Start Time must be earlier than End Time.");
}
}

Expand Down Expand Up @@ -761,7 +761,7 @@ private void resolveSLA(Element eAppXml, CoordinatorJobBean coordJob) throws Com
/**
* Resolve input-events/data-in and output-events/data-out tags.
*
* @param eJob : Job element
* @param eJobOrg : Job element
* @throws CoordinatorJobException thrown if failed to resolve input and output events
*/
@SuppressWarnings("unchecked")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -198,14 +198,6 @@ public void run() {
}
}

/**
* @return String the queue dump
*/
@Override
public String toString() {
return "delay=" + getDelay(TimeUnit.MILLISECONDS) + ", elements=" + getElement().toString();
}

/**
* Filter the duplicate callables from the list before queue this.
* <p/>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ public QueueElement<E> poll() {
Iterator<QueueElement<E>> iter = queues[i - 1].iterator();
while(e == null && iter.hasNext()) {
e = iter.next();
if (eligibleToPoll(e)) {
if (e.getDelay(TimeUnit.MILLISECONDS) <= 0 && eligibleToPoll(e)) {
queues[i - 1].remove(e);
}
else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -396,10 +396,10 @@ public void testRequeueOnException() throws Exception {
// Should be requeued at the recovery service interval
final List<String> queueDump = callableQueueService.getQueueDump();
assertEquals(1, callableQueueService.getQueueDump().size());
assertTrue(queueDump.get(0).contains(CoordPushDependencyCheckXCommand.class.getName()));
assertTrue(queueDump.get(0).contains("coord_push_dep_check"));
log.info("Queue dump is " + queueDump.toString());
// Delay should be something like delay=599999. Ignore last three digits
assertTrue(queueDump.get(0).matches("delay=599[0-9]{3}, .*"));
assertTrue(queueDump.get(0).matches(".* delay=599[0-9]{3}"));
}

@Test
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -121,6 +121,7 @@ public void testCoordRerunActions1() throws Exception {
store2.beginTrx();
CoordinatorActionBean action2 = store2.getCoordinatorAction(actionId, false);
assertNotSame(action2.getStatus(), CoordinatorAction.Status.SUCCEEDED);
assertNull(action2.getExternalId());
store2.commitTrx();
store2.closeTrx();
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -132,7 +132,45 @@ public void testBasicSubmitWithStartTimeAfterEndTime() throws Exception {
catch (CommandException e) {
assertEquals(sc.getJob().getStatus(), Job.Status.FAILED);
assertEquals(e.getErrorCode(), ErrorCode.E1003);
assertTrue(e.getMessage().contains("Coordinator Start Time cannot be greater than End Time."));
assertTrue(e.getMessage().contains("Coordinator Start Time must be earlier than End Time."));
}
}

public void testBasicSubmitWithIdenticalStartAndEndTime() throws Exception {
Configuration conf = new XConfiguration();
File appPathFile = new File(getTestCaseDir(), "coordinator.xml");
String appXml = "<coordinator-app name=\"NAME\" frequency=\"${coord:days(1)}\" start=\"2010-02-01T01:00Z\" "
+ "end=\"2010-02-01T01:00Z\" timezone=\"UTC\" "
+ "xmlns=\"uri:oozie:coordinator:0.2\"> <controls> "
+ "<execution>LIFO</execution> </controls> <datasets> "
+ "<dataset name=\"a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\" "
+ "timezone=\"UTC\"> <uri-template>" + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}")
+ "</uri-template> </dataset> "
+ "<dataset name=\"local_a\" frequency=\"${coord:days(7)}\" initial-instance=\"2009-02-01T01:00Z\" "
+ "timezone=\"UTC\"> <uri-template>" + getTestCaseFileUri("coord/workflows/${YEAR}/${DAY}")
+ "</uri-template> </dataset> "
+ "</datasets> <input-events> "
+ "<data-in name=\"A\" dataset=\"a\"> <instance>${coord:latest(0)}</instance> </data-in> "
+ "</input-events> "
+ "<output-events> <data-out name=\"LOCAL_A\" dataset=\"local_a\"> "
+ "<instance>${coord:current(-1)}</instance> </data-out> </output-events> <action> <workflow> "
+ "<app-path>hdfs:///tmp/workflows/</app-path> "
+ "<configuration> <property> <name>inputA</name> <value>${coord:dataIn('A')}</value> </property> "
+ "<property> <name>inputB</name> <value>${coord:dataOut('LOCAL_A')}</value> "
+ "</property></configuration> </workflow> </action> </coordinator-app>";
writeToFile(appXml, appPathFile.getPath());
conf.set(OozieClient.COORDINATOR_APP_PATH, appPathFile.toURI().toString());
conf.set(OozieClient.USER_NAME, getTestUser());
CoordSubmitXCommand sc = new CoordSubmitXCommand(conf);

try {
sc.call();
fail("Expected to catch errors due to incorrectly specified Start and End Time");
}
catch (CommandException e) {
assertEquals(sc.getJob().getStatus(), Job.Status.FAILED);
assertEquals(e.getErrorCode(), ErrorCode.E1003);
assertTrue(e.getMessage().contains("Coordinator Start Time must be earlier than End Time."));
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -400,7 +400,8 @@ public void testConcurrencyReachedAndChooseNextEligible() throws Exception {
List<MyCallable> callables = Arrays.asList(callable1, callable2, callable3, callable4, callable5, callable6);

final MyCallable callableOther = new MyCallable("other", 0, 100);
queueservice.queue(callableOther, 1000);
long now = System.currentTimeMillis();
queueservice.queue(callableOther, 15);

for (MyCallable c : callables) {
queueservice.queue(c, 10);
Expand Down Expand Up @@ -430,6 +431,7 @@ public boolean evaluate() throws Exception {
System.out.println("Callable callableOther executed :" + callableOther.executed);

assertTrue(callableOther.executed < last);
assertTrue(callableOther.executed > (now + 115));
}

public void testSerialConcurrencyLimit() throws Exception {
Expand Down
18 changes: 18 additions & 0 deletions core/src/test/java/org/apache/oozie/test/XTestCase.java
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,24 @@ protected void setSystemProperty(String name, String value) {
}
}

/**
* Return the URI for a test file. The returned value is the testDir + concatenated URI.
*
* @return the test working directory path, it is always an absolute path and appends the relative path. The
* reason for the manual parsing instead of an actual File.toURI is because Oozie tests use tokens ${}
* frequently. Something like URI("c:/temp/${HOUR}").toString() will generate escaped values that will break tests
*/
protected String getTestCaseFileUri(String relativeUri) {
String uri = new File(testCaseDir).toURI().toString();

// truncates '/' if the testCaseDir was provided with a fullpath ended with separator
if (uri.endsWith("/")){
uri = uri.substring(0, uri.length() -1);
}

return uri + "/" + relativeUri;
}

/**
* Reset changed system properties to their original values. <p/> Called from {@link #tearDown}.
*/
Expand Down
3 changes: 2 additions & 1 deletion docs/src/site/twiki/CoordinatorFunctionalSpec.twiki
Original file line number Diff line number Diff line change
Expand Up @@ -652,7 +652,8 @@ Coordinator applications are normally parameterized.

To create a coordinator job, a job configuration that resolves all coordinator application parameters must be provided to the coordinator engine.

A coordinator job is a running instance of a coordinator application running from a start time to an end time.
A coordinator job is a running instance of a coordinator application running from a start time to an end time. The start
time must be earlier than the end time.

At any time, a coordinator job is in one of the following status: *PREP, RUNNING, RUNNINGWITHERROR, PREPSUSPENDED, SUSPENDED, SUSPENDEDWITHERROR, PREPPAUSED, PAUSED, PAUSEDWITHERROR, SUCCEEDED, DONEWITHERROR, KILLED, FAILED*.

Expand Down
4 changes: 4 additions & 0 deletions release-log.txt
Original file line number Diff line number Diff line change
@@ -1,6 +1,10 @@
-- Oozie 4.1.0 release (trunk - unreleased)

OOZIE-1675 Adding absolute URI of local cluster to dist cache not working with hadoop version 0.20.2 and before (satish via ryota)
OOZIE-1663 Queuedump to display command type (shwethags via virag)
OOZIE-1664 PollablePriorityDelayQueue.poll() returns elements with +ve delay (shwethags via rohini)
OOZIE-1314 IllegalArgumentException: wfId cannot be empty (shwethags via virag)
OOZIE-1577 Oozie coordinator job with identical start and end time remains "RUNNING" forever (bowenzhangusa via rkanter)

-- Oozie 4.0.1 release (unreleased)

Expand Down

0 comments on commit 24fbea8

Please sign in to comment.