Skip to content

Commit

Permalink
MONDRIAN: Data source change listener plugin. Using this plugin, mond…
Browse files Browse the repository at this point in the history
…rian can check if the data source has changed and flush the cache. Currently only flushing of hierarchy cache is provided.

[git-p4: depot-paths = "//open/mondrian/": change = 8480]
  • Loading branch information
Bart Pappyn committed Jan 8, 2007
1 parent 2ade390 commit f04a8c3
Show file tree
Hide file tree
Showing 11 changed files with 366 additions and 11 deletions.
59 changes: 58 additions & 1 deletion doc/schema.html
Expand Up @@ -72,6 +72,7 @@
<li><a href="#Member_formatter">Member formatter</a></li>
<li><a href="#Property_formatter">Property formatter</a></li>
<li><a href="#Schema_processor">Schema processor</a></li>
<li><a href="#DataSource_change_listener">Data source change listener</a></li>
</ol>
</li>
<li><a href="#I18n">Internationalization</a></li>
Expand Down Expand Up @@ -1538,7 +1539,8 @@ <h3>6. Plug-ins<a name="Plugins">&nbsp;</a></h3>
user-defined functions</a>; <a href="#Cell_formatter">
cell</a>, <a href="#Member_formatter">member</a> and
<a href="#Property_formatter">property formatters</a>;
and <a href="#Schema_processor">dynamic schema processors</a>.
<a href="#Schema_processor">dynamic schema processors</a> and
<a href="#DataSource_change_listener">data source change listeners</a>.
There is incomplete support for <a href="#Member_reader">member
readers</a> and <a href="#Cell_reader">cell readers</a>,
and in future we may support pluggable
Expand Down Expand Up @@ -1932,6 +1934,61 @@ <h1>6.7 Schema processor<a name="Schema_processor">&nbsp;</a></h1>
<p>Dynamic schemas are a very powerful construct. As we shall see, an important application
for them is <a href="#I18n">internationalization</a>.</p>

<!--
#########################################
## 6.8 Data source change listener #####
######################################### -->
<h1>6.8 Data source change listener<a name="DataSource_change_listener">&nbsp;</a></h1>

<p>A data source change listener implements the <code>
<a href="api/mondrian/spi/DataSourceChangeListener.html">
mondrian.spi.DataSourceChangeListener</a></code> interface. It is specified as part of
the connection string, like this:</p>

<blockquote>
<code>Jdbc=jdbc:odbc:MondrianFoodMart; JdbcUser=ziggy;
JdbcPassword=stardust; DataSourceChangeListener=com.acme.MyChangeListener;</code>
</blockquote>

<p>Everytime mondrian has to decide whether it will use data from cache, it
will call the change listener. When the change listener tells mondrian
the datasource has changed for a dimension, cube, ... then mondrian will
flush the cache and read from database again.</p>

<p>This class should be called in mondrian before any data is read, so
even before cache is build. This way, the plugin is able to register
the first timestamp mondrian tries to read the datasource.</p>

<p>For now, only flushing of the hierarchy cache is implemented.</p>

<p>Here is an example of a data source change listener plugin class :
<blockquote>
<code>&nbsp;package com.acme;<br>
<br>
&nbsp;public class MyChangeListener extends DataSourceChangeListenerImpl {<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;public MyChangeListener() {<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;public synchronized boolean isHierarchyChanged(RolapHierarchy hierarchy) {<br><br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;// Since this function is called many times, it is a good idea to not check the database every time<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;// And use some sort of time interval...<br><br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;// Get name of the table (does not work if based on view)<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;String tableName = getTableName(hierarchy);<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;Connection jdbcConnection = null;<br>
&nbsp;<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;DataSource dataSource =<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;hierarchy.getRolapSchema().getInternalConnection().getDataSource();<br>
&nbsp;<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;try {<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;jdbcConnection = dataSource.getConnection();<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;if (jdbcConnection != null) {<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;// Check database whether hierarchy data source has changed<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;// ...<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br>
&nbsp;&nbsp;&nbsp;&nbsp;&nbsp;}<br>
&nbsp;}<br>
</blockquote>

<!--
#################################
## 7. Internationalization #####
Expand Down
3 changes: 3 additions & 0 deletions src/main/mondrian/rolap/CacheMemberReader.java
Expand Up @@ -78,6 +78,9 @@ public Object makeKey(RolapMember parent, Object key) {
public RolapMember getMember(Object key) {
return mapKeyToMember.get(key);
}
public RolapMember getMember(Object key, boolean mustCheckCacheStatus) {
return mapKeyToMember.get(key);
}

// implement MemberCache
public Object putMember(Object key, RolapMember value) {
Expand Down
7 changes: 7 additions & 0 deletions src/main/mondrian/rolap/MemberCache.java
Expand Up @@ -40,6 +40,13 @@ interface MemberCache {
*/
RolapMember getMember(Object key);

/**
* Retrieves the {@link RolapMember} with a given key (created by
* {@link #makeKey}). It is possible to disable the checking of the
* cache status with this call.
*/
RolapMember getMember(Object key, boolean mustCheckCacheStatus);

/**
* Replaces the {@link RolapMember} with a given key (created by {@link
* #makeKey}). Returns the previous member with that key, or null.
Expand Down
11 changes: 9 additions & 2 deletions src/main/mondrian/rolap/RolapConnectionProperties.java
Expand Up @@ -121,8 +121,15 @@ public enum RolapConnectionProperties {
* "en_US", "hu". If Locale is not specified, then the name of system's
* default will be used, as per {@link java.util.Locale#getDefault()}.
*/
Locale;

Locale,

/**
* The name of a class implementing the
* {@link mondrian.spi.DataSourceChangeListener} interface.
* A data source change listener is used to flush the cache of
* mondrian every time the datasource is changed.
*/
DataSourceChangeListener;
/**
* Any property beginning with this value will be added to the
* JDBC connection properties, after removing this prefix. This
Expand Down
6 changes: 3 additions & 3 deletions src/main/mondrian/rolap/RolapHierarchy.java
Expand Up @@ -34,7 +34,7 @@
* @since 10 August, 2001
* @version $Id$
*/
class RolapHierarchy extends HierarchyBase {
public class RolapHierarchy extends HierarchyBase {

private static final Logger LOGGER = Logger.getLogger(RolapHierarchy.class);

Expand Down Expand Up @@ -353,11 +353,11 @@ private static boolean tableExists(String tableName,
return false;
}

RolapSchema getRolapSchema() {
public RolapSchema getRolapSchema() {
return (RolapSchema) dimension.getSchema();
}

MondrianDef.Relation getRelation() {
public MondrianDef.Relation getRelation() {
return relation;
}

Expand Down
64 changes: 64 additions & 0 deletions src/main/mondrian/rolap/RolapSchema.java
Expand Up @@ -54,6 +54,7 @@
import mondrian.rolap.aggmatcher.JdbcSchema;
import mondrian.rolap.sql.SqlQuery;
import mondrian.spi.UserDefinedFunction;
import mondrian.spi.DataSourceChangeListener;

import org.apache.log4j.Logger;
import org.apache.commons.vfs.*;
Expand Down Expand Up @@ -149,6 +150,8 @@ public class RolapSchema implements Schema {
new ArrayList<RolapSchemaParameter >();

private Date schemaLoadDate;

private DataSourceChangeListener dataSourceChangeListener;

/**
* This is ONLY called by other constructors (and MUST be called
Expand Down Expand Up @@ -176,6 +179,7 @@ private RolapSchema(
this.mapNameToCube = new HashMap<String, RolapCube>();
this.mapNameToRole = new HashMap<String, Role>();
this.aggTableManager = new AggTableManager(this);
this.dataSourceChangeListener = createDataSourceChangeListener(connectInfo);
}


Expand Down Expand Up @@ -1426,6 +1430,49 @@ public Cube getCube() {
}
};
}

/**
* Creates a {@link DataSourceChangeListener} with which to detect changes to datasources.
*/
private DataSourceChangeListener createDataSourceChangeListener(
Util.PropertyList connectInfo) {

DataSourceChangeListener changeListener = null;

// If CatalogContent is specified in the connect string, ignore
// everything else. In particular, ignore the dynamic schema
// processor.
String dataSourceChangeListenerStr = connectInfo.get(
RolapConnectionProperties.DataSourceChangeListener.name());

if ( ! Util.isEmpty(dataSourceChangeListenerStr)) {
try {

Class<?> clazz = Class.forName(dataSourceChangeListenerStr);
Constructor<?> constructor = clazz.getConstructor();
changeListener = (DataSourceChangeListener)constructor.newInstance();

/* final Class<DataSourceChangeListener> clazz =
(Class<DataSourceChangeListener>) Class.forName(dataSourceChangeListenerStr);
final Constructor<DataSourceChangeListener> ctor =
clazz.getConstructor();
changeListener = ctor.newInstance(); */

} catch (Exception e) {
throw Util.newError(e, "loading DataSourceChangeListener "
+ dataSourceChangeListenerStr);
}

if (LOGGER.isDebugEnabled()) {
String msg = "RolapSchema.createDataSourceChangeListener: create datasource change listener \"" +
dataSourceChangeListenerStr;

LOGGER.debug(msg);
}
}
return changeListener;
}


/**
* Connection for purposes of parsing and validation. Careful! It won't
Expand Down Expand Up @@ -1553,6 +1600,23 @@ public synchronized int getNextDimensionOrdinal() {
return nextDimensionOrdinal++;
}


/**
* @return Returns the dataSourceChangeListener.
*/
public DataSourceChangeListener getDataSourceChangeListener() {
return dataSourceChangeListener;
}


/**
* @param dataSourceChangeListener The dataSourceChangeListener to set.
*/
public void setDataSourceChangeListener(
DataSourceChangeListener dataSourceChangeListener) {
this.dataSourceChangeListener = dataSourceChangeListener;
}

}

// End RolapSchema.java
49 changes: 46 additions & 3 deletions src/main/mondrian/rolap/SmartMemberReader.java
Expand Up @@ -20,6 +20,8 @@
import mondrian.rolap.sql.MemberChildrenConstraint;
import mondrian.rolap.sql.TupleConstraint;

import mondrian.spi.DataSourceChangeListener;

import java.util.*;

/**
Expand Down Expand Up @@ -54,13 +56,15 @@ public class SmartMemberReader implements MemberReader, MemberCache {
final SmartMemberListCache<RolapMember, List<RolapMember>> mapMemberToChildren;

/** a cache for alle members to ensure uniqueness */
final SmartCache<Object, RolapMember> mapKeyToMember;
SmartCache<Object, RolapMember> mapKeyToMember;

/** maps a level to its members */
final SmartMemberListCache<RolapLevel, List<RolapMember>> mapLevelToMembers;

private List<RolapMember> rootMembers;
DataSourceChangeListener changeListener;

private List<RolapMember> rootMembers;

SmartMemberReader(MemberReader source) {
this.source = source;
if (!source.setCache(this)) {
Expand All @@ -73,6 +77,13 @@ public class SmartMemberReader implements MemberReader, MemberCache {
this.mapKeyToMember = new SoftSmartCache<Object, RolapMember>();
this.mapMemberToChildren =
new SmartMemberListCache<RolapMember, List<RolapMember>>();

if (source.getHierarchy() != null) {
changeListener = source.getHierarchy().getRolapSchema().getDataSourceChangeListener();
}
else {
changeListener = null;
}
}

// implement MemberReader
Expand All @@ -86,6 +97,18 @@ public boolean setCache(MemberCache cache) {
// own cache
return false;
}

private synchronized void checkCacheStatus() {

if (changeListener != null) {
if (changeListener.isHierarchyChanged(getHierarchy())) {
/* Flush the cache */
mapMemberToChildren.clear();
mapKeyToMember.clear();
mapLevelToMembers.clear();
}
}
}

// implement MemberCache
public Object makeKey(RolapMember parent, Object key) {
Expand All @@ -95,8 +118,20 @@ public Object makeKey(RolapMember parent, Object key) {
// implement MemberCache
// synchronization: Must synchronize, because uses mapKeyToMember
public synchronized RolapMember getMember(Object key) {
return getMember(key, true);
}

// implement MemberCache
// synchronization: Must synchronize, because uses mapKeyToMember
public synchronized RolapMember getMember(Object key, boolean mustCheckCacheStatus) {

if (mustCheckCacheStatus == true) {
checkCacheStatus();
}

return mapKeyToMember.get(key);
}


// implement MemberCache
// synchronization: Must synchronize, because modifies mapKeyToMember
Expand Down Expand Up @@ -143,10 +178,15 @@ public synchronized List<RolapMember> getMembersInLevel(
int endOrdinal,
TupleConstraint constraint)
{
List<RolapMember> members = mapLevelToMembers.get(level, constraint);
List<RolapMember> members = null;

checkCacheStatus();

members = mapLevelToMembers.get(level, constraint);
if (members != null) {
return members;
}

members = source.getMembersInLevel(level, startOrdinal, endOrdinal, constraint);
mapLevelToMembers.put(level, constraint, members);
return members;
Expand Down Expand Up @@ -183,6 +223,9 @@ public synchronized void getMemberChildren(
List<RolapMember> parentMembers,
List<RolapMember> children,
MemberChildrenConstraint constraint) {

checkCacheStatus();

List<RolapMember> missed = new ArrayList<RolapMember>();
for (RolapMember parentMember : parentMembers) {
List<RolapMember> list =
Expand Down
4 changes: 3 additions & 1 deletion src/main/mondrian/rolap/SqlMemberSource.java
Expand Up @@ -727,6 +727,7 @@ private void getMemberChildren(

int limit = MondrianProperties.instance().ResultLimit.get();
int nFetch = 0;
boolean checkCacheStatus=true;

while (resultSet.next()) {

Expand All @@ -747,7 +748,8 @@ private void getMemberChildren(
captionValue = null;
}
Object key = cache.makeKey(parentMember, value);
RolapMember member = cache.getMember(key);
RolapMember member = cache.getMember(key, checkCacheStatus);
checkCacheStatus = false; /* Only check the first time */
if (member == null) {
member = makeMember(
parentMember, childLevel, value, captionValue,
Expand Down
4 changes: 3 additions & 1 deletion src/main/mondrian/rolap/SqlTupleReader.java
Expand Up @@ -128,6 +128,7 @@ private int internalAddRow(ResultSet resultSet, int column) throws SQLException
if (currMember != null) {
member = currMember;
} else {
boolean checkCacheStatus=true;
for (int i = 0; i <= levelDepth; i++) {
RolapLevel childLevel = levels[i];
if (childLevel.isAll()) {
Expand All @@ -146,7 +147,8 @@ private int internalAddRow(ResultSet resultSet, int column) throws SQLException
}
RolapMember parentMember = member;
Object key = cache.makeKey(parentMember, value);
member = cache.getMember(key);
member = cache.getMember(key, checkCacheStatus);
checkCacheStatus = false; /* Only check the first time */
if (member == null) {
member = memberBuilder.makeMember(
parentMember, childLevel, value, captionValue,
Expand Down

0 comments on commit f04a8c3

Please sign in to comment.