Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -74,10 +74,10 @@ public void put(E e) throws InterruptedException {

@Override
public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException {
boolean timedOut =
boolean success =
WorkflowThread.await(
unit.toMillis(timeout), "WorkflowQueue.offer", () -> queue.size() < capacity);
if (timedOut) {
if (!success) {
return false;
}
queue.addLast(e);
Expand Down
157 changes: 157 additions & 0 deletions src/test/java/com/uber/cadence/internal/sync/PromiseTest.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import com.uber.cadence.workflow.CompletablePromise;
import com.uber.cadence.workflow.Promise;
import com.uber.cadence.workflow.Workflow;
import java.time.Duration;
import java.util.ArrayList;
import java.util.IllegalFormatCodePointException;
import java.util.List;
Expand Down Expand Up @@ -84,6 +85,122 @@ public void testFailure() throws Throwable {
trace.setExpected(expected);
}

@Test
public void testTimedFailure() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> {
CompletablePromise<Boolean> f = Workflow.newPromise();
trace.add("root begin");
WorkflowInternal.newThread(
false, () -> f.completeExceptionally(new IllegalArgumentException("foo")))
.start();
WorkflowInternal.newThread(
false,
() -> {
try {
f.get(10, TimeUnit.DAYS);
trace.add("thread1 get success");
fail("failure expected");
} catch (Exception e) {
assertEquals(IllegalArgumentException.class, e.getClass());
trace.add("thread1 get failure");
}
})
.start();
trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected =
new String[] {
"root begin", "root done", "thread1 get failure",
};
trace.setExpected(expected);
}

@Test
public void testGetFailure() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> {
CompletablePromise<Boolean> f = Workflow.newPromise();
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
trace.add("thread1 begin");
assertEquals(IllegalArgumentException.class, f.getFailure().getClass());
trace.add("thread1 done");
})
.start();
WorkflowInternal.newThread(
false,
() -> {
f.completeExceptionally(new IllegalArgumentException("foo"));
trace.add("thread2 done");
})
.start();

trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected =
new String[] {"root begin", "root done", "thread1 begin", "thread2 done", "thread1 done"};
trace.setExpected(expected);
}

@Test
public void testGetFailureWithTimeout() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> currentTime,
() -> {
CompletablePromise<Boolean> f = Workflow.newPromise();
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
try {
trace.add("thread1 begin");
f.get(1, TimeUnit.MINUTES);
trace.add("thread1 get success");
fail("failure expected");
} catch (Exception e) {
assertEquals(IllegalArgumentException.class, e.getClass());
trace.add("thread1 get failure");
}
})
.start();
WorkflowInternal.newThread(
false,
() -> {
Workflow.sleep(Duration.ofSeconds(30));
trace.add("thread2 awake");
f.completeExceptionally(new IllegalArgumentException("foo"));
trace.add("thread2 done");
})
.start();

trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected = new String[] {"root begin", "root done", "thread1 begin"};
trace.setExpected(expected);

currentTime += Duration.ofSeconds(31).toMillis();
r.runUntilAllBlocked();
expected =
new String[] {
"root begin",
"root done",
"thread1 begin",
"thread2 awake",
"thread2 done",
"thread1 get failure"
};
trace.setExpected(expected);
}

@Test
public void testGetTimeout() throws Throwable {
ExecutorService threadPool =
Expand Down Expand Up @@ -249,6 +366,46 @@ public void testGetDefaultOnFailure() throws Throwable {
threadPool.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
public void testGetDefault_success() throws Throwable {
ExecutorService threadPool =
new ThreadPoolExecutor(1, 1000, 1, TimeUnit.SECONDS, new SynchronousQueue<>());

DeterministicRunner r =
DeterministicRunner.newRunner(
threadPool,
null,
() -> currentTime,
() -> {
CompletablePromise<String> f = Workflow.newPromise();
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
trace.add("thread1 begin");
assertEquals("success", f.get("default"));
trace.add("thread1 get success");
})
.start();
WorkflowInternal.newThread(
false,
() -> {
trace.add("thread2 begin");
f.complete("success");
})
.start();
trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected =
new String[] {
"root begin", "root done", "thread1 begin", "thread2 begin", "thread1 get success",
};
trace.setExpected(expected);
threadPool.shutdown();
threadPool.awaitTermination(1, TimeUnit.MINUTES);
}

@Test
public void testMultiple() throws Throwable {
DeterministicRunner r =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,8 +20,11 @@
import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertTrue;

import com.uber.cadence.workflow.QueueConsumer;
import com.uber.cadence.workflow.Workflow;
import com.uber.cadence.workflow.WorkflowQueue;
import java.util.concurrent.TimeUnit;
import org.junit.Assert;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
Expand Down Expand Up @@ -137,4 +140,171 @@ public void testPutBlocking() throws Throwable {
};
trace.setExpected(expected);
}

@Test
public void testPoll() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> currentTime,
() -> {
WorkflowQueue<String> f = WorkflowInternal.newQueue(1);
f.offer("foo");
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
try {
trace.add("thread1 begin");
Assert.assertEquals("foo", f.poll(1, TimeUnit.SECONDS));
trace.add("thread1 foo");
Assert.assertNull(f.poll(1, TimeUnit.SECONDS));
trace.add("thread1 done");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.start();

trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 foo"};
trace.setExpected(expected);

currentTime += 1000;
r.runUntilAllBlocked();
expected =
new String[] {"root begin", "root done", "thread1 begin", "thread1 foo", "thread1 done"};
trace.setExpected(expected);
}

@Test
public void testOffer() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> currentTime,
() -> {
WorkflowQueue<String> f = WorkflowInternal.newQueue(1);
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
Assert.assertTrue(f.offer("foo"));
Assert.assertFalse(f.offer("bar"));
trace.add("thread1 done");
})
.start();

trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected = new String[] {"root begin", "root done", "thread1 done"};
trace.setExpected(expected);
}

@Test
public void testOfferTimed() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> currentTime,
() -> {
WorkflowQueue<String> f = WorkflowInternal.newQueue(1);
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
try {
trace.add("thread1 begin");
Assert.assertTrue(f.offer("foo", 1, TimeUnit.SECONDS));
trace.add("thread1 foo");
Assert.assertFalse(f.offer("bar", 1, TimeUnit.SECONDS));
trace.add("thread1 done");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.start();

trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 foo"};
trace.setExpected(expected);

currentTime += 1000;
r.runUntilAllBlocked();
expected =
new String[] {"root begin", "root done", "thread1 begin", "thread1 foo", "thread1 done"};
trace.setExpected(expected);
}

@Test
public void testMappedTake() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> currentTime,
() -> {
WorkflowQueue<Boolean> f = WorkflowInternal.newQueue(1);
f.offer(true);
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
try {
QueueConsumer<String> mappedQueue = f.map(x -> x ? "yes" : "no");
trace.add("thread1 begin");
Assert.assertEquals("yes", mappedQueue.take());
trace.add("thread1 done");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.start();

trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 done"};
trace.setExpected(expected);
}

@Test
public void testMappedPoll() throws Throwable {
DeterministicRunner r =
DeterministicRunner.newRunner(
() -> currentTime,
() -> {
WorkflowQueue<Boolean> f = WorkflowInternal.newQueue(1);
f.offer(true);
trace.add("root begin");
WorkflowInternal.newThread(
false,
() -> {
try {
QueueConsumer<String> mappedQueue =
f.map(x -> x ? "yes" : "no").map(x -> x);
trace.add("thread1 begin");
Assert.assertEquals("yes", mappedQueue.poll(1, TimeUnit.SECONDS));
trace.add("thread1 yes");
Assert.assertNull(mappedQueue.poll(1, TimeUnit.SECONDS));

trace.add("thread1 done");
} catch (InterruptedException e) {
throw new RuntimeException(e);
}
})
.start();

trace.add("root done");
});
r.runUntilAllBlocked();
String[] expected = new String[] {"root begin", "root done", "thread1 begin", "thread1 yes"};
trace.setExpected(expected);

currentTime += 1000;
r.runUntilAllBlocked();
expected =
new String[] {"root begin", "root done", "thread1 begin", "thread1 yes", "thread1 done"};
trace.setExpected(expected);
}
}