Merge branch '3.9' of https://github.com/JumpMind/symmetric-ds.git into 3.9
erilong committed Oct 31, 2018
2 parents c451685 + 7fdfa5c, commit 6109bc0
Showing 9 changed files with 37 additions and 17 deletions.
symmetric-assemble/common.gradle (1 addition, 1 deletion)
@@ -191,7 +191,7 @@ subprojects { subproject ->
powerMockVersion = '1.5.3'
mysqlVersion = '5.1.45'
servletVersion = '3.1.0'
-springVersion = '4.3.13.RELEASE'
+springVersion = '4.3.16.RELEASE'
jtdsVersion = '1.2.8'
voltDbVersion = '6.2'
bouncyCastleVersion = '1.59'
gradle/wrapper/gradle-wrapper.properties (1 addition, 1 deletion)
@@ -3,4 +3,4 @@ distributionBase=GRADLE_USER_HOME
distributionPath=wrapper/dists
zipStoreBase=GRADLE_USER_HOME
zipStorePath=wrapper/dists
-distributionUrl=https\://services.gradle.org/distributions/gradle-4.1-bin.zip
+distributionUrl=https\://services.gradle.org/distributions/gradle-4.10.2-bin.zip
symmetric-assemble/src/asciidoc/appendix/mysql.ad (9 additions, 0 deletions)
@@ -31,6 +31,15 @@ grant create routine on *.* to symmetric;
grant process on *.* to symmetric;
----

+Starting in MySQL 5.7.6, the "PROCESS" privilege is also required for the MySQL user that is modifying the application tables.
+This is required to look up the transaction id. Internally, the trigger will submit this query during an insert/update/delete:
+
+select TRX_ID from INFORMATION_SCHEMA.INNODB_TRX where TRX_MYSQL_THREAD_ID = CONNECTION_ID();
+
+----
+grant process on *.* to db_user;
+----
+
MySQL allows '0000-00-00 00:00:00' to be entered as a value for datetime and timestamp columns.
JDBC cannot deal with a date value with a year of 0. In order to work around this SymmetricDS can be configured to treat date and time
columns as varchar columns for data capture and data load. To enable this feature set the db.treat.date.time.as.varchar.enabled property to true.
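For reference, the workaround described above is switched on with a single engine property; a minimal sketch of the setting, assuming it is placed in the engine's properties file:

----
db.treat.date.time.as.varchar.enabled=true
----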
symmetric-core/build.gradle (1 addition, 1 deletion)
@@ -7,7 +7,7 @@ apply from: symAssembleDir + '/common.gradle'
compile project(":symmetric-util")
compile "commons-fileupload:commons-fileupload:$commonsFileuploadVersion"
compile "javax.mail:mail:1.4.5"
compile "com.fasterxml.jackson.core:jackson-databind:2.9.3"
compile "com.fasterxml.jackson.core:jackson-databind:2.9.5"
compile "com.google.code.gson:gson:2.8.2"
compile "org.springframework:spring-core:$springVersion"

@@ -231,7 +231,7 @@ else if (Long.class.equals(propertyTypeClass)) {
}
}
}
-sendKafkaMessage(pojo, kafkaDataKey);
+sendKafkaMessageByObject(pojo, kafkaDataKey);
} else {
throw new RuntimeException("Unable to find a POJO to load for AVRO based message onto Kafka for table : " + tableName);
}
@@ -396,19 +396,23 @@ public void earlyCommit(DataContext context) {
public void batchComplete(DataContext context) {
if (!context.getBatch().getChannelId().equals("heartbeat") && !context.getBatch().getChannelId().equals("config")) {
String batchFileName = "batch-" + context.getBatch().getSourceNodeId() + "-" + context.getBatch().getBatchId();

+KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
try {
if (confluentUrl == null && kafkaDataMap.size() > 0) {
StringBuffer kafkaText = new StringBuffer();


for (Map.Entry<String, List<String>> entry : kafkaDataMap.entrySet()) {
for (String row : entry.getValue()) {
if (messageBy.equals(KAFKA_MESSAGE_BY_ROW)) {
-sendKafkaMessage(row, entry.getKey());
+sendKafkaMessage(producer, row, entry.getKey());
} else {
kafkaText.append(row);
}
}
if (messageBy.equals(KAFKA_MESSAGE_BY_BATCH)) {
-sendKafkaMessage(kafkaText.toString(), entry.getKey());
+sendKafkaMessage(producer, kafkaText.toString(), entry.getKey());
}
}
kafkaDataMap = new HashMap<String, List<String>>();
@@ -417,6 +421,7 @@ public void batchComplete(DataContext context) {
log.warn("Unable to write batch to Kafka " + batchFileName, e);
e.printStackTrace();
} finally {
+producer.close();
context.put(KAFKA_TEXT_CACHE, new HashMap<String, List<String>>());
tableNameCache.clear();
tableColumnCache = new HashMap<String, Map<String, String>>();
@@ -430,15 +435,12 @@ public void batchCommitted(DataContext context) {
public void batchRolledback(DataContext context) {
}

-public void sendKafkaMessage(String kafkaText, String topic) {
-KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
+public void sendKafkaMessage(KafkaProducer<String, String> producer, String kafkaText, String topic) {
producer.send(new ProducerRecord<String, String>(topic, kafkaText));
log.debug("Data to be sent to Kafka-" + kafkaText);

-producer.close();
}

-public void sendKafkaMessage(Object bean, String topic) {
+public void sendKafkaMessageByObject(Object bean, String topic) {
KafkaProducer<String, Object> producer = new KafkaProducer<String, Object>(configs);
producer.send(new ProducerRecord<String, Object>(topic, bean));
producer.close();
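The refactoring above replaces a producer-per-message pattern with a single KafkaProducer per batch: batchComplete now creates the producer, threads it through the sendKafkaMessage helper, and closes it in the finally block, while the Object overload is renamed sendKafkaMessageByObject. A minimal standalone sketch of the same producer-per-batch pattern (class name, config map, and topic layout are illustrative, not taken from the commit):

    import java.util.List;
    import java.util.Map;
    import org.apache.kafka.clients.producer.KafkaProducer;
    import org.apache.kafka.clients.producer.ProducerRecord;

    public class BatchKafkaSender {
        private final Map<String, Object> configs; // bootstrap.servers, serializers, etc.

        public BatchKafkaSender(Map<String, Object> configs) {
            this.configs = configs;
        }

        // One producer serves every message in the batch; constructing and
        // closing a producer per message (the old pattern) pays connection
        // and metadata costs on every send.
        public void sendBatch(Map<String, List<String>> rowsByTopic) {
            KafkaProducer<String, String> producer = new KafkaProducer<String, String>(configs);
            try {
                for (Map.Entry<String, List<String>> entry : rowsByTopic.entrySet()) {
                    for (String row : entry.getValue()) {
                        producer.send(new ProducerRecord<String, String>(entry.getKey(), row));
                    }
                }
            } finally {
                producer.close(); // flushes buffered records, then releases resources
            }
        }
    }

Reusing one producer also lets the client batch and compress records across sends, which per-message producers never benefit from.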
@@ -61,7 +61,7 @@ public MonitorEvent check(Monitor monitor) {
MonitorEvent event = new MonitorEvent();
long usage = 0;
if (tenuredPool != null) {
-usage = (long) (tenuredPool.getUsage().getUsed() / tenuredPool.getUsage().getMax());
+usage = (long) ((double)tenuredPool.getUsage().getUsed() / (double)tenuredPool.getUsage().getMax() * 100);
}
event.setValue(usage);
return event;
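The fix above corrects a classic integer-division bug: getUsed() and getMax() both return long, so the original quotient truncated to 0 whenever the tenured pool was not completely full, and the monitor always reported zero usage. Casting to double before dividing, then scaling by 100, yields the intended percentage. A tiny self-contained illustration (the byte counts are made up):

    public class UsageDemo {
        public static void main(String[] args) {
            long used = 750L * 1024 * 1024;  // 750 MB in use
            long max = 1024L * 1024 * 1024;  // 1 GB pool maximum
            long before = (long) (used / max);                        // long division truncates to 0
            long after = (long) ((double) used / (double) max * 100); // 73, i.e. 73 percent
            System.out.println(before + " vs " + after);              // prints "0 vs 73"
        }
    }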
@@ -465,7 +465,7 @@ public void insertReloadEvents(Node targetNode, boolean reverse, List<TableReloa
createBy, transactional, transaction);
}
}
-Map<String, TableReloadRequest> mapReloadRequests = convertReloadListToMap(reloadRequests, triggerRouters);
+Map<String, TableReloadRequest> mapReloadRequests = convertReloadListToMap(reloadRequests, triggerRouters, isFullLoad);

String symNodeSecurityReloadChannel = null;
int totalTableCount = 0;
@@ -586,13 +586,15 @@ private String findChannelFor(TriggerHistory history, List<TriggerRouter> trigge
}

@SuppressWarnings("unchecked")
-protected Map<String, TableReloadRequest> convertReloadListToMap(List<TableReloadRequest> reloadRequests, List<TriggerRouter> triggerRouters) {
+protected Map<String, TableReloadRequest> convertReloadListToMap(List<TableReloadRequest> reloadRequests, List<TriggerRouter> triggerRouters, boolean isFullLoad) {
if (reloadRequests == null) {
return null;
}
Map<String, TableReloadRequest> reloadMap = new CaseInsensitiveMap();
for (TableReloadRequest reloadRequest : reloadRequests) {
-validate(reloadRequest, triggerRouters);
+if (!isFullLoad) {
+validate(reloadRequest, triggerRouters);
+}
reloadMap.put(reloadRequest.getIdentifier(), reloadRequest);
}
return reloadMap;
symmetric-db/src/main/java/org/jumpmind/db/sql/Row.java (6 additions, 1 deletion)
@@ -265,7 +265,12 @@ protected void checkForColumn(String columnName) {
}

final private java.util.Date getDate(String value, String[] pattern) {
-return FormatUtils.parseDate(value, pattern);
+int fractionIndex = value.lastIndexOf(".");
+if (fractionIndex > 0 && value.substring(fractionIndex, value.length()).length() > 3) {
+return Timestamp.valueOf(value);
+} else {
+return FormatUtils.parseDate(value, pattern);
+}
}

public Object[] toArray(String[] keys) {
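The new branch above exists because pattern-based date parsing resolves at most three fractional digits (milliseconds), while java.sql.Timestamp.valueOf accepts the JDBC escape format with up to nanosecond precision; values carrying more than three fractional digits are therefore routed around FormatUtils so the extra precision survives. A small standalone demonstration (not SymmetricDS code):

    import java.sql.Timestamp;

    public class FractionDemo {
        public static void main(String[] args) {
            // Six fractional digits: a millisecond-based pattern would mangle
            // this, but Timestamp.valueOf keeps the full microsecond value.
            Timestamp ts = Timestamp.valueOf("2018-10-31 12:34:56.123456");
            System.out.println(ts);            // 2018-10-31 12:34:56.123456
            System.out.println(ts.getNanos()); // 123456000
        }
    }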
symmetric-sqlexplorer/build.gradle (3 additions, 1 deletion)
@@ -38,7 +38,9 @@ artifacts {
}

configurations.archives.with {
-artifacts.remove artifacts.find { it.archiveTask.is war }
+
+
+artifacts.remove artifacts.find { it.type == 'war' }
}

dependencies {
