Skip to content

Commit

Permalink
[hotfix] [tests] Speed up queryable state IT tests by removing sleep
Browse files Browse the repository at this point in the history
  • Loading branch information
tillrohrmann committed Nov 27, 2017
1 parent e512558 commit 6489c67
Showing 1 changed file with 22 additions and 20 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -107,6 +107,7 @@
public abstract class AbstractQueryableStateTestBase extends TestLogger {

private static final FiniteDuration TEST_TIMEOUT = new FiniteDuration(10000L, TimeUnit.SECONDS);
public static final long RETRY_TIMEOUT = 50L;

private final ScheduledExecutorService executorService = Executors.newScheduledThreadPool(4);
private final ScheduledExecutor executor = new ScheduledExecutorServiceAdapter(executorService);
Expand Down Expand Up @@ -208,11 +209,12 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {

final AtomicLongArray counts = new AtomicLongArray(numKeys);

final List<CompletableFuture<ReducingState<Tuple2<Integer, Long>>>> futures = new ArrayList<>(numKeys);

boolean allNonZero = false;
while (!allNonZero && deadline.hasTimeLeft()) {
allNonZero = true;

final List<CompletableFuture<ReducingState<Tuple2<Integer, Long>>>> futures = new ArrayList<>(numKeys);
futures.clear();

for (int i = 0; i < numKeys; i++) {
final int key = i;
Expand Down Expand Up @@ -834,7 +836,7 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -925,7 +927,7 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -1033,7 +1035,7 @@ public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<O
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -1145,7 +1147,7 @@ public void processElement(Tuple2<Integer, Long> value, Context ctx, Collector<O
results.put(key, res);
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -1240,7 +1242,7 @@ public Integer getKey(Tuple2<Integer, Long> value) throws Exception {
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down Expand Up @@ -1330,6 +1332,7 @@ private static class TestKeyRangeSource extends RichParallelSourceFunction<Tuple
private final int numKeys;
private final ThreadLocalRandom random = ThreadLocalRandom.current();
private volatile boolean isRunning = true;
private int counter = 0;

TestKeyRangeSource(int numKeys) {
this.numKeys = numKeys;
Expand All @@ -1352,9 +1355,13 @@ public void run(SourceContext<Tuple2<Integer, Long>> ctx) throws Exception {
synchronized (ctx.getCheckpointLock()) {
record.f0 = random.nextInt(numKeys);
ctx.collect(record);
counter++;
}

if (counter % 50 == 0) {
// mild slow down
Thread.sleep(1L);
}
// mild slow down
Thread.sleep(1L);
}
}

Expand Down Expand Up @@ -1474,7 +1481,7 @@ private static <K, S extends State, V> CompletableFuture<S> getKvState(
final TypeInformation<K> keyTypeInfo,
final StateDescriptor<S, V> stateDescriptor,
final boolean failForUnknownKeyOrNamespace,
final ScheduledExecutor executor) throws InterruptedException {
final ScheduledExecutor executor) {

final CompletableFuture<S> resultFuture = new CompletableFuture<>();
getKvStateIgnoringCertainExceptions(
Expand All @@ -1493,10 +1500,9 @@ private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(
final TypeInformation<K> keyTypeInfo,
final StateDescriptor<S, V> stateDescriptor,
final boolean failForUnknownKeyOrNamespace,
final ScheduledExecutor executor) throws InterruptedException {
final ScheduledExecutor executor) {

if (!resultFuture.isDone()) {
Thread.sleep(100L);
CompletableFuture<S> expected = client.getKvState(jobId, queryName, key, keyTypeInfo, stateDescriptor);
expected.whenCompleteAsync((result, throwable) -> {
if (throwable != null) {
Expand All @@ -1507,13 +1513,9 @@ private static <K, S extends State, V> void getKvStateIgnoringCertainExceptions(
) {
resultFuture.completeExceptionally(throwable.getCause());
} else if (deadline.hasTimeLeft()) {
try {
getKvStateIgnoringCertainExceptions(
deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
stateDescriptor, failForUnknownKeyOrNamespace, executor);
} catch (InterruptedException e) {
e.printStackTrace();
}
getKvStateIgnoringCertainExceptions(
deadline, resultFuture, client, jobId, queryName, key, keyTypeInfo,
stateDescriptor, failForUnknownKeyOrNamespace, executor);
}
} else {
resultFuture.complete(result);
Expand Down Expand Up @@ -1557,7 +1559,7 @@ private void executeValueQuery(
success = true;
} else {
// Retry
Thread.sleep(50L);
Thread.sleep(RETRY_TIMEOUT);
}
}

Expand Down

0 comments on commit 6489c67

Please sign in to comment.