From 66c675a9e090a3ef95ee361b2e0373e423b8c0e7 Mon Sep 17 00:00:00 2001 From: Eric Long Date: Wed, 15 May 2019 08:58:43 -0400 Subject: [PATCH 1/4] 0003958: Add table_name variable to subselect router --- .../java/org/jumpmind/symmetric/route/SubSelectDataRouter.java | 1 + 1 file changed, 1 insertion(+) diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/SubSelectDataRouter.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/SubSelectDataRouter.java index 5aff1846fe..24a541d792 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/route/SubSelectDataRouter.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/route/SubSelectDataRouter.java @@ -82,6 +82,7 @@ public Set routeToNodes(SimpleRouterContext routingContext, DataMetaData sqlParams.put("NODE_GROUP_ID", dataMetaData.getRouter().getNodeGroupLink() .getTargetNodeGroupId()); sqlParams.put("EXTERNAL_DATA", dataMetaData.getData().getExternalData()); + sqlParams.put("TABLE_NAME", dataMetaData.getData().getTableName()); ISqlTemplate template = symmetricDialect.getPlatform().getSqlTemplate(); List ids = template.query(String.format("%s%s", sql, subSelect), new StringMapper(), sqlParams); From 46ec1e324508948b07395b85749242c6e72b8602 Mon Sep 17 00:00:00 2001 From: Jared Frees_Home Date: Wed, 15 May 2019 11:30:59 -0400 Subject: [PATCH 2/4] Remove debug message from install --- .../main/java/org/jumpmind/symmetric/wrapper/WindowsService.java | 1 - 1 file changed, 1 deletion(-) diff --git a/symmetric-wrapper/src/main/java/org/jumpmind/symmetric/wrapper/WindowsService.java b/symmetric-wrapper/src/main/java/org/jumpmind/symmetric/wrapper/WindowsService.java index 7c65f60696..07f712f72c 100644 --- a/symmetric-wrapper/src/main/java/org/jumpmind/symmetric/wrapper/WindowsService.java +++ b/symmetric-wrapper/src/main/java/org/jumpmind/symmetric/wrapper/WindowsService.java @@ -209,7 +209,6 @@ public void run() { String line = null, curLine = null; boolean isHeaderLine = true; while ((curLine = stdout.readLine()) != null && line == null) { - System.out.println(curLine); if (isHeaderLine) { isHeaderLine = false; } else if (line == null && !curLine.trim().equals("")) { From ea4e3601ffa545221ebf0384e91ef3ee83285661 Mon Sep 17 00:00:00 2001 From: Dave Cramer Date: Wed, 15 May 2019 12:56:01 -0400 Subject: [PATCH 3/4] 0003959: Merged cherry-picked commit from 3.9 to 3.10 --- .../io/SimpleMongoClientManager.java | 41 ++++++-------- .../impl/AbstractParameterService.java | 56 +++++++++---------- .../impl/NodeCommunicationService.java | 28 ++++------ 3 files changed, 55 insertions(+), 70 deletions(-) diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/SimpleMongoClientManager.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/SimpleMongoClientManager.java index 9fedf384fd..36ccee7b83 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/SimpleMongoClientManager.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/SimpleMongoClientManager.java @@ -42,7 +42,7 @@ public class SimpleMongoClientManager implements IMongoClientManager { * This is static because the MongoClient is thread safe and wraps a pool of * connections */ - protected static Map clients = new HashMap(); + protected final static Map clients = new HashMap(); protected DB currentDB; @@ -52,36 +52,31 @@ public SimpleMongoClientManager(IParameterService parameterService, String name) } @Override - public MongoClient get() { + public synchronized MongoClient get() { MongoClient client = clients.get(name); if (client == null) { - synchronized (clients) { - if (client == null) { - int port = 27017; - String host = "localhost"; - - if (parameterService != null) { - port = parameterService.getInt(name + MongoConstants.PORT, port); - host = parameterService.getString(name + MongoConstants.HOST, host); - } - String dbUrl = "mongodb://" + host + ":" + port; - if (parameterService != null) { - dbUrl = parameterService.getString(name + MongoConstants.URL, dbUrl); - } - try { - client = new MongoClient(new MongoClientURI(dbUrl)); - clients.put(name, client); - } catch (UnknownHostException e) { - throw new SymmetricException(e); - } - } + int port = 27017; + String host = "localhost"; + if (parameterService != null) { + port = parameterService.getInt(name + MongoConstants.PORT, port); + host = parameterService.getString(name + MongoConstants.HOST, host); + } + String dbUrl = "mongodb://" + host + ":" + port; + if (parameterService != null) { + dbUrl = parameterService.getString(name + MongoConstants.URL, dbUrl); + } + try { + client = new MongoClient(new MongoClientURI(dbUrl)); + clients.put(name, client); + } catch (UnknownHostException e) { + throw new SymmetricException(e); } } return client; } @Override - public DB getDB(String name) { + public synchronized DB getDB(String name) { if (currentDB == null || !currentDB.getName().equals(name)) { currentDB = get().getDB(name); /** diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractParameterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractParameterService.java index 28cb4a00cd..ceaf7f52df 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractParameterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractParameterService.java @@ -151,28 +151,23 @@ public synchronized void rereadParameters() { getParameters(); } - protected TypedProperties getParameters() { + protected synchronized TypedProperties getParameters() { long timeoutTime = System.currentTimeMillis() - cacheTimeoutInMs; - // Quick check if cache is timed out + // see if the parameters have timed out if (parameters == null || (cacheTimeoutInMs > 0 && lastTimeParameterWereCached < timeoutTime)) { - synchronized (this) { - // Synchronized check to prevent calling reread parameters multiple times - if (parameters == null || (cacheTimeoutInMs > 0 && lastTimeParameterWereCached < timeoutTime)) { - try { - parameters = rereadApplicationParameters(); - SymmetricUtils.replaceSystemAndEnvironmentVariables(parameters); - lastTimeParameterWereCached = System.currentTimeMillis(); - cacheTimeoutInMs = getInt(ParameterConstants.PARAMETER_REFRESH_PERIOD_IN_MS); - } catch (SqlException ex) { - if (parameters != null) { - log.warn("Could not read database parameters. We will try again later", ex); - } else { - log.error("Could not read database parameters and they have not yet been initialized"); - throw ex; - } - throw ex; - } + try { + parameters = rereadApplicationParameters(); + SymmetricUtils.replaceSystemAndEnvironmentVariables(parameters); + lastTimeParameterWereCached = System.currentTimeMillis(); + cacheTimeoutInMs = getInt(ParameterConstants.PARAMETER_REFRESH_PERIOD_IN_MS); + } catch (SqlException ex) { + if (parameters != null) { + log.warn("Could not read database parameters. We will try again later", ex); + } else { + log.error("Could not read database parameters and they have not yet been initialized"); + throw ex; } + throw ex; } } return parameters; @@ -187,7 +182,7 @@ public Date getLastTimeParameterWereCached() { } - public String getExternalId() { + public synchronized String getExternalId() { if (externalId==null) { String value = getString(ParameterConstants.EXTERNAL_ID); value = substituteScripts(value); @@ -196,10 +191,10 @@ public String getExternalId() { log.debug("External Id eval results in: {}",externalId); } } - return externalId; + return externalId; } - public String getSyncUrl() { + public synchronized String getSyncUrl() { if (syncUrl==null) { String value = getString(ParameterConstants.SYNC_URL); value = substituteScripts(value); @@ -211,10 +206,10 @@ public String getSyncUrl() { log.debug("Sync URL eval results in: {}",syncUrl); } } - return syncUrl; + return syncUrl; } - public String getNodeGroupId() { + public synchronized String getNodeGroupId() { if (nodeGroupId==null) { String value = getString(ParameterConstants.NODE_GROUP_ID); value = substituteScripts(value); @@ -223,10 +218,10 @@ public String getNodeGroupId() { log.debug("Node Group Id eval results in: {}",nodeGroupId); } } - return nodeGroupId; + return nodeGroupId; } - public String getRegistrationUrl() { + public synchronized String getRegistrationUrl() { if (registrationUrl==null) { String value = getString(ParameterConstants.REGISTRATION_URL); value = substituteScripts(value); @@ -238,10 +233,10 @@ public String getRegistrationUrl() { log.debug("Registration URL eval results in: {}",registrationUrl); } } - return registrationUrl; + return registrationUrl; } - public String getEngineName() { + public synchronized String getEngineName() { if (engineName==null) { String value = getString(ParameterConstants.ENGINE_NAME,"SymmetricDS"); value = substituteScripts(value); @@ -263,7 +258,7 @@ public Map getReplacementValues() { return replacementValues; } - public void setDatabaseHasBeenInitialized(boolean databaseHasBeenInitialized) { + public synchronized void setDatabaseHasBeenInitialized(boolean databaseHasBeenInitialized) { if (this.databaseHasBeenInitialized != databaseHasBeenInitialized) { this.databaseHasBeenInitialized = databaseHasBeenInitialized; this.parameters = null; @@ -272,7 +267,7 @@ public void setDatabaseHasBeenInitialized(boolean databaseHasBeenInitialized) { abstract public TypedProperties getDatabaseParameters(String externalId, String nodeGroupId); - protected TypedProperties rereadDatabaseParameters(Properties p) { + protected synchronized TypedProperties rereadDatabaseParameters(Properties p) { if (databaseHasBeenInitialized) { TypedProperties properties = getDatabaseParameters(ParameterConstants.ALL, ParameterConstants.ALL); @@ -281,7 +276,6 @@ protected TypedProperties rereadDatabaseParameters(Properties p) { properties.putAll(getDatabaseParameters( p.getProperty(ParameterConstants.EXTERNAL_ID), p.getProperty(ParameterConstants.NODE_GROUP_ID))); - databaseHasBeenInitialized = true; return properties; } else { return new TypedProperties(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java index 45fade3eff..a2d6edbb42 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/NodeCommunicationService.java @@ -96,25 +96,21 @@ public NodeCommunicationService(IClusterService clusterService, INodeService nod } } - private final void initialize() { + private synchronized final void initialize() { if (!initialized) { - synchronized (this) { - if (!initialized) { - if (clusterService.isClusteringEnabled()) { - try { - int locksCleared = sqlTemplate.update(getSql("clearLocksOnRestartSql"), - clusterService.getServerId()); - if (locksCleared > 0) { - log.info("Cleared {} node communication locks for {}", locksCleared, - clusterService.getServerId()); - } - } finally { - initialized = true; - } - } else { - initialized = true; + if (clusterService.isClusteringEnabled()) { + try { + int locksCleared = sqlTemplate.update(getSql("clearLocksOnRestartSql"), + clusterService.getServerId()); + if (locksCleared > 0) { + log.info("Cleared {} node communication locks for {}", locksCleared, + clusterService.getServerId()); } + } finally { + initialized = true; } + } else { + initialized = true; } } } From 9356cf389fbedbf562440f7453475e1349c27351 Mon Sep 17 00:00:00 2001 From: Eric Long Date: Wed, 15 May 2019 16:39:29 -0400 Subject: [PATCH 4/4] 0003958: Add table_name variable to subselect router --- .../configuration/routers/subselect.ad | 22 ++++++++++++++----- 1 file changed, 16 insertions(+), 6 deletions(-) diff --git a/symmetric-assemble/src/asciidoc/configuration/routers/subselect.ad b/symmetric-assemble/src/asciidoc/configuration/routers/subselect.ad index 1534e1d298..32d4c40765 100644 --- a/symmetric-assemble/src/asciidoc/configuration/routers/subselect.ad +++ b/symmetric-assemble/src/asciidoc/configuration/routers/subselect.ad @@ -21,14 +21,24 @@ where c.node_group_id=:NODE_GROUP_ID and c.sync_enabled=1 and ... ---- -As you can see, you have access to information about the node currently under consideration for routing through -the 'c' alias, for example c.external_id . There are two node-related tokens you can use in your expression: +The SQL statement has access to the following variables that are replaced before running: -. :NODE_GROUP_ID -. :EXTERNAL_DATA +.Variables available to the subselect router +[cols=".^2,8"] +|=== + +|:NODE_GROUP_ID|The target node group ID that is configured for the router. + +|:EXTERNAL_DATA|The external data for current row, as configured by sym_trigger.external_select. + +|:TABLE_NAME|The table name for the current row. + +|:COLUMN_NAME|Variables named for each column name (in uppercase), which return the column value for the new row. + +|:OLD_COLUMN_NAME|Variables named for each column name (in uppercase and prefixed with OLD_), which return the column value for the old row. + +|=== -Column names representing data for the row in question are prefixed with a colon as well., for example: :EMPLOYEE_ID, or :OLD_EMPLOYEE_ID. -Here, the OLD_ prefix indicates the value before the change in cases where the old data has been captured. .Sample Use Case for Subselect Router ====