From ca9fed971f7ff26643ce2417c750b951fc161752 Mon Sep 17 00:00:00 2001 From: Chandni Singh Date: Mon, 16 May 2016 17:30:51 -0700 Subject: [PATCH] APEXMALHAR-2073 fixed the race condition that caused the first key in tbm to be null --- .../apex/malhar/lib/state/managed/Bucket.java | 12 ++------- .../lib/state/managed/BucketsFileSystem.java | 26 +++++++++++++------ .../managed/IncrementalCheckpointManager.java | 8 +++++- .../lib/state/managed/StateTracker.java | 2 +- .../state/managed/BucketsFileSystemTest.java | 13 +++++++--- .../state/managed/ManagedStateImplTest.java | 1 - 6 files changed, 37 insertions(+), 25 deletions(-) diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java index 107bfc6563..352f121e6d 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/Bucket.java @@ -285,7 +285,6 @@ private Slice getFromReaders(Slice key, long timeBucket) } else { //search all the time buckets for (BucketsFileSystem.TimeBucketMeta immutableTimeBucketMeta : cachedBucketMetas) { - if (managedStateContext.getKeyComparator().compare(key, immutableTimeBucketMeta.getFirstKey()) >= 0) { //keys in the time bucket files are sorted so if the first key in the file is greater than the key being //searched, the key will not be present in that file. @@ -400,16 +399,8 @@ public void put(Slice key, long timeBucket, Slice value) @Override public long freeMemory(long windowId) throws IOException { - LOG.debug("free space {}", bucketId); long memoryFreed = 0; - - for (Map.Entry> windowEntry : committedData.entrySet()) { - for (Map.Entry entry: windowEntry.getValue().entrySet()) { - memoryFreed += entry.getKey().length + entry.getValue().getValue().length; - } - } - - Long clearWindowId = null; + Long clearWindowId; while ((clearWindowId = committedData.floorKey(windowId)) != null) { Map windowData = committedData.remove(clearWindowId); @@ -432,6 +423,7 @@ public long freeMemory(long windowId) throws IOException } sizeInBytes.getAndAdd(-memoryFreed); + LOG.debug("space freed {} {}", bucketId, memoryFreed); return memoryFreed; } diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java index 8304fb6690..483d9e8c63 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystem.java @@ -150,7 +150,11 @@ protected void writeBucketData(long windowId, long bucketId, Map getAllTimeBuckets(long bucketId) throws IOExcepti */ private void loadBucketMetaFile(long bucketId, DataInputStream dis) throws IOException { + LOG.debug("Loading bucket meta-file {}", bucketId); int metaDataVersion = dis.readInt(); if (metaDataVersion == META_FILE_VERSION) { diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java index 02fd6ec98a..536702d466 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/IncrementalCheckpointManager.java @@ -71,6 +71,8 @@ public class IncrementalCheckpointManager extends WindowDataManager.FSWindowData private transient int waitMillis; private volatile long lastTransferredWindow = Stateless.WINDOW_ID; + private transient long largestWindowAddedToTransferQueue = Stateless.WINDOW_ID; + public IncrementalCheckpointManager() { super(); @@ -188,8 +190,12 @@ protected void committed(int operatorId, long windowId) throws IOException, Inte { LOG.debug("data manager committed {}", windowId); for (Long currentWindow : savedWindows.keySet()) { + if (currentWindow <= largestWindowAddedToTransferQueue) { + continue; + } if (currentWindow <= windowId) { - LOG.debug("to transfer {}", windowId); + LOG.debug("to transfer {}", currentWindow); + largestWindowAddedToTransferQueue = currentWindow; windowsToTransfer.add(currentWindow); } else { break; diff --git a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java index 3bb5507f31..4813c25771 100644 --- a/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java +++ b/library/src/main/java/org/apache/apex/malhar/lib/state/managed/StateTracker.java @@ -123,7 +123,7 @@ public void run() long sizeFreed; try { sizeFreed = bucket.freeMemory(managedStateImpl.checkpointManager.getLastTransferredWindow()); - LOG.debug("size freed {} {}", bucketId, sizeFreed); + LOG.debug("bucket freed {} {}", bucketId, sizeFreed); } catch (IOException e) { managedStateImpl.throwable.set(e); throw new RuntimeException("freeing " + bucketId, e); diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java index ee6cf0089f..1696d4d9a2 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/BucketsFileSystemTest.java @@ -97,10 +97,12 @@ public void testTransferOfExistingBucket() throws IOException public void testUpdateBucketMetaDataFile() throws IOException { testMeta.bucketsFileSystem.setup(testMeta.managedStateContext); - BucketsFileSystem.MutableTimeBucketMeta mutableTbm = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1); + BucketsFileSystem.MutableTimeBucketMeta mutableTbm = new BucketsFileSystem.MutableTimeBucketMeta(1, 1); mutableTbm.updateTimeBucketMeta(10, 100, new Slice("1".getBytes())); + testMeta.bucketsFileSystem.updateTimeBuckets(mutableTbm); testMeta.bucketsFileSystem.updateBucketMetaFile(1); + BucketsFileSystem.TimeBucketMeta immutableTbm = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1); Assert.assertNotNull(immutableTbm); Assert.assertEquals("last transferred window", 10, immutableTbm.getLastTransferredWindowId()); @@ -116,7 +118,8 @@ public void testGetTimeBucketMeta() throws IOException BucketsFileSystem.TimeBucketMeta bucketMeta = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1); Assert.assertNull("bucket meta", bucketMeta); - testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1); + BucketsFileSystem.MutableTimeBucketMeta mutableTimeBucketMeta = new BucketsFileSystem.MutableTimeBucketMeta(1, 1); + testMeta.bucketsFileSystem.updateTimeBuckets(mutableTimeBucketMeta); bucketMeta = testMeta.bucketsFileSystem.getTimeBucketMeta(1, 1); Assert.assertNotNull("bucket meta not null", bucketMeta); testMeta.bucketsFileSystem.teardown(); @@ -126,11 +129,13 @@ public void testGetTimeBucketMeta() throws IOException public void testGetAllTimeBucketMeta() throws IOException { testMeta.bucketsFileSystem.setup(testMeta.managedStateContext); - BucketsFileSystem.MutableTimeBucketMeta tbm1 = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 1); + BucketsFileSystem.MutableTimeBucketMeta tbm1 = new BucketsFileSystem.MutableTimeBucketMeta(1, 1); tbm1.updateTimeBucketMeta(10, 100, new Slice("1".getBytes())); + testMeta.bucketsFileSystem.updateTimeBuckets(tbm1); - BucketsFileSystem.MutableTimeBucketMeta tbm2 = testMeta.bucketsFileSystem.getOrCreateTimeBucketMeta(1, 2); + BucketsFileSystem.MutableTimeBucketMeta tbm2 = new BucketsFileSystem.MutableTimeBucketMeta(1, 2); tbm2.updateTimeBucketMeta(10, 100, new Slice("2".getBytes())); + testMeta.bucketsFileSystem.updateTimeBuckets(tbm2); testMeta.bucketsFileSystem.updateBucketMetaFile(1); TreeSet timeBucketMetas = diff --git a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java index cdd9781802..99e6c23ed0 100644 --- a/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java +++ b/library/src/test/java/org/apache/apex/malhar/lib/state/managed/ManagedStateImplTest.java @@ -174,7 +174,6 @@ public void testFreeWindowTransferRaceCondition() throws Exception { testMeta.managedState.setMaxMemorySize(1); testMeta.managedState.setCheckStateSizeInterval(Duration.millis(1L)); - testMeta.managedState.setCheckStateSizeInterval(Duration.millis(1L)); testMeta.managedState.setup(testMeta.operatorContext); int numKeys = 300;