Skip to content

Commit

Permalink
Merge branch '3.10' of https://github.com/JumpMind/symmetric-ds.git i…
Browse files Browse the repository at this point in the history
…nto 3.10
  • Loading branch information
jumpmind-josh committed Oct 31, 2019
2 parents 602e2f3 + 9859752 commit a7245b0
Show file tree
Hide file tree
Showing 43 changed files with 394 additions and 114 deletions.
4 changes: 3 additions & 1 deletion .gitignore
Expand Up @@ -2,4 +2,6 @@
/.project
*.iml

/.idea/*
/.idea/*
*.ipr
*.iws
8 changes: 8 additions & 0 deletions symmetric-assemble/build.gradle
Expand Up @@ -17,6 +17,14 @@ buildscript {
}
}

apply plugin: 'java'
apply plugin: 'idea'
apply plugin: 'eclipse'

task develop {
dependsOn tasks.cleanEclipse, tasks.eclipse, tasks.cleanIdea, tasks.idea
}

task buildScriptDependencies(type: org.gradle.api.tasks.diagnostics.DependencyReportTask) {
configurations = project.buildscript.configurations
}
Expand Down
15 changes: 13 additions & 2 deletions symmetric-assemble/common.gradle
Expand Up @@ -23,6 +23,7 @@ dependencies {
allprojects {

apply plugin: 'eclipse'
apply plugin: 'idea'

group = 'org.jumpmind.symmetric'

Expand Down Expand Up @@ -114,6 +115,16 @@ subprojects { subproject ->
project.natures.remove('org.eclipse.buildship.core.gradleprojectnature')
}
}

idea {
module {
testResourceDirs = []
testSourceDirs = [ file('src/test/java'), file('src/test/groovy')]
resourceDirs += file('src/test/resources')
downloadJavadoc = true
downloadSources = true
}
}

javadoc {
failOnError = false
Expand Down Expand Up @@ -202,7 +213,7 @@ subprojects { subproject ->
h2Version = '1.3.176'
derbyVersion = '10.14.2.0'
mariaDbVersion = '2.4.0'
postgresqlVersion = '42.2.5'
postgresqlVersion = '42.2.8'
hsqldbVersion = '2.4.1'
sqliteVersion = '3.25.2'
firebirdVersion = '3.0.5'
Expand Down Expand Up @@ -287,7 +298,7 @@ subprojects { subproject ->
}

task develop {
dependsOn tasks.cleanEclipse, tasks.eclipse
dependsOn tasks.cleanEclipse, tasks.eclipse, tasks.cleanIdea, tasks.idea

doLast {
def wstFacetXml = "$projectDir/.settings/org.eclipse.wst.common.project.facet.core.xml"
Expand Down
Expand Up @@ -31,6 +31,8 @@ The SQL statement has access to the following variables that are replaced before

|:EXTERNAL_DATA|The external data for current row, as configured by sym_trigger.external_select.

|:DATA_EVENT_TYPE|The event type of either INSERT, UPDATE, or DELETE.

|:TABLE_NAME|The table name for the current row.

|:COLUMN_NAME|Variables named for each column name (in uppercase), which return the column value for the new row.
Expand Down
13 changes: 6 additions & 7 deletions symmetric-client/build.gradle
Expand Up @@ -8,12 +8,11 @@ apply from: symAssembleDir + '/common.gradle'
compile "org.springframework:spring-context:$springVersion"
compile "commons-cli:commons-cli:$commonsCliVersion"
compile "commons-vfs:commons-vfs:1.0"
// compile "com.jcraft:jsch:0.1.48"

provided "org.mongodb:mongo-java-driver:2.12.3"
provided "org.codehaus.mojo:animal-sniffer-annotations:$animalSnifferVersion"

provided ("com.amazon.redshift:redshift-jdbc42-no-awssdk:1.2.18.1036") {
provided ("com.amazon.redshift:redshift-jdbc42-no-awssdk:1.2.36.1060") {
exclude group: 'org.apache.httpcomponents'
exclude group: 'commons-logging'
exclude group: 'com.fasterxml.jackson.core'
Expand All @@ -28,13 +27,13 @@ apply from: symAssembleDir + '/common.gradle'
}

provided ("com.microsoft.azure:azure-storage:8.1.0") {
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'org.slf4j'
}
exclude group: 'com.fasterxml.jackson.core'
exclude group: 'org.slf4j'
}

provided ("net.snowflake:snowflake-jdbc:3.6.27") {
exclude group: 'com.fasterxml.jackson.core'
}
exclude group: 'com.fasterxml.jackson.core'
}

provided 'org.apache.phoenix:phoenix:5.0.0-HBase-2.0'

Expand Down
Expand Up @@ -22,6 +22,7 @@

import java.util.HashMap;

import org.jumpmind.db.util.BasicDataSourcePropertyConstants;
import org.jumpmind.symmetric.db.AbstractTriggerTemplate;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.util.SymmetricUtils;
Expand All @@ -34,7 +35,7 @@ public MySqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
stringColumnTemplate = "cast(if($(tableAlias).`$(columnName)` is null,'',concat('\"',replace(replace($(tableAlias).`$(columnName)`,'\\\\','\\\\\\\\'),'\"','\\\\\"'),'\"')) as char)\n" ;
geometryColumnTemplate = "if($(tableAlias).`$(columnName)` is null,'',concat('\"',replace(replace(astext($(tableAlias).`$(columnName)`),'\\\\','\\\\\\\\'),'\"','\\\\\"'),'\"'))\n" ;
numberColumnTemplate = "if($(tableAlias).`$(columnName)` is null,'',concat('\"',cast($(tableAlias).`$(columnName)` as char),'\"'))\n" ;
datetimeColumnTemplate = "if($(tableAlias).`$(columnName)` is null,'',concat('\"',cast($(tableAlias).`$(columnName)` as char),'\"'))\n" ;
datetimeColumnTemplate = "if($(tableAlias).`$(columnName)` is null" + getConvertZeroDateToNull() + ",'',concat('\"',cast($(tableAlias).`$(columnName)` as char),'\"'))\n" ;
clobColumnTemplate = stringColumnTemplate;
blobColumnTemplate = "if($(tableAlias).`$(columnName)` is null,'',concat('\"',hex($(tableAlias).`$(columnName)`),'\"'))\n" ;
booleanColumnTemplate = "if($(tableAlias).`$(columnName)` is null,'',concat('\"',cast($(tableAlias).`$(columnName)` as unsigned),'\"'))\n" ;
Expand Down Expand Up @@ -163,6 +164,13 @@ public MySqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
"select concat($(columns)) from $(schemaName)$(tableName) t where $(whereClause) " );
}

protected String getConvertZeroDateToNull() {
if (symmetricDialect.getParameterService().getString(BasicDataSourcePropertyConstants.DB_POOL_URL).contains("zeroDateTimeBehavior=convertToNull")) {
return " or $(tableAlias).`$(columnName)` = '0000-00-00'";
}
return "";
}

@Override
protected String castDatetimeColumnToString(String columnName) {
return "cast(\n" + SymmetricUtils.quote(symmetricDialect, columnName) + " as char) as \n" + columnName;
Expand Down
Expand Up @@ -94,7 +94,7 @@ public PostgreSqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
" $(custom_on_insert_text) \n" +
" return null; \n" +
" end; \n" +
" $function$ language plpgsql; " );
" $function$ language plpgsql" + getSecurityClause() + ";");

sqlTemplates.put("insertReloadTriggerTemplate" ,
"create or replace function $(schemaName)f$(triggerName)() returns trigger as $function$ \n" +
Expand All @@ -118,7 +118,7 @@ public PostgreSqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
" $(custom_on_insert_text) \n" +
" return null; \n" +
" end; \n" +
" $function$ language plpgsql; " );
" $function$ language plpgsql" + getSecurityClause() + ";");


sqlTemplates.put("insertPostTriggerTemplate" ,
Expand Down Expand Up @@ -155,7 +155,7 @@ public PostgreSqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
" $(custom_on_update_text) \n" +
" return null; \n" +
" end; \n" +
" $function$ language plpgsql; " );
" $function$ language plpgsql" + getSecurityClause() + ";");

sqlTemplates.put("updateReloadTriggerTemplate" ,
"create or replace function $(schemaName)f$(triggerName)() returns trigger as $function$ \n" +
Expand Down Expand Up @@ -185,7 +185,7 @@ public PostgreSqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
" $(custom_on_update_text) \n" +
" return null; \n" +
" end; \n" +
" $function$ language plpgsql; " );
" $function$ language plpgsql" + getSecurityClause() + ";");

sqlTemplates.put("updatePostTriggerTemplate" ,
"create trigger $(triggerName) after update on $(schemaName)$(tableName) \n" +
Expand Down Expand Up @@ -214,7 +214,7 @@ public PostgreSqlTriggerTemplate(ISymmetricDialect symmetricDialect) {
" $(custom_on_delete_text) \n" +
" return null; \n" +
" end; \n" +
" $function$ language plpgsql; " );
" $function$ language plpgsql" + getSecurityClause() + ";");

sqlTemplates.put("deletePostTriggerTemplate" ,
"create trigger $(triggerName) after delete on $(schemaName)$(tableName) \n" +
Expand All @@ -237,5 +237,12 @@ protected String getCreateTimeExpression(ISymmetricDialect symmetricDialect) {
return String.format("CURRENT_TIMESTAMP AT TIME ZONE '%s'", timezone);
}
}

protected String getSecurityClause() {
if (symmetricDialect.getParameterService().is(ParameterConstants.POSTGRES_SECURITY_DEFINER, false)) {
return " security definer";
}
return "";
}

}
Expand Up @@ -26,15 +26,15 @@
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/*
* Background job that is responsible for purging already synchronized data
*/
/*
* Background job that is responsible for purging already synchronized data
*/
public class IncomingPurgeJob extends AbstractJob {

public IncomingPurgeJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super(ClusterConstants.PURGE_INCOMING, engine, taskScheduler);
public IncomingPurgeJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super(ClusterConstants.PURGE_INCOMING, engine, taskScheduler);
}

@Override
Expand All @@ -45,13 +45,13 @@ public JobDefaults getDefaults() {
}

@Override
public void doJob(boolean force) throws Exception {
engine.getPurgeService().purgeIncoming(force);
public void doJob(boolean force) throws Exception {
setProcessedCount(engine.getPurgeService().purgeIncoming(force));
}

@Override
public String getDeprecatedStartParameter() {
return ParameterConstants.START_PURGE_JOB_38;
}

}
}
Expand Up @@ -26,15 +26,15 @@
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/*
* Background job that is responsible for purging already synchronized data
*/
/*
* Background job that is responsible for purging already synchronized data
*/
public class OutgoingPurgeJob extends AbstractJob {

public OutgoingPurgeJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super(ClusterConstants.PURGE_OUTGOING, engine, taskScheduler);
public OutgoingPurgeJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super(ClusterConstants.PURGE_OUTGOING, engine, taskScheduler);
}

@Override
Expand All @@ -45,8 +45,8 @@ public JobDefaults getDefaults() {
}

@Override
public void doJob(boolean force) throws Exception {
engine.getPurgeService().purgeOutgoing(force);
public void doJob(boolean force) throws Exception {
setProcessedCount(engine.getPurgeService().purgeOutgoing(force));
}

@Override
Expand All @@ -55,4 +55,4 @@ public String getDeprecatedStartParameter() {
}


}
}
Expand Up @@ -45,8 +45,10 @@ public JobDefaults getDefaults() {
public void doJob(boolean force) throws Exception {
IStagingManager stagingManager = engine.getStagingManager();
if (stagingManager != null) {
stagingManager.clean(engine.getParameterService()
long processed = stagingManager.clean(engine.getParameterService()
.getLong(ParameterConstants.STREAM_TO_FILE_TIME_TO_LIVE_MS));

setProcessedCount(processed);
}
}

Expand Down
Expand Up @@ -513,6 +513,7 @@ private ParameterConstants() {
public final static String REDSHIFT_BULK_LOAD_S3_SECRET_KEY = "redshift.bulk.load.s3.secret.key";
public final static String REDSHIFT_BULK_LOAD_S3_ENDPOINT = "redshift.bulk.load.s3.endpoint";

public final static String POSTGRES_SECURITY_DEFINER = "postgres.security.definer";

public static Map<String, ParameterMetaData> getParameterMetaData() {
return parameterMetaData;
Expand Down
Expand Up @@ -21,6 +21,7 @@
package org.jumpmind.symmetric.job;


import java.util.HashSet;
import java.util.Set;

import org.apache.commons.lang.StringUtils;
Expand Down Expand Up @@ -87,7 +88,10 @@ public void heartbeat(Node me) {
}

log.debug("Updating my node info");
if (engine.getOutgoingBatchService().countOutgoingBatchesUnsentHeartbeat() == 0) {
Set<Node> targetNodes = new HashSet<Node>();
targetNodes.addAll(engine.getNodeService().findNodesWhoPullFromMe());
targetNodes.addAll(engine.getNodeService().findNodesToPushTo());
if (engine.getOutgoingBatchService().countOutgoingBatchesUnsentHeartbeat() < targetNodes.size() || targetNodes.size() == 0) {
engine.getNodeService().updateNodeHostForCurrentNode();
}
log.debug("Done updating my node info");
Expand Down

0 comments on commit a7245b0

Please sign in to comment.