Skip to content

Commit

Permalink
MONDRIAN-PACINO: Execute SQL to load segments in a worker thread, sep…
Browse files Browse the repository at this point in the history
…arate from

    both the MDX statement thread and the actor that manages the segment cache.
    When the SQL worker has finished loading a segment, it tells the segment
    cache to add it to its index & external cache, and also tells any MDX
    statements that are waiting on the segment-future.

    Reduce the number of copies that are made when translating between external
    and internal segment dataset representations.

    Add class ArraySortedSet (prevents a copy when building segment axes; may
    be useful low-memory, good-locality implementation of read-only sets).

    Improve error-handling. If there is an error while loading segments, we
    consider the error adequately handled if at least one segment has not
    finished loading and so we are able to set its status to 'failed'.

[git-p4: depot-paths = "//open/mondrian-release/pacino/": change = 14777]
  • Loading branch information
julianhyde committed Nov 18, 2011
1 parent 8f5ace0 commit 4e8c054
Show file tree
Hide file tree
Showing 22 changed files with 739 additions and 335 deletions.
4 changes: 2 additions & 2 deletions src/main/mondrian/rolap/FastBatchingCellReader.java
Expand Up @@ -216,7 +216,7 @@ private void recordCellRequest2(
}

/**
* Locates a segment that acutally exists.
* Locates a segment that actually exists.
*
* @param request Cell request
* @param map Column values
Expand All @@ -227,7 +227,7 @@ private Pair<SegmentHeader, SegmentBody> locateHeaderBody(
Map<String, Comparable<?>> map)
{
final List<SegmentHeader> locate =
aggMgr.segmentIndex.locate(
aggMgr.cacheMgr.segmentIndex.locate(
request.getMeasure().getStar().getSchema().getName(),
request.getMeasure().getStar().getSchema().getChecksum(),
request.getMeasure().getCubeName(),
Expand Down
16 changes: 12 additions & 4 deletions src/main/mondrian/rolap/agg/AbstractSegmentBody.java
Expand Up @@ -9,6 +9,9 @@
*/
package mondrian.rolap.agg;

import mondrian.util.Pair;

import java.util.List;
import java.util.SortedSet;

/**
Expand All @@ -24,12 +27,17 @@ abstract class AbstractSegmentBody implements SegmentBody {
private final boolean[] nullAxisFlags;

public AbstractSegmentBody(
SortedSet<Comparable<?>>[] axisValueSets,
boolean[] nullAxisFlags)
List<Pair<SortedSet<Comparable<?>>, Boolean>> axes)
{
super();
this.axisValueSets = axisValueSets.clone();
this.nullAxisFlags = nullAxisFlags.clone();
//noinspection unchecked
this.axisValueSets = new SortedSet[axes.size()];
this.nullAxisFlags = new boolean[axes.size()];
for (int i = 0; i < axes.size(); i++) {
Pair<SortedSet<Comparable<?>>, Boolean> pair = axes.get(i);
axisValueSets[i] = pair.left;
nullAxisFlags[i] = pair.right;
}
}

public SortedSet<Comparable<?>>[] getAxisValueSets() {
Expand Down
10 changes: 7 additions & 3 deletions src/main/mondrian/rolap/agg/AggregationManager.java
Expand Up @@ -24,7 +24,7 @@
import org.apache.log4j.Logger;

import java.util.*;
import java.util.concurrent.Future;
import java.util.concurrent.*;

/**
* <code>RolapAggregationManager</code> manages all {@link Aggregation}s
Expand All @@ -49,8 +49,12 @@ public class AggregationManager extends RolapAggregationManager {

private static AggregationManager instance;

public final SegmentCacheIndex segmentIndex =
new SegmentCacheIndexImpl();
// TODO: create using factory and/or configuration parameters. Executor
// should be shared within MondrianServer or target JDBC database.
public final Executor sqlExecutor =
new ThreadPoolExecutor(
3, 10, 1, TimeUnit.SECONDS,
new ArrayBlockingQueue<Runnable>(10));

/**
* Returns or creates the singleton.
Expand Down
55 changes: 31 additions & 24 deletions src/main/mondrian/rolap/agg/DenseDoubleSegmentBody.java
Expand Up @@ -9,59 +9,66 @@
*/
package mondrian.rolap.agg;

import mondrian.util.Pair;

import java.util.*;

/**
* Implementation of a segment body which stores the data inside
* a dense primitive array of double precision numbers.
*
* @author LBoudreau
* @version $Id$
*/
class DenseDoubleSegmentBody extends AbstractSegmentBody {
private static final long serialVersionUID = 5775717165497921144L;
final double[] data;
private final int size;

private final double[] values;
private final BitSet nullIndicators;

/**
* Creates a DenseDoubleSegmentBody.
*
* <p>Stores the given array of cell values and null indicators; caller must
* not modify them afterwards.</p>
*
* @param nullIndicators Null indicators
* @param values Cell values
* @param axes Axes
*/
DenseDoubleSegmentBody(
BitSet nullIndicators,
double[] dataToSave,
int size,
SortedSet<Comparable<?>>[] axisValueSets,
boolean[] nullAxisFlags)
double[] values,
List<Pair<SortedSet<Comparable<?>>, Boolean>> axes)
{
super(axisValueSets, nullAxisFlags);
this.size = size;
this.data = new double[size];
System.arraycopy(dataToSave, 0, data, 0, size);
this.nullIndicators = new BitSet(nullIndicators.length());
this.nullIndicators.or(nullIndicators);
super(axes);
this.values = values;
this.nullIndicators = nullIndicators;
}

public SegmentDataset createSegmentDataset(
Segment segment,
SegmentAxis[] axes)
{
DenseDoubleSegmentDataset ds =
new DenseDoubleSegmentDataset(axes, this.size);
System.arraycopy(data, 0, ds.values, 0, this.size);
ds.nullIndicators.clear();
ds.nullIndicators.or(nullIndicators);
return ds;
return new DenseDoubleSegmentDataset(axes, values, nullIndicators);
}

@Override
public String toString() {
StringBuilder sb = new StringBuilder();
sb.append("DenseDoubleSegmentBody(size=" + size);
sb.append("DenseDoubleSegmentBody(size=");
sb.append(values.length);
sb.append(", data=");
sb.append(Arrays.toString(data));
sb.append(", nullIndicators=" + nullIndicators);
sb.append(", axisValueSets=" + Arrays.toString(getAxisValueSets()));
sb.append(", nullAxisFlags=" + Arrays.toString(getNullAxisFlags()));
sb.append(Arrays.toString(values));
sb.append(", nullIndicators=").append(nullIndicators);
sb.append(", axisValueSets=");
sb.append(Arrays.toString(getAxisValueSets()));
sb.append(", nullAxisFlags=");
sb.append(Arrays.toString(getNullAxisFlags()));
if (getAxisValueSets().length > 0) {
if (getAxisValueSets()[0].iterator().hasNext()) {
sb.append(", aVS[0]=" + getAxisValueSets()[0].getClass());
sb.append(", aVS[0]=");
sb.append(getAxisValueSets()[0].getClass());
sb.append(", aVS[0][0]=");
sb.append(getAxisValueSets()[0].iterator().next().getClass());
}
Expand Down
36 changes: 23 additions & 13 deletions src/main/mondrian/rolap/agg/DenseDoubleSegmentDataset.java
Expand Up @@ -15,8 +15,9 @@

import mondrian.rolap.CellKey;
import mondrian.rolap.SqlStatement;
import mondrian.util.Pair;

import java.util.SortedSet;
import java.util.*;

/**
* Implementation of {@link mondrian.rolap.agg.DenseSegmentDataset} that stores
Expand All @@ -29,14 +30,27 @@ class DenseDoubleSegmentDataset extends DenseNativeSegmentDataset {
final double[] values; // length == m[0] * ... * m[axes.length-1]

/**
* Creates a DenseSegmentDataset.
* Creates a DenseDoubleSegmentDataset.
*
* @param axes Segment axes, containing actual column values
* @param size Number of coordinates
*/
DenseDoubleSegmentDataset(SegmentAxis[] axes, int size) {
super(axes, size);
this.values = new double[size];
this(axes, new double[size], new BitSet(size));
}

/**
* Creates a populated DenseDoubleSegmentDataset.
*
* @param axes Segment axes, containing actual column values
* @param values Cell values; not copied
* @param nullIndicators Null indicators
*/
DenseDoubleSegmentDataset(
SegmentAxis[] axes, double[] values, BitSet nullIndicators)
{
super(axes, nullIndicators);
this.values = values;
}

public double getDouble(CellKey key) {
Expand Down Expand Up @@ -92,16 +106,12 @@ protected int getSize() {
}

public SegmentBody createSegmentBody(
SortedSet<Comparable<?>>[] axisValueSets,
boolean[] nullAxisFlags)
List<Pair<SortedSet<Comparable<?>>, Boolean>> axes)
{
return
new DenseDoubleSegmentBody(
nullIndicators,
values,
getSize(),
axisValueSets,
nullAxisFlags);
return new DenseDoubleSegmentBody(
nullIndicators,
values,
axes);
}
}

Expand Down
41 changes: 21 additions & 20 deletions src/main/mondrian/rolap/agg/DenseIntSegmentBody.java
Expand Up @@ -9,8 +9,9 @@
*/
package mondrian.rolap.agg;

import java.util.BitSet;
import java.util.SortedSet;
import mondrian.util.Pair;

import java.util.*;

/**
* Implementation of a segment body which stores the data inside
Expand All @@ -21,35 +22,35 @@
*/
class DenseIntSegmentBody extends AbstractSegmentBody {
private static final long serialVersionUID = 5391233622968115488L;
final int[] data;
private final int size;

private final int[] values;
private final BitSet nullIndicators;

/**
* Creates a DenseIntSegmentBody.
*
* <p>Stores the given array of cell values and null indicators; caller must
* not modify them afterwards.</p>
*
* @param nullIndicators Null indicators
* @param values Cell values
* @param axes Axes
*/
DenseIntSegmentBody(
BitSet nullIndicators,
int[] dataToSave,
int size,
SortedSet<Comparable<?>>[] axisValueSets,
boolean[] nullAxisFlags)
int[] values,
List<Pair<SortedSet<Comparable<?>>, Boolean>> axes)
{
super(axisValueSets, nullAxisFlags);
this.size = size;
this.data = new int[size];
System.arraycopy(dataToSave, 0, data, 0, size);
this.nullIndicators = new BitSet(nullIndicators.length());
this.nullIndicators.or(nullIndicators);
super(axes);
this.values = values;
this.nullIndicators = nullIndicators;
}

public SegmentDataset createSegmentDataset(
Segment segment,
SegmentAxis[] axes)
{
DenseIntSegmentDataset ds =
new DenseIntSegmentDataset(axes, this.size);
System.arraycopy(data, 0, ds.values, 0, this.size);
ds.nullIndicators.clear();
ds.nullIndicators.or(nullIndicators);
return ds;
return new DenseIntSegmentDataset(axes, values, nullIndicators);
}
}

Expand Down
36 changes: 24 additions & 12 deletions src/main/mondrian/rolap/agg/DenseIntSegmentDataset.java
Expand Up @@ -14,8 +14,9 @@

import mondrian.rolap.CellKey;
import mondrian.rolap.SqlStatement;
import mondrian.util.Pair;

import java.util.SortedSet;
import java.util.*;

/**
* Implementation of {@link DenseSegmentDataset} that stores
Expand All @@ -38,8 +39,23 @@ class DenseIntSegmentDataset extends DenseNativeSegmentDataset {
* @param size Number of coordinates
*/
DenseIntSegmentDataset(SegmentAxis[] axes, int size) {
super(axes, size);
this.values = new int[size];
this(axes, new int[size], new BitSet(size));
}

/**
* Creates a populated DenseIntSegmentDataset.
*
* @param axes Segment axes, containing actual column values
* @param values Cell values; not copied
* @param nullIndicators Null indicators
*/
DenseIntSegmentDataset(
SegmentAxis[] axes,
int[] values,
BitSet nullIndicators)
{
super(axes, nullIndicators);
this.values = values;
}

public int getInt(CellKey key) {
Expand Down Expand Up @@ -105,16 +121,12 @@ protected int getSize() {
}

public SegmentBody createSegmentBody(
SortedSet<Comparable<?>>[] axisValueSets,
boolean[] nullAxisFlags)
List<Pair<SortedSet<Comparable<?>>, Boolean>> axes)
{
return
new DenseIntSegmentBody(
nullIndicators,
values,
getSize(),
axisValueSets,
nullAxisFlags);
return new DenseIntSegmentBody(
nullIndicators,
values,
axes);
}
}

Expand Down
10 changes: 6 additions & 4 deletions src/main/mondrian/rolap/agg/DenseNativeSegmentDataset.java
Expand Up @@ -30,12 +30,14 @@ abstract class DenseNativeSegmentDataset extends DenseSegmentDataset {
* Creates a DenseNativeSegmentDataset.
*
* @param axes Segment axes, containing actual column values
* @param size Number of coordinates
* @param nullIndicators Null indicators
*/
DenseNativeSegmentDataset(SegmentAxis[] axes, int size) {
DenseNativeSegmentDataset(
SegmentAxis[] axes,
BitSet nullIndicators)
{
super(axes);
this.nullIndicators = new BitSet(size);
this.nullIndicators.set(0, size, false);
this.nullIndicators = nullIndicators;
}

public boolean isNull(CellKey key) {
Expand Down

0 comments on commit 4e8c054

Please sign in to comment.