Skip to content

Commit

Permalink
MONDRIAN: Fixes memory leaks in the test suite.
Browse files Browse the repository at this point in the history
Fixes a bug where the grouping sets would mix up the order of the measures and wouldn't map the correct SQL column to the proper cell data.

Fixes a bug in the monitor system where obtaining the server instance (through the Locus) would fail if the connection had been closed before the monitor actor would have time to send the events out.

Fixes a bug in SegmentHeader.canConstrain where it would return true when a cell region to flush was overlapping over an entire axis of the header. When we flush completely an axis, there is no reason to keep that header, since it's completely stale.

Refactored the segment cache indexes so there is one index per RolapStar instance. This prevents memory leaks (the index was owned by the server instance and was refering to the RolapSchema objects). It also makes the indexes more efficient, since an index only contains data relevant to a single star, and not all of the stars of a MondrianServer instance.

Fixes bugs in the finalize() methods of some objects. If an exception is thrown when the garbage collector calls finalize(), the object becomes a true-memory-leak and never gets GC'd even if nobody refers to it.

[git-p4: depot-paths = "//open/mondrian/": change = 14915]
  • Loading branch information
lucboudreau committed Feb 1, 2012
1 parent a8a487b commit b12cfad
Show file tree
Hide file tree
Showing 32 changed files with 1,149 additions and 458 deletions.
20 changes: 20 additions & 0 deletions src/main/mondrian/resource/MondrianResource.xml
Expand Up @@ -849,6 +849,26 @@
</text>
</exception>

<!-- ====================================================================== -->
<!-- GC -->
<exception id="9000001" name="FinalizerErrorRolapSchema">
<text>
An exception was encountered while finalizing a RolapSchema object instance.
</text>
</exception>

<exception id="9000002" name="FinalizerErrorMondrianServerImpl">
<text>
An exception was encountered while finalizing a RolapSchema object instance.
</text>
</exception>

<exception id="9000003" name="FinalizerErrorRolapConnection">
<text>
An exception was encountered while finalizing a RolapConnection object instance.
</text>
</exception>

<!-- End of last error section ============================================ -->

</resourceBundle>
Expand Down
42 changes: 8 additions & 34 deletions src/main/mondrian/rolap/CacheControlImpl.java
Expand Up @@ -10,6 +10,7 @@
package mondrian.rolap;

import mondrian.olap.*;
import mondrian.olap.CacheControl.CellRegion;
import mondrian.olap.Id.Quoting;
import mondrian.resource.MondrianResource;
import mondrian.rolap.agg.SegmentCacheManager;
Expand Down Expand Up @@ -520,22 +521,22 @@ public static List<RolapStar> getStarList(CellRegion region) {

public void printCacheState(
final PrintWriter pw,
CellRegion region)
final CellRegion region)
{
List<RolapStar> starList = getStarList(region);
final List<RolapStar> starList = getStarList(region);
for (RolapStar star : starList) {
star.print(pw, "", false);
}
final SegmentCacheManager manager =
MondrianServer.forConnection(connection)
.getAggregationManager().cacheMgr;
Locus.execute(
Execution.NONE,
connection,
"CacheControlImpl.printCacheState",
new Locus.Action<Object>() {
public Object execute() {
return manager.execute(
new PrintCacheStateCommand(manager, pw, Locus.peek()));
new Locus.Action<Void>() {
public Void execute() {
manager.printCacheState(region, pw, Locus.peek());
return null;
}
});
}
Expand Down Expand Up @@ -1531,33 +1532,6 @@ public void commit() {
}
}

private static class PrintCacheStateCommand
implements SegmentCacheManager.Command<Void>
{
private final SegmentCacheManager manager;
private final PrintWriter pw;
private final Locus locus;

public PrintCacheStateCommand(
SegmentCacheManager manager,
PrintWriter pw,
Locus locus)
{
this.manager = manager;
this.pw = pw;
this.locus = locus;
}

public Void call() {
manager.segmentIndex.printCacheState(pw);
return null;
}

public Locus getLocus() {
return locus;
}
}

/**
* Command that deletes a member and its descendants from the cache.
*/
Expand Down
15 changes: 9 additions & 6 deletions src/main/mondrian/rolap/FastBatchingCellReader.java
Expand Up @@ -13,6 +13,7 @@
import mondrian.rolap.agg.*;
import mondrian.rolap.aggmatcher.AggGen;
import mondrian.rolap.aggmatcher.AggStar;
import mondrian.rolap.cache.SegmentCacheIndex;
import mondrian.rolap.cache.SegmentCacheIndexImpl;
import mondrian.server.Execution;
import mondrian.server.Locus;
Expand Down Expand Up @@ -247,7 +248,7 @@ boolean loadAggregations() {
for (SegmentHeader header : response.cacheSegments) {
final SegmentBody body = cacheMgr.compositeCache.get(header);
if (body == null) {
cacheMgr.remove(header);
cacheMgr.remove(cube.getStar(), header);
failedSegments.add(header);
++failureCount;
continue;
Expand Down Expand Up @@ -442,7 +443,7 @@ private SegmentBody loadSegmentFromCache(
}
body = cacheMgr.compositeCache.get(header);
if (body == null) {
cacheMgr.remove(header);
cacheMgr.remove(cube.getStar(), header);
return null;
}
headerBodies.put(header, body);
Expand Down Expand Up @@ -546,8 +547,10 @@ private void recordCellRequest2(final CellRequest request) {
final RolapStar.Measure measure = request.getMeasure();
final RolapStar star = measure.getStar();
final RolapSchema schema = star.getSchema();
final SegmentCacheIndex index =
cacheMgr.getIndexRegistry().getIndex(star);
final List<SegmentHeader> headersInCache =
cacheMgr.segmentIndex.locate(
index.locate(
schema.getName(),
schema.getChecksum(),
measure.getCubeName(),
Expand All @@ -565,7 +568,7 @@ private void recordCellRequest2(final CellRequest request) {
if (!headersInCache.isEmpty()) {
final SegmentHeader headerInCache = headersInCache.get(0);
final Future<SegmentBody> future =
cacheMgr.segmentIndex.getFuture(headerInCache);
index.getFuture(headerInCache);
if (future != null) {
// Segment header is in cache, body is being loaded. Worker will
// need to wait for load to complete.
Expand All @@ -574,7 +577,7 @@ private void recordCellRequest2(final CellRequest request) {
// Segment is in cache.
cacheHeaders.add(headerInCache);
}
cacheMgr.segmentIndex.setConverter(
index.setConverter(
headerInCache.schemaName,
headerInCache.schemaChecksum,
headerInCache.cubeName,
Expand Down Expand Up @@ -602,7 +605,7 @@ private void recordCellRequest2(final CellRequest request) {
// Don't even bother doing a segment lookup if we can't
// rollup that measure.
final List<List<SegmentHeader>> rollup =
cacheMgr.segmentIndex.findRollupCandidates(
index.findRollupCandidates(
schema.getName(),
schema.getChecksum(),
measure.getCubeName(),
Expand Down
11 changes: 9 additions & 2 deletions src/main/mondrian/rolap/RolapConnection.java
Expand Up @@ -281,8 +281,15 @@ public RolapConnection(

@Override
protected void finalize() throws Throwable {
super.finalize();
close();
try {
super.finalize();
close();
} catch (Throwable t) {
LOGGER.info(
MondrianResource.instance()
.FinalizerErrorRolapConnection.baseMessage,
t);
}
}

/**
Expand Down
42 changes: 21 additions & 21 deletions src/main/mondrian/rolap/RolapSchema.java
Expand Up @@ -13,7 +13,6 @@
package mondrian.rolap;

import mondrian.olap.*;
import mondrian.olap.CacheControl.CellRegion;
import mondrian.olap.fun.*;
import mondrian.olap.type.*;
import mondrian.resource.MondrianResource;
Expand All @@ -40,6 +39,7 @@
import java.lang.reflect.Constructor;
import java.lang.reflect.InvocationTargetException;
import java.util.*;

import javax.sql.DataSource;

/**
Expand Down Expand Up @@ -79,7 +79,7 @@ public class RolapSchema implements Schema {
/**
* Internal use only.
*/
private final RolapConnection internalConnection;
private RolapConnection internalConnection;

/**
* Holds cubes in this schema.
Expand Down Expand Up @@ -205,7 +205,9 @@ private RolapSchema(
final MondrianServer internalServer = MondrianServer.forId(null);
this.internalConnection =
new RolapConnection(internalServer, connectInfo, this, dataSource);
internalServer.addConnection(internalConnection);
internalServer.removeConnection(internalConnection);
internalServer.removeStatement(
internalConnection.getInternalStatement());

this.aggTableManager = new AggTableManager(this);
this.dataSourceChangeListener =
Expand Down Expand Up @@ -333,25 +335,22 @@ static PropertyFormatter createPropertyFormatter(
}

protected void finalCleanUp() {
if (internalConnection != null) {
// REVIEW: Is this supposed to happen???
final CacheControl cacheControl =
internalConnection.getCacheControl(null);
for (Cube cube : getCubes()) {
CellRegion cr =
cacheControl.createMeasuresRegion(cube);
cacheControl.flush(cr);
}
}
if (aggTableManager != null) {
aggTableManager.finalCleanUp();
aggTableManager = null;
}
}

protected void finalize() throws Throwable {
super.finalize();
finalCleanUp();
try {
super.finalize();
finalCleanUp();
} catch (Throwable t) {
LOGGER.info(
MondrianResource.instance()
.FinalizerErrorRolapSchema.baseMessage,
t);
}
}

public boolean equals(Object o) {
Expand Down Expand Up @@ -460,7 +459,7 @@ public List<Exception> getWarnings() {
return Collections.unmodifiableList(warningList);
}

Role getDefaultRole() {
public Role getDefaultRole() {
return defaultRole;
}

Expand Down Expand Up @@ -779,9 +778,10 @@ private Role createRole(MondrianDef.Role xmlRole) {
final boolean ignoreInvalidMembers =
MondrianProperties.instance().IgnoreInvalidMembers
.get();
Member member = schemaReader.getMemberByUniqueName(
Util.parseIdentifier(memberGrant.member),
!ignoreInvalidMembers);
Member member = schemaReader.withLocus()
.getMemberByUniqueName(
Util.parseIdentifier(memberGrant.member),
!ignoreInvalidMembers);
if (member == null) {
// They asked to ignore members that don't exist
// (e.g. [Store].[USA].[Foo]), so ignore this grant
Expand Down Expand Up @@ -1615,7 +1615,7 @@ private MemberReader createMemberReader(
}

public SchemaReader getSchemaReader() {
return new RolapSchemaReader(defaultRole, this);
return new RolapSchemaReader(defaultRole, this).withLocus();
}

/**
Expand Down Expand Up @@ -1738,7 +1738,7 @@ private RolapStar makeRolapStar(final MondrianDef.Relation fact) {
/**
* <code>RolapStarRegistry</code> is a registry for {@link RolapStar}s.
*/
class RolapStarRegistry {
public class RolapStarRegistry {
private final Map<String, RolapStar> stars =
new HashMap<String, RolapStar>();

Expand Down
23 changes: 9 additions & 14 deletions src/main/mondrian/rolap/RolapStar.java
Expand Up @@ -21,6 +21,7 @@
import mondrian.spi.*;
import mondrian.util.Bug;

import org.apache.commons.collections.map.ReferenceMap;
import org.apache.log4j.Logger;

import java.io.PrintWriter;
Expand Down Expand Up @@ -109,11 +110,6 @@ public class RolapStar {
this.changeListener = schema.getDataSourceChangeListener();
}

public AggregationManager getAggregationManager() {
return schema.getInternalConnection().getServer()
.getAggregationManager();
}

/**
* Retrieves the value of the cell identified by a cell request, if it
* can be found in the local cache of the current statement (thread).
Expand Down Expand Up @@ -180,16 +176,16 @@ public Object getCellFromAllCaches(final CellRequest request) {

private Object getCellFromExternalCache(CellRequest request) {
final SegmentWithData segment =
getAggregationManager().cacheMgr.peek(request);
Locus.peek().getServer().getAggregationManager()
.cacheMgr.peek(request);
if (segment == null) {
return null;
}
return segment.getCellValue(request.getSingleValues());
}

public void register(SegmentWithData segment) {
final Bar bar = localBars.get();
bar.segmentRefs.add(
localBars.get().segmentRefs.add(
new SoftReference<SegmentWithData>(segment));
}

Expand All @@ -203,7 +199,7 @@ public void register(SegmentWithData segment) {
public static class Bar {
/** Holds all thread-local aggregations of this star. */
private final Map<AggregationKey, Aggregation> aggregations =
new HashMap<AggregationKey, Aggregation>();
new ReferenceMap(ReferenceMap.WEAK, ReferenceMap.WEAK);

private final List<SoftReference<SegmentWithData>> segmentRefs =
new ArrayList<SoftReference<SegmentWithData>>();
Expand Down Expand Up @@ -547,9 +543,8 @@ void clearCachedAggregations(boolean forced) {
}

// Clear aggregation cache for the current thread context.
final Bar bar = localBars.get();
bar.aggregations.clear();
bar.segmentRefs.clear();
localBars.get().aggregations.clear();
localBars.get().segmentRefs.clear();
}
}

Expand All @@ -573,7 +568,8 @@ public Aggregation lookupOrCreateAggregation(
new Aggregation(
aggregationKey);

localBars.get().aggregations.put(aggregationKey, aggregation);
localBars.get().aggregations.put(
aggregationKey, aggregation);

// Let the change listener get the opportunity to register the
// first time the aggregation is used
Expand All @@ -596,7 +592,6 @@ public Aggregation lookupOrCreateAggregation(
* @see Util#deprecated(Object) currently always returns null -- remove
*/
public Aggregation lookupSegment(AggregationKey aggregationKey) {
// First try thread local cache
return localBars.get().aggregations.get(aggregationKey);
}

Expand Down
14 changes: 14 additions & 0 deletions src/main/mondrian/rolap/agg/Aggregation.java
Expand Up @@ -169,6 +169,20 @@ private List<Segment> createSegments(
compoundPredicateList);
segments.add(segment);
}
// It is important to sort the segments per measure bitkey.
// The order in which the measures come in is not deterministic.
// It actually depends on the order of the CellRequests.
// See: mondrian.rolap.BatchLoader.Batch.add(CellRequest request).
// Failure to sort them will give out wrong results (uses the wrong
// column) if we have more than one column in the grouping set.
Collections.sort(
segments, new Comparator<Segment>() {
public int compare(Segment o1, Segment o2) {
return Integer.valueOf(
o1.measure.getBitPosition())
.compareTo(o2.measure.getBitPosition());
}
});
return segments;
}

Expand Down

0 comments on commit b12cfad

Please sign in to comment.