Skip to content

Commit

Permalink
Merge branch '3.10' of https://github.com/JumpMind/symmetric-ds.git i…
Browse files Browse the repository at this point in the history
…nto 3.10
  • Loading branch information
jumpmind-josh committed May 16, 2019
2 parents 243b442 + 9356cf3 commit fabd6cc
Show file tree
Hide file tree
Showing 6 changed files with 72 additions and 77 deletions.
22 changes: 16 additions & 6 deletions symmetric-assemble/src/asciidoc/configuration/routers/subselect.ad
Expand Up @@ -21,14 +21,24 @@ where c.node_group_id=:NODE_GROUP_ID
and c.sync_enabled=1 and ...
----

As you can see, you have access to information about the node currently under consideration for routing through
the 'c' alias, for example c.external_id . There are two node-related tokens you can use in your expression:
The SQL statement has access to the following variables that are replaced before running:

. :NODE_GROUP_ID
. :EXTERNAL_DATA
.Variables available to the subselect router
[cols=".^2,8"]
|===

|:NODE_GROUP_ID|The target node group ID that is configured for the router.

|:EXTERNAL_DATA|The external data for current row, as configured by sym_trigger.external_select.

|:TABLE_NAME|The table name for the current row.

|:COLUMN_NAME|Variables named for each column name (in uppercase), which return the column value for the new row.

|:OLD_COLUMN_NAME|Variables named for each column name (in uppercase and prefixed with OLD_), which return the column value for the old row.

|===

Column names representing data for the row in question are prefixed with a colon as well., for example: :EMPLOYEE_ID, or :OLD_EMPLOYEE_ID.
Here, the OLD_ prefix indicates the value before the change in cases where the old data has been captured.

.Sample Use Case for Subselect Router
====
Expand Down
Expand Up @@ -42,7 +42,7 @@ public class SimpleMongoClientManager implements IMongoClientManager {
* This is static because the MongoClient is thread safe and wraps a pool of
* connections
*/
protected static Map<String, MongoClient> clients = new HashMap<String, MongoClient>();
protected final static Map<String, MongoClient> clients = new HashMap<String, MongoClient>();

protected DB currentDB;

Expand All @@ -52,36 +52,31 @@ public SimpleMongoClientManager(IParameterService parameterService, String name)
}

@Override
public MongoClient get() {
public synchronized MongoClient get() {
MongoClient client = clients.get(name);
if (client == null) {
synchronized (clients) {
if (client == null) {
int port = 27017;
String host = "localhost";

if (parameterService != null) {
port = parameterService.getInt(name + MongoConstants.PORT, port);
host = parameterService.getString(name + MongoConstants.HOST, host);
}
String dbUrl = "mongodb://" + host + ":" + port;
if (parameterService != null) {
dbUrl = parameterService.getString(name + MongoConstants.URL, dbUrl);
}
try {
client = new MongoClient(new MongoClientURI(dbUrl));
clients.put(name, client);
} catch (UnknownHostException e) {
throw new SymmetricException(e);
}
}
int port = 27017;
String host = "localhost";
if (parameterService != null) {
port = parameterService.getInt(name + MongoConstants.PORT, port);
host = parameterService.getString(name + MongoConstants.HOST, host);
}
String dbUrl = "mongodb://" + host + ":" + port;
if (parameterService != null) {
dbUrl = parameterService.getString(name + MongoConstants.URL, dbUrl);
}
try {
client = new MongoClient(new MongoClientURI(dbUrl));
clients.put(name, client);
} catch (UnknownHostException e) {
throw new SymmetricException(e);
}
}
return client;
}

@Override
public DB getDB(String name) {
public synchronized DB getDB(String name) {
if (currentDB == null || !currentDB.getName().equals(name)) {
currentDB = get().getDB(name);
/**
Expand Down
Expand Up @@ -82,6 +82,7 @@ public Set<String> routeToNodes(SimpleRouterContext routingContext, DataMetaData
sqlParams.put("NODE_GROUP_ID", dataMetaData.getRouter().getNodeGroupLink()
.getTargetNodeGroupId());
sqlParams.put("EXTERNAL_DATA", dataMetaData.getData().getExternalData());
sqlParams.put("TABLE_NAME", dataMetaData.getData().getTableName());
ISqlTemplate template = symmetricDialect.getPlatform().getSqlTemplate();
List<String> ids = template.query(String.format("%s%s", sql, subSelect),
new StringMapper(), sqlParams);
Expand Down
Expand Up @@ -151,28 +151,23 @@ public synchronized void rereadParameters() {
getParameters();
}

protected TypedProperties getParameters() {
protected synchronized TypedProperties getParameters() {
long timeoutTime = System.currentTimeMillis() - cacheTimeoutInMs;
// Quick check if cache is timed out
// see if the parameters have timed out
if (parameters == null || (cacheTimeoutInMs > 0 && lastTimeParameterWereCached < timeoutTime)) {
synchronized (this) {
// Synchronized check to prevent calling reread parameters multiple times
if (parameters == null || (cacheTimeoutInMs > 0 && lastTimeParameterWereCached < timeoutTime)) {
try {
parameters = rereadApplicationParameters();
SymmetricUtils.replaceSystemAndEnvironmentVariables(parameters);
lastTimeParameterWereCached = System.currentTimeMillis();
cacheTimeoutInMs = getInt(ParameterConstants.PARAMETER_REFRESH_PERIOD_IN_MS);
} catch (SqlException ex) {
if (parameters != null) {
log.warn("Could not read database parameters. We will try again later", ex);
} else {
log.error("Could not read database parameters and they have not yet been initialized");
throw ex;
}
throw ex;
}
try {
parameters = rereadApplicationParameters();
SymmetricUtils.replaceSystemAndEnvironmentVariables(parameters);
lastTimeParameterWereCached = System.currentTimeMillis();
cacheTimeoutInMs = getInt(ParameterConstants.PARAMETER_REFRESH_PERIOD_IN_MS);
} catch (SqlException ex) {
if (parameters != null) {
log.warn("Could not read database parameters. We will try again later", ex);
} else {
log.error("Could not read database parameters and they have not yet been initialized");
throw ex;
}
throw ex;
}
}
return parameters;
Expand All @@ -187,7 +182,7 @@ public Date getLastTimeParameterWereCached() {
}


public String getExternalId() {
public synchronized String getExternalId() {
if (externalId==null) {
String value = getString(ParameterConstants.EXTERNAL_ID);
value = substituteScripts(value);
Expand All @@ -196,10 +191,10 @@ public String getExternalId() {
log.debug("External Id eval results in: {}",externalId);
}
}
return externalId;
return externalId;
}

public String getSyncUrl() {
public synchronized String getSyncUrl() {
if (syncUrl==null) {
String value = getString(ParameterConstants.SYNC_URL);
value = substituteScripts(value);
Expand All @@ -211,10 +206,10 @@ public String getSyncUrl() {
log.debug("Sync URL eval results in: {}",syncUrl);
}
}
return syncUrl;
return syncUrl;
}

public String getNodeGroupId() {
public synchronized String getNodeGroupId() {
if (nodeGroupId==null) {
String value = getString(ParameterConstants.NODE_GROUP_ID);
value = substituteScripts(value);
Expand All @@ -223,10 +218,10 @@ public String getNodeGroupId() {
log.debug("Node Group Id eval results in: {}",nodeGroupId);
}
}
return nodeGroupId;
return nodeGroupId;
}

public String getRegistrationUrl() {
public synchronized String getRegistrationUrl() {
if (registrationUrl==null) {
String value = getString(ParameterConstants.REGISTRATION_URL);
value = substituteScripts(value);
Expand All @@ -238,10 +233,10 @@ public String getRegistrationUrl() {
log.debug("Registration URL eval results in: {}",registrationUrl);
}
}
return registrationUrl;
return registrationUrl;
}

public String getEngineName() {
public synchronized String getEngineName() {
if (engineName==null) {
String value = getString(ParameterConstants.ENGINE_NAME,"SymmetricDS");
value = substituteScripts(value);
Expand All @@ -263,7 +258,7 @@ public Map<String, String> getReplacementValues() {
return replacementValues;
}

public void setDatabaseHasBeenInitialized(boolean databaseHasBeenInitialized) {
public synchronized void setDatabaseHasBeenInitialized(boolean databaseHasBeenInitialized) {
if (this.databaseHasBeenInitialized != databaseHasBeenInitialized) {
this.databaseHasBeenInitialized = databaseHasBeenInitialized;
this.parameters = null;
Expand All @@ -272,7 +267,7 @@ public void setDatabaseHasBeenInitialized(boolean databaseHasBeenInitialized) {

abstract public TypedProperties getDatabaseParameters(String externalId, String nodeGroupId);

protected TypedProperties rereadDatabaseParameters(Properties p) {
protected synchronized TypedProperties rereadDatabaseParameters(Properties p) {
if (databaseHasBeenInitialized) {
TypedProperties properties = getDatabaseParameters(ParameterConstants.ALL,
ParameterConstants.ALL);
Expand All @@ -281,7 +276,6 @@ protected TypedProperties rereadDatabaseParameters(Properties p) {
properties.putAll(getDatabaseParameters(
p.getProperty(ParameterConstants.EXTERNAL_ID),
p.getProperty(ParameterConstants.NODE_GROUP_ID)));
databaseHasBeenInitialized = true;
return properties;
} else {
return new TypedProperties();
Expand Down
Expand Up @@ -96,25 +96,21 @@ public NodeCommunicationService(IClusterService clusterService, INodeService nod
}
}

private final void initialize() {
private synchronized final void initialize() {
if (!initialized) {
synchronized (this) {
if (!initialized) {
if (clusterService.isClusteringEnabled()) {
try {
int locksCleared = sqlTemplate.update(getSql("clearLocksOnRestartSql"),
clusterService.getServerId());
if (locksCleared > 0) {
log.info("Cleared {} node communication locks for {}", locksCleared,
clusterService.getServerId());
}
} finally {
initialized = true;
}
} else {
initialized = true;
if (clusterService.isClusteringEnabled()) {
try {
int locksCleared = sqlTemplate.update(getSql("clearLocksOnRestartSql"),
clusterService.getServerId());
if (locksCleared > 0) {
log.info("Cleared {} node communication locks for {}", locksCleared,
clusterService.getServerId());
}
} finally {
initialized = true;
}
} else {
initialized = true;
}
}
}
Expand Down
Expand Up @@ -209,7 +209,6 @@ public void run() {
String line = null, curLine = null;
boolean isHeaderLine = true;
while ((curLine = stdout.readLine()) != null && line == null) {
System.out.println(curLine);
if (isHeaderLine) {
isHeaderLine = false;
} else if (line == null && !curLine.trim().equals("")) {
Expand Down

0 comments on commit fabd6cc

Please sign in to comment.