Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/3.11' into 3.12
Browse files Browse the repository at this point in the history
Conflicts:
	symmetric-assemble/common.gradle
	symmetric-assemble/gradle.properties
	symmetric-core/build.gradle
	symmetric-io/build.gradle
	symmetric-server/build.gradle
	symmetric-sqlexplorer/build.gradle
  • Loading branch information
erilong committed Mar 23, 2020
2 parents 334bad9 + 7758ea3 commit 910f205
Show file tree
Hide file tree
Showing 20 changed files with 255 additions and 56 deletions.
2 changes: 1 addition & 1 deletion symmetric-assemble/build.gradle
@@ -1,7 +1,7 @@
buildscript {
repositories {
jcenter()
maven { url 'http://repo.spring.io/plugins-release' }
maven { url 'https://repo.spring.io/plugins-release' }
}

dependencies {
Expand Down
@@ -0,0 +1,36 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.db.ase;

import org.jumpmind.symmetric.db.ISymmetricDialect;

public class Ase16TriggerTemplate extends AseTriggerTemplate {

public Ase16TriggerTemplate(ISymmetricDialect symmetricDialect) {
super(symmetricDialect);
}

@Override
protected String getOrderClause() {
return "order 1";
}

}
Expand Up @@ -53,7 +53,11 @@ public class AseSymmetricDialect extends AbstractSymmetricDialect implements ISy

public AseSymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) {
super(parameterService, platform);
this.triggerTemplate = new AseTriggerTemplate(this);
if (getMajorVersion() >= 16) {
this.triggerTemplate = new Ase16TriggerTemplate(this);
} else {
this.triggerTemplate = new AseTriggerTemplate(this);
}
}

@Override
Expand Down
Expand Up @@ -60,7 +60,7 @@ public AseTriggerTemplate(ISymmetricDialect symmetricDialect) {

sqlTemplates = new HashMap<String,String>();
sqlTemplates.put("insertTriggerTemplate" ,
"create trigger $(triggerName) on $(schemaName)$(tableName) for insert as \n" +
"create trigger $(triggerName) on $(schemaName)$(tableName) for insert " + getOrderClause() + " as\n" +
" begin \n" +
" set nocount on \n" +
" declare @clientapplname varchar(50) \n" +
Expand Down Expand Up @@ -100,7 +100,7 @@ public AseTriggerTemplate(ISymmetricDialect symmetricDialect) {


sqlTemplates.put("updateTriggerTemplate" ,
"create trigger $(triggerName) on $(schemaName)$(tableName) for update as \n" +
"create trigger $(triggerName) on $(schemaName)$(tableName) for update " + getOrderClause() + " as\n" +
" begin \n" +
" set nocount on \n" +
" declare @DataRow varchar(16384) \n" +
Expand Down Expand Up @@ -172,9 +172,8 @@ public AseTriggerTemplate(ISymmetricDialect symmetricDialect) {
" end \n"
);


sqlTemplates.put("deleteTriggerTemplate" ,
"create trigger $(triggerName) on $(schemaName)$(tableName) for delete as \n" +
"create trigger $(triggerName) on $(schemaName)$(tableName) for delete " + getOrderClause() + " as\n" +
" begin \n" +
" set nocount on \n" +
" declare @OldPk varchar(2000) \n" +
Expand Down Expand Up @@ -211,6 +210,10 @@ public AseTriggerTemplate(ISymmetricDialect symmetricDialect) {
"select $(columns) from $(schemaName)$(tableName) t where $(whereClause) " );
}

protected String getOrderClause() {
return "";
}

@Override
protected String replaceTemplateVariables(DataEventType dml, Trigger trigger,
TriggerHistory history, Channel channel, String tablePrefix, Table originalTable, Table table,
Expand Down
Expand Up @@ -87,15 +87,14 @@ protected String getSystemSchemaName() {

@Override
public void createRequiredDatabaseObjects() {
String sql = "select " + getSourceNodeExpression() + " from " + parameterService.getTablePrefix() + "_node_identity";
String sql = "select " + getSourceNodeExpression() + " from sysibm.sysdummy1";
try {
platform.getSqlTemplate().query(sql);
} catch (Exception e) {
log.debug("Failed checking for variable (usually means it doesn't exist yet) '" + sql + "'", e);
platform.getSqlTemplate().update("create variable " + getSourceNodeExpression() + " varchar(50)");
}
sql = "select " + parameterService.getTablePrefix() + VAR_TRIGGER_DISABLED + " from " + parameterService.getTablePrefix()
+ "_node_identity";
sql = "select " + parameterService.getTablePrefix() + VAR_TRIGGER_DISABLED + " from sysibm.sysdummy1";
try {
platform.getSqlTemplate().query(sql);
} catch (Exception e) {
Expand Down
Expand Up @@ -121,7 +121,7 @@ public MySqlTriggerTemplate(ISymmetricDialect symmetricDialect, boolean isConver
" insert into $(defaultCatalog)$(prefixName)_data (table_name, event_type, trigger_hist_id, pk_data, channel_id, transaction_id, source_node_id, external_data, create_time)\n" +
" values( \n" +
" '$(targetTableName)', \n" +
" 'U', \n" +
" 'R', \n" +
" $(triggerHistoryId), \n" +
" concat($(oldKeys) \n" +
" ), \n" +
Expand Down
Expand Up @@ -213,6 +213,11 @@ public void createRequiredDatabaseObjects() {
+ " END $(functionName); \r\n";
install(sql, wkt2geom);
}
}

@Override
public boolean createOrAlterTablesIfNecessary(String... tableNames) {
boolean isAltered = super.createOrAlterTablesIfNecessary(tableNames);

boolean isNoOrder = parameterService.is(ParameterConstants.DBDIALECT_ORACLE_SEQUENCE_NOORDER, false);
String seqName = getSequenceName(SequenceIdentifier.DATA).toUpperCase();
Expand All @@ -228,8 +233,9 @@ public void createRequiredDatabaseObjects() {
log.info("DDL applied: " + sql);
platform.getSqlTemplate().update(sql);
}
return isAltered;
}

@Override
public void dropRequiredDatabaseObjects() {
String blobToClob = this.parameterService.getTablePrefix() + "_" + "blob2clob";
Expand Down
Expand Up @@ -21,6 +21,7 @@
package org.jumpmind.symmetric.db.postgresql;

import java.sql.Types;
import java.util.Date;

import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
Expand Down Expand Up @@ -51,11 +52,23 @@ public class PostgreSqlSymmetricDialect extends AbstractSymmetricDialect impleme
" select count(*) from information_schema.routines " +
" where routine_name = '$(functionName)' and specific_schema = '$(defaultSchema)'" ;

static final String SQL_SELECT_TRANSACTIONS = "select min(a.xact_start) from pg_stat_activity a join pg_catalog.pg_locks l on l.pid = a.pid where l.mode = 'RowExclusiveLock'";

private Boolean supportsTransactionId = null;

public PostgreSqlSymmetricDialect(IParameterService parameterService, IDatabasePlatform platform) {
super(parameterService, platform);
this.triggerTemplate = new PostgreSqlTriggerTemplate(this);

if (parameterService.is(ParameterConstants.ROUTING_GAPS_USE_TRANSACTION_VIEW)) {
try {
getEarliestTransactionStartTime();
supportsTransactionViews = true;
log.info("Enabling use of transaction views for data gap detection.");
} catch (Exception ex) {
log.warn("Cannot enable use of transaction views for data gap detection.", ex);
}
}
}

@Override
Expand Down Expand Up @@ -230,6 +243,21 @@ public boolean supportsTransactionId() {
return supportsTransactionId;
}

@Override
public Date getEarliestTransactionStartTime() {
Date minStartTime = platform.getSqlTemplate().queryForObject(SQL_SELECT_TRANSACTIONS, Date.class);
if (minStartTime == null) {
minStartTime = new Date();
}
return minStartTime;
}

@Override
public boolean supportsTransactionViews() {
return supportsTransactionViews
&& parameterService.is(ParameterConstants.ROUTING_GAPS_USE_TRANSACTION_VIEW);
}

public void cleanDatabase() {
}

Expand Down
@@ -1,18 +1,20 @@
package org.jumpmind.symmetric.io;

import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.DmlStatement;
import org.jumpmind.db.sql.JdbcSqlTemplate;
import org.jumpmind.db.sql.JdbcSqlTransaction;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.common.ContextConstants;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;

public class JdbcBatchBulkDatabaseWriter extends AbstractBulkDatabaseWriter {

private DmlStatement previousDmlStatement;
private int lastRowCount = 0;

private int expectedRowCount = 0;

public JdbcBatchBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform,
String tablePrefix, DatabaseWriterSettings writerSettings) {
Expand All @@ -29,33 +31,77 @@ public void start(Batch batch) {
}
}

@Override
protected void bulkWrite(CsvData data) {
writeDefault(data);
}

@Override
protected LoadStatus delete(CsvData data, boolean useConflictDetection) {
LoadStatus loadStatus = super.delete(data, useConflictDetection);
if (!getTransaction().isInBatchMode()) {
return loadStatus;
}
checkForConflict(true);
return LoadStatus.SUCCESS;
}

@Override
protected LoadStatus insert(CsvData data) {
LoadStatus loadStatus = super.insert(data);
if (loadStatus == LoadStatus.CONFLICT) {
loadStatus = LoadStatus.SUCCESS;
LoadStatus loadStatus = super.insert(data);
if (!getTransaction().isInBatchMode()) {
return loadStatus;
}
return loadStatus;
checkForConflict(true);
return LoadStatus.SUCCESS;
}

@Override
protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useConflictDetection) {
LoadStatus loadStatus = super.update(data, applyChangesOnly, useConflictDetection);
if (loadStatus == LoadStatus.CONFLICT) {
loadStatus = LoadStatus.SUCCESS;
if (!getTransaction().isInBatchMode()) {
return loadStatus;
}
return loadStatus;
checkForConflict(true);
return LoadStatus.SUCCESS;
}

@Override
protected void bulkWrite(CsvData data) {
writeDefault(data);
protected void checkForConflict(boolean isDml) {
if (isDml) {
expectedRowCount++;
}
if (getTransaction().getUnflushedMarkers(false).size() == 0) {
if (expectedRowCount != lastRowCount) {
throw new SymmetricException("JdbcBatchBulkDataWriter was in conflict, will attempt to fallback using default writer.");
}
expectedRowCount=0;
lastRowCount=0;
}
}

@Override
protected void prepare() {
getTransaction().flush();
super.prepare();
if (getTransaction().isInBatchMode()) {
lastRowCount = getTransaction().flush();
checkForConflict(false);
}
super.prepare();
}

protected int execute(CsvData data, String[] values) {
lastRowCount = super.execute(data, values);
return lastRowCount;
}

@Override
public void end(Batch batch, boolean inError) {
if (getTransaction().isInBatchMode()) {
lastRowCount = getTransaction().flush();
checkForConflict(false);
}
super.end(batch, inError);
}


}

Expand Up @@ -209,7 +209,6 @@ private ParameterConstants() {
public final static String ROUTING_STALE_DATA_ID_GAP_TIME = "routing.stale.dataid.gap.time.ms";
public final static String ROUTING_STALE_GAP_BUSY_EXPIRE_TIME = "routing.stale.gap.busy.expire.time.ms";
public final static String ROUTING_LARGEST_GAP_SIZE = "routing.largest.gap.size";
// public final static String ROUTING_DATA_READER_TYPE_GAP_RETENTION_MINUTES = "routing.data.reader.type.gap.retention.period.minutes";
public final static String ROUTING_DATA_READER_ORDER_BY_DATA_ID_ENABLED = "routing.data.reader.order.by.gap.id.enabled";
public final static String ROUTING_DATA_READER_INTO_MEMORY_ENABLED = "routing.data.reader.into.memory.enabled";
public final static String ROUTING_DATA_READER_THRESHOLD_GAPS_TO_USE_GREATER_QUERY = "routing.data.reader.threshold.gaps.to.use.greater.than.query";
Expand All @@ -221,6 +220,8 @@ private ParameterConstants() {
public final static String ROUTING_MAX_GAP_CHANGES = "routing.max.gap.changes";
public final static String ROUTING_USE_COMMON_GROUPS = "routing.use.common.groups";
public final static String ROUTING_USE_NON_COMMON_FOR_INCOMING = "routing.use.non.common.for.incoming";
public final static String ROUTING_GAPS_USE_TRANSACTION_VIEW = "routing.gaps.use.transaction.view";
public final static String ROUTING_GAPS_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS = "routing.gaps.transaction.view.clock.sync.threshold";

public final static String INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED = "incoming.batches.skip.duplicates";
@Deprecated
Expand Down
Expand Up @@ -156,8 +156,8 @@ public void verifyDatabaseIsCompatible() {
}

public void initTablesAndDatabaseObjects() {
createOrAlterTablesIfNecessary();
createRequiredDatabaseObjects();
createOrAlterTablesIfNecessary();
platform.resetCachedTableModel();
}

Expand Down
Expand Up @@ -149,7 +149,7 @@ public void write(CsvData data) {
try {
snapshot.setFileSize(Long.parseLong(columnData.get("FILE_SIZE")));
} catch (NumberFormatException nfe) {
log.info("Checksum was not a number : " + columnData.get("FILE_SIZE") + " for file " + columnData.get("FILE_NAME"));
log.info("File size was not a number : " + columnData.get("FILE_SIZE") + " for file " + columnData.get("FILE_NAME"));
}
snapshot.setLastUpdateBy(columnData.get("LAST_UPDATE_BY"));
snapshot.setFileName(columnData.get("FILE_NAME"));
Expand Down
Expand Up @@ -60,21 +60,15 @@ public MonitorEvent check(Monitor monitor) {

OutgoingBatches outgoingBatches = outgoingBatchService.getOutgoingBatchErrors(1000);
for (OutgoingBatch batch : outgoingBatches.getBatches()) {
int batchErrorMinutes = (int) (System.currentTimeMillis() - batch.getCreateTime().getTime()) / 60000;
if (batchErrorMinutes >= monitor.getThreshold()) {
outgoingErrorCount++;
outgoingErrors.add(batch);
}
outgoingErrorCount++;
outgoingErrors.add(batch);
}

int incomingErrorCount = 0;
List<IncomingBatch> incomingBatches = incomingBatchService.findIncomingBatchErrors(1000);
for (IncomingBatch batch : incomingBatches) {
int batchErrorMinutes = (int) (System.currentTimeMillis() - batch.getCreateTime().getTime()) / 60000;
if (batchErrorMinutes >= monitor.getThreshold()) {
incomingErrorCount++;
incomingErrors.add(batch);
}
incomingErrorCount++;
incomingErrors.add(batch);
}

event.setValue(outgoingErrorCount + incomingErrorCount);
Expand Down
Expand Up @@ -163,6 +163,7 @@ protected void reset() {
if (date != null) {
earliestTransactionTime = date.getTime() - parameterService.getLong(
ParameterConstants.DBDIALECT_ORACLE_TRANSACTION_VIEW_CLOCK_SYNC_THRESHOLD_MS, 60000);
log.debug("Earliest transaction time is {}", earliestTransactionTime);
}
routingStartTime = symmetricDialect.getDatabaseTime();
}
Expand Down

0 comments on commit 910f205

Please sign in to comment.