Skip to content

Commit

Permalink
MONDRIAN: Refactors the SegmentCache SPI to allow flushing within the…
Browse files Browse the repository at this point in the history
… external cache. Splits the dimensionality from the compound predicates in the SegmentHeader so that the SegmentCache SPI can expose a flush(ConstrainedColumn[]) method and implement it effectively.

[git-p4: depot-paths = "//open/mondrian/": change = 14644]
  • Loading branch information
lucboudreau committed Oct 3, 2011
1 parent ff82953 commit 1dd12d1
Show file tree
Hide file tree
Showing 9 changed files with 238 additions and 82 deletions.
3 changes: 1 addition & 2 deletions src/main/mondrian/rolap/CacheControlImpl.java
Expand Up @@ -171,8 +171,7 @@ public void flush(CellRegion region) {
break;
}
}
if (!found)
{
if (!found) {
throw MondrianResource.instance().CacheFlushRegionMustContainMembers
.ex();
}
Expand Down
7 changes: 7 additions & 0 deletions src/main/mondrian/rolap/RolapCacheRegion.java
Expand Up @@ -101,6 +101,13 @@ public void addPredicate(StarPredicate predicate)
public Collection<StarPredicate> getPredicates() {
return predicates.values();
}

/**
* Returns the list of all column predicates.
*/
public Collection<StarColumnPredicate> getColumnPredicates() {
return columnPredicates.values();
}
}

// End RolapCacheRegion.java
24 changes: 8 additions & 16 deletions src/main/mondrian/rolap/agg/Aggregation.java
Expand Up @@ -174,8 +174,11 @@ public void load(
// Segments are loaded using group by grouping sets
// by CompositeBatch.loadAggregation
} else {
final List<GroupingSet> gsList =
new ArrayList<GroupingSet>();
gsList.add(groupingSet);
new SegmentLoader().load(
Collections.singletonList(groupingSet),
gsList,
pinnedSegments,
compoundPredicateList);
}
Expand Down Expand Up @@ -454,9 +457,6 @@ public void flush(
cacheControl.trace(
"discard segment - it has no columns in common: "
+ segment);
// Removes the segment from the external cache, if any.
SegmentCacheWorker.remove(
SegmentHeader.forSegment(segment));
continue;
}

Expand Down Expand Up @@ -562,11 +562,6 @@ public void flush(
final Object[] axisKeys = axis.getKeys();

if (axisBitSet.cardinality() == 0) {
// If one axis is empty, the entire segment is empty.
// Discard it.
// Removes the segment from the external cache, if any.
SegmentCacheWorker.remove(
SegmentHeader.forSegment(segment));
continue segmentLoop;
}

Expand Down Expand Up @@ -622,9 +617,6 @@ public void flush(
// throw away a segment which has a few cells left.
int remainingCellCount = segment.getCellCount();
if (remainingCellCount - cellCount <= 0) {
// Removes the segment from the external cache, if any.
SegmentCacheWorker.remove(
SegmentHeader.forSegment(segment));
continue;
}

Expand Down Expand Up @@ -663,13 +655,13 @@ public void flush(
bestColumnPredicate,
excludedRegions);

// Removes the segment from the external cache, if any.
SegmentCacheWorker.remove(
SegmentHeader.forSegment(segment));

newSegmentRefs.add(new SoftReference<Segment>(newSegment));
}

// Flush the external cache regions
SegmentCacheWorker.flush(
SegmentHeader.forCacheRegion(cacheRegion));

// Replace list of segments.
// FIXME: Synchronize.
// TODO: Replace segmentRefs, don't copy.
Expand Down
6 changes: 4 additions & 2 deletions src/main/mondrian/rolap/agg/Segment.java
Expand Up @@ -299,8 +299,10 @@ private String printSegmentHeaderInfo(String sep) {
buf.append(sep);
buf.append("measure=");
buf.append(
measure.getAggregator().getExpression(
measure.getExpression().getGenericExpression()));
measure.getExpression() == null
? measure.getAggregator().getExpression("*")
: measure.getAggregator().getExpression(
measure.getExpression().getGenericExpression()));
return buf.toString();
}

Expand Down
96 changes: 60 additions & 36 deletions src/main/mondrian/rolap/agg/SegmentCacheWorker.java
Expand Up @@ -11,19 +11,16 @@

import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import mondrian.olap.MondrianProperties;
import mondrian.olap.Util;
import mondrian.resource.MondrianResource;
import mondrian.rolap.agg.SegmentHeader.ConstrainedColumn;
import mondrian.spi.SegmentCache;
import mondrian.util.ServiceDiscovery;

import org.apache.log4j.Logger;
import org.eigenbase.util.property.Property;
import org.eigenbase.util.property.TriggerBase;

/**
* Utility class to interact with the {@link SegmentCache}.
Expand All @@ -36,35 +33,21 @@ public final class SegmentCacheWorker {
private final static Logger LOGGER =
Logger.getLogger(SegmentCacheWorker.class);
private static SegmentCache segmentCache = null;
private final static ExecutorService executor =
Util.getExecutorService(
1,
"mondrian.rolap.agg.SegmentCacheWorker$ExecutorThread");
private final static ServiceDiscovery<SegmentCache> serviceDiscovery =
ServiceDiscovery.forClass(SegmentCache.class);

static {
initCache();
// Rig up a trigger to the SegmentCache property to hot-swap the cache.
MondrianProperties.instance().SegmentCache.addTrigger(
new TriggerBase(true) {
public void execute(Property property, String value) {
setCache(value);
}
}
);
}

private static void initCache() {
// Always call prior to using segmentCache
private synchronized static void initCache() {
// First try to get the segmentcache impl class from
// mondrian properties.
final String cacheName =
MondrianProperties.instance().SegmentCache.get();
if (cacheName != null) {
setCache(cacheName);

if (segmentCache == null) {
// There was a property defined. We use this one
// by default.
setCache(cacheName);
} else {

// There was no property set. Let's look for Java services.
final List<Class<SegmentCache>> implementors =
serviceDiscovery.getImplementor();
Expand All @@ -84,9 +67,9 @@ private static void initCache() {
* @param cacheName Name of class that implements the {@link SegmentCache}
* API
*/
public static synchronized void setCache(String cacheName) {
private static void setCache(String cacheName) {
try {
final SegmentCache cache = getSegmentCache();
final SegmentCache cache = segmentCache;
if (cache != null
&& cacheName != null
&& cache.getClass().getName().equals(cacheName))
Expand All @@ -96,18 +79,13 @@ public static synchronized void setCache(String cacheName) {
return;
}
if (cache != null) {
executor.submit(
new Runnable() {
public void run() {
LOGGER.debug("Tearing down segment cache.");
cache.tearDown();
}
});
cache.tearDown();
}
segmentCache = null;
if (cacheName == null
|| cacheName.equals(""))
|| cacheName.equals(""))
{
// We're done here.
return;
}
LOGGER.debug("Starting cache instance:" + cacheName);
Expand Down Expand Up @@ -136,6 +114,7 @@ public void run() {
}

private static SegmentCache getSegmentCache() {
initCache();
return segmentCache;
}

Expand All @@ -151,7 +130,6 @@ private static SegmentCache getSegmentCache() {
* for the passed header.
*/
public static SegmentBody get(SegmentHeader header) {
initCache();
final SegmentCache cache = getSegmentCache();
if (cache != null) {
try {
Expand Down Expand Up @@ -316,6 +294,53 @@ public static void remove(SegmentHeader header) {
}
}

/**
* Flushes a segment from the cache. Returns true or false
* if the operation succeeds.
*
* <p>To adjust timeout values, set the
* {@link MondrianProperties#SegmentCacheWriteTimeout} property.
*
* @param header A region to flush from the segment cache.
*/
public static void flush(ConstrainedColumn[] region) {
initCache();
final SegmentCache cache = getSegmentCache();
if (cache != null) {
try {
final boolean result =
cache.flush(region)
.get(
MondrianProperties.instance()
.SegmentCacheWriteTimeout.get(),
TimeUnit.MILLISECONDS);
if (!result) {
LOGGER.error(
MondrianResource.instance()
.SegmentCacheFailedToDeleteSegment
.baseMessage);
throw MondrianResource.instance()
.SegmentCacheFailedToDeleteSegment.ex();
}
} catch (TimeoutException e) {
LOGGER.error(
MondrianResource.instance()
.SegmentCacheReadTimeout.baseMessage,
e);
throw MondrianResource.instance()
.SegmentCacheReadTimeout.ex(e);
} catch (Throwable t) {
LOGGER.error(
MondrianResource.instance()
.SegmentCacheFailedToDeleteSegment
.baseMessage,
t);
throw MondrianResource.instance()
.SegmentCacheFailedToDeleteSegment.ex(t);
}
}
}

/**
* Returns a list of segments present in the cache.
*
Expand Down Expand Up @@ -354,7 +379,6 @@ public static List<SegmentHeader> getSegmentHeaders() {
}

public static boolean isCacheEnabled() {
initCache();
return getSegmentCache() != null;
}
}
Expand Down

0 comments on commit 1dd12d1

Please sign in to comment.