Skip to content

Commit

Permalink
0001581: Add support for multiple channels for file sync
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Mar 24, 2014
1 parent f7abf64 commit cdb7e56
Show file tree
Hide file tree
Showing 7 changed files with 36 additions and 17 deletions.
Expand Up @@ -66,6 +66,7 @@ public static LastEventType fromCode(String code) {
private String fileName;
private LastEventType lastEventType;
private String channelId;
private String reloadChannelId;
private long crc32Checksum;
private long oldCrc32Checksum;
private long fileSize;
Expand All @@ -90,12 +91,14 @@ public FileSnapshot(FileSnapshot copy) {
this.lastUpdateBy = copy.lastUpdateBy;
this.lastUpdateTime = copy.lastUpdateTime;
this.channelId = copy.channelId;
this.reloadChannelId = copy.getReloadChannelId();
}

public FileSnapshot(FileTriggerRouter fileTriggerRouter, File file, LastEventType lastEventType) {
boolean isDelete = lastEventType == LastEventType.DELETE;
this.triggerId = fileTriggerRouter.getFileTrigger().getTriggerId();
this.channelId = fileTriggerRouter.getFileTrigger().getChannelId();
this.reloadChannelId = fileTriggerRouter.getFileTrigger().getReloadChannelId();
this.routerId = fileTriggerRouter.getRouter().getRouterId();
this.lastEventType = lastEventType;
this.lastUpdateTime = new Date();
Expand Down Expand Up @@ -164,6 +167,14 @@ public void setChannelId(String channelId) {
public String getChannelId() {
return channelId;
}

public void setReloadChannelId(String reloadChannelId) {
this.reloadChannelId = reloadChannelId;
}

public String getReloadChannelId() {
return reloadChannelId;
}

public String getRelativeDir() {
return relativeDir;
Expand Down
Expand Up @@ -211,7 +211,8 @@ public void extractConfigurationStandalone(Node targetNode, Writer writer,

for (int i = triggerRouters.size() - 1; i >= 0; i--) {
TriggerRouter triggerRouter = triggerRouters.get(i);
if (!triggerRouter.getTrigger().getChannelId().equals(Constants.CHANNEL_FILESYNC)) {
String channelId = triggerRouter.getTrigger().getChannelId();
if (Constants.CHANNEL_CONFIG.equals(channelId) || Constants.CHANNEL_HEARTBEAT.equals(channelId)) {
TriggerHistory triggerHistory = triggerRouterService
.getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger()
.getTriggerId(), null, null, triggerRouter.getTrigger()
Expand Down Expand Up @@ -244,7 +245,8 @@ public void extractConfigurationStandalone(Node targetNode, Writer writer,

for (int i = 0; i < triggerRouters.size(); i++) {
TriggerRouter triggerRouter = triggerRouters.get(i);
if (!triggerRouter.getTrigger().getChannelId().equals(Constants.CHANNEL_FILESYNC)) {
String channelId = triggerRouter.getTrigger().getChannelId();
if (Constants.CHANNEL_CONFIG.equals(channelId) || Constants.CHANNEL_HEARTBEAT.equals(channelId)) {
TriggerHistory triggerHistory = triggerRouterService
.getNewestTriggerHistoryForTrigger(triggerRouter.getTrigger()
.getTriggerId(), null, null, null);
Expand Down
Expand Up @@ -590,7 +590,7 @@ private void insertFileSyncBatchForReload(Node targetNode, long loadId, String c
for (Channel channel : channels) {
if (channel.isReloadFlag()) {
insertReloadEvent(transaction, targetNode, fileSyncSnapshotTriggerRouter,
fileSyncSnapshotHistory, "channel_id='" + channel.getChannelId() + "'",
fileSyncSnapshotHistory, "reload_channel_id='" + channel.getChannelId() + "'",
true, loadId, createBy, Status.NE, channel.getChannelId());
if (!transactional) {
transaction.commit();
Expand Down
Expand Up @@ -336,9 +336,10 @@ public void save(ISqlTransaction sqlTransaction, FileSnapshot snapshot) {
new Object[] { snapshot.getLastEventType().getCode(), snapshot.getCrc32Checksum(),
snapshot.getFileSize(), snapshot.getFileModifiedTime(),
snapshot.getLastUpdateTime(), snapshot.getLastUpdateBy(), snapshot.getChannelId(),
snapshot.getReloadChannelId(),
snapshot.getTriggerId(), snapshot.getRouterId(), snapshot.getRelativeDir(),
snapshot.getFileName() }, new int[] { Types.VARCHAR, Types.NUMERIC,
Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR,
Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR })) {
snapshot.setCreateTime(snapshot.getLastUpdateTime());
sqlTransaction.prepareAndExecute(
Expand All @@ -347,10 +348,11 @@ public void save(ISqlTransaction sqlTransaction, FileSnapshot snapshot) {
snapshot.getCrc32Checksum(), snapshot.getFileSize(),
snapshot.getFileModifiedTime(), snapshot.getCreateTime(),
snapshot.getLastUpdateTime(), snapshot.getLastUpdateBy(), snapshot.getChannelId(),
snapshot.getReloadChannelId(),
snapshot.getTriggerId(), snapshot.getRouterId(),
snapshot.getRelativeDir(), snapshot.getFileName() }, new int[] {
Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.NUMERIC,
Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.TIMESTAMP, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR,
Types.VARCHAR, Types.VARCHAR, Types.VARCHAR });
}
// now that we have captured an update, delete the row for cleanup
Expand Down Expand Up @@ -905,6 +907,7 @@ public FileSnapshot mapRow(Row rs) {
fileSnapshot.setCrc32Checksum(rs.getLong("crc32_checksum"));
fileSnapshot.setCreateTime(rs.getDateTime("create_time"));
fileSnapshot.setChannelId(rs.getString("channel_id"));
fileSnapshot.setReloadChannelId(rs.getString("reload_channel_id"));
fileSnapshot.setLastUpdateBy(rs.getString("last_update_by"));
fileSnapshot.setLastUpdateTime(rs.getDateTime("last_update_time"));
fileSnapshot.setFileModifiedTime(rs.getLong("file_modified_time"));
Expand Down
Expand Up @@ -63,7 +63,7 @@ public FileSyncServiceSqlMap(IDatabasePlatform platform, Map<String, String> rep
" values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) ");

putSql("selectFileSnapshotSql",
" select trigger_id, router_id, channel_id, relative_dir, file_name, " +
" select trigger_id, router_id, channel_id, reload_channel_id, relative_dir, file_name, " +
" last_event_type, crc32_checksum, " +
" file_size, file_modified_time, create_time, last_update_time, last_update_by " +
" from $(file_snapshot) where trigger_id=? and router_id=? ");
Expand All @@ -72,7 +72,7 @@ public FileSyncServiceSqlMap(IDatabasePlatform platform, Map<String, String> rep
" update $(file_snapshot) set " +
" last_event_type=?, crc32_checksum=?, " +
" file_size=?, file_modified_time=?, last_update_time=?, " +
" last_update_by=?, channel_id=? " +
" last_update_by=?, channel_id=?, reload_channel_id=? " +
" where " +
" trigger_id=? and router_id=? and relative_dir=? and file_name=? ");

Expand Down Expand Up @@ -103,8 +103,8 @@ public FileSyncServiceSqlMap(IDatabasePlatform platform, Map<String, String> rep
" insert into $(file_snapshot) ( " +
" last_event_type, crc32_checksum, " +
" file_size, file_modified_time, create_time, last_update_time, " +
" last_update_by, channel_id, trigger_id, router_id, relative_dir, file_name " +
" ) values(?,?,?,?,?,?,?,?,?,?,?, ?) ");
" last_update_by, channel_id, reload_channel_id, trigger_id, router_id, relative_dir, file_name " +
" ) values(?,?,?,?,?,?,?,?,?,?,?,?,?) ");

putSql("selectFileTriggerRoutersSql",
" select " +
Expand Down
5 changes: 3 additions & 2 deletions symmetric-core/src/main/resources/symmetric-schema.xml
Expand Up @@ -138,6 +138,7 @@
<column name="relative_dir" type="VARCHAR" size="255" required="true" primaryKey="true" description="The path to the file starting at the base_dir" />
<column name="file_name" type="VARCHAR" size="128" required="true" primaryKey="true" description="The name of the file that changed." />
<column name="channel_id" type="VARCHAR" size="20" required="true" default="filesync" description="The channel_id of the channel that data changes will flow through." />
<column name="reload_channel_id" type="VARCHAR" size="20" required="true" default="filesync_reload" description="The channel_id of the channel that data changes will flow through." />
<column name="last_event_type" type="CHAR" size="1" required="true" description="The type of event captured by this entry. 'C' is for create, 'M' is for modified, and 'D' is for deleted." />
<column name="crc32_checksum" type="BIGINT" description="File checksum. Can be used to determine if file content has changed." />
<column name="file_size" type="BIGINT" description="The size in bytes of the file at the time this change was detected." />
Expand All @@ -146,14 +147,14 @@
<column name="last_update_by" type="VARCHAR" size="50" description="The user who last updated this entry." />
<column name="create_time" type="TIMESTAMP" required="true" description="Timestamp when this entry was created." />
<index name="idx_f_snpsht_chid">
<index-column name="channel_id" />
<index-column name="reload_channel_id" />
</index>
</table>

<table name="file_trigger" description="This table defines files or sets of files for which changes will be captured for file synchronization">
<column name="trigger_id" type="VARCHAR" size="50" required="true" primaryKey="true" description="Unique identifier for a trigger." />
<column name="channel_id" type="VARCHAR" size="20" required="true" default="filesync" description="The channel_id of the channel that data changes will flow through." />
<column name="reload_channel_id" type="VARCHAR" size="20" required="true" default="reload" description="The channel_id of the channel that will be used for reloads." />
<column name="reload_channel_id" type="VARCHAR" size="20" required="true" default="filesync_reload" description="The channel_id of the channel that will be used for reloads." />
<column name="base_dir" type="VARCHAR" size="255" required="true" description="The base directory on the client that will be synchronized." />
<column name="recurse" type="BOOLEANINT" size="1" required="true" default="1" description="Whether to synchronize child directories." />
<column name="includes_files" type="VARCHAR" size="255" description="Wildcard-enabled, comma-separated list of file to include in synchronization." />
Expand Down
Expand Up @@ -84,7 +84,7 @@ catalog,
schema,
table,SYM_CHANNEL
keys,CHANNEL_ID
columns,CHANNEL_ID,PROCESSING_ORDER,MAX_BATCH_SIZE,MAX_BATCH_TO_SEND,MAX_DATA_TO_ROUTE,EXTRACT_PERIOD_MILLIS,ENABLED,USE_OLD_DATA_TO_ROUTE,USE_ROW_DATA_TO_ROUTE,USE_PK_DATA_TO_ROUTE,CONTAINS_BIG_LOB,BATCH_ALGORITHM,DATA_LOADER_TYPE,DESCRIPTION,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
columns,CHANNEL_ID,PROCESSING_ORDER,MAX_BATCH_SIZE,MAX_BATCH_TO_SEND,MAX_DATA_TO_ROUTE,EXTRACT_PERIOD_MILLIS,ENABLED,USE_OLD_DATA_TO_ROUTE,USE_ROW_DATA_TO_ROUTE,USE_PK_DATA_TO_ROUTE,RELOAD_FLAG,FILE_SYNC_FLAG,CONTAINS_BIG_LOB,BATCH_ALGORITHM,DATA_LOADER_TYPE,DESCRIPTION,CREATE_TIME,LAST_UPDATE_BY,LAST_UPDATE_TIME
sql,delete from sym_channel
catalog,
schema,
Expand Down Expand Up @@ -114,11 +114,13 @@ insert,"server","client","W",,,
catalog,
schema,
table,SYM_CHANNEL
insert,"config","0","2000","100","10000","0","1","1","1","1","1","default","default",,"2013-05-19 10:00:36.654",,"2013-05-19 10:00:36.648"
insert,"reload","1","1","1","10000","0","1","1","1","1","0","default","default",,"2013-05-19 10:00:36.656",,"2013-05-19 10:00:36.656"
insert,"heartbeat","2","100","100","10000","0","1","1","1","1","0","default","default",,"2013-05-19 10:00:36.657",,"2013-05-19 10:00:36.657"
insert,"default","99999","1000","100","10000","0","1","1","1","1","0","default","default",,"2013-05-19 10:00:36.658",,"2013-05-19 10:00:36.658"
insert,"filesync","3","100","100","10000","0","1","1","1","1","0","default","default",,"2013-05-19 10:00:36.659",,"2013-05-19 10:00:36.659"
insert,"config","0","2000","100","10000","0","1","1","1","1","0","0","1","default","default",,"2014-03-21 13:07:51.515",,"2014-03-21 13:07:51.514"
insert,"reload","1","1","1","10000","0","1","1","1","1","1","0","0","default","default",,"2014-03-21 13:07:51.516",,"2014-03-21 13:07:51.515"
insert,"heartbeat","2","100","100","10000","0","1","1","1","1","0","0","0","default","default",,"2014-03-21 13:07:51.517",,"2014-03-21 13:07:51.516"
insert,"default","99999","1000","100","10000","0","1","1","1","1","0","0","0","default","default",,"2014-03-21 13:07:51.517",,"2014-03-21 13:07:51.517"
insert,"filesync","3","100","100","10000","0","1","1","1","1","0","1","0","nontransactional","default",,"2014-03-21 13:07:51.518",,"2014-03-21 13:07:51.518"
insert,"dynamic","99999","1000","100","10000","0","1","1","1","1","0","0","0","default","default",,"2014-03-22 18:48:35.485",,"2014-03-22 18:48:35.482"
insert,"filesync_reload","1","100","100","10000","0","1","1","1","1","1","1","0","nontransactional","default",,"2014-03-22 18:48:35.497",,"2014-03-22 18:48:35.497"
catalog,
schema,
table,SYM_ROUTER
Expand Down

0 comments on commit cdb7e56

Please sign in to comment.