Skip to content

Commit

Permalink
MONDRIAN-PACINO: Adds flushing to the new architecture for managing s…
Browse files Browse the repository at this point in the history
…egment data. Adds a listener interface to the segment cache SPI so the segment index can get updated.

[git-p4: depot-paths = "//open/mondrian-release/pacino/": change = 14784]
  • Loading branch information
lucboudreau committed Nov 22, 2011
1 parent 4e8c054 commit 8979eb3
Show file tree
Hide file tree
Showing 23 changed files with 1,275 additions and 683 deletions.
68 changes: 65 additions & 3 deletions src/main/mondrian/rolap/CacheControlImpl.java
Expand Up @@ -12,6 +12,9 @@
import mondrian.olap.*;
import mondrian.olap.Id.Quoting;
import mondrian.resource.MondrianResource;
import mondrian.rolap.agg.SegmentCacheManager;
import mondrian.rolap.agg.SegmentHeader;
import mondrian.rolap.agg.SegmentHeader.ConstrainedColumn;
import mondrian.rolap.sql.MemberChildrenConstraint;
import mondrian.server.Execution;
import mondrian.server.Locus;
Expand All @@ -22,6 +25,7 @@
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.UndeclaredThrowableException;
import java.util.*;
import java.util.Map.Entry;
import java.util.concurrent.Callable;
import javax.sql.DataSource;

Expand Down Expand Up @@ -402,7 +406,7 @@ public void visit(UnionCellRegion region) {
* @param region Cell region
* @return List of members mentioned in cell region specification
*/
static List<Member> findMeasures(CellRegion region) {
public static List<Member> findMeasures(CellRegion region) {
final List<Member> list = new ArrayList<Member>();
final CellRegionVisitor visitor =
new CellRegionVisitorImpl() {
Expand All @@ -423,7 +427,51 @@ public void visit(MemberRangeCellRegion region) {
return list;
}

static List<RolapStar> getStarList(CellRegion region) {
public static ConstrainedColumn[] findAxisValues(CellRegion region) {
final List<ConstrainedColumn> list =
new ArrayList<ConstrainedColumn>();
final CellRegionVisitor visitor =
new CellRegionVisitorImpl() {
public void visit(MemberCellRegion region) {
if (region.dimension.isMeasures()) {
return;
}
final Map<String, Set<Object>> levels =
new HashMap<String, Set<Object>>();
for (Member member : region.memberList) {
final String ccName =
((RolapLevel)member.getLevel())
.getKeyExp().getGenericExpression();
if (!levels.containsKey(ccName)) {
levels.put(ccName, new HashSet<Object>());
}
levels.get(ccName).add(
((RolapMember)member).getKey());
}
for (Entry<String, Set<Object>> entry
: levels.entrySet())
{
list.add(
new ConstrainedColumn(
entry.getKey(),
entry.getValue().toArray()));
}
}
public void visit(MemberRangeCellRegion region) {
// We translate all ranges into wildcards.
// FIXME Optimize this by resolving the list of members
// into an actual list of values for ConstrainedColumn
list.add(
new ConstrainedColumn(
region.level.getKeyExp().getGenericExpression(),
null));
}
};
((CellRegionImpl) region).accept(visitor);
return list.toArray(new ConstrainedColumn[list.size()]);
}

public static List<RolapStar> getStarList(CellRegion region) {
// Figure out which measure (therefore star) it belongs to.
List<RolapStar> starList = new ArrayList<RolapStar>();
final List<Member> measuresList = findMeasures(region);
Expand All @@ -441,13 +489,27 @@ static List<RolapStar> getStarList(CellRegion region) {
}

public void printCacheState(
PrintWriter pw,
final PrintWriter pw,
CellRegion region)
{
List<RolapStar> starList = getStarList(region);
for (RolapStar star : starList) {
star.print(pw, "", false);
}
final SegmentCacheManager manager =
MondrianServer.forConnection(connection)
.getAggregationManager().cacheMgr;
manager.execute(
new SegmentCacheManager.Command<Void>() {
public Void call() throws Exception {
final List<SegmentHeader> headers =
manager.segmentIndex.getAllHeaders();
for (SegmentHeader header : headers) {
pw.println(header.getDescription());
}
return null;
}
});
}

public MemberSet createMemberSet(Member member, boolean descendants)
Expand Down
35 changes: 16 additions & 19 deletions src/main/mondrian/rolap/FastBatchingCellReader.java
Expand Up @@ -150,23 +150,14 @@ public final void recordCellRequest(CellRequest request) {
private void recordCellRequest2(
CellRequest request)
{
final Map<String, Comparable<?>> map =
new HashMap<String, Comparable<?>>();
final RolapStar.Column[] columns =
request.getConstrainedColumns();
final Object[] values = request.getSingleValues();
for (int i = 0; i < columns.length; i++) {
RolapStar.Column column = columns[i];
final Object o = values[i];
map.put(
column.getExpression().getGenericExpression(),
(Comparable<?>) o);
}

// If there is a segment matching these criteria, write it to the list
// of found segments, and remove the cell request from the list.
final AggregationKey key = new AggregationKey(request);
Pair<SegmentHeader, SegmentBody> headerBody =
locateHeaderBody(request, map);
locateHeaderBody(
request,
request.getMappedCellValues(),
key);
if (headerBody != null) {
// A previous cell request in this request might have hit the same
// segment. Only create a segment the first time we see this segment
Expand All @@ -189,10 +180,9 @@ private void recordCellRequest2(
return;
}

// TOOD: try to roll up
// TODO: try to roll up

// Finally, add to a batch. It will turn in to a SQL request.
final AggregationKey key = new AggregationKey(request);
Batch batch = batches.get(key);
if (batch == null) {
batch = new Batch(request);
Expand All @@ -204,7 +194,9 @@ private void recordCellRequest2(
buf.append(request.getConstrainedColumnsBitKey());
buf.append(Util.nl);

for (RolapStar.Column column : columns) {
for (RolapStar.Column column
: request.getConstrainedColumns())
{
buf.append(" ");
buf.append(column);
buf.append(Util.nl);
Expand All @@ -220,11 +212,13 @@ private void recordCellRequest2(
*
* @param request Cell request
* @param map Column values
* @param key Aggregate key.
* @return Segment header and body
*/
private Pair<SegmentHeader, SegmentBody> locateHeaderBody(
CellRequest request,
Map<String, Comparable<?>> map)
Map<String, Comparable<?>> map,
AggregationKey key)
{
final List<SegmentHeader> locate =
aggMgr.cacheMgr.segmentIndex.locate(
Expand All @@ -234,7 +228,10 @@ private Pair<SegmentHeader, SegmentBody> locateHeaderBody(
request.getMeasure().getName(),
request.getMeasure().getStar().getFactTable().getAlias(),
request.getConstrainedColumnsBitKey(),
map);
map,
AggregationKey.getCompoundPredicateArray(
key.getStar(),
key.getCompoundPredicateList()));
for (SegmentHeader header : locate) {
for (SegmentCacheWorker worker : aggMgr.segmentCacheWorkers) {
final SegmentBody body = worker.get(header);
Expand Down
43 changes: 0 additions & 43 deletions src/main/mondrian/rolap/RolapAggregationManager.java
Expand Up @@ -625,49 +625,6 @@ public abstract String getDrillThroughSql(
StarPredicate starPredicateSlicer,
boolean countOnly);

/**
* Returns an API with which to explicitly manage the contents of the cache.
*
* @param connection Server whose cache to control
* @param pw Print writer, for tracing
* @return CacheControl API
*/
public CacheControl getCacheControl(
RolapConnection connection,
final PrintWriter pw)
{
return new CacheControlImpl(connection) {
protected void flushNonUnion(final CellRegion region) {
final List<RolapStar> starList = getStarList(region);

// For each of the candidate stars, scan the list of aggregates.
for (RolapStar star : starList) {
star.flush(this, region);
}
}

public void flush(final CellRegion region) {
if (pw != null) {
pw.println("Cache state before flush:");
printCacheState(pw, region);
pw.println();
}
super.flush(region);
if (pw != null) {
pw.println("Cache state after flush:");
printCacheState(pw, region);
pw.println();
}
}

public void trace(final String message) {
if (pw != null) {
pw.println(message);
}
}
};
}

public static RolapCacheRegion makeCacheRegion(
final RolapStar star,
final CacheControl.CellRegion region)
Expand Down
19 changes: 19 additions & 0 deletions src/main/mondrian/rolap/RolapCacheRegion.java
Expand Up @@ -27,6 +27,8 @@ public class RolapCacheRegion {
private final BitKey bitKey;
private final Map<Integer, StarColumnPredicate> columnPredicates =
new HashMap<Integer, StarColumnPredicate>();
private final Map<String, StarColumnPredicate> columnPredicatesByName =
new HashMap<String, StarColumnPredicate>();
private Map<List<RolapStar.Column>, StarPredicate> predicates =
new HashMap<List<RolapStar.Column>, StarPredicate>();

Expand Down Expand Up @@ -58,6 +60,9 @@ public void addPredicate(
assert !bitKey.get(bitPosition);
bitKey.set(bitPosition);
columnPredicates.put(bitPosition, predicate);
columnPredicatesByName.put(
column.getExpression().getGenericExpression(),
predicate);
}

/**
Expand All @@ -71,6 +76,20 @@ public StarColumnPredicate getPredicate(int columnOrdinal) {
return columnPredicates.get(columnOrdinal);
}

/**
* Returns the predicate associated with the
* <code>columnName</code>, where column name is
* the generic SQL expression in the form of:
*
* <p>&nbsp;&nbsp;&nbsp;&nbsp;table.column
*
* @param columnOrdinal Column ordinal
* @return Predicate, or null if not constrained
*/
public StarColumnPredicate getPredicate(String columnName) {
return columnPredicatesByName.get(columnName);
}

/**
* Adds a predicate which applies to multiple columns.
*
Expand Down
82 changes: 58 additions & 24 deletions src/main/mondrian/rolap/RolapStar.java
Expand Up @@ -30,6 +30,7 @@
import java.sql.Connection;
import java.sql.*;
import java.util.*;

import javax.sql.DataSource;

/**
Expand Down Expand Up @@ -168,6 +169,63 @@ public Object getCellFromCache(
return null;
}

public Object getCellFromAllCaches(final CellRequest request) {
// First, try the local/thread cache.
Object result = getCellFromCache(request, null);
if (result != null) {
return result;
}
// Now ask the segment cache manager.
final AggregationKey aggregationKey = new AggregationKey(request);
return getAggregationManager().cacheMgr.execute(
new SegmentCacheManager.Command<Object>() {
public Object call() throws Exception {
final List<SegmentHeader> headers =
getAggregationManager().cacheMgr
.segmentIndex.locate(
schema.getName(),
schema.getChecksum(),
request.getMeasure().cubeName,
request.getMeasure().getName(),
request.getMeasure().getStar()
.getFactTable().getAlias(),
request.getConstrainedColumnsBitKey(),
request.getMappedCellValues(),
AggregationKey.getCompoundPredicateArray(
RolapStar.this,
aggregationKey
.getCompoundPredicateList()));
if (headers.size() == 0) {
return null;
}
SegmentBody sb = null;
workerLoop:
for (SegmentCacheWorker worker
: getAggregationManager().segmentCacheWorkers)
{
for (SegmentHeader header : headers) {
if (worker.contains(header)) {
sb = worker.get(header);
break workerLoop;
}
}
}
if (sb == null) {
return null;
}
final Segment emptySegment =
headers.get(0).toSegment(
RolapStar.this,
request.getConstrainedColumnsBitKey(),
request.getConstrainedColumns(),
request.getMeasure());
final SegmentWithData segment =
SegmentHeader.addData(emptySegment, sb);
return segment.getCellValue(request.getSingleValues());
}
});
}

public void register(SegmentWithData segment) {
localBars.get().segmentRefs.add(
new SoftReference<SegmentWithData>(segment));
Expand Down Expand Up @@ -552,7 +610,6 @@ public Aggregation lookupOrCreateAggregation(
getAggregationManager(),
aggregationKey);

if (false)
localBars.get().aggregations.put(aggregationKey, aggregation);

// Let the change listener get the opportunity to register the
Expand Down Expand Up @@ -788,29 +845,6 @@ public void print(PrintWriter pw, String prefix, boolean structure) {
}
}

/**
* Flushes the contents of a given region of cells from this star.
*
* @param cacheControl Cache control API
* @param region Predicate defining a region of cells
*/
public void flush(
CacheControl cacheControl,
CacheControl.CellRegion region)
{
// Translate the region into a set of (column, value) constraints.
final RolapCacheRegion cacheRegion =
RolapAggregationManager.makeCacheRegion(this, region);

// Flush the external cache regions
for (SegmentCacheWorker segmentCacheWorker
: getAggregationManager().segmentCacheWorkers)
{
segmentCacheWorker.flush(
SegmentHeader.forCacheRegion(cacheRegion));
}
}

/**
* Returns the listener for changes to this star's underlying database.
*
Expand Down

0 comments on commit 8979eb3

Please sign in to comment.