Skip to content

Commit

Permalink
MONDRIAN-PACINO: Refactors the FlushCommand into a private class of S…
Browse files Browse the repository at this point in the history
…egmentcacheManager.

Adds SegmentCacheManager.flush().

Fixes an issue with the event test. It wasn't waiting for propagation of the cache update.

[git-p4: depot-paths = "//open/mondrian-release/pacino/": change = 14792]
  • Loading branch information
lucboudreau committed Nov 23, 2011
1 parent db53882 commit 02030cd
Show file tree
Hide file tree
Showing 5 changed files with 232 additions and 202 deletions.
109 changes: 2 additions & 107 deletions src/main/mondrian/rolap/agg/AggregationManager.java
Expand Up @@ -159,8 +159,7 @@ public CacheControl getCacheControl(
{
return new CacheControlImpl(connection) {
protected void flushNonUnion(final CellRegion region) {
cacheMgr.execute(
new FlushCommand(region, this));
cacheMgr.flush(AggregationManager.this, this, region);
}

public void flush(final CellRegion region) {
Expand All @@ -185,111 +184,7 @@ public void trace(final String message) {
};
}

private final class FlushCommand implements Command<Void> {
private final CellRegion region;
private final CacheControlImpl cacheControlImpl;
public FlushCommand(
final CellRegion region,
final CacheControlImpl cacheControlImpl)
{
this.region = region;
this.cacheControlImpl = cacheControlImpl;
}
public Void call() throws Exception {
/*
* For each measure and each star, ask the index
* which headers intersect.
*/
final List<SegmentHeader> headers =
new ArrayList<SegmentHeader>();
final List<Member> measures =
CacheControlImpl.findMeasures(region);
final ConstrainedColumn[] flushRegion =
CacheControlImpl.findAxisValues(region);

for (Member member : measures) {
if (!(member instanceof RolapStoredMeasure)) {
continue;
}
RolapStoredMeasure storedMeasure =
(RolapStoredMeasure) member;
headers.addAll(
cacheMgr.segmentIndex.intersectRegion(
member.getDimension().getSchema().getName(),
((RolapSchema)member.getDimension()
.getSchema()).getChecksum(),
storedMeasure.getCube().getName(),
storedMeasure.getName(),
storedMeasure.getCube().getStar()
.getFactTable().getAlias(),
flushRegion));
}

// If flushregion is empty, this means we must clear all
// segments for the region's measures.
if (flushRegion.length == 0) {
for (SegmentHeader header : headers) {
for (SegmentCacheWorker worker
: AggregationManager.this.segmentCacheWorkers)
{
if (worker.contains(header)) {
worker.remove(header);
}
cacheMgr.segmentIndex.remove(header);
}
}
return null;
}

// Now we know which headers intersect. For each of them,
// we append an excluded region.
// TODO optimize the logic here. If a segment is mostly
// empty, we should thrash it completely.
for (SegmentHeader header : headers) {
if (!header.canConstrain(flushRegion)) {
// We have to delete that segment altogether.
cacheControlImpl.trace(
"discard segment - it cannot be constrained and maintain consistency: "
+ header.getDescription());
for (SegmentCacheWorker worker
: AggregationManager.this.segmentCacheWorkers)
{
if (worker.contains(header)) {
worker.remove(header);
}
}
cacheMgr.segmentIndex.remove(header);
continue;
}
final SegmentHeader newHeader =
header.constrain(flushRegion);
for (SegmentCacheWorker worker
: AggregationManager.this.segmentCacheWorkers)
{
if (worker.supportsRichIndex()) {
final SegmentBody sb = worker.get(header);
if (worker.contains(header)) {
worker.remove(header);
}
if (sb != null) {
worker.put(newHeader, sb);
}
} else {
// The cache doesn't support rich index. We have
// to clear the segment entirely.
if (worker.contains(header)) {
worker.remove(header);
}
}
}
cacheMgr.segmentIndex.remove(header);
cacheMgr.segmentIndex.add(newHeader);
}

// Done
return null;
}
}


public Object getCellFromCache(CellRequest request) {
return getCellFromCache(request, null);
Expand Down
167 changes: 162 additions & 5 deletions src/main/mondrian/rolap/agg/SegmentCacheManager.java
Expand Up @@ -9,8 +9,12 @@
*/
package mondrian.rolap.agg;

import mondrian.olap.CacheControl;
import mondrian.olap.Member;
import mondrian.olap.Util;
import mondrian.olap.CacheControl.CellRegion;
import mondrian.rolap.*;
import mondrian.rolap.agg.SegmentHeader.ConstrainedColumn;
import mondrian.rolap.cache.SegmentCacheIndex;
import mondrian.rolap.cache.SegmentCacheIndexImpl;
import mondrian.util.Pair;
Expand Down Expand Up @@ -289,7 +293,7 @@ public void loadFailed(
}

/**
* Adds segment to segment index and cache.
* Adds a segment to segment index and cache.
*
* <p>Called when a SQL statement has finished loading a segment.</p>
*
Expand All @@ -307,6 +311,33 @@ public void add(
new SegmentAddEvent(aggMgr, header, body));
}

/**
* Removes a segment to segment index and cache.
*
* @param aggMgr Aggregate manager
* @param header segment header
* @param body segment body
*/
public void remove(
AggregationManager aggMgr,
SegmentHeader header)
{
ACTOR.event(
handler,
new SegmentRemoveEvent(aggMgr, header));
}

public void flush(
AggregationManager aggMan,
CacheControl cacheControl,
CacheControl.CellRegion region)
{
execute(new FlushCommand(
aggMan,
region,
(CacheControlImpl)cacheControl));
}

/**
* Tells the cache that a segment is newly available in an external cache.
*/
Expand Down Expand Up @@ -339,6 +370,7 @@ public interface Visitor {
void visit(SegmentLoadSucceededEvent event);
void visit(SegmentLoadFailedEvent event);
void visit(SegmentAddEvent event);
void visit(SegmentRemoveEvent event);
void visit(ExternalSegmentCreatedEvent event);
void visit(ExternalSegmentDeletedEvent event);
}
Expand Down Expand Up @@ -382,6 +414,17 @@ public void visit(SegmentAddEvent event) {
}
}

public void visit(SegmentRemoveEvent event) {
event.aggMgr.cacheMgr.segmentIndex.remove(event.header);
for (SegmentCacheWorker segmentCacheWorker
: event.aggMgr.segmentCacheWorkers)
{
if (segmentCacheWorker.contains(event.header)) {
segmentCacheWorker.remove(event.header);
}
}
}

public void visit(ExternalSegmentCreatedEvent event) {
event.aggMgr.cacheMgr.segmentIndex.add(event.header);
}
Expand All @@ -408,6 +451,101 @@ interface Message {
public static interface Command<T> extends Message, Callable<T> {
}

private final class FlushCommand implements Command<Void> {
private final CellRegion region;
private final CacheControlImpl cacheControlImpl;
private final AggregationManager aggMan;
public FlushCommand(
AggregationManager aggMan,
CellRegion region,
CacheControlImpl cacheControlImpl)
{
this.aggMan = aggMan;
this.region = region;
this.cacheControlImpl = cacheControlImpl;
}
public Void call() throws Exception {
/*
* For each measure and each star, ask the index
* which headers intersect.
*/
final List<SegmentHeader> headers =
new ArrayList<SegmentHeader>();
final List<Member> measures =
CacheControlImpl.findMeasures(region);
final ConstrainedColumn[] flushRegion =
CacheControlImpl.findAxisValues(region);

for (Member member : measures) {
if (!(member instanceof RolapStoredMeasure)) {
continue;
}
RolapStoredMeasure storedMeasure =
(RolapStoredMeasure) member;
headers.addAll(
segmentIndex.intersectRegion(
member.getDimension().getSchema().getName(),
((RolapSchema)member.getDimension()
.getSchema()).getChecksum(),
storedMeasure.getCube().getName(),
storedMeasure.getName(),
storedMeasure.getCube().getStar()
.getFactTable().getAlias(),
flushRegion));
}

// If flushregion is empty, this means we must clear all
// segments for the region's measures.
if (flushRegion.length == 0) {
for (SegmentHeader header : headers) {
remove(aggMan, header);
}
return null;
}

// Now we know which headers intersect. For each of them,
// we append an excluded region.
// TODO optimize the logic here. If a segment is mostly
// empty, we should thrash it completely.
for (SegmentHeader header : headers) {
if (!header.canConstrain(flushRegion)) {
// We have to delete that segment altogether.
cacheControlImpl.trace(
"discard segment - it cannot be constrained and maintain consistency: "
+ header.getDescription());
remove(aggMan, header);
continue;
}
final SegmentHeader newHeader =
header.constrain(flushRegion);
for (SegmentCacheWorker worker
: aggMan.segmentCacheWorkers)
{
if (worker.supportsRichIndex()) {
final SegmentBody sb = worker.get(header);
if (worker.contains(header)) {
worker.remove(header);
}
if (sb != null) {
worker.put(newHeader, sb);
}
} else {
// The cache doesn't support rich index. We have
// to clear the segment entirely.
if (worker.contains(header)) {
worker.remove(header);
}
}
}
segmentIndex.remove(header);
segmentIndex.add(newHeader);
}

// Done
return null;
}
}

private static class ShutdownCommand implements Command<String> {
public String call() throws Exception {
return "Shutdown succeeded";
Expand Down Expand Up @@ -519,9 +657,9 @@ private static class Actor implements Runnable {
private final BlockingQueue<Pair<Handler, Message>> eventQueue =
new ArrayBlockingQueue<Pair<Handler, Message>>(1000);

private final ResponseQueue<Command, Pair<Object, Throwable>>
private final ResponseQueue<Command<?>, Pair<Object, Throwable>>
responseQueue =
new ResponseQueue<Command, Pair<Object, Throwable>>(1000);
new ResponseQueue<Command<?>, Pair<Object, Throwable>>(1000);

public void run() {
thread = Thread.currentThread();
Expand All @@ -534,8 +672,8 @@ public void run() {
// A message is either a command or an event.
// A command returns a value that must be read by
// the caller.
if (message instanceof Command) {
Command command = (Command) message;
if (message instanceof Command<?>) {
Command<?> command = (Command<?>) message;
try {
Object result = command.call();
responseQueue.put(
Expand Down Expand Up @@ -670,6 +808,25 @@ public void acceptWithoutResponse(Visitor visitor) {
}
}

private static class SegmentRemoveEvent extends Event {
private final AggregationManager aggMgr;
private final SegmentHeader header;

public SegmentRemoveEvent(
AggregationManager aggMgr,
SegmentHeader header)
{
assert header != null;
assert aggMgr != null;
this.aggMgr = aggMgr;
this.header = header;
}

public void acceptWithoutResponse(Visitor visitor) {
visitor.visit(this);
}
}

private static class ExternalSegmentCreatedEvent extends Event {
private final AggregationManager aggMgr;
private final SegmentHeader header;
Expand Down
6 changes: 5 additions & 1 deletion src/main/mondrian/spi/SegmentCache.java
Expand Up @@ -13,7 +13,6 @@
import mondrian.olap.MondrianProperties;
import mondrian.rolap.agg.SegmentBody;
import mondrian.rolap.agg.SegmentHeader;
import mondrian.rolap.agg.SegmentHeader.ConstrainedColumn;
import mondrian.spi.SegmentCache.SegmentCacheListener.SegmentCacheEvent;

import java.util.List;
Expand Down Expand Up @@ -127,6 +126,11 @@ public interface SegmentCache {
* to the state of the cache and be notified of changes to its
* state or its entries. Mondrian will automatically register
* a listener with the implementations it uses.
*
* Implementations of SegmentCache should only send events if the
* cause of the event is not Mondrian itself. Only in cases where
* the cache gets updated by other Mondrian nodes or by a third
* party application is it required to use this interface.
*/
interface SegmentCacheListener {
/**
Expand Down

0 comments on commit 02030cd

Please sign in to comment.