Skip to content

Commit

Permalink
MONDRIAN: Fixes a Javadoc failure.
Browse files Browse the repository at this point in the history
Refactors the RolapResultShepherd into a MondrianServer specific instance instead of a static class.

Modifies the Evaluation and RolapResultShepherd classes so that they use the correct methods when canceling and closing SQL statements according to the Java compliance level of the runtime environment.

Refactored the API between Execution and RolapResultShepherd so that threads are managed more simply and the tasks get canceled on the background.

Refactors the RolapResultShepherd shepherding thread into a simple java.util.Timer.

Adds a limiter to the number of threads allowed by each MondrianServer instance.

Adds a conditional to prevent users from shutting down the default MondrianServer.

Adds a check to prevent a shutdown MondrianServer instance from being used.

Adds the concept of SqlState to the Execution class so it can determine when to cleanup the opened SQL statements.

[git-p4: depot-paths = "//open/mondrian/": change = 14747]
  • Loading branch information
lucboudreau committed Nov 8, 2011
1 parent f78eb79 commit a5f37b0
Show file tree
Hide file tree
Showing 7 changed files with 281 additions and 110 deletions.
10 changes: 10 additions & 0 deletions src/main/mondrian/olap/MondrianProperties.xml
Expand Up @@ -980,6 +980,16 @@ as expected. Defaults to 1000ms.</p>
<Type>int</Type>
<Default>1000</Default>
</PropertyDefinition>
<PropertyDefinition>
<Name>RolapConnectionShepherdNbThreads</Name>
<Path>mondrian.rolap.maxQueryThreads</Path>
<Description>
<p>Maximum number of user threads per Mondrian server instance.
Defaults to 10.</p>
</Description>
<Type>int</Type>
<Default>10</Default>
</PropertyDefinition>
<PropertyDefinition>
<Name>IgnoreInvalidMembers</Name>
<Path>mondrian.rolap.ignoreInvalidMembers</Path>
Expand Down
3 changes: 3 additions & 0 deletions src/main/mondrian/olap/MondrianServer.java
Expand Up @@ -10,6 +10,7 @@
package mondrian.olap;

import mondrian.rolap.RolapConnection;
import mondrian.rolap.RolapResultShepherd;
import mondrian.rolap.agg.AggregationManager;
import mondrian.server.*;
import mondrian.server.monitor.Monitor;
Expand Down Expand Up @@ -113,6 +114,8 @@ public MondrianVersion getVersion() {
*/
public abstract List<String> getKeywords();

public abstract RolapResultShepherd getResultShepherd();

/**
* Returns the lock box that can be used to pass objects via their string
* key.
Expand Down
4 changes: 2 additions & 2 deletions src/main/mondrian/rolap/RolapConnection.java
Expand Up @@ -613,7 +613,7 @@ public Result execute(Query query) {
*/
public Result execute(final Execution execution) {
return
RolapResultShepherd
server.getResultShepherd()
.shepherdExecution(
execution,
new Callable<Result>() {
Expand Down Expand Up @@ -660,11 +660,11 @@ public void memoryUsageNotification(long used, long max) {
RolapUtil.MDX_LOGGER.debug(currId + ": " + Util.unparse(query));
}

statement.start(execution);
final Locus locus = new Locus(execution, null, "Loading cells");
Locus.push(locus);
Result result;
try {
statement.start(execution);
result = new RolapResult(execution, true);
int i = 0;
for (QueryAxis axis : query.getAxes()) {
Expand Down
91 changes: 54 additions & 37 deletions src/main/mondrian/rolap/RolapResultShepherd.java
Expand Up @@ -14,6 +14,8 @@
import mondrian.util.Pair;

import java.util.List;
import java.util.Timer;
import java.util.TimerTask;
import java.util.concurrent.*;

/**
Expand All @@ -30,55 +32,67 @@
* @author LBoudreau
* @version $Id$
*/
class RolapResultShepherd {
public class RolapResultShepherd {

/**
* An executor service used for both the shepherd thread and the
* Execution objects.
*/
private static final ExecutorService executor =
Util.getExecutorService("mondrian.rolap.RolapResultShepherd$executor");
private final ExecutorService executor =
Util.getExecutorService(
MondrianProperties.instance()
.RolapConnectionShepherdNbThreads.get(),
"mondrian.rolap.RolapResultShepherd$executor");

/**
* List of tasks that should be monitored by the shepherd thread.
*/
private static final List<Pair<FutureTask<Result>, Execution>> tasks =
new CopyOnWriteArrayList<Pair<FutureTask<Result>,Execution>>();

/*
* Fire up the shepherd thread.
*/
static {
executor.execute(
new Runnable() {
private final int delay =
MondrianProperties.instance()
.RolapConnectionShepherdThreadPollingInterval.get();
public void run() {
while (true) {
for (Pair<FutureTask<Result>, Execution> task
: tasks)
{
if (task.left.isDone()) {
continue;
}
if (task.right.isCancelOrTimeout()) {
// First, free the user thread.
task.left.cancel(true);
// Now try a graceful shutdown of the Execution
// instance
task.right.cleanStatements();
private final Timer timer =
new Timer("mondrian.rolap.RolapResultShepherd#timer", true);

public RolapResultShepherd() {
timer.scheduleAtFixedRate(
new TimerTask() {
public void run() {
for (final Pair<FutureTask<Result>, Execution> task
: tasks)
{
if (task.left.isDone()) {
tasks.remove(task);
continue;
}
if (task.right.isCancelOrTimeout()) {
// Remove it from the list so that we know
// it was cleaned once.
tasks.remove(task);
// Cancel the FutureTask for which
// the user thread awaits. The user
// thread will call
// Execution.checkCancelOrTimeout
// later and take care of sending
// an exception on the user thread.
task.left.cancel(true);
// The cleanup operation can be done async.
// Let's not interrupt this task.
executor.submit(
new Runnable() {
public void run() {
// Now cleanup the statement on
// this thread.
task.right.cancelSqlStatements();
}
}
try {
Thread.sleep(delay);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
return;
}
});
}
}
});
}
},
MondrianProperties.instance()
.RolapConnectionShepherdThreadPollingInterval.get(),
MondrianProperties.instance()
.RolapConnectionShepherdThreadPollingInterval.get());
}

/**
Expand All @@ -99,7 +113,7 @@ public void run() {
* @return A Result object, as supplied by the Callable passed as a
* parameter.
*/
static Result shepherdExecution(
public Result shepherdExecution(
Execution execution,
Callable<Result> callable)
{
Expand Down Expand Up @@ -142,10 +156,13 @@ static Result shepherdExecution(
// Since we got here, this means that the exception was
// something else. Just wrap/throw.
throw new MondrianException(e);
} finally {
tasks.remove(pair);
}
}

public void shutdown() {
this.timer.cancel();
this.executor.shutdown();
}
}

// End RolapResultShepherd.java
Expand Down

0 comments on commit a5f37b0

Please sign in to comment.