diff --git a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java index 2eb482aa07..ee4bab2ca8 100644 --- a/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java +++ b/stack/corepersistence/queue/src/main/java/org/apache/usergrid/persistence/qakka/serialization/queuemessages/impl/MessageCounterSerializationImpl.java @@ -123,25 +123,29 @@ public void incrementCounter(String queueName, DatabaseQueueMessage.Type type, l synchronized ( inMemoryCounters ) { - if ( inMemoryCounters.get( key ) == null ) { + if (inMemoryCounters.get( key ) == null) { Long value = retrieveCounterFromStorage( queueName, type ); - if ( value == null ) { + if (value == null) { incrementCounterInStorage( queueName, type, 0L ); - inMemoryCounters.put( key, new InMemoryCount( 0L )); + inMemoryCounters.put( key, new InMemoryCount( 0L ) ); } else { - inMemoryCounters.put( key, new InMemoryCount( value )); + inMemoryCounters.put( key, new InMemoryCount( value ) ); } } + } + + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); - InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + synchronized ( inMemoryCount ) { inMemoryCount.getIncrement().addAndGet( increment ); -// logger.info("Incremented Count for queue {} type {} = {}", -// queueName, type, getCounterValue( queueName, type )); + //logger.info("Incremented Count for queue {} type {} = {}", + //queueName, type, getCounterValue( queueName, type )); + + saveIfNeeded( queueName, type ); } - saveIfNeeded( queueName, type ); } @@ -152,25 +156,30 @@ public void decrementCounter(String queueName, DatabaseQueueMessage.Type type, l synchronized ( inMemoryCounters ) { - if ( inMemoryCounters.get( key ) == null ) { + if (inMemoryCounters.get( key ) == null) { Long value = retrieveCounterFromStorage( queueName, type ); - if ( value == null ) { + if (value == null) { decrementCounterInStorage( queueName, type, 0L ); - inMemoryCounters.put( key, new InMemoryCount( 0L )); + inMemoryCounters.put( key, new InMemoryCount( 0L ) ); } else { - inMemoryCounters.put( key, new InMemoryCount( value )); + inMemoryCounters.put( key, new InMemoryCount( value ) ); } } + } + + InMemoryCount inMemoryCount = inMemoryCounters.get( key ); + + synchronized ( inMemoryCount ) { - InMemoryCount inMemoryCount = inMemoryCounters.get( key ); inMemoryCount.getDecrement().addAndGet( decrement ); -// logger.info("Decremented Count for queue {} type {} = {}", -// queueName, type, getCounterValue( queueName, type )); + //logger.info("Decremented Count for queue {} type {} = {}", + //queueName, type, getCounterValue( queueName, type )); + + saveIfNeeded( queueName, type ); } - saveIfNeeded( queueName, type ); } @@ -179,19 +188,16 @@ public long getCounterValue( String queueName, DatabaseQueueMessage.Type type ) String key = buildKey( queueName, type ); - synchronized ( inMemoryCounters ) { + if ( inMemoryCounters.get( key ) == null ) { - if ( inMemoryCounters.get( key ) == null ) { + Long value = retrieveCounterFromStorage( queueName, type ); - Long value = retrieveCounterFromStorage( queueName, type ); - - if ( value == null ) { - throw new NotFoundException( - MessageFormat.format( "No counter found for queue {0} type {1}", - queueName, type )); - } else { - inMemoryCounters.put( key, new InMemoryCount( value )); - } + if ( value == null ) { + throw new NotFoundException( + MessageFormat.format( "No counter found for queue {0} type {1}", + queueName, type )); + } else { + inMemoryCounters.put( key, new InMemoryCount( value )); } } @@ -245,22 +251,19 @@ private void saveIfNeeded( String queueName, DatabaseQueueMessage.Type type ) { InMemoryCount inMemoryCount = inMemoryCounters.get( key ); - synchronized ( inMemoryCount ) { - - if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) { + if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) { - long totalIncrement = inMemoryCount.getIncrement().get(); - incrementCounterInStorage( queueName, type, totalIncrement ); + long totalIncrement = inMemoryCount.getIncrement().get(); + incrementCounterInStorage( queueName, type, totalIncrement ); - long totalDecrement = inMemoryCount.getDecrement().get(); - decrementCounterInStorage( queueName, type, totalDecrement ); + long totalDecrement = inMemoryCount.getDecrement().get(); + decrementCounterInStorage( queueName, type, totalDecrement ); - inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) ); - inMemoryCount.getIncrement().set( 0L ); - inMemoryCount.getDecrement().set( 0L ); + inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) ); + inMemoryCount.getIncrement().set( 0L ); + inMemoryCount.getDecrement().set( 0L ); - numChanges.set( 0 ); - } + numChanges.set( 0 ); } } diff --git a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java index 7fe8b1616f..f5512e566a 100644 --- a/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java +++ b/stack/corepersistence/queue/src/test/java/org/apache/usergrid/persistence/qakka/distributed/QueueActorServiceTest.java @@ -152,32 +152,31 @@ public void testGetMultipleQueueMessages() throws InterruptedException { queueName, region, region, messageId, null, null ); } - int maxRetries = 25; + int maxRetries = 10; int retries = 0; - int count = 0; + long count = 0; while (retries++ < maxRetries) { distributedQueueService.refresh(); - if ( queueMessageManager.getQueueDepth( queueName ) == 100 ) { - count = 100; + count = queueMessageManager.getQueueDepth( queueName ); + if ( count == 100 ) { break; } - count = inMemoryQueue.size( queueName ); Thread.sleep( 1000 ); } Assert.assertEquals( 100, count ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 75, inMemoryQueue.size( queueName ) ); + Assert.assertEquals( 75, queueMessageManager.getQueueDepth( queueName ) ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 50, inMemoryQueue.size( queueName ) ); + Assert.assertEquals( 50, queueMessageManager.getQueueDepth( queueName ) ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 25, inMemoryQueue.size( queueName ) ); + Assert.assertEquals( 25, queueMessageManager.getQueueDepth( queueName ) ); Assert.assertEquals( 25, distributedQueueService.getNextMessages( queueName, 25 ).size() ); - Assert.assertEquals( 0, inMemoryQueue.size( queueName ) ); + Assert.assertEquals( 0, queueMessageManager.getQueueDepth( queueName ) ); distributedQueueService.shutdown(); diff --git a/stack/corepersistence/queue/src/test/resources/qakka.properties b/stack/corepersistence/queue/src/test/resources/qakka.properties index 142138dd2f..95b2509300 100644 --- a/stack/corepersistence/queue/src/test/resources/qakka.properties +++ b/stack/corepersistence/queue/src/test/resources/qakka.properties @@ -40,6 +40,9 @@ queue.num.actors=50 queue.sender.num.actors=100 queue.writer.num.actors=100 +queue.send.timeout.seconds=5 +queue.get.timeout.seconds=5 + # set shard size and times low for testing purposes queue.shard.max.size=10 queue.shard.allocation.check.frequency.millis=100