Skip to content

Commit

Permalink
#37 Investigate and where necessary fix bug suggested by Fabian with …
Browse files Browse the repository at this point in the history
…respect to out of order notifications. Ooops: the following test shows that the implementation of the MultipleWorkerBlockingQueueMonitorNotifcationServiceImpl in the '1.1.0-RELEASE' release has a bug !
  • Loading branch information
simondelabici committed Jul 16, 2018
1 parent 136a40d commit 0738f14
Showing 1 changed file with 58 additions and 0 deletions.
Expand Up @@ -15,7 +15,9 @@
import java.util.Locale;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.LockSupport;
import java.util.function.Consumer;
import java.util.logging.Level;
import java.util.logging.Logger;
Expand Down Expand Up @@ -371,6 +373,62 @@ void testSlowConsumerBehavior( String monitorNotifierImpl )
} );
}


// Provide the parameters for the 'testConsumerDeliverySequence' method.
// The test will iterate over all service implementations and with different consumer sleep times.
private static Stream<Arguments> getTestConsumerDeliverySequenceArgs()
{
return Stream.of ( Arguments.of( "SingleWorkerBlockingQueueMonitorNotificationServiceImpl", 100_000_000, 0 ),
Arguments.of( "SingleWorkerBlockingQueueMonitorNotificationServiceImpl", 100_000_000, 100 ),
Arguments.of( "SingleWorkerBlockingQueueMonitorNotificationServiceImpl", 100_000_000, 1000 ),
Arguments.of( "DisruptorMonitorNotificationServiceNewImpl", 100_000_000, 0 ),
Arguments.of( "DisruptorMonitorNotificationServiceNewImpl", 100_000_000, 100 ),
Arguments.of( "DisruptorMonitorNotificationServiceNewImpl", 100_000_000, 1000 ),
Arguments.of( "DisruptorMonitorNotificationServiceOldImpl", 100_000_000, 0 ),
Arguments.of( "DisruptorMonitorNotificationServiceOldImpl", 100_000_000, 100 ),
Arguments.of( "DisruptorMonitorNotificationServiceOldImpl", 100_000_000, 1000 ),
Arguments.of( "MultipleWorkerBlockingQueueMonitorNotificationServiceImpl", 100_000_000, 0 ),
Arguments.of( "MultipleWorkerBlockingQueueMonitorNotificationServiceImpl", 100_000_000, 100 ),
Arguments.of( "MultipleWorkerBlockingQueueMonitorNotificationServiceImpl", 100_000_000, 1000 ) );
}

@ParameterizedTest
@MethodSource( "getTestConsumerDeliverySequenceArgs")
void testConsumerDeliverySequence( String monitorNotifierImpl, int maxCount, int consumerSleepTimeInNanoSeconds )
{
logger.log( Level.INFO, String.format("Starting test with MonitorNotifier configuration '%s' and maxCount '%,d", monitorNotifierImpl, maxCount ) );

final AtomicLong lastValue = new AtomicLong(0L );
final AtomicLong errorValue = new AtomicLong(0L );
final AtomicLong numberOfErrors = new AtomicLong(0L );
final Consumer<Long> consumer = v -> {

// Pause for the requested time to apply some back pressure to the publisher
LockSupport.parkNanos( consumerSleepTimeInNanoSeconds );

//logger.log( Level.INFO, " Received: ", v.longValue() );
if ( v <= lastValue.get() )
{
//logger.log( Level.INFO, String.format( "Notified Value was ('%s') less than previous value ('%s')", v.longValue(), lastValue.get() ) );
errorValue.set( v );
numberOfErrors.incrementAndGet();
}
lastValue.set( v );
};

final MonitorNotificationServiceFactory factory = new MonitorNotificationServiceFactory( monitorNotifierImpl );
final MonitorNotificationService<? super Long> consumerNotifier = factory.getServiceForConsumer( consumer );

for ( long i = 1; i < maxCount; i++ )
{
//logger.log( Level.INFO, String.o" Publishing: ", i );
consumerNotifier.publish( i );
}

// Check that the error flag remains set at zero
assertEquals( 0L, numberOfErrors.get(), "Consumer received value(s) that were out of sequence. Number of errors: (" + String.valueOf( numberOfErrors.get() ) + "). Last error value: (" + String.valueOf( errorValue.get() ) + ")" );
}

private void busyWait( int timeInMilliseconds )
{
long startTime = System.nanoTime();
Expand Down

0 comments on commit 0738f14

Please sign in to comment.