Skip to content

Commit

Permalink
MONDRIAN: Bug fixes in cache infrastructure.
Browse files Browse the repository at this point in the history
[git-p4: depot-paths = "//open/mondrian/": change = 14876]
  • Loading branch information
julianhyde committed Jan 7, 2012
1 parent 9b5114b commit 95e1ccd
Show file tree
Hide file tree
Showing 6 changed files with 171 additions and 71 deletions.
107 changes: 80 additions & 27 deletions src/main/mondrian/rolap/FastBatchingCellReader.java
Expand Up @@ -209,16 +209,26 @@ boolean loadAggregations() {
if (!isDirty()) {
return false;
}
BatchLoader.LoadBatchResponse response =
cacheMgr.execute(
new BatchLoader.LoadBatchCommand(
Locus.peek(),
cacheMgr,
getDialect(),
cube,
new ArrayList<CellRequest>(cellRequests)));

for (;;) {
// List of futures yielding segments populated by SQL statements. If
// loading requires several iterations, we just append to the list. We
// don't mind if it takes a while for SQL statements to return.
final List<Future<Map<Segment, SegmentWithData>>> sqlSegmentMapFutures =
new ArrayList<Future<Map<Segment, SegmentWithData>>>();

final List<CellRequest> cellRequests1 =
new ArrayList<CellRequest>(cellRequests);

for (int iteration = 0;; ++iteration) {
final BatchLoader.LoadBatchResponse response =
cacheMgr.execute(
new BatchLoader.LoadBatchCommand(
Locus.peek(),
cacheMgr,
getDialect(),
cube,
Collections.unmodifiableList(cellRequests1)));

int failureCount = 0;

// Segments that have been retrieved from cache this cycle. Allows
Expand All @@ -237,6 +247,7 @@ boolean loadAggregations() {
for (SegmentHeader header : response.cacheSegments) {
final SegmentBody body = cacheMgr.compositeCache.get(header);
if (body == null) {
cacheMgr.remove(header);
failedSegments.add(header);
++failureCount;
continue;
Expand Down Expand Up @@ -302,12 +313,32 @@ boolean loadAggregations() {
// Wait for SQL statements to end -- but only if there are no
// failures.
//
// If there are failures, it's more urgent that we create and
// execute a follow-up request. We will wait for the pending SQL
// statements at the end of that.
if (failureCount == 0) {
// If there are failures, and its the first iteration, it's more
// urgent that we create and execute a follow-up request. We will
// wait for the pending SQL statements at the end of that.
//
// If there are failures on later iterations, wait for SQL
// statements to end. The cache might be porous. SQL might be the
// only way to make progress.
sqlSegmentMapFutures.addAll(response.sqlSegmentMapFutures);
if (failureCount == 0 || iteration > 0) {
// Wait on segments being loaded by someone else.
for (Map.Entry<SegmentHeader, Future<SegmentBody>> entry
: response.futures.entrySet())
{
final SegmentHeader header = entry.getKey();
final Future<SegmentBody> bodyFuture = entry.getValue();
final SegmentBody body = Util.safeGet(
bodyFuture,
"Waiting for someone else's segment to load via SQL");
final SegmentWithData segmentWithData =
response.convert(header, body);
segmentWithData.getStar().register(segmentWithData);
}

// Wait on segments being loaded by SQL statements we asked for.
for (Future<Map<Segment, SegmentWithData>> sqlSegmentMapFuture
: response.sqlSegmentMapFutures)
: sqlSegmentMapFutures)
{
final Map<Segment, SegmentWithData> segmentMap =
Util.safeGet(
Expand All @@ -328,21 +359,36 @@ boolean loadAggregations() {

// Figure out which cell requests are not satisfied by any of the
// segments retrieved.
List<CellRequest> unsatisfied = new ArrayList<CellRequest>();
for (CellRequest cellRequest : cellRequests) {
@SuppressWarnings("unchecked")
List<CellRequest> old = new ArrayList<CellRequest>(cellRequests1);
cellRequests1.clear();
for (CellRequest cellRequest : old) {
if (cellRequest.getMeasure().getStar()
.getCellFromCache(cellRequest, null) == null)
{
unsatisfied.add(cellRequest);
cellRequests1.add(cellRequest);
}
}

if (unsatisfied.isEmpty()) {
if (cellRequests1.isEmpty()) {
break;
}

// Form and execute a new request.
throw new UnsupportedOperationException(); // TODO:
if (cellRequests1.size() >= old.size()
&& iteration > 10)
{
throw Util.newError(
"Cache round-trip did not resolve any cell requests. "
+ "Iteration #" + iteration
+ "; request count " + cellRequests1.size()
+ "; requested headers: " + response.cacheSegments.size()
+ "; requested rollups: " + response.rollups.size()
+ "; requested SQL: "
+ response.sqlSegmentMapFutures.size());
}

// Continue loop; form and execute a new request with the smaller
// set of cell requests.
}

dirty = false;
Expand Down Expand Up @@ -395,9 +441,11 @@ private SegmentBody loadSegmentFromCache(
return body;
}
body = cacheMgr.compositeCache.get(header);
if (body != null) {
headerBodies.put(header, body);
if (body == null) {
cacheMgr.remove(header);
return null;
}
headerBodies.put(header, body);
return body;
}

Expand Down Expand Up @@ -444,8 +492,8 @@ class BatchLoader {
private final Set<SegmentHeader> cacheHeaders =
new LinkedHashSet<SegmentHeader>();

private final List<Future<SegmentBody>> futures =
new ArrayList<Future<SegmentBody>>();
private final Map<SegmentHeader, Future<SegmentBody>> futures =
new HashMap<SegmentHeader, Future<SegmentBody>>();

private final List<RollupInfo> rollups = new ArrayList<RollupInfo>();

Expand Down Expand Up @@ -521,7 +569,7 @@ private void recordCellRequest2(final CellRequest request) {
if (future != null) {
// Segment header is in cache, body is being loaded. Worker will
// need to wait for load to complete.
futures.add(future);
futures.put(headerInCache, future);
} else {
// Segment is in cache.
cacheHeaders.add(headerInCache);
Expand Down Expand Up @@ -668,7 +716,8 @@ LoadBatchResponse load(List<CellRequest> cellRequests) {
new ArrayList<SegmentHeader>(cacheHeaders),
rollups,
converterMap,
segmentMapFutures);
segmentMapFutures,
futures);
}

static List<CompositeBatch> groupBatches(List<Batch> batchList) {
Expand Down Expand Up @@ -875,18 +924,22 @@ static class LoadBatchResponse {

final Map<List, SegmentBuilder.SegmentConverter> converterMap;

final Map<SegmentHeader, Future<SegmentBody>> futures;

LoadBatchResponse(
List<CellRequest> cellRequests,
List<SegmentHeader> cacheSegments,
List<RollupInfo> rollups,
Map<List, SegmentBuilder.SegmentConverter> converterMap,
List<Future<Map<Segment, SegmentWithData>>> sqlSegmentMapFutures)
List<Future<Map<Segment, SegmentWithData>>> sqlSegmentMapFutures,
Map<SegmentHeader, Future<SegmentBody>> futures)
{
this.cellRequests = cellRequests;
this.sqlSegmentMapFutures = sqlSegmentMapFutures;
this.cacheSegments = cacheSegments;
this.rollups = rollups;
this.converterMap = converterMap;
this.futures = futures;
}

public SegmentWithData convert(
Expand Down
35 changes: 21 additions & 14 deletions src/main/mondrian/rolap/agg/SegmentCacheManager.java
Expand Up @@ -198,6 +198,8 @@
* <p>23. All code that calls {@link Future#get} should probably handle
* {@link CancellationException}.</p>
*
* <p>24. Obsolete {@link #handler}. Indirection doesn't win anything.</p>
*
*
* @author jhyde
* @version $Id$
Expand Down Expand Up @@ -317,21 +319,23 @@ public void loadFailed(
}

/**
* Removes a segment from segment index and cache.
* Removes a segment from segment index.
*
* <p>Call is asynchronous. It comes back immediately.</p>
*
* <p>Does not remove it from the external cache.</p>
*
* @param cacheMgr Cache manager
* @param header segment header
*/
public void remove(
SegmentCacheManager cacheMgr,
SegmentHeader header)
{
ACTOR.event(
handler,
new SegmentRemoveEvent(
System.currentTimeMillis(),
Locus.peek(),
cacheMgr,
this,
header));
}

Expand Down Expand Up @@ -495,7 +499,7 @@ public void run() {
}

public void visit(ExternalSegmentCreatedEvent event) {
event.cacheMgr.segmentIndex.add(event.header, null, null);
event.cacheMgr.segmentIndex.add(event.header, false, null);

event.locus.getServer().getMonitor().sendEvent(
new CellCacheSegmentCreateEvent(
Expand Down Expand Up @@ -575,7 +579,7 @@ public FlushResult call() throws Exception {
// segments for the region's measures.
if (flushRegion.length == 0) {
for (SegmentHeader header : headers) {
cacheMgr.remove(cacheMgr, header);
cacheMgr.remove(header);
}
return new FlushResult(
Collections.<Callable<Boolean>>emptyList());
Expand All @@ -594,7 +598,7 @@ public FlushResult call() throws Exception {
cacheControlImpl.trace(
"discard segment - it cannot be constrained and maintain consistency: "
+ header.getDescription());
cacheMgr.remove(cacheMgr, header);
cacheMgr.remove(header);
continue;
}
final SegmentHeader newHeader =
Expand Down Expand Up @@ -622,7 +626,7 @@ public Boolean call() throws Exception {
});
}
cacheMgr.segmentIndex.remove(header);
cacheMgr.segmentIndex.add(newHeader, null, null);
cacheMgr.segmentIndex.add(newHeader, false, null);
}

// Done
Expand Down Expand Up @@ -1183,12 +1187,15 @@ public PeekResponse call() {
// Is there a pending segment? (A segment that has been created and
// is loading via SQL.)
for (SegmentHeader header : headers) {
converterMap.put(
SegmentCacheIndexImpl.makeConverterKey(header),
getConverter(header));
headerMap.put(
header,
segmentIndex.getFuture(header));
final Future<SegmentBody> bodyFuture =
segmentIndex.getFuture(header);
if (bodyFuture != null) {
converterMap.put(
SegmentCacheIndexImpl.makeConverterKey(header),
getConverter(header));
headerMap.put(
header, bodyFuture);
}
}

return new PeekResponse(headerMap, converterMap);
Expand Down
6 changes: 3 additions & 3 deletions src/main/mondrian/rolap/agg/SegmentLoader.java
Expand Up @@ -46,7 +46,7 @@
*/
public class SegmentLoader {

private final static Logger LOGGER = Logger.getLogger(SegmentLoader.class);
private static final Logger LOGGER = Logger.getLogger(SegmentLoader.class);

private final SegmentCacheManager cacheMgr;

Expand Down Expand Up @@ -103,7 +103,7 @@ public void load(
for (Segment segment : groupingSet.getSegments()) {
cacheMgr.segmentIndex.add(
segment.getHeader(),
new SlotFuture<SegmentBody>(),
true,
new SegmentBuilder.StarSegmentConverter(
segment.measure,
compoundPredicateList));
Expand Down Expand Up @@ -877,7 +877,7 @@ public boolean equals(Object obj) {
* Collection of rows, each with a set of columns of type Object, double, or
* int. Native types are not boxed.
*/
static protected class RowList {
protected static class RowList {
private final Column[] columns;
private int rowCount = 0;
private int capacity = 0;
Expand Down
8 changes: 5 additions & 3 deletions src/main/mondrian/rolap/cache/SegmentCacheIndex.java
Expand Up @@ -13,7 +13,6 @@
import mondrian.rolap.agg.SegmentBuilder;
import mondrian.spi.*;
import mondrian.util.ByteString;
import mondrian.util.SlotFuture;

import java.io.PrintWriter;
import java.util.List;
Expand Down Expand Up @@ -103,13 +102,16 @@ public List<SegmentHeader> intersectRegion(
/**
* Adds a header to the index.
*
* <p>If {@code loading} is true, there must follow a call to
* {@link #loadSucceeded} or {@link #loadFailed}.</p>
*
* @param header Segment header
* @param bodyFuture Future that will be informed when the segment has
* @param loading Whether segment is pending a load from SQL
* @param converter Segment converter
*/
void add(
SegmentHeader header,
SlotFuture<SegmentBody> bodyFuture,
boolean loading,
SegmentBuilder.SegmentConverter converter);

/**
Expand Down

0 comments on commit 95e1ccd

Please sign in to comment.