Skip to content
This repository has been archived by the owner on Jan 5, 2022. It is now read-only.

Commit

Permalink
Minor refactoring, renaming and debug logging changes.
Browse files Browse the repository at this point in the history
  • Loading branch information
snoopdave committed Oct 13, 2016
1 parent f56e1b0 commit f8c3a2d
Show file tree
Hide file tree
Showing 9 changed files with 70 additions and 48 deletions.
Expand Up @@ -24,13 +24,9 @@
import org.safehaus.guicyfig.Key;


/**
*
*/
@FigSingleton
public interface AsyncEventsSchedulerFig extends GuicyFig {


/**
* Amount of threads to use in async processing
*/
Expand All @@ -42,25 +38,22 @@ public interface AsyncEventsSchedulerFig extends GuicyFig {
*/
String IO_SCHEDULER_NAME = "scheduler.io.poolName";


/**
* Amount of threads to use in async processing
*/
String REPAIR_SCHEDULER_THREADS = "repair.io.threads";


/**
* Name of pool to use when performing scheduling
*/
String REPAIR_SCHEDULER_NAME = "repair.io.poolName";



@Default( "40" )
@Key( IO_SCHEDULER_THREADS )
int getMaxIoThreads();

@Default( "Usergrid-SQS-Pool" )
@Default( "Usergrid-Queue-Worker-Pool" )
@Key( IO_SCHEDULER_NAME )
String getIoSchedulerName();

Expand Down
Expand Up @@ -52,9 +52,9 @@ public interface QakkaFig extends GuicyFig, Serializable {

String QUEUE_GET_TIMEOUT = "queue.get.timeout.seconds";

String QUEUE_MAX_SHARD_COUNTER = "queue.max.inmemory.max.shard.counter";
String QUEUE_SHARD_COUNTER_MAX_IN_MEMORY = "queue.shard.counter.max-in-memory";

String QUEUE_MAX_MESSAGE_CHANGES = "queue.max.inmemory.max.message.changes";
String QUEUE_MESSAGE_COUNTER_MAX_IN_MEMORY = "queue.message.counter.max-in-memory";

String QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY = "queue.shard.allocation.check.frequency.millis";

Expand Down Expand Up @@ -123,14 +123,14 @@ public interface QakkaFig extends GuicyFig, Serializable {
int getSendTimeoutSeconds();

/** Once counter reaches this value, write it to permanent storage */
@Key(QUEUE_MAX_SHARD_COUNTER)
@Key(QUEUE_SHARD_COUNTER_MAX_IN_MEMORY)
@Default("100")
long getMaxInMemoryShardCounter();
long getShardCounterMaxInMemory();

/** Once counter reaches this value, write it to permanent storage */
@Key(QUEUE_MAX_MESSAGE_CHANGES)
@Key(QUEUE_MESSAGE_COUNTER_MAX_IN_MEMORY)
@Default("100")
long getMaxInMemoryMessageCounter();
long getMessageCounterMaxInMemory();

/** How often to check whether new shard is needed for each queue */
@Key(QUEUE_SHARD_ALLOCATION_CHECK_FREQUENCY)
Expand Down
Expand Up @@ -125,6 +125,7 @@ private void checkLatestShard( String queueName, Shard.Type type ) {
long counterValue = 0;
try {
counterValue = shardCounterSerialization.getCounterValue( queueName, type, shard.getShardId() );

} catch ( NotFoundException ignored ) {}

if (counterValue > (0.9 * qakkaFig.getMaxShardSize())) {
Expand All @@ -140,6 +141,10 @@ private void checkLatestShard( String queueName, Shard.Type type ) {

logger.info("{} Created new shard for queue {} shardId {} timestamp {} counterValue {}",
this.hashCode(), queueName, shard.getShardId(), futureUUID.timestamp(), counterValue );

} else {
// logger.debug("No new shard for queue {} counterValue {} of max {}",
// queueName, counterValue, qakkaFig.getMaxShardSize() );
}

} catch ( Throwable t ) {
Expand Down
Expand Up @@ -241,9 +241,9 @@ public Collection<DatabaseQueueMessage> getNextMessages( String queueName, int c
}
}

if ( ret.isEmpty() ) {
logger.info( "Requested {} but queue '{}' is empty", count, queueName);
}
// if ( ret.isEmpty() ) {
// logger.info( "Requested {} but queue '{}' is empty", count, queueName);
// }
return ret;

} finally {
Expand Down
Expand Up @@ -78,19 +78,35 @@ public class MessageCounterSerializationImpl implements MessageCounterSerializat

class InMemoryCount {
long baseCount;
final AtomicLong totalInMemoryCount = new AtomicLong( 0L ); // for testing using only in-memory counter
final AtomicLong increment = new AtomicLong( 0L );
final AtomicLong decrement = new AtomicLong( 0L );

InMemoryCount( long baseCount ) {
this.baseCount = baseCount;
}
public AtomicLong getIncrement() {
return increment;
public void increment( long inc ) {
increment.addAndGet( inc );
totalInMemoryCount.addAndGet( inc );
}
public AtomicLong getDecrement() {
return decrement;
public void decrement( long dec ) {
decrement.addAndGet( dec );
totalInMemoryCount.addAndGet( -dec );
}
public long getIncrement() {
return increment.get();
}
public long getDecrement() {
return decrement.get();
}
public void clearDeltas() {
increment.set( 0L );
decrement.set( 0L );
}
public long value() {

// return totalInMemoryCount.get(); // for testing using just in-memory counter:

return baseCount + increment.get() - decrement.get();
}
void setBaseCount( long baseCount ) {
Expand All @@ -106,7 +122,7 @@ public MessageCounterSerializationImpl(
CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {

this.cassandraConfig = cassandraConfig;
this.maxChangesBeforeSave = qakkaFig.getMaxInMemoryMessageCounter();
this.maxChangesBeforeSave = qakkaFig.getMessageCounterMaxInMemory();
this.cassandraClient = cassandraClient;
}

Expand Down Expand Up @@ -139,13 +155,16 @@ public void incrementCounter(String queueName, DatabaseQueueMessage.Type type, l
InMemoryCount inMemoryCount = inMemoryCounters.get( key );

synchronized ( inMemoryCount ) {
inMemoryCount.getIncrement().addAndGet( increment );

//logger.info("Incremented Count for queue {} type {} = {}",
//queueName, type, getCounterValue( queueName, type ));

inMemoryCount.increment( increment );
saveIfNeeded( queueName, type );
}

if ( logger.isDebugEnabled() ) {
long value = inMemoryCounters.get( key ).value();
if (value <= 0) {
logger.debug( "Queue {} type {} decremented count = {}", queueName, type, value );
}
}
}


Expand All @@ -172,14 +191,16 @@ public void decrementCounter(String queueName, DatabaseQueueMessage.Type type, l
InMemoryCount inMemoryCount = inMemoryCounters.get( key );

synchronized ( inMemoryCount ) {

inMemoryCount.getDecrement().addAndGet( decrement );

//logger.info("Decremented Count for queue {} type {} = {}",
//queueName, type, getCounterValue( queueName, type ));

inMemoryCount.decrement( decrement );
saveIfNeeded( queueName, type );
}

if ( logger.isDebugEnabled() ) {
long value = inMemoryCounters.get( key ).value();
if (value <= 0) {
logger.debug( "Queue {} type {} incremented count = {}", queueName, type, value );
}
}
}


Expand All @@ -194,14 +215,14 @@ public long getCounterValue( String queueName, DatabaseQueueMessage.Type type )

if ( value == null ) {
throw new NotFoundException(
MessageFormat.format( "No counter found for queue {0} type {1}",
queueName, type ));
MessageFormat.format( "No counter found for queue {0} type {1}", queueName, type ));
} else {
inMemoryCounters.put( key, new InMemoryCount( value ));
}
}

return inMemoryCounters.get( key ).value();
long value = inMemoryCounters.get( key ).value();
return value;
}


Expand Down Expand Up @@ -253,15 +274,18 @@ private void saveIfNeeded( String queueName, DatabaseQueueMessage.Type type ) {

if ( numChanges.incrementAndGet() > maxChangesBeforeSave ) {

long totalIncrement = inMemoryCount.getIncrement().get();
long totalIncrement = inMemoryCount.getIncrement();
incrementCounterInStorage( queueName, type, totalIncrement );

long totalDecrement = inMemoryCount.getDecrement().get();
long totalDecrement = inMemoryCount.getDecrement();
decrementCounterInStorage( queueName, type, totalDecrement );

inMemoryCount.setBaseCount( retrieveCounterFromStorage( queueName, type ) );
inMemoryCount.getIncrement().set( 0L );
inMemoryCount.getDecrement().set( 0L );
long baseCount = retrieveCounterFromStorage( queueName, type );

logger.debug("Writing queue counter {} type {} to storage count = {}", queueName, type, baseCount );

inMemoryCount.setBaseCount( baseCount );
inMemoryCount.clearDeltas();

numChanges.set( 0 );
}
Expand Down
Expand Up @@ -179,7 +179,7 @@ public DatabaseQueueMessage loadMessage(
Shard.Type shardType = DatabaseQueueMessage.Type.DEFAULT.equals( type ) ?
Shard.Type.DEFAULT : Shard.Type.INFLIGHT;
Shard shard = shardStrategy.selectShard(
queueName, actorSystemFig.getRegionLocal(), shardType, queueMessageId );
queueName, region, shardType, queueMessageId );
shardId = shard.getShardId();
} else {
shardId = shardIdOrNull;
Expand Down
Expand Up @@ -94,7 +94,7 @@ void setBaseCount( long baseCount ) {
public ShardCounterSerializationImpl(
CassandraConfig cassandraConfig, QakkaFig qakkaFig, CassandraClient cassandraClient ) {
this.cassandraConfig = cassandraConfig;
this.maxInMemoryIncrement = qakkaFig.getMaxInMemoryShardCounter();
this.maxInMemoryIncrement = qakkaFig.getShardCounterMaxInMemory();
this.cassandraClient = cassandraClient;
}

Expand Down
Expand Up @@ -24,7 +24,6 @@ log4j.appender.stdout.layout.ConversionPattern=%d %p (%t) %c{1} - %m%n
log4j.logger.org.apache.cassandra=WARN
log4j.logger.org.glassfish=WARN

log4j.logger.org.apache.usergrid=INFO
#log4j.logger.org.apache.usergrid.persistence.qakka=DEBUG
#log4j.logger.org.apache.usergrid.persistence.queue=DEBUG
log4j.logger.org.apache.usergrid.persistence.qakka=INFO
log4j.logger.org.apache.usergrid.persistence.queue=INFO
log4j.logger.org.apache.usergrid.corepersistence.asyncevents=INFO
Expand Up @@ -48,8 +48,9 @@ queue.shard.max.size=10
queue.shard.allocation.check.frequency.millis=100
queue.shard.allocation.advance.time.millis=200

queue.max.inmemory.shard.counter = 100
queue.max.inmemory.max.message.changes=3
# set low for testing purposes
queue.shard.counter.max-in-memory=10
queue.message.counter.max-in-memory=10

queue.long.polling.time.millis=2000

Expand Down

0 comments on commit f8c3a2d

Please sign in to comment.