Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/3.10' into 3.11
Browse files Browse the repository at this point in the history
Conflicts:
	symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/PurgeService.java
  • Loading branch information
erilong committed Oct 11, 2019
2 parents ff07d8b + e9a7af8 commit 6770e36
Show file tree
Hide file tree
Showing 8 changed files with 45 additions and 32 deletions.
Expand Up @@ -73,7 +73,7 @@ public NodeHost(String nodeId, String instanceId) {

public void refresh(IDatabasePlatform platform, String instanceId) {
this.instanceId = instanceId;
this.hostName = AppUtils.getHostName();
this.hostName = StringUtils.left(AppUtils.getHostName(), 60);
setIpAddress(AppUtils.getIpAddress());
this.osUser = System.getProperty("user.name");
this.osName = System.getProperty("os.name");
Expand Down Expand Up @@ -106,7 +106,7 @@ public String getHostName() {
}

public void setHostName(String hostName) {
this.hostName = hostName;
this.hostName = StringUtils.left(hostName, 60);
}

public String getInstanceId() {
Expand Down
Expand Up @@ -489,6 +489,7 @@ public String getServerId() {
}
}

serverId = StringUtils.left(serverId, 255);
log.info("This node picked a server id of {}", serverId);
}
return serverId;
Expand Down
Expand Up @@ -1049,8 +1049,9 @@ protected OutgoingBatch extractOutgoingBatch(ProcessInfo extractInfo, Node targe
IDataReader dataReader = buildExtractDataReader(sourceNode, targetNode, currentBatch, extractInfo);
try {
new DataProcessor(dataReader, writer, listener, "extract").process(ctx);
} catch (ProtocolException e) {
if (!configurationService.getNodeChannel(currentBatch.getChannelId(), false).getChannel().isContainsBigLob()) {
} catch (Exception e) {
if ((e instanceof ProtocolException || (e instanceof SQLException && ((SQLException) e).getErrorCode() == 6502)) &&
!configurationService.getNodeChannel(currentBatch.getChannelId(), false).getChannel().isContainsBigLob()) {
log.warn(e.getMessage());
log.info("Re-attempting extraction for batch {} with contains_big_lobs temporarily enabled for channel {}",
currentBatch.getBatchId(), currentBatch.getChannelId());
Expand Down
Expand Up @@ -99,7 +99,6 @@
import org.jumpmind.symmetric.service.INodeService;
import org.jumpmind.symmetric.service.ITriggerRouterService;
import org.jumpmind.symmetric.service.impl.TransformService.TransformTableNodeGroupLink;
import org.jumpmind.util.AppUtils;
import org.jumpmind.util.FormatUtils;

/**
Expand Down Expand Up @@ -2701,7 +2700,7 @@ public void insertDataGap(DataGap gap) {
public void insertDataGap(ISqlTransaction transaction, DataGap gap) {
log.debug("Inserting data gap: {}", gap);
transaction.prepareAndExecute(getSql("insertDataGapSql"),
new Object[] { AppUtils.getHostName(), gap.getStartId(), gap.getEndId(),
new Object[] { engine.getClusterService().getServerId(), gap.getStartId(), gap.getEndId(),
gap.getCreateTime() }, new int[] {
Types.VARCHAR, Types.NUMERIC, Types.NUMERIC, Types.TIMESTAMP });
}
Expand Down
Expand Up @@ -26,6 +26,7 @@
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.sql.ISqlRowMapper;
import org.jumpmind.db.sql.Row;
import org.jumpmind.symmetric.common.ParameterConstants;
Expand Down Expand Up @@ -90,7 +91,7 @@ public MonitorService(IParameterService parameterService, ISymmetricDialect symm
this.extensionService = extensionService;
this.clusterService = clusterService;
this.contextService = contextService;
hostName = AppUtils.getHostName();
hostName = StringUtils.left(AppUtils.getHostName(), 60);

IMonitorType monitorExtensions[] = { new MonitorTypeBatchError(), new MonitorTypeBatchUnsent(), new MonitorTypeCpu(),
new MonitorTypeDataGap(), new MonitorTypeDisk(), new MonitorTypeMemory(), new MonitorTypeUnrouted(), new MonitorTypeLog(), new MonitorTypeOfflineNodes() };
Expand Down
Expand Up @@ -294,20 +294,20 @@ private long purgeDataRows(final Calendar time) {
return dataDeletedCount;
}

private long purgeStranded(final Calendar time) {
private long purgeStranded(final Calendar time) {
log.info("Getting range for stranded data events");
int maxNumOfDataEventsToPurgeInTx = parameterService
.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_EVENT_BATCH_IDS);
int maxNumOfDataEventsToPurgeInTx = parameterService.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_EVENT_BATCH_IDS);
long minGapStartId = sqlTemplateDirty.queryForLong(getSql("minDataGapStartId"));
long[] minMaxEvent = queryForMinMax(getSql("selectStrandedDataEventRangeSql"), new Object[] { time.getTime() });
long[] minMaxEvent = queryForMinMax(getSql("selectStrandedDataEventRangeSql"), new Object[0]);
int strandedEventDeletedCount = purgeByMinMax(minMaxEvent, minGapStartId, MinMaxDeleteSql.STRANDED_DATA_EVENT,
time.getTime(), maxNumOfDataEventsToPurgeInTx);
statisticManager.incrementPurgedDataEventRows(strandedEventDeletedCount);

log.info("Getting range for stranded data");
int maxNumOfDataIdsToPurgeInTx = parameterService
.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
long[] minMax = queryForMinMax(getSql("selectDataRangeSql"), new Object[0]);
int maxNumOfDataIdsToPurgeInTx = parameterService.getInt(ParameterConstants.PURGE_MAX_NUMBER_OF_DATA_IDS);
long minDataId = sqlTemplateDirty.queryForLong(getSql("selectDataMinSql"));
long minDataEventId = sqlTemplateDirty.queryForLong(getSql("selectDataEventMinSql"));
long[] minMax = new long[] { minDataId, Math.min(minDataEventId, minGapStartId)-1 };
int strandedDeletedCount = purgeByMinMax(minMax, minGapStartId, MinMaxDeleteSql.STRANDED_DATA,
time.getTime(), maxNumOfDataIdsToPurgeInTx);
statisticManager.incrementPurgedDataRows(strandedDeletedCount);
Expand Down Expand Up @@ -395,6 +395,7 @@ private int purgeByMinMax(long[] minMax, long minGapStartId, MinMaxDeleteSql ide
idSqlType, idSqlType, idSqlType, idSqlType, Types.VARCHAR};
break;
case DATA_RANGE:
case STRANDED_DATA:
deleteSql = getSql("deleteDataByRangeSql");
args = new Object[] { minId, maxId, cutoffTime };
argTypes = new int[] { idSqlType, idSqlType, Types.TIMESTAMP };
Expand Down Expand Up @@ -423,15 +424,10 @@ private int purgeByMinMax(long[] minMax, long minGapStartId, MinMaxDeleteSql ide
args = new Object[] { minId, maxId };
argTypes = new int[] { idSqlType, idSqlType };
break;
case STRANDED_DATA:
deleteSql = getSql("deleteStrandedData");
args = new Object[] { minId, maxId, minGapStartId, cutoffTime, minId, maxId };
argTypes = new int[] { idSqlType, idSqlType, idSqlType, Types.TIMESTAMP, idSqlType, idSqlType};
break;
case STRANDED_DATA_EVENT:
deleteSql = getSql("deleteStrandedDataEvent");
args = new Object[] { minId, maxId, cutoffTime, minId, maxId };
argTypes = new int[] { idSqlType, idSqlType, Types.TIMESTAMP, idSqlType, idSqlType };
args = new Object[] { minId, maxId, cutoffTime };
argTypes = new int[] { idSqlType, idSqlType, Types.TIMESTAMP };
break;
}

Expand All @@ -440,6 +436,10 @@ private int purgeByMinMax(long[] minMax, long minGapStartId, MinMaxDeleteSql ide
log.debug("Deleted {} rows", count);
totalCount += count;

if (count == 0 && (identifier == MinMaxDeleteSql.STRANDED_DATA || identifier == MinMaxDeleteSql.STRANDED_DATA_EVENT)) {
break;
}

if (System.currentTimeMillis() - ts > DateUtils.MILLIS_PER_MINUTE * 5) {
log.info("Purged {} of {} rows so far using {} statements", new Object[] {
totalCount, identifier.toString().toLowerCase(), totalDeleteStmts });
Expand Down
Expand Up @@ -67,13 +67,9 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac

putSql("updateStrandedBatchesByChannel", "update $(outgoing_batch) set status=? where channel_id=? and status != ?");

putSql("deleteStrandedData" ,
"delete from $(data) where " +
" data_id between ? and ? and " +
" data_id < ? and " +
" create_time < ? and " +
" data_id not in (select e.data_id from $(data_event) e where " +
" e.data_id between ? and ?) " );
putSql("selectDataEventMinSql", "select min(data_id) from $(data_event)");

putSql("selectDataMinSql", "select min(data_id) from $(data)");

putSql("deleteDataSql" ,
"delete from $(data) where " +
Expand Down Expand Up @@ -122,14 +118,12 @@ public PurgeServiceSqlMap(IDatabasePlatform platform, Map<String, String> replac

putSql("selectStrandedDataEventRangeSql" ,
"select min(batch_id) as min_id, max(batch_id)+1 as max_id from $(data_event) " +
"where create_time < ? " +
"and batch_id < (select min(batch_id) from $(outgoing_batch))");
"where batch_id < (select min(batch_id) from $(outgoing_batch))");

putSql("deleteStrandedDataEvent",
"delete from $(data_event) " +
"where batch_id between ? and ? " +
"and create_time < ? " +
"and batch_id not in (select batch_id from $(outgoing_batch) where batch_id between ? and ?)");
"and create_time < ? ");

putSql("minOutgoingBatchNotStatusSql",
"select min(batch_id) from $(outgoing_batch) where status != ?");
Expand Down
Expand Up @@ -452,6 +452,11 @@ private void fillTables(List<Table> tablesToFill, List<Table> orderedTables, Map
}

ISqlTransaction tran = platform.getSqlTemplate().startSqlTransaction();
DatabaseInfo dbInfo = platform.getDatabaseInfo();
String quote = dbInfo.getDelimiterToken();
String catalogSeparator = dbInfo.getCatalogSeparator();
String schemaSeparator = dbInfo.getSchemaSeparator();

int rowsInTransaction = 0;
int rowsTotal = 0;

Expand All @@ -468,6 +473,11 @@ private void fillTables(List<Table> tablesToFill, List<Table> orderedTables, Map
for (int i = 0; i < numRowsToGenerate; i++) {

for (Table table : orderedTables) {
if (table.hasAutoIncrementColumn()) {
log.info("Turning on identity insert for table " + table.getName());
tran.allowInsertIntoAutoIncrementColumns(true, table, quote, catalogSeparator, schemaSeparator);
}

int dmlType = INSERT;
if (tableProperties != null && tableProperties.containsKey(table.getName())) {
dmlType = randomIUD(tableProperties.get(table.getName()));
Expand Down Expand Up @@ -519,6 +529,10 @@ private void fillTables(List<Table> tablesToFill, List<Table> orderedTables, Map
rowsInTransaction = 0;
AppUtils.sleep(interval);
}
if (table.hasAutoIncrementColumn()) {
log.info("Turning off identity insert for table " + table.getName());
tran.allowInsertIntoAutoIncrementColumns(false, table, quote, catalogSeparator, schemaSeparator);
}
}

}
Expand Down Expand Up @@ -882,10 +896,13 @@ private void clearDependentColumnValues() {
}

private Object generateRandomValueForColumn(Table table, Column column) {

Object objectValue = null;
int type = column.getMappedTypeCode();
if (column.getPlatformColumns() != null && column.getPlatformColumns().get(platform.getName()) != null && column.getPlatformColumns().get(platform.getName()).isEnum()) {
objectValue = column.getPlatformColumns().get(platform.getName()).getEnumValues()[new Random().nextInt(column.getPlatformColumns().get(platform.getName()).getEnumValues().length)];
} else if (column.getJdbcTypeName() != null && column.getJdbcTypeName().equals("uniqueidentifier")) {
objectValue = randomUUID();
} else if (column.isTimestampWithTimezone()) {
objectValue = String.format("%s %s",
FormatUtils.TIMESTAMP_FORMATTER.format(randomDate()),
Expand Down

0 comments on commit 6770e36

Please sign in to comment.