Skip to content

Commit

Permalink
Browse files Browse the repository at this point in the history
0005198: Clean up the consistent use of table constants for
registration, trigger installation, extract, and export
  • Loading branch information
erilong committed Jan 19, 2022
1 parent 9df1c51 commit 834694c
Show file tree
Hide file tree
Showing 13 changed files with 93 additions and 197 deletions.
Expand Up @@ -124,12 +124,7 @@ public static File createSnapshot(ISymmetricEngine engine, IProgressListener lis
log.info("Creating snapshot file in " + tmpDir.getAbsolutePath());
try (FileWriter fwriter = new FileWriter(new File(tmpDir, "config-export.csv"))) {
engine.getDataExtractorService().extractConfigurationStandalone(engine.getNodeService().findIdentity(),
fwriter, TableConstants.SYM_NODE, TableConstants.SYM_NODE_SECURITY,
TableConstants.SYM_NODE_IDENTITY, TableConstants.SYM_NODE_HOST,
TableConstants.SYM_NODE_CHANNEL_CTL, TableConstants.SYM_CONSOLE_ROLE,
TableConstants.SYM_CONSOLE_USER, TableConstants.SYM_CONSOLE_ROLE_PRIVILEGE,
TableConstants.SYM_MONITOR_EVENT, TableConstants.SYM_CONSOLE_EVENT,
TableConstants.SYM_CONSOLE_USER_HIST);
fwriter, TableConstants.getConfigTablesExcludedFromExport());
} catch (Exception e) {
log.warn("Failed to export symmetric configuration", e);
}
Expand Down
Expand Up @@ -21,7 +21,10 @@
package org.jumpmind.symmetric.common;

import java.util.ArrayList;
import java.util.Collection;
import java.util.HashSet;
import java.util.List;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;

Expand Down Expand Up @@ -86,152 +89,92 @@ public class TableConstants {
public static final String SYM_CONTEXT = "context";
public static final String SYM_JOB = "job";
public static final String SYM_TABLE_RELOAD_STATUS = "table_reload_status";
private static volatile List<String> tablesWithPrefix;
private static volatile List<String> configTablesWithPrefix;
private static volatile List<String> tablesWithoutPrefix;
protected static boolean hasConsoleSchema = TableConstants.class.getResourceAsStream("/console-schema.xml") != null;

public static final List<String> getTables(String tablePrefix) {
if (tablesWithPrefix == null) {
synchronized (TableConstants.class) {
if (tablesWithPrefix == null) {
tablesWithPrefix = populateAllTables(tablePrefix);
}
}
/**
* Set of all SymmetricDS configuration and runtime tables.
*/
public static final Set<String> getTables(String tablePrefix) {
Set<String> tables = new HashSet<String>();
addPrefixToTableNames(tables, tablePrefix, SYM_CHANNEL, SYM_CONFLICT, SYM_CONTEXT, SYM_DATA, SYM_DATA_GAP, SYM_DATA_EVENT, SYM_EXTRACT_REQUEST,
SYM_EXTENSION, SYM_FILE_INCOMING, SYM_FILE_SNAPSHOT, SYM_FILE_TRIGGER, SYM_FILE_TRIGGER_ROUTER, SYM_GROUPLET, SYM_GROUPLET_LINK,
SYM_INCOMING_BATCH, SYM_INCOMING_ERROR, SYM_JOB, SYM_LOAD_FILTER, SYM_LOCK, SYM_MONITOR, SYM_MONITOR_EVENT, SYM_NODE, SYM_NODE_CHANNEL_CTL,
SYM_NODE_COMMUNICATION, SYM_NODE_GROUP, SYM_NODE_GROUP_CHANNEL_WND, SYM_NODE_GROUP_LINK, SYM_NODE_HOST, SYM_NODE_HOST_CHANNEL_STATS,
SYM_NODE_HOST_JOB_STATS, SYM_NODE_HOST_STATS, SYM_NODE_IDENTITY, SYM_NODE_SECURITY, SYM_NOTIFICATION, SYM_OUTGOING_BATCH, SYM_PARAMETER,
SYM_REGISTRATION_REDIRECT, SYM_REGISTRATION_REQUEST, SYM_ROUTER, SYM_SEQUENCE, SYM_TABLE_RELOAD_REQUEST, SYM_TABLE_RELOAD_STATUS,
SYM_TRANSFORM_TABLE, SYM_TRANSFORM_COLUMN, SYM_TRIGGER, SYM_TRIGGER_HIST, SYM_TRIGGER_ROUTER, SYM_TRIGGER_ROUTER_GROUPLET);
if (hasConsoleSchema) {
addPrefixToTableNames(tables, tablePrefix, SYM_CONSOLE_EVENT, SYM_CONSOLE_USER, SYM_CONSOLE_USER_HIST, SYM_CONSOLE_ROLE,
SYM_CONSOLE_ROLE_PRIVILEGE, SYM_CONSOLE_TABLE_STATS, SYM_DESIGN_DIAGRAM, SYM_DIAGRAM_GROUP);
}
return new ArrayList<String>(tablesWithPrefix);
return tables;
}

public static final List<String> getConfigTables(String tablePrefix) {
if (configTablesWithPrefix == null) {
synchronized (TableConstants.class) {
if (configTablesWithPrefix == null) {
configTablesWithPrefix = populateConfigTables(tablePrefix);
}
}
}
return new ArrayList<String>(configTablesWithPrefix);
public static final Set<String> getTablesWithoutPrefix() {
return getTables("");
}

public static final List<String> getTablesWithoutPrefix() {
if (tablesWithoutPrefix == null) {
synchronized (TableConstants.class) {
if (tablesWithoutPrefix == null) {
tablesWithoutPrefix = populateAllTables("");
}
}
/**
* Tables sent in the configuration batch for registration, with the order of tables specified by this list. Each table gets a trigger history record, and
* it also gets a trigger installed for change capture unless it is excluded (see getConfigTablesWithoutCapture() method). The tables are also included in a
* configuration export unless the table is returned by getConfigTablesExcludedFromExport().
*/
public static final List<String> getConfigTables(String tablePrefix) {
List<String> tables = new ArrayList<String>();
addPrefixToTableNames(tables, tablePrefix, SYM_NODE_GROUP, SYM_NODE_GROUP_LINK, SYM_NODE, SYM_NODE_HOST, SYM_NODE_SECURITY, SYM_PARAMETER,
SYM_CHANNEL, SYM_NODE_GROUP_CHANNEL_WND, SYM_TRIGGER, SYM_ROUTER, SYM_TRIGGER_ROUTER, SYM_TRANSFORM_TABLE, SYM_LOAD_FILTER,
SYM_TRANSFORM_COLUMN, SYM_CONFLICT, SYM_TABLE_RELOAD_REQUEST, SYM_GROUPLET, SYM_GROUPLET_LINK, SYM_TRIGGER_ROUTER_GROUPLET, SYM_FILE_TRIGGER,
SYM_FILE_TRIGGER_ROUTER, SYM_FILE_SNAPSHOT, SYM_NODE_IDENTITY, SYM_EXTENSION, SYM_MONITOR, SYM_MONITOR_EVENT, SYM_NOTIFICATION, SYM_JOB);
if (hasConsoleSchema) {
addPrefixToTableNames(tables, tablePrefix, SYM_CONSOLE_ROLE, SYM_CONSOLE_USER, SYM_CONSOLE_ROLE_PRIVILEGE, SYM_CONSOLE_USER_HIST,
SYM_DESIGN_DIAGRAM, SYM_DIAGRAM_GROUP);
}
return tablesWithoutPrefix;
return tables;
}

protected static List<String> populateConfigTables(String tablePrefix) {
List<String> configTables = new ArrayList<String>();
configTables.add(getTableName(tablePrefix, TableConstants.SYM_NODE_GROUP));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_NODE_GROUP_LINK));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_NODE));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_NODE_HOST));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_NODE_SECURITY));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_PARAMETER));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_CHANNEL));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_NODE_GROUP_CHANNEL_WND));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_TRIGGER));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_ROUTER));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_TRIGGER_ROUTER));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_TRANSFORM_TABLE));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_LOAD_FILTER));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_TRANSFORM_COLUMN));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_CONFLICT));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_TABLE_RELOAD_REQUEST));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_GROUPLET));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_GROUPLET_LINK));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_TRIGGER_ROUTER_GROUPLET));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_FILE_TRIGGER));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_FILE_TRIGGER_ROUTER));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_NODE_IDENTITY));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_EXTENSION));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_MONITOR));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_MONITOR_EVENT));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_NOTIFICATION));
configTables.add(getTableName(tablePrefix, TableConstants.SYM_JOB));
return configTables;
/**
* Which tables from getConfigTables() should not be sent during registration. These tables will still have a trigger installed for capturing and sending
* changes.
*/
public static final String[] getConfigTablesExcludedFromRegistration() {
return new String[] { SYM_MONITOR_EVENT };
}

protected static List<String> populateAllTables(String tablePrefix) {
List<String> tables = new ArrayList<String>();
tables.add(getTableName(tablePrefix, SYM_TRIGGER));
tables.add(getTableName(tablePrefix, SYM_TRIGGER_ROUTER));
tables.add(getTableName(tablePrefix, SYM_ROUTER));
tables.add(getTableName(tablePrefix, SYM_TRANSFORM_TABLE));
tables.add(getTableName(tablePrefix, SYM_LOAD_FILTER));
tables.add(getTableName(tablePrefix, SYM_TRANSFORM_COLUMN));
tables.add(getTableName(tablePrefix, SYM_TRIGGER_HIST));
tables.add(getTableName(tablePrefix, SYM_CHANNEL));
tables.add(getTableName(tablePrefix, SYM_NODE_GROUP));
tables.add(getTableName(tablePrefix, SYM_NODE_GROUP_LINK));
tables.add(getTableName(tablePrefix, SYM_NODE));
tables.add(getTableName(tablePrefix, SYM_NODE_HOST));
tables.add(getTableName(tablePrefix, SYM_NODE_SECURITY));
tables.add(getTableName(tablePrefix, SYM_NODE_IDENTITY));
tables.add(getTableName(tablePrefix, SYM_NODE_SECURITY));
tables.add(getTableName(tablePrefix, SYM_NODE_CHANNEL_CTL));
tables.add(getTableName(tablePrefix, SYM_NODE_GROUP_CHANNEL_WND));
tables.add(getTableName(tablePrefix, SYM_PARAMETER));
tables.add(getTableName(tablePrefix, SYM_NODE_HOST_CHANNEL_STATS));
tables.add(getTableName(tablePrefix, SYM_NODE_HOST_STATS));
tables.add(getTableName(tablePrefix, SYM_NODE_HOST_JOB_STATS));
tables.add(getTableName(tablePrefix, SYM_REGISTRATION_REDIRECT));
tables.add(getTableName(tablePrefix, SYM_REGISTRATION_REQUEST));
tables.add(getTableName(tablePrefix, SYM_DATA));
tables.add(getTableName(tablePrefix, SYM_DATA_GAP));
tables.add(getTableName(tablePrefix, SYM_DATA_EVENT));
tables.add(getTableName(tablePrefix, SYM_OUTGOING_BATCH));
tables.add(getTableName(tablePrefix, SYM_INCOMING_BATCH));
tables.add(getTableName(tablePrefix, SYM_EXTRACT_REQUEST));
tables.add(getTableName(tablePrefix, SYM_LOCK));
tables.add(getTableName(tablePrefix, SYM_CONFLICT));
tables.add(getTableName(tablePrefix, SYM_INCOMING_ERROR));
tables.add(getTableName(tablePrefix, SYM_SEQUENCE));
tables.add(getTableName(tablePrefix, SYM_NODE_COMMUNICATION));
tables.add(getTableName(tablePrefix, SYM_TABLE_RELOAD_REQUEST));
tables.add(getTableName(tablePrefix, SYM_GROUPLET));
tables.add(getTableName(tablePrefix, SYM_GROUPLET_LINK));
tables.add(getTableName(tablePrefix, SYM_TRIGGER_ROUTER_GROUPLET));
tables.add(getTableName(tablePrefix, TableConstants.SYM_FILE_TRIGGER));
tables.add(getTableName(tablePrefix, TableConstants.SYM_FILE_TRIGGER_ROUTER));
tables.add(getTableName(tablePrefix, TableConstants.SYM_FILE_SNAPSHOT));
tables.add(getTableName(tablePrefix, TableConstants.SYM_FILE_INCOMING));
tables.add(getTableName(tablePrefix, SYM_CONSOLE_USER));
tables.add(getTableName(tablePrefix, SYM_CONSOLE_ROLE));
tables.add(getTableName(tablePrefix, SYM_CONSOLE_ROLE_PRIVILEGE));
tables.add(getTableName(tablePrefix, SYM_CONSOLE_USER_HIST));
tables.add(getTableName(tablePrefix, SYM_CONSOLE_EVENT));
tables.add(getTableName(tablePrefix, SYM_CONSOLE_TABLE_STATS));
tables.add(getTableName(tablePrefix, SYM_EXTENSION));
tables.add(getTableName(tablePrefix, SYM_MONITOR));
tables.add(getTableName(tablePrefix, SYM_MONITOR_EVENT));
tables.add(getTableName(tablePrefix, SYM_NOTIFICATION));
tables.add(getTableName(tablePrefix, SYM_CONTEXT));
tables.add(getTableName(tablePrefix, SYM_JOB));
tables.add(getTableName(tablePrefix, SYM_TABLE_RELOAD_STATUS));
/**
* Which tables from getConfigTables() should not have a trigger installed for capturing changes. In other words, these are tables that are sent only during
* registration, and they won't receive changes.
*/
public static final Set<String> getConfigTablesWithoutCapture(String tablePrefix) {
Set<String> tables = new HashSet<String>();
addPrefixToTableNames(tables, tablePrefix, SYM_NODE_IDENTITY);
return tables;
}

public static final List<String> getTablesThatSync(String tablePrefix) {
List<String> tables = new ArrayList<String>(getConfigTables(tablePrefix));
tables.removeAll(getTablesThatDoNotSync(tablePrefix));
return tables;
/**
* Which tables from getConfigTables() should be excluded from a configuration export.
*/
public static final String[] getConfigTablesExcludedFromExport() {
return new String[] { SYM_NODE, SYM_NODE_SECURITY, SYM_NODE_IDENTITY, SYM_NODE_HOST, SYM_NODE_CHANNEL_CTL, SYM_FILE_SNAPSHOT, SYM_CONSOLE_USER,
SYM_CONSOLE_ROLE, SYM_CONSOLE_ROLE_PRIVILEGE, SYM_CONSOLE_USER_HIST, SYM_TABLE_RELOAD_REQUEST, SYM_CONSOLE_EVENT, SYM_MONITOR_EVENT };
}

public static final List<String> getTablesThatDoNotSync(String tablePrefix) {
List<String> tables = new ArrayList<String>(2);
tables.add(getTableName(tablePrefix, SYM_NODE_IDENTITY));
tables.add(getTableName(tablePrefix, SYM_NODE_CHANNEL_CTL));
tables.add(getTableName(tablePrefix, SYM_CONSOLE_EVENT));
tables.add(getTableName(tablePrefix, SYM_TABLE_RELOAD_STATUS));
tables.add(getTableName(tablePrefix, SYM_CONSOLE_TABLE_STATS));
/**
* List of configuration tables that should be used during a configuration export.
*/
public static final List<String> getConfigTablesForExport(String tablePrefix) {
List<String> tables = getConfigTables(tablePrefix);
for (String table : getConfigTablesExcludedFromExport()) {
tables.remove(getTableName(tablePrefix, table));
}
return tables;
}

protected static final void addPrefixToTableNames(Collection<String> collection, String tablePrefix, String... names) {
for (String name : names) {
collection.add(getTableName(tablePrefix, name));
}
}

public static String getTableName(String tablePrefix, String tableSuffix) {
return String.format("%s%s%s", tablePrefix, StringUtils.isNotBlank(tablePrefix) ? "_" : "", tableSuffix);
}
Expand Down
Expand Up @@ -51,8 +51,8 @@
import org.jumpmind.symmetric.Version;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.ext.IDatabaseUpgradeListener;
import org.jumpmind.symmetric.ext.IDatabaseInstallStatementListener;
import org.jumpmind.symmetric.ext.IDatabaseUpgradeListener;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.model.Channel;
import org.jumpmind.symmetric.model.Node;
Expand Down Expand Up @@ -429,15 +429,6 @@ protected void prefixConfigDatabase(Database targetTables) {
platform.prefixDatabase(parameterService.getTablePrefix(), targetTables);
}

public Table getTable(TriggerHistory triggerHistory, boolean useCache) {
if (triggerHistory != null) {
return platform.getTableFromCache(triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(),
triggerHistory.getSourceTableName(), !useCache);
} else {
return null;
}
}

/*
* @return true if SQL was executed.
*/
Expand Down
Expand Up @@ -175,8 +175,6 @@ public List<String> createPurgeSqlForMultipleTables(Node node, TriggerRouter tri
*/
public boolean isTransactionIdOverrideSupported();

public Table getTable(TriggerHistory triggerHistory, boolean useCache);

public long insertWithGeneratedKey(final String sql, final SequenceIdentifier sequenceId);

public long insertWithGeneratedKey(final String sql, final SequenceIdentifier identifier, Object... args);
Expand Down
Expand Up @@ -307,7 +307,7 @@ protected List<DbCompareTables> getTablesToCompare() {

protected List<DbCompareTables> loadTablesFromConfig() {
List<Trigger> triggers = sourceEngine.getTriggerRouterService().getTriggersForCurrentNode(true);
List<String> configTables = TableConstants.getTables(sourceEngine.getTablePrefix());
Set<String> configTables = TableConstants.getTables(sourceEngine.getTablePrefix());
List<String> tableNames = new ArrayList<String>();
for (Trigger trigger : triggers) {
if (!configTables.contains(trigger.getFullyQualifiedSourceTableName()) &&
Expand Down
Expand Up @@ -20,7 +20,6 @@
*/
package org.jumpmind.symmetric.service;

import java.io.OutputStream;
import java.io.Writer;
import java.util.Date;
import java.util.List;
Expand All @@ -40,12 +39,8 @@
* This service provides an API to extract and stream data from a source database.
*/
public interface IDataExtractorService {
public void extractConfigurationStandalone(Node node, OutputStream out);

public void extractConfigurationStandalone(Node node, Writer out, String... tablesToIgnore);

public void extractConfigurationOnly(Node node, OutputStream out);

public List<OutgoingBatchWithPayload> extractToPayload(ProcessInfo processInfo, Node targetNode, PayloadType payloadType, boolean useJdbcTimestampFormat,
boolean useUpsertStatements, boolean useDelimiterIdentifiers);

Expand Down
Expand Up @@ -245,8 +245,6 @@ public String getTriggerName(DataEventType dml, int maxTriggerNameLength, Trigge

public void syncTriggers(StringBuilder sqlBuffer, boolean genAlways);

public void addExtraConfigTable(String table);

public Map<Trigger, Exception> getFailedTriggers();

public Map<Integer, List<TriggerRouter>> fillTriggerRoutersByHistIdAndSortHist(
Expand Down
Expand Up @@ -21,8 +21,8 @@
package org.jumpmind.symmetric.service.impl;

import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.commons.lang3.StringUtils;
import org.jumpmind.db.platform.IDatabasePlatform;
Expand Down Expand Up @@ -75,7 +75,7 @@ public String getSql(String... keys) {

public static Map<String, String> mergeSqlReplacementTokens(Map<String, String> replacementTokens, String tablePrefix) {
Map<String, String> map = new HashMap<String, String>();
List<String> tables = TableConstants.getTablesWithoutPrefix();
Set<String> tables = TableConstants.getTablesWithoutPrefix();
for (String table : tables) {
map.put(table, String.format("%s%s%s", tablePrefix, StringUtils.isNotBlank(tablePrefix) ? "_" : "", table));
}
Expand Down

0 comments on commit 834694c

Please sign in to comment.