Skip to content

Commit

Permalink
Merge remote-tracking branch 'origin/3.9' into 3.10
Browse files Browse the repository at this point in the history
  • Loading branch information
chenson42 committed Nov 6, 2018
2 parents 8c19dcb + ce80314 commit edb120c
Show file tree
Hide file tree
Showing 21 changed files with 145 additions and 50 deletions.
2 changes: 1 addition & 1 deletion symmetric-assemble/common.gradle
Expand Up @@ -191,7 +191,7 @@ subprojects { subproject ->
powerMockVersion = '1.5.3'
mysqlVersion = '5.1.45'
servletVersion = '3.1.0'
springVersion = '4.3.13.RELEASE'
springVersion = '4.3.16.RELEASE'
jtdsVersion = '1.2.8'
voltDbVersion = '6.2'
bouncyCastleVersion = '1.59'
Expand Down
Expand Up @@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-bin.zip
9 changes: 9 additions & 0 deletions symmetric-assemble/src/asciidoc/appendix/mysql.ad
Expand Up @@ -31,6 +31,15 @@ grant create routine on *.* to symmetric;
grant process on *.* to symmetric;
----

Starting in MySQL 5.7.6, the "PROCESS" privilege is also required for the MySQL user that modifies the application tables.
The privilege is needed to look up the transaction id; internally, the trigger submits this query during an insert/update/delete:

select TRX_ID from INFORMATION_SCHEMA.INNODB_TRX where TRX_MYSQL_THREAD_ID = CONNECTION_ID();

----
grant process on *.* to db_user;
----

MySQL allows '0000-00-00 00:00:00' to be entered as a value for datetime and timestamp columns.
JDBC cannot deal with a date value with a year of 0. In order to work around this SymmetricDS can be configured to treat date and time
columns as varchar columns for data capture and data load. To enable this feature set the db.treat.date.time.as.varchar.enabled property to true.
Expand Down
Expand Up @@ -191,20 +191,20 @@ public void createRequiredDatabaseObjects() {

String wkt2geom = this.parameterService.getTablePrefix() + "_" + "wkt2geom";
if (!installed(SQL_OBJECT_INSTALLED, wkt2geom)) {
String sql = " CREATE OR REPLACE "
+ " FUNCTION $(functionName)( "
+ " clob_in IN CLOB) "
+ " RETURN SDO_GEOMETRY "
+ " AS "
+ " v_out SDO_GEOMETRY := NULL; "
+ " BEGIN "
+ " IF clob_in IS NOT NULL THEN "
+ " IF DBMS_LOB.GETLENGTH(clob_in) > 0 THEN "
+ " v_out := SDO_GEOMETRY(clob_in); "
+ " END IF; "
+ " END IF; "
+ " RETURN v_out; "
+ " END $(functionName); ";
String sql = " CREATE OR REPLACE FUNCTION $(functionName) ( \r\n"
+ " clob_in IN CLOB, \r\n"
+ " srid_in IN INTEGER) \r\n"
+ " RETURN SDO_GEOMETRY \r\n"
+ " AS \r\n"
+ " v_out SDO_GEOMETRY := NULL; \r\n"
+ " BEGIN \r\n"
+ " IF clob_in IS NOT NULL THEN \r\n"
+ " IF DBMS_LOB.GETLENGTH(clob_in) > 0 THEN \r\n"
+ " v_out := SDO_GEOMETRY(clob_in, srid_in); \r\n"
+ " END IF; \r\n"
+ " END IF; \r\n"
+ " RETURN v_out; \r\n"
+ " END $(functionName); \r\n";
install(sql, wkt2geom);
}

Expand Down
@@ -1,6 +1,8 @@
package org.jumpmind.symmetric.io;

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.JdbcSqlTemplate;
import org.jumpmind.db.sql.JdbcSqlTransaction;
import org.jumpmind.symmetric.common.ContextConstants;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
Expand All @@ -22,6 +24,7 @@ public AbstractBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabase
public void start(Batch batch) {
super.start(batch);
if (isFallBackToDefault()) {
getTransaction().setInBatchMode(false);
log.debug("Writing batch " + batch.getBatchId() + " on channel " + batch.getChannelId() + " to node " + batch.getTargetNodeId() + " using DEFAULT loader");
}else{
log.debug("Writing batch " + batch.getBatchId() + " on channel " + batch.getChannelId() + " to node " + batch.getTargetNodeId() + " using BULK loader");
Expand Down
Expand Up @@ -34,7 +34,7 @@ protected LoadStatus insert(CsvData data) {

@Override
protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useConflictDetection) {
LoadStatus loadStatus = super.insert(data);
LoadStatus loadStatus = super.update(data, applyChangesOnly, useConflictDetection);
if (loadStatus == LoadStatus.CONFLICT) {
loadStatus = LoadStatus.SUCCESS;
}
Expand Down
Expand Up @@ -202,16 +202,17 @@ public boolean invoke() {

@Override
public boolean invoke(boolean force) {
IParameterService parameterService = engine.getParameterService();
long recordStatisticsCountThreshold = parameterService.getLong(ParameterConstants.STATISTIC_RECORD_COUNT_THRESHOLD,-1);

boolean ok = checkPrerequsites(force);
if (!ok) {
return false;
}

try {
try {
MDC.put("engineName", engine.getEngineName());

IParameterService parameterService = engine.getParameterService();
long recordStatisticsCountThreshold = parameterService.getLong(ParameterConstants.STATISTIC_RECORD_COUNT_THRESHOLD,-1);

boolean ok = checkPrerequsites(force);
if (!ok) {
return false;
}

long startTime = System.currentTimeMillis();
try {
if (!running.compareAndSet(false, true)) { // This ensures this job only runs once on this instance.
Expand Down
2 changes: 1 addition & 1 deletion symmetric-core/build.gradle
Expand Up @@ -7,7 +7,7 @@ apply from: symAssembleDir + '/common.gradle'
compile project(":symmetric-util")
compile "commons-fileupload:commons-fileupload:$commonsFileuploadVersion"
compile "javax.mail:mail:1.4.5"
compile "com.fasterxml.jackson.core:jackson-databind:2.9.3"
compile "com.fasterxml.jackson.core:jackson-databind:2.9.5"
compile "com.google.code.gson:gson:2.8.2"
compile "org.springframework:spring-core:$springVersion"

Expand Down
Expand Up @@ -231,7 +231,7 @@ else if (Long.class.equals(propertyTypeClass)) {
}
}
}
sendKafkaMessage(pojo, kafkaDataKey);
sendKafkaMessageByObject(pojo, kafkaDataKey);
} else {
throw new RuntimeException("Unable to find a POJO to load for AVRO based message onto Kafka for table : " + tableName);
}
Expand Down Expand Up @@ -396,19 +396,23 @@ public void earlyCommit(DataContext context) {
public void batchComplete(DataContext context) {
if (!context.getBatch().getChannelId().equals("heartbeat") && !context.getBatch().getChannelId().equals("config")) {
String batchFileName = "batch-" + context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId();

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
try {
if (confluentUrl == null && kafkaDataMap.size() > 0) {
StringBuffer kafkaText = new StringBuffer();


for (Map.Entry<String, List<String>> entry : kafkaDataMap.entrySet()) {
for (String row : entry.getValue()) {
if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
sendKafkaMessage(row, entry.getKey());
sendKafkaMessage(producer, row, entry.getKey());
} else {
kafkaText.append(row);
}
}
if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
sendKafkaMessage(kafkaText.toString(), entry.getKey());
sendKafkaMessage(producer, kafkaText.toString(), entry.getKey());
}
}
kafkaDataMap = new HashMap<String, List<String>>();
Expand All @@ -417,6 +421,7 @@ public void batchComplete(DataContext context) {
log.warn("Unable to write batch to Kafka " + batchFileName, e);
e.printStackTrace();
} finally {
producer.close();
context.put(KAFKA_TEXT_CACHE, new HashMap<String, List<String>>());
tableNameCache.clear();
tableColumnCache = new HashMap<String, Map<String, String>>();
Expand All @@ -430,15 +435,12 @@ public void batchCommitted(DataContext context) {
public void batchRolledback(DataContext context) {
}

public void sendKafkaMessage(String kafkaText, String topic) {
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
public void sendKafkaMessage(KafkaProducer<String, String> producer, String kafkaText, String topic) {
producer.send(new ProducerRecord<String, String>(topic, kafkaText));
log.debug("Data to be sent to Kafka-" + kafkaText);

producer.close();
}

public void sendKafkaMessage(Object bean, String topic) {
public void sendKafkaMessageByObject(Object bean, String topic) {
KafkaProducer<String, Object> producer = new KafkaProducer<String, Object>(configs);
producer.send(new ProducerRecord<String, Object>(topic, bean));
producer.close();
Expand Down
Expand Up @@ -61,7 +61,7 @@ public MonitorEvent check(Monitor monitor) {
MonitorEvent event = new MonitorEvent();
long usage = 0;
if (tenuredPool != null) {
usage = (long) (tenuredPool.getUsage().getUsed() / tenuredPool.getUsage().getMax());
usage = (long) ((double)tenuredPool.getUsage().getUsed() / (double)tenuredPool.getUsage().getMax() * 100);
}
event.setValue(usage);
return event;
Expand Down
Expand Up @@ -38,6 +38,7 @@
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
Expand Down Expand Up @@ -464,7 +465,7 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
createBy, transactional, transaction);
}
}
Map<String, TableReloadRequest> mapReloadRequests = convertReloadListToMap(reloadRequests);
Map<String, TableReloadRequest> mapReloadRequests = convertReloadListToMap(reloadRequests, triggerRouters, isFullLoad);

String symNodeSecurityReloadChannel = null;
int totalTableCount = 0;
Expand Down Expand Up @@ -585,17 +586,37 @@ private String findChannelFor(TriggerHistory history, List<TriggerRouter> trigge
}

@SuppressWarnings("unchecked")
protected Map<String, TableReloadRequest> convertReloadListToMap(List<TableReloadRequest> reloadRequests) {
protected Map<String, TableReloadRequest> convertReloadListToMap(List<TableReloadRequest> reloadRequests, List<TriggerRouter> triggerRouters, boolean isFullLoad) {
if (reloadRequests == null) {
return null;
}
Map<String, TableReloadRequest> reloadMap = new CaseInsensitiveMap();
for (TableReloadRequest item : reloadRequests) {
reloadMap.put(item.getIdentifier(), item);
for (TableReloadRequest reloadRequest : reloadRequests) {
if (!isFullLoad) {
validate(reloadRequest, triggerRouters);
}
reloadMap.put(reloadRequest.getIdentifier(), reloadRequest);
}
return reloadMap;
}

/**
 * Ensures the reload request refers to a trigger/router combination that is
 * actually configured in sym_trigger_router.
 *
 * @param reloadRequest  the request being checked
 * @param triggerRouters the configured trigger/router combinations to match against
 * @throws SymmetricException when no configured combination matches the request
 */
protected void validate(TableReloadRequest reloadRequest, List<TriggerRouter> triggerRouters) {
    TriggerRouter match = null;
    for (TriggerRouter candidate : triggerRouters) {
        boolean sameTrigger = ObjectUtils.equals(candidate.getTriggerId(), reloadRequest.getTriggerId());
        boolean sameRouter = ObjectUtils.equals(candidate.getRouterId(), reloadRequest.getRouterId());
        if (sameTrigger && sameRouter) {
            match = candidate;
            break;
        }
    }

    if (match == null) {
        throw new SymmetricException("Table reload request submitted which does not have a valid trigger/router "
                + "combination in sym_trigger_router. Request trigger id: '" + reloadRequest.getTriggerId() + "' router id: '"
                + reloadRequest.getRouterId() + "' create time: " + reloadRequest.getCreateTime());
    }
}

private void callReloadListeners(boolean before, Node targetNode, boolean transactional,
ISqlTransaction transaction, long loadId) {
for (IReloadListener listener : extensionService.getExtensionPointList(IReloadListener.class)) {
Expand Down
Expand Up @@ -146,6 +146,10 @@ public void trackChanges(boolean force) {
try {
log.debug("Tracking changes for file sync");
Node local = engine.getNodeService().findIdentity();
if (local == null) {
log.warn("Not running file sync trackChanges because the local node is not available yet. It may not be registered yet.");
return;
}
ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo(
new ProcessInfoKey(local.getNodeId(), null, ProcessType.FILE_SYNC_TRACKER));
boolean useCrc = engine.getParameterService().is(ParameterConstants.FILE_SYNC_USE_CRC);
Expand Down
Expand Up @@ -933,7 +933,11 @@ public AuthenticationStatus getAuthenticationStatus(String nodeId, String securi
if (node == null) {
retVal = AuthenticationStatus.REGISTRATION_REQUIRED;
} else if (!syncEnabled(node)) {
retVal = AuthenticationStatus.SYNC_DISABLED;
if(registrationOpen(node)){
retVal = AuthenticationStatus.REGISTRATION_REQUIRED;
}else{
retVal = AuthenticationStatus.SYNC_DISABLED;
}
} else if (!isNodeAuthorized(nodeId, securityToken)) {
retVal = AuthenticationStatus.FORBIDDEN;
}
Expand All @@ -946,6 +950,14 @@ protected boolean syncEnabled(Node node) {
syncEnabled = node.isSyncEnabled();
}
return syncEnabled;
}
}

/**
 * Checks whether registration is currently open for the given node.
 *
 * @param node the node whose security record is consulted
 * @return true when a security record exists and has registration enabled, false otherwise
 */
protected boolean registrationOpen(Node node) {
    NodeSecurity security = findNodeSecurity(node.getNodeId());
    return security != null && security.isRegistrationEnabled();
}

}
Expand Up @@ -1106,6 +1106,7 @@ protected int insertDataEvents(ProcessInfo processInfo, ChannelRouterContext con
context.setLastLoadId(loadId);
}
batch.setLoadId(loadId);
context.setNeedsCommitted(true);
} else {
context.setLastLoadId(-1);
}
Expand Down
Expand Up @@ -44,6 +44,7 @@

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ContextConstants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.model.Channel;
Expand Down Expand Up @@ -285,6 +286,7 @@ protected void addUsageProperties(Properties prop) {

prop.put("db_type", symmetricDialect.getName());
prop.put("db_version", symmetricDialect.getVersion());
long mobileNodeCount = 0;
Set<String> databaseTypes = new HashSet<String>();
for (Node clientNode : engine.getNodeService().findAllNodes()) {
String databaseType = clientNode.getDatabaseType() + " " + clientNode.getDatabaseVersion();
Expand All @@ -295,7 +297,12 @@ protected void addUsageProperties(Properties prop) {
break;
}
}
String deployType = clientNode.getDeploymentType();
if (deployType.equals("android") || deployType.equals(Constants.DEPLOYMENT_TYPE_CCLIENT)) {
mobileNodeCount++;
}
}
prop.put("mobile_nodes", mobileNodeCount);

Calendar cal = Calendar.getInstance();
cal.add(Calendar.DATE, -1);
Expand Down
Expand Up @@ -22,6 +22,7 @@

import java.sql.Types;

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.TypeMap;
import org.jumpmind.db.platform.DatabaseInfo;
Expand All @@ -42,7 +43,7 @@ protected void appendColumnParameter(StringBuilder sql, Column column) {
sql.append("TO_TIMESTAMP_TZ(?, 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM')")
.append(",");
} else if (isGeometry(column)) {
sql.append("SYM_WKT2GEOM(?)").append(",");
sql.append("SYM_WKT2GEOM(?,").append(buildSRIDSelect(column)).append(")").append(",");
} else if (column.getJdbcTypeName().startsWith("XMLTYPE")) {
sql.append("XMLTYPE(?)").append(",");
} else {
Expand All @@ -57,7 +58,7 @@ protected void appendColumnEquals(StringBuilder sql, Column column) {
.append(" = TO_TIMESTAMP_TZ(?, 'YYYY-MM-DD HH24:MI:SS.FF TZH:TZM')");
} else if (isGeometry(column)) {
sql.append(quote).append(column.getName()).append(quote).append(" = ")
.append("SYM_WKT2GEOM(?)");
.append("SYM_WKT2GEOM(?,").append(buildSRIDSelect(column)).append(")");
} else if (column.getJdbcTypeName().startsWith("XMLTYPE")) {
sql.append(quote).append(column.getName()).append(quote).append(" = ")
.append("XMLTYPE(?)");
Expand Down Expand Up @@ -97,4 +98,14 @@ protected boolean isGeometry(Column column) {
return false;
}
}

/**
 * Builds an inline SQL subselect that resolves the SRID for a geometry column
 * from the Oracle spatial metadata views: all_sdo_geom_metadata when a schema
 * name is configured, user_sdo_geom_metadata otherwise.
 *
 * @param column the geometry column whose SRID is being looked up
 * @return a parenthesized SQL subselect yielding the column's SRID
 */
protected String buildSRIDSelect(Column column) {
    String upperTable = tableName.toUpperCase();
    String upperColumn = column.getName().toUpperCase();
    if (StringUtils.isEmpty(schemaName)) {
        return String.format("(select SRID from user_sdo_geom_metadata where table_name = '%s' and column_name = '%s')",
                upperTable, upperColumn);
    }
    return String.format("(select SRID from all_sdo_geom_metadata where owner = '%s' and table_name = '%s' and column_name = '%s')",
            schemaName.toUpperCase(), upperTable, upperColumn);
}
}

0 comments on commit edb120c

Please sign in to comment.