
Commit

Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds.git into 3.9
chenson42 committed Oct 30, 2018
2 parents 51f7d83 + bb5f6bc commit 7fdfa5c
Showing 17 changed files with 138 additions and 53 deletions.
2 changes: 1 addition & 1 deletion symmetric-assemble/common.gradle
@@ -191,7 +191,7 @@ subprojects { subproject ->
powerMockVersion = '1.5.3'
mysqlVersion = '5.1.45'
servletVersion = '3.1.0'
springVersion = '4.3.13.RELEASE'
springVersion = '4.3.16.RELEASE'
jtdsVersion = '1.2.8'
voltDbVersion = '6.2'
bouncyCastleVersion = '1.59'
@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-bin.zip
distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-bin.zip
9 changes: 9 additions & 0 deletions symmetric-assemble/src/asciidoc/appendix/mysql.ad
@@ -31,6 +31,15 @@ grant create routine on *.* to symmetric;
grant process on *.* to symmetric;
----

Starting in MySQL 5.7.6, the "PROCESS" privilege is also required for the MySQL user that modifies the application tables.
The privilege is needed to look up the transaction id: during an insert, update, or delete, the SymmetricDS trigger internally runs this query:

----
select TRX_ID from INFORMATION_SCHEMA.INNODB_TRX where TRX_MYSQL_THREAD_ID = CONNECTION_ID();
----

----
grant process on *.* to db_user;
----

MySQL allows '0000-00-00 00:00:00' to be entered as a value for datetime and timestamp columns.
JDBC cannot handle a date value with a year of 0. To work around this, SymmetricDS can be configured to treat date and time
columns as varchar columns for data capture and data load. To enable this feature, set the db.treat.date.time.as.varchar.enabled property to true.
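
For example, in the engine properties file (a minimal illustration; the property name and value come from the paragraph above):

----
db.treat.date.time.as.varchar.enabled=true
----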
@@ -191,20 +191,20 @@ public void createRequiredDatabaseObjects() {

String wkt2geom = this.parameterService.getTablePrefix() + "_" + "wkt2geom";
if (!installed(SQL_OBJECT_INSTALLED, wkt2geom)) {
String sql = " CREATE OR REPLACE "
+ " FUNCTION $(functionName)( "
+ " clob_in IN CLOB) "
+ " RETURN SDO_GEOMETRY "
+ " AS "
+ " v_out SDO_GEOMETRY := NULL; "
+ " BEGIN "
+ " IF clob_in IS NOT NULL THEN "
+ " IF DBMS_LOB.GETLENGTH(clob_in) > 0 THEN "
+ " v_out := SDO_GEOMETRY(clob_in); "
+ " END IF; "
+ " END IF; "
+ " RETURN v_out; "
+ " END $(functionName); ";
String sql = " CREATE OR REPLACE FUNCTION $(functionName) ( \r\n"
+ " clob_in IN CLOB, \r\n"
+ " srid_in IN INTEGER) \r\n"
+ " RETURN SDO_GEOMETRY \r\n"
+ " AS \r\n"
+ " v_out SDO_GEOMETRY := NULL; \r\n"
+ " BEGIN \r\n"
+ " IF clob_in IS NOT NULL THEN \r\n"
+ " IF DBMS_LOB.GETLENGTH(clob_in) > 0 THEN \r\n"
+ " v_out := SDO_GEOMETRY(clob_in, srid_in); \r\n"
+ " END IF; \r\n"
+ " END IF; \r\n"
+ " RETURN v_out; \r\n"
+ " END $(functionName); \r\n";
install(sql, wkt2geom);
}

@@ -1,6 +1,8 @@
package org.jumpmind.symmetric.io;

import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.JdbcSqlTemplate;
import org.jumpmind.db.sql.JdbcSqlTransaction;
import org.jumpmind.symmetric.common.ContextConstants;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
@@ -22,6 +24,7 @@ public AbstractBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabase
public void start(Batch batch) {
super.start(batch);
if (isFallBackToDefault()) {
getTransaction().setInBatchMode(false);
log.debug("Writing batch " + batch.getBatchId() + " on channel " + batch.getChannelId() + " to node " + batch.getTargetNodeId() + " using DEFAULT loader");
}else{
log.debug("Writing batch " + batch.getBatchId() + " on channel " + batch.getChannelId() + " to node " + batch.getTargetNodeId() + " using BULK loader");
@@ -34,7 +34,7 @@ protected LoadStatus insert(CsvData data) {

@Override
protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useConflictDetection) {
LoadStatus loadStatus = super.insert(data);
LoadStatus loadStatus = super.update(data, applyChangesOnly, useConflictDetection);
if (loadStatus == LoadStatus.CONFLICT) {
loadStatus = LoadStatus.SUCCESS;
}
2 changes: 1 addition & 1 deletion symmetric-core/build.gradle
@@ -7,7 +7,7 @@ apply from: symAssembleDir + '/common.gradle'
compile project(":symmetric-util")
compile "commons-fileupload:commons-fileupload:$commonsFileuploadVersion"
compile "javax.mail:mail:1.4.5"
compile "com.fasterxml.jackson.core:jackson-databind:2.9.3"
compile "com.fasterxml.jackson.core:jackson-databind:2.9.5"
compile "com.google.code.gson:gson:2.8.2"
compile "org.springframework:spring-core:$springVersion"

@@ -231,7 +231,7 @@ else if (Long.class.equals(propertyTypeClass)) {
}
}
}
sendKafkaMessage(pojo, kafkaDataKey);
sendKafkaMessageByObject(pojo, kafkaDataKey);
} else {
throw new RuntimeException("Unable to find a POJO to load for AVRO based message onto Kafka for table : " + tableName);
}
@@ -396,19 +396,23 @@ public void earlyCommit(DataContext context) {
public void batchComplete(DataContext context) {
if (!context.getBatch().getChannelId().equals("heartbeat") && !context.getBatch().getChannelId().equals("config")) {
String batchFileName = "batch-" + context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId();

KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
try {
if (confluentUrl == null && kafkaDataMap.size() > 0) {
StringBuffer kafkaText = new StringBuffer();


for (Map.Entry<String, List<String>> entry : kafkaDataMap.entrySet()) {
for (String row : entry.getValue()) {
if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
sendKafkaMessage(row, entry.getKey());
sendKafkaMessage(producer, row, entry.getKey());
} else {
kafkaText.append(row);
}
}
if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
sendKafkaMessage(kafkaText.toString(), entry.getKey());
sendKafkaMessage(producer, kafkaText.toString(), entry.getKey());
}
}
kafkaDataMap = new HashMap<String, List<String>>();
@@ -417,6 +421,7 @@ public void batchComplete(DataContext context) {
log.warn("Unable to write batch to Kafka " + batchFileName, e);
e.printStackTrace();
} finally {
producer.close();
context.put(KAFKA_TEXT_CACHE, new HashMap<String, List<String>>());
tableNameCache.clear();
tableColumnCache = new HashMap<String, Map<String, String>>();
@@ -430,15 +435,12 @@ public void batchCommitted(DataContext context) {
public void batchRolledback(DataContext context) {
}

public void sendKafkaMessage(String kafkaText, String topic) {
KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
public void sendKafkaMessage(KafkaProducer<String, String> producer, String kafkaText, String topic) {
producer.send(new ProducerRecord<String, String>(topic, kafkaText));
log.debug("Data to be sent to Kafka-" + kafkaText);

producer.close();
}

public void sendKafkaMessage(Object bean, String topic) {
public void sendKafkaMessageByObject(Object bean, String topic) {
KafkaProducer<String, Object> producer = new KafkaProducer<String, Object>(configs);
producer.send(new ProducerRecord<String, Object>(topic, bean));
producer.close();
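
Note the pattern this change adopts: instead of constructing and closing a fresh KafkaProducer inside every sendKafkaMessage call, batchComplete now creates one producer per batch, reuses it for each record, and closes it in the finally block. A minimal sketch of that lifecycle (rows and topic are hypothetical placeholders; configs is the same map the diff uses):

    KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
    try {
        for (String row : rows) {
            // send() is asynchronous; records are buffered and transmitted in the background
            producer.send(new ProducerRecord<String, String>(topic, row));
        }
    } finally {
        // close() flushes any buffered records, then releases network resources
        producer.close();
    }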
@@ -61,7 +61,7 @@ public MonitorEvent check(Monitor monitor) {
MonitorEvent event = new MonitorEvent();
long usage = 0;
if (tenuredPool != null) {
usage = (long) (tenuredPool.getUsage().getUsed() / tenuredPool.getUsage().getMax());
usage = (long) ((double)tenuredPool.getUsage().getUsed() / (double)tenuredPool.getUsage().getMax() * 100);
}
event.setValue(usage);
return event;
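
The replacement line above fixes an integer-division bug: MemoryUsage.getUsed() and getMax() both return long, so the old quotient truncated to 0 whenever used was less than max, and the monitor always reported 0. Casting to double and multiplying by 100 reports usage as a percentage. A self-contained illustration with made-up numbers:

    long used = 750L;   // hypothetical bytes used in the tenured pool
    long max = 1000L;   // hypothetical pool maximum
    long before = (long) (used / max);                          // 0 -- long division truncates the fraction
    long after = (long) ((double) used / (double) max * 100);   // 75 -- percent of the pool in use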
@@ -38,6 +38,7 @@
import org.apache.commons.collections.map.CaseInsensitiveMap;
import org.apache.commons.io.IOUtils;
import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.lang.ObjectUtils;
import org.apache.commons.lang.StringEscapeUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.model.Column;
@@ -464,7 +465,7 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
createBy, transactional, transaction);
}
}
Map<String, TableReloadRequest> mapReloadRequests = convertReloadListToMap(reloadRequests);
Map<String, TableReloadRequest> mapReloadRequests = convertReloadListToMap(reloadRequests, triggerRouters, isFullLoad);

String symNodeSecurityReloadChannel = null;
int totalTableCount = 0;
@@ -585,17 +586,37 @@ private String findChannelFor(TriggerHistory history, List<TriggerRouter> trigge
}

@SuppressWarnings("unchecked")
protected Map<String, TableReloadRequest> convertReloadListToMap(List<TableReloadRequest> reloadRequests) {
protected Map<String, TableReloadRequest> convertReloadListToMap(List<TableReloadRequest> reloadRequests, List<TriggerRouter> triggerRouters, boolean isFullLoad) {
if (reloadRequests == null) {
return null;
}
Map<String, TableReloadRequest> reloadMap = new CaseInsensitiveMap();
for (TableReloadRequest item : reloadRequests) {
reloadMap.put(item.getIdentifier(), item);
for (TableReloadRequest reloadRequest : reloadRequests) {
if (!isFullLoad) {
validate(reloadRequest, triggerRouters);
}
reloadMap.put(reloadRequest.getIdentifier(), reloadRequest);
}
return reloadMap;
}

protected void validate(TableReloadRequest reloadRequest, List<TriggerRouter> triggerRouters) {
boolean validMatch = false;
for (TriggerRouter triggerRouter : triggerRouters) {
if (ObjectUtils.equals(triggerRouter.getTriggerId(), reloadRequest.getTriggerId())
&& ObjectUtils.equals(triggerRouter.getRouterId(), reloadRequest.getRouterId())) {
validMatch = true;
break;
}
}

if (!validMatch) {
throw new SymmetricException("Table reload request submitted which does not have a valid trigger/router "
+ "combination in sym_trigger_router. Request trigger id: '" + reloadRequest.getTriggerId() + "' router id: '"
+ reloadRequest.getRouterId() + "' create time: " + reloadRequest.getCreateTime());
}
}

private void callReloadListeners(boolean before, Node targetNode, boolean transactional,
ISqlTransaction transaction, long loadId) {
for (IReloadListener listener : extensionService.getExtensionPointList(IReloadListener.class)) {
@@ -146,6 +146,10 @@ public void trackChanges(boolean force) {
try {
log.debug("Tracking changes for file sync");
Node local = engine.getNodeService().findIdentity();
if (local == null) {
log.warn("Not running file sync trackChanges because the local node is not available yet. It may not be registered yet.");
return;
}
ProcessInfo processInfo = engine.getStatisticManager().newProcessInfo(
new ProcessInfoKey(local.getNodeId(), null, ProcessType.FILE_SYNC_TRACKER));
boolean useCrc = engine.getParameterService().is(ParameterConstants.FILE_SYNC_USE_CRC);
@@ -933,7 +933,11 @@ public AuthenticationStatus getAuthenticationStatus(String nodeId, String securi
if (node == null) {
retVal = AuthenticationStatus.REGISTRATION_REQUIRED;
} else if (!syncEnabled(node)) {
retVal = AuthenticationStatus.SYNC_DISABLED;
if(registrationOpen(node)){
retVal = AuthenticationStatus.REGISTRATION_REQUIRED;
}else{
retVal = AuthenticationStatus.SYNC_DISABLED;
}
} else if (!isNodeAuthorized(nodeId, securityToken)) {
retVal = AuthenticationStatus.FORBIDDEN;
}
@@ -946,6 +950,14 @@ protected boolean syncEnabled(Node node) {
syncEnabled = node.isSyncEnabled();
}
return syncEnabled;
}
}

protected boolean registrationOpen(Node node){
NodeSecurity security = findNodeSecurity(node.getNodeId());
if(security != null){
return security.isRegistrationEnabled();
}
return false;
}

}
@@ -1343,20 +1343,14 @@ public void dropTriggers(TriggerHistory history) {

protected void dropTriggers(TriggerHistory history, StringBuilder sqlBuffer) {
try {
if (StringUtils.isNotBlank(history.getNameForInsertTrigger())) {
symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), history.getSourceSchemaName(),
history.getNameForInsertTrigger(), history.getSourceTableName());
}
dropTrigger(sqlBuffer, history.getSourceCatalogName(), history.getSourceSchemaName(),
history.getNameForInsertTrigger(), history.getSourceTableName());

if (StringUtils.isNotBlank(history.getNameForDeleteTrigger())) {
symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), history.getSourceSchemaName(),
history.getNameForDeleteTrigger(), history.getSourceTableName());
}
dropTrigger(sqlBuffer, history.getSourceCatalogName(), history.getSourceSchemaName(),
history.getNameForDeleteTrigger(), history.getSourceTableName());

if (StringUtils.isNotBlank(history.getNameForUpdateTrigger())) {
symmetricDialect.removeTrigger(sqlBuffer, history.getSourceCatalogName(), history.getSourceSchemaName(),
history.getNameForUpdateTrigger(), history.getSourceTableName());
}
dropTrigger(sqlBuffer, history.getSourceCatalogName(), history.getSourceSchemaName(),
history.getNameForUpdateTrigger(), history.getSourceTableName());

if (parameterService.is(ParameterConstants.AUTO_SYNC_TRIGGERS)) {
for (ITriggerCreationListener l : extensionService.getExtensionPointList(ITriggerCreationListener.class)) {
@@ -1381,6 +1375,16 @@ protected void dropTriggers(TriggerHistory history, StringBuilder sqlBuffer) {
log.error("Error while dropping triggers for table {}", history.getSourceTableName(), ex);
}
}

protected void dropTrigger(StringBuilder sqlBuffer, String catalog, String schema, String triggerName, String tableName) {
if (StringUtils.isNotBlank(triggerName)) {
try {
symmetricDialect.removeTrigger(sqlBuffer, catalog, schema, triggerName, tableName);
} catch (Throwable e) {
log.error("Error while dropping trigger {} for table {}", triggerName, tableName, e);
}
}
}

protected List<TriggerRouter> toList(Collection<List<TriggerRouter>> source) {
ArrayList<TriggerRouter> list = new ArrayList<TriggerRouter>();
@@ -1507,8 +1511,8 @@ public void syncTriggers(Table table, boolean force) {
List<Trigger> triggersForCurrentNode = getTriggersForCurrentNode();
List<TriggerHistory> activeTriggerHistories = getActiveTriggerHistories();
for (Trigger trigger : triggersForCurrentNode) {
if (trigger.matches(table, platform.getDefaultCatalog(), platform.getDefaultSchema(),
ignoreCase)) {
if (trigger.matches(table, platform.getDefaultCatalog(), platform.getDefaultSchema(), ignoreCase) &&
(!trigger.isSourceTableNameWildCarded() || !containsExactMatchForSourceTableName(table, triggersForCurrentNode, ignoreCase))) {
log.info("Synchronizing triggers for {}", table.getFullyQualifiedTableName());
updateOrCreateDatabaseTriggers(trigger, table, null, force, true, activeTriggerHistories);
log.info("Done synchronizing triggers for {}", table.getFullyQualifiedTableName());
