Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
Another counter concurrency fix, plus test stabilization changes
Browse files Browse the repository at this point in the history
  • Loading branch information
snoopdave committed Oct 11, 2016
1 parent 9f2863f commit 041109f
Show file tree
Hide file tree
Showing 3 changed files with 53 additions and 48 deletions.
Expand Up @@ -123,25 +123,29 @@ public void incrementCounter(String queueName, DatabaseQueueMessage.Type type, l


synchronized ( inMemoryCounters ) { synchronized ( inMemoryCounters ) {


if ( inMemoryCounters.get( key ) == null ) { if (inMemoryCounters.get( key ) == null) {


Long value = retrieveCounterFromStorage( queueName, type ); Long value = retrieveCounterFromStorage( queueName, type );


if ( value == null ) { if (value == null) {
incrementCounterInStorage( queueName, type, 0L ); incrementCounterInStorage( queueName, type, 0L );
inMemoryCounters.put( key, new InMemoryCount( 0L )); inMemoryCounters.put( key, new InMemoryCount( 0L ) );
} else { } else {
inMemoryCounters.put( key, new InMemoryCount( value )); inMemoryCounters.put( key, new InMemoryCount( value ) );
} }
} }
}

InMemoryCount inMemoryCount = inMemoryCounters.get( key );


InMemoryCount inMemoryCount = inMemoryCounters.get( key ); synchronized ( inMemoryCount ) {
inMemoryCount.getIncrement().addAndGet( increment ); inMemoryCount.getIncrement().addAndGet( increment );


// logger.info("Incremented Count for queue {} type {} = {}", //logger.info("Incremented Count for queue {} type {} = {}",
// queueName, type, getCounterValue( queueName, type )); //queueName, type, getCounterValue( queueName, type ));

saveIfNeeded( queueName, type );
} }
saveIfNeeded( queueName, type );
} }




Expand All @@ -152,25 +156,30 @@ public void decrementCounter(String queueName, DatabaseQueueMessage.Type type, l


synchronized ( inMemoryCounters ) { synchronized ( inMemoryCounters ) {


if ( inMemoryCounters.get( key ) == null ) { if (inMemoryCounters.get( key ) == null) {


Long value = retrieveCounterFromStorage( queueName, type ); Long value = retrieveCounterFromStorage( queueName, type );


if ( value == null ) { if (value == null) {
decrementCounterInStorage( queueName, type, 0L ); decrementCounterInStorage( queueName, type, 0L );
inMemoryCounters.put( key, new InMemoryCount( 0L )); inMemoryCounters.put( key, new InMemoryCount( 0L ) );
} else { } else {
inMemoryCounters.put( key, new InMemoryCount( value )); inMemoryCounters.put( key, new InMemoryCount( value ) );
} }
} }
}

InMemoryCount inMemoryCount = inMemoryCounters.get( key );

synchronized ( inMemoryCount ) {


InMemoryCount inMemoryCount = inMemoryCounters.get( key );
inMemoryCount.getDecrement().addAndGet( decrement ); inMemoryCount.getDecrement().addAndGet( decrement );


// logger.info("Decremented Count for queue {} type {} = {}", //logger.info("Decremented Count for queue {} type {} = {}",
// queueName, type, getCounterValue( queueName, type )); //queueName, type, getCounterValue( queueName, type ));

saveIfNeeded( queueName, type );
} }
saveIfNeeded( queueName, type );
} }




Expand All @@ -179,19 +188,16 @@ public long getCounterValue( String queueName, DatabaseQueueMessage.Type type )


String key = buildKey( queueName, type ); String key = buildKey( queueName, type );


synchronized ( inMemoryCounters ) { if ( inMemoryCounters.get( key ) == null ) {


if ( inMemoryCounters.get( key ) == null ) { Long value = retrieveCounterFromStorage( queueName, type );


Long value = retrieveCounterFromStorage( queueName, type ); if ( value == null ) {

throw new NotFoundException(
if ( value == null ) { MessageFormat.format( "No counter found for queue {0} type {1}",
throw new NotFoundException( queueName, type ));
MessageFormat.format( "No counter found for queue {0} type {1}", } else {
queueName, type )); inMemoryCounters.put( key, new InMemoryCount( value ));
} else {
inMemoryCounters.put( key, new InMemoryCount( value ));
}
} }
} }


Expand Down Expand Up @@ -245,22 +251,19 @@ private void saveIfNeeded( String queueName, DatabaseQueueMessage.Type type ) {


InMemoryCount inMemoryCount = inMemoryCounters.get( key ); InMemoryCount inMemoryCount = inMemoryCounters.get( key );


synchronized ( inMemoryCount ) { if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {

if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {


long totalIncrement = inMemoryCount.getIncrement().get(); long totalIncrement = inMemoryCount.getIncrement().get();
incrementCounterInStorage( queueName, type, totalIncrement ); incrementCounterInStorage( queueName, type, totalIncrement );


long totalDecrement = inMemoryCount.getDecrement().get(); long totalDecrement = inMemoryCount.getDecrement().get();
decrementCounterInStorage( queueName, type, totalDecrement ); decrementCounterInStorage( queueName, type, totalDecrement );


inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) ); inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) );
inMemoryCount.getIncrement().set( 0L ); inMemoryCount.getIncrement().set( 0L );
inMemoryCount.getDecrement().set( 0L ); inMemoryCount.getDecrement().set( 0L );


numChanges.set( 0 ); numChanges.set( 0 );
}
} }
} }


Expand Down
Expand Up @@ -152,32 +152,31 @@ public void testGetMultipleQueueMessages() throws InterruptedException {
queueName, region, region, messageId, null, null ); queueName, region, region, messageId, null, null );
} }


int maxRetries = 25; int maxRetries = 10;
int retries = 0; int retries = 0;
int count = 0; long count = 0;
while (retries++ < maxRetries) { while (retries++ < maxRetries) {
distributedQueueService.refresh(); distributedQueueService.refresh();
if ( queueMessageManager.getQueueDepth( queueName ) == 100 ) { count = queueMessageManager.getQueueDepth( queueName );
count = 100; if ( count == 100 ) {
break; break;
} }
count = inMemoryQueue.size( queueName );
Thread.sleep( 1000 ); Thread.sleep( 1000 );
} }


Assert.assertEquals( 100, count ); Assert.assertEquals( 100, count );


Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
Assert.assertEquals( 75, inMemoryQueue.size( queueName ) ); Assert.assertEquals( 75, queueMessageManager.getQueueDepth( queueName ) );


Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
Assert.assertEquals( 50, inMemoryQueue.size( queueName ) ); Assert.assertEquals( 50, queueMessageManager.getQueueDepth( queueName ) );


Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
Assert.assertEquals( 25, inMemoryQueue.size( queueName ) ); Assert.assertEquals( 25, queueMessageManager.getQueueDepth( queueName ) );


Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() );
Assert.assertEquals( 0, inMemoryQueue.size( queueName ) ); Assert.assertEquals( 0, queueMessageManager.getQueueDepth( queueName ) );


distributedQueueService.shutdown(); distributedQueueService.shutdown();


Expand Down
Expand Up @@ -40,6 +40,9 @@ queue.num.actors=50
queue.sender.num.actors=100 queue.sender.num.actors=100
queue.writer.num.actors=100 queue.writer.num.actors=100


queue.send.timeout.seconds=5
queue.get.timeout.seconds=5

# set shard size and times low for testing purposes # set shard size and times low for testing purposes
queue.shard.max.size=10 queue.shard.max.size=10
queue.shard.allocation.check.frequency.millis=100 queue.shard.allocation.check.frequency.millis=100
Expand Down

0 comments on commit 041109f

Please sign in to comment.