From c118884ff351e06d6ad86d9fd9e2de497e1ac356 Mon Sep 17 00:00:00 2001 From: francisf Date: Fri, 14 Oct 2016 17:03:15 +0530 Subject: [PATCH] APEXMALHAR-2299 TimeBasedDedupOperator. Fixing edge case bug in time bucket assignment --- .../lib/state/managed/TimeBucketAssigner.java | 2 +- .../state/managed/TimeBucketAssignerTest.java | 21 +++++++++++++++++++ 2 files changed, 22 insertions(+), 1 deletion(-) diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java index 435ffe2a25..d218b37d03 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssigner.java @@ -124,7 +124,7 @@ public long getTimeBucketAndAdjustBoundaries(long value) } long diffFromStart = value - fixedStart; long key = diffFromStart / bucketSpanMillis; - if (value > end) { + if (value >= end) { long diffInBuckets = (value - end) / bucketSpanMillis; long move = (diffInBuckets + 1) * bucketSpanMillis; start += move; diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java index 4ceef1f738..8ca0960139 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/TimeBucketAssignerTest.java @@ -92,4 +92,25 @@ public void testTimeBucketKey() Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(expiredTime)); testMeta.timeBucketAssigner.teardown(); } + + @Test + public void testTimeBucketKeyExpiry() + { + testMeta.timeBucketAssigner.setExpireBefore(Duration.standardSeconds(1)); + testMeta.timeBucketAssigner.setBucketSpan(Duration.standardSeconds(1)); + + long referenceTime = testMeta.timeBucketAssigner.getReferenceInstant().getMillis(); + testMeta.timeBucketAssigner.setup(testMeta.mockManagedStateContext); + + long time1 = Duration.standardSeconds(9).getMillis() + referenceTime; + Assert.assertEquals("time bucket", 10, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1) ); + + long time2 = Duration.standardSeconds(10).getMillis() + referenceTime; + Assert.assertEquals("time bucket", 11, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time2) ); + + //Check for expiry of time1 now + Assert.assertEquals("time bucket", -1, testMeta.timeBucketAssigner.getTimeBucketAndAdjustBoundaries(time1) ); + + testMeta.timeBucketAssigner.teardown(); + } }