Skip to content

Commit

Permalink
Merge branch '3.12' of https://github.com/JumpMind/symmetric-ds.git into 3.12
Browse files Browse the repository at this point in the history
  • Loading branch information
joshhicks committed Jun 9, 2021
2 parents 094fcf5 + f126ea6 commit 91bbfca
Show file tree
Hide file tree
Showing 9 changed files with 118 additions and 62 deletions.
Expand Up @@ -112,15 +112,27 @@ protected boolean alterLockEscalation () {
+ tablePrefix.toLowerCase()
+ "_data_event', '"
+ tablePrefix.toLowerCase()
+ "_monitor_event') and (i.allow_row_locks !='true' "
+ "_monitor_event', '"
+ tablePrefix.toLowerCase()
+ "_table_reload_status', '"
+ tablePrefix.toLowerCase()
+ "_extract_request', '"
+ tablePrefix.toLowerCase()
+ "_table_reload_request', '"
+ tablePrefix.toLowerCase()
+ "_trigger_hist') and (i.allow_row_locks !='true' "
+ lockEscalationClause
+ ")") > 0) {
log.info("Updating indexes to prevent lock escalation");

String dataTable = platform.alterCaseToMatchDatabaseDefaultCase(tablePrefix + "_data");
String dataEventTable = platform.alterCaseToMatchDatabaseDefaultCase(tablePrefix + "_data_event");
String outgoingBatchTable = platform.alterCaseToMatchDatabaseDefaultCase(tablePrefix + "_outgoing_batch");
String monitorEventTable = platform.alterCaseToMatchDatabaseDefaultCase(tablePrefix) + "_monitor_event";
String monitorEventTable = platform.alterCaseToMatchDatabaseDefaultCase(tablePrefix + "_monitor_event");
String tableReloadStatusTable = platform.alterCaseToMatchDatabaseDefaultCase(tablePrefix + "_table_reload_status");
String extractRequestTable = platform.alterCaseToMatchDatabaseDefaultCase(tablePrefix + "_extract_request");
String tableReloadRequestTable = platform.alterCaseToMatchDatabaseDefaultCase(tablePrefix + "_table_reload_request");
String triggerHistTable = platform.alterCaseToMatchDatabaseDefaultCase(tablePrefix + "_trigger_hist");

sqlTemplate.update("ALTER INDEX ALL ON " + dataTable
+ " SET (ALLOW_ROW_LOCKS = ON)");
Expand All @@ -130,6 +142,14 @@ protected boolean alterLockEscalation () {
+ " SET (ALLOW_ROW_LOCKS = ON)");
sqlTemplate.update("ALTER INDEX ALL ON " + monitorEventTable
+ " SET (ALLOW_ROW_LOCKS = ON)");
sqlTemplate.update("ALTER INDEX ALL ON " + tableReloadStatusTable
+ " SET (ALLOW_ROW_LOCKS = ON)");
sqlTemplate.update("ALTER INDEX ALL ON " + extractRequestTable
+ " SET (ALLOW_ROW_LOCKS = ON)");
sqlTemplate.update("ALTER INDEX ALL ON " + tableReloadRequestTable
+ " SET (ALLOW_ROW_LOCKS = ON)");
sqlTemplate.update("ALTER INDEX ALL ON " + triggerHistTable
+ " SET (ALLOW_ROW_LOCKS = ON)");

if (parameterService.is(ParameterConstants.MSSQL_LOCK_ESCALATION_DISABLED, true)) {
sqlTemplate.update("ALTER INDEX ALL ON " + dataTable
Expand All @@ -140,6 +160,14 @@ protected boolean alterLockEscalation () {
+ " SET (ALLOW_PAGE_LOCKS = OFF)");
sqlTemplate.update("ALTER INDEX ALL ON " + monitorEventTable
+ " SET (ALLOW_PAGE_LOCKS = OFF)");
sqlTemplate.update("ALTER INDEX ALL ON " + tableReloadStatusTable
+ " SET (ALLOW_PAGE_LOCKS = OFF)");
sqlTemplate.update("ALTER INDEX ALL ON " + extractRequestTable
+ " SET (ALLOW_PAGE_LOCKS = OFF)");
sqlTemplate.update("ALTER INDEX ALL ON " + tableReloadRequestTable
+ " SET (ALLOW_PAGE_LOCKS = OFF)");
sqlTemplate.update("ALTER INDEX ALL ON " + triggerHistTable
+ " SET (ALLOW_PAGE_LOCKS = OFF)");

sqlTemplate.update("ALTER TABLE " + dataTable
+ " SET (LOCK_ESCALATION = DISABLE)");
Expand All @@ -149,6 +177,14 @@ protected boolean alterLockEscalation () {
+ " SET (LOCK_ESCALATION = DISABLE)");
sqlTemplate.update("ALTER TABLE " + monitorEventTable
+ " SET (LOCK_ESCALATION = DISABLE)");
sqlTemplate.update("ALTER TABLE " + tableReloadStatusTable
+ " SET (LOCK_ESCALATION = DISABLE)");
sqlTemplate.update("ALTER TABLE " + extractRequestTable
+ " SET (LOCK_ESCALATION = DISABLE)");
sqlTemplate.update("ALTER TABLE " + tableReloadRequestTable
+ " SET (LOCK_ESCALATION = DISABLE)");
sqlTemplate.update("ALTER TABLE " + triggerHistTable
+ " SET (LOCK_ESCALATION = DISABLE)");
}
return true;
} else {
Expand Down
Expand Up @@ -49,7 +49,7 @@ public SymmetricPushClient(String nodeId, String securityToken, String syncUrl)

public void open() {
try {
transport = new HttpOutgoingTransport(new HttpTransportManager(), new URL(buildUrl()), 30000, true, 0, -1, null,
transport = new HttpOutgoingTransport(new HttpTransportManager(), new URL(buildUrl()), 30000, 30000, true, 0, -1, null,
null, false, -1, false);
writer = new ProtocolDataWriter(nodeId, transport.openWriter(), false, false, false);
writer.start(batch);
Expand Down
Expand Up @@ -279,6 +279,7 @@ private ParameterConstants() {

public final static String TRANSPORT_HTTP_MANUAL_REDIRECTS_ENABLED = "http.manual.redirects.enabled";
public final static String TRANSPORT_HTTP_TIMEOUT = "http.timeout.ms";
public final static String TRANSPORT_HTTP_CONNECT_TIMEOUT = "http.connect.timeout.ms";
public final static String TRANSPORT_HTTP_PUSH_STREAM_ENABLED = "http.push.stream.output.enabled";
public final static String TRANSPORT_HTTP_PUSH_STREAM_SIZE = "http.push.stream.output.size";
public final static String TRANSPORT_HTTP_USE_COMPRESSION_CLIENT = "http.compression";
Expand Down
Expand Up @@ -43,6 +43,7 @@
import org.apache.commons.beanutils.PropertyUtils;
import org.apache.commons.lang3.StringUtils;
import org.apache.commons.lang3.time.DateUtils;
import org.apache.commons.text.StringEscapeUtils;
import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.clients.producer.ProducerRecord;
Expand Down Expand Up @@ -245,10 +246,10 @@ public boolean beforeWrite(DataContext context, Table table, CsvData data) {
}
}
} else if (outputFormat.equals(KAFKA_FORMAT_XML)) {
kafkaText.append("<row entity=\"").append(table.getName()).append("\"").append(" dml=\"").append(data.getDataEventType())
kafkaText.append("<row entity=\"").append(StringEscapeUtils.escapeXml11(table.getName())).append("\"").append(" dml=\"").append(data.getDataEventType())
.append("\">");
for (int i = 0; i < table.getColumnNames().length; i++) {
kafkaText.append("<data key=\"").append(table.getColumnNames()[i]).append("\">").append(rowData[i]).append("</data>");
kafkaText.append("<data key=\"").append(StringEscapeUtils.escapeXml11(table.getColumnNames()[i])).append("\">").append(StringEscapeUtils.escapeXml11(rowData[i])).append("</data>");
}
kafkaText.append("</row>");
} else if (outputFormat.equals(KAFKA_FORMAT_AVRO)) {
Expand Down
Expand Up @@ -68,6 +68,8 @@ public class HttpOutgoingTransport implements IOutgoingWithResponseTransport {
private HttpConnection connection;

private int httpTimeout;

private int httpConnectTimeout;

private boolean useCompression;

Expand All @@ -87,13 +89,14 @@ public class HttpOutgoingTransport implements IOutgoingWithResponseTransport {

private Map<String, String> requestProperties;

public HttpOutgoingTransport(HttpTransportManager httpTransportManager, URL url, int httpTimeout, boolean useCompression,
public HttpOutgoingTransport(HttpTransportManager httpTransportManager, URL url, int httpTimeout, int httpConnectTimeout, boolean useCompression,
int compressionStrategy, int compressionLevel, String nodeId,
String securityToken, boolean streamOutputEnabled, int streamOutputSize,
boolean fileUpload) {
this.httpTransportManager = httpTransportManager;
this.url = url;
this.httpTimeout = httpTimeout;
this.httpConnectTimeout = httpConnectTimeout;
this.useCompression = useCompression;
this.compressionLevel = compressionLevel;
this.compressionStrategy = compressionStrategy;
Expand All @@ -104,11 +107,11 @@ public HttpOutgoingTransport(HttpTransportManager httpTransportManager, URL url,
this.fileUpload = fileUpload;
}

public HttpOutgoingTransport(HttpTransportManager httpTransportManager, URL url, int httpTimeout, boolean useCompression,
public HttpOutgoingTransport(HttpTransportManager httpTransportManager, URL url, int httpTimeout, int httpConnectTimeout, boolean useCompression,
int compressionStrategy, int compressionLevel, String nodeId,
String securityToken, boolean streamOutputEnabled, int streamOutputSize,
boolean fileUpload, Map<String, String> requestProperties) {
this(httpTransportManager, url, httpTimeout, useCompression, compressionStrategy, compressionLevel, nodeId, securityToken,
this(httpTransportManager, url, httpTimeout, httpConnectTimeout, useCompression, compressionStrategy, compressionLevel, nodeId, securityToken,
streamOutputEnabled, streamOutputSize, fileUpload);
this.requestProperties = requestProperties;
}
Expand Down Expand Up @@ -201,7 +204,7 @@ private HttpConnection requestReservation(String queue) {
try {
connection = httpTransportManager.openConnection(url, nodeId, securityToken);
connection.setUseCaches(false);
connection.setConnectTimeout(httpTimeout);
connection.setConnectTimeout(httpConnectTimeout);
connection.setReadTimeout(httpTimeout);
connection.setRequestMethod("HEAD");
connection.setRequestProperty(WebConstants.CHANNEL_QUEUE, queue);
Expand All @@ -223,7 +226,7 @@ public OutputStream openStream() {
connection.setDoInput(true);
connection.setDoOutput(true);
connection.setUseCaches(false);
connection.setConnectTimeout(httpTimeout);
connection.setConnectTimeout(httpConnectTimeout);
connection.setReadTimeout(httpTimeout);

if (requestProperties != null) {
Expand Down
Expand Up @@ -146,7 +146,7 @@ protected int sendMessage(URL url, String nodeId, String securityToken, String d
conn.setRequestMethod("POST");
conn.setAllowUserInteraction(false);
conn.setDoOutput(true);
conn.setConnectTimeout(getHttpTimeOutInMs());
conn.setConnectTimeout(getHttpConnectTimeOutInMs());
conn.setReadTimeout(getHttpTimeOutInMs());
try (OutputStream os = conn.getOutputStream()) {
writeMessage(os, data);
Expand Down Expand Up @@ -223,6 +223,10 @@ public boolean isOutputStreamEnabled() {
/**
 * Returns the HTTP read timeout, in milliseconds, as configured by the
 * {@code http.timeout.ms} parameter ({@code ParameterConstants.TRANSPORT_HTTP_TIMEOUT}).
 *
 * @return the configured read timeout in milliseconds
 */
public int getHttpTimeOutInMs() {
    String timeoutParam = ParameterConstants.TRANSPORT_HTTP_TIMEOUT;
    return engine.getParameterService().getInt(timeoutParam);
}

/**
 * Returns the HTTP connect timeout, in milliseconds, as configured by the
 * {@code http.connect.timeout.ms} parameter
 * ({@code ParameterConstants.TRANSPORT_HTTP_CONNECT_TIMEOUT}).
 *
 * @return the configured connect timeout in milliseconds
 */
public int getHttpConnectTimeOutInMs() {
    String timeoutParam = ParameterConstants.TRANSPORT_HTTP_CONNECT_TIMEOUT;
    return engine.getParameterService().getInt(timeoutParam);
}

public boolean isUseCompression(Node targetNode) {
// if the node is local, no need to use compression
Expand Down Expand Up @@ -277,23 +281,23 @@ public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local,
String securityToken, Map<String, String> requestProperties,
String registrationUrl) throws IOException {
URL url = new URL(buildURL("push", remote, local, securityToken, registrationUrl));
return new HttpOutgoingTransport(this, url, getHttpTimeOutInMs(), isUseCompression(remote),
return new HttpOutgoingTransport(this, url, getHttpTimeOutInMs(), getHttpConnectTimeOutInMs(), isUseCompression(remote),
getCompressionStrategy(), getCompressionLevel(), local.getNodeId(),
securityToken, isOutputStreamEnabled(), getOutputStreamSize(), false, requestProperties);
}

public IOutgoingWithResponseTransport getPushTransport(Node remote, Node local,
String securityToken, String registrationUrl) throws IOException {
URL url = new URL(buildURL("push", remote, local, securityToken, registrationUrl));
return new HttpOutgoingTransport(this, url, getHttpTimeOutInMs(), isUseCompression(remote),
return new HttpOutgoingTransport(this, url, getHttpTimeOutInMs(), getHttpConnectTimeOutInMs(), isUseCompression(remote),
getCompressionStrategy(), getCompressionLevel(), local.getNodeId(),
securityToken, isOutputStreamEnabled(), getOutputStreamSize(), false);
}

public IOutgoingWithResponseTransport getFilePushTransport(Node remote, Node local,
String securityToken, String registrationUrl) throws IOException {
URL url = new URL(buildURL("filesync/push", remote, local, securityToken, registrationUrl));
return new HttpOutgoingTransport(this, url, getHttpTimeOutInMs(), isUseCompression(remote),
return new HttpOutgoingTransport(this, url, getHttpTimeOutInMs(), getHttpConnectTimeOutInMs(), isUseCompression(remote),
getCompressionStrategy(), getCompressionLevel(), local.getNodeId(),
securityToken, isOutputStreamEnabled(), getOutputStreamSize(), true);
}
Expand Down Expand Up @@ -321,7 +325,7 @@ public IOutgoingWithResponseTransport getBandwidthPushTransport(Node remote, Nod
Map<String, String> requestProperties, String registrationUrl) throws IOException
{
URL url = new URL(resolveURL(remote.getSyncUrl(), registrationUrl) + "/" + "bandwidth?direction=push");
return new HttpOutgoingTransport(this, url, getHttpTimeOutInMs(), isUseCompression(remote),
return new HttpOutgoingTransport(this, url, getHttpTimeOutInMs(), getHttpConnectTimeOutInMs(), isUseCompression(remote),
getCompressionStrategy(), getCompressionLevel(), local.getNodeId(),
securityToken, isOutputStreamEnabled(), getOutputStreamSize(), false, requestProperties);
}
Expand Down Expand Up @@ -356,7 +360,7 @@ public static Map<String, String> buildRegistrationRequestProperties(Node node,
protected HttpConnection createGetConnectionFor(URL url, String nodeId, String securityToken) throws IOException {
HttpConnection conn = openConnection(url, nodeId, securityToken);
conn.setRequestProperty("accept-encoding", "gzip");
conn.setConnectTimeout(getHttpTimeOutInMs());
conn.setConnectTimeout(getHttpConnectTimeOutInMs());
conn.setReadTimeout(getHttpTimeOutInMs());
conn.setRequestMethod("GET");
return conn;
Expand Down
Expand Up @@ -485,13 +485,20 @@ send.ack.keepalive.ms=30000
# Type: integer
time.between.ack.retries.ms=5000

# Sets both the connection and read timeout on the internal HttpUrlConnection
# Sets the read timeout on the internal HttpUrlConnection
#
# DatabaseOverridable: true
# Tags: transport
# Type: integer
http.timeout.ms=90000

# Sets the connection timeout on the internal HttpUrlConnection
#
# DatabaseOverridable: true
# Tags: transport
# Type: integer
http.connect.timeout.ms=90000

# Whether or not to use compression over HTTP connections.
# Currently, this setting only affects the push connection of the source node.
# Compression on a pull is enabled using a filter in the web.xml for the PullServlet.
Expand Down

0 comments on commit 91bbfca

Please sign in to comment.