diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index 449e604ac8..f3983339ee 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -302,9 +302,9 @@ protected void init() { outgoingBatchService, registrationService, stagingManager, this); this.pushService = new PushService(parameterService, symmetricDialect, dataExtractorService, acknowledgeService, transportManager, nodeService, - clusterService, nodeCommunicationService, statisticManager); + clusterService, nodeCommunicationService, statisticManager, configurationService); this.pullService = new PullService(parameterService, symmetricDialect, nodeService, - dataLoaderService, registrationService, clusterService, nodeCommunicationService); + dataLoaderService, registrationService, clusterService, nodeCommunicationService, configurationService); this.fileSyncService = new FileSyncService(this); this.jobManager = createJobManager(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Channel.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Channel.java index 98ce6a3dc7..62c4f943aa 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Channel.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Channel.java @@ -24,6 +24,8 @@ import java.util.Collection; import java.util.Date; +import org.jumpmind.symmetric.common.Constants; + /** * Definition of a channel and it's priority. A channel is a group of tables * that get synchronized together. @@ -63,6 +65,10 @@ public class Channel implements Serializable { private Date lastUpdateTime; private String lastUpdateBy; + + private boolean reloadFlag = false; + + private boolean fileSyncFlag = false; public Channel() { } @@ -73,10 +79,17 @@ public Channel(String id, int processingOrder) { } public Channel(String id, int processingOrder, int maxBatchSize, int maxBatchToSend, boolean enabled, - long extractPeriodMillis, boolean containsBigLobs, String batchAlgorithm) { - this(id, processingOrder, maxBatchSize, maxBatchToSend, enabled, extractPeriodMillis, containsBigLobs); + long extractPeriodMillis, boolean containsBigLobs, String batchAlgorithm, boolean reloadFlag, boolean filesyncFlag) { + this(id, processingOrder, maxBatchSize, maxBatchToSend, enabled, extractPeriodMillis, containsBigLobs, reloadFlag, filesyncFlag); this.batchAlgorithm = batchAlgorithm; } + + public Channel(String id, int processingOrder, int maxBatchSize, int maxBatchToSend, boolean enabled, + long extractPeriodMillis, boolean containsBigLobs, boolean reloadFlag, boolean filesyncFlag) { + this(id, processingOrder, maxBatchSize, maxBatchToSend, enabled, extractPeriodMillis, containsBigLobs); + this.reloadFlag = reloadFlag; + this.fileSyncFlag = filesyncFlag; + } public Channel(String id, int processingOrder, int maxBatchSize, int maxBatchToSend, boolean enabled, long extractPeriodMillis, boolean containsBigLobs) { @@ -143,14 +156,19 @@ public void setMaxBatchToSend(int maxBatchToSend) { * @return true if a match is found */ public boolean isInList(Collection channels) { + return findInList(channels) != null; + } + + + public Channel findInList(Collection channels) { if (channels != null) { for (NodeChannel channel : channels) { if (channel.getChannelId().equals(channelId)) { - return true; + return channel.getChannel(); } } } - return false; + return null; } public void setBatchAlgorithm(String batchAlgorithm) { @@ -231,6 +249,22 @@ public Date getLastUpdateTime() { public void setLastUpdateTime(Date lastUpdateTime) { this.lastUpdateTime = lastUpdateTime; + } + + public void setFileSyncFlag(boolean filesyncFlag) { + this.fileSyncFlag = filesyncFlag; + } + + public boolean isFileSyncFlag() { + return fileSyncFlag || Constants.CHANNEL_FILESYNC.equals(channelId); + } + + public void setReloadFlag(boolean reloadFlag) { + this.reloadFlag = reloadFlag; + } + + public boolean isReloadFlag() { + return reloadFlag || Constants.CHANNEL_RELOAD.equals(channelId); } @Override diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/FileTrigger.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/FileTrigger.java index 63f70b0dd5..991e717cec 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/FileTrigger.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/FileTrigger.java @@ -33,25 +33,44 @@ import org.apache.commons.io.filefilter.OrFileFilter; import org.apache.commons.io.filefilter.WildcardFileFilter; import org.apache.commons.lang.StringUtils; +import org.jumpmind.symmetric.common.Constants; public class FileTrigger implements Serializable { private static final long serialVersionUID = 1L; private String triggerId; + + private String channelId = Constants.CHANNEL_FILESYNC; + + private String reloadChannelId = Constants.CHANNEL_FILESYNC; + private String baseDir; + private boolean recurse; + private String includesFiles; + private String excludesFiles; + private boolean syncOnCreate = true; + private boolean syncOnModified = true; + private boolean syncOnDelete = true; + private boolean syncOnCtlFile = false; + private boolean deleteAfterSync = false; + private String beforeCopyScript; + private String afterCopyScript; + private Date createTime = new Date(); + private String lastUpdateBy; + private Date lastUpdateTime; public FileTrigger() { @@ -73,7 +92,23 @@ public String getTriggerId() { public void setTriggerId(String triggerId) { this.triggerId = triggerId; } + + public String getChannelId() { + return channelId; + } + + public void setChannelId(String channelId) { + this.channelId = channelId; + } + public String getReloadChannelId() { + return reloadChannelId; + } + + public void setReloadChannelId(String reloadChannelId) { + this.reloadChannelId = reloadChannelId; + } + public String getBaseDir() { return baseDir; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeChannel.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeChannel.java index 494c2b42d0..dc7ebbac25 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeChannel.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/NodeChannel.java @@ -202,5 +202,13 @@ public void setDataLoaderType(String type) { public String getDataLoaderType() { return channel.getDataLoaderType(); } + + public void setReloadFlag(boolean value) { + this.channel.setReloadFlag(value); + } + + public void setFileSyncFlag(boolean value) { + this.channel.setFileSyncFlag(value); + } } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatus.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatus.java index dbd9ebce9d..7c59a7d83f 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatus.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/RemoteNodeStatus.java @@ -22,8 +22,7 @@ import java.io.Serializable; import java.util.List; - -import org.jumpmind.symmetric.common.Constants; +import java.util.Map; /** * Indicates the status of an attempt to transport data from or to a remove @@ -43,10 +42,12 @@ public static enum Status { private long batchesProcessed; private long reloadBatchesProcessed; private boolean complete = false; + private Map channels; - public RemoteNodeStatus(String nodeId) { + public RemoteNodeStatus(String nodeId, Map channels) { this.status = Status.NO_DATA; this.nodeId = nodeId; + this.channels = channels; } public boolean failed() { @@ -110,7 +111,8 @@ public void updateOutgoingStatus(List outgoingBatches, List { private static final long serialVersionUID = 1L; + + Map channels; + + public RemoteNodeStatuses(Map channels) { + this.channels = channels; + } public boolean wasDataProcessed() { boolean dataProcessed = false; @@ -64,7 +71,7 @@ public boolean errorOccurred() { public RemoteNodeStatus add(String nodeId) { RemoteNodeStatus status = null; if (nodeId != null) { - status = new RemoteNodeStatus(nodeId); + status = new RemoteNodeStatus(nodeId, channels); add(status); } return status; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Trigger.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Trigger.java index 3d16c56cf9..1bfc79baea 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Trigger.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Trigger.java @@ -57,6 +57,8 @@ public class Trigger implements Serializable { private String sourceCatalogName; private String channelId = Constants.CHANNEL_DEFAULT; + + private String reloadChannelId = Constants.CHANNEL_RELOAD; private boolean syncOnUpdate = true; @@ -282,6 +284,14 @@ public String getChannelId() { public void setChannelId(String channelId) { this.channelId = channelId; } + + public String getReloadChannelId() { + return reloadChannelId; + } + + public void setReloadChannelId(String reloadChannelId) { + this.reloadChannelId = reloadChannelId; + } public boolean isSyncOnUpdate() { return syncOnUpdate; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java index e305cf9573..69070782a1 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IConfigurationService.java @@ -79,7 +79,9 @@ public interface IConfigurationService { public NodeChannel getNodeChannel(String channelId, boolean refreshExtractMillis); - public Channel getChannel (String channelId); + public Channel getChannel (String channelId); + + public List getFileSyncChannels(); public Map getChannels(boolean refreshCache); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java index cf071dfb70..80d44619cc 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/IFileSyncService.java @@ -35,7 +35,7 @@ import org.jumpmind.symmetric.transport.IOutgoingTransport; public interface IFileSyncService { - + public void trackChanges(boolean force); public List getFileTriggers(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java index b487f28ff7..5be4ee39b8 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AcknowledgeService.java @@ -32,6 +32,7 @@ import org.jumpmind.symmetric.io.stage.IStagingManager; import org.jumpmind.symmetric.model.BatchAck; import org.jumpmind.symmetric.model.BatchAckResult; +import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.OutgoingBatch; import org.jumpmind.symmetric.model.OutgoingBatch.Status; import org.jumpmind.symmetric.service.IAcknowledgeService; @@ -130,11 +131,11 @@ public BatchAckResult ack(final BatchAck batch) { } } - //TODO: I should really be able to catch errors here, but can't do to how this is coded outgoingBatchService.updateOutgoingBatch(outgoingBatch); if (status == Status.OK) { - if (outgoingBatch.getChannelId().equals(Constants.CHANNEL_FILESYNC)){ - //Acknowledge the file_sync in case the file needs deleted. + Channel channel = engine.getConfigurationService().getChannel(outgoingBatch.getChannelId()); + if (channel != null && channel.isFileSyncFlag()){ + /* Acknowledge the file_sync in case the file needs deleted. */ engine.getFileSyncService().acknowledgeFiles(outgoingBatch); } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java index 54ad51a14e..567c15c166 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationService.java @@ -68,11 +68,11 @@ public ConfigurationService(IParameterService parameterService, ISymmetricDialec this.nodeService = nodeService; this.defaultChannels = new ArrayList(); this.defaultChannels.add(new Channel(Constants.CHANNEL_CONFIG, 0, 2000, 100, true, 0, true)); - this.defaultChannels.add(new Channel(Constants.CHANNEL_RELOAD, 1, 1, 1, true, 0, false)); + this.defaultChannels.add(new Channel(Constants.CHANNEL_RELOAD, 1, 1, 1, true, 0, false, true, false)); this.defaultChannels.add(new Channel(Constants.CHANNEL_HEARTBEAT, 2, 100, 100, true, 0, false)); this.defaultChannels.add(new Channel(Constants.CHANNEL_DEFAULT, 99999, 1000, 100, true, 0, false)); if (parameterService.is(ParameterConstants.FILE_SYNC_ENABLE)) { - this.defaultChannels.add(new Channel(Constants.CHANNEL_FILESYNC, 3, 100, 100, true, 0, false, "nontransactional")); + this.defaultChannels.add(new Channel(Constants.CHANNEL_FILESYNC, 3, 100, 100, true, 0, false, "nontransactional", false, true)); } setSqlMap(new ConfigurationServiceSqlMap(symmetricDialect.getPlatform(), createSqlReplacementTokens())); @@ -186,7 +186,8 @@ public void saveChannel(Channel channel, boolean reloadChannels) { channel.isUsePkDataToRoute() ? 1 : 0, channel.isContainsBigLob() ? 1 : 0, channel.isEnabled() ? 1 : 0, channel.getBatchAlgorithm(), channel.getExtractPeriodMillis(), channel.getDataLoaderType(), channel.getLastUpdateTime(), - channel.getLastUpdateBy(), + channel.getLastUpdateBy(), channel.isReloadFlag() ? 1 : 0, + channel.isFileSyncFlag() ? 1 : 0, channel.getChannelId() })) { channel.setCreateTime(new Date()); sqlTemplate.update( @@ -199,7 +200,9 @@ public void saveChannel(Channel channel, boolean reloadChannels) { channel.isContainsBigLob() ? 1 : 0, channel.isEnabled() ? 1 : 0, channel.getBatchAlgorithm(), channel.getExtractPeriodMillis(), channel.getDataLoaderType(), channel.getLastUpdateTime(), - channel.getLastUpdateBy(), channel.getCreateTime()}); + channel.getLastUpdateBy(), channel.getCreateTime() + , channel.isReloadFlag() ? 1 : 0, + channel.isFileSyncFlag() ? 1 : 0,}); } if (reloadChannels) { clearCache(); @@ -313,6 +316,8 @@ public NodeChannel mapRow(Row row) { nodeChannel.setCreateTime(row.getDateTime("create_time")); nodeChannel.setLastUpdateBy(row.getString("last_update_by")); nodeChannel.setLastUpdateTime(row.getDateTime("last_update_time")); + nodeChannel.setFileSyncFlag(row.getBoolean("file_sync_flag")); + nodeChannel.setReloadFlag(row.getBoolean("reload_flag")); return nodeChannel; }; }, nodeId); @@ -374,9 +379,16 @@ public void initDefaultChannels() { clearCache(); List channels = getNodeChannels(false); for (Channel defaultChannel : defaultChannels) { - if (!defaultChannel.isInList(channels)) { + Channel channel = defaultChannel.findInList(channels); + if (channel == null) { log.info("Auto-configuring {} channel", defaultChannel.getChannelId()); saveChannel(defaultChannel, true); + } else if (channel.getChannelId().equals(Constants.CHANNEL_RELOAD) && !channel.isReloadFlag()) { + channel.setReloadFlag(true); + saveChannel(channel, true); + } else if (channel.getChannelId().equals(Constants.CHANNEL_FILESYNC) && !channel.isFileSyncFlag()) { + channel.setFileSyncFlag(true); + saveChannel(channel, true); } else { log.debug("No need to create channel {}. It already exists", defaultChannel.getChannelId()); @@ -409,6 +421,18 @@ public ChannelMap getSuspendIgnoreChannelLists(final String nodeId) { } return map; } + + public List getFileSyncChannels() { + List list = new ArrayList(getChannels(false).values()); + Iterator it = list.iterator(); + while (it.hasNext()) { + Channel channel = it.next(); + if (!channel.isFileSyncFlag()) { + it.remove(); + } + } + return list; + } public Map getChannels(boolean refreshCache) { long channelCacheTimeoutInMs = parameterService.getLong( @@ -445,6 +469,8 @@ public Channel mapRow(Row row) { channel.setCreateTime(row.getDateTime("create_time")); channel.setLastUpdateBy(row.getString("last_update_by")); channel.setLastUpdateTime(row.getDateTime("last_update_time")); + channel.setReloadFlag(row.getBoolean("reload_flag")); + channel.setFileSyncFlag(row.getBoolean("file_sync_flag")); return channel; } }); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java index 7cb9760b5a..d5f5bec090 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/ConfigurationServiceSqlMap.java @@ -66,19 +66,19 @@ public ConfigurationServiceSqlMap(IDatabasePlatform platform, putSql("isChannelInUseSql", "select count(*) from $(trigger) where channel_id = ? "); putSql("selectChannelsSql", - "select c.channel_id, c.processing_order, c.max_batch_size, c.enabled, " + - " c.max_batch_to_send, c.max_data_to_route, c.use_old_data_to_route, " + - " c.use_row_data_to_route, c.use_pk_data_to_route, c.contains_big_lob, " + - " c.batch_algorithm, c.extract_period_millis, c.data_loader_type, " + - " c.last_update_time, c.last_update_by, c.create_time " + - " from $(channel) c order by c.processing_order asc, c.channel_id "); + "select c.channel_id, c.processing_order, c.max_batch_size, c.enabled, " + + " c.max_batch_to_send, c.max_data_to_route, c.use_old_data_to_route, " + + " c.use_row_data_to_route, c.use_pk_data_to_route, c.contains_big_lob, " + + " c.batch_algorithm, c.extract_period_millis, c.data_loader_type, " + + " c.last_update_time, c.last_update_by, c.create_time, c.reload_flag, c.file_sync_flag " + + " from $(channel) c order by c.processing_order asc, c.channel_id "); putSql("selectNodeChannelsSql", "select c.channel_id, nc.node_id, nc.ignore_enabled, nc.suspend_enabled, c.processing_order, " + " c.max_batch_size, c.enabled, c.max_batch_to_send, c.max_data_to_route, c.use_old_data_to_route, " + " c.use_row_data_to_route, c.use_pk_data_to_route, c.contains_big_lob, c.batch_algorithm, " - + " nc.last_extract_time, c.extract_period_millis, c.data_loader_type," + - " last_update_time, last_update_by, create_time " + + " nc.last_extract_time, c.extract_period_millis, c.data_loader_type, " + + " last_update_time, last_update_by, create_time, c.reload_flag, c.file_sync_flag " + " from $(channel) c left outer join " + " $(node_channel_ctl) nc on c.channel_id = nc.channel_id and nc.node_id = ? " + " order by c.processing_order asc, c.channel_id "); @@ -92,15 +92,16 @@ public ConfigurationServiceSqlMap(IDatabasePlatform platform, "insert into $(channel) (channel_id, processing_order, max_batch_size, " + " max_batch_to_send, max_data_to_route, use_old_data_to_route, use_row_data_to_route, " + " use_pk_data_to_route, contains_big_lob, enabled, batch_algorithm, description, " - + " extract_period_millis, data_loader_type, last_update_time, last_update_by, create_time) " - + " values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, null, ?, ?, ?, ?, ?) "); + + " extract_period_millis, data_loader_type, last_update_time, last_update_by, " + + " create_time, reload_flag, file_sync_flag) " + + " values (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, ?, null, ?, ?, ?, ?, ?, ?, ?) "); putSql("updateChannelSql", - "update $(channel) set processing_order=?, max_batch_size=?, " - + " max_batch_to_send=?, max_data_to_route=?, use_old_data_to_route=?, use_row_data_to_route=?, " + "update $(channel) set processing_order=?, max_batch_size=?, " + + " max_batch_to_send=?, max_data_to_route=?, use_old_data_to_route=?, use_row_data_to_route=?, " + " use_pk_data_to_route=?, contains_big_lob=?, enabled=?, batch_algorithm=?, extract_period_millis=?, " - + " data_loader_type=?, last_update_time=?, last_update_by=?" - + " where channel_id=? "); + + " data_loader_type=?, last_update_time=?, last_update_by=?, reload_flag=?, file_sync_flag=? " + + " where channel_id=? "); putSql("deleteNodeGroupLinkSql", "delete from $(node_group_link) where source_node_group_id=? and target_node_group_id=? "); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 084696c929..09a20b2b0a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -305,7 +305,10 @@ private List filterBatchesForExtraction(OutgoingBatches batches, ChannelMap suspendIgnoreChannelsList) { if (parameterService.is(ParameterConstants.FILE_SYNC_ENABLE)) { - batches.filterBatchesForChannel(Constants.CHANNEL_FILESYNC); + List fileSyncChannels = configurationService.getFileSyncChannels(); + for (Channel channel : fileSyncChannels) { + batches.filterBatchesForChannel(channel); + } } // We now have either our local suspend/ignore list, or the combined @@ -929,7 +932,7 @@ protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId, } public RemoteNodeStatuses queueWork(boolean force) { - final RemoteNodeStatuses statuses = new RemoteNodeStatuses(); + final RemoteNodeStatuses statuses = new RemoteNodeStatuses(configurationService.getChannels(false)); Node identity = nodeService.findIdentity(); if (identity != null) { if (force || clusterService.lock(ClusterConstants.INITIAL_LOAD_EXTRACT)) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java index 13dd5bc7a1..609c9ea1ae 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataLoaderService.java @@ -228,8 +228,8 @@ public List loadDataBatch(String batchData){ * Connect to the remote node and pull data. The acknowledgment of * commit/error status is sent separately after the data is processed. */ - public RemoteNodeStatus loadDataFromPull(Node remote) throws IOException { - RemoteNodeStatus status = new RemoteNodeStatus(remote != null ? remote.getNodeId() : null); + public RemoteNodeStatus loadDataFromPull(Node remote) throws IOException { + RemoteNodeStatus status = new RemoteNodeStatus(remote != null ? remote.getNodeId() : null, configurationService.getChannels(false)); loadDataFromPull(remote, status); return status; } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index 74b6ced807..064bdd8a70 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -136,7 +136,7 @@ public boolean insertReloadEvent(TableReloadRequest request, boolean deleteAtCli transaction, triggerHistory, trigger.getChannelId(), - targetNode, + targetNode, String.format( "delete from %s where target_node_id='%s' and source_node_id='%s' and trigger_id='%s' and router_id='%s'", TableConstants @@ -258,7 +258,7 @@ public long insertReloadEvent(ISqlTransaction transaction, Node targetNode, triggerHistory = lookupTriggerHistory(triggerRouter.getTrigger()); } - String channelId = getChannelIdForTrigger(triggerRouter.getTrigger()); + String channelId = getReloadChannelIdForTrigger(triggerRouter.getTrigger(), engine.getConfigurationService().getChannels(false)); // initial_load_select for table can be overridden by populating the // row_data @@ -274,11 +274,22 @@ public long insertReloadEvent(ISqlTransaction transaction, Node targetNode, } } - private String getChannelIdForTrigger(Trigger trigger) { - String channelId = trigger.getChannelId(); - if (!Constants.CHANNEL_FILESYNC.equals(trigger.getChannelId()) && - parameterService.is(ParameterConstants.INITIAL_LOAD_USE_RELOAD_CHANNEL)) { - channelId = Constants.CHANNEL_RELOAD; + private String getReloadChannelIdForTrigger(Trigger trigger, Map channels) { + String channelId = trigger != null ? trigger.getChannelId() : Constants.CHANNEL_DEFAULT; + if (parameterService.is(ParameterConstants.INITIAL_LOAD_USE_RELOAD_CHANNEL)) { + Channel normalChannel = channels.get(channelId); + Channel reloadChannel = channels.get(trigger != null ? trigger.getReloadChannelId() : Constants.CHANNEL_RELOAD); + if (normalChannel.isFileSyncFlag()) { + if (reloadChannel != null && reloadChannel.isFileSyncFlag()) { + channelId = reloadChannel.getChannelId(); + } + } else { + if (reloadChannel != null && reloadChannel.isReloadFlag()) { + channelId = reloadChannel.getChannelId(); + } else { + channelId = Constants.CHANNEL_RELOAD; + } + } } return channelId; } @@ -472,6 +483,7 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre List triggerHistories, Map> triggerRoutersByHistoryId, boolean transactional, ISqlTransaction transaction) { + Map channels = engine.getConfigurationService().getChannels(false); for (TriggerHistory triggerHistory : triggerHistories) { List triggerRouters = triggerRoutersByHistoryId.get(triggerHistory .getTriggerHistoryId()); @@ -480,10 +492,8 @@ private void insertLoadBatchesForReload(Node targetNode, long loadId, String cre engine.getGroupletService().isTargetEnabled(triggerRouter, targetNode)) { if (parameterService.is(ParameterConstants.INTITAL_LOAD_USE_EXTRACT_JOB)) { Trigger trigger = triggerRouter.getTrigger(); - Channel channel = engine.getConfigurationService().getChannel(trigger.getChannelId()); - if (parameterService.is(ParameterConstants.INITIAL_LOAD_USE_RELOAD_CHANNEL)) { - channel = engine.getConfigurationService().getChannel(Constants.CHANNEL_RELOAD); - } + String reloadChannel = getReloadChannelIdForTrigger(trigger, channels); + Channel channel = channels.get(reloadChannel); // calculate the number of batches needed for table. int numberOfBatches = triggerRouter.getInitialLoadBatchCount(); if (numberOfBatches <= 0) { @@ -574,7 +584,7 @@ protected void insertPurgeEvent(ISqlTransaction transaction, Node targetNode, sql = FormatUtils.replace("groupId", targetNode.getNodeGroupId(), sql); sql = FormatUtils.replace("externalId", targetNode.getExternalId(), sql); sql = FormatUtils.replace("nodeId", targetNode.getNodeId(), sql); - String channelId = getChannelIdForTrigger(triggerRouter.getTrigger()); + String channelId = getReloadChannelIdForTrigger(triggerRouter.getTrigger(), engine.getConfigurationService().getChannels(false)); Data data = new Data(triggerHistory.getSourceTableName(), DataEventType.SQL, CsvUtils.escapeCsvData(sql), null, triggerHistory, channelId, null, null); if (isLoad) { insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(), @@ -587,9 +597,11 @@ protected void insertPurgeEvent(ISqlTransaction transaction, Node targetNode, public void insertSqlEvent(Node targetNode, String sql, boolean isLoad, long loadId, String createBy) { TriggerHistory history = engine.getTriggerRouterService().findTriggerHistoryForGenericSync(); - boolean useReloadChannel = parameterService.is(ParameterConstants.INITIAL_LOAD_USE_RELOAD_CHANNEL); + Trigger trigger = engine.getTriggerRouterService().getTriggerById(history.getTriggerId(), false); + String reloadChannelId = getReloadChannelIdForTrigger(trigger, engine.getConfigurationService().getChannels(false)); + Data data = new Data(history.getSourceTableName(), DataEventType.SQL, - CsvUtils.escapeCsvData(sql), null, history, useReloadChannel && isLoad ? Constants.CHANNEL_RELOAD : Constants.CHANNEL_CONFIG, null, null); + CsvUtils.escapeCsvData(sql), null, history, isLoad ? reloadChannelId : Constants.CHANNEL_CONFIG, null, null); if (isLoad) { insertDataAndDataEventAndOutgoingBatch(data, targetNode.getNodeId(), Constants.UNKNOWN_ROUTER_ID, isLoad, loadId, createBy); @@ -608,9 +620,10 @@ public void insertSqlEvent(ISqlTransaction transaction, Node targetNode, String protected void insertSqlEvent(ISqlTransaction transaction, TriggerHistory history, String channelId, Node targetNode, String sql, boolean isLoad, long loadId, String createBy) { - boolean useReloadChannel = parameterService.is(ParameterConstants.INITIAL_LOAD_USE_RELOAD_CHANNEL); + Trigger trigger = engine.getTriggerRouterService().getTriggerById(history.getTriggerId(), false); + String reloadChannelId = getReloadChannelIdForTrigger(trigger, engine.getConfigurationService().getChannels(false)); Data data = new Data(history.getSourceTableName(), DataEventType.SQL, - CsvUtils.escapeCsvData(sql), null, history, useReloadChannel && isLoad ? Constants.CHANNEL_RELOAD + CsvUtils.escapeCsvData(sql), null, history, isLoad ? reloadChannelId : channelId, null, null); if (isLoad) { insertDataAndDataEventAndOutgoingBatch(transaction, data, targetNode.getNodeId(), @@ -660,13 +673,17 @@ public void insertCreateEvent(final Node targetNode, TriggerHistory triggerHisto public void insertCreateEvent(ISqlTransaction transaction, Node targetNode, TriggerHistory triggerHistory, String xml, boolean isLoad, long loadId, String createBy) { + + Trigger trigger = engine.getTriggerRouterService().getTriggerById(triggerHistory.getTriggerId(), false); + String reloadChannelId = getReloadChannelIdForTrigger(trigger, engine.getConfigurationService().getChannels(false)); + Data data = new Data( triggerHistory.getSourceTableName(), DataEventType.CREATE, CsvUtils.escapeCsvData(xml), null, triggerHistory, - parameterService.is(ParameterConstants.INITIAL_LOAD_USE_RELOAD_CHANNEL) && isLoad ? Constants.CHANNEL_RELOAD + isLoad ? reloadChannelId : Constants.CHANNEL_CONFIG, null, null); try { if (isLoad) { @@ -827,7 +844,15 @@ public long insertDataAndDataEventAndOutgoingBatch(Data data, String nodeId, Str public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, Data data, String nodeId, String routerId, boolean isLoad, long loadId, String createBy, Status status) { long dataId = insertData(transaction, data); - return insertDataEventAndOutgoingBatch(transaction, dataId, data.getChannelId(), nodeId, + String channelId = data.getChannelId(); + if (isLoad) { + TriggerHistory history = data.getTriggerHistory(); + if (history != null) { + Trigger trigger = engine.getTriggerRouterService().getTriggerById(history.getTriggerId()); + channelId = getReloadChannelIdForTrigger(trigger, engine.getConfigurationService().getChannels(false)); + } + } + return insertDataEventAndOutgoingBatch(transaction, dataId, channelId, nodeId, data.getDataEventType(), routerId, isLoad, loadId, createBy, status); } @@ -838,8 +863,6 @@ public long insertDataAndDataEventAndOutgoingBatch(ISqlTransaction transaction, protected long insertDataEventAndOutgoingBatch(ISqlTransaction transaction, long dataId, String channelId, String nodeId, DataEventType eventType, String routerId, boolean isLoad, long loadId, String createBy, Status status) { - boolean useReloadChannel = parameterService.is(ParameterConstants.INITIAL_LOAD_USE_RELOAD_CHANNEL); - channelId = useReloadChannel && isLoad && !Constants.CHANNEL_FILESYNC.equals(channelId) ? Constants.CHANNEL_RELOAD : channelId; OutgoingBatch outgoingBatch = new OutgoingBatch( nodeId, channelId, status); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java index 6db797ee7e..da154cbf67 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncService.java @@ -26,7 +26,9 @@ import java.io.OutputStream; import java.sql.Types; import java.util.ArrayList; +import java.util.Collection; import java.util.Date; +import java.util.Iterator; import java.util.List; import java.util.Map; import java.util.Set; @@ -56,10 +58,11 @@ import org.jumpmind.symmetric.io.stage.IStagedResource; import org.jumpmind.symmetric.io.stage.IStagingManager; import org.jumpmind.symmetric.model.BatchAck; +import org.jumpmind.symmetric.model.Channel; +import org.jumpmind.symmetric.model.Data; import org.jumpmind.symmetric.model.FileConflictStrategy; import org.jumpmind.symmetric.model.FileSnapshot; import org.jumpmind.symmetric.model.FileSnapshot.LastEventType; -import org.jumpmind.symmetric.model.Data; import org.jumpmind.symmetric.model.FileTrigger; import org.jumpmind.symmetric.model.FileTriggerRouter; import org.jumpmind.symmetric.model.IncomingBatch; @@ -94,7 +97,7 @@ public class FileSyncService extends AbstractOfflineDetectorService implements I INodeCommunicationExecutor { private ISymmetricEngine engine; - + // TODO cache trigger routers public FileSyncService(ISymmetricEngine engine) { @@ -105,7 +108,8 @@ public FileSyncService(ISymmetricEngine engine) { public void trackChanges(boolean force) { if (force || engine.getClusterService().lock(ClusterConstants.FILE_SYNC_TRACKER)) { - if (engine.getClusterService().lock(ClusterConstants.FILE_SYNC_SHARED, ClusterConstants.TYPE_EXCLUSIVE, + if (engine.getClusterService().lock(ClusterConstants.FILE_SYNC_SHARED, + ClusterConstants.TYPE_EXCLUSIVE, getParameterService().getLong(ParameterConstants.FILE_SYNC_LOCK_WAIT_MS))) { try { List fileTriggerRouters = getFileTriggerRoutersForCurrentNode(); @@ -118,17 +122,19 @@ public void trackChanges(boolean force) { for (FileSnapshot fileSnapshot : dirSnapshot) { File file = fileTriggerRouter.getFileTrigger() .createSourceFile(fileSnapshot); - String filePath = file.getParentFile().getPath().replace('\\', '/'); + String filePath = file.getParentFile().getPath() + .replace('\\', '/'); String fileName = file.getName(); String nodeId = findSourceNodeIdFromFileIncoming(filePath, fileName, fileSnapshot.getFileModifiedTime()); - if (StringUtils.isNotBlank(nodeId)) { + if (StringUtils.isNotBlank(nodeId)) { fileSnapshot.setLastUpdateBy(nodeId); } else { fileSnapshot.setLastUpdateBy(null); } log.debug("Captured change " + fileSnapshot.getLastEventType() - + " change of " + fileSnapshot.getFileName() + " (lastmodified=" + + " change of " + fileSnapshot.getFileName() + + " (lastmodified=" + fileSnapshot.getFileModifiedTime() + ",size=" + fileSnapshot.getFileSize() + ") from " + fileSnapshot.getLastUpdateBy()); @@ -144,13 +150,14 @@ public void trackChanges(boolean force) { deleteFromFileIncoming(); } finally { - engine.getClusterService().unlock(ClusterConstants.FILE_SYNC_SHARED, ClusterConstants.TYPE_EXCLUSIVE); + engine.getClusterService().unlock(ClusterConstants.FILE_SYNC_SHARED, + ClusterConstants.TYPE_EXCLUSIVE); if (!force) { engine.getClusterService().unlock(ClusterConstants.FILE_SYNC_TRACKER); } } } else { - log.debug("Did not run the track file sync changes process because it was shared locked"); + log.debug("Did not run the track file sync changes process because it was shared locked"); } } else { log.debug("Did not run the track file sync changes process because it was cluster locked"); @@ -183,8 +190,8 @@ public List getFileTriggerRoutersForCurrentNode() { } public List getFileTriggerRouters() { - return sqlTemplate.query( - getSql("selectFileTriggerRoutersSql"), new FileTriggerRouterMapper()); + return sqlTemplate.query(getSql("selectFileTriggerRoutersSql"), + new FileTriggerRouterMapper()); } public FileTriggerRouter getFileTriggerRouter(String triggerId, String routerId) { @@ -201,31 +208,34 @@ public void saveFileTrigger(FileTrigger fileTrigger) { fileTrigger.getIncludesFiles(), fileTrigger.getExcludesFiles(), fileTrigger.isSyncOnCreate() ? 1 : 0, fileTrigger.isSyncOnModified() ? 1 : 0, - fileTrigger.isSyncOnDelete() ? 1 : 0, + fileTrigger.isSyncOnDelete() ? 1 : 0, fileTrigger.isSyncOnCtlFile() ? 1 : 0, - fileTrigger.isDeleteAfterSync() ? 1 : 0, - fileTrigger.getBeforeCopyScript(), + fileTrigger.isDeleteAfterSync() ? 1 : 0, fileTrigger.getBeforeCopyScript(), fileTrigger.getAfterCopyScript(), fileTrigger.getLastUpdateBy(), - fileTrigger.getLastUpdateTime(), fileTrigger.getTriggerId() }, new int[] { + fileTrigger.getLastUpdateTime(), fileTrigger.getChannelId(), + fileTrigger.getReloadChannelId(), fileTrigger.getTriggerId() }, new int[] { Types.VARCHAR, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.VARCHAR, - Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR })) { + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, + Types.TIMESTAMP, Types.VARCHAR })) { fileTrigger.setCreateTime(fileTrigger.getLastUpdateTime()); - sqlTemplate.update(getSql("insertFileTriggerSql"), + sqlTemplate.update( + getSql("insertFileTriggerSql"), new Object[] { fileTrigger.getBaseDir(), fileTrigger.isRecurse() ? 1 : 0, fileTrigger.getIncludesFiles(), fileTrigger.getExcludesFiles(), fileTrigger.isSyncOnCreate() ? 1 : 0, fileTrigger.isSyncOnModified() ? 1 : 0, fileTrigger.isSyncOnDelete() ? 1 : 0, fileTrigger.isSyncOnCtlFile() ? 1 : 0, - fileTrigger.isDeleteAfterSync() ? 1 : 0, + fileTrigger.isDeleteAfterSync() ? 1 : 0, fileTrigger.getBeforeCopyScript(), fileTrigger.getAfterCopyScript(), fileTrigger.getLastUpdateBy(), fileTrigger.getLastUpdateTime(), - fileTrigger.getTriggerId(), fileTrigger.getCreateTime() }, new int[] { - Types.VARCHAR, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, + fileTrigger.getTriggerId(), fileTrigger.getCreateTime(), + fileTrigger.getChannelId(), fileTrigger.getReloadChannelId() }, + new int[] { Types.VARCHAR, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, - Types.TIMESTAMP }); + Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR }); } } @@ -236,36 +246,33 @@ public void saveFileTriggerRouter(FileTriggerRouter fileTriggerRouter) { getSql("updateFileTriggerRouterSql"), new Object[] { fileTriggerRouter.isEnabled() ? 1 : 0, fileTriggerRouter.isInitialLoadEnabled() ? 1 : 0, - fileTriggerRouter.getTargetBaseDir(), + fileTriggerRouter.getTargetBaseDir(), fileTriggerRouter.getConflictStrategy().name(), - fileTriggerRouter.getLastUpdateBy(), - fileTriggerRouter.getLastUpdateTime(), + fileTriggerRouter.getLastUpdateBy(), fileTriggerRouter.getLastUpdateTime(), fileTriggerRouter.getFileTrigger().getTriggerId(), - fileTriggerRouter.getRouter().getRouterId() }, - new int[] { Types.SMALLINT, Types.SMALLINT, - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, + fileTriggerRouter.getRouter().getRouterId() }, new int[] { Types.SMALLINT, + Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR })) { fileTriggerRouter.setCreateTime(fileTriggerRouter.getLastUpdateTime()); sqlTemplate.update( getSql("insertFileTriggerRouterSql"), new Object[] { fileTriggerRouter.isEnabled() ? 1 : 0, fileTriggerRouter.isInitialLoadEnabled() ? 1 : 0, - fileTriggerRouter.getTargetBaseDir(), + fileTriggerRouter.getTargetBaseDir(), fileTriggerRouter.getConflictStrategy().name(), - fileTriggerRouter.getCreateTime(), - fileTriggerRouter.getLastUpdateBy(), + fileTriggerRouter.getCreateTime(), fileTriggerRouter.getLastUpdateBy(), fileTriggerRouter.getLastUpdateTime(), fileTriggerRouter.getFileTrigger().getTriggerId(), - fileTriggerRouter.getRouter().getRouterId() }, - new int[] { Types.SMALLINT, Types.SMALLINT, - Types.VARCHAR, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, - Types.TIMESTAMP, Types.VARCHAR, Types.VARCHAR }); + fileTriggerRouter.getRouter().getRouterId() }, new int[] { + Types.SMALLINT, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, + Types.TIMESTAMP, Types.VARCHAR, Types.TIMESTAMP, Types.VARCHAR, + Types.VARCHAR }); } } public void deleteFileTriggerRouter(FileTriggerRouter fileTriggerRouter) { - sqlTemplate.update(getSql("deleteFileTriggerRouterSql"), (Object) fileTriggerRouter.getFileTrigger() - .getTriggerId(), fileTriggerRouter.getRouter().getRouterId()); + sqlTemplate.update(getSql("deleteFileTriggerRouterSql"), (Object) fileTriggerRouter + .getFileTrigger().getTriggerId(), fileTriggerRouter.getRouter().getRouterId()); } public void deleteFileTrigger(FileTrigger fileTrigger) { @@ -358,155 +365,165 @@ synchronized public RemoteNodeStatuses pushFilesToNodes(boolean force) { public List sendFiles(ProcessInfo processInfo, Node targetNode, IOutgoingTransport outgoingTransport) { + List processedBatches = new ArrayList(); + List fileSyncChannels = engine.getConfigurationService().getFileSyncChannels(); + for (Channel channel : fileSyncChannels) { - OutgoingBatches batches = engine.getOutgoingBatchService().getOutgoingBatches( - targetNode.getNodeId(), false); - List activeBatches = batches - .filterBatchesForChannel(Constants.CHANNEL_FILESYNC); - - OutgoingBatch currentBatch = null; + OutgoingBatches batches = engine.getOutgoingBatchService().getOutgoingBatches( + targetNode.getNodeId(), false); + List activeBatches = batches.filterBatchesForChannel(channel); - IStagingManager stagingManager = engine.getStagingManager(); - long memoryThresholdInBytes = parameterService - .getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD); - IStagedResource stagedResource = stagingManager.create(memoryThresholdInBytes, Constants.STAGING_CATEGORY_OUTGOING, - processInfo.getSourceNodeId(), targetNode.getNodeId(), "filesync.zip"); - - try { + OutgoingBatch currentBatch = null; - long maxBytesToSync = parameterService - .getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC); + IStagingManager stagingManager = engine.getStagingManager(); + long memoryThresholdInBytes = parameterService + .getLong(ParameterConstants.STREAM_TO_FILE_THRESHOLD); + IStagedResource stagedResource = stagingManager.create(memoryThresholdInBytes, + Constants.STAGING_CATEGORY_OUTGOING, processInfo.getSourceNodeId(), + targetNode.getNodeId(), "filesync.zip"); - FileSyncZipDataWriter dataWriter = new FileSyncZipDataWriter(maxBytesToSync, this, engine.getNodeService(), - stagedResource); try { - for (int i = 0; i < activeBatches.size(); i++) { - currentBatch = activeBatches.get(i); - processInfo.incrementBatchCount(); - processInfo.setCurrentBatchId(currentBatch.getBatchId()); - - ((DataExtractorService) engine.getDataExtractorService()).extractOutgoingBatch( - processInfo, targetNode, dataWriter, currentBatch, false, true, - DataExtractorService.ExtractMode.FOR_SYM_CLIENT); - - /* - * check to see if max bytes to sync has been reached and - * stop processing batches - */ - if (dataWriter.readyToSend()) { - break; - } - } - } finally { - dataWriter.finish(); - } - processInfo.setStatus(ProcessInfo.Status.TRANSFERRING); + long maxBytesToSync = parameterService + .getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC); - for (int i = 0; i < activeBatches.size(); i++) { - activeBatches.get(i).setStatus(Status.SE); - } - engine.getOutgoingBatchService().updateOutgoingBatches(activeBatches); - - try { - if (stagedResource.exists()) { - InputStream is = stagedResource.getInputStream(); - try { - OutputStream os = outgoingTransport.openStream(); - IOUtils.copy(is, os); - os.flush(); - } catch (IOException e) { - throw new IoException(e); + FileSyncZipDataWriter dataWriter = new FileSyncZipDataWriter(maxBytesToSync, this, + engine.getNodeService(), stagedResource); + try { + for (int i = 0; i < activeBatches.size(); i++) { + currentBatch = activeBatches.get(i); + processInfo.incrementBatchCount(); + processInfo.setCurrentBatchId(currentBatch.getBatchId()); + + ((DataExtractorService) engine.getDataExtractorService()) + .extractOutgoingBatch(processInfo, targetNode, dataWriter, + currentBatch, false, true, + DataExtractorService.ExtractMode.FOR_SYM_CLIENT); + + /* + * check to see if max bytes to sync has been reached + * and stop processing batches + */ + if (dataWriter.readyToSend()) { + break; + } } + } finally { + dataWriter.finish(); } + processInfo.setStatus(ProcessInfo.Status.TRANSFERRING); + for (int i = 0; i < activeBatches.size(); i++) { - activeBatches.get(i).setStatus(Status.LD); + activeBatches.get(i).setStatus(Status.SE); } engine.getOutgoingBatchService().updateOutgoingBatches(activeBatches); - } finally { - stagedResource.close(); - } + try { + if (stagedResource.exists()) { + InputStream is = stagedResource.getInputStream(); + try { + OutputStream os = outgoingTransport.openStream(); + IOUtils.copy(is, os); + os.flush(); + } catch (IOException e) { + throw new IoException(e); + } + } - return activeBatches; + for (int i = 0; i < activeBatches.size(); i++) { + activeBatches.get(i).setStatus(Status.LD); + } + engine.getOutgoingBatchService().updateOutgoingBatches(activeBatches); - } catch (RuntimeException e) { - if (currentBatch != null) { - engine.getStatisticManager().incrementDataExtractedErrors( - currentBatch.getChannelId(), 1); - currentBatch.setSqlMessage(getRootMessage(e)); - currentBatch.revertStatsOnError(); - if (currentBatch.getStatus() != Status.IG) { - currentBatch.setStatus(Status.ER); + } finally { + stagedResource.close(); } - currentBatch.setErrorFlag(true); - engine.getOutgoingBatchService().updateOutgoingBatch(currentBatch); - if (isStreamClosedByClient(e)) { - log.warn( - "Failed to extract batch {}. The stream was closed by the client. The error was: {}", - currentBatch, getRootMessage(e)); + processedBatches.addAll(activeBatches); + + } catch (RuntimeException e) { + if (currentBatch != null) { + engine.getStatisticManager().incrementDataExtractedErrors( + currentBatch.getChannelId(), 1); + currentBatch.setSqlMessage(getRootMessage(e)); + currentBatch.revertStatsOnError(); + if (currentBatch.getStatus() != Status.IG) { + currentBatch.setStatus(Status.ER); + } + currentBatch.setErrorFlag(true); + engine.getOutgoingBatchService().updateOutgoingBatch(currentBatch); + + if (isStreamClosedByClient(e)) { + log.warn( + "Failed to extract batch {}. The stream was closed by the client. The error was: {}", + currentBatch, getRootMessage(e)); + } else { + log.error("Failed to extract batch {}", currentBatch, e); + } } else { - log.error("Failed to extract batch {}", currentBatch, e); + log.error("Could not log the outgoing batch status because the batch was null", + e); } - } else { - log.error("Could not log the outgoing batch status because the batch was null", e); - } - throw e; - } finally { - if (stagedResource != null) { - stagedResource.delete(); + throw e; + } finally { + if (stagedResource != null) { + stagedResource.delete(); + } } } + return processedBatches; } - - public void acknowledgeFiles(OutgoingBatch outgoingBatch){ + + public void acknowledgeFiles(OutgoingBatch outgoingBatch) { log.debug("Acknowledging file_sync outgoing batch-{}", outgoingBatch.getBatchId()); - ISqlReadCursor cursor = engine.getDataService().selectDataFor(outgoingBatch.getBatchId(), outgoingBatch.getChannelId()); + ISqlReadCursor cursor = engine.getDataService().selectDataFor( + outgoingBatch.getBatchId(), outgoingBatch.getChannelId()); Data data = null; - List filesToDelete = new ArrayList(); - Table snapshotTable = platform.getTableFromCache(TableConstants.getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT), false); + List filesToDelete = new ArrayList(); + Table snapshotTable = platform.getTableFromCache( + TableConstants.getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT), false); for (int i = 0; i < outgoingBatch.getInsertEventCount(); i++) { data = cursor.next(); - if (data != null && - (data.getDataEventType() == DataEventType.INSERT || - data.getDataEventType() == DataEventType.UPDATE)) { + if (data != null + && (data.getDataEventType() == DataEventType.INSERT || data.getDataEventType() == DataEventType.UPDATE)) { Map columnData = data.toColumnNameValuePairs( snapshotTable.getColumnNames(), CsvData.ROW_DATA); - + FileSnapshot fileSnapshot = new FileSnapshot(); fileSnapshot.setTriggerId(columnData.get("TRIGGER_ID")); fileSnapshot.setRouterId(columnData.get("ROUTER_ID")); - fileSnapshot.setFileModifiedTime(Long.parseLong(columnData.get("FILE_MODIFIED_TIME"))); + fileSnapshot.setFileModifiedTime(Long.parseLong(columnData + .get("FILE_MODIFIED_TIME"))); fileSnapshot.setFileName(columnData.get("FILE_NAME")); fileSnapshot.setRelativeDir(columnData.get("RELATIVE_DIR")); - fileSnapshot.setLastEventType(LastEventType.fromCode(columnData.get("LAST_EVENT_TYPE"))); - + fileSnapshot.setLastEventType(LastEventType.fromCode(columnData + .get("LAST_EVENT_TYPE"))); + FileTriggerRouter triggerRouter = this.getFileTriggerRouter( fileSnapshot.getTriggerId(), fileSnapshot.getRouterId()); if (triggerRouter != null) { FileTrigger fileTrigger = triggerRouter.getFileTrigger(); - - if(fileTrigger.isDeleteAfterSync()) { + + if (fileTrigger.isDeleteAfterSync()) { File file = fileTrigger.createSourceFile(fileSnapshot); if (!file.isDirectory()) { filesToDelete.add(file); - if(fileTrigger.isSyncOnCtlFile()) { + if (fileTrigger.isSyncOnCtlFile()) { filesToDelete.add(new File(file.getAbsolutePath() + ".ctl")); } } } } - } + } } - + if (cursor != null) { cursor.close(); cursor = null; } - + if (filesToDelete != null && filesToDelete.size() > 0) { for (File file : filesToDelete) { if (file != null && file.exists()) { @@ -597,8 +614,8 @@ protected void pushFilesToNode(NodeCommunication nodeCommunication, RemoteNodeSt protected List processZip(InputStream is, String sourceNodeId, ProcessInfo processInfo) throws IOException { - File unzipDir = new File(parameterService.getTempDirectory(), String.format("filesync_incoming/%s/%s", - engine.getNodeService().findIdentityNodeId(), + File unzipDir = new File(parameterService.getTempDirectory(), String.format( + "filesync_incoming/%s/%s", engine.getNodeService().findIdentityNodeId(), sourceNodeId)); FileUtils.deleteDirectory(unzipDir); unzipDir.mkdirs(); @@ -629,6 +646,7 @@ protected List processZip(InputStream is, String sourceNodeId, File syncScript = new File(batchDir, "sync.bsh"); IncomingBatch incomingBatch = new IncomingBatch(); + /* TODO need to get the actual channel id from the zip */ incomingBatch.setChannelId(Constants.CHANNEL_FILESYNC); incomingBatch.setBatchId(batchId); incomingBatch.setStatus(IncomingBatch.Status.LD); @@ -642,12 +660,15 @@ protected List processZip(InputStream is, String sourceNodeId, boolean isLocked = false; try { interpreter.set("log", log); - interpreter.set("batchDir", batchDir.getAbsolutePath().replace('\\', '/')); + interpreter.set("batchDir", batchDir.getAbsolutePath().replace('\\', '/')); interpreter.set("engine", engine); interpreter.set("sourceNodeId", sourceNodeId); - long waitMillis = getParameterService().getLong(ParameterConstants.FILE_SYNC_LOCK_WAIT_MS); - isLocked = engine.getClusterService().lock(ClusterConstants.FILE_SYNC_SHARED, ClusterConstants.TYPE_SHARED, waitMillis); + long waitMillis = getParameterService().getLong( + ParameterConstants.FILE_SYNC_LOCK_WAIT_MS); + isLocked = engine.getClusterService().lock( + ClusterConstants.FILE_SYNC_SHARED, ClusterConstants.TYPE_SHARED, + waitMillis); if (isLocked) { @SuppressWarnings("unchecked") Map filesToEventType = (Map) interpreter @@ -657,7 +678,9 @@ protected List processZip(InputStream is, String sourceNodeId, .setStatementCount(filesToEventType != null ? filesToEventType .size() : 0); } else { - throw new RuntimeException("Could not obtain file sync shared lock within " + waitMillis + " millis"); + throw new RuntimeException( + "Could not obtain file sync shared lock within " + waitMillis + + " millis"); } incomingBatch.setStatus(IncomingBatch.Status.OK); if (incomingBatchService.isRecordOkBatchesEnabled()) { @@ -676,7 +699,8 @@ protected List processZip(InputStream is, String sourceNodeId, } if (ex instanceof FileConflictException) { - log.error(ex.getMessage() + ". Failed to process file sync batch " + batchId); + log.error(ex.getMessage() + ". Failed to process file sync batch " + + batchId); } else { log.error("Failed to process file sync batch " + batchId, ex); } @@ -684,7 +708,8 @@ protected List processZip(InputStream is, String sourceNodeId, incomingBatch.setErrorFlag(true); incomingBatch.setStatus(IncomingBatch.Status.ER); incomingBatch.setSqlMessage(ex.getMessage()); - if (incomingBatchService.isRecordOkBatchesEnabled() || incomingBatch.isRetry()) { + if (incomingBatchService.isRecordOkBatchesEnabled() + || incomingBatch.isRetry()) { incomingBatchService.updateIncomingBatch(incomingBatch); } else { incomingBatchService.insertIncomingBatch(incomingBatch); @@ -693,7 +718,8 @@ protected List processZip(InputStream is, String sourceNodeId, break; } finally { if (isLocked) { - engine.getClusterService().unlock(ClusterConstants.FILE_SYNC_SHARED, ClusterConstants.TYPE_SHARED); + engine.getClusterService().unlock(ClusterConstants.FILE_SYNC_SHARED, + ClusterConstants.TYPE_SHARED); } } } else { @@ -745,7 +771,7 @@ protected void pullFilesFromNode(NodeCommunication nodeCommunication, RemoteNode sendAck(nodeCommunication.getNode(), identity, security, batchesProcessed, engine.getTransportManager()); } - + } catch (Exception e) { fireOffline(e, nodeCommunication.getNode(), status); } finally { @@ -762,7 +788,7 @@ protected void pullFilesFromNode(NodeCommunication nodeCommunication, RemoteNode protected RemoteNodeStatuses queueJob(boolean force, long minimumPeriodMs, String clusterLock, CommunicationType type) { - final RemoteNodeStatuses statuses = new RemoteNodeStatuses(); + final RemoteNodeStatuses statuses = new RemoteNodeStatuses(engine.getConfigurationService().getChannels(false)); Node identity = engine.getNodeService().findIdentity(false); if (identity != null && identity.isSyncEnabled()) { if (force || !engine.getClusterService().isInfiniteLocked(clusterLock)) { @@ -805,7 +831,8 @@ protected RemoteNodeStatuses queueJob(boolean force, long minimumPeriodMs, Strin class FileTriggerMapper implements ISqlRowMapper { public FileTrigger mapRow(Row rs) { FileTrigger fileTrigger = new FileTrigger(); - fileTrigger.setBaseDir(rs.getString("base_dir")==null?null:rs.getString("base_dir").replace('\\', '/')); + fileTrigger.setBaseDir(rs.getString("base_dir") == null ? null : rs.getString( + "base_dir").replace('\\', '/')); fileTrigger.setCreateTime(rs.getDateTime("create_time")); fileTrigger.setExcludesFiles(rs.getString("excludes_files")); fileTrigger.setIncludesFiles(rs.getString("includes_files")); @@ -820,6 +847,8 @@ public FileTrigger mapRow(Row rs) { fileTrigger.setSyncOnCtlFile(rs.getBoolean("sync_on_ctl_file")); fileTrigger.setDeleteAfterSync(rs.getBoolean("delete_after_sync")); fileTrigger.setTriggerId(rs.getString("trigger_id")); + fileTrigger.setChannelId(rs.getString("channel_id")); + fileTrigger.setReloadChannelId(rs.getString("reload_channel_id")); return fileTrigger; } } @@ -837,7 +866,8 @@ public FileTriggerRouter mapRow(Row rs) { fileTriggerRouter.setLastUpdateTime(rs.getDateTime("last_update_time")); fileTriggerRouter.setEnabled(rs.getBoolean("enabled")); fileTriggerRouter.setInitialLoadEnabled(rs.getBoolean("initial_load_enabled")); - fileTriggerRouter.setTargetBaseDir((rs.getString("target_base_dir")==null)?null:rs.getString("target_base_dir").replace('\\', '/')); + fileTriggerRouter.setTargetBaseDir((rs.getString("target_base_dir") == null) ? null + : rs.getString("target_base_dir").replace('\\', '/')); fileTriggerRouter.setRouter(engine.getTriggerRouterService().getRouterById( rs.getString("router_id"))); return fileTriggerRouter; @@ -853,12 +883,13 @@ public FileSnapshot mapRow(Row rs) { fileSnapshot.setLastUpdateTime(rs.getDateTime("last_update_time")); fileSnapshot.setFileModifiedTime(rs.getLong("file_modified_time")); fileSnapshot.setFileName(rs.getString("file_name")); - fileSnapshot.setRelativeDir(rs.getString("relative_dir")==null?null:rs.getString("relative_dir").replace('\\', '/')); + fileSnapshot.setRelativeDir(rs.getString("relative_dir") == null ? null : rs.getString( + "relative_dir").replace('\\', '/')); fileSnapshot.setFileSize(rs.getLong("file_size")); fileSnapshot.setLastEventType(LastEventType.fromCode(rs.getString("last_event_type"))); fileSnapshot.setTriggerId(rs.getString("trigger_id")); fileSnapshot.setRouterId(rs.getString("router_id")); return fileSnapshot; - } + } } } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java index 3a0ea1ffab..749938c2de 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncServiceSqlMap.java @@ -39,7 +39,7 @@ public FileSyncServiceSqlMap(IDatabasePlatform platform, Map rep " before_copy_script, " + " after_copy_script, " + " create_time, last_update_by, " + - " last_update_time " + + " last_update_time, channel_id, reload_channel_id " + " from $(file_trigger) "); putSql("triggerIdWhere", "where trigger_id=?"); @@ -50,15 +50,17 @@ public FileSyncServiceSqlMap(IDatabasePlatform platform, Map rep " excludes_files=?, sync_on_create=?, sync_on_modified=?, sync_on_delete=?, " + " sync_on_ctl_file=?, delete_after_sync=?, " + " before_copy_script=?, after_copy_script=?, " + - " last_update_by=?, last_update_time=? where trigger_id=? "); + " last_update_by=?, last_update_time=?, channel_id=?, reload_channel_id=? " + + "where trigger_id=? "); putSql("insertFileTriggerSql", " insert into $(file_trigger) (base_dir, recurse, includes_files, " + " excludes_files, sync_on_create, sync_on_modified, sync_on_delete, " + " sync_on_ctl_file, delete_after_sync, " + " before_copy_script, after_copy_script, " + - " last_update_by, last_update_time, trigger_id, create_time) " + - " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) "); + " last_update_by, last_update_time, trigger_id, create_time, " + + "channel_id, reload_channel_id) " + + " values(?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) "); putSql("selectFileSnapshotSql", " select trigger_id, router_id, relative_dir, file_name, last_event_type, crc32_checksum, " + diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java index bd29aa8c39..9bc3dcb399 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PullService.java @@ -36,6 +36,7 @@ import org.jumpmind.symmetric.model.RemoteNodeStatuses; import org.jumpmind.symmetric.service.ClusterConstants; import org.jumpmind.symmetric.service.IClusterService; +import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IDataLoaderService; import org.jumpmind.symmetric.service.INodeCommunicationService; import org.jumpmind.symmetric.service.INodeCommunicationService.INodeCommunicationExecutor; @@ -62,21 +63,24 @@ public class PullService extends AbstractOfflineDetectorService implements IPull private INodeCommunicationService nodeCommunicationService; private IDataLoaderService dataLoaderService; + + private IConfigurationService configurationService; public PullService(IParameterService parameterService, ISymmetricDialect symmetricDialect, INodeService nodeService, IDataLoaderService dataLoaderService, IRegistrationService registrationService, IClusterService clusterService, - INodeCommunicationService nodeCommunicationService) { + INodeCommunicationService nodeCommunicationService, IConfigurationService configurationService) { super(parameterService, symmetricDialect); this.nodeService = nodeService; this.registrationService = registrationService; this.clusterService = clusterService; this.nodeCommunicationService = nodeCommunicationService; this.dataLoaderService = dataLoaderService; + this.configurationService = configurationService; } synchronized public RemoteNodeStatuses pullData(boolean force) { - final RemoteNodeStatuses statuses = new RemoteNodeStatuses(); + final RemoteNodeStatuses statuses = new RemoteNodeStatuses(configurationService.getChannels(false)); Node identity = nodeService.findIdentity(false); if (identity == null || identity.isSyncEnabled()) { long minimumPeriodMs = parameterService.getLong(ParameterConstants.PULL_MINIMUM_PERIOD_MS, -1); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java index 26e1ed41fc..daeac3d81a 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PushService.java @@ -43,6 +43,7 @@ import org.jumpmind.symmetric.service.ClusterConstants; import org.jumpmind.symmetric.service.IAcknowledgeService; import org.jumpmind.symmetric.service.IClusterService; +import org.jumpmind.symmetric.service.IConfigurationService; import org.jumpmind.symmetric.service.IDataExtractorService; import org.jumpmind.symmetric.service.INodeCommunicationService; import org.jumpmind.symmetric.service.INodeCommunicationService.INodeCommunicationExecutor; @@ -72,13 +73,15 @@ public class PushService extends AbstractOfflineDetectorService implements IPush private INodeCommunicationService nodeCommunicationService; private IStatisticManager statisticManager; + + private IConfigurationService configurationService; private Map startTimesOfNodesBeingPushedTo = new HashMap(); public PushService(IParameterService parameterService, ISymmetricDialect symmetricDialect, IDataExtractorService dataExtractorService, IAcknowledgeService acknowledgeService, ITransportManager transportManager, INodeService nodeService, - IClusterService clusterService, INodeCommunicationService nodeCommunicationService, IStatisticManager statisticManager) { + IClusterService clusterService, INodeCommunicationService nodeCommunicationService, IStatisticManager statisticManager, IConfigurationService configrationService) { super(parameterService, symmetricDialect); this.dataExtractorService = dataExtractorService; this.acknowledgeService = acknowledgeService; @@ -87,6 +90,7 @@ public PushService(IParameterService parameterService, ISymmetricDialect symmetr this.clusterService = clusterService; this.nodeCommunicationService = nodeCommunicationService; this.statisticManager = statisticManager; + this.configurationService = configrationService; } public Map getStartTimesOfNodesBeingPushedTo() { @@ -94,7 +98,7 @@ public Map getStartTimesOfNodesBeingPushedTo() { } synchronized public RemoteNodeStatuses pushData(boolean force) { - RemoteNodeStatuses statuses = new RemoteNodeStatuses(); + RemoteNodeStatuses statuses = new RemoteNodeStatuses(configurationService.getChannels(false)); Node identity = nodeService.findIdentity(false); if (identity != null && identity.isSyncEnabled()) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java index 9fd65b3499..9a27613c73 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java @@ -41,6 +41,7 @@ import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.io.data.DataEventType; +import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.Data; import org.jumpmind.symmetric.model.DataGap; import org.jumpmind.symmetric.model.DataMetaData; @@ -301,7 +302,7 @@ protected int routeDataForEachChannel(DataGapDetector gapDetector) { dataCount += routeDataForChannel(processInfo, nodeChannel, sourceNode, - producesCommonBatches(nodeChannel.getChannelId(), + producesCommonBatches(nodeChannel.getChannel(), triggerRouters.get(nodeChannel.getChannelId())), gapDetector); } else { if (log.isDebugEnabled()) { @@ -319,11 +320,12 @@ protected int routeDataForEachChannel(DataGapDetector gapDetector) { return dataCount; } - protected boolean producesCommonBatches(String channelId, + protected boolean producesCommonBatches(Channel channel, List allTriggerRoutersForChannel) { + String channelId = channel.getChannelId(); Boolean producesCommonBatches = !Constants.CHANNEL_CONFIG.equals(channelId) - && !Constants.CHANNEL_FILESYNC.equals(channelId) - && !Constants.CHANNEL_RELOAD.equals(channelId) + && !channel.isFileSyncFlag() + && !channel.isReloadFlag() && !Constants.CHANNEL_HEARTBEAT.equals(channelId) ? true : false; String nodeGroupId = parameterService.getNodeGroupId(); if (allTriggerRoutersForChannel != null) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java index cb230393e2..d0230a1c68 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java @@ -932,7 +932,7 @@ public void saveTrigger(Trigger trigger) { if (0 == sqlTemplate.update( getSql("updateTriggerSql"), new Object[] { trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), - trigger.getSourceTableName(), trigger.getChannelId(), + trigger.getSourceTableName(), trigger.getChannelId(), trigger.getReloadChannelId(), trigger.isSyncOnUpdate() ? 1 : 0, trigger.isSyncOnInsert() ? 1 : 0, trigger.isSyncOnDelete() ? 1 : 0, trigger.isSyncOnIncomingBatch() ? 1 : 0, trigger.isUseStreamLobs() ? 1 : 0, trigger.isUseCaptureLobs() ? 1 : 0, @@ -945,7 +945,7 @@ public void saveTrigger(Trigger trigger) { trigger.getExcludedColumnNames(), trigger.getSyncKeyNames(), trigger.getLastUpdateBy(), trigger.getLastUpdateTime(), trigger.getExternalSelect(), trigger.getTriggerId() }, new int[] { - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, @@ -955,7 +955,7 @@ public void saveTrigger(Trigger trigger) { sqlTemplate.update( getSql("insertTriggerSql"), new Object[] { trigger.getSourceCatalogName(), trigger.getSourceSchemaName(), - trigger.getSourceTableName(), trigger.getChannelId(), + trigger.getSourceTableName(), trigger.getChannelId(), trigger.getChannelId(), trigger.isSyncOnUpdate() ? 1 : 0, trigger.isSyncOnInsert() ? 1 : 0, trigger.isSyncOnDelete() ? 1 : 0, trigger.isSyncOnIncomingBatch() ? 1 : 0, trigger.isUseStreamLobs() ? 1 : 0, trigger.isUseCaptureLobs() ? 1 : 0, @@ -969,7 +969,7 @@ public void saveTrigger(Trigger trigger) { trigger.getCreateTime(), trigger.getLastUpdateBy(), trigger.getLastUpdateTime(), trigger.getExternalSelect(), trigger.getTriggerId() }, new int[] { - Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, + Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.SMALLINT, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, Types.VARCHAR, @@ -1711,6 +1711,7 @@ public Trigger mapRow(Row rs) { Trigger trigger = new Trigger(); trigger.setTriggerId(rs.getString("trigger_id")); trigger.setChannelId(rs.getString("channel_id")); + trigger.setReloadChannelId(rs.getString("reload_channel_id")); trigger.setSourceTableName(rs.getString("source_table_name")); trigger.setSyncOnInsert(rs.getBoolean("sync_on_insert")); trigger.setSyncOnUpdate(rs.getBoolean("sync_on_update")); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java index bed251482d..8b6caf9bcd 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterServiceSqlMap.java @@ -70,7 +70,7 @@ public TriggerRouterServiceSqlMap(IDatabasePlatform platform, putSql("selectTriggersColumnList", "" - + " t.trigger_id,t.channel_id,t.source_table_name,t.source_schema_name,t.source_catalog_name, " + + " t.trigger_id,t.channel_id,t.reload_channel_id,t.source_table_name,t.source_schema_name,t.source_catalog_name, " + " t.sync_on_insert,t.sync_on_update,t.sync_on_delete,t.sync_on_incoming_batch,t.use_stream_lobs, " + " t.use_capture_lobs,t.use_capture_old_data,t.use_handle_key_updates, " + " t.excluded_column_names, t.sync_key_names, " @@ -131,17 +131,17 @@ public TriggerRouterServiceSqlMap(IDatabasePlatform platform, putSql("insertTriggerSql", "" + "insert into $(trigger) " - + " (source_catalog_name,source_schema_name,source_table_name,channel_id,sync_on_update,sync_on_insert,sync_on_delete, " + + " (source_catalog_name,source_schema_name,source_table_name,channel_id,reload_channel_id,sync_on_update,sync_on_insert,sync_on_delete, " + " sync_on_incoming_batch,use_stream_lobs,use_capture_lobs,use_capture_old_data,use_handle_key_updates,name_for_update_trigger,name_for_insert_trigger,name_for_delete_trigger, " + " sync_on_update_condition,sync_on_insert_condition,sync_on_delete_condition,custom_on_update_text,custom_on_insert_text,custom_on_delete_text,tx_id_expression,excluded_column_names, sync_key_names, " + " create_time,last_update_by,last_update_time,external_select,trigger_id) " - + " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) "); + + " values (?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?,?) "); putSql("updateTriggerSql", "" + "update $(trigger) " + " set source_catalog_name=?,source_schema_name=?,source_table_name=?, " - + " channel_id=?,sync_on_update=?,sync_on_insert=?,sync_on_delete=?, " + + " channel_id=?,reload_channel_id=?,sync_on_update=?,sync_on_insert=?,sync_on_delete=?, " + " sync_on_incoming_batch=?,use_stream_lobs=?,use_capture_lobs=?,use_capture_old_data=?,use_handle_key_updates=?,name_for_update_trigger=?,name_for_insert_trigger=?, " + " name_for_delete_trigger=?,sync_on_update_condition=?,sync_on_insert_condition=?,sync_on_delete_condition=?,custom_on_update_text=?,custom_on_insert_text=?,custom_on_delete_text=?, " + " tx_id_expression=?,excluded_column_names=?,sync_key_names=?,last_update_by=?,last_update_time=?,external_select=? " diff --git a/symmetric-core/src/main/resources/symmetric-schema.xml b/symmetric-core/src/main/resources/symmetric-schema.xml index c64688a7e3..2e9445dcb9 100644 --- a/symmetric-core/src/main/resources/symmetric-schema.xml +++ b/symmetric-core/src/main/resources/symmetric-schema.xml @@ -36,6 +36,8 @@ + + @@ -146,6 +148,8 @@ + + @@ -662,6 +666,7 @@ + @@ -689,6 +694,9 @@ + + +