Skip to content

Commit

Permalink
Merge branch '3.12' of https://github.com/JumpMind/symmetric-ds.git i…
Browse files Browse the repository at this point in the history
…nto 3.12
  • Loading branch information
joshhicks committed Mar 1, 2021
2 parents ff70f02 + 64ccf38 commit e62974a
Show file tree
Hide file tree
Showing 28 changed files with 139 additions and 39 deletions.
Expand Up @@ -153,7 +153,7 @@ public void exportTestDatabaseSQL() throws Exception {
return;
}

final int EXPECTED_VARCHAR_MAX_COUNT = engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.SQLITE) ? 317 : 60;
final int EXPECTED_VARCHAR_MAX_COUNT = engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.SQLITE) ? 318 : 60;
final String EXPECTED_VARCHAR_MAX_STRING = "varchar(" + Integer.MAX_VALUE + ")";
final int actualVarcharMaxCount = StringUtils.countMatches(output, EXPECTED_VARCHAR_MAX_STRING);
String msg = String.format("Expected %s, but got %s in the following output %s",
Expand Down
Expand Up @@ -133,7 +133,7 @@ public class SymmetricAdmin extends AbstractCommandLauncher {

private static final String CMD_RESTORE_FILE_CONFIGURATION = "restore-config";

private static final String[] NO_ENGINE_REQUIRED = { CMD_EXPORT_PROPERTIES, CMD_ENCRYPT_TEXT, CMD_OBFUSCATE_TEXT, CMD_LIST_ENGINES, CMD_MODULE, CMD_BACKUP_FILE_CONFIGURATION, CMD_RESTORE_FILE_CONFIGURATION };
private static final String[] NO_ENGINE_REQUIRED = { CMD_EXPORT_PROPERTIES, CMD_ENCRYPT_TEXT, CMD_OBFUSCATE_TEXT, CMD_LIST_ENGINES, CMD_MODULE, CMD_BACKUP_FILE_CONFIGURATION, CMD_RESTORE_FILE_CONFIGURATION, CMD_CREATE_WAR };

private static final String OPTION_NODE = "node";

Expand Down Expand Up @@ -683,7 +683,10 @@ private void generateWar(CommandLine line, List<String> args) throws Exception {
final File workingDirectory = new File(AppUtils.getSymHome() + "/.war");
FileUtils.deleteDirectory(workingDirectory);
FileUtils.copyDirectory(new File(AppUtils.getSymHome() + "/web"), workingDirectory);
FileUtils.copyToDirectory(new File(AppUtils.getSymHome() + "/conf/instance.uuid"), new File(workingDirectory, "WEB-INF/classes"));
File instanceIdFile = new File(AppUtils.getSymHome() + "/conf/instance.uuid");
if (instanceIdFile.canRead()) {
FileUtils.copyToDirectory(instanceIdFile, new File(workingDirectory, "WEB-INF/classes"));
}

boolean useProperties = (line.hasOption(OPTION_PROPERTIES_FILE) || line.hasOption(OPTION_ENGINE)) &&
propertiesFile != null && propertiesFile.exists();
Expand Down
Expand Up @@ -3,7 +3,7 @@ bigquery=com.fasterxml.jackson.core:jackson-core:2.10.2, com.google.api:api-comm
cassandra=com.datastax.cassandra:cassandra-driver-core:3.1.4
db2=jdbc.db2:db2jcc:9.7, net.sf.jt400:jt400:9.7
derby=org.apache.derby:derby:10.14.2.0, org.apache.derby:derbytools:10.14.2.0
firebird=org.firebirdsql.jdbc:jaybird-jdk18:3.0.8, javax.resource:connector-api:1.5
firebird=org.firebirdsql.jdbc:jaybird:4.0.2.java8, javax.resource:connector-api:1.5
#h2=com.h2database:h2:1.3.176
hana=com.sap.cloud.db.jdbc:ngdbc:2.3.62
hbase=com.google.guava:guava:28.1-android, com.google.protobuf:protobuf-java:2.5.0, com.jcraft:jsch:0.1.42, commons-configuration:commons-configuration:1.6, commons-digester:commons-digester:1.8, commons-el:commons-el:1.0, commons-httpclient:commons-httpclient:3.1, commons-net:commons-net:3.1, com.yammer.metrics:metrics-core:2.2.0, io.netty:netty:3.7.0.Final, io.netty:netty-all:4.0.50.Final, org.apache.avro:avro:1.8.2, org.apache.commons:commons-compress:1.8.1, org.apache.commons:commons-math3:3.1.1, org.apache.directory.api:api-asn1-api:1.0.0-M20, org.apache.directory.api:api-util:1.0.0-M20, org.apache.directory.server:apacheds-i18n:2.0.0-M15, org.apache.directory.server:apacheds-kerberos-codec:2.0.0-M15, org.apache.hadoop:hadoop-annotations:2.5.1, org.apache.hadoop:hadoop-auth:2.5.1, org.apache.hadoop:hadoop-common:2.5.1, org.apache.hadoop:hadoop-mapreduce-client-core:2.5.1, org.apache.hadoop:hadoop-yarn-api:2.5.1, org.apache.hadoop:hadoop-yarn-common:2.5.1, org.apache.hbase:hbase-annotations:1.3.6, org.apache.hbase:hbase-client:1.3.6, org.apache.hbase:hbase-common:1.3.6, org.apache.hbase:hbase-protocol:1.3.6, org.apache.hbase.thirdparty:hbase-shaded-gson:3.0.0, org.apache.htrace:htrace-core:3.1.0-incubating, org.apache.httpcomponents:httpclient:4.5.10, org.apache.httpcomponents:httpcore:4.4.12, org.apache.phoenix:phoenix:5.0.0-HBase-2.0, org.apache.zookeeper:zookeeper:3.4.8, org.jruby.jcodings:jcodings:1.0.8, org.jruby.joni:joni:2.1.2, xmlenc:xmlenc:0.52
Expand Down
Expand Up @@ -537,7 +537,9 @@ protected void autoConfigRegistrationServer() {
if (node == null) {
buildTablesFromDdlUtilXmlIfProvided();
loadFromScriptIfProvided();
parameterService.setDatabaseHasBeenInitialized(true);
parameterService.rereadParameters();
extensionService.refresh();
}

node = nodeService.findIdentity();
Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.jumpmind.symmetric.io.data.CsvUtils;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.AbstractDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.BigQueryDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.CassandraDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
Expand Down Expand Up @@ -163,8 +164,11 @@ protected void beforeResolutionAttempt(CsvData csvData, Conflict conflict) {

@Override
protected void afterResolutionAttempt(CsvData csvData, Conflict conflict) {
DynamicDefaultDatabaseWriter writer = transformWriter.getNestedWriterOfType(DynamicDefaultDatabaseWriter.class);
if (Boolean.TRUE.equals(writer.getContext().get(AbstractDatabaseWriter.TRANSACTION_ABORTED))) {
return;
}
if (conflict.getPingBack() == PingBack.SINGLE_ROW) {
DynamicDefaultDatabaseWriter writer = transformWriter.getNestedWriterOfType(DynamicDefaultDatabaseWriter.class);
ISqlTransaction transaction = writer.getTransaction();
if (transaction != null) {
symmetricDialect.disableSyncTriggers(transaction, sourceNodeId);
Expand All @@ -173,7 +177,6 @@ protected void afterResolutionAttempt(CsvData csvData, Conflict conflict) {
if (conflict.getResolveType() == ResolveConflict.NEWER_WINS &&
conflict.getDetectType() != DetectConflict.USE_TIMESTAMP &&
conflict.getDetectType() != DetectConflict.USE_VERSION) {
DynamicDefaultDatabaseWriter writer = transformWriter.getNestedWriterOfType(DynamicDefaultDatabaseWriter.class);
Boolean isWinner = (Boolean) writer.getContext().get(DatabaseConstants.IS_CONFLICT_WINNER);
if (isWinner != null && isWinner == true) {
writer.getContext().remove(DatabaseConstants.IS_CONFLICT_WINNER);
Expand Down
Expand Up @@ -72,6 +72,8 @@ public interface IDataLoaderService {

public List<IncomingError> getIncomingErrors(long batchId, String nodeId);

public IncomingError getIncomingError(long batchId, String nodeId, long rowNumber);

public IncomingError getCurrentIncomingError(long batchId, String nodeId);

public void insertIncomingError(ISqlTransaction transaction, IncomingError incomingError);
Expand Down
Expand Up @@ -162,9 +162,6 @@ protected void initInstanceId() {
instanceIdFile = defaultFile;
} else {
instanceIdURL = getClass().getClassLoader().getResource("/instance.uuid");
if (instanceIdURL == null) {
instanceIdFile = defaultFile;
}
}

if (instanceIdFile != null) {
Expand Down Expand Up @@ -194,12 +191,12 @@ protected void initInstanceId() {
if (newInstanceId == null) {
newInstanceId = generateInstanceId(AppUtils.getHostName());
}
instanceId = newInstanceId;
isUpgradedInstanceId = true;
if (instanceIdFile != null) {
try {
instanceIdFile.getParentFile().mkdirs();
IOUtils.write(newInstanceId, new FileOutputStream(instanceIdFile), Charset.defaultCharset());
instanceId = newInstanceId;
isUpgradedInstanceId = true;
} catch (Exception ex) {
throw new SymmetricException("Failed to save file '" + instanceIdFile + "' Please correct and restart this node.", ex);
}
Expand Down
Expand Up @@ -886,6 +886,11 @@ public List<IncomingError> getIncomingErrors(long batchId, String nodeId) {
batchId, nodeId);
}

public IncomingError getIncomingError(long batchId, String nodeId, long rowNumber) {
return sqlTemplate.queryForObject(getSql("selectIncomingErrorSql") + " and failed_row_number = ?",
new IncomingErrorMapper(), batchId, nodeId, rowNumber);
}

public IncomingError getCurrentIncomingError(long batchId, String nodeId) {
return sqlTemplate.queryForObject(getSql("selectCurrentIncomingErrorSql"),
new IncomingErrorMapper(), batchId, nodeId);
Expand Down
Expand Up @@ -262,6 +262,8 @@ public boolean acquireIncomingBatch(IncomingBatch batch) {
|| !parameterService.is(ParameterConstants.INCOMING_BATCH_SKIP_DUPLICATE_BATCHES_ENABLED)) {
okayToProcess = true;
batch.setErrorFlag(existingBatch.isErrorFlag());
batch.setFailedLineNumber(existingBatch.getFailedLineNumber());
batch.setFailedRowNumber(existingBatch.getFailedRowNumber());
existingBatch.setStatus(Status.LD);
log.info("Retrying batch {}", batch.getNodeBatchId());
} else if (existingBatch.getStatus() == Status.IG) {
Expand Down Expand Up @@ -380,6 +382,8 @@ public int updateIncomingBatch(ISqlTransaction transaction, IncomingBatch batch)
if (batch.getStatus() == IncomingBatch.Status.OK) {
batch.setErrorFlag(false);
batch.setFailedDataId(0);
batch.setFailedLineNumber(0l);
batch.setFailedRowNumber(0l);
}
batch.setLastUpdatedHostName(clusterService.getServerId());
count = transaction.prepareAndExecute(getSql("updateIncomingBatchSql"),
Expand Down
Expand Up @@ -46,6 +46,7 @@
import org.jumpmind.symmetric.io.data.DataContext;
import org.jumpmind.symmetric.io.data.IDataProcessorListener;
import org.jumpmind.symmetric.io.data.ProtocolException;
import org.jumpmind.symmetric.io.data.writer.AbstractDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.ConflictException;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
Expand Down Expand Up @@ -182,7 +183,9 @@ protected void enableSyncTriggers(DataContext context) {
try {
ISqlTransaction transaction = context.findSymmetricTransaction(engine.getTablePrefix());
if (transaction != null) {
symmetricDialect.enableSyncTriggers(transaction);
if (!Boolean.TRUE.equals(context.get(AbstractDatabaseWriter.TRANSACTION_ABORTED))) {
symmetricDialect.enableSyncTriggers(transaction);
}
}
} catch (Exception ex) {
log.error("", ex);
Expand Down Expand Up @@ -292,6 +295,9 @@ public void batchInError(DataContext context, Throwable ex) {
}

ISqlTransaction transaction = context.findSymmetricTransaction(engine.getTablePrefix());
if (Boolean.TRUE.equals(context.get(AbstractDatabaseWriter.TRANSACTION_ABORTED))) {
transaction = null;
}

if (currentBatch.getStatus() == Status.ER) {
if (context.getTable() != null && context.getData() != null) {
Expand Down Expand Up @@ -319,6 +325,9 @@ public void batchInError(DataContext context, Throwable ex) {
error.setConflictId(conflict.getConflictId());
}
}
if (context.get(AbstractDatabaseWriter.CONFLICT_IGNORE) != null) {
error.setResolveIgnore(true);
}
if (transaction != null) {
dataLoaderService.insertIncomingError(transaction, error);
} else {
Expand All @@ -329,10 +338,18 @@ public void batchInError(DataContext context, Throwable ex) {
if (transaction != null) {
transaction.rollback();
}
if (context.get(AbstractDatabaseWriter.CONFLICT_IGNORE) != null) {
IncomingError error = dataLoaderService.getIncomingError(currentBatch.getBatchId(), currentBatch.getNodeId(),
currentBatch.getFailedRowNumber());
if (error != null) {
error.setResolveIgnore(true);
dataLoaderService.updateIncomingError(error);
}
}
}
}
}

if (transaction != null) {
if (incomingBatchService.isRecordOkBatchesEnabled()
|| this.currentBatch.isRetry()) {
Expand Down
Expand Up @@ -304,7 +304,7 @@ private String[] getPropFileNames() {
FilenameFilter filter = new FilenameFilter() {
@Override
public boolean accept(File dir, String name) {
return name.toLowerCase().endsWith(EXT_PROPERTIES);
return name.toLowerCase().endsWith(EXT_PROPERTIES) && modules.containsKey(name.substring(0, name.indexOf(".")));
}
};
return dir.list(filter);
Expand Down
Expand Up @@ -29,13 +29,12 @@
public class MsSql2005DdlBuilder extends MsSql2000DdlBuilder {

public MsSql2005DdlBuilder() {
super();
this.databaseName = DatabaseNamesConstants.MSSQL2005;

databaseInfo.addNativeTypeMapping(Types.BLOB, "IMAGE", Types.BLOB);
databaseInfo.addNativeTypeMapping(Types.CLOB, "CLOB", Types.CLOB);
databaseInfo.addNativeTypeMapping(Types.SQLXML, "XML", Types.SQLXML);

}
}

protected void dropDefaultConstraint(String tableName, String columnName, StringBuilder ddl) {
println( "BEGIN ", ddl);
Expand Down
Expand Up @@ -36,8 +36,8 @@ public class MsSql2008DdlBuilder extends MsSql2005DdlBuilder {
public static final String CHANGE_TRACKING_SYM_PREFIX = "SymmetricDS";

public MsSql2008DdlBuilder() {
super();
this.databaseName = DatabaseNamesConstants.MSSQL2008;

databaseInfo.addNativeTypeMapping(Types.DATE, "DATE", Types.DATE);
databaseInfo.addNativeTypeMapping(Types.TIME, "TIME", Types.TIME);
databaseInfo.addNativeTypeMapping(ColumnTypes.MSSQL_SQL_VARIANT, "SQL_VARIANT", Types.BLOB);
Expand Down
Expand Up @@ -5,6 +5,7 @@
public class MsSql2016DdlBuilder extends MsSql2008DdlBuilder {

public MsSql2016DdlBuilder() {
super();
this.databaseName = DatabaseNamesConstants.MSSQL2016;
}
}
Expand Up @@ -55,6 +55,8 @@ abstract public class AbstractDatabaseWriter implements IDataWriter {

public static final String TRANSACTION_ABORTED = "DatabaseWriter.TransactionAborted";

public static final String CONFLICT_IGNORE = "DatabaseWriter.ConflictIgnore";

public static enum LoadStatus {
SUCCESS, CONFLICT
};
Expand Down
Expand Up @@ -67,6 +67,8 @@ public void needsResolved(AbstractDatabaseWriter writer, CsvData data, LoadStatu
performChainedFallbackForInsert(writer, data, conflict);
} else if (!conflict.isResolveRowOnly()) {
throw new IgnoreBatchException();
} else {
ignoreRow(writer);
}
break;
case IGNORE:
Expand All @@ -76,7 +78,6 @@ public void needsResolved(AbstractDatabaseWriter writer, CsvData data, LoadStatu
default:
attemptToResolve(resolvedData, data, writer, conflict);
break;

}
}
break;
Expand All @@ -103,6 +104,8 @@ public void needsResolved(AbstractDatabaseWriter writer, CsvData data, LoadStatu
performChainedFallbackForUpdate(writer, data, conflict);
} else if (!conflict.isResolveRowOnly()) {
throw new IgnoreBatchException();
} else {
ignoreRow(writer);
}
break;
case IGNORE:
Expand Down Expand Up @@ -150,6 +153,8 @@ public void needsResolved(AbstractDatabaseWriter writer, CsvData data, LoadStatu
if (status == LoadStatus.CONFLICT && writer.getContext().getLastError() != null) {
status = performChainedFallbackForDelete(writer, data, conflict);
}
} else {
ignoreRow(writer);
}

if (status == LoadStatus.CONFLICT) {
Expand Down Expand Up @@ -181,6 +186,7 @@ public void needsResolved(AbstractDatabaseWriter writer, CsvData data, LoadStatu

writer.getContext().setLastError(null);
logConflictResolution(conflict, data, writer, resolvedData, lineNumber);
checkIfTransactionAborted(writer, data, conflict);
}

protected void performChainedFallbackForInsert(AbstractDatabaseWriter writer, CsvData data, Conflict conflict) {
Expand Down Expand Up @@ -381,6 +387,12 @@ protected void performFallbackToInsert(AbstractDatabaseWriter writer, CsvData cs
}
}

protected void ignoreRow(AbstractDatabaseWriter writer) {
if (Boolean.TRUE.equals(writer.getContext().get(AbstractDatabaseWriter.TRANSACTION_ABORTED))) {
writer.getContext().put(AbstractDatabaseWriter.CONFLICT_IGNORE, true);
}
}

protected void checkIfTransactionAborted(AbstractDatabaseWriter writer, CsvData csvData, Conflict conflict) {
if (Boolean.TRUE.equals(writer.getContext().get(AbstractDatabaseWriter.TRANSACTION_ABORTED))) {
throw new ConflictException(csvData, writer.getTargetTable(), false, conflict,
Expand Down
Expand Up @@ -234,6 +234,7 @@ protected LoadStatus insert(CsvData data) {
try {
if (isRequiresSavePointsInTransaction && conflictResolver != null && conflictResolver.isIgnoreRow(this, data)) {
statistics.get(batch).increment(DataWriterStatisticConstants.IGNOREROWCOUNT);
this.currentDmlStatement = null;
return LoadStatus.SUCCESS;
}

Expand Down Expand Up @@ -307,6 +308,7 @@ protected LoadStatus delete(CsvData data, boolean useConflictDetection) {
try {
if (isRequiresSavePointsInTransaction && conflictResolver != null && conflictResolver.isIgnoreRow(this, data)) {
statistics.get(batch).increment(DataWriterStatisticConstants.IGNOREROWCOUNT);
this.currentDmlStatement = null;
return LoadStatus.SUCCESS;
}

Expand Down Expand Up @@ -439,6 +441,7 @@ protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useC
try {
if (isRequiresSavePointsInTransaction && conflictResolver != null && conflictResolver.isIgnoreRow(this, data)) {
statistics.get(batch).increment(DataWriterStatisticConstants.IGNOREROWCOUNT);
this.currentDmlStatement = null;
return LoadStatus.SUCCESS;
}

Expand Down

0 comments on commit e62974a

Please sign in to comment.