Skip to content

Commit

Permalink
[FLINK-3178] Don't Emit In-Flight Windows When Closing Window Operator
Browse files Browse the repository at this point in the history
This closes #1542
  • Loading branch information
aljoscha authored and StephanEwen committed Jan 28, 2016
1 parent 2e23307 commit c4e5a55
Show file tree
Hide file tree
Showing 9 changed files with 54 additions and 197 deletions.
Expand Up @@ -116,7 +116,7 @@ public TriggerResult onElement(Tuple3<String, Long, Integer> element, long times
// Update the last seen event time
lastSeenState.update(timestamp);

ctx.registerEventTimeTimer(lastSeen + sessionTimeout);
ctx.registerEventTimeTimer(timestamp + sessionTimeout);

if (timeSinceLastEvent > sessionTimeout) {
return TriggerResult.FIRE_AND_PURGE;
Expand Down
Expand Up @@ -192,9 +192,7 @@ public class TopSpeedWindowingExampleData {
"(1,95,1973.6111111111115,1424952007664)\n" +
"(0,100,1709.7222222222229,1424952006663)\n" +
"(0,100,1737.5000000000007,1424952007664)\n" +
"(1,95,1973.6111111111115,1424952007664)\n" +
"(0,100,1791.6666666666674,1424952009664)\n" +
"(1,95,2211.1111111111118,1424952017668)\n";
"(1,95,1973.6111111111115,1424952007664)\n";

public static final String TOP_CASE_CLASS_SPEEDS =
"CarEvent(0,55,15.277777777777777,1424951918630)\n" +
Expand Down Expand Up @@ -267,9 +265,7 @@ public class TopSpeedWindowingExampleData {
"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
"CarEvent(0,100,1709.7222222222229,1424952006663)\n" +
"CarEvent(0,100,1737.5000000000007,1424952007664)\n" +
"CarEvent(1,95,1973.6111111111115,1424952007664)\n" +
"CarEvent(0,100,1791.6666666666674,1424952009664)\n" +
"CarEvent(1,95,2211.1111111111118,1424952017668)\n";
"CarEvent(1,95,1973.6111111111115,1424952007664)\n";

private TopSpeedWindowingExampleData() {
}
Expand Down
Expand Up @@ -171,13 +171,8 @@ public void open() throws Exception {
public void close() throws Exception {
super.close();

final long finalWindowTimestamp = nextEvaluationTime;

// early stop the triggering thread, so it does not attempt to return any more data
stopTriggers();

// emit the remaining data
computeWindow(finalWindowTimestamp);
}

@Override
Expand Down
Expand Up @@ -217,14 +217,14 @@ public final void open() throws Exception {
}

@Override
public final void close() throws Exception {
super.close();
// emit the elements that we still keep
for (Context window: windows.values()) {
emitWindow(window);
}
public final void dispose() {
super.dispose();
windows.clear();
windowBufferFactory.close();
try {
windowBufferFactory.close();
} catch (Exception e) {
throw new RuntimeException("Error while closing WindowBufferFactory.", e);
}
}

@Override
Expand Down
Expand Up @@ -251,17 +251,14 @@ public final void open() throws Exception {
}

@Override
public final void close() throws Exception {
super.close();
// emit the elements that we still keep
for (Map.Entry<K, Map<W, Context>> entry: windows.entrySet()) {
Map<W, Context> keyWindows = entry.getValue();
for (Context window: keyWindows.values()) {
emitWindow(window);
}
}
public final void dispose() {
super.dispose();
windows.clear();
windowBufferFactory.close();
try {
windowBufferFactory.close();
} catch (Exception e) {
throw new RuntimeException("Error while closing WindowBufferFactory.", e);
}
}

@Override
Expand Down
Expand Up @@ -214,6 +214,7 @@ public void complexIntegrationTest2() throws Exception {

@SuppressWarnings("unchecked")
@Test
@Ignore
public void complexIntegrationTest3() throws Exception {
//Heavy prime factorisation with maps and flatmaps

Expand All @@ -225,7 +226,7 @@ public void complexIntegrationTest3() throws Exception {
expected1 = "541\n" + "1223\n" + "1987\n" + "2741\n" + "3571\n" + "10939\n" + "4409\n" +
"5279\n" + "11927\n" + "6133\n" + "6997\n" + "12823\n" + "7919\n" + "8831\n" +
"13763\n" + "9733\n" + "9973\n" + "14759\n" + "15671\n" + "16673\n" + "17659\n" +
"18617\n" + "19697\n" + "19997\n";
"18617\n";

for (int i = 2; i < 100; i++) {
expected2 += "(" + i + "," + 20000 / i + ")\n";
Expand Down
Expand Up @@ -251,12 +251,14 @@ public void testTumblingWindow() {
Thread.sleep(1);
}

// get and verify the result
out.waitForNElements(numElements, 60_000);

synchronized (lock) {
op.close();
}
op.dispose();

// get and verify the result
List<Integer> result = out.getElements();
assertEquals(numElements, result.size());

Expand Down Expand Up @@ -441,102 +443,7 @@ public void testSlidingWindowSingleElements() {
timerService.shutdown();
}
}

@Test
public void testEmitTrailingDataOnClose() {
final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
final CollectingOutput<Integer> out = new CollectingOutput<>();
final Object lock = new Object();
final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);

// the operator has a window time that is so long that it will not fire in this test
final long oneYear = 365L * 24 * 60 * 60 * 1000;
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
new AccumulatingProcessingTimeWindowOperator<>(
validatingIdentityFunction, identitySelector,
IntSerializer.INSTANCE, IntSerializer.INSTANCE,
oneYear, oneYear);

op.setup(mockTask, new StreamConfig(new Configuration()), out);
op.open();

List<Integer> data = Arrays.asList(1, 2, 3, 4, 5, 6, 7, 8, 9, 10);
for (Integer i : data) {
synchronized (lock) {
op.processElement(new StreamRecord<Integer>(i));
}
}

synchronized (lock) {
op.close();
}
op.dispose();

// get and verify the result
List<Integer> result = out.getElements();
Collections.sort(result);
assertEquals(data, result);
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
finally {
timerService.shutdown();
}
}

@Test
public void testPropagateExceptionsFromClose() {
final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
try {
final CollectingOutput<Integer> out = new CollectingOutput<>();
final Object lock = new Object();
final StreamTask<?, ?> mockTask = createMockTaskWithTimer(timerService, lock);

WindowFunction<Integer, Integer, Integer, TimeWindow> failingFunction = new FailingFunction(100);

// the operator has a window time that is so long that it will not fire in this test
final long hundredYears = 100L * 365 * 24 * 60 * 60 * 1000;
AccumulatingProcessingTimeWindowOperator<Integer, Integer, Integer> op =
new AccumulatingProcessingTimeWindowOperator<>(
failingFunction, identitySelector,
IntSerializer.INSTANCE, IntSerializer.INSTANCE,
hundredYears, hundredYears);

op.setup(mockTask, new StreamConfig(new Configuration()), out);
op.open();

for (int i = 0; i < 150; i++) {
synchronized (lock) {
op.processElement(new StreamRecord<Integer>(i));
}
}

try {
synchronized (lock) {
op.close();
}
fail("This should fail with an exception");
}
catch (Exception e) {
assertTrue(
e.getMessage().contains("Artificial Test Exception") ||
(e.getCause() != null && e.getCause().getMessage().contains("Artificial Test Exception")));
}

op.dispose();
}
catch (Exception e) {
e.printStackTrace();
fail(e.getMessage());
}
finally {
timerService.shutdown();
}
}

@Test
public void checkpointRestoreWithPendingWindowTumbling() {
final ScheduledExecutorService timerService = Executors.newSingleThreadScheduledExecutor();
Expand Down Expand Up @@ -607,16 +514,19 @@ public void checkpointRestoreWithPendingWindowTumbling() {
Thread.sleep(1);
}

synchronized (lock) {
op.close();
}
op.dispose();

out2.waitForNElements(numElements - resultAtSnapshot.size(), 60_000);

// get and verify the result
List<Integer> finalResult = new ArrayList<>(resultAtSnapshot);
finalResult.addAll(out2.getElements());
assertEquals(numElements, finalResult.size());

synchronized (lock) {
op.close();
}
op.dispose();

Collections.sort(finalResult);
for (int i = 0; i < numElements; i++) {
assertEquals(i, finalResult.get(i).intValue());
Expand Down

0 comments on commit c4e5a55

Please sign in to comment.