Skip to content

Commit

Permalink
0001581: Add support for multiple channels for file sync
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Mar 23, 2014
1 parent c1a7dea commit cb7400a
Show file tree
Hide file tree
Showing 4 changed files with 24 additions and 9 deletions.
Expand Up @@ -65,6 +65,7 @@ public static LastEventType fromCode(String code) {
private String relativeDir;
private String fileName;
private LastEventType lastEventType;
private String channelId;
private long crc32Checksum;
private long oldCrc32Checksum;
private long fileSize;
Expand All @@ -88,11 +89,13 @@ public FileSnapshot(FileSnapshot copy) {
this.createTime = copy.createTime;
this.lastUpdateBy = copy.lastUpdateBy;
this.lastUpdateTime = copy.lastUpdateTime;
this.channelId = copy.channelId;
}

public FileSnapshot(FileTriggerRouter fileTriggerRouter, File file, LastEventType lastEventType) {
boolean isDelete = lastEventType == LastEventType.DELETE;
this.triggerId = fileTriggerRouter.getFileTrigger().getTriggerId();
this.channelId = fileTriggerRouter.getFileTrigger().getChannelId();
this.routerId = fileTriggerRouter.getRouter().getRouterId();
this.lastEventType = lastEventType;
this.lastUpdateTime = new Date();
Expand Down Expand Up @@ -153,6 +156,14 @@ public String getRouterId() {
public void setRouterId(String routerId) {
this.routerId = routerId;
}

public void setChannelId(String channelId) {
this.channelId = channelId;
}

public String getChannelId() {
return channelId;
}

public String getRelativeDir() {
return relativeDir;
Expand Down
Expand Up @@ -335,22 +335,22 @@ public void save(ISqlTransaction sqlTransaction, FileSnapshot snapshot) {
getSql("updateFileSnapshotSql"),
new Object[] { snapshot.getLastEventType().getCode(), snapshot.getCrc32Checksum(),
snapshot.getFileSize(), snapshot.getFileModifiedTime(),
snapshot.getLastUpdateTime(), snapshot.getLastUpdateBy(),
snapshot.getLastUpdateTime(), snapshot.getLastUpdateBy(), snapshot.getChannelId(),
snapshot.getTriggerId(), snapshot.getRouterId(), snapshot.getRelativeDir(),
snapshot.getFileName() }, new int[] { Types.VARCHAR, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.VARCHAR,
Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR })) {
snapshot.setCreateTime(snapshot.getLastUpdateTime());
sqlTransaction.prepareAndExecute(
getSql("insertFileSnapshotSql"),
new Object[] { snapshot.getLastEventType().getCode(),
snapshot.getCrc32Checksum(), snapshot.getFileSize(),
snapshot.getFileModifiedTime(), snapshot.getCreateTime(),
snapshot.getLastUpdateTime(), snapshot.getLastUpdateBy(),
snapshot.getLastUpdateTime(), snapshot.getLastUpdateBy(), snapshot.getChannelId(),
snapshot.getTriggerId(), snapshot.getRouterId(),
snapshot.getRelativeDir(), snapshot.getFileName() }, new int[] {
Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR });
}
// now that we have captured an update, delete the row for cleanup
Expand Down Expand Up @@ -904,6 +904,7 @@ public FileSnapshot mapRow(Row rs) {
FileSnapshot fileSnapshot = new FileSnapshot();
fileSnapshot.setCrc32Checksum(rs.getLong("crc32_checksum"));
fileSnapshot.setCreateTime(rs.getDateTime("create_time"));
fileSnapshot.setChannelId(rs.getString("channel_id"));
fileSnapshot.setLastUpdateBy(rs.getString("last_update_by"));
fileSnapshot.setLastUpdateTime(rs.getDateTime("last_update_time"));
fileSnapshot.setFileModifiedTime(rs.getLong("file_modified_time"));
Expand Down
Expand Up @@ -63,15 +63,16 @@ public FileSyncServiceSqlMap(IDatabasePlatform platform, Map<String, String> rep
" values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ");

putSql("selectFileSnapshotSql",
" select trigger_id, router_id, relative_dir, file_name, last_event_type, crc32_checksum, " +
" select trigger_id, router_id, channel_id, relative_dir, file_name, " +
" last_event_type, crc32_checksum, " +
" file_size, file_modified_time, create_time, last_update_time, last_update_by " +
" from $(file_snapshot) where trigger_id=? and router_id=? ");

putSql("updateFileSnapshotSql",
" update $(file_snapshot) set " +
" last_event_type=?, crc32_checksum=?, " +
" file_size=?, file_modified_time=?, last_update_time=?, " +
" last_update_by=? " +
" last_update_by=?, channel_id=? " +
" where " +
" trigger_id=? and router_id=? and relative_dir=? and file_name=? ");

Expand Down Expand Up @@ -102,8 +103,8 @@ public FileSyncServiceSqlMap(IDatabasePlatform platform, Map<String, String> rep
" insert into $(file_snapshot) ( " +
" last_event_type, crc32_checksum, " +
" file_size, file_modified_time, create_time, last_update_time, " +
" last_update_by, trigger_id, router_id, relative_dir, file_name " +
" ) values(?,?,?,?,?,?,?,?,?,?,?) ");
" last_update_by, channel_id, trigger_id, router_id, relative_dir, file_name " +
" ) values(?,?,?,?,?,?,?,?,?,?,?, ?) ");

putSql("selectFileTriggerRoutersSql",
" select " +
Expand Down
Expand Up @@ -393,7 +393,9 @@ protected Trigger buildTriggerForSymmetricTable(String tableName) {
trigger.setChannelId(Constants.CHANNEL_HEARTBEAT);
} else if (TableConstants.getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT)
.equals(tableName)) {
trigger.setChannelId(Constants.CHANNEL_FILESYNC);
trigger.setChannelId(Constants.CHANNEL_DYNAMIC);
trigger.setChannelExpression("$(curTriggerValue).$(curColumnPrefix)" + platform.alterCaseToMatchDatabaseDefaultCase("channel_id"));
trigger.setReloadChannelId(Constants.CHANNEL_FILESYNC_RELOAD);
trigger.setUseCaptureOldData(true);
trigger.setSyncOnIncomingBatch(false);
boolean syncEnabled = parameterService.is(ParameterConstants.FILE_SYNC_ENABLE);
Expand Down

0 comments on commit cb7400a

Please sign in to comment.