From cb7400a6db451f98df0401849dd012ea22f4e949 Mon Sep 17 00:00:00 2001 From: chenson42 Date: Sun, 23 Mar 2014 19:59:35 +0000 Subject: [PATCH] 0001581: Add support for multiple channels for file sync --- .../org/jumpmind/symmetric/model/FileSnapshot.java | 11 +++++++++++ .../symmetric/service/impl/FileSyncService.java | 9 +++++---- .../symmetric/service/impl/FileSyncServiceSqlMap.java | 9 +++++---- .../symmetric/service/impl/TriggerRouterService.java | 4 +++- 4 files changed, 24 insertions(+), 9 deletions(-) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/FileSnapshot.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/FileSnapshot.java index 8df3cf4bbd..50a196aefe 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/FileSnapshot.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/FileSnapshot.java @@ -65,6 +65,7 @@ public static LastEventType fromCode(String code) { private String relativeDir; private String fileName; private LastEventType lastEventType; + private String channelId; private long crc32Checksum; private long oldCrc32Checksum; private long fileSize; @@ -88,11 +89,13 @@ public FileSnapshot(FileSnapshot copy) { this.createTime = copy.createTime; this.lastUpdateBy = copy.lastUpdateBy; this.lastUpdateTime = copy.lastUpdateTime; + this.channelId = copy.channelId; } public FileSnapshot(FileTriggerRouter fileTriggerRouter, File file, LastEventType lastEventType) { boolean isDelete = lastEventType == LastEventType.DELETE; this.triggerId = fileTriggerRouter.getFileTrigger().getTriggerId(); + this.channelId = fileTriggerRouter.getFileTrigger().getChannelId(); this.routerId = fileTriggerRouter.getRouter().getRouterId(); this.lastEventType = lastEventType; this.lastUpdateTime = new Date(); @@ -153,6 +156,14 @@ public String getRouterId() { public void setRouterId(String routerId) { this.routerId = routerId; } + + public void setChannelId(String channelId) { + this.channelId = channelId; + } + + public String getChannelId() { + return channelId; + } public String getRelativeDir() { return relativeDir; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java index cfe5ee1989..c374465cfa 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java @@ -335,10 +335,10 @@ public void save(ISqlTransaction sqlTransaction, FileSnapshot snapshot) { getSql("updateFileSnapshotSql"), new Object[] { snapshot.getLastEventType().getCode(), snapshot.getCrc32Checksum(), snapshot.getFileSize(), snapshot.getFileModifiedTime(), - snapshot.getLastUpdateTime(), snapshot.getLastUpdateBy(), + snapshot.getLastUpdateTime(), snapshot.getLastUpdateBy(), snapshot.getChannelId(), snapshot.getTriggerId(), snapshot.getRouterId(), snapshot.getRelativeDir(), snapshot.getFileName() }, new int[] { Types.VARCHAR, Types.NUMERIC, - Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.VARCHAR, + Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR })) { snapshot.setCreateTime(snapshot.getLastUpdateTime()); sqlTransaction.prepareAndExecute( @@ -346,11 +346,11 @@ public void save(ISqlTransaction sqlTransaction, FileSnapshot snapshot) { new Object[] { snapshot.getLastEventType().getCode(), snapshot.getCrc32Checksum(), snapshot.getFileSize(), snapshot.getFileModifiedTime(), snapshot.getCreateTime(), - snapshot.getLastUpdateTime(), snapshot.getLastUpdateBy(), + snapshot.getLastUpdateTime(), snapshot.getLastUpdateBy(), snapshot.getChannelId(), snapshot.getTriggerId(), snapshot.getRouterId(), snapshot.getRelativeDir(), snapshot.getFileName() }, new int[] { Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC, - Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, + Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR }); } // now that we have captured an update, delete the row for cleanup @@ -904,6 +904,7 @@ public FileSnapshot mapRow(Row rs) { FileSnapshot fileSnapshot = new FileSnapshot(); fileSnapshot.setCrc32Checksum(rs.getLong("crc32_checksum")); fileSnapshot.setCreateTime(rs.getDateTime("create_time")); + fileSnapshot.setChannelId(rs.getString("channel_id")); fileSnapshot.setLastUpdateBy(rs.getString("last_update_by")); fileSnapshot.setLastUpdateTime(rs.getDateTime("last_update_time")); fileSnapshot.setFileModifiedTime(rs.getLong("file_modified_time")); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java index 749938c2de..0c428f723a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java @@ -63,7 +63,8 @@ public FileSyncServiceSqlMap(IDatabasePlatform platform, Map rep " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) "); putSql("selectFileSnapshotSql", - " select trigger_id, router_id, relative_dir, file_name, last_event_type, crc32_checksum, " + + " select trigger_id, router_id, channel_id, relative_dir, file_name, " + + " last_event_type, crc32_checksum, " + " file_size, file_modified_time, create_time, last_update_time, last_update_by " + " from $(file_snapshot) where trigger_id=? and router_id=? "); @@ -71,7 +72,7 @@ public FileSyncServiceSqlMap(IDatabasePlatform platform, Map rep " update $(file_snapshot) set " + " last_event_type=?, crc32_checksum=?, " + " file_size=?, file_modified_time=?, last_update_time=?, " + - " last_update_by=? " + + " last_update_by=?, channel_id=? " + " where " + " trigger_id=? and router_id=? and relative_dir=? and file_name=? "); @@ -102,8 +103,8 @@ public FileSyncServiceSqlMap(IDatabasePlatform platform, Map rep " insert into $(file_snapshot) ( " + " last_event_type, crc32_checksum, " + " file_size, file_modified_time, create_time, last_update_time, " + - " last_update_by, trigger_id, router_id, relative_dir, file_name " + - " ) values(?,?,?,?,?,?,?,?,?,?,?) "); + " last_update_by, channel_id, trigger_id, router_id, relative_dir, file_name " + + " ) values(?,?,?,?,?,?,?,?,?,?,?, ?) "); putSql("selectFileTriggerRoutersSql", " select " + diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java index be1383c0e6..6f91ff59e9 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java @@ -393,7 +393,9 @@ protected Trigger buildTriggerForSymmetricTable(String tableName) { trigger.setChannelId(Constants.CHANNEL_HEARTBEAT); } else if (TableConstants.getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT) .equals(tableName)) { - trigger.setChannelId(Constants.CHANNEL_FILESYNC); + trigger.setChannelId(Constants.CHANNEL_DYNAMIC); + trigger.setChannelExpression("$(curTriggerValue).$(curColumnPrefix)" + platform.alterCaseToMatchDatabaseDefaultCase("channel_id")); + trigger.setReloadChannelId(Constants.CHANNEL_FILESYNC_RELOAD); trigger.setUseCaptureOldData(true); trigger.setSyncOnIncomingBatch(false); boolean syncEnabled = parameterService.is(ParameterConstants.FILE_SYNC_ENABLE);