Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
3.8

Conflicts:
	symmetric-jdbc/src/test/resources/db-test.properties
  • Loading branch information
chenson42 committed Apr 28, 2017
2 parents 7c3ce1c + 42da3a5 commit 7066525
Show file tree
Hide file tree
Showing 41 changed files with 439 additions and 83 deletions.
5 changes: 3 additions & 2 deletions symmetric-assemble/src/asciidoc/appendix/postgresql.ad
Expand Up @@ -8,13 +8,14 @@ Starting with PostgreSQL 8.3, SymmetricDS supports the transaction identifier.
Binary Large Object (BLOB) replication is supported for both byte array (BYTEA) and object ID (OID) data types.

In order to function properly, SymmetricDS needs to use session variables.
On PostgreSQL, session variables are enabled using a custom variable class. Add the following line to the postgresql.conf file of PostgreSQL server:
Before PostgreSQL 9.2, session variables are enabled using a custom variable class. Add the following line to the postgresql.conf file of PostgreSQL server on versions before 9.2:

----
custom_variable_classes = 'symmetric'
----

This setting is required, and SymmetricDS will log an error and exit if it cannot set session variables.
This setting is required on versions before 9.2, and SymmetricDS will log an error and exit if it cannot set session variables.
PostgreSQL versions 9.2 or later do not require this setting.

Before database triggers can be created by in PostgreSQL, the plpgsql language handler must be installed on the database.
If plpgsql is not already installed, the following statements can be run by the administrator on the database:
Expand Down
1 change: 1 addition & 0 deletions symmetric-assemble/src/asciidoc/configuration/channels.ad
Expand Up @@ -61,6 +61,7 @@ ifdef::pro[]
.Advanced Options
endif::pro[]

Group Link Direction:: For a node group link that is reversible, the channel can specify either "push" or "pull" to override the default group link communication. If this field is empty, the default group link communication is used.
Enabled:: Indicates whether the channel is enabled or disabled. If a channel is disabled, data is still captured for changes
that occur on the source system, but it will not be routed and sent to the target until the channel is re-enabled.
Reload Channel:: Indicates whether a channel is available for initial loads and reverse initial loads.
Expand Down
Expand Up @@ -35,7 +35,7 @@ ifdef::pro[]
endif::pro[]

Sync Configuration:: Determines if configuration is also sent through this group link. By default this is checked and configuration will communicate on this path. There are configurations that might cause configuration to continuously loop through the network as a result this might need to be unchecked for some links.

Reversible:: Allows the communication link to send in the reverse direction if specified on the channel. A push link can be overridden to pull and a pull link can be overridden to push using a setting on the channel.

.Sample Group Links
====
Expand Down
7 changes: 5 additions & 2 deletions symmetric-assemble/src/asciidoc/installation.ad
Expand Up @@ -263,8 +263,11 @@ bin/sym_service stop

A single SymmetricDS node may be clustered across a series of instances, creating a web farm. A node might be clustered to provide load balancing and failover, for example.

When clustered, a hardware load balancer is typically used
to round robin client requests to the cluster. The load balancer should be configured for stateless connections.
When clustered, a hardware load balancer is typically used.

For clustered nodes running SymmetricDS 3.8 and later the recommended approach is to configure the load balancer to use sticky sessions or ensure the staging directory for all nodes in the cluster are using a shared network drive.

For clustered nodes running SymmetricDS 3.7 and earlier it is recommended to round robin client requests to the cluster and configure the load balancer for stateless connections.

ifndef::pro[]
Also, the `sync.url` (discussed in <<Node Properties File>>) SymmetricDS property should be set to the URL of the load balancer.
Expand Down
2 changes: 1 addition & 1 deletion symmetric-assemble/src/asciidoc/tutorials/tutorials.ad
Expand Up @@ -17,7 +17,7 @@ endif::pro[]



Copyright (C) 2007-2015 JumpMind, Inc
Copyright (C) 2007-2017 JumpMind, Inc

Version @appVersion@

Expand Down
2 changes: 1 addition & 1 deletion symmetric-assemble/src/asciidoc/user-guide.ad
Expand Up @@ -13,7 +13,7 @@ ifndef::pro[]
= SymmetricDS @appMajorVersion@ User Guide
endif::pro[]

Copyright (C) 2007-2015 JumpMind, Inc
Copyright (C) 2007-2017 JumpMind, Inc

Version @appVersion@

Expand Down
Expand Up @@ -68,6 +68,7 @@ public JobManager(ISymmetricEngine engine) {
this.jobs.add(new InitialLoadExtractorJob(engine,taskScheduler));
this.jobs.add(new MonitorJob(engine, taskScheduler));
this.jobs.add(new ReportStatusJob(engine, taskScheduler));
this.jobs.add(new SyncConfigJob(engine, taskScheduler));
}

@Override
Expand Down
@@ -0,0 +1,53 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.jumpmind.symmetric.job;

import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.service.ClusterConstants;
import org.springframework.scheduling.concurrent.ThreadPoolTaskScheduler;

/*
* Background job that checks to see if config needs to be synced from registration server
*/
public class SyncConfigJob extends AbstractJob {

public SyncConfigJob(ISymmetricEngine engine, ThreadPoolTaskScheduler taskScheduler) {
super("job.sync.config", engine, taskScheduler);
}

@Override
public boolean isAutoStartConfigured() {
return engine.getParameterService().is(ParameterConstants.START_SYNC_CONFIG_JOB);
}

@Override
public String getClusterLockName() {
return ClusterConstants.SYNC_CONFIG;
}

@Override
public void doJob(boolean force) throws Exception {
engine.getPullService().pullConfigData(false);
}

}
Expand Up @@ -152,7 +152,7 @@ public void exportTestDatabaseSQL() throws Exception {
return;
}

final int EXPECTED_VARCHAR_MAX_COUNT = engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.SQLITE) ? 294 : 50;
final int EXPECTED_VARCHAR_MAX_COUNT = engine.getDatabasePlatform().getName().equals(DatabaseNamesConstants.SQLITE) ? 295 : 50;
final String EXPECTED_VARCHAR_MAX_STRING = "varchar(" + Integer.MAX_VALUE + ")";
final int actualVarcharMaxCount = StringUtils.countMatches(output, EXPECTED_VARCHAR_MAX_STRING);
String msg = String.format("Expected %s, but got %s in the following output %s",
Expand Down
Expand Up @@ -618,6 +618,10 @@ public synchronized boolean start(boolean startJobs) {
parameterService.getSyncUrl())) {
heartbeat(false);
}

if (parameterService.is(ParameterConstants.AUTO_SYNC_CONFIG_AT_STARTUP, true)) {
pullService.pullConfigData(false);
}

} else {
log.info("Starting unregistered node [group={}, externalId={}]",
Expand Down
Expand Up @@ -56,6 +56,7 @@ private ParameterConstants() {
public final static String START_ROUTE_JOB = "start.route.job";
public final static String START_HEARTBEAT_JOB = "start.heartbeat.job";
public final static String START_SYNCTRIGGERS_JOB = "start.synctriggers.job";
public final static String START_SYNC_CONFIG_JOB = "start.sync.config.job";
public final static String START_STATISTIC_FLUSH_JOB = "start.stat.flush.job";
public final static String START_STAGE_MGMT_JOB = "start.stage.management.job";
public final static String START_WATCHDOG_JOB = "start.watchdog.job";
Expand Down Expand Up @@ -116,6 +117,7 @@ private ParameterConstants() {
public final static String AUTO_CONFIGURE_DATABASE = "auto.config.database";
public final static String AUTO_SYNC_TRIGGERS = "auto.sync.triggers";
public final static String AUTO_SYNC_TRIGGERS_AT_STARTUP = "auto.sync.triggers.at.startup";
public final static String AUTO_SYNC_CONFIG_AT_STARTUP = "auto.sync.config.at.startup";
public final static String AUTO_SYNC_TRIGGERS_AFTER_CONFIG_CHANGED = "auto.sync.triggers.after.config.change";
public final static String AUTO_SYNC_TRIGGERS_AFTER_CONFIG_LOADED = "auto.sync.triggers.after.config.loaded";
public final static String AUTO_REFRESH_AFTER_CONFIG_CHANGED = "auto.refresh.after.config.changes.detected";
Expand Down
Expand Up @@ -118,6 +118,9 @@ public String afterUpgrade(ISymmetricDialect symmetricDialect, String tablePrefi
engine.getSqlTemplate().update("update " + tablePrefix + "_" + TableConstants.SYM_CHANNEL +
" set max_batch_size = 10000 where reload_flag = 1 and max_batch_size = 10000");
}

engine.getPullService().pullConfigData(false);

return sb.toString();
}

Expand Down
Expand Up @@ -30,6 +30,7 @@
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.service.IContextService;
import org.jumpmind.symmetric.service.IParameterService;
import org.jumpmind.symmetric.service.impl.ContextService;
Expand Down Expand Up @@ -116,6 +117,12 @@ public boolean isClobSyncSupported() {
public boolean isTransactionIdOverrideSupported() {
return false;
}

@Override
protected String getDbSpecificDataHasChangedCondition(Trigger trigger) {
/* gets filled/replaced by trigger template as it will compare by each column */
return "$(anyColumnChanged)";
}

@Override
public void truncateTable(String tableName) {
Expand Down
Expand Up @@ -24,9 +24,16 @@

import java.util.HashMap;

import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.AbstractSymmetricDialect;
import org.jumpmind.symmetric.db.AbstractTriggerTemplate;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Trigger;
import org.jumpmind.symmetric.model.TriggerHistory;
import org.jumpmind.util.FormatUtils;

public class SqliteTriggerTemplate extends AbstractTriggerTemplate {

Expand Down Expand Up @@ -73,7 +80,7 @@ public SqliteTriggerTemplate(AbstractSymmetricDialect symmetricDialect) {
.put("updateTriggerTemplate",
"create trigger $(triggerName) after update on $(schemaName)$(tableName) \n"
+ "for each row \n"
+ " when ($(syncOnUpdateCondition) and $(syncOnIncomingBatchCondition)) \n"
+ " when ($(syncOnUpdateCondition) and $(syncOnIncomingBatchCondition)) and ($(dataHasChangedCondition)) \n"
+ " begin \n"
+ " insert into $(defaultCatalog)$(prefixName)_data (table_name, event_type, trigger_hist_id, pk_data, row_data, old_data, channel_id, transaction_id, source_node_id, external_data, create_time) \n"
+ " values( \n" + " '$(targetTableName)', \n" + " 'U', \n"
Expand Down Expand Up @@ -103,4 +110,35 @@ public SqliteTriggerTemplate(AbstractSymmetricDialect symmetricDialect) {

// formatter:on
}

@Override
protected String replaceTemplateVariables(DataEventType dml, Trigger trigger,
TriggerHistory history, Channel channel, String tablePrefix, Table originalTable, Table table,
String defaultCatalog, String defaultSchema, String ddl) {
ddl = super.replaceTemplateVariables(dml, trigger, history, channel, tablePrefix, originalTable, table,
defaultCatalog, defaultSchema, ddl);

ddl = FormatUtils.replace("anyColumnChanged",
buildColumnsAreNotEqualString(table, newTriggerValue, oldTriggerValue), ddl);

return ddl;
}

private String buildColumnsAreNotEqualString(Table table, String table1Name, String table2Name){
StringBuilder builder = new StringBuilder();

for(Column column : table.getColumns()){
if (builder.length() > 0) {
builder.append(" or ");
}

builder.append(String.format("((%1$s.\"%2$s\" IS NOT NULL AND %3$s.\"%2$s\" IS NOT NULL AND %1$s.\"%2$s\"<>%3$s.\"%2$s\") or "
+ "(%1$s.\"%2$s\" IS NULL AND %3$s.\"%2$s\" IS NOT NULL) or "
+ "(%1$s.\"%2$s\" IS NOT NULL AND %3$s.\"%2$s\" IS NULL))", table1Name, column.getName(), table2Name));
}
if (builder.length() == 0) {
builder.append("1=1");
}
return builder.toString();
}
}
Expand Up @@ -58,6 +58,8 @@ public class Node implements Serializable, Comparable<Node> {
* sync software.
*/
private String schemaVersion;

private String configVersion;

/**
* Record the type of database the node hosts.
Expand Down Expand Up @@ -104,6 +106,7 @@ public Node(IParameterService parameterService, ISymmetricDialect symmetricDiale
setDatabaseVersion(symmetricDialect.getVersion());
setSyncUrl(parameterService.getSyncUrl());
setSchemaVersion(parameterService.getString(ParameterConstants.SCHEMA_VERSION));
setConfigVersion(Version.version());
}

public Node(String nodeId, String syncURL, String version) {
Expand Down Expand Up @@ -146,6 +149,14 @@ public void setSchemaVersion(String version) {
this.schemaVersion = StringUtils.abbreviate(version, MAX_VERSION_SIZE);
}

public String getConfigVersion() {
return configVersion;
}

public void setConfigVersion(String configVersion) {
this.configVersion = configVersion;
}

public boolean isSyncEnabled() {
return syncEnabled;
}
Expand Down
Expand Up @@ -27,7 +27,7 @@ public class ProcessInfoKey implements Serializable {
private static final long serialVersionUID = 1L;

public enum ProcessType {
ANY, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, REST_PULL_HANLDER, OFFLINE_PUSH, OFFLINE_PULL, ROUTER_JOB, INSERT_LOAD_EVENTS, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, FILE_SYNC_TRACKER, INITIAL_LOAD_EXTRACT_JOB, FILE_SYNC_INITIAL_LOAD_EXTRACT_JOB;
ANY, PUSH_JOB, PULL_JOB, PUSH_HANDLER, PULL_HANDLER, REST_PULL_HANLDER, OFFLINE_PUSH, OFFLINE_PULL, ROUTER_JOB, INSERT_LOAD_EVENTS, GAP_DETECT, ROUTER_READER, MANUAL_LOAD, FILE_SYNC_PULL_JOB, FILE_SYNC_PUSH_JOB, FILE_SYNC_PULL_HANDLER, FILE_SYNC_PUSH_HANDLER, FILE_SYNC_TRACKER, INITIAL_LOAD_EXTRACT_JOB, FILE_SYNC_INITIAL_LOAD_EXTRACT_JOB, PULL_CONFIG_JOB;

public String toString() {
switch (this) {
Expand All @@ -39,6 +39,8 @@ public String toString() {
return "Database Push";
case PULL_JOB:
return "Database Pull";
case PULL_CONFIG_JOB:
return "Config Pull";
case PUSH_HANDLER:
return "Service Database Push";
case PULL_HANDLER:
Expand Down
Expand Up @@ -39,7 +39,8 @@ public class ClusterConstants {
public static final String PURGE_DATA_GAPS = "PURGE_DATA_GAPS";
public static final String HEARTBEAT = "HEARTBEAT";
public static final String INITIAL_LOAD_EXTRACT = "INITIAL_LOAD_EXTRACT";
public static final String SYNCTRIGGERS = "SYNCTRIGGERS";
public static final String SYNCTRIGGERS = "SYNCTRIGGERS";
public static final String SYNC_CONFIG = "SYNC_CONFIG";
public static final String WATCHDOG = "WATCHDOG";
public static final String STATISTICS = "STATISTICS";
public static final String FILE_SYNC_TRACKER = "FILE_SYNC_TRACKER";
Expand Down
Expand Up @@ -51,6 +51,8 @@ public interface IDataLoaderService {

public List<IncomingBatch> loadDataFromOfflineTransport(Node remote, RemoteNodeStatus status, IIncomingTransport transport) throws IOException;

public void loadDataFromConfig(Node remote, RemoteNodeStatus status, boolean force) throws IOException;

public List<String> getAvailableDataLoaderFactories();

public List<IncomingBatch> loadDataBatch(String batchData);
Expand Down
Expand Up @@ -22,6 +22,7 @@

import org.jumpmind.symmetric.model.Node;
import org.jumpmind.symmetric.model.NodeGroupLinkAction;
import org.jumpmind.symmetric.model.RemoteNodeStatus;
import org.jumpmind.symmetric.model.RemoteNodeStatuses;

/**
Expand All @@ -32,4 +33,6 @@ public interface IPullService extends IOfflineDetectorService {

public RemoteNodeStatuses pullData(boolean force);

public RemoteNodeStatus pullConfigData(boolean force);

}
Expand Up @@ -44,6 +44,7 @@
import static org.jumpmind.symmetric.service.ClusterConstants.TYPE_EXCLUSIVE;
import static org.jumpmind.symmetric.service.ClusterConstants.TYPE_SHARED;
import static org.jumpmind.symmetric.service.ClusterConstants.WATCHDOG;
import static org.jumpmind.symmetric.service.ClusterConstants.SYNC_CONFIG;

import java.util.Collection;
import java.util.Date;
Expand Down Expand Up @@ -71,7 +72,8 @@ public class ClusterService extends AbstractService implements IClusterService {

private static final String[] actions = new String[] { ROUTE, PULL, PUSH, HEARTBEAT, PURGE_INCOMING, PURGE_OUTGOING,
PURGE_STATISTICS, SYNCTRIGGERS, PURGE_DATA_GAPS, STAGE_MANAGEMENT, WATCHDOG, STATISTICS, FILE_SYNC_PULL,
FILE_SYNC_PUSH, FILE_SYNC_TRACKER, FILE_SYNC_SCAN, INITIAL_LOAD_EXTRACT, OFFLINE_PUSH, OFFLINE_PULL, MONITOR };
FILE_SYNC_PUSH, FILE_SYNC_TRACKER, FILE_SYNC_SCAN, INITIAL_LOAD_EXTRACT, OFFLINE_PUSH, OFFLINE_PULL, MONITOR,
SYNC_CONFIG };

private static final String[] sharedActions = new String[] { FILE_SYNC_SHARED };

Expand Down

0 comments on commit 7066525

Please sign in to comment.