Skip to content

Commit

Permalink
0001643: Support multiple reload channels
Browse files Browse the repository at this point in the history
0001581: Add support for multiple channels for file sync
  • Loading branch information
chenson42 committed Mar 19, 2014
1 parent 1a491ce commit 8d55783
Show file tree
Hide file tree
Showing 23 changed files with 430 additions and 226 deletions.
Expand Up @@ -302,9 +302,9 @@ protected void init() {
outgoingBatchService, registrationService, stagingManager, this);
this.pushService = new PushService(parameterService, symmetricDialect,
dataExtractorService, acknowledgeService, transportManager, nodeService,
clusterService, nodeCommunicationService, statisticManager);
clusterService, nodeCommunicationService, statisticManager, configurationService);
this.pullService = new PullService(parameterService, symmetricDialect, nodeService,
dataLoaderService, registrationService, clusterService, nodeCommunicationService);
dataLoaderService, registrationService, clusterService, nodeCommunicationService, configurationService);
this.fileSyncService = new FileSyncService(this);
this.jobManager = createJobManager();

Expand Down
Expand Up @@ -24,6 +24,8 @@
import java.util.Collection;
import java.util.Date;

import org.jumpmind.symmetric.common.Constants;

/**
* Definition of a channel and it's priority. A channel is a group of tables
* that get synchronized together.
Expand Down Expand Up @@ -63,6 +65,10 @@ public class Channel implements Serializable {
private Date lastUpdateTime;

private String lastUpdateBy;

private boolean reloadFlag = false;

private boolean fileSyncFlag = false;

public Channel() {
}
Expand All @@ -73,10 +79,17 @@ public Channel(String id, int processingOrder) {
}

public Channel(String id, int processingOrder, int maxBatchSize, int maxBatchToSend, boolean enabled,
long extractPeriodMillis, boolean containsBigLobs, String batchAlgorithm) {
this(id, processingOrder, maxBatchSize, maxBatchToSend, enabled, extractPeriodMillis, containsBigLobs);
long extractPeriodMillis, boolean containsBigLobs, String batchAlgorithm, boolean reloadFlag, boolean filesyncFlag) {
this(id, processingOrder, maxBatchSize, maxBatchToSend, enabled, extractPeriodMillis, containsBigLobs, reloadFlag, filesyncFlag);
this.batchAlgorithm = batchAlgorithm;
}

public Channel(String id, int processingOrder, int maxBatchSize, int maxBatchToSend, boolean enabled,
long extractPeriodMillis, boolean containsBigLobs, boolean reloadFlag, boolean filesyncFlag) {
this(id, processingOrder, maxBatchSize, maxBatchToSend, enabled, extractPeriodMillis, containsBigLobs);
this.reloadFlag = reloadFlag;
this.fileSyncFlag = filesyncFlag;
}

public Channel(String id, int processingOrder, int maxBatchSize, int maxBatchToSend, boolean enabled,
long extractPeriodMillis, boolean containsBigLobs) {
Expand Down Expand Up @@ -143,14 +156,19 @@ public void setMaxBatchToSend(int maxBatchToSend) {
* @return true if a match is found
*/
public boolean isInList(Collection<? extends NodeChannel> channels) {
return findInList(channels) != null;
}


public Channel findInList(Collection<? extends NodeChannel> channels) {
if (channels != null) {
for (NodeChannel channel : channels) {
if (channel.getChannelId().equals(channelId)) {
return true;
return channel.getChannel();
}
}
}
return false;
return null;
}

public void setBatchAlgorithm(String batchAlgorithm) {
Expand Down Expand Up @@ -231,6 +249,22 @@ public Date getLastUpdateTime() {

public void setLastUpdateTime(Date lastUpdateTime) {
this.lastUpdateTime = lastUpdateTime;
}

public void setFileSyncFlag(boolean filesyncFlag) {
this.fileSyncFlag = filesyncFlag;
}

public boolean isFileSyncFlag() {
return fileSyncFlag || Constants.CHANNEL_FILESYNC.equals(channelId);
}

public void setReloadFlag(boolean reloadFlag) {
this.reloadFlag = reloadFlag;
}

public boolean isReloadFlag() {
return reloadFlag || Constants.CHANNEL_RELOAD.equals(channelId);
}

@Override
Expand Down
Expand Up @@ -33,25 +33,44 @@
import org.apache.commons.io.filefilter.OrFileFilter;
import org.apache.commons.io.filefilter.WildcardFileFilter;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.symmetric.common.Constants;

public class FileTrigger implements Serializable {

private static final long serialVersionUID = 1L;

private String triggerId;

private String channelId = Constants.CHANNEL_FILESYNC;

private String reloadChannelId = Constants.CHANNEL_FILESYNC;

private String baseDir;

private boolean recurse;

private String includesFiles;

private String excludesFiles;

private boolean syncOnCreate = true;

private boolean syncOnModified = true;

private boolean syncOnDelete = true;

private boolean syncOnCtlFile = false;

private boolean deleteAfterSync = false;

private String beforeCopyScript;

private String afterCopyScript;

private Date createTime = new Date();

private String lastUpdateBy;

private Date lastUpdateTime;

public FileTrigger() {
Expand All @@ -73,7 +92,23 @@ public String getTriggerId() {
public void setTriggerId(String triggerId) {
this.triggerId = triggerId;
}

public String getChannelId() {
return channelId;
}

public void setChannelId(String channelId) {
this.channelId = channelId;
}

public String getReloadChannelId() {
return reloadChannelId;
}

public void setReloadChannelId(String reloadChannelId) {
this.reloadChannelId = reloadChannelId;
}

public String getBaseDir() {
return baseDir;
}
Expand Down
Expand Up @@ -202,5 +202,13 @@ public void setDataLoaderType(String type) {
public String getDataLoaderType() {
return channel.getDataLoaderType();
}

public void setReloadFlag(boolean value) {
this.channel.setReloadFlag(value);
}

public void setFileSyncFlag(boolean value) {
this.channel.setFileSyncFlag(value);
}

}
Expand Up @@ -22,8 +22,7 @@

import java.io.Serializable;
import java.util.List;

import org.jumpmind.symmetric.common.Constants;
import java.util.Map;

/**
* Indicates the status of an attempt to transport data from or to a remove
Expand All @@ -43,10 +42,12 @@ public static enum Status {
private long batchesProcessed;
private long reloadBatchesProcessed;
private boolean complete = false;
private Map<String, Channel> channels;

public RemoteNodeStatus(String nodeId) {
public RemoteNodeStatus(String nodeId, Map<String, Channel> channels) {
this.status = Status.NO_DATA;
this.nodeId = nodeId;
this.channels = channels;
}

public boolean failed() {
Expand Down Expand Up @@ -110,7 +111,8 @@ public void updateOutgoingStatus(List<OutgoingBatch> outgoingBatches, List<Batch
for (OutgoingBatch batch : outgoingBatches) {
batchesProcessed++;
dataProcessed += batch.totalEventCount();
if (Constants.CHANNEL_RELOAD.equals(batch.getChannelId())) {
Channel channel = channels.get(batch.getChannelId());
if (channel != null && channel.isReloadFlag()) {
reloadBatchesProcessed++;
}

Expand Down
Expand Up @@ -21,13 +21,20 @@
package org.jumpmind.symmetric.model;

import java.util.ArrayList;
import java.util.Map;

import org.jumpmind.exception.InterruptedException;
import org.jumpmind.util.AppUtils;

public class RemoteNodeStatuses extends ArrayList<RemoteNodeStatus> {

private static final long serialVersionUID = 1L;

Map<String, Channel> channels;

public RemoteNodeStatuses(Map<String, Channel> channels) {
this.channels = channels;
}

public boolean wasDataProcessed() {
boolean dataProcessed = false;
Expand Down Expand Up @@ -64,7 +71,7 @@ public boolean errorOccurred() {
public RemoteNodeStatus add(String nodeId) {
RemoteNodeStatus status = null;
if (nodeId != null) {
status = new RemoteNodeStatus(nodeId);
status = new RemoteNodeStatus(nodeId, channels);
add(status);
}
return status;
Expand Down
Expand Up @@ -57,6 +57,8 @@ public class Trigger implements Serializable {
private String sourceCatalogName;

private String channelId = Constants.CHANNEL_DEFAULT;

private String reloadChannelId = Constants.CHANNEL_RELOAD;

private boolean syncOnUpdate = true;

Expand Down Expand Up @@ -282,6 +284,14 @@ public String getChannelId() {
public void setChannelId(String channelId) {
this.channelId = channelId;
}

public String getReloadChannelId() {
return reloadChannelId;
}

public void setReloadChannelId(String reloadChannelId) {
this.reloadChannelId = reloadChannelId;
}

public boolean isSyncOnUpdate() {
return syncOnUpdate;
Expand Down
Expand Up @@ -79,7 +79,9 @@ public interface IConfigurationService {

public NodeChannel getNodeChannel(String channelId, boolean refreshExtractMillis);

public Channel getChannel (String channelId);
public Channel getChannel (String channelId);

public List<Channel> getFileSyncChannels();

public Map<String, Channel> getChannels(boolean refreshCache);

Expand Down
Expand Up @@ -35,7 +35,7 @@
import org.jumpmind.symmetric.transport.IOutgoingTransport;

public interface IFileSyncService {

public void trackChanges(boolean force);

public List<FileTrigger> getFileTriggers();
Expand Down
Expand Up @@ -32,6 +32,7 @@
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.model.BatchAck;
import org.jumpmind.symmetric.model.BatchAckResult;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.model.OutgoingBatch.Status;
import org.jumpmind.symmetric.service.IAcknowledgeService;
Expand Down Expand Up @@ -130,11 +131,11 @@ public BatchAckResult ack(final BatchAck batch) {
}
}

//TODO: I should really be able to catch errors here, but can't do to how this is coded
outgoingBatchService.updateOutgoingBatch(outgoingBatch);
if (status == Status.OK) {
if (outgoingBatch.getChannelId().equals(Constants.CHANNEL_FILESYNC)){
//Acknowledge the file_sync in case the file needs deleted.
Channel channel = engine.getConfigurationService().getChannel(outgoingBatch.getChannelId());
if (channel != null && channel.isFileSyncFlag()){
/* Acknowledge the file_sync in case the file needs deleted. */
engine.getFileSyncService().acknowledgeFiles(outgoingBatch);
}
}
Expand Down

0 comments on commit 8d55783

Please sign in to comment.