Skip to content

Commit

Permalink
MONDRIAN: Restructure thread synchronization in Segment.java to reduc…
Browse files Browse the repository at this point in the history
…e lock contention (fixes bug MONDRIAN-577).

[git-p4: depot-paths = "//open/mondrian/": change = 12954]
  • Loading branch information
Eric McDermid committed Jul 17, 2009
1 parent 1c00d1c commit f3c7f58
Showing 1 changed file with 135 additions and 43 deletions.
178 changes: 135 additions & 43 deletions src/main/mondrian/rolap/agg/Segment.java
Expand Up @@ -15,8 +15,9 @@
import mondrian.olap.*;
import mondrian.rolap.*;

import java.sql.*;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.io.PrintWriter;

import org.apache.log4j.Logger;
Expand Down Expand Up @@ -81,11 +82,48 @@ class Segment {
final RolapStar.Measure measure;

final Aggregation.Axis[] axes;

/**
* <p><code>data</code> holds a reference to the <code>SegmentDataset</code>
* that contains the underlying cell values.</p>
*
* <p>Since the <code>SegmentDataset</code> is loaded and assigned after
* <code>Segment</code> is constructed, threadsafe access to it is only
* guaranteed if the access is guarded.<p/>
*
* <p>Access which does not depend on <code>data</code> already having been
* loaded should be guarded by obtaining either a read or write lock on
* <code>stateLock</code>, as appropriate.</p>
*
* <p>Access that should not proceed until the <code>data</code> reference
* has been loaded should be guarded using the <code>dataGate</code> latch.
* This is typically accomplished by calling <code>waitUntilLoaded()</code>,
* which will block until the latch is released and throw an error if
* <code>data</code> failed to load.</p>
*
* <p>Once set, the value of <code>data</code> is presumed to be invariant
* and should never be reset, nor should the contents be modified. Thus,
* for a given thread, any read access to data which comes after
* <code>dataGate.await()</code> (or, by extension,
* <code>waitUntilLoaded</code> will be threadsafe.</p>
*/
private SegmentDataset data;
private final CellKey cellKey; // workspace
private final CountDownLatch dataGate = new CountDownLatch(1);

/** State of the segment (loading, ready, etc.). */
private State state;

/**
* <p><code>state</code> == state of the segment (loading, ready, etc).
* Since it correlates to the value of the <code>data</code> reference
* and may be accessed from multiple threads, access to it
* is guarded by stateLock.</p>
*
* <p>The initial value of state is Loading. It may then be set to
* either Ready or Failed. Ready and Failed are both terminal
* states; once set to either, state may not be reset.</p>
*/
private State state = State.Loading;
private final ReentrantReadWriteLock stateLock =
new ReentrantReadWriteLock();

/**
* List of regions to ignore when reading this segment. This list is
Expand Down Expand Up @@ -115,8 +153,6 @@ class Segment {
this.aggregation = aggregation;
this.measure = measure;
this.axes = axes;
this.cellKey = CellKey.Generator.newCellKey(axes.length);
this.state = State.Loading;
this.excludedRegions = excludedRegions;
for (Region region : excludedRegions) {
assert region.getPredicates().size() == axes.length;
Expand All @@ -127,44 +163,75 @@ class Segment {
* Sets the data, and notifies any threads which are blocked in
* {@link #waitUntilLoaded}.
*/
synchronized void setData(
void setData(
SegmentDataset data,
RolapAggregationManager.PinSet pinnedSegments)
{
Util.assertTrue(this.data == null);
Util.assertTrue(this.state == State.Loading);
stateLock.writeLock().lock(); // need exclusive write access to state
try {
Util.assertTrue(this.data == null);
Util.assertTrue(this.state == State.Loading);

this.data = data;
this.state = State.Ready;
this.data = data;
this.state = State.Ready;
} finally {
stateLock.writeLock().unlock(); // always release state lock
}

notifyAll();
dataGate.countDown(); // allow data reader threads to proceed
}

/**
* If this segment is still loading, signals that it failed to load, and
* notifies any threads which are blocked in {@link #waitUntilLoaded}.
*/
synchronized void setFailIfStillLoading() {
switch (state) {
case Loading:
Util.assertTrue(this.data == null);
this.state = State.Failed;
notifyAll();
break;
case Ready:
// The segment loaded just fine.
break;
default:
throw Util.badValue(state);
void setFailIfStillLoading() {
stateLock.writeLock().lock(); // need exclusive write access to state
try {
switch (state) {
case Loading:
Util.assertTrue(this.data == null);
this.state = State.Failed;
break;
case Ready:
// The segment loaded just fine.
break;
default:
throw Util.badValue(state);
}
} finally {
stateLock.writeLock().unlock(); // always release state lock
if (this.state == State.Failed) {
dataGate.countDown(); // allow data reader threads to proceed
}
}
}

/**
* Compares internal <code>state</code> variable to a passed-in value
* in a threadsafe way using the <code>stateLock</code> read lock.
*
* @param value The State value to which <code>state</code> should be
* compared.
* @return True if states match, false otherwise
*/
private boolean compareState(State value) {
boolean retval = false;
stateLock.readLock().lock();
try {
retval = (state == value);
} finally {
stateLock.readLock().unlock();
}
return (retval);
}

public boolean isReady() {
return (state == State.Ready);
return (compareState(State.Ready));
}

boolean isFailed() {
return (state == State.Failed);
return (compareState(State.Failed));
}

private void makeDescription(StringBuilder buf, boolean values) {
Expand Down Expand Up @@ -243,9 +310,10 @@ public String toString() {
* </ul></p>
*
*/
synchronized Object getCellValue(Object[] keys) {
Object getCellValue(Object[] keys) {
assert keys.length == axes.length;
int missed = 0;
CellKey cellKey = CellKey.Generator.newCellKey(axes.length);
for (int i = 0; i < keys.length; i++) {
Object key = keys[i];
int offset = axes[i].getOffset(key);
Expand All @@ -272,6 +340,9 @@ synchronized Object getCellValue(Object[] keys) {
// or more of its keys does have any values
return Util.nullValue;
} else {
// waitUntilLoaded() ensures data exists, and makes
// following read threadsafe
waitUntilLoaded();
Object o = data.get(cellKey);
if (o == null) {
o = Util.nullValue;
Expand Down Expand Up @@ -311,28 +382,33 @@ private boolean isExcluded(Object[] keys) {
* Blocks until this segment has finished loading; if this segment has
* already loaded, returns immediately.
*/
public synchronized void waitUntilLoaded() {
public void waitUntilLoaded() {
if (isLoading()) {
try {
LOGGER.debug("Waiting on " + printSegmentHeaderInfo(","));
wait();
dataGate.await();

stateLock.readLock().lock();
switch (state) {
case Ready:
return; // excellent!
case Failed:
throw Util.newError(
"Pending segment failed to load: "
+ toString());
default:
throw Util.badValue(state);
}
} catch (InterruptedException e) {
}
switch (state) {
case Ready:
return; // excellent!
case Failed:
throw Util.newError(
"Pending segment failed to load: "
+ toString());
default:
throw Util.badValue(state);
//ignore
} finally {
stateLock.readLock().unlock();
}
}
}

private boolean isLoading() {
return state == State.Loading;
return (compareState(State.Loading));
}

/**
Expand Down Expand Up @@ -453,6 +529,9 @@ Segment createSubSegment(
// be dense and VERY occasionally a subset of a relatively dense dataset
// will be sparse.)
SegmentDataset newData;

// isReady() is guarded and ensures visibility of data
Util.assertTrue(isReady());
if (data instanceof SparseSegmentDataset) {
newData =
new SparseSegmentDataset(
Expand Down Expand Up @@ -498,11 +577,24 @@ Segment createSubSegment(
}

/**
* Returns this Segment's dataset, or null if the data has not yet been
* loaded.
* <p>Returns this Segment's dataset, or null if the data has not yet been
* loaded.</p>
*
* <p>WARNING: the returned SegmentDataset reference should not be modified;
* it is assumed to be invariant.</p>
*
* @return The <code>data</code> reference if it has been loaded,
* null otherwise.
*/
SegmentDataset getData() {
return data;
//Review: letting a non-threadsafe object reference escape
//is inherently unsafe. Consider returning a copy.
if (isReady()) {
// isReady() is guarded, and ensures visibility of data
return data;
} else {
return null;
}
}

/**
Expand Down

0 comments on commit f3c7f58

Please sign in to comment.