Skip to content

Commit

Permalink
Added evicted check to output schedule, enabled eviction on stats col…
Browse files Browse the repository at this point in the history
…lectors, added ability to choose time unit on eviction schedule. Removed obsolete Output Code from MpContainer that should have been removed with #40 or #34
  • Loading branch information
catalincapota committed May 24, 2012
1 parent 987106e commit def611e
Show file tree
Hide file tree
Showing 7 changed files with 173 additions and 133 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
package com.nokia.dempsy.config;

import java.lang.reflect.Method;
import java.util.concurrent.TimeUnit;

import com.nokia.dempsy.Adaptor;
import com.nokia.dempsy.DempsyException;
Expand Down Expand Up @@ -50,7 +51,8 @@ public class ClusterDefinition
private Object outputExecuter;
private boolean adaptorIsDaemon = false;
private KeySource<?> keySource = null;
private long evictionFrequencySeconds = 600;
private long evictionFrequency = 600;
private TimeUnit evictionTimeUnit = TimeUnit.SECONDS;

private ApplicationDefinition parent;

Expand Down Expand Up @@ -168,13 +170,21 @@ public ClusterDefinition setMessageProcessorPrototype(Object messageProcessor) t
public ClusterDefinition setAdaptorDaemon(boolean isAdaptorDaemon) { this.adaptorIsDaemon = isAdaptorDaemon; return this; }
public KeySource<?> getKeySource(){ return keySource; }
public ClusterDefinition setKeySource(KeySource<?> keySource){ this.keySource = keySource; return this; }

public long getEvictionFrequencySeconds() {
return evictionFrequencySeconds;

public long getEvictionFrequency() {
return evictionFrequency;
}

public ClusterDefinition setEvictionFrequency(long evictionFrequency) {
this.evictionFrequency = evictionFrequency; return this;
}

public TimeUnit getEvictionTimeUnit() {
return evictionTimeUnit;
}

public ClusterDefinition setEvictionFrequencySeconds(long evictionFrequencySeconds) {
this.evictionFrequencySeconds = evictionFrequencySeconds; return this;
public ClusterDefinition setEvictionTimeUnit(TimeUnit timeUnit) {
this.evictionTimeUnit = timeUnit; return this;
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -87,16 +87,26 @@ public interface StatsCollector {
*/
public void preInstantiationCompleted();


/**
* Dempsy calls into this just before calling @Output methods for MPs.
*/
public void outputInvokeStarted();

/**
* empsy calls into this just after @Output methods for MPs complete.
* Dempsy calls into this just after @Output methods for MPs complete.
*/
public void outputInvokeCompleted();

/**
* Dempsy calls into this just before calling @Output methods for MPs.
*/
public void evictionPassStarted();

/**
* Dempsy calls into this just after @Output methods for MPs complete.
*/
public void evictionPassCompleted();

// FIXME
/*
Expand All @@ -117,5 +127,6 @@ public interface StatsCollector {
int getInFlightMessageCount();
double getPreInstantiationDuration();
double getOutputInvokeDuration();
double getEvictionDuration();

}
2 changes: 1 addition & 1 deletion lib-dempsyimpl/src/main/java/com/nokia/dempsy/Dempsy.java
Original file line number Diff line number Diff line change
Expand Up @@ -204,7 +204,7 @@ public void run()
t.start();
}

container.startEvictionThread(clusterDefinition.getEvictionFrequencySeconds());
container.startEvictionThread(clusterDefinition.getEvictionFrequency(), clusterDefinition.getEvictionTimeUnit());
} catch(RuntimeException e) { throw e; }
catch(Exception e) { throw new DempsyException(e); }
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -74,7 +74,7 @@ public class MpContainer implements Listener, OutputInvoker
// changes to this map will be synchronized; read-only may be concurrent
private ConcurrentHashMap<Object,InstanceWrapper> instances = new ConcurrentHashMap<Object,InstanceWrapper>();

private ScheduledExecutorService outputScheduler;
// Scheduler to handle eviction thread.
private ScheduledExecutorService evictionScheduler;

// The ClusterId is set for the sake of error messages.
Expand Down Expand Up @@ -254,10 +254,7 @@ public void shuttingDown()
}

public void shutdown()
{
if (outputScheduler != null)
outputScheduler.shutdownNow();

{
if (evictionScheduler != null)
evictionScheduler.shutdownNow();
}
Expand Down Expand Up @@ -301,43 +298,45 @@ public Object getMessageProcessor(Object key)
// Internals
//----------------------------------------------------------------------------

// this is called directly from tests but shouldn't be accessed otherwise.
protected boolean dispatch(Object message, boolean block) throws ContainerException
{
if (message == null)
return false; // No. We didn't process the null message
// this is called directly from tests but shouldn't be accessed otherwise.
protected boolean dispatch(Object message, boolean block) throws ContainerException {
if (message == null)
return false; // No. We didn't process the null message

InstanceWrapper wrapper;
wrapper = getInstanceForDispatch(message);
InstanceWrapper wrapper;
wrapper = getInstanceForDispatch(message);

boolean ret = false;
boolean ret = false;

// wrapper cannot be null ... look at the getInstanceForDispatch method
boolean gotLock = false;
try
{
Object instance = wrapper.getExclusive(block);
if (instance != null) // null indicates we didn't get the lock
{
gotLock = true;
invokeOperation(wrapper.getInstance(), Operation.handle, message);
ret = true;
}
else
{
if (logger.isTraceEnabled())
logger.trace("the container for " + clusterId + " failed to obtain lock on " + SafeString.valueOf(prototype));
statCollector.messageDiscarded(message);
}
}
finally
{
if (gotLock)
wrapper.releaseLock();
}
// wrapper cannot be null ... look at the getInstanceForDispatch method
boolean gotLock = false;

return ret;
}
try {
Object instance = wrapper.getExclusive(block);
if (instance != null) // null indicates we didn't get the lock
{
if(wrapper.isEvicted()){
logger.trace("the container for " + clusterId + " failed to obtain lock on " + SafeString.valueOf(prototype)
+ " due to eviction");
statCollector.messageDiscarded(message);
return ret;
}

gotLock = true;
invokeOperation(wrapper.getInstance(), Operation.handle, message);
ret = true;
} else {
if (logger.isTraceEnabled())
logger.trace("the container for " + clusterId + " failed to obtain lock on " + SafeString.valueOf(prototype));
statCollector.messageDiscarded(message);
}
} finally {
if (gotLock)
wrapper.releaseLock();
}

return ret;
}

/**
* Returns the instance associated with the given message, creating it if
Expand All @@ -362,6 +361,8 @@ public void evict() {
if (!prototype.isEvictableSupported())
return;

statCollector.evictionPassStarted();

for (Iterator<Object> keys = instances.keySet().iterator(); keys.hasNext();) {
Object key = keys.next();
InstanceWrapper wrapper = instances.get(key);
Expand All @@ -372,9 +373,9 @@ public void evict() {
Object instance = wrapper.getInstance();
try {
if (prototype.invokeEvictable(instance)) {
wrapper.markEvicted();
prototype.passivate(wrapper.getInstance());
wrapper.markPassivated();
wrapper.markEvicted();
instances.remove(key);
}
} catch (Throwable e) {
Expand All @@ -385,63 +386,67 @@ public void evict() {
wrapper.releaseLock();
}
}

statCollector.evictionPassCompleted();
}

public void startEvictionThread(long evictionFrequencyInSeconds) {
public void startEvictionThread(long evictionFrequency, TimeUnit timeUnit) {
if (prototype != null && prototype.isEvictableSupported()){
evictionScheduler = Executors.newSingleThreadScheduledExecutor();
evictionScheduler.scheduleWithFixedDelay(new Runnable(){ public void run(){ evict(); }}, 0, evictionFrequencyInSeconds * 1000, TimeUnit.MILLISECONDS);
evictionScheduler.scheduleWithFixedDelay(new Runnable(){ public void run(){ evict(); }}, 0, evictionFrequency, timeUnit);
}
}

public void setOutPutPass(long freqInSeconds, TimeUnit timeUnit)
{
outputScheduler = Executors.newSingleThreadScheduledExecutor();
outputScheduler.scheduleWithFixedDelay(new Runnable() { @Override public void run() { outputPass(); }}, 0, freqInSeconds, timeUnit );
}

// This method MUST NOT THROW
public void outputPass()
{
if (!prototype.isOutputSupported())
return;

// take a snapshot of the current container state.
LinkedList<InstanceWrapper> toOutput = new LinkedList<InstanceWrapper>(instances.values());

// keep going until all of the outputs have been invoked
while (toOutput.size() > 0)
{
for (Iterator<InstanceWrapper> iter = toOutput.iterator(); iter.hasNext(); )
{
InstanceWrapper wrapper = iter.next();
boolean gotLock = false;
try
{
gotLock = wrapper.tryLock();
if (gotLock)
{
Object instance = wrapper.getInstance(); // only called while holding the lock
try { invokeOperation(instance, Operation.output, null); }
catch (Throwable e) { /* The error message is logged in invokeOperation */ }
iter.remove();
}
}
finally
{
if (gotLock)
wrapper.releaseLock();
}
}
}
}

@Override
public void invokeOutput() {
statCollector.outputInvokeStarted();
outputPass();
statCollector.outputInvokeCompleted();
}
// This method MUST NOT THROW
public void outputPass() {
if (!prototype.isOutputSupported())
return;

// take a snapshot of the current container state.
LinkedList<InstanceWrapper> toOutput = new LinkedList<InstanceWrapper>(instances.values());

// keep going until all of the outputs have been invoked
while (toOutput.size() > 0) {
for (Iterator<InstanceWrapper> iter = toOutput.iterator(); iter.hasNext();) {
InstanceWrapper wrapper = iter.next();
boolean gotLock = false;

if (wrapper.isEvicted()) {
iter.remove();
continue;
}

try {
gotLock = wrapper.tryLock();

if (wrapper.isEvicted()) {
iter.remove();
continue;
} else if (gotLock) {
Object instance = wrapper.getInstance(); // only called while holding the lock
try {
invokeOperation(instance, Operation.output, null);
} catch (Throwable e) { /*
* The error message is logged
* in invokeOperation
*/
}
iter.remove();
}
} finally {
if (gotLock)
wrapper.releaseLock();
}
}
}
}

@Override
public void invokeOutput() {
statCollector.outputInvokeStarted();
outputPass();
statCollector.outputInvokeCompleted();
}

//----------------------------------------------------------------------------
// Internals
Expand Down Expand Up @@ -485,14 +490,14 @@ public InstanceWrapper getInstanceForKey(Object key) throws ContainerException
{
// common case has "no" contention
InstanceWrapper wrapper = instances.get(key);
if(wrapper != null && !wrapper.isEvicted())
if(wrapper != null)
return wrapper;

// otherwise we'll do an atomic check-and-update
synchronized (this)
{
wrapper = instances.get(key); // double checked lock?????
if (wrapper != null && !wrapper.isEvicted())
if (wrapper != null)
return wrapper;

Object instance = null;
Expand Down
Loading

0 comments on commit def611e

Please sign in to comment.