Skip to content

Commit

Permalink
0006256: Add virtual column to sym_file_snapshot for easier routing by
Browse files Browse the repository at this point in the history
column match and bean shell routers
  • Loading branch information
Philip Marzullo committed Feb 27, 2024
1 parent d0d1010 commit 7871ab3
Show file tree
Hide file tree
Showing 2 changed files with 56 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ Not only is file synchronization configured similar to database synchronization,

The changes to FILE_SNAPSHOT are then routed and batched by a file-synchronization-specific router that delegates to the configured router based on the FILE_TRIGGER_ROUTER configuration. The file sync router can make routing decisions based on the column data of the snapshot table, columns which contain attributes of the file like the name, path, size, and last modified time. Both old and new file snapshot data are also available. The router can, for example, parse the path or name of the file and use it as the node id to route to.

An additional virtual column is available for processing. The column is called TOP_RELATIVE_DIR, and is the first segment of the RELATIVE_DIR value. For example, if the RELATIVE_DIR value for the location of the file is "node1/dir1", the TOP_RELATIVE_DIR value will be set to "node1". This can make it easier for routing by the Column Match Data Router to match the first segment of the RELATIVE_DIR to the node id value or the external id value.

Batches of file snapshot changes are stored on the filesync channel in OUTGOING_BATCH. The existing SymmetricDS pull and push jobs ignore the filesync channel. Instead, they are processed by file-synchronization-specific push and pull jobs.

When transferring data, the file sync push and pull jobs build a zip file dynamically based on the batched snapshot data. The zip file contains a directory per batch. The directory name is the batch_id. A sync.bsh Bean Shell script is generated and placed in the root of each batch directory. The Bean Shell script contains the commands to copy or delete files at their file destination from an extracted zip in the staging directory on the target node. The zip file is downloaded in the case of a pull, or, in the case of a push, is uploaded as an HTTP multi-part attachment. Outgoing zip files are written and transferred from the outgoing staging directory. Incoming zip files are staged in the filesync_incoming staging directory by source node id. The filesync_incoming/{node_id} staging directory is cleared out before each subsequent delivery of files.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,17 +20,22 @@
*/
package org.jumpmind.symmetric.route;

import java.sql.Types;
import java.util.HashSet;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.model.Data;
import org.jumpmind.symmetric.model.DataMetaData;
import org.jumpmind.symmetric.model.FileTriggerRouter;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.Router;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.symmetric.model.FileSnapshot.LastEventType;
import org.jumpmind.symmetric.model.TriggerRouter;
import org.jumpmind.symmetric.service.IFileSyncService;
Expand All @@ -55,14 +60,20 @@ public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMe
String routerId = newData.get("ROUTER_ID");
String sourceNodeId = newData.get("LAST_UPDATE_BY");
String lastEventType = newData.get("LAST_EVENT_TYPE");
String relativeDir = newData.get("RELATIVE_DIR");
// Append calculated top relative dir to old data and new data
// Append top relative dir column name to list of columns in sym_file_snapshot trigger history
if (triggerId == null) {
Map<String, String> oldData = getOldDataAsString(null, dataMetaData,
engine.getSymmetricDialect());
triggerId = oldData.get("TRIGGER_ID");
routerId = oldData.get("ROUTER_ID");
sourceNodeId = oldData.get("LAST_UPDATE_BY");
lastEventType = oldData.get("LAST_EVENT_TYPE");
relativeDir = oldData.get("RELATIVE_DIR");
}
String topRelativeDir = getTopRelativeDir(relativeDir);
addTopRelativeDirToData(topRelativeDir, dataMetaData);
LastEventType eventType = LastEventType.fromCode(lastEventType);
FileTriggerRouter fileTriggerRouter = fileSyncService.getFileTriggerRouter(
triggerId, routerId, true);
Expand Down Expand Up @@ -108,6 +119,49 @@ public Set<String> routeToNodes(SimpleRouterContext context, DataMetaData dataMe
}
return nodeIds;
}

private String getTopRelativeDir(String relativeDir) {
String topRelativeDir = null;
if (relativeDir != null) {
relativeDir = relativeDir.replace('\\','/');
topRelativeDir = (relativeDir.contains("/") ? relativeDir.substring(0, relativeDir.indexOf('/')) : relativeDir);
}
return topRelativeDir;
}

private void addTopRelativeDirToData(String topRelativeDir, DataMetaData dataMetaData) {
Table copy;
try {
copy = (Table) dataMetaData.getTable().clone();
} catch (CloneNotSupportedException e) {
throw new RuntimeException(e);
}
copy.addColumn(new Column("top_relative_dir", false, Types.VARCHAR, topRelativeDir.length(), 0));
dataMetaData.setTable(copy);
Data data = dataMetaData.getData();
String oldData = data.getCsvData(Data.OLD_DATA);
String newData = data.getCsvData(Data.ROW_DATA);
if (oldData != null) {
oldData = oldData.concat(",");
if (! StringUtils.isBlank(oldData)) {
oldData = oldData.concat("\"").concat(topRelativeDir).concat("\"");
}
data.putCsvData(Data.OLD_DATA, oldData);
}
if (newData != null) {
newData = newData.concat(",");
if (! StringUtils.isBlank(newData)) {
newData = newData.concat("\"").concat(topRelativeDir).concat("\"");
}
data.putCsvData(Data.ROW_DATA, newData);
}
TriggerHistory triggerHistory = data.getTriggerHistory();
TriggerHistory newTriggerHistory = new TriggerHistory(
triggerHistory.getSourceTableName(),
triggerHistory.getPkColumnNames(),
triggerHistory.getColumnNames().concat(",").concat("TOP_RELATIVE_DIR"));
data.setTriggerHistory(newTriggerHistory);
}

@Override
public boolean isConfigurable() {
Expand Down

0 comments on commit 7871ab3

Please sign in to comment.