Skip to content

Commit

Permalink
0003632: When file sync is configured and enabled, initial load of files
Browse files Browse the repository at this point in the history
cannot be turned off
  • Loading branch information
mmichalek committed Jul 20, 2018
1 parent d84277a commit cd74dc3
Show file tree
Hide file tree
Showing 3 changed files with 41 additions and 9 deletions.
Expand Up @@ -43,11 +43,13 @@
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.FileSnapshot;
import org.jumpmind.symmetric.model.FileSnapshot.LastEventType;
import org.jumpmind.symmetric.model.FileTrigger;
import org.jumpmind.symmetric.model.FileTriggerRouter;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.service.IConfigurationService;
import org.jumpmind.symmetric.service.IExtensionService;
import org.jumpmind.symmetric.service.IFileSyncService;
import org.jumpmind.symmetric.service.INodeService;
Expand All @@ -71,14 +73,16 @@ public class FileSyncZipDataWriter implements IDataWriter {
protected DataContext context;
protected INodeService nodeService;
protected IExtensionService extensionService;
protected IConfigurationService configurationService;

public FileSyncZipDataWriter(long maxBytesToSync, IFileSyncService fileSyncService,
INodeService nodeService, IStagedResource stagedResource, IExtensionService extensionService) {
INodeService nodeService, IStagedResource stagedResource, IExtensionService extensionService, IConfigurationService configurationService) {
this.maxBytesToSync = maxBytesToSync;
this.fileSyncService = fileSyncService;
this.stagedResource = stagedResource;
this.nodeService = nodeService;
this.extensionService = extensionService;
this.configurationService = configurationService;
}

public void open(DataContext context) {
Expand Down Expand Up @@ -107,13 +111,18 @@ public boolean start(Table table) {

public void write(CsvData data) {
DataEventType eventType = data.getDataEventType();

if (eventType == DataEventType.INSERT || eventType == DataEventType.UPDATE) {
if (eventType == DataEventType.INSERT) {
statistics.get(this.batch).increment(DataWriterStatisticConstants.INSERTCOUNT);
}
else {
statistics.get(this.batch).increment(DataWriterStatisticConstants.UPDATECOUNT);
}
if (filterInitialLoad(data)) {
return;
}

if (eventType == DataEventType.INSERT) {
statistics.get(this.batch).increment(DataWriterStatisticConstants.INSERTCOUNT);
}
else {
statistics.get(this.batch).increment(DataWriterStatisticConstants.UPDATECOUNT);
}
Map<String, String> columnData = data.toColumnNameValuePairs(
snapshotTable.getColumnNames(), CsvData.ROW_DATA);
Map<String, String> oldColumnData = data.toColumnNameValuePairs(
Expand Down Expand Up @@ -306,4 +315,27 @@ protected boolean isCClient(String nodeId) {
return cclient;
}

protected boolean filterInitialLoad(CsvData data) {
Channel channel = configurationService.getChannel(batch.getChannelId());
if (channel.isReloadFlag()) {
List<FileTriggerRouter> fileTriggerRouters = fileSyncService
.getFileTriggerRoutersForCurrentNode(false);
Map<String, String> columnData = data.toColumnNameValuePairs(
snapshotTable.getColumnNames(), CsvData.ROW_DATA);
String triggerId = columnData.get("TRIGGER_ID");
String routerId = columnData.get("ROUTER_ID");

for (FileTriggerRouter fileTriggerRouter : fileTriggerRouters) {
if (fileTriggerRouter.getTriggerId().equals(triggerId)
&& fileTriggerRouter.getRouterId().equals(routerId)) {
if (! fileTriggerRouter.isEnabled() || !fileTriggerRouter.isInitialLoadEnabled()) {
return true;
}
}
}
}

return false;
}

}
Expand Up @@ -128,7 +128,7 @@ protected IDataWriter buildWriter(long memoryThresholdInBytes) {
.getLong(ParameterConstants.TRANSPORT_MAX_BYTES_TO_SYNC);

FileSyncZipDataWriter fileSyncWriter = new FileSyncZipDataWriter(maxBytesToSync, fileSyncService,
nodeService, stagedResource, extensionService) {
nodeService, stagedResource, extensionService, configurationService) {
@Override
public void close() {
super.finish();
Expand Down
Expand Up @@ -588,7 +588,7 @@ public List<OutgoingBatch> sendFiles(ProcessInfo processInfo, Node targetNode,
Constants.STAGING_CATEGORY_OUTGOING, processInfo.getSourceNodeId(),
targetNode.getNodeId(), "filesync.zip");
dataWriter = new FileSyncZipDataWriter(maxBytesToSync, this,
engine.getNodeService(), stagedResource, engine.getExtensionService());
engine.getNodeService(), stagedResource, engine.getExtensionService(), engine.getConfigurationService());
}
log.debug("Extracting batch {} for filesync.", currentBatch.getNodeBatchId());

Expand Down

0 comments on commit cd74dc3

Please sign in to comment.