Skip to content

Commit

Permalink
MONDRIAN: Only flush cache of aggregates belonging to the current thr…
Browse files Browse the repository at this point in the history
…ead. This used to be not thread-safe when cubes had cache turned off. This could lead to RolapResult.executeBody() running until maxEvalDepth was reached, due to the fact the cache was flushed by another thread.

[git-p4: depot-paths = "//open/mondrian/": change = 8481]
  • Loading branch information
Bart Pappyn committed Jan 8, 2007
1 parent f04a8c3 commit bdc00f2
Showing 1 changed file with 95 additions and 7 deletions.
102 changes: 95 additions & 7 deletions src/main/mondrian/rolap/RolapStar.java
Expand Up @@ -85,7 +85,7 @@ public void execute(Property property, String value) {
private final Map<RolapCube, Map<RolapLevel, Column>> mapCubeToMapLevelToColumn;

/** holds all aggregations of this star */
private Map<BitKey,Aggregation> aggregations;
private Map<RolapStarAggregationKey,Aggregation> aggregations;

/** how many columns (column and columnName) there are */
private int columnCount;
Expand Down Expand Up @@ -120,7 +120,7 @@ public void execute(Property property, String value) {

this.mapCubeToMapLevelToColumn =
new HashMap<RolapCube, Map<RolapLevel, Column>>();
this.aggregations = new HashMap<BitKey, Aggregation>();
this.aggregations = new HashMap<RolapStarAggregationKey, Aggregation>();

clearAggStarList();

Expand Down Expand Up @@ -281,7 +281,8 @@ boolean isCacheAggregations() {
* caching set to off.
*
* @param forced if true, then the cached aggregations are cleared
* regardless of any other settings.
* regardless of any other settings. When set to false, only cache
* from the current thread context is cleared.
*/
void clearCachedAggregations(boolean forced) {
if (forced || (! this.cacheAggregations) || RolapStar.disableCaching) {
Expand All @@ -294,8 +295,26 @@ void clearCachedAggregations(boolean forced) {
buf.append(getFactTable().getAlias());
LOGGER.debug(buf.toString());
}

aggregations.clear();
synchronized(aggregations) {
if (forced) {
aggregations.clear();
}
else {
/* Only clear aggregation cache for the currect thread context */
Thread thread = Thread.currentThread();
long threadId = thread.getId();

Iterator<Map.Entry<RolapStarAggregationKey, Aggregation>> it = aggregations.entrySet().iterator();
while (it.hasNext()) {
Map.Entry<RolapStarAggregationKey, Aggregation> e = it.next();
RolapStarAggregationKey aggregationKey = e.getKey();

if (aggregationKey.getThreadId() == threadId) {
it.remove();
}
}
}
}
}
}

Expand All @@ -309,7 +328,9 @@ public Aggregation lookupOrCreateAggregation(final BitKey bitKey) {
Aggregation aggregation = lookupAggregation(bitKey);
if (aggregation == null) {
aggregation = new Aggregation(this, bitKey);
this.aggregations.put(bitKey, aggregation);
RolapStarAggregationKey aggregationKey = new RolapStarAggregationKey(bitKey, (! this.cacheAggregations) || RolapStar.disableCaching);

this.aggregations.put(aggregationKey, aggregation);
}
return aggregation;
}
Expand All @@ -323,7 +344,8 @@ public Aggregation lookupOrCreateAggregation(final BitKey bitKey) {
*/
public Aggregation lookupAggregation(BitKey bitKey) {
synchronized (aggregations) {
return aggregations.get(bitKey);
RolapStarAggregationKey aggregationKey = new RolapStarAggregationKey(bitKey, (! this.cacheAggregations) || RolapStar.disableCaching);
return aggregations.get(aggregationKey);
}
}

Expand Down Expand Up @@ -1645,5 +1667,71 @@ private String visit(String table) {
}
}
}
/*
* Class to create an aggregation key for the cache that is thread based.
*/
class RolapStarAggregationKey {
private BitKey bitKey;
private long threadId;
/*
* Creates an aggregation key, specify whether the cache is local to the thread
*/
RolapStarAggregationKey(final BitKey bitKey, boolean threadLocal) {
this.bitKey = bitKey.copy();
if (threadLocal) {
Thread thread = Thread.currentThread();
this.threadId = thread.getId();
}
else {
this.threadId = -1;
}
}
public int hashCode() {
int c = 1;
if (bitKey != null) {
c = bitKey.hashCode();
}
if (threadId != -1) {
c = c ^ (int)threadId;
}
return c;
}
public boolean equals(Object other) {
if (other instanceof RolapStarAggregationKey) {
RolapStarAggregationKey aggregationKey = (RolapStarAggregationKey) other;
return bitKey.equals(aggregationKey.bitKey) && (threadId == aggregationKey.threadId);
} else {
return false;
}
}
public String toString() {
return bitKey.toString() + " thread id = " + threadId;
}
/**
* @return Returns the bitKey.
*/
public BitKey getBitKey() {
return bitKey;
}
/**
* @param bitKey The bitKey to set.
*/
public void setBitKey(BitKey bitKey) {
this.bitKey = bitKey;
}
/**
* @return Returns the threadId.
*/
public long getThreadId() {
return threadId;
}
/**
* @param threadId The threadId to set.
*/
public void setThreadId(long threadId) {
this.threadId = threadId;
}
};


// End RolapStar.java

0 comments on commit bdc00f2

Please sign in to comment.