Skip to content

Commit

Permalink
Merge branch '3.8' of https://github.com/JumpMind/symmetric-ds into 3.8
Browse files Browse the repository at this point in the history
  • Loading branch information
mmichalek committed Sep 6, 2016
2 parents 770726d + 8d024b4 commit ffa5cf0
Show file tree
Hide file tree
Showing 15 changed files with 284 additions and 78 deletions.
34 changes: 31 additions & 3 deletions symmetric-assemble/src/asciidoc/configuration/transforms/types.ad
Expand Up @@ -233,7 +233,8 @@ column. The query can reference source column names by prefixing them with a co
===== BeanShell Script Transform

This transformation allows you to provide a http://www.beanshell.org/[BeanShell] script in the transform expression and executes the script
at the time of transformation. Some variables are provided to the script:
at the time of transformation. BeanShell transforms can return either a String value or an instance of NewAndOldValue.
Some variables are provided to the script:

.Variables
|===
Expand All @@ -248,7 +249,7 @@ at the time of transformation. Some variables are provided to the script:

|===

.Transform Expression Example
.Transform Expression Example Returning a String
====
----
if (currentValue > oldValue) {
Expand All @@ -259,7 +260,16 @@ if (currentValue > oldValue) {
----
====


.Transform Expression Example Returning a NewAndOldValue object
====
----
if (currentValue != null && currentValue.length() == 0) {
return new org.jumpmind.symmetric.io.data.transform.NewAndOldValue(null, oldValue);
} else {
return currentValue;
}
----
====
ifndef::pro[]
[source, SQL]
----
Expand Down Expand Up @@ -493,3 +503,21 @@ endif::pro[]
would convert a row with columns named "user1" and "user2" containing values "red" and "blue" into two rows with columns
"fieldid" and "color" containing a row of "1" and "red" and a row of "2" and "blue".
====



===== isEmpty Transform

This transformation checks to see if a string is null or zero length. If it is empty, the replacement
value will be used. If no replacement value is provided, null will be used as a default replacement for empty values.

===== isBlank Transform

This transformation checks to see if a string is null or zero length after trimming white space. If it is blank, the replacement
value will be used. If no replacement value is provided, null will be used as a default replacement for blank values.

===== Null Value Transform

This transformation checks to see if the source value is null and if so replaces it with the provided value.


Expand Up @@ -139,7 +139,7 @@ public void onCtlFile(File file) {
File ctlFile = engine.getFileSyncService().getControleFile(file);

if (ctlFile.exists()) {
log.debug("Control file detected: {}", file.getAbsolutePath());
log.debug("Control file detected: {}", ctlFile.getAbsolutePath());
addSnapshot(file, LastEventType.CREATE);
}
}
Expand Down
Expand Up @@ -48,7 +48,7 @@
import bsh.Interpreter;
import bsh.TargetError;

public class BshColumnTransform implements ISingleValueColumnTransform, IBuiltInExtensionPoint {
public class BshColumnTransform implements ISingleNewAndOldValueColumnTransform, IBuiltInExtensionPoint {

protected final Logger log = LoggerFactory.getLogger(getClass());

Expand Down Expand Up @@ -79,7 +79,7 @@ public boolean isLoadColumnTransform() {
return true;
}

public String transform(IDatabasePlatform platform,
public NewAndOldValue transform(IDatabasePlatform platform,
DataContext context,
TransformColumn column, TransformedData data, Map<String, String> sourceValues,
String newValue, String oldValue) throws IgnoreColumnException, IgnoreRowException {
Expand Down Expand Up @@ -143,10 +143,13 @@ public String transform(IDatabasePlatform platform,
interpreter.unset(columnName);
}

if (result != null) {
return result.toString();
if (result == null) {
return null;
}
else if (result instanceof String) {
return new NewAndOldValue((String) result, null);
} else {
return null;
return (NewAndOldValue) result;
}
} catch (TargetError evalEx) {
Throwable ex = evalEx.getTarget();
Expand Down
Expand Up @@ -54,71 +54,78 @@ public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMe
String fileName = newData.get("FILE_NAME");
String relativeDir = newData.get("RELATIVE_DIR");
String triggerId = newData.get("TRIGGER_ID");
String lastEventType = newData.get("LAST_EVENT_TYPE");
String routerExpression = dataMetaData.getRouter().getRouterExpression();
String channelId = "default";
if (routerExpression != null) {
String[] keyValues = routerExpression.split(",");
if (keyValues.length > 0) {
for (int i=0; i< keyValues.length; i++) {
String[] keyValue = keyValues[i].split("=");
if (keyValue.length > 1) {
if (ROUTER_EXPRESSION_CHANNEL_KEY.equals(keyValue[0])) {
channelId = keyValue[1];
String filePath = relativeDir + "/" + fileName;
IContextService contextService = getEngine().getContextService();

if (lastEventType.equals(DataEventType.DELETE.toString())) {
log.debug("File deleted (" + filePath + "), cleaning up context value.");
contextService.delete(filePath);
}
else {
if (routerExpression != null) {
String[] keyValues = routerExpression.split(",");
if (keyValues.length > 0) {
for (int i=0; i< keyValues.length; i++) {
String[] keyValue = keyValues[i].split("=");
if (keyValue.length > 1) {
if (ROUTER_EXPRESSION_CHANNEL_KEY.equals(keyValue[0])) {
channelId = keyValue[1];
}
}
}
}
}
}
if (triggerId != null) {
String baseDir = getEngine().getFileSyncService().getFileTrigger(triggerId).getBaseDir();
File file = createSourceFile(baseDir, relativeDir, fileName);

IContextService contextService = getEngine().getContextService();

String filePath = relativeDir + "/" + fileName;

Integer lineNumber = contextService.getString(filePath) == null ? 0 : new Integer(contextService.getString(filePath));

List<String> dataRows = parse(file, lineNumber);
String columnNames = getColumnNames();

String nodeList = buildNodeList(nodes);
String externalData = new StringBuilder(EXTERNAL_DATA_TRIGGER_KEY)
.append("=")
.append(triggerId)
.append(",")
.append(EXTERNAL_DATA_ROUTER_KEY)
.append("=")
.append(dataMetaData.getRouter().getRouterId())
.append(",")
.append(EXTERNAL_DATA_FILE_DATA_ID)
.append("=")
.append(dataMetaData.getData().getDataId()).toString();

for (String row : dataRows) {
Data data = new Data();
if (triggerId != null) {
String baseDir = getEngine().getFileSyncService().getFileTrigger(triggerId).getBaseDir();
File file = createSourceFile(baseDir, relativeDir, fileName);

data.setChannelId(channelId);
data.setDataEventType(DataEventType.INSERT);
data.setRowData(row);
data.setTableName(targetTableName);
data.setNodeList(nodeList);
data.setTriggerHistory(getTriggerHistory(targetTableName, columnNames));
data.setExternalData(externalData);
data.setDataId(getEngine().getDataService().insertData(data));
lineNumber++;
}
if (!dataRows.isEmpty()) {
try {
contextService.save(filePath, lineNumber.toString());
deleteFileIfNecessary(dataMetaData);

Integer lineNumber = contextService.getString(filePath) == null ? 0 : new Integer(contextService.getString(filePath));

List<String> dataRows = parse(file, lineNumber);
String columnNames = getColumnNames();

String nodeList = buildNodeList(nodes);
String externalData = new StringBuilder(EXTERNAL_DATA_TRIGGER_KEY)
.append("=")
.append(triggerId)
.append(",")
.append(EXTERNAL_DATA_ROUTER_KEY)
.append("=")
.append(dataMetaData.getRouter().getRouterId())
.append(",")
.append(EXTERNAL_DATA_FILE_DATA_ID)
.append("=")
.append(dataMetaData.getData().getDataId()).toString();

for (String row : dataRows) {
Data data = new Data();

data.setChannelId(channelId);
data.setDataEventType(DataEventType.INSERT);
data.setRowData(row);
data.setTableName(targetTableName);
data.setNodeList(nodeList);
data.setTriggerHistory(getTriggerHistory(targetTableName, columnNames));
data.setExternalData(externalData);
data.setDataId(getEngine().getDataService().insertData(data));
lineNumber++;
}
catch (Exception e) {
e.printStackTrace();
if (!dataRows.isEmpty()) {
try {
contextService.save(filePath, lineNumber.toString());
deleteFileIfNecessary(dataMetaData);
}
catch (Exception e) {
e.printStackTrace();
}
}
}
}

return new HashSet<String>();

}
Expand Down
Expand Up @@ -170,6 +170,16 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
}
}

} else if (tableMatches(dataMetaData, TableConstants.SYM_CONSOLE_EVENT)
|| tableMatches(dataMetaData, TableConstants.SYM_MONITOR)
|| tableMatches(dataMetaData, TableConstants.SYM_MONITOR_EVENT)
|| tableMatches(dataMetaData, TableConstants.SYM_NOTIFICATION)) {
String sourceNodeId = columnValues.get("SOURCE_NODE_ID");
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
if (nodeThatMayBeRoutedTo.isVersionGreaterThanOrEqualTo(3,8,0)) {
nodeIds.add(sourceNodeId);
}
}
} else {
IConfigurationService configurationService = engine.getConfigurationService();
for (Node nodeThatMayBeRoutedTo : possibleTargetNodes) {
Expand Down
Expand Up @@ -42,6 +42,8 @@ public interface IContextService {

public int delete(ISqlTransaction transaction, String name);

public int delete(String name);

public void save(String name, String value);

}
Expand Up @@ -75,6 +75,10 @@ public int update(ISqlTransaction transaction, String name, String value) {
public int delete(ISqlTransaction transaction, String name) {
return transaction.prepareAndExecute(getSql("deleteSql"), name);
}

public int delete(String name) {
return sqlTemplate.update(getSql("deleteSql"), name);
}

public void save(String name, String value) {
int count = sqlTemplate.update(getSql("updateSql"), value, name);
Expand Down
Expand Up @@ -613,24 +613,26 @@ public FutureOutgoingBatch call() throws Exception {
Iterator<OutgoingBatch> activeBatchIter = activeBatches.iterator();
for (Future<FutureOutgoingBatch> future : futures) {
currentBatch = activeBatchIter.next();
boolean isSent = false;
while (!isSent) {
boolean isProcessed = false;
while (!isProcessed) {
try {
FutureOutgoingBatch extractBatch = future.get(keepAliveMillis, TimeUnit.MILLISECONDS);
currentBatch = extractBatch.getOutgoingBatch();

if (!extractBatch.isExtractSkipped && (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT)) {
if (extractBatch.isExtractSkipped) {
break;
}

if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT) {
processInfo.setStatus(ProcessInfo.Status.TRANSFERRING);
processInfo.setCurrentLoadId(currentBatch.getLoadId());
currentBatch = sendOutgoingBatch(processInfo, targetNode, currentBatch, extractBatch.isRetry(),
dataWriter, writer, mode);
}

if (!extractBatch.isExtractSkipped) {
processedBatches.add(currentBatch);
}
isSent = true;

processedBatches.add(currentBatch);
isProcessed = true;

if (currentBatch.getStatus() != Status.OK) {
currentBatch.setLoadCount(currentBatch.getLoadCount() + 1);
changeBatchStatus(Status.LD, currentBatch, mode);
Expand Down
Expand Up @@ -949,7 +949,7 @@ protected IDataWriter chooseDataWriter(Batch batch) {
IncomingBatch incomingBatch = new IncomingBatch(batch);
listener.getBatchesProcessed().add(incomingBatch);
if (incomingBatchService.acquireIncomingBatch(incomingBatch)) {
log.warn("Unable to retry batch {} because it's not in staging. Setting status to resend.", batch.getNodeBatchId());
log.info("Unable to retry batch {} because it's not in staging. Setting status to resend.", batch.getNodeBatchId());
incomingBatch.setStatus(Status.RS);
incomingBatchService.updateIncomingBatch(incomingBatch);
}
Expand Down
Expand Up @@ -46,6 +46,9 @@
import org.jumpmind.symmetric.io.data.transform.CopyIfChangedColumnTransform;
import org.jumpmind.symmetric.io.data.transform.IColumnTransform;
import org.jumpmind.symmetric.io.data.transform.IdentityColumnTransform;
import org.jumpmind.symmetric.io.data.transform.IsBlankTransform;
import org.jumpmind.symmetric.io.data.transform.IsEmptyTransform;
import org.jumpmind.symmetric.io.data.transform.IsNullTransform;
import org.jumpmind.symmetric.io.data.transform.JavaColumnTransform;
import org.jumpmind.symmetric.io.data.transform.LeftColumnTransform;
import org.jumpmind.symmetric.io.data.transform.LookupColumnTransform;
Expand Down Expand Up @@ -111,6 +114,9 @@ public TransformService(IParameterService parameterService, ISymmetricDialect sy
addColumnTransform(ColumnsToRowsKeyColumnTransform.NAME, new ColumnsToRowsKeyColumnTransform());
addColumnTransform(ColumnsToRowsValueColumnTransform.NAME, new ColumnsToRowsValueColumnTransform());
addColumnTransform(ClarionDateTimeColumnTransform.NAME, new ClarionDateTimeColumnTransform());
addColumnTransform(IsEmptyTransform.NAME, new IsEmptyTransform());
addColumnTransform(IsNullTransform.NAME, new IsNullTransform());
addColumnTransform(IsBlankTransform.NAME, new IsBlankTransform());

setSqlMap(new TransformServiceSqlMap(symmetricDialect.getPlatform(),
createSqlReplacementTokens()));
Expand Down
Expand Up @@ -1544,7 +1544,7 @@ file.push.lock.timeout.ms=7200000
# DatabaseOverridable: true
# Tags: filesync
# Type: boolean
file.sync.delete.ctl.file.after.sync=false;
file.sync.delete.ctl.file.after.sync=false

# If the ctl file is used to control file triggers this will look for a control file
# with the same name but .ctl replacing the existing extension.
Expand All @@ -1553,7 +1553,7 @@ file.sync.delete.ctl.file.after.sync=false;
# DatabaseOverridable: true
# Tags: filesync
# Type: boolean
file.sync.use.ctl.as.file.ext=false;
file.sync.use.ctl.as.file.ext=false

# This parameter can be used to indicate that bean shell load filters will handle missing tables. Useful
# for the case where you want to make, for example, global catalog or schema changes at the destination
Expand Down

0 comments on commit ffa5cf0

Please sign in to comment.