Skip to content

Commit

Permalink
0005489: Backport support snapshot improvements
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Sep 30, 2022
1 parent 47b3f16 commit 03616ea
Show file tree
Hide file tree
Showing 10 changed files with 442 additions and 317 deletions.

Large diffs are not rendered by default.

Expand Up @@ -534,6 +534,9 @@ private ParameterConstants() {
public final static String CLOUD_BULK_LOAD_AZURE_SAS_TOKEN = "cloud.bulk.load.azure.sas.token";

public final static String SNAPSHOT_FILE_INCLUDE_HOSTNAME = "snapshot.file.include.hostname";
public final static String SNAPSHOT_MAX_FILES = "snapshot.max.files";
public final static String SNAPSHOT_MAX_BATCHES = "snapshot.max.batches";
public final static String SNAPSHOT_MAX_NODE_CHANNELS = "snapshot.max.node.channels";

public final static String REDSHIFT_APPEND_TO_COPY_COMMAND = "redshift.append.to.copy.command";
public final static String REDSHIFT_BULK_LOAD_MAX_ROWS_BEFORE_FLUSH = "redshift.bulk.load.max.rows.before.flush";
Expand Down
Expand Up @@ -112,6 +112,8 @@ public interface ITriggerRouterService {
public void saveRouter(Router router);

public List<TriggerRouter> getAllTriggerRoutersForCurrentNode(String sourceNodeGroupId);

public List<TriggerRouter> getTriggerRoutersForTargetNode(String targetNodeGroupId);

/**
* Get a list of all the triggers that have been defined for the system.
Expand Down
Expand Up @@ -982,6 +982,12 @@ public List<TriggerRouter> getAllTriggerRoutersForCurrentNode(String sourceNodeG
return triggerRouters;
}

public List<TriggerRouter> getTriggerRoutersForTargetNode(String targetNodeGroupId) {
List<TriggerRouter> triggerRouters = enhanceTriggerRouters(sqlTemplate.query(getTriggerRouterSql("activeTriggersForTargetNodeGroupSql"),
new TriggerRouterMapper(), targetNodeGroupId));
return triggerRouters;
}

public List<TriggerRouter> getAllTriggerRoutersForReloadForCurrentNode(
String sourceNodeGroupId, String targetNodeGroupId) {
return enhanceTriggerRouters(sqlTemplate.query(
Expand Down
Expand Up @@ -96,6 +96,8 @@ public TriggerRouterServiceSqlMap(IDatabasePlatform platform,

putSql("activeTriggersForSourceNodeGroupSql", "" + "where r.source_node_group_id = ? ");

putSql("activeTriggersForTargetNodeGroupSql", "where r.target_node_group_id = ?");

putSql("activeTriggersForReloadSql", ""
+ "where r.source_node_group_id = ? and "
+ " r.target_node_group_id = ? and t.channel_id != ? and tr.enabled=1 order by "
Expand Down
Expand Up @@ -143,7 +143,7 @@ public static List<LogSummary> getLogSummaries(String engineName, Level level) {
}
}

public File getLogDir() {
public static File getLogDir() {
if (helper != null) {
return helper.getLogDir();
}
Expand Down
Expand Up @@ -41,6 +41,7 @@
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.model.Node;
import org.jumpmind.util.AppUtils;
import org.jumpmind.util.CollectionUtils;
import org.jumpmind.util.FormatUtils;
Expand Down Expand Up @@ -141,7 +142,21 @@ public static final void replaceSystemAndEnvironmentVariables(Properties propert
}
}
}


public static String replaceNodeVariables(Node sourceNode, Node targetNode, String str) {
if (sourceNode != null) {
str = FormatUtils.replace("sourceNodeId", sourceNode.getNodeId(), str);
str = FormatUtils.replace("sourceExternalId", sourceNode.getExternalId(), str);
str = FormatUtils.replace("sourceNodeGroupId", sourceNode.getNodeGroupId(), str);
}
if (targetNode != null) {
str = FormatUtils.replace("targetNodeId", targetNode.getNodeGroupId(), str);
str = FormatUtils.replace("targetExternalId", targetNode.getExternalId(), str);
str = FormatUtils.replace("targetNodeGroupId", targetNode.getNodeGroupId(), str);
}
return str;
}

public static void logNotices() {
synchronized (SymmetricUtils.class) {
if (isNoticeLogged) {
Expand Down
21 changes: 21 additions & 0 deletions symmetric-core/src/main/resources/symmetric-default.properties
Expand Up @@ -3019,6 +3019,27 @@ cloud.bulk.load.azure.sas.token=
# Type: boolean
snapshot.file.include.hostname=false

# Max number of files to write in directory listing for support snapshot.
#
# DatabaseOverridable: true
# Tags: other
# Type: integer
snapshot.max.files=50000

# Max number of batches to write to statistics listing for support snapshot.
#
# DatabaseOverridable: true
# Tags: other
# Type: integer
snapshot.max.batches=10000

# Max number of nodes and channels for batch statistics, after which it will group by node only.
#
# DatabaseOverridable: true
# Tags: other
# Type: integer
snapshot.max.node.channels=5000

# Log Miner job to find changes from a database archive log
#
# DatabaseOverridable: false
Expand Down
Expand Up @@ -20,6 +20,7 @@
*/
package org.jumpmind.symmetric.io.stage;

import java.io.File;
import java.util.Set;

public interface IStagingManager {
Expand All @@ -36,4 +37,5 @@ public interface IStagingManager {

public StagingFileLock acquireFileLock(String serverInfo, Object... path);

public File getStagingDirectory();
}
Expand Up @@ -319,6 +319,11 @@ public StagingFileLock acquireFileLock(String serverInfo, Object... path) {
return stagingFileLock;
}

@Override
public File getStagingDirectory() {
return directory;
}

protected static final DirectoryStream.Filter<Path> STAGING_FILE_FILTER = new DirectoryStream.Filter<Path>() {
@Override
public boolean accept(Path entry) {
Expand Down

0 comments on commit 03616ea

Please sign in to comment.