Skip to content

Commit

Permalink
Merge branch '3.10' of https://github.com/JumpMind/symmetric-ds.git i…
Browse files Browse the repository at this point in the history
…nto 3.10
  • Loading branch information
jumpmind-josh committed Jun 3, 2019
2 parents 98f7f30 + 77ef6de commit 336ee02
Show file tree
Hide file tree
Showing 22 changed files with 290 additions and 88 deletions.
12 changes: 12 additions & 0 deletions symmetric-assemble/src/asciidoc/appendix/mysql.ad
Expand Up @@ -51,6 +51,18 @@ If you are using UTF-8 encoding in the database, you might consider using the ch
jdbc:mysql://hostname/databasename?tinyInt1isBit=false&characterEncoding=utf8
----

If you are using Amazon Web Services to run the MySQL instance, Amazon by default uses binary log replication and disables the creation
of triggers. In order to allow SymmetricDS to function correctly, the parameter 'log_bin_trust_function_creators' needs to be set
to a value of '1'. You can set this value by logging into the database as root and enter the following command:

----
mysql -u USERNAME -p
set global log_bin_trust_function_creators=1;
----

If you can not log into the server as root, then you can set the parameter in a new parameter group on the Relational Database Services
(RDS) web console. See the documentation from RDS for details.

.Supported Data Types
|===
|Data Type|Supported?
Expand Down
Expand Up @@ -167,13 +167,17 @@ protected Object appendVirtualTableStringValue(Object value, StringBuilder out)
}
out.append(escapeString(value));
out.append("'");

} else if (value instanceof Date) {
out.append("'");
synchronized (DATE_FORMATTER) {
out.append(DATE_FORMATTER.format(value));
}
out.append("'");
} else if (value instanceof byte[]) {
out.append("'");
value = convertBytesToString((byte[]) value, ((byte[]) value).length);
out.append(escapeString(value));
out.append("'");
} else {
throw new IllegalStateException(String.format("Type not supported: %s", value.getClass().getName()));
}
Expand Down
Expand Up @@ -39,6 +39,7 @@
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.util.SymmetricUtils;

/*
* Sybase dialect was tested with jconn4 JDBC driver.
Expand Down Expand Up @@ -247,12 +248,16 @@ public void enableSyncTriggers(ISqlTransaction transaction) {
}

public String getSyncTriggersExpression() {
return "$(defaultCatalog)$(defaultSchema)"+parameterService.getTablePrefix()+"_triggers_disabled(0) = 0";
return SymmetricUtils.quote(this, platform.getDefaultCatalog()) +
".$(defaultSchema)"+
parameterService.getTablePrefix()+"_triggers_disabled(0) = 0";
}

@Override
public String getTransactionTriggerExpression(String defaultCatalog, String defaultSchema, Trigger trigger) {
return platform.getDefaultCatalog() + ".$(defaultSchema)"+parameterService.getTablePrefix()+"_txid(0)";
return SymmetricUtils.quote(this, platform.getDefaultCatalog()) +
".$(defaultSchema)"+
parameterService.getTablePrefix()+"_txid(0)";
}

@Override
Expand Down
Expand Up @@ -151,13 +151,16 @@ public static File createSnapshot(ISymmetricEngine engine) {
}
}

List<String> catalogNames = engine.getDatabasePlatform().getDdlReader().getCatalogNames();
List<Trigger> triggers = triggerRouterService.getTriggers();
for (Trigger trigger : triggers) {
Table table = engine.getDatabasePlatform().getTableFromCache(trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName(), false);
if (table != null) {
addTableToMap(catalogSchemas, new CatalogSchema(table.getCatalog(), table.getSchema()), table);
}
if (StringUtils.isBlank(trigger.getSourceCatalogName()) || catalogNames.contains(trigger.getSourceCatalogName())) {
Table table = engine.getDatabasePlatform().getTableFromCache(trigger.getSourceCatalogName(), trigger.getSourceSchemaName(),
trigger.getSourceTableName(), false);
if (table != null) {
addTableToMap(catalogSchemas, new CatalogSchema(table.getCatalog(), table.getSchema()), table);
}
}
}

for (CatalogSchema catalogSchema : catalogSchemas.keySet()) {
Expand All @@ -170,10 +173,13 @@ public static File createSnapshot(ISymmetricEngine engine) {
} else {
String extra = "";
if (!isDefaultCatalog && catalogSchema.getCatalog() != null) {
extra += catalogSchema.getCatalog() + "-";
extra += catalogSchema.getCatalog();
export.setCatalog(catalogSchema.getCatalog());
}
if (!isDefaultSchema && catalogSchema.getSchema() != null) {
if (!extra.equals("")) {
extra += "-";
}
extra += catalogSchema.getSchema();
export.setSchema(catalogSchema.getSchema());
}
Expand Down
Expand Up @@ -41,7 +41,9 @@ public class ServerConstants {

public static final String SERVER_ALLOW_DIR_LISTING = "server.allow.dir.list";
public static final String SERVER_ALLOW_HTTP_METHODS = "server.allow.http.methods";
public static final String SERVER_DISALLOW_HTTP_METHODS = "server.disallow.http.methods";
public static final String SERVER_DISALLOW_HTTP_METHODS = "server.disallow.http.methods";

public final static String SERVER_HTTP_COOKIES_ENABLED = "server.http.cookies.enabled";

public final static String STREAM_TO_FILE_ENCRYPT_ENABLED = "stream.to.file.encrypt.enabled";
public final static String STREAM_TO_FILE_COMPRESSION_ENABLED = "stream.to.file.compression.enabled";
Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.jumpmind.symmetric.service.RegistrationRequiredException;
import org.jumpmind.symmetric.transport.AuthenticationException;
import org.jumpmind.symmetric.transport.ConnectionRejectedException;
import org.jumpmind.symmetric.transport.NoReservationException;
import org.jumpmind.symmetric.transport.ServiceUnavailableException;
import org.jumpmind.symmetric.transport.SyncDisabledException;

Expand Down Expand Up @@ -111,6 +112,9 @@ protected void fireOffline(Exception exception, Node remoteNode, RemoteNodeStatu
} else if (isRegistrationRequired(exception)) {
log.warn("Registration is needed before communicating with {} at {}", new Object[] {remoteNode, syncUrl});
status.setStatus(Status.REGISTRATION_REQUIRED);
} else if (isNoReservation(exception)) {
log.warn("Missing reservation during push with {}", new Object[] { remoteNode });
status.setStatus(Status.BUSY);
} else if (getHttpException(exception) != null) {
HttpException http = getHttpException(exception);
if (shouldLogTransportError(remoteNode.getNodeId())) {
Expand Down Expand Up @@ -241,6 +245,18 @@ protected boolean isRegistrationNotOpen(Exception ex) {
return registrationNotOpen;
}

protected boolean isNoReservation(Exception ex) {
boolean noReservation = false;
if (ex != null) {
Throwable cause = getRootCause(ex);
noReservation = cause instanceof NoReservationException;
if (noReservation == false && (ex instanceof NoReservationException)) {
noReservation = true;
}
}
return noReservation;
}

protected HttpException getHttpException(Exception ex) {
HttpException exception = null;
if (ex != null) {
Expand Down
Expand Up @@ -73,7 +73,11 @@ public AbstractParameterService() {
public BigDecimal getDecimal(String key, BigDecimal defaultVal) {
String val = getString(key);
if (val != null) {
return new BigDecimal(val);
try {
return new BigDecimal(val);
} catch (NumberFormatException ex) {
TypedProperties.logPropertiesException(log, key, val);
}
}
return defaultVal;
}
Expand Down Expand Up @@ -106,8 +110,12 @@ public int getInt(String key) {

public int getInt(String key, int defaultVal) {
String val = getString(key);
if (StringUtils.isNotBlank(val)) {
return Integer.parseInt(val.trim());
if (val != null) {
try {
return Integer.parseInt(val.trim());
} catch (NumberFormatException ex) {
TypedProperties.logPropertiesException(log, key, val);
}
}
return defaultVal;
}
Expand All @@ -119,7 +127,11 @@ public long getLong(String key) {
public long getLong(String key, long defaultVal) {
String val = getString(key);
if (val != null) {
return Long.parseLong(val);
try {
return Long.parseLong(val);
} catch (NumberFormatException ex) {
TypedProperties.logPropertiesException(log, key, val);
}
}
return defaultVal;
}
Expand Down
Expand Up @@ -445,7 +445,7 @@ private void logDataReceivedFromPush(Node sourceNode, List<IncomingBatch> batchL
log.info("{} data and {} batches loaded during push request from {}. There were {} batches in error",
new Object[] { okDataCount, okBatchesCount, sourceNode.toString(), errorBatchesCount });
} else {
log.info("{} data and {} batches loaded during push request from {}.",
log.info("{} data and {} batches loaded during push request from {}",
new Object[] { okDataCount, okBatchesCount, sourceNode.toString() });
}
statisticManager.addJobStats(sourceNode.getNodeId(), 1, "Push Handler",
Expand Down
@@ -0,0 +1,7 @@
package org.jumpmind.symmetric.transport;

public class NoReservationException extends OfflineException {

private static final long serialVersionUID = 1L;

}
Expand Up @@ -42,6 +42,7 @@
import org.jumpmind.symmetric.transport.AuthenticationException;
import org.jumpmind.symmetric.transport.ConnectionRejectedException;
import org.jumpmind.symmetric.transport.IOutgoingWithResponseTransport;
import org.jumpmind.symmetric.transport.NoReservationException;
import org.jumpmind.symmetric.transport.ServiceUnavailableException;
import org.jumpmind.symmetric.transport.SyncDisabledException;
import org.jumpmind.symmetric.web.WebConstants;
Expand Down Expand Up @@ -290,6 +291,8 @@ private void analyzeResponseCode(int code) throws IOException {
throw new ConnectionRejectedException();
} else if (WebConstants.SC_SERVICE_UNAVAILABLE == code) {
throw new ServiceUnavailableException();
} else if (WebConstants.SC_NO_RESERVATION == code) {
throw new NoReservationException();
} else if (WebConstants.SC_FORBIDDEN == code) {
throw new AuthenticationException();
} else if (WebConstants.SYNC_DISABLED == code) {
Expand Down
Expand Up @@ -59,6 +59,10 @@ public class WebConstants {

public static final int SC_SERVICE_BUSY = 670;

public static final int SC_SERVICE_ERROR = 601;

public static final int SC_NO_RESERVATION = 604;

public static final int SC_NO_CONTENT = 204;

public static final int SC_OK = 200;
Expand Down

0 comments on commit 336ee02

Please sign in to comment.