Commit 466b3d2

Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds.git into 3.9

jumpmind-josh committed Mar 22, 2018
2 parents cdb5c44 + 551881c commit 466b3d2
Showing 10 changed files with 118 additions and 17 deletions.
@@ -157,6 +157,7 @@ private ParameterConstants() {
public final static String INITIAL_LOAD_SCHEMA_DUMP_COMMAND = "initial.load.schema.dump.command";
public final static String INITIAL_LOAD_SCHEMA_LOAD_COMMAND = "initial.load.schema.load.command";
public final static String INITIAL_LOAD_EXTRACT_AND_SEND_WHEN_STAGED = "initial.load.extract.and.send.when.staged";
+public final static String INITIAL_LOAD_TRANSPORT_MAX_BYTES_TO_SYNC = "initial.load.transport.max.bytes.to.sync";

public final static String CREATE_TABLE_WITHOUT_DEFAULTS = "create.table.without.defaults";
public final static String CREATE_TABLE_WITHOUT_FOREIGN_KEYS = "create.table.without.foreign.keys";
@@ -167,6 +168,7 @@ private ParameterConstants() {
public final static String STREAM_TO_FILE_ENABLED = "stream.to.file.enabled";
public final static String STREAM_TO_FILE_THRESHOLD = "stream.to.file.threshold.bytes";
public final static String STREAM_TO_FILE_TIME_TO_LIVE_MS = "stream.to.file.ttl.ms";
+public final static String STREAM_TO_FILE_MIN_TIME_TO_LIVE_MS = "stream.to.file.min.ttl.ms";
public final static String STREAM_TO_FILE_PURGE_ON_TTL_ENABLED = "stream.to.file.purge.on.ttl.enabled";

public final static String PARAMETER_REFRESH_PERIOD_IN_MS = "parameter.reload.timeout.ms";
@@ -57,6 +57,7 @@ public long clean(long ttlInMs) {

protected long purgeStagingBasedOnDatabaseStatus(long ttlInMs) {
boolean recordIncomingBatchesEnabled = engine.getIncomingBatchService().isRecordOkBatchesEnabled();
+long minTtlInMs = engine.getParameterService().getLong(ParameterConstants.STREAM_TO_FILE_MIN_TIME_TO_LIVE_MS, 600000);
List<Long> outgoingBatches = ttlInMs == 0 ? new ArrayList<Long>() : engine.getOutgoingBatchService().getAllBatches();
List<BatchId> incomingBatches = ttlInMs == 0 ? new ArrayList<BatchId>() : engine.getIncomingBatchService().getAllBatches();
Map<String, Long> biggestIncomingByNode = getBiggestBatchIds(incomingBatches);
@@ -74,6 +75,7 @@ protected long purgeStagingBasedOnDatabaseStatus(long ttlInMs) {
*/
if (resource != null && !resource.isInUse()) {
boolean resourceIsOld = (System.currentTimeMillis() - resource.getLastUpdateTime()) > ttlInMs;
+boolean resourceClearsMinTimeHurdle = (System.currentTimeMillis() - resource.getLastUpdateTime()) > minTtlInMs;
if (path[0].equals(STAGING_CATEGORY_OUTGOING)) {
try {
Long batchId = new Long(path[path.length - 1]);
@@ -94,7 +96,8 @@ protected long purgeStagingBasedOnDatabaseStatus(long ttlInMs) {
BatchId batchId = new BatchId(new Long(path[path.length - 1]), path[1]);
Long biggestBatchId = biggestIncomingByNode.get(batchId.getNodeId());
if ((recordIncomingBatchesEnabled && !incomingBatches.contains(batchId) &&
-biggestBatchId != null && biggestBatchId > batchId.getBatchId())
+biggestBatchId != null && biggestBatchId > batchId.getBatchId() &&
+resourceClearsMinTimeHurdle)
|| (!recordIncomingBatchesEnabled && resourceIsOld) || ttlInMs == 0) {
purgedFileCount++;
purgedFileSize+=resource.getSize();
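
The purge gate added in this hunk reduces to a "minimum retention window" check. A minimal sketch of that logic, with illustrative parameters standing in for the real staging and batch lookups:

    static boolean isPurgeable(long lastUpdateTimeMs, boolean batchPurgedFromDatabase, long minTtlInMs) {
        // Age of the staging resource since it was last written.
        long ageMs = System.currentTimeMillis() - lastUpdateTimeMs;
        // Even when the batch is already gone from the database, the staging
        // file is kept until it clears the minimum retention window.
        return batchPurgedFromDatabase && ageMs > minTtlInMs;
    }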
@@ -211,7 +211,10 @@ public void sortChannels(List<NodeChannel> channels) {
final HashMap<String, Date> errorChannels = new HashMap<String, Date>();
for (OutgoingBatch batch : batches) {
if (batch.isErrorFlag()) {
-errorChannels.put(batch.getChannelId(), batch.getLastUpdatedTime());
+Date date = errorChannels.get(batch.getChannelId());
+if (date == null || batch.getLastUpdatedTime().compareTo(date) > 0) {
+errorChannels.put(batch.getChannelId(), batch.getLastUpdatedTime());
+}
}
}

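The sortChannels fix in this hunk makes a channel's error recency reflect its most recently updated error batch rather than whichever error batch happened to be iterated last. The underlying reduction is a "newest timestamp per key" pass; a standalone sketch using only java.util types (illustrative, not the committed code):

    static Map<String, Date> newestErrorTimeByChannel(Iterable<OutgoingBatch> batches) {
        Map<String, Date> result = new HashMap<String, Date>();
        for (OutgoingBatch batch : batches) {
            if (batch.isErrorFlag()) {
                Date existing = result.get(batch.getChannelId());
                // Keep only the most recent lastUpdatedTime seen per channel.
                if (existing == null || batch.getLastUpdatedTime().after(existing)) {
                    result.put(batch.getChannelId(), batch.getLastUpdatedTime());
                }
            }
        }
        return result;
    }
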
@@ -705,6 +705,9 @@ protected List<OutgoingBatch> extract(final ProcessInfo extractInfo, final Node
}
}

+final long initialLoadMaxBytesToSync = parameterService.getLong(ParameterConstants.INITIAL_LOAD_TRANSPORT_MAX_BYTES_TO_SYNC);
+long totalBytesSend = 0;
+boolean logMaxBytesReached = false;
Iterator<OutgoingBatch> activeBatchIter = activeBatches.iterator();
for (int i = 0; i < futures.size(); i++) {
Future<FutureOutgoingBatch> future = futures.get(i);
@@ -731,12 +734,27 @@
}

if (streamToFileEnabled || mode == ExtractMode.FOR_PAYLOAD_CLIENT || (currentBatch.isExtractJobFlag() && parameterService.is(ParameterConstants.INITIAL_LOAD_USE_EXTRACT_JOB))) {
+
+if (totalBytesSend > initialLoadMaxBytesToSync) {
+if (!logMaxBytesReached) {
+logMaxBytesReached = true;
+log.info(
+"Reached the total byte threshold for initial load after {} of {} batches were sent for node '{}' (sent {} bytes, the max is {}). "
++ "The remaining batches will be sent on a subsequent sync.",
+new Object[] { i, futures.size(), targetNode.getNodeId(), totalBytesSend, initialLoadMaxBytesToSync });
+}
+transferInfo.setStatus(ProcessStatus.OK);
+break;
+}
+
transferInfo.setStatus(ProcessInfo.ProcessStatus.TRANSFERRING);
transferInfo.setCurrentLoadId(currentBatch.getLoadId());
boolean isRetry = extractBatch.isRetry() && extractBatch.getOutgoingBatch().getStatus() != OutgoingBatch.Status.IG;

currentBatch = sendOutgoingBatch(transferInfo, targetNode, currentBatch, isRetry,
dataWriter, writer, mode);
+
+totalBytesSend += currentBatch.getByteCount();
}

processedBatches.add(currentBatch);
@@ -200,7 +200,7 @@ private static BatchAck getBatchInfo(Map<String, ? extends Object> parameters, l

batchInfo.setIgnored(getParamAsBoolean(parameters, WebConstants.ACK_IGNORE_COUNT + batchId));
String status = getParam(parameters, WebConstants.ACK_BATCH_NAME + batchId, "").trim();
-batchInfo.setOk(status.equalsIgnoreCase(WebConstants.ACK_BATCH_OK));
+batchInfo.setOk(status.equalsIgnoreCase(WebConstants.ACK_BATCH_OK) || status.equalsIgnoreCase(WebConstants.ACK_BATCH_RESEND));
batchInfo.setResend(status.equalsIgnoreCase(WebConstants.ACK_BATCH_RESEND));
batchInfo.setStartTime(getParamAsNum(parameters, WebConstants.ACK_START_TIME + batchId));
if (!batchInfo.isOk()) {
17 changes: 16 additions & 1 deletion symmetric-core/src/main/resources/symmetric-default.properties
@@ -253,6 +253,15 @@ stream.to.file.threshold.bytes=0
# Tags: transport
stream.to.file.ttl.ms=3600000

+# If stream.to.file.purge.on.ttl.enabled is false and staging is purged based on database
+# status, then this is the minimum amount of time a staging file will be retained after
+# its batch has been purged from the database.
+#
+# DatabaseOverridable: true
+# Tags: transport
+stream.to.file.min.ttl.ms=1800000
+
+
# When this is set to false, then batches in the staging area will only be purged after they have been
# purged from the database. If this is set to true, then batches will be purged based on the
# stream.to.file.ttl.ms setting.
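
Taken together, the stream.to.file TTL settings above drive two purge strategies in the staging cleaner. A rough sketch of that decision; purgeStagingBasedOnDatabaseStatus appears in the diff above, while the other names are illustrative:

    long clean(long ttlInMs) {
        if (purgeOnTtlEnabled) {
            // stream.to.file.purge.on.ttl.enabled=true: purge purely by age,
            // using stream.to.file.ttl.ms.
            return purgeStagingOlderThan(ttlInMs);
        }
        // Otherwise purge only files whose batches the database no longer
        // tracks, and never before a file is stream.to.file.min.ttl.ms old.
        return purgeStagingBasedOnDatabaseStatus(ttlInMs);
    }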
@@ -358,7 +367,7 @@ transport.type=http
#
# DatabaseOverridable: true
# Tags: transport
-transport.max.bytes.to.sync=1048576
+transport.max.bytes.to.sync=104857600

# Network errors will be logged at INFO level since they are retried.
# After the maximum number of millis for network errors that continue in succession, the logging
@@ -517,6 +526,12 @@ registration.require.initial.load=true
# Type: boolean
initial.load.block.channels=true

+# This is the maximum number of bytes to synchronize in one connection during an initial load.
+#
+# DatabaseOverridable: true
+# Tags: load
+initial.load.transport.max.bytes.to.sync=524288000
+
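As the DataExtractorService change earlier in this commit shows, the cap is enforced as a running byte budget over the batches of a single sync. A condensed sketch of the cutoff pattern (the batch list and send call are illustrative):

    long totalBytesSent = 0;
    for (OutgoingBatch batch : batches) {
        if (totalBytesSent > maxBytesToSync) {
            // Stop here; the remaining batches go out on a subsequent sync.
            break;
        }
        send(batch);
        totalBytesSent += batch.getByteCount();
    }
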
# Allow other channels to load when initial.load.block.channels is true and the
# reload channel goes into error. When the initial load runs while changes are
# being made, it can lead to foreign key errors that might resolve when the other
@@ -20,15 +20,19 @@
*/
package org.jumpmind.symmetric.model;

+import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;

import java.util.ArrayList;
+import java.util.Calendar;
import java.util.Date;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.jumpmind.symmetric.model.AbstractBatch.Status;
import org.jumpmind.util.AppUtils;
-import static org.junit.Assert.*;
import org.junit.Test;

public class OutgoingBatchesTest {
@@ -222,41 +226,92 @@ public void testChannelSortingTwoErrors() {
channels.add(channelB);
channels.add(channelA);

+Date startTime = new Date();
+Calendar cal = Calendar.getInstance();
+cal.setTime(startTime);
+
+// Channel A is in error and has oldest batch
List<OutgoingBatch> batches = new ArrayList<OutgoingBatch>();
OutgoingBatch batch1 = new OutgoingBatch("1", channelA.getChannelId(), Status.NE);
batch1.setStatus(OutgoingBatch.Status.ER);
batch1.setErrorFlag(true);
-batch1.setLastUpdatedTime(new Date());
+batch1.setLastUpdatedTime(cal.getTime());
batches.add(batch1);

-AppUtils.sleep(50);
-
+// Channel B is in error and has newest batch
+cal.setTime(startTime);
+cal.add(Calendar.SECOND, 1);
OutgoingBatch batch2 = new OutgoingBatch("1", channelB.getChannelId(), Status.NE);
batch2.setStatus(OutgoingBatch.Status.ER);
batch2.setErrorFlag(true);
-batch2.setLastUpdatedTime(new Date());
+batch2.setLastUpdatedTime(cal.getTime());
batches.add(batch2);

+// Channel C is fine
OutgoingBatch batch3 = new OutgoingBatch("1", channelC.getChannelId(), Status.NE);
batches.add(batch3);

OutgoingBatches outgoingBatches = new OutgoingBatches(batches);

outgoingBatches.sortChannels(channels);

+// Order should be non-error channels, followed by error channels ordered by oldest first
assertEquals(channelC, channels.get(0));
assertEquals(channelA, channels.get(1));
assertEquals(channelB, channels.get(2));

-AppUtils.sleep(50);
-
-batch1.setLastUpdatedTime(new Date());
+// Make channel A look like it just tried, so it's newest
+cal.setTime(startTime);
+cal.add(Calendar.SECOND, 2);
+batch1.setLastUpdatedTime(cal.getTime());

outgoingBatches.sortChannels(channels);

assertEquals(channelC, channels.get(0));
assertEquals(channelB, channels.get(1));
assertEquals(channelA, channels.get(2));
+
+// Additional batch on channel A that is old
+cal.setTime(startTime);
+cal.add(Calendar.SECOND, -1);
+OutgoingBatch batch4 = new OutgoingBatch("1", channelA.getChannelId(), Status.NE);
+batch4.setStatus(OutgoingBatch.Status.ER);
+batch4.setErrorFlag(true);
+batch4.setLastUpdatedTime(cal.getTime());
+batches.add(batch4);
+
+// Additional batch on channel B that is oldest
+cal.setTime(startTime);
+cal.add(Calendar.SECOND, -2);
+OutgoingBatch batch5 = new OutgoingBatch("1", channelB.getChannelId(), Status.NE);
+batch5.setStatus(OutgoingBatch.Status.ER);
+batch5.setErrorFlag(true);
+batch5.setLastUpdatedTime(cal.getTime());
+batches.add(batch5);
+
+// Make channel B look like it just tried, so it's newest
+cal.setTime(startTime);
+cal.add(Calendar.SECOND, 3);
+batch2.setLastUpdatedTime(cal.getTime());
+
+// Sorting should get maximum last update time for each channel and sort error channels by oldest first
+outgoingBatches.sortChannels(channels);
+
+assertEquals(channelC, channels.get(0));
+assertEquals(channelA, channels.get(1));
+assertEquals(channelB, channels.get(2));
+
+// Make channel A look like it just tried, so it's newest
+cal.setTime(startTime);
+cal.add(Calendar.SECOND, 4);
+batch1.setLastUpdatedTime(cal.getTime());
+
+// Sorting should get maximum last update time for each channel and sort error channels by oldest first
+outgoingBatches.sortChannels(channels);
+
+assertEquals(channelC, channels.get(0));
+assertEquals(channelB, channels.get(1));
+assertEquals(channelA, channels.get(2));
}

protected OutgoingBatches buildSampleBatches(String channelId, int batchCount) {
@@ -158,7 +158,7 @@ public abstract class TypeMap
registerJdbcType(Types.VARBINARY, VARBINARY, JdbcTypeCategoryEnum.BINARY);
registerJdbcType(Types.VARCHAR, VARCHAR, JdbcTypeCategoryEnum.TEXTUAL);
registerJdbcType(ORACLE_TIMESTAMPTZ, TIMESTAMPTZ, JdbcTypeCategoryEnum.DATETIME);
-registerJdbcType(ORACLE_TIMESTAMPLTZ, TIMESTAMPLTZ, JdbcTypeCategoryEnum.DATETIME);
+registerJdbcType(ORACLE_TIMESTAMPLTZ, TIMESTAMPLTZ, JdbcTypeCategoryEnum.DATETIME);

// only available in JDK 1.4 and above:
if (PlatformUtils.supportsJava14JdbcTypes())
@@ -1023,8 +1023,8 @@ public void setValues(PreparedStatement ps, Object[] args, int[] argTypes,
StatementCreatorUtils.setParameterValue(ps, i, verifyArgType(arg, argType), arg);
}
} catch (SQLException ex) {
-log.warn("Parameter arg '{}' type: {} caused exception: {}", arg, argType, ex.getMessage());
-throw ex;
+String msg = String.format("Parameter arg '%s' type: %s caused exception: %s", arg, TypeMap.getJdbcTypeName(argType), ex.getMessage());
+throw new SQLException(msg, ex);
}
}
}
@@ -193,14 +193,19 @@ protected boolean isPidRunning(int pid) {
pb.redirectErrorStream(true);
BufferedReader stdout = new BufferedReader(new InputStreamReader(proc.getInputStream()));
String line = stdout.readLine();
-line = stdout.readLine();
+do {
+line = stdout.readLine();
+} while (line != null && line.trim().equals(""));
stdout.close();

if (line != null) {
String[] array = line.split("\\s+");
if (array.length > 0) {
foundProcess = true;
isRunning = array[0].toLowerCase().contains(javaExe);
+if (!isRunning) {
+System.out.println("Ignoring old process ID being used by " + array[0]);
+}
}
}

