From 76f297f283bd5e24cd9ec7516f0363420378ec09 Mon Sep 17 00:00:00 2001 From: Aayushi Acharya Date: Mon, 11 Jul 2016 15:55:57 -0700 Subject: [PATCH] APEXMALHAR-2139 #resolve #comment Emit only the keys for which values changed or were added in a given window. --- .../datatorrent/lib/algo/UniqueCounter.java | 25 +++++------- .../lib/algo/UniqueCounterTest.java | 40 ++++++++++++------- 2 files changed, 35 insertions(+), 30 deletions(-) diff --git a/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java b/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java index 013c8bca8e..90b90a2de7 100644 --- a/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java +++ b/library/src/main/java/com/datatorrent/lib/algo/UniqueCounter.java @@ -19,9 +19,7 @@ package com.datatorrent.lib.algo; import java.util.HashMap; -import java.util.Map; - -import org.apache.commons.lang.mutable.MutableInt; +import java.util.HashSet; import com.datatorrent.api.DefaultInputPort; import com.datatorrent.api.DefaultOutputPort; @@ -32,7 +30,7 @@ /** * This operator counts the number of times a tuple exists in a window. A map from tuples to counts is emitted at the end of each window. *

- * Counts the number of times a key exists in a window; Count is emitted at end of window in a single HashMap. + * Counts the number of times each key occurs in a window; at the end of the window, a single HashMap is emitted containing the counts of only those keys that were received (added or updated) in that window. *

*

* This is an end of window operator
@@ -58,6 +56,7 @@ public class UniqueCounter extends BaseUniqueKeyCounter { private boolean cumulative; + HashSet inputSet = new HashSet<>(); /** * The input port which receives incoming tuples. @@ -70,9 +69,9 @@ public class UniqueCounter extends BaseUniqueKeyCounter @Override public void process(K tuple) { + inputSet.add(tuple); processTuple(tuple); } - }; /** @@ -90,24 +89,20 @@ public Unifier> getUnifier() }; /** - * Emits one HashMap as tuple + * Emits only the keys and values changed or added in a given window. */ @Override public void endWindow() { - HashMap tuple = null; - for (Map.Entry e: map.entrySet()) { - if (tuple == null) { - tuple = new HashMap(); - } - tuple.put(e.getKey(), e.getValue().toInteger()); - } - if (tuple != null) { - count.emit(tuple); + HashMap tuple = new HashMap<>(); + for (K key: inputSet) { + tuple.put(key, map.get(key).toInteger()); } + count.emit(tuple); if (!cumulative) { map.clear(); } + inputSet.clear(); } /** diff --git a/library/src/test/java/com/datatorrent/lib/algo/UniqueCounterTest.java b/library/src/test/java/com/datatorrent/lib/algo/UniqueCounterTest.java index d1fbc02119..250253858d 100644 --- a/library/src/test/java/com/datatorrent/lib/algo/UniqueCounterTest.java +++ b/library/src/test/java/com/datatorrent/lib/algo/UniqueCounterTest.java @@ -48,7 +48,6 @@ public void testNodeProcessing() throws Exception String btuple = "b"; String ctuple = "c"; String dtuple = "d"; - String etuple = "e"; int numTuples = 10000; oper.beginWindow(0); @@ -57,28 +56,39 @@ public void testNodeProcessing() throws Exception if (i % 2 == 0) { oper.data.process(btuple); } - if (i % 3 == 0) { + if (i % 5 == 0) { oper.data.process(ctuple); } - if (i % 5 == 0) { - oper.data.process(dtuple); + } + oper.endWindow(); + + oper.beginWindow(1); + for (int i = 0; i < numTuples; i++) { + oper.data.process(atuple); + } + oper.endWindow(); + + HashMap tuple = (HashMap)sink.tuple; + int acount = tuple.get("a"); + Assert.assertEquals("number emitted tuples", 
numTuples, acount); + + oper.beginWindow(2); + for (int i = 0; i < numTuples; i++) { + if (i % 2 == 0) { + oper.data.process(btuple); } + oper.data.process(btuple); if (i % 10 == 0) { - oper.data.process(etuple); + oper.data.process(dtuple); } } oper.endWindow(); - HashMap tuple = (HashMap)sink.tuple; - int acount = tuple.get("a"); + + tuple = (HashMap)sink.tuple; int bcount = tuple.get("b"); - int ccount = tuple.get("c"); int dcount = tuple.get("d"); - int ecount = tuple.get("e"); - Assert.assertEquals("number emitted tuples", 1, sink.count); - Assert.assertEquals("number emitted tuples", numTuples, acount); - Assert.assertEquals("number emitted tuples", numTuples / 2, bcount); - Assert.assertEquals("number emitted tuples", numTuples / 3 + 1, ccount); - Assert.assertEquals("number emitted tuples", numTuples / 5, dcount); - Assert.assertEquals("number emitted tuples", numTuples / 10, ecount); + Assert.assertEquals("number emitted tuples", 3, sink.count); + Assert.assertEquals("number emitted tuples", numTuples + (numTuples / 2), bcount); + Assert.assertEquals("number emitted tuples", numTuples / 10, dcount); } }