Skip to content

Commit

Permalink
0001647: Create a Java router, transform, and load filter that uses c…
Browse files Browse the repository at this point in the history
…ompiled Java code
  • Loading branch information
erilong committed Mar 18, 2014
1 parent fbe10e2 commit 115dfc9
Show file tree
Hide file tree
Showing 7 changed files with 414 additions and 199 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,36 +20,26 @@
*/
package org.jumpmind.symmetric.load;

import java.util.ArrayList;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang.BooleanUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Table;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.model.LoadFilter;
import org.jumpmind.util.Context;
import org.jumpmind.util.FormatUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import bsh.EvalError;
import bsh.Interpreter;
import bsh.TargetError;

public class BshDatabaseWriterFilter implements IDatabaseWriterFilter, IDatabaseWriterErrorHandler,
IBuiltInExtensionPoint {
public class BshDatabaseWriterFilter extends DynamicDatabaseWriterFilter {

private static final String OLD_ = "OLD_";
private static final String CONTEXT = "context";
Expand All @@ -59,69 +49,76 @@ public class BshDatabaseWriterFilter implements IDatabaseWriterFilter, IDatabase
private static final String ENGINE = "engine";
private static final String LOG = "log";
private final String INTERPRETER_KEY = String.format("%d.BshInterpreter", hashCode());
private final String BATCH_COMPLETE_SCRIPTS_KEY = String.format("%d.BatchCompleteScripts",
hashCode());
private final String BATCH_COMMIT_SCRIPTS_KEY = String.format("%d.BatchCommitScripts",
hashCode());
private final String BATCH_ROLLBACK_SCRIPTS_KEY = String.format("%d.BatchRollbackScripts",
hashCode());
private final String FAIL_ON_ERROR_KEY = String.format("%d.FailOnError", hashCode());

protected final Logger log = LoggerFactory.getLogger(getClass());

protected ISymmetricEngine engine = null;

protected Map<String, List<LoadFilter>> loadFilters = null;

public enum WriteMethod {
BEFORE_WRITE, AFTER_WRITE, BATCH_COMPLETE, BATCH_COMMIT, BATCH_ROLLBACK, HANDLE_ERROR
};

public BshDatabaseWriterFilter(ISymmetricEngine engine,
Map<String, List<LoadFilter>> loadFilters) {

this.engine = engine;
this.loadFilters = loadFilters;
super(engine, loadFilters);
}

public boolean beforeWrite(DataContext context, Table table, CsvData data) {
return processLoadFilters(context, table, data, null, WriteMethod.BEFORE_WRITE);
}
@Override
protected boolean processLoadFilters(DataContext context, Table table, CsvData data,
Exception error, WriteMethod writeMethod, List<LoadFilter> loadFiltersForTable) {

public void afterWrite(DataContext context, Table table, CsvData data) {
processLoadFilters(context, table, data, null, WriteMethod.AFTER_WRITE);
}
boolean writeRow = true;
LoadFilter currentFilter = null;

public boolean handleError(DataContext context, Table table, CsvData data, Exception error) {
return processLoadFilters(context, table, data, error, WriteMethod.HANDLE_ERROR);
}
try {
Interpreter interpreter = getInterpreter(context);
bind(interpreter, context, table, data, error);
for (LoadFilter filter : loadFiltersForTable) {
currentFilter = filter;
if (filter.isFilterOnDelete()
&& data.getDataEventType().equals(DataEventType.DELETE)
|| filter.isFilterOnInsert()
&& data.getDataEventType().equals(DataEventType.INSERT)
|| filter.isFilterOnUpdate()
&& data.getDataEventType().equals(DataEventType.UPDATE)) {
Object result = null;
if (writeMethod.equals(WriteMethod.BEFORE_WRITE)
&& filter.getBeforeWriteScript() != null) {
result = interpreter.eval(filter.getBeforeWriteScript());
} else if (writeMethod.equals(WriteMethod.AFTER_WRITE)
&& filter.getAfterWriteScript() != null) {
result = interpreter.eval(filter.getAfterWriteScript());
} else if (writeMethod.equals(WriteMethod.HANDLE_ERROR)
&& filter.getHandleErrorScript() != null) {
result = interpreter.eval(filter.getHandleErrorScript());
}

public boolean handlesMissingTable(DataContext context, Table table) {
if (engine!=null && engine.getParameterService()!=null &&
engine.getParameterService().is(ParameterConstants.BSH_LOAD_FILTER_HANDLES_MISSING_TABLES)) {
return true;
} else {
String tableName = table.getFullyQualifiedTableName();
if (isIgnoreCase()) {
tableName = tableName.toUpperCase();
if (result != null && result.equals(Boolean.FALSE)) {
writeRow = false;
}
}
}
return loadFilters.containsKey(tableName);
} catch (EvalError ex) {
processError(currentFilter, table, ex);
}
}

public void earlyCommit(DataContext context) {
}

public void batchComplete(DataContext context) {
executeScripts(context, BATCH_COMPLETE_SCRIPTS_KEY);
}

public void batchCommitted(DataContext context) {
executeScripts(context, BATCH_COMMIT_SCRIPTS_KEY);
return writeRow;
}

public void batchRolledback(DataContext context) {
executeScripts(context, BATCH_ROLLBACK_SCRIPTS_KEY);
@Override
protected void executeScripts(DataContext context, String key, Set<String> scripts, boolean isFailOnError) {
Interpreter interpreter = getInterpreter(context);
String currentScript = null;
try {
bind(interpreter, context, null, null, null);
if (scripts != null) {
for (String script : scripts) {
currentScript = script;
interpreter.eval(script);
}
}
} catch (EvalError e) {
String errorMsg = String.format("Beanshell script %s with error %s", new Object[] {
currentScript, e.getErrorText() });
log.error(errorMsg);
if (isFailOnError) {
throw new SymmetricException(errorMsg);
}
}
}

protected Interpreter getInterpreter(Context context) {
Expand Down Expand Up @@ -174,124 +171,4 @@ protected void processError(LoadFilter currentFilter, Table table, Throwable ex)
throw new SymmetricException(formattedMessage, ex);
}
}

protected void addBatchScriptsToContext(DataContext context, LoadFilter filter) {
addBatchScriptToContext(context, BATCH_COMPLETE_SCRIPTS_KEY,
filter.getBatchCompleteScript());
addBatchScriptToContext(context, BATCH_COMMIT_SCRIPTS_KEY, filter.getBatchCommitScript());
addBatchScriptToContext(context, BATCH_ROLLBACK_SCRIPTS_KEY,
filter.getBatchRollbackScript());
if (filter.isFailOnError()) {
context.put(FAIL_ON_ERROR_KEY, Boolean.TRUE);
}
}

protected void addBatchScriptToContext(DataContext context, String key, String script) {
if (StringUtils.isNotBlank(script)) {
@SuppressWarnings("unchecked")
Set<String> scripts = (Set<String>) context.get(key);
if (scripts == null) {
scripts = new HashSet<String>();
context.put(key, scripts);
}
scripts.add(script);
}
}

protected void executeScripts(DataContext context, String key) {
@SuppressWarnings("unchecked")
Set<String> scripts = (Set<String>) context.get(key);
Interpreter interpreter = getInterpreter(context);
String currentScript = null;
try {
bind(interpreter, context, null, null, null);
if (scripts != null) {
for (String script : scripts) {
currentScript = script;
interpreter.eval(script);
}
}
} catch (EvalError e) {
String errorMsg = String.format("Beanshell script %s with error %s", new Object[] {
currentScript, e.getErrorText() });
log.error(errorMsg);
if (BooleanUtils.isTrue((Boolean) context.get(FAIL_ON_ERROR_KEY))) {
throw new SymmetricException(errorMsg);
}
}

}

protected boolean processLoadFilters(DataContext context, Table table, CsvData data,
Exception error, WriteMethod writeMethod) {

boolean writeRow = true;
LoadFilter currentFilter = null;

List<LoadFilter> wildcardLoadFilters = null;
if (table != null) {
if (!table.getName().toLowerCase().startsWith(engine.getTablePrefix() + "_")) {
wildcardLoadFilters = loadFilters.get(Table.getFullyQualifiedTableName(
table.getCatalog(), table.getSchema(), FormatUtils.WILDCARD));
}

String tableName = table.getFullyQualifiedTableName();
if (isIgnoreCase()) {
tableName = Table.getFullyQualifiedTableName(table.getCatalog(), table.getSchema(), table.getName().toUpperCase(), "");
}
List<LoadFilter> tableSpecificLoadFilters = loadFilters.get(tableName);
int size = (wildcardLoadFilters != null ? wildcardLoadFilters.size() : 0)
+ (tableSpecificLoadFilters != null ? tableSpecificLoadFilters.size() : 0);

if (size > 0) {
List<LoadFilter> loadFiltersForTable = new ArrayList<LoadFilter>(size);
if (wildcardLoadFilters != null) {
loadFiltersForTable.addAll(wildcardLoadFilters);
}

if (tableSpecificLoadFilters != null) {
loadFiltersForTable.addAll(tableSpecificLoadFilters);
}
try {
Interpreter interpreter = getInterpreter(context);
bind(interpreter, context, table, data, error);
for (LoadFilter filter : loadFiltersForTable) {
currentFilter = filter;
addBatchScriptsToContext(context, filter);
if (filter.isFilterOnDelete()
&& data.getDataEventType().equals(DataEventType.DELETE)
|| filter.isFilterOnInsert()
&& data.getDataEventType().equals(DataEventType.INSERT)
|| filter.isFilterOnUpdate()
&& data.getDataEventType().equals(DataEventType.UPDATE)) {
Object result = null;
if (writeMethod.equals(WriteMethod.BEFORE_WRITE)
&& filter.getBeforeWriteScript() != null) {
result = interpreter.eval(filter.getBeforeWriteScript());
} else if (writeMethod.equals(WriteMethod.AFTER_WRITE)
&& filter.getAfterWriteScript() != null) {
result = interpreter.eval(filter.getAfterWriteScript());
} else if (writeMethod.equals(WriteMethod.HANDLE_ERROR)
&& filter.getHandleErrorScript() != null) {
result = interpreter.eval(filter.getHandleErrorScript());
}

if (result != null && result.equals(Boolean.FALSE)) {
writeRow = false;
}
}
}
} catch (EvalError ex) {
processError(currentFilter, table, ex);
}
}
}

return writeRow;
}

protected boolean isIgnoreCase() {
return engine.getParameterService()
.is(ParameterConstants.DB_METADATA_IGNORE_CASE);
}
}
Loading

0 comments on commit 115dfc9

Please sign in to comment.