Skip to content

Commit

Permalink
MONDRIAN: [MONDRIAN-861] Refactors the Segmentcache access into a wor…
Browse files Browse the repository at this point in the history
…ker class in order to simplify and centralize the error handling rules. Adds a method to SegmentCache so that Mondrian can get a list of all available segments in cache. Adds more properties to control the scan timeouts.

[git-p4: depot-paths = "//open/mondrian/": change = 14095]
  • Loading branch information
lucboudreau committed Feb 4, 2011
1 parent 778fd8e commit c1bdadc
Show file tree
Hide file tree
Showing 6 changed files with 378 additions and 182 deletions.
13 changes: 13 additions & 0 deletions src/main/mondrian/olap/MondrianProperties.java
Expand Up @@ -477,6 +477,19 @@ private void load(final PropertySource source) {
new IntegerProperty(
this, "mondrian.rolap.SegmentCacheLookupTimeout", 5000);

/**
* Property which defines the timeout for
* {@link mondrian.rolap.agg.SegmentCache#getSegmentHeaders()}
* in milliseconds. Defaults to 5000.
*
* <p>This is an internal control property. The timeout value
* won't be passed to the underlying
* {@link mondrian.rolap.agg.SegmentCache} SPI.
*/
public transient final IntegerProperty SegmentCacheScanTimeout =
new IntegerProperty(
this, "mondrian.rolap.SegmentCacheScanTimeout", 5000);

/**
* Property which tells the SegmentLoader to throw an exception
* if an exception is encountered while loading segments from
Expand Down
12 changes: 12 additions & 0 deletions src/main/mondrian/resource/MondrianResource.xml
Expand Up @@ -794,6 +794,12 @@
</text>
</exception>

<exception id="7000633" name="SegmentCacheFailedToScanSegments">
<text>
An exception was encountered while getting a list of segment headers in the SegmentCache.
</text>
</exception>

<exception id="7000650" name="SegmentCacheReadTimeout">
<text>
Timeout reached while reading segment from SegmentCache.
Expand All @@ -812,6 +818,12 @@
</text>
</exception>

<exception id="7000653" name="SegmentCacheScanTimeout">
<text>
Timeout reached while getting a list of segment headers from the SegmentCache.
</text>
</exception>

<!-- ====================================================================== -->
<!-- Optimizer -->
<exception id="8000000" name="NativeEvaluationUnsupported"
Expand Down
26 changes: 18 additions & 8 deletions src/main/mondrian/rolap/agg/SegmentCache.java
Expand Up @@ -16,18 +16,23 @@
import mondrian.olap.MondrianProperties;

/**
* SPI definition of the segments cache. Implementations are
* expected to be thread-safe. It is the responsibility of the
* cache implementation to maintain a consistent state.
*
* <p>Implementations must provide a default empty constructor.
* It will be called by each segment loader. The SegmentCache
* objects are spawned often. We recommend using
* a facade object which points to a singleton cache instance.
* SPI definition of the segments cache.
*
* <p>Lookups are performed using {@link SegmentHeader}s and
* {@link SegmentBody}s. Both are immutable and fully serializable.
*
* <p>Implementations are expected to be thread-safe.
* It is the responsibility of the cache implementation
* to maintain a consistent state.
*
* <p>Implementations must provide a default empty constructor.
* Segment caches are instantiated as a singleton but can be
* hot swapped by modifying {@link MondrianProperties#SegmentCache}.
* Implementations will get a termination signal through
* {@link SegmentCache#tearDown()} but Mondrian will relinquish
* control of the termination thread and will not be listening
* to thrown exceptions.
*
* @see MondrianProperties#SegmentCache
* @author LBoudreau
* @version $Id$
Expand Down Expand Up @@ -68,5 +73,10 @@ public interface SegmentCache {
* @param body The segment body to cache.
*/
Future<Boolean> put(SegmentHeader header, SegmentBody body);

/**
* Tear down and clean up the cache.
*/
void tearDown();
}
// End SegmentCache.java
265 changes: 265 additions & 0 deletions src/main/mondrian/rolap/agg/SegmentCacheWorker.java
@@ -0,0 +1,265 @@
package mondrian.rolap.agg;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import mondrian.olap.MondrianProperties;
import mondrian.resource.MondrianResource;

import org.apache.log4j.Logger;
import org.eigenbase.util.property.Property;
import org.eigenbase.util.property.TriggerBase;

final class SegmentCacheWorker {

private final static Logger LOGGER =
Logger.getLogger(SegmentCacheWorker.class);
private static SegmentCache segmentCache = null;
private final static ExecutorService executor =
Executors.newCachedThreadPool();

static {
final String cacheName =
MondrianProperties.instance().SegmentCache.get();
if (cacheName != null) {
setCache(cacheName);
}
// Rig up a trigger to that property to hot-swap the cache.
MondrianProperties.instance().SegmentCache.addTrigger(
new TriggerBase(true) {
public void execute(Property property, String value) {
setCache(value);
}
}
);
}

private static final void setCache(String cacheName) {
try {
final SegmentCache scopedSC = segmentCache;
if (scopedSC != null
&& scopedSC.getClass().equals(cacheName))
{
// No need to reload the cache.
// It's the same property value.
return;
}
if (scopedSC != null) {
executor.submit(
new Runnable() {
public void run() {
LOGGER.debug("Tearing down segment cache.");
scopedSC.tearDown();
}
});
}
segmentCache = null;
LOGGER.debug("Starting cache instance:" + cacheName);
Class<?> clazz =
Class.forName(cacheName);
Object scObject = clazz.newInstance();
if (scObject instanceof SegmentCache) {
segmentCache = (SegmentCache) scObject;
} else {
LOGGER.error(
MondrianResource.instance()
.SegmentCacheIsNotImplementingInterface
.baseMessage);
}
} catch (Exception e) {
LOGGER.error(
MondrianResource.instance()
.SegmentCacheFailedToInstanciate.baseMessage,
e);
if (MondrianProperties.instance()
.SegmentCacheFailOnError.get())
{
throw MondrianResource.instance()
.SegmentCacheFailedToInstanciate.ex(e);
}
}
}

/**
* Returns a segment body corresponding to a header.
* <p>If no cache is configured or there is an error while
* querying the cache, null is returned none the less.
* To throw an exception, enable
* {@link MondrianProperties#SegmentCacheFailOnError} To adjust
* timeout values, set {@link MondrianProperties#SegmentCacheReadTimeout}
* @param header Header to search.
* @return Either a segment body object or null if there
* was no cache configured or no segment could be found
* for the passed header.
*/
public final static SegmentBody get(SegmentHeader header) {
if (segmentCache != null) {
try {
return segmentCache.get(header)
.get(
MondrianProperties.instance()
.SegmentCacheReadTimeout.get(),
TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOGGER.error(
MondrianResource.instance()
.SegmentCacheReadTimeout.baseMessage,
e);
throw MondrianResource.instance()
.SegmentCacheReadTimeout.ex(e);
} catch (Throwable t) {
LOGGER.error(
MondrianResource.instance()
.SegmentCacheFailedToLoadSegment
.baseMessage,
t);
if (MondrianProperties.instance()
.SegmentCacheFailOnError.get())
{
throw MondrianResource.instance()
.SegmentCacheFailedToLoadSegment.ex(t);
}
}
}
return null;
}

/**
* Returns whether there is a cached segment body available
* for a given segment header.
*
* <p>If no cache is configured or there is an error while
* querying the cache, false is returned none the less.
* To throw an exception, enable
* {@link MondrianProperties#SegmentCacheFailOnError} To adjust
* timeout values, set {@link MondrianProperties#SegmentCacheLookupTimeout}
* @param header A header to search for in the segment cache.
* @return True or false, whether there is a segment body
* available in a segment cache.
*/
public final static Boolean contains(SegmentHeader header) {
if (segmentCache != null) {
try {
return segmentCache.contains(header)
.get(
MondrianProperties.instance()
.SegmentCacheLookupTimeout.get(),
TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOGGER.error(
MondrianResource.instance()
.SegmentCacheLookupTimeout.baseMessage,
e);
throw MondrianResource.instance()
.SegmentCacheLookupTimeout.ex(e);
} catch (Throwable t) {
LOGGER.error(
MondrianResource.instance()
.SegmentCacheFailedToLookupSegment.baseMessage,
t);
if (MondrianProperties.instance()
.SegmentCacheFailOnError.get())
{
throw MondrianResource.instance()
.SegmentCacheFailedToLookupSegment.ex(t);
}
}
}
return false;
}

/**
* Places a segment in the cache. Returns true or false
* if the operation succeeds.
*
* <p>If no cache is configured or there is an error while
* querying the cache, false is returned none the less.
* To throw an exception, enable
* {@link MondrianProperties#SegmentCacheFailOnError} To adjust
* timeout values, set {@link MondrianProperties#SegmentCacheWriteTimeout}
* @param header A header to search for in the segment cache.
* @return True or false, whether there is a segment body
* available in a segment cache.
*/
public final static Boolean put(SegmentHeader header, SegmentBody body) {
if (segmentCache != null) {
try {
return segmentCache.put(header, body)
.get(
MondrianProperties.instance()
.SegmentCacheWriteTimeout.get(),
TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOGGER.error(
MondrianResource.instance()
.SegmentCacheReadTimeout.baseMessage,
e);
if (MondrianProperties.instance()
.SegmentCacheFailOnError.get())
{
throw MondrianResource.instance()
.SegmentCacheReadTimeout.ex(e);
}
} catch (Throwable t) {
LOGGER.error("Failed to save segment to cache.", t);
if (MondrianProperties.instance()
.SegmentCacheFailOnError.get())
{
throw MondrianResource.instance()
.SegmentCacheFailedToSaveSegment.ex(t);
}
}
}
return false;
}

/**
* Returns a list of segments present in the cache.
*
* <p>If no cache is configured or there is an error while
* querying the cache, an empty list is returned none the less.
* To throw an exception, enable
* {@link MondrianProperties#SegmentCacheFailOnError} To adjust
* timeout values, set {@link MondrianProperties#SegmentCacheScanTimeout}
*
* @param header Header to search.
* @return Either a segment body object or null if there
* was no cache configured or no segment could be found
* for the passed header.
*/
public final static List<SegmentHeader> getSegmentHeaders() {
if (segmentCache != null) {
try {
return segmentCache.getSegmentHeaders()
.get(
MondrianProperties.instance()
.SegmentCacheScanTimeout.get(),
TimeUnit.MILLISECONDS);
} catch (TimeoutException e) {
LOGGER.error(
MondrianResource.instance()
.SegmentCacheScanTimeout.baseMessage,
e);
if (MondrianProperties.instance()
.SegmentCacheFailOnError.get())
{
throw MondrianResource.instance()
.SegmentCacheScanTimeout.ex(e);
}
} catch (Throwable t) {
LOGGER.error("Failed to get a list of segment headers.", t);
if (MondrianProperties.instance()
.SegmentCacheFailOnError.get())
{
throw MondrianResource.instance()
.SegmentCacheFailedToScanSegments.ex(t);
}
}
}
return Collections.emptyList();
}
}

0 comments on commit c1bdadc

Please sign in to comment.