Skip to content
This repository has been archived by the owner on May 12, 2021. It is now read-only.

Commit

Permalink
Added trigger unit tests
Browse files Browse the repository at this point in the history
  • Loading branch information
David Yan committed Jun 25, 2016
1 parent 48ad954 commit 5ac037b
Show file tree
Hide file tree
Showing 7 changed files with 86 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
/**
* This interface describes what needs to be implemented for the operator that supports the Apache Beam model of
* windowing and triggering
*
* @param <InputT> The type of the input tuple
*/
@InterfaceStability.Evolving
public interface WindowedOperator<InputT>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,8 @@
* WindowedStorage is a key-value store with the key being the window. The implementation of this interface should
* make sure checkpointing and recovery will be done correctly.
*
* @param <T> The type of the data that is stored per window
*
* TODO: Look at the possibility of integrating spillable data structure: https://issues.apache.org/jira/browse/APEXMALHAR-2026
*/
@InterfaceStability.Evolving
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,10 @@
* concepts. The subclass of this abstract class is supposed to provide the implementation of how the accumulated
* values are stored in the storage.
*
* @param <InputT> The type of the input tuple
* @param <OutputT> The type of the output tuple
* @param <DataStorageT> The type of the data storage
* @param <AccumulationT> The type of the accumulation
*/
@InterfaceStability.Evolving
public abstract class AbstractWindowedOperator<InputT, OutputT, DataStorageT extends WindowedStorage, AccumulationT extends Accumulation>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,11 @@
/**
* This is an implementation of WindowedOperator that takes in key value pairs as input and gives out key value pairs
* as output. If your operation is not key based, please use {@link WindowedOperatorImpl}.
*
* @param <KeyT> The type of the key of both the input and the output tuple
* @param <InputValT> The type of the value of the keyed input tuple
* @param <AccumT> The type of the accumulated value in the operator state per key per window
* @param <OutputValT> The type of the value of the keyed output tuple
*/
@InterfaceStability.Evolving
public class KeyedWindowedOperatorImpl<KeyT, InputValT, AccumT, OutputValT>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,7 @@
import org.apache.hadoop.classification.InterfaceStability;

/**
* Created by david on 6/23/16.
* The implementation of the Watermark tuple
*/
@InterfaceStability.Evolving
public class WatermarkImpl implements ControlTuple.Watermark
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,10 @@

/**
* This is an implementation of the WindowedOperator. If your operation is key based, please use {@link KeyedWindowedOperatorImpl}.
*
* @param <InputT> The type of the value of the input tuple
* @param <AccumT> The type of the accumulated value in the operator state per window
* @param <OutputT> The type of the value of the output tuple
*/
@InterfaceStability.Evolving
public class WindowedOperatorImpl<InputT, AccumT, OutputT>
Expand Down Expand Up @@ -68,6 +72,8 @@ public void fireRetractionTrigger(Window window)
throw new UnsupportedOperationException();
}
AccumT accumulatedValue = retractionStorage.get(window);
output.emit(new Tuple.WindowedTuple<>(window, accumulation.getRetraction(accumulatedValue)));
if (accumulatedValue != null) {
output.emit(new Tuple.WindowedTuple<>(window, accumulation.getRetraction(accumulatedValue)));
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -135,26 +135,86 @@ public void testWatermarkAndAllowedLateness()
windowedOperator.endWindow();
}

private void testTrigger(TriggerOption.AccumulationMode accumulationMode)
{
WindowedOperatorImpl<Long, Long, Long> windowedOperator = createDefaultWindowedOperator();
TriggerOption triggerOption = new TriggerOption().withEarlyFiringsAtEvery(Duration.millis(1000));
switch (accumulationMode) {
case ACCUMULATING:
triggerOption.accumulatingFiredPanes();
break;
case ACCUMULATING_AND_RETRACTING:
triggerOption.accumulatingAndRetractingFiredPanes();
break;
case DISCARDING:
triggerOption.discardingFiredPanes();
break;
}
windowedOperator.setTriggerOption(triggerOption);
windowedOperator.setWindowOption(new WindowOption.TimeWindows(Duration.millis(1000)));
CollectorTestSink sink = new CollectorTestSink();
windowedOperator.output.setSink(sink);
OperatorContextTestHelper.TestIdOperatorContext context = new OperatorContextTestHelper.TestIdOperatorContext(1,
new Attribute.AttributeMap.DefaultAttributeMap());
windowedOperator.setup(context);
windowedOperator.beginWindow(1);
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(100L, 2L));
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(200L, 3L));
windowedOperator.endWindow();
Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
windowedOperator.beginWindow(2);
windowedOperator.endWindow();
Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
windowedOperator.beginWindow(3);
windowedOperator.endWindow();
Assert.assertEquals("There should be exactly one tuple for the time trigger", 1, sink.collectedTuples.size());
Assert.assertEquals(5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
sink.collectedTuples.clear();
windowedOperator.beginWindow(4);
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(400L, 4L));
windowedOperator.endWindow();
Assert.assertTrue("No trigger should be fired yet", sink.collectedTuples.isEmpty());
windowedOperator.beginWindow(5);
windowedOperator.processTuple(new Tuple.TimestampedTuple<>(300L, 5L));
windowedOperator.endWindow();
switch (accumulationMode) {
case ACCUMULATING:
Assert.assertEquals("There should be exactly two tuples for the time trigger", 1, sink.collectedTuples.size());
Assert.assertEquals(14L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
break;
case DISCARDING:
Assert.assertEquals("There should be exactly two tuples for the time trigger", 1, sink.collectedTuples.size());
Assert.assertEquals(9L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
break;
case ACCUMULATING_AND_RETRACTING:
Assert.assertEquals("There should be exactly two tuples for the time trigger", 2, sink.collectedTuples.size());
Assert.assertEquals(-5L, ((Tuple<Long>)sink.collectedTuples.get(0)).getValue().longValue());
Assert.assertEquals(14L, ((Tuple<Long>)sink.collectedTuples.get(1)).getValue().longValue());
break;
}
}


@Test
public void testTriggerWithDiscardingMode()
{

testTrigger(TriggerOption.AccumulationMode.DISCARDING);
}

@Test
public void testTriggerWithAccumulatingMode()
{

testTrigger(TriggerOption.AccumulationMode.ACCUMULATING);
}

@Test
public void testTriggerWithAccumulatingModeFiringOnlyUpdatedPanes()
public void testTriggerWithAccumulatingAndRetractingMode()
{

testTrigger(TriggerOption.AccumulationMode.ACCUMULATING_AND_RETRACTING);
}

@Test
public void testTriggerWithAccumulatingAndRetractingMode()
public void testTriggerWithAccumulatingModeFiringOnlyUpdatedPanes()
{

}
Expand Down

0 comments on commit 5ac037b

Please sign in to comment.