Commit
updating doc ...
chenson42 committed Apr 19, 2012
1 parent faed433 commit 4a3734b
Showing 14 changed files with 94 additions and 216 deletions.
Original file line number Diff line number Diff line change
@@ -30,7 +30,7 @@
</para>
<para>
More complex conflict resolution strategies can be accomplished by using the
-<literal>IDataLoaderFilter</literal> extension point which has access to both
+<literal>IDatabaseWriterFilter</literal> extension point which has access to both
old and new data.
</para>
</section>
123 changes: 42 additions & 81 deletions symmetric/symmetric-assemble/src/docbook/extensions.xml
@@ -6,36 +6,36 @@
xmlns:ns="http://docbook.org/ns/docbook"
xmlns:mml="http://www.w3.org/1998/Math/MathML"
xmlns:html="http://www.w3.org/1999/xhtml">
-<title>Extending SymmetricDS</title>


+<title>Extension Points</title>
<para>
+SymmetricDS can be extended using its plug-in architecture. A Java class that implements
+the appropriate extension point interface can implement custom logic and change the behavior
+of SymmetricDS to suit special needs. All supported extension
+points extend the <literal>IExtensionPoint</literal> interface. The available extension points are documented in the following sections.
</para>
<para>
-SymmetricDS may be extended via a plug-in like architecture where extension point interfaces
-may be implemented by a custom class and registered with the synchronization engine. All supported extension
-points extend the IExtensionPoint interface. The currently available extension points are documented in the following sections.
+When an engine starts up, the <literal>ExtensionPointManager</literal> searches a <ulink url="http://springframework.org">Spring Framework</ulink>
+context for classes that implement the <literal>IExtensionPoint</literal> interface, then creates and registers
+the class with the appropriate SymmetricDS component.
</para>
<para>
-When the synchronization engine starts up, a Spring
-post processor searches the Spring ApplicationContext for any registered classes which implement
-IExtensionPoint. An IExtensionPoint designates whether it should be auto registered or not. If the extension point
-is to be auto registered then the post processor registers the known interface with the appropriate service.
+The <literal>ISymmetricEngineAware</literal> interface may be optionally implemented if the
+extension point needs a handle to the <literal>ISymmetricEngine</literal> in order to
+gain access to the database or a SymmetricDS service.
</para>
<para>
-The INodeGroupExtensionPoint interface may be optionally implemented to designate that auto registered
-extension points should only be auto registered with specific node groups.
+The <literal>INodeGroupExtensionPoint</literal> interface may be optionally implemented to indicate that a registered
+extension point should only be registered with specific node groups.
<programlisting><![CDATA[/**
* Only apply this extension point to the 'root' node group.
*/
public String[] getNodeGroupIdsToApplyTo() {
return new String[] { "root" };
}
]]></programlisting>
</para>
</para>
<para>
-SymmetricDS will look for Spring configured extensions in the application Classpath by importing any Spring
-XML configuration files found matching the following pattern: <literal>META-INF/services/symmetric-*-ext.xml</literal>. When packaged in
-a jar file the <literal>META-INF</literal> directory should be at the root of the jar file. When packaged in a war file, the <literal>META-INF</literal> directory
-should be in the <literal>WEB-INF/classes</literal> directory.
+Extensions can be configured in the <literal>conf/symmetric-extensions.xml</literal> file.
</para>
<section id="extensions-parameter-filter">
<title>IParameterFilter</title>
@@ -69,66 +69,49 @@
</para>
</section>
<section id="extensions-data-loader-filter">
-<title>IDataLoaderFilter</title>
-<para>
-Data can be filtered as it is loaded into the target database. It can also be
-filtered when
-it is extracted from the source database.
-
-As data is loaded into the target database, a filter can change the
-data in a column or save it somewhere else. It can also specify by the
+<title>IDatabaseWriterFilter</title>
+<para>
+Data can be filtered or manipulated before it is loaded into the target database.
+A filter can change the
+data in a column, save it somewhere else, or do something else with the data entirely.
+It can also specify by the
return value of the function call that the data loader should continue on
and load the data (by returning true) or ignore it (by returning false). One
-possible use of the filter might be to
+possible use of the filter, for example, might be to
route credit card data to a secure database and blank it out as it loads
into a less-restricted reporting database.
</para>
<para>
-An IDataLoaderContext is passed to each of the callback methods. A new
-context is created for each synchronization. The context provides methods to
-lookup column indexes by column name, get table meta data, and access to
-old data if the <literal>sync_column_level</literal> flag is enabled. The context also provides a means
-to share data during a synchronization between different rows of data that are
-committed in a database transaction and are in the same channel. It does so by
-providing a context cache which can be populated by the extension point.
+A <literal>DataContext</literal> is passed to each of the callback methods. A new
+context is created for each synchronization. The context provides a mechanism
+to share data during the load of a batch between different rows of data that are
+committed in a single database transaction.
</para>
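The context-sharing pattern described above can be sketched in isolation. This is a minimal, hypothetical stand-in, not the SymmetricDS API: `BatchContextStub`, `CTX_ROWS_SEEN`, and the free-standing callback methods are illustrative names; only the `put`/`get` cache pattern mirrors what `DataContext` provides to filter callbacks within one batch transaction.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical stand-in for the per-batch context: a simple key/value
// cache shared by every row loaded in the same database transaction.
class BatchContextStub {
    private final Map<String, Object> cache = new HashMap<String, Object>();
    public void put(String key, Object value) { cache.put(key, value); }
    public Object get(String key) { return cache.get(key); }
}

public class ContextCacheSketch {
    static final String CTX_ROWS_SEEN = "rowsSeen"; // illustrative cache key

    // Illustrative per-row callback: accumulate a row count in the shared context.
    static void beforeWrite(BatchContextStub context) {
        Integer count = (Integer) context.get(CTX_ROWS_SEEN);
        context.put(CTX_ROWS_SEEN, count == null ? 1 : count + 1);
    }

    // Illustrative end-of-batch callback: read what earlier rows stored.
    static void batchComplete(BatchContextStub context) {
        System.out.println("rows seen in batch: " + context.get(CTX_ROWS_SEEN));
    }

    public static void main(String[] args) {
        BatchContextStub context = new BatchContextStub();
        for (int i = 0; i < 3; i++) {
            beforeWrite(context); // one call per row in the batch
        }
        batchComplete(context); // prints "rows seen in batch: 3"
    }
}
```

Because a fresh context is created per synchronization, state stashed this way does not leak between batches from different syncs.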
<para>
-Many times the IDataLoaderFilter will be combined with the IBatchListener. The
-XmlPublisherFilter (in the <literal>org.jumpmind.symmetric.ext</literal> package) is a good example
-of using the combination of the two extension points in order to create XML messages
-to be published to JMS.
+The filter also provides callback methods for the batch lifecycle. The <literal>DatabaseWriterFilterAdapter</literal>
+may be used if not all methods are required.
</para>
<para>
-A class implementing the IDataLoaderFilter interface is injected onto the
+A class implementing the IDatabaseWriterFilter interface is injected onto the
DataLoaderService in order to receive callbacks when data is inserted,
updated, or deleted.

-<programlisting><![CDATA[public MyFilter implements IDataLoaderFilter {
-public boolean isAutoRegister() {
-return true;
-}
-public boolean filterInsert(IDataLoaderContext context,
-String[] columnValues) {
-return true;
-}
-public boolean filterUpdate(IDataLoaderContext context,
-String[] columnValues, String[] keyValues) {
-return true;
-}
-public void filterDelete(IDataLoaderContext context,
-String[] keyValues) {
+<programlisting><![CDATA[public class MyFilter extends DatabaseWriterFilterAdapter {
+@Override
+public boolean beforeWrite(DataContext context, Table table, CsvData data) {
+if (table.getName().equalsIgnoreCase("CREDIT_CARD_TENDER")
+&& data.getDataEventType().equals(DataEventType.INSERT)) {
+String[] parsedData = data.getParsedData(CsvData.ROW_DATA);
+// blank out credit card number
+parsedData[table.getColumnIndex("CREDIT_CARD_NUMBER")] = null;
+}
+return true;
+}
+}]]></programlisting>
</para>
<para>
-The filter class is specified as a Spring-managed bean. A custom Spring XML file
-is specified as follows in a jar at <literal>META-INF/services/symmetric-myfilter-ext.xml</literal>.
+The filter class should be specified in <literal>conf/symmetric-extensions.xml</literal> as follows.

<programlisting><![CDATA[<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans"
@@ -144,20 +127,6 @@
</beans>]]></programlisting>
</para>
</section>
-<section id="extensions-table-column-filter">
-<title>ITableColumnFilter</title>
-<para>
-Implement this extension point to filter out specific columns from
-use by the dataloader. Only one column filter may be added per target table.
-</para>
-</section>
-<section id="extensions-batch-listener">
-<title>IBatchListener</title>
-<para>
-This extension point is called whenever a batch has completed loading but before
-the transaction has committed.
-</para>
-</section>
<section id="extensions-acknowledge-event-listener">
<title>IAcknowledgeEventListener</title>
<para>
@@ -173,14 +142,6 @@
happens at the point of extraction.
</para>
</section>
-<section id="extensions-extractor-filter">
-<title>IExtractorFilter</title>
-<para>
-This extension point is called after data has been extracted, but before it has been streamed. It
-has the ability to inspect each row of data to take some action and indicate, if necessary, that the
-row should not be streamed.
-</para>
-</section>
<section id="extensions-sync-url-extension">
<title>ISyncUrlExtension</title>
<para>
@@ -95,7 +95,7 @@ private void recordChannelFlushNeeded(
}
}

-private <R extends IDataReader, W extends IDataWriter> void recordTransformFlushNeeded(
+private void recordTransformFlushNeeded(
DataContext context, Table table) {
if (isTransformFlushNeeded(table)) {
context.put(CTX_KEY_FLUSH_TRANSFORMS_NEEDED, true);
@@ -128,7 +128,7 @@ private boolean matchesTable(Table table, String tableSuffix) {
}

@Override
-public <R extends IDataReader, W extends IDataWriter> void batchCommitted(
+public void batchCommitted(
DataContext context) {
if (context.get(CTX_KEY_FLUSH_CHANNELS_NEEDED) != null) {
log.info("Channels flushed because new channels came through the data loader");
@@ -40,7 +40,7 @@ public class SchemaPerNodeDataLoaderFilter extends DatabaseWriterFilterAdapter {
private String schemaPrefix;

@Override
-public <R extends IDataReader, W extends IDataWriter> boolean beforeWrite(
+public boolean beforeWrite(
DataContext context, Table table, CsvData data) {
if (!table.getName().startsWith(tablePrefix)) {
Batch batch = context.getBatch();
@@ -41,7 +41,7 @@ public String[] getNodeGroupIdsToApplyTo() {
}

@Override
-public <R extends IDataReader, W extends IDataWriter> boolean beforeWrite(
+public boolean beforeWrite(
DataContext context, Table table, CsvData data) {
numberOfTimesCalled++;
return true;
@@ -43,7 +43,7 @@ public TestDataWriterFilter() {
}

@Override
-public <R extends IDataReader, W extends IDataWriter> boolean beforeWrite(
+public boolean beforeWrite(
DataContext context, Table table, CsvData data) {
numberOfTimesCalled++;
return true;
@@ -3,39 +3,30 @@
import org.jumpmind.db.model.Table;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataContext;
-import org.jumpmind.symmetric.io.data.IDataReader;
-import org.jumpmind.symmetric.io.data.IDataWriter;

public class DatabaseWriterFilterAdapter implements IDatabaseWriterFilter {

-public <R extends IDataReader, W extends IDataWriter> boolean beforeWrite(
-DataContext context, Table table, CsvData data) {
-
+public boolean beforeWrite(DataContext context, Table table, CsvData data) {
return true;
}

-public <R extends IDataReader, W extends IDataWriter> void afterWrite(
-DataContext context, Table table, CsvData data) {
+public void afterWrite(DataContext context, Table table, CsvData data) {
}

-public <R extends IDataReader, W extends IDataWriter> boolean handlesMissingTable(
-DataContext context, Table table) {
+public boolean handlesMissingTable(DataContext context, Table table) {
return false;
}

-public <R extends IDataReader, W extends IDataWriter> void earlyCommit(
-DataContext context) {
+public void earlyCommit(DataContext context) {
}

-public <R extends IDataReader, W extends IDataWriter> void batchComplete(
-DataContext context) {
+public void batchComplete(DataContext context) {
}

-public <R extends IDataReader, W extends IDataWriter> void batchCommitted(
-DataContext context) {
+public void batchCommitted(DataContext context) {
}

-public <R extends IDataReader, W extends IDataWriter> void batchRolledback(
-DataContext context) {
+public void batchRolledback(DataContext context) {
}

}
@@ -4,8 +4,6 @@
import org.jumpmind.extension.IExtensionPoint;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataContext;
-import org.jumpmind.symmetric.io.data.IDataReader;
-import org.jumpmind.symmetric.io.data.IDataWriter;

public interface IDatabaseWriterFilter extends IExtensionPoint {

@@ -16,51 +14,51 @@ public interface IDatabaseWriterFilter extends IExtensionPoint {
* @return true if the row should be loaded. false if the filter has handled
* the row and it should be ignored.
*/
-public <R extends IDataReader, W extends IDataWriter> boolean beforeWrite(
+public boolean beforeWrite(
DataContext context, Table table, CsvData data);

/**
* Called right after a DML statement has been successfully executed against
* the database for the data.
*/
-public <R extends IDataReader, W extends IDataWriter> void afterWrite(
+public void afterWrite(
DataContext context, Table table, CsvData data);

/**
* Give the filter a chance to indicate that it can handle a table that is
* missing. This might return true if the filter will be performing
* transformations on the data and inserting the data itself.
*/
-public <R extends IDataReader, W extends IDataWriter> boolean handlesMissingTable(
+public boolean handlesMissingTable(
DataContext context, Table table);

/**
* If the {@link ParameterConstants#DATA_LOADER_MAX_ROWS_BEFORE_COMMIT}
* property is set and the max number of rows is reached and a commit is
* about to happen, then this method is called.
*/
-public <R extends IDataReader, W extends IDataWriter> void earlyCommit(
+public void earlyCommit(
DataContext context);

/**
* This method is called after a batch has been successfully processed. It
* is called in the scope of the transaction that controls the batch commit.
*/
-public <R extends IDataReader, W extends IDataWriter> void batchComplete(
+public void batchComplete(
DataContext context);

/**
* This method is called after the database transaction for the batch has
* been committed.
*/
-public <R extends IDataReader, W extends IDataWriter> void batchCommitted(
+public void batchCommitted(
DataContext context);

/**
* This method is called after the database transaction for the batch has
* been rolled back.
*/
-public <R extends IDataReader, W extends IDataWriter> void batchRolledback(
+public void batchRolledback(
DataContext context);

}