Commit a59bf73

Merge branch '3.8' of https://github.com/JumpMind/symmetric-ds.git into 3.8

jumpmind-josh committed Aug 18, 2017
2 parents 0d85b90 + e578608 commit a59bf73
Showing 30 changed files with 188 additions and 109 deletions.
12 changes: 9 additions & 3 deletions symmetric-assemble/src/asciidoc/configuration/table-triggers.ad
@@ -56,9 +56,15 @@ Sync On Insert:: Determines if changes will be captured for inserts.
Sync On Update:: Determines if changes will be captured for updates.
Sync On Delete:: Determines if changes will be captured for deletes.
Reload Channel Id:: The channel_id of the channel that will be used for initial loads.
-Sync On Insert Condition:: Specify a condition for the insert trigger firing using an expression specific to the database. On most platforms, it is added to an "IF" statement in the trigger text. On SQL-Server it is added to the "WHERE" clause of a query for inserted/deleted logical tables.
-Sync On Update Condition:: Specify a condition for the update trigger firing using an expression specific to the database. On most platforms, it is added to an "IF" statement in the trigger text. On SQL-Server it is added to the "WHERE" clause of a query for inserted/deleted logical tables.
-Sync On Delete Condition:: Specify a condition for the delete trigger firing using an expression specific to the database. On most platforms, it is added to an "IF" statement in the trigger text. On SQL-Server it is added to the "WHERE" clause of a query for inserted/deleted logical tables.
+Sync On Insert Condition:: Specify a condition for the insert trigger firing using an expression specific to the database. On most platforms, it is added to an "IF" statement in the trigger text. On SQL-Server it is added to the "WHERE" clause of a query for inserted/deleted logical tables. See Sync Condition Example.
+Sync On Update Condition:: Specify a condition for the update trigger firing using an expression specific to the database. On most platforms, it is added to an "IF" statement in the trigger text. On SQL-Server it is added to the "WHERE" clause of a query for inserted/deleted logical tables. See Sync Condition Example.
+Sync On Delete Condition:: Specify a condition for the delete trigger firing using an expression specific to the database. On most platforms, it is added to an "IF" statement in the trigger text. On SQL-Server it is added to the "WHERE" clause of a query for inserted/deleted logical tables. See Sync Condition Example.
+Sync Condition Example:: Sync Conditions can access both old values and new values of a field/column using "old_" and "new_" respectively. For example, if your column is id and your condition checks the value coming in to be 'test', your condition will be:
++
+----
+new_id = 'test'
+----
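
Conditions can also compare old and new values. A hypothetical variant (shown here for illustration only, not part of this commit) that fires the update trigger only when the id column actually changed:

----
old_id <> new_id
----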

Custom Insert Trigger Text:: Specify insert trigger text (SQL) to execute after the SymmetricDS trigger fires. This field is not applicable for H2, HSQLDB 1.x or Apache Derby.
Custom Update Trigger Text:: Specify update trigger text (SQL) to execute after the SymmetricDS trigger fires. This field is not applicable for H2, HSQLDB 1.x or Apache Derby.
Custom Delete Trigger Text:: Specify delete trigger text (SQL) to execute after the SymmetricDS trigger fires. This field is not applicable for H2, HSQLDB 1.x or Apache Derby.
@@ -94,11 +94,11 @@ public void removeTrigger(StringBuilder sqlBuffer, String catalogName, String sc
}

public void disableSyncTriggers(ISqlTransaction transaction, String nodeId) {
-
+//NUODB does not currently support this
}

public void enableSyncTriggers(ISqlTransaction transaction) {
-
+//NUODB does not currently support this
}

public String getSyncTriggersExpression() {
@@ -31,7 +31,7 @@ public class NuoDbTriggerTemplate extends AbstractTriggerTemplate {
public NuoDbTriggerTemplate(ISymmetricDialect symmetricDialect) {
super(symmetricDialect);
emptyColumnTemplate = "''" ;
-stringColumnTemplate = "cast(case when $(tableAlias).`$(columnName)` is null then '' else concat('\"',replace(replace($(tableAlias).`$(columnName)`,'\\\\','\\\\\\\\'),'\"','\\\\\"'),'\"') end as char)\n" ;
+stringColumnTemplate = "cast(case when $(tableAlias).`$(columnName)` is null then '' else concat('\"',replace(replace($(tableAlias).`$(columnName)`,'\\\\','\\\\\\\\'),'\"','\\\"'),'\"') end as char)\n" ;
numberColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else concat('\"',cast($(tableAlias).\"$(columnName)\" as char),'\"') end \n" ;
datetimeColumnTemplate = "case when $(tableAlias).\"$(columnName)\" is null then '' else concat('\"',cast($(tableAlias).\"$(columnName)\" as char),'\"') end\n" ;
clobColumnTemplate = stringColumnTemplate;
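
The template fix above removes one backslash level when escaping embedded double quotes: the old template emitted \\" (an escaped backslash plus the quote) where the CSV output needs \" . A minimal standalone Java sketch (an illustration added here, not part of the commit) of the intended escaping order, doubling backslashes first and then escaping quotes once:

----
public class EscapeSketch {
    public static void main(String[] args) {
        String value = "say \"hi\" \\ bye";
        // Double backslashes first, then give embedded quotes a single backslash.
        String escaped = value.replace("\\", "\\\\").replace("\"", "\\\"");
        System.out.println('"' + escaped + '"'); // prints: "say \"hi\" \\ bye"
    }
}
----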
@@ -39,6 +39,7 @@
import org.springframework.jmx.export.annotation.ManagedResource;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.support.CronTrigger;
+import org.springframework.scheduling.support.SimpleTriggerContext;

@ManagedResource(description = "The management interface for a job")
abstract public class AbstractJob implements Runnable, IJob {
@@ -93,8 +94,9 @@ public void start() {
int timeBetweenRunsInMs = engine.getParameterService().getInt(
jobName + ".period.time.ms", -1);
if (!StringUtils.isBlank(cronExpression)) {
log.info("Starting {} with cron expression: {}", jobName, cronExpression);
this.scheduledJob = taskScheduler.schedule(this, new CronTrigger(cronExpression));
CronTrigger cron = new CronTrigger(cronExpression);
log.info("Starting {} with cron expression: {}. Next scheduled time is: {}", jobName, cronExpression, cron.nextExecutionTime(new SimpleTriggerContext()));
this.scheduledJob = taskScheduler.schedule(this, cron);
started = true;
} else {
int startDelay = randomTimeSlot.getRandomValueSeededByExternalId();
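
The new log line relies on Spring's CronTrigger reporting its next firing time from an empty trigger context. A minimal standalone sketch (assumes Spring's scheduling classes on the classpath; not part of this commit):

----
import java.util.Date;
import org.springframework.scheduling.support.CronTrigger;
import org.springframework.scheduling.support.SimpleTriggerContext;

public class NextRunPreview {
    public static void main(String[] args) {
        // A fresh SimpleTriggerContext carries no "last run" times, so
        // nextExecutionTime() returns the first firing after "now".
        CronTrigger cron = new CronTrigger("0 0/5 * * * *"); // every five minutes
        Date next = cron.nextExecutionTime(new SimpleTriggerContext());
        System.out.println("Next scheduled time is: " + next);
    }
}
----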
@@ -515,7 +515,7 @@ public void testExportCsvToDirectory() throws Exception {
Assert.assertTrue(a.isFile());
List<String> lines = FileUtils.readLines(a);
Assert.assertEquals(9, lines.size());
-Assert.assertEquals("\"id\",\"string_value\"", lines.get(5));
+Assert.assertTrue("\"id\",\"string_value\"".equalsIgnoreCase(lines.get(5)));
Assert.assertEquals("\"1\",\"This is a test of a\"", lines.get(6));
Assert.assertEquals("\"2\",\"This is a test of a\"", lines.get(7));

@@ -524,7 +524,7 @@
Assert.assertTrue(b.isFile());
lines = FileUtils.readLines(b);
Assert.assertEquals(10, lines.size());
-Assert.assertEquals("\"id\",\"string_value\"", lines.get(5));
+Assert.assertTrue("\"id\",\"string_value\"".equalsIgnoreCase(lines.get(5)));
Assert.assertEquals("\"1\",\"This is a test of b\"", lines.get(6));
Assert.assertEquals("\"2\",\"This is a test of b\"", lines.get(7));
Assert.assertEquals("\"3\",\"This is line 3 of b\"", lines.get(8));
@@ -534,7 +534,7 @@
Assert.assertTrue(c.isFile());
lines = FileUtils.readLines(c);
Assert.assertEquals(9, lines.size());
-Assert.assertEquals("\"id\",\"string_value\"", lines.get(5));
+Assert.assertTrue("\"id\",\"string_value\"".equalsIgnoreCase(lines.get(5)));
Assert.assertEquals("\"1\",\"This is a test of c\"", lines.get(6));
Assert.assertEquals("\"2\",\"This is a test of c\"", lines.get(7));

@@ -580,5 +580,8 @@ protected Row findInList(List<Row> rows, String pk, Object pkValue) {
}
return null;
}




}
@@ -91,7 +91,7 @@ public class ConfigurationChangedDatabaseWriterFilter extends DatabaseWriterFilt
final String CTX_KEY_REINITIALIZED = "Reinitialized."
+ ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode();

-final String CTX_KEY_FILE_SYNC_TRIGGERS_NEEDED = "FileSyncTriggers."
+final String CTX_KEY_FILE_SYNC_ENABLED = "FileSyncEnabled."
+ ConfigurationChangedDatabaseWriterFilter.class.getSimpleName() + hashCode();

private ISymmetricEngine engine;
@@ -130,7 +130,7 @@ public void afterWrite(DataContext context, Table table, CsvData data) {
recordConflictFlushNeeded(context, table);
recordNodeSecurityFlushNeeded(context, table);
recordNodeFlushNeeded(context, table, data);
-recordFileSyncTriggersNeeded(context, table, data);
+recordFileSyncEnabled(context, table, data);
}

private void recordGroupletFlushNeeded(DataContext context, Table table) {
@@ -223,9 +223,9 @@ private void recordNodeFlushNeeded(DataContext context, Table table, CsvData dat
}
}

-private void recordFileSyncTriggersNeeded(DataContext context, Table table, CsvData data) {
-if (isFileSyncTriggersNeeded(table, data)) {
-context.put(CTX_KEY_FILE_SYNC_TRIGGERS_NEEDED, true);
+private void recordFileSyncEnabled(DataContext context, Table table, CsvData data) {
+if (isFileSyncEnabled(table, data)) {
+context.put(CTX_KEY_FILE_SYNC_ENABLED, true);
}
}

@@ -268,10 +268,11 @@ private boolean isJobManagerRestartNeeded(Table table, CsvData data) {
&& data.getCsvData(CsvData.ROW_DATA).contains("job.");
}

-private boolean isFileSyncTriggersNeeded(Table table, CsvData data) {
+private boolean isFileSyncEnabled(Table table, CsvData data) {
return matchesTable(table, TableConstants.SYM_PARAMETER)
&& data.getCsvData(CsvData.ROW_DATA) != null
&& data.getCsvData(CsvData.ROW_DATA).contains("file.sync.enable");
&& data.getCsvData(CsvData.ROW_DATA).contains(ParameterConstants.FILE_SYNC_ENABLE)
&& data.getCsvData(CsvData.ROW_DATA).contains("true");
}

private boolean isTransformFlushNeeded(Table table) {
@@ -335,13 +336,14 @@ public void syncEnded(DataContext context, List<IncomingBatch> batchesProcessed,
context.remove(CTX_KEY_RESYNC_TABLE_NEEDED);
}

-if (context.get(CTX_KEY_FILE_SYNC_TRIGGERS_NEEDED) != null
+if (context.get(CTX_KEY_FILE_SYNC_ENABLED) != null
&& parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
log.info("About to syncTriggers for file snapshot because the file sync parameter has changed");
engine.clearCaches();
Table fileSnapshotTable = engine.getDatabasePlatform()
.getTableFromCache(TableConstants.getTableName(engine.getTablePrefix(), TableConstants.SYM_FILE_SNAPSHOT), false);
engine.getTriggerRouterService().syncTriggers(fileSnapshotTable, false);
-context.remove(CTX_KEY_FILE_SYNC_TRIGGERS_NEEDED);
+context.remove(CTX_KEY_FILE_SYNC_ENABLED);
}
}

@@ -109,15 +109,19 @@ public void updateIncomingStatus(List<IncomingBatch> incomingBatches) {
}

public void updateOutgoingStatus(List<OutgoingBatch> outgoingBatches, List<BatchAck> batches) {
+int numberOfAcks = 0;
if (batches != null) {
+numberOfAcks = batches.size();
for (BatchAck batch : batches) {
if (!batch.isOk()) {
status = Status.DATA_ERROR;
}
}
}

+int numberOfBatches = 0;
if (outgoingBatches != null) {
+numberOfBatches = outgoingBatches.size();
for (OutgoingBatch batch : outgoingBatches) {
batchesProcessed++;
dataProcessed += batch.totalEventCount();
@@ -131,6 +135,10 @@
}
}
}

+if (numberOfAcks != numberOfBatches) {
+status = Status.DATA_ERROR;
+}

if (status != Status.DATA_ERROR && dataProcessed > 0) {
status = Status.DATA_PROCESSED;
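
For illustration, a self-contained sketch (simplified types and assumed names, not the real SymmetricDS classes) of the reconciliation rule this hunk introduces: an explicitly failed ack or a missing ack both leave the status at DATA_ERROR:

----
import java.util.Arrays;
import java.util.List;

public class AckCountSketch {
    enum Status { OK, DATA_ERROR }

    static Status reconcile(List<Boolean> acks, int batchesSent) {
        Status status = Status.OK;
        for (boolean ok : acks) {
            if (!ok) {
                status = Status.DATA_ERROR; // a batch was explicitly rejected
            }
        }
        if (acks.size() != batchesSent) {
            status = Status.DATA_ERROR; // an ack never arrived for some batch
        }
        return status;
    }

    public static void main(String[] args) {
        // Two good acks for three batches: previously OK, now a data error.
        System.out.println(reconcile(Arrays.asList(true, true), 3));
    }
}
----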
@@ -462,7 +462,7 @@ protected void queueSyncTriggers(SimpleRouterContext routingContext, DataMetaDat
routingContext.put(CTX_KEY_RESYNC_NEEDED, Boolean.TRUE);
} else if (tableMatches(dataMetaData, TableConstants.SYM_PARAMETER)) {
if (dataMetaData.getData().getCsvData(CsvData.ROW_DATA) != null
&& dataMetaData.getData().getCsvData(CsvData.ROW_DATA).contains("file.sync.enable")) {
&& dataMetaData.getData().getCsvData(CsvData.ROW_DATA).contains(ParameterConstants.FILE_SYNC_ENABLE)) {
routingContext.put(CTX_KEY_FILE_SYNC_TRIGGERS_NEEDED, Boolean.TRUE);
}
}
@@ -67,5 +67,7 @@ public boolean extractBatchRange(Writer writer, String nodeId, Date startBatchTi
public void requestExtractRequest(ISqlTransaction transaction, String nodeId, String channelId, TriggerRouter triggerRouter, long startBatchId, long endBatchId);

public void resetExtractRequest(OutgoingBatch batch);

+
+public void removeBatchFromStaging(OutgoingBatch batch);

}
@@ -37,7 +37,6 @@
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.db.sql.ISqlTransaction;
-import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
@@ -47,6 +46,7 @@
import org.jumpmind.symmetric.model.NodeSecurity;
import org.jumpmind.symmetric.model.OutgoingBatch;
import org.jumpmind.symmetric.service.IAcknowledgeService;
+import org.jumpmind.symmetric.service.IDataExtractorService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.IService;
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
@@ -293,8 +293,8 @@ protected void sendAck(Node remote, Node local, NodeSecurity localSecurity,
}


protected List<BatchAck> readAcks(List<OutgoingBatch> batches, IOutgoingWithResponseTransport transport,
ITransportManager transportManager, IAcknowledgeService acknowledgeService)
protected List<BatchAck> readAcks(List<OutgoingBatch> batches, IOutgoingWithResponseTransport transport,
ITransportManager transportManager, IAcknowledgeService acknowledgeService, IDataExtractorService dataExtratorService)
throws IOException {

Set<Long> batchIds = new HashSet<Long>(batches.size());
@@ -319,11 +319,6 @@
}
} while (line != null);

-if (StringUtils.isBlank(ackString)) {
-throw new SymmetricException("Did not receive an acknowledgement for the batches sent. "
-+ "The 'ack string' was: '%s' and the 'extended ack string' was: '%s'", ackString, ackExtendedString);
-}

List<BatchAck> batchAcks = transportManager.readAcknowledgement(ackString,
ackExtendedString);

@@ -339,8 +334,20 @@
}

for (Long batchId : batchIds) {
if (batchId < batchIdInError) {
log.error("We expected but did not receive an ack for batch {}", batchId);
if (batchId < batchIdInError) {
for (OutgoingBatch outgoingBatch : batches) {
if (outgoingBatch.getBatchId() == batchId) {
log.warn("We expected but did not receive an ack for batch {}.", outgoingBatch.getNodeBatchId());
if (dataExtratorService != null) {
if (!outgoingBatch.isLoadFlag()) {
log.info("This could be because the batch is corrupt. Removing the batch from staging");
dataExtratorService.removeBatchFromStaging(outgoingBatch);
} else {
log.info("This could be because the batch is corrupt. Not removing the batch because it was a load batch, but you may need to clear the batch from staging manually");
}
}
}
}
}
}

@@ -2127,6 +2127,16 @@ public void close() {
}

}

+@Override
+public void removeBatchFromStaging(OutgoingBatch batch) {
+IStagedResource resource = getStagedResource(batch);
+if (resource != null) {
+resource.delete();
+} else {
+log.info("Could not remove batch {} from staging because it did not exist", batch.getNodeBatchId());
+}
+}

class SelectFromTableEvent {

@@ -411,7 +411,7 @@ public void save(List<FileSnapshot> changes) {

public void save(ISqlTransaction sqlTransaction, FileSnapshot snapshot) {
snapshot.setLastUpdateTime(new Date());
-if (0 == sqlTransaction.prepareAndExecute(
+if (0 >= sqlTransaction.prepareAndExecute(
getSql("updateFileSnapshotSql"),
new Object[] { snapshot.getLastEventType().getCode(), snapshot.getCrc32Checksum(),
snapshot.getFileSize(), snapshot.getFileModifiedTime(),
@@ -808,7 +808,7 @@ protected void pushFilesToNode(NodeCommunication nodeCommunication, RemoteNodeSt
((FileOutgoingTransport) transport).setProcessedBatches(batches);
}
List<BatchAck> batchAcks = readAcks(batches, transport,
-transportManager, engine.getAcknowledgeService());
+transportManager, engine.getAcknowledgeService(), null);
status.updateOutgoingStatus(batches, batchAcks);
}
if (!status.failed() && batches.size() > 0) {
@@ -996,7 +996,7 @@ protected void updateFileIncoming(String nodeId, Map<String, String> filesToEven
long lastUpdateTime = file.lastModified();
int updateCount = sqlTemplate.update(getSql("updateFileIncoming"), nodeId,
lastUpdateTime, eventType, dirName, fileName);
-if (updateCount == 0) {
+if (updateCount <= 0) {
sqlTemplate.update(getSql("insertFileIncoming"), nodeId, lastUpdateTime, eventType,
dirName, fileName);
}
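
Both call-site changes in this file follow the same update-then-insert idiom, now treating any non-positive update count as "row not found" (a count can legitimately be negative on some driver paths, for example java.sql.Statement.SUCCESS_NO_INFO from batched execution). A hypothetical JDBC sketch of the pattern (table and column names invented for illustration):

----
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;

public class UpsertSketch {
    // Update first; fall back to insert when no row was touched.
    public static void upsert(Connection con, String nodeId, String fileName) throws SQLException {
        try (PreparedStatement update = con.prepareStatement(
                "update file_incoming set node_id = ? where file_name = ?")) {
            update.setString(1, nodeId);
            update.setString(2, fileName);
            // <= 0 rather than == 0 is defensive: batched paths can report negative counts.
            if (update.executeUpdate() <= 0) {
                try (PreparedStatement insert = con.prepareStatement(
                        "insert into file_incoming (node_id, file_name) values (?, ?)")) {
                    insert.setString(1, nodeId);
                    insert.setString(2, fileName);
                    insert.executeUpdate();
                }
            }
        }
    }
}
----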
