From b87d4069d51c1ba3e1604087a6faddccde6a2d61 Mon Sep 17 00:00:00 2001 From: Yiming Zang Date: Wed, 20 Jul 2016 18:09:33 -0700 Subject: [PATCH 1/2] fix test cases caused by time sensitive issues RB_ID=854009 --- .../config/TestConfigurationSubscription.java | 16 +++++++++++++++- .../TestDynamicConfigurationFeatureProvider.java | 16 ++++++++++++++++ .../service/TestDistributedLogService.java | 10 ++++++---- 3 files changed, 37 insertions(+), 5 deletions(-) diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java index 278bf29e6..24733a457 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/config/TestConfigurationSubscription.java @@ -42,6 +42,17 @@ public class TestConfigurationSubscription { static final Logger LOG = LoggerFactory.getLogger(TestConfigurationSubscription.class); + /** + * Give FileChangedReloadingStrategy some time to start reloading + * Make sure now!=lastChecked + * {@link org.apache.commons.configuration.reloading.FileChangedReloadingStrategy#reloadingRequired()} + */ + private void ensureConfigReloaded() throws InterruptedException { + // sleep 1 ms so that System.currentTimeMillis() != + // lastChecked (the time we construct FileChangedReloadingStrategy + Thread.sleep(1); + } + @Test(timeout = 60000) public void testReloadConfiguration() throws Exception { PropertiesWriter writer = new PropertiesWriter(); @@ -63,7 +74,8 @@ public void onReload(ConcurrentBaseConfiguration conf) { // add writer.setProperty("prop1", "1"); writer.save(); - + // ensure the file change reloading event can be triggered + ensureConfigReloaded(); // reload the config confSub.reload(); assertNotNull(confHolder.get()); @@ -85,6 +97,8 @@ public void testAddReloadBasicsConfig() throws Exception { // add writer.setProperty("prop1", "1"); writer.save(); + // ensure the file change reloading event can be triggered + ensureConfigReloaded(); mockScheduler.tick(100, TimeUnit.MILLISECONDS); assertEquals("1", conf.getProperty("prop1")); diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java index 35fac65dd..46c188030 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/feature/TestDynamicConfigurationFeatureProvider.java @@ -30,6 +30,19 @@ */ public class TestDynamicConfigurationFeatureProvider { + /** + * Make sure config is reloaded + * + * Give FileChangedReloadingStrategy some time to allow reloading + * Make sure now!=lastChecked + * {@link org.apache.commons.configuration.reloading.FileChangedReloadingStrategy#reloadingRequired()} + */ + private void ensureConfigReloaded() throws InterruptedException { + // sleep 1 ms so that System.currentTimeMillis() != + // lastChecked (the time we construct FileChangedReloadingStrategy + Thread.sleep(1); + } + @Test(timeout = 60000) public void testLoadFeaturesFromBase() throws Exception { PropertiesWriter writer = new PropertiesWriter(); @@ -43,6 +56,7 @@ public void testLoadFeaturesFromBase() throws Exception { DynamicConfigurationFeatureProvider provider = new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE); provider.start(); + ensureConfigReloaded(); Feature feature1 = provider.getFeature("feature_1"); assertTrue(feature1.isAvailable()); @@ -79,6 +93,7 @@ public void testLoadFeaturesFromOverlay() throws Exception { DynamicConfigurationFeatureProvider provider = new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE); provider.start(); + ensureConfigReloaded(); Feature feature1 = provider.getFeature("feature_1"); assertTrue(feature1.isAvailable()); @@ -118,6 +133,7 @@ public void testReloadFeaturesFromOverlay() throws Exception { DynamicConfigurationFeatureProvider provider = new DynamicConfigurationFeatureProvider("", conf, NullStatsLogger.INSTANCE); provider.start(); + ensureConfigReloaded(); Feature feature1 = provider.getFeature("feature_1"); assertTrue(feature1.isAvailable()); diff --git a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java index 61fb80806..ed456b993 100644 --- a/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java +++ b/distributedlog-service/src/test/java/com/twitter/distributedlog/service/TestDistributedLogService.java @@ -742,12 +742,14 @@ public void testCloseStreamsShouldAbort() throws Exception { StatusCode.WRITE_EXCEPTION == response.getHeader().getCode() || StatusCode.WRITE_CANCELLED_EXCEPTION == response.getHeader().getCode()); } - assertTrue("There should be no streams in the cache", - streamManager.getCachedStreams().isEmpty()); + // acquired streams should all been removed after we close them assertTrue("There should be no streams in the acquired cache", - streamManager.getAcquiredStreams().isEmpty()); - + streamManager.getAcquiredStreams().isEmpty()); localService.shutdown(); + // cached streams wouldn't be removed immediately after streams are closed + // but they should be removed after we shutdown the service + assertTrue("There should be no streams in the cache after shutting down the service", + streamManager.getCachedStreams().isEmpty()); } @Test(timeout = 60000) From 5aaff644b03f72ad9193216b63ceb0bf83cf5127 Mon Sep 17 00:00:00 2001 From: Yiming Zang Date: Fri, 22 Jul 2016 10:05:02 -0700 Subject: [PATCH 2/2] Fix some flaky tests RB_ID=854457 --- .../BKDistributedLogManager.java | 2 + .../distributedlog/TestAsyncReaderWriter.java | 5 ++- .../distributedlog/TestNonBlockingReads.java | 38 ++++++++++++++----- 3 files changed, 34 insertions(+), 11 deletions(-) diff --git a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java index fd8ec2dee..9c193813c 100644 --- a/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java +++ b/distributedlog-core/src/main/java/com/twitter/distributedlog/BKDistributedLogManager.java @@ -969,6 +969,7 @@ public Future openAsyncLogReader(DLSN fromDLSN) { false, dynConf.getDeserializeRecordSetOnReads(), statsLogger); + pendingReaders.add(reader); return Future.value(reader); } @@ -1095,6 +1096,7 @@ LogReader getInputStreamInternal(DLSN fromDLSN, Optional fromTxnId) true, dynConf.getDeserializeRecordSetOnReads(), statsLogger); + pendingReaders.add(asyncReader); return new BKSyncLogReaderDLSN(conf, asyncReader, scheduler, fromTxnId); } diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java index a6a89ba7e..06cf07926 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestAsyncReaderWriter.java @@ -885,8 +885,11 @@ public void testSimpleAsyncReadWriteStartEmptyFactory() throws Exception { } } + /** + * Flaky test fixed: readers need to be added to the pendingReaders + * @throws Exception + */ @Test(timeout = 300000) - @DistributedLogAnnotations.FlakyTest public void testSimpleAsyncReadWriteSimulateErrors() throws Exception { String name = runtime.getMethodName(); DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); diff --git a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java index 90a33e8de..58863c487 100644 --- a/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java +++ b/distributedlog-core/src/test/java/com/twitter/distributedlog/TestNonBlockingReads.java @@ -17,6 +17,7 @@ */ package com.twitter.distributedlog; +import java.util.concurrent.ScheduledFuture; import java.util.concurrent.ScheduledThreadPoolExecutor; import java.util.concurrent.TimeUnit; @@ -33,13 +34,12 @@ public class TestNonBlockingReads extends TestDistributedLogBase { static final Logger LOG = LoggerFactory.getLogger(TestNonBlockingReads.class); - // TODO: investigate why long poll read makes test flaky static { conf.setOutputBufferSize(0); conf.setImmediateFlushEnabled(true); } - @Test(timeout = 60000) + @Test(timeout = 100000) public void testNonBlockingRead() throws Exception { String name = "distrlog-non-blocking-reader"; final DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); @@ -49,9 +49,10 @@ public void testNonBlockingRead() throws Exception { confLocal.setReaderIdleWarnThresholdMillis(100); final DistributedLogManager dlm = createNewDLM(confLocal, name); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + ScheduledFuture writerClosedFuture = null; try { final Thread currentThread = Thread.currentThread(); - executor.schedule( + writerClosedFuture = executor.schedule( new Runnable() { @Override public void run() { @@ -67,12 +68,16 @@ public void run() { readNonBlocking(dlm, false); assertFalse(currentThread.isInterrupted()); } finally { + if (writerClosedFuture != null){ + // ensure writer.closeAndComplete is done before we close dlm + writerClosedFuture.get(); + } executor.shutdown(); dlm.close(); } } - @Test(timeout = 60000) + @Test(timeout = 100000) public void testNonBlockingReadRecovery() throws Exception { String name = "distrlog-non-blocking-reader-recovery"; final DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); @@ -81,9 +86,10 @@ public void testNonBlockingReadRecovery() throws Exception { confLocal.setReadAheadMaxRecords(10); final DistributedLogManager dlm = createNewDLM(confLocal, name); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); + ScheduledFuture writerClosedFuture = null; try { final Thread currentThread = Thread.currentThread(); - executor.schedule( + writerClosedFuture = executor.schedule( new Runnable() { @Override public void run() { @@ -100,12 +106,16 @@ public void run() { readNonBlocking(dlm, false); assertFalse(currentThread.isInterrupted()); } finally { + if (writerClosedFuture != null){ + // ensure writer.closeAndComplete is done before we close dlm + writerClosedFuture.get(); + } executor.shutdown(); dlm.close(); } } - @Test(timeout = 60000) + @Test(timeout = 100000) public void testNonBlockingReadIdleError() throws Exception { String name = "distrlog-non-blocking-reader-error"; final DistributedLogConfiguration confLocal = new DistributedLogConfiguration(); @@ -116,10 +126,10 @@ public void testNonBlockingReadIdleError() throws Exception { confLocal.setReaderIdleErrorThresholdMillis(100); final DistributedLogManager dlm = createNewDLM(confLocal, name); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); - + ScheduledFuture writerClosedFuture = null; try { final Thread currentThread = Thread.currentThread(); - executor.schedule( + writerClosedFuture = executor.schedule( new Runnable() { @Override public void run() { @@ -141,6 +151,10 @@ public void run() { assertTrue(exceptionEncountered); assertFalse(currentThread.isInterrupted()); } finally { + if (writerClosedFuture != null){ + // ensure writer.closeAndComplete is done before we close dlm + writerClosedFuture.get(); + } executor.shutdown(); dlm.close(); } @@ -157,10 +171,10 @@ public void testNonBlockingReadAheadStall() throws Exception { confLocal.setReaderIdleErrorThresholdMillis(30000); final DistributedLogManager dlm = createNewDLM(confLocal, name); ScheduledThreadPoolExecutor executor = new ScheduledThreadPoolExecutor(1); - + ScheduledFuture writerClosedFuture = null; try { final Thread currentThread = Thread.currentThread(); - executor.schedule( + writerClosedFuture = executor.schedule( new Runnable() { @Override public void run() { @@ -183,6 +197,10 @@ public void run() { assertFalse(exceptionEncountered); assertFalse(currentThread.isInterrupted()); } finally { + if (writerClosedFuture != null){ + // ensure writer.closeAndComplete is done before we close dlm + writerClosedFuture.get(); + } executor.shutdown(); dlm.close(); }