From 95219cb1337b08ac0d811b2f258727ae3759bc87 Mon Sep 17 00:00:00 2001 From: "Hicks, Josh" Date: Fri, 12 Jul 2019 08:57:20 -0400 Subject: [PATCH] 0004041: Extract only nodes --- .../android/AndroidSymmetricEngine.java | 5 +++ .../symmetric/ClientSymmetricEngine.java | 38 +++++++++++++++++++ .../symmetric/AbstractSymmetricEngine.java | 17 ++++++--- .../jumpmind/symmetric/ISymmetricEngine.java | 2 + .../jumpmind/symmetric/common/Constants.java | 2 + .../symmetric/common/ParameterConstants.java | 4 ++ .../symmetric/job/PushHeartbeatListener.java | 5 ++- .../org/jumpmind/symmetric/model/Node.java | 9 ++++- .../service/impl/AbstractService.java | 36 ++++++++++++++++++ .../service/impl/DataExtractorService.java | 28 +++++++------- .../symmetric/service/impl/DataService.java | 12 +++--- .../impl/FileSyncExtractorService.java | 5 ++- .../service/impl/TriggerRouterService.java | 20 ++++++---- .../symmetric/web/SymmetricEngineHolder.java | 24 ++++++++++++ 14 files changed, 172 insertions(+), 35 deletions(-) diff --git a/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidSymmetricEngine.java b/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidSymmetricEngine.java index 8367599505..63b49144e2 100644 --- a/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidSymmetricEngine.java +++ b/symmetric-android/src/main/java/org/jumpmind/symmetric/android/AndroidSymmetricEngine.java @@ -233,4 +233,9 @@ public IMonitorService getMonitorService() { public ISymmetricDialect getSymmetricDialect() { return this.symmetricDialect; } + + @Override + protected ISymmetricDialect checkExtractOnly() { + return getSymmetricDialect(); + } } diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java index 7d9b4a87f7..bfd4c8139c 100644 --- a/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java +++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java @@ -496,6 +496,36 @@ public int compare(File o1, File o2) { return files; } + protected ISymmetricDialect checkExtractOnly() { + if (parameterService.is(ParameterConstants.NODE_EXTRACT_ONLY, false)) { + TypedProperties properties = new TypedProperties(); + for (String prop : BasicDataSourcePropertyConstants.allProps ) { + properties.put(prop, parameterService.getString(ParameterConstants.EXTRACT_ONLY_PROPERTY_PREFIX + prop)); + } + + String[] sqlTemplateProperties = new String[] { + ParameterConstants.DB_FETCH_SIZE, + ParameterConstants.DB_QUERY_TIMEOUT_SECS, + ParameterConstants.JDBC_EXECUTE_BATCH_SIZE, + ParameterConstants.JDBC_ISOLATION_LEVEL, + ParameterConstants.JDBC_READ_STRINGS_AS_BYTES, + ParameterConstants.TREAT_BINARY_AS_LOB_ENABLED, + ParameterConstants.LOG_SLOW_SQL_THRESHOLD_MILLIS, + ParameterConstants.LOG_SQL_PARAMETERS_INLINE + }; + for (String prop : sqlTemplateProperties) { + properties.put(prop, parameterService.getString(ParameterConstants.EXTRACT_ONLY_PROPERTY_PREFIX + prop)); + } + + IDatabasePlatform extractPlatform = createDatabasePlatform(null, properties, null, true, true); + JdbcSymmetricDialectFactory dialectFactory = new JdbcSymmetricDialectFactory(parameterService, extractPlatform); + return dialectFactory.create(); + } + else { + return getSymmetricDialect(); + } + } + protected void checkLoadOnly() { if (parameterService.is(ParameterConstants.NODE_LOAD_ONLY, false)) { @@ -571,4 +601,12 @@ public IMonitorService getMonitorService() { public ISymmetricDialect getSymmetricDialect() { return this.symmetricDialect; } + + @Override + public ISymmetricDialect getExtractSymmetricDialect() { + return this.extractSymmetricDialect; + } + + + } diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java index ffbcc1e6aa..33fd5fe8c3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/AbstractSymmetricEngine.java @@ -169,6 +169,8 @@ abstract public class AbstractSymmetricEngine implements ISymmetricEngine { protected ISymmetricDialect symmetricDialect; + protected ISymmetricDialect extractSymmetricDialect; + protected INodeService nodeService; protected IConfigurationService configurationService; @@ -247,6 +249,8 @@ abstract public class AbstractSymmetricEngine implements ISymmetricEngine { abstract protected IDatabasePlatform createDatabasePlatform(TypedProperties properties); + abstract protected ISymmetricDialect checkExtractOnly(); + protected boolean registerEngine = true; protected AbstractSymmetricEngine(boolean registerEngine) { @@ -336,11 +340,14 @@ protected void init() { this.platform.setClearCacheModelTimeoutInMs(parameterService .getLong(ParameterConstants.CACHE_TIMEOUT_TABLES_IN_MS)); - + this.symmetricDialect = createSymmetricDialect(); + this.extractSymmetricDialect = checkExtractOnly(); + this.extensionService = createExtensionService(); this.extensionService.refresh(); this.symmetricDialect.setExtensionService(extensionService); + this.extractSymmetricDialect.setExtensionService(extensionService); this.parameterService.setExtensionService(extensionService); this.bandwidthService = new BandwidthService(parameterService); @@ -349,7 +356,7 @@ protected void init() { this.nodeService = new NodeService(this); this.configurationService = new ConfigurationService(parameterService, symmetricDialect, nodeService); - this.dataService = new DataService(this, extensionService); + this.dataService = new DataService(this, extensionService, extractSymmetricDialect); this.clusterService = createClusterService(); this.statisticService = new StatisticService(parameterService, symmetricDialect); this.statisticManager = createStatisticManager(); @@ -362,13 +369,13 @@ protected void init() { this.loadFilterService = new LoadFilterService(parameterService, symmetricDialect, configurationService); this.groupletService = new GroupletService(this); - this.triggerRouterService = new TriggerRouterService(this); + this.triggerRouterService = new TriggerRouterService(this, extractSymmetricDialect); this.outgoingBatchService = new OutgoingBatchService(parameterService, symmetricDialect, nodeService, configurationService, sequenceService, clusterService, extensionService); this.routerService = buildRouterService(); this.nodeCommunicationService = buildNodeCommunicationService(clusterService, nodeService, parameterService, configurationService, symmetricDialect); this.incomingBatchService = new IncomingBatchService(parameterService, symmetricDialect, clusterService); - this.dataExtractorService = new DataExtractorService(this); + this.dataExtractorService = new DataExtractorService(this, extractSymmetricDialect); this.transportManager = new TransportManagerFactory(this).create(); this.offlineTransportManager = new TransportManagerFactory(this).create(Constants.PROTOCOL_FILE); this.dataLoaderService = new DataLoaderService(this); @@ -387,7 +394,7 @@ protected void init() { nodeService, dataLoaderService, clusterService, nodeCommunicationService, configurationService, extensionService, offlineTransportManager); this.fileSyncService = buildFileSyncService(); - this.fileSyncExtractorService = new FileSyncExtractorService(this); + this.fileSyncExtractorService = new FileSyncExtractorService(this, extractSymmetricDialect); this.mailService = new MailService(parameterService, symmetricDialect); this.contextService = new ContextService(parameterService, symmetricDialect); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java index 72712ee8a3..1733e61445 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/ISymmetricEngine.java @@ -242,6 +242,8 @@ public interface ISymmetricEngine { public ISymmetricDialect getSymmetricDialect(); + public ISymmetricDialect getExtractSymmetricDialect(); + public IJobManager getJobManager(); public IOutgoingBatchService getOutgoingBatchService(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java index d3a2aa77c4..4a6e11ae63 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/Constants.java @@ -77,6 +77,8 @@ private Constants() { public static final String DEPLOYMENT_SUB_TYPE_LOAD_ONLY = "load-only"; + public static final String DEPLOYMENT_SUB_TYPE_EXTRACT_ONLY = "extract-only"; + /** * Use this value for the router_id in {@link DataEvent} if the router is unknown. */ diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java index fdf64976b5..a5e3408981 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java @@ -470,6 +470,8 @@ private ParameterConstants() { public final static String NODE_LOAD_ONLY = "load.only"; + public final static String NODE_EXTRACT_ONLY = "extract.only"; + public final static String MYSQL_TINYINT_DDL_TO_BOOLEAN = "mysql.tinyint.ddl.to.boolean"; public final static String MYSQL_BULK_LOAD_MAX_ROWS_BEFORE_FLUSH = "mysql.bulk.load.max.rows.before.flush"; public final static String MYSQL_BULK_LOAD_MAX_BYTES_BEFORE_FLUSH = "mysql.bulk.load.max.bytes.before.flush"; @@ -477,6 +479,8 @@ private ParameterConstants() { public final static String MYSQL_BULK_LOAD_REPLACE = "mysql.bulk.load.replace"; public static final String LOAD_ONLY_PROPERTY_PREFIX = "target."; + + public static final String EXTRACT_ONLY_PROPERTY_PREFIX = "extract."; public final static String KAFKA_PRODUCER = "kafka.producer"; public final static String KAFKA_FORMAT = "kafka.format"; diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushHeartbeatListener.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushHeartbeatListener.java index 285fb9804a..e45f36ed3b 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushHeartbeatListener.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/job/PushHeartbeatListener.java @@ -68,9 +68,10 @@ public void heartbeat(Node me) { || me.getBatchToSendCount() != outgoingUnsentCount) { log.info("Some attribute(s) of node changed. Recording changes"); me.setDeploymentType(engine.getDeploymentType()); + me.setDeploymentSubType(engine.getDeploymentSubType()); me.setSymmetricVersion(Version.version()); - me.setDatabaseType(symmetricDialect.getName()); - me.setDatabaseVersion(symmetricDialect.getVersion()); + me.setDatabaseType(engine.getExtractSymmetricDialect().getName()); + me.setDatabaseVersion(engine.getExtractSymmetricDialect().getVersion()); me.setBatchInErrorCount(outgoingErrorCount); me.setBatchToSendCount(outgoingUnsentCount); me.setSchemaVersion(parameterService.getString(ParameterConstants.SCHEMA_VERSION)); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java index 12e8cb3c0e..d75319b6de 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/model/Node.java @@ -104,7 +104,14 @@ public Node(Properties properties) { setSyncUrl(properties.getProperty(ParameterConstants.SYNC_URL)); setSchemaVersion(properties.getProperty(ParameterConstants.SCHEMA_VERSION)); String loadOnly = properties.getProperty(ParameterConstants.NODE_LOAD_ONLY); - setDeploymentSubType(loadOnly != null && loadOnly.equals("true") ? Constants.DEPLOYMENT_SUB_TYPE_LOAD_ONLY : null); + String extractOnly = properties.getProperty(ParameterConstants.NODE_EXTRACT_ONLY); + String deploymentSubType = null; + if (loadOnly != null && loadOnly.equals("true")) { + deploymentSubType = Constants.DEPLOYMENT_SUB_TYPE_LOAD_ONLY; + } else if (extractOnly != null && extractOnly.equals("true")) { + deploymentSubType = Constants.DEPLOYMENT_SUB_TYPE_EXTRACT_ONLY; + } + this.deploymentSubType = deploymentSubType; } public Node(IParameterService parameterService, ISymmetricDialect symmetricDialect) { diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java index 12d63a5945..c3cb94e8f3 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/AbstractService.java @@ -33,6 +33,7 @@ import org.apache.commons.lang.StringUtils; import org.apache.commons.lang.exception.ExceptionUtils; +import org.jumpmind.db.model.Table; import org.jumpmind.db.platform.IDatabasePlatform; import org.jumpmind.db.sql.ISqlTemplate; import org.jumpmind.db.sql.ISqlTransaction; @@ -69,6 +70,14 @@ abstract public class AbstractService implements IService { protected IDatabasePlatform platform; + protected ISymmetricDialect extractSymmetricDialect; + + protected IDatabasePlatform extractPlatform; + + protected ISqlTemplate extractSqlTemplate; + + protected ISqlTemplate extractSqlTemplateDirty; + protected String tablePrefix; private ISqlMap sqlMap; @@ -370,6 +379,33 @@ protected void assertNotNull(Object o, String message) { } } + protected boolean isSymmetricTable(String tableName) { + return tableName.toUpperCase().startsWith(this.tablePrefix.toUpperCase()); + } + + protected IDatabasePlatform getExtractPlatform(String tableName) { + return isSymmetricTable(tableName) ? symmetricDialect.getPlatform() : extractSymmetricDialect.getPlatform(); + } + + protected boolean isSymmetricTable(Table table) { + return table.getNameLowerCase().startsWith(this.tablePrefix.toLowerCase()); + } + + protected IDatabasePlatform getExtractPlatform(Table table) { + return isSymmetricTable(table) ? symmetricDialect.getPlatform() : extractSymmetricDialect.getPlatform(); + } + + public ISymmetricDialect getExtractSymmetricDialect() { + return extractSymmetricDialect == null ? symmetricDialect : extractSymmetricDialect; + } + + public void setExtractSymmetricDialect(ISymmetricDialect extractSymmetricDialect) { + this.extractPlatform = extractSymmetricDialect.getPlatform(); + this.extractSqlTemplate = extractSymmetricDialect.getPlatform().getSqlTemplate(); + this.extractSqlTemplateDirty = extractSymmetricDialect.getPlatform().getSqlTemplateDirty(); + this.extractSymmetricDialect = extractSymmetricDialect; + } + } \ No newline at end of file diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java index 3cefa73a09..7dc18f2770 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataExtractorService.java @@ -86,6 +86,7 @@ import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.common.TableConstants; +import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.io.data.Batch; import org.jumpmind.symmetric.io.data.Batch.BatchType; import org.jumpmind.symmetric.io.data.CsvConstants; @@ -214,8 +215,9 @@ protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY }; private CustomizableThreadFactory threadPoolFactory; - public DataExtractorService(ISymmetricEngine engine) { + public DataExtractorService(ISymmetricEngine engine, ISymmetricDialect extractSymmetricDialect) { super(engine.getParameterService(), engine.getSymmetricDialect()); + setExtractSymmetricDialect(extractSymmetricDialect); this.outgoingBatchService = engine.getOutgoingBatchService(); this.routerService = engine.getRouterService(); this.dataService = engine.getDataService(); @@ -1783,14 +1785,14 @@ protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId, String tableName = triggerHistory.getSourceTableName(); Table table = null; if (useDatabaseDefinition) { - table = platform.getTableFromCache(catalogName, schemaName, tableName, false); + table = getExtractPlatform(tableName).getTableFromCache(catalogName, schemaName, tableName, false); if (table != null && table.getColumnCount() < triggerHistory.getParsedColumnNames().length) { /* * If the column count is less than what trigger history reports, then * chances are the table cache is out of date. */ - table = platform.getTableFromCache(catalogName, schemaName, tableName, true); + table = getExtractPlatform(tableName).getTableFromCache(catalogName, schemaName, tableName, true); } if (table != null) { @@ -2889,7 +2891,7 @@ protected void startNewCursor(final TriggerHistory triggerHistory, if (selfRefLevel == 0) { selectSql += selfRefParentColumnName + " is null or " + selfRefParentColumnName + " = " + selfRefChildColumnName + " "; } else { - DatabaseInfo info = symmetricDialect.getPlatform().getDatabaseInfo(); + DatabaseInfo info = extractSymmetricDialect.getPlatform().getDatabaseInfo(); String tableName = Table.getFullyQualifiedTableName(sourceTable.getCatalog(), sourceTable.getSchema(), sourceTable.getName(), info.getDelimiterToken(), info.getCatalogSeparator(), info.getSchemaSeparator()); String refSql= "select " + selfRefChildColumnName + " from " + tableName + @@ -2907,14 +2909,14 @@ protected void startNewCursor(final TriggerHistory triggerHistory, Channel channel = configurationService.getChannel(triggerRouter.getTrigger().getReloadChannelId()); - if (channel.isReloadFlag() && symmetricDialect.isInitialLoadTwoPassLob(this.sourceTable)) { + if (channel.isReloadFlag() && extractSymmetricDialect.isInitialLoadTwoPassLob(this.sourceTable)) { channel = new Channel(); channel.setContainsBigLob(!this.isLobFirstPass); - selectSql = symmetricDialect.getInitialLoadTwoPassLobSql(selectSql, this.sourceTable, this.isLobFirstPass); + selectSql = extractSymmetricDialect.getInitialLoadTwoPassLobSql(selectSql, this.sourceTable, this.isLobFirstPass); log.info("Querying {} pass LOB for table {}: {}", (this.isLobFirstPass ? "first" : "second"), sourceTable.getName(), selectSql); } - String sql = symmetricDialect.createInitialLoadSqlFor( + String sql = extractSymmetricDialect.createInitialLoadSqlFor( this.currentInitialLoadEvent.getNode(), triggerRouter, sourceTable, triggerHistory, channel, selectSql); for (IReloadVariableFilter filter : extensionService.getExtensionPointList(IReloadVariableFilter.class)) { @@ -2923,21 +2925,21 @@ protected void startNewCursor(final TriggerHistory triggerHistory, final String initialLoadSql = sql; final int expectedCommaCount = triggerHistory.getParsedColumnNames().length - 1; - final boolean selectedAsCsv = symmetricDialect.getParameterService().is( + final boolean selectedAsCsv = extractSymmetricDialect.getParameterService().is( ParameterConstants.INITIAL_LOAD_CONCAT_CSV_IN_SQL_ENABLED); - final boolean objectValuesWillNeedEscaped = !symmetricDialect.getTriggerTemplate() + final boolean objectValuesWillNeedEscaped = !extractSymmetricDialect.getTriggerTemplate() .useTriggerTemplateForColumnTemplatesDuringInitialLoad(); - final boolean[] isColumnPositionUsingTemplate = symmetricDialect.getColumnPositionUsingTemplate(sourceTable, triggerHistory); + final boolean[] isColumnPositionUsingTemplate = extractSymmetricDialect.getColumnPositionUsingTemplate(sourceTable, triggerHistory); log.debug(sql); - this.cursor = sqlTemplate.queryForCursor(initialLoadSql, new ISqlRowMapper() { + this.cursor = extractSqlTemplate.queryForCursor(initialLoadSql, new ISqlRowMapper() { public Data mapRow(Row row) { String csvRow = null; if (selectedAsCsv) { csvRow = row.stringValue(); } else if (objectValuesWillNeedEscaped) { - csvRow = platform.getCsvStringValue( - symmetricDialect.getBinaryEncoding(), sourceTable.getColumns(), + csvRow = extractPlatform.getCsvStringValue( + extractSymmetricDialect.getBinaryEncoding(), sourceTable.getColumns(), row, isColumnPositionUsingTemplate); } else { csvRow = row.csvValue(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java index efe7dc5478..53859c8de7 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/DataService.java @@ -60,6 +60,7 @@ import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.common.TableConstants; +import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.db.SequenceIdentifier; import org.jumpmind.symmetric.ext.IHeartbeatListener; import org.jumpmind.symmetric.io.data.Batch; @@ -112,8 +113,9 @@ public class DataService extends AbstractService implements IDataService { private DataMapper dataMapper; - public DataService(ISymmetricEngine engine, IExtensionService extensionService) { + public DataService(ISymmetricEngine engine, IExtensionService extensionService, ISymmetricDialect extractSymmetricDialect) { super(engine.getParameterService(), engine.getSymmetricDialect()); + setExtractSymmetricDialect(extractSymmetricDialect); this.engine = engine; this.dataMapper = new DataMapper(); this.extensionService = extensionService; @@ -1408,7 +1410,7 @@ private Map insertLoadBatchesForReload(Node targetNode, String reloadChannel = getReloadChannelIdForTrigger(trigger, channels); Channel channel = channels.get(reloadChannel); - Table table = platform.getTableFromCache( + Table table = getExtractPlatform(triggerHistory.getSourceTableName()).getTableFromCache( triggerHistory.getSourceCatalogName(), triggerHistory.getSourceSchemaName(), triggerHistory.getSourceTableName(), false); @@ -1489,11 +1491,11 @@ protected long getDataCountForReload(Table table, Node targetNode, String select long rowCount = -1; if (parameterService.is(ParameterConstants.INITIAL_LOAD_USE_ESTIMATED_COUNTS) && (selectSql == null || StringUtils.isBlank(selectSql) || selectSql.replace(" ", "").equals("1=1"))) { - rowCount = platform.getEstimatedRowCount(table); + rowCount = extractPlatform.getEstimatedRowCount(table); } if (rowCount < 0) { - DatabaseInfo dbInfo = platform.getDatabaseInfo(); + DatabaseInfo dbInfo = extractPlatform.getDatabaseInfo(); String quote = dbInfo.getDelimiterToken(); String catalogSeparator = dbInfo.getCatalogSeparator(); String schemaSeparator = dbInfo.getSchemaSeparator(); @@ -1508,7 +1510,7 @@ protected long getDataCountForReload(Table table, Node targetNode, String select } try { - rowCount = sqlTemplateDirty.queryForLong(sql); + rowCount = extractSqlTemplateDirty.queryForLong(sql); } catch (SqlException ex) { log.error("Failed to execute row count SQL while starting reload. " + ex.getMessage() + ", SQL: \"" + sql + "\""); throw new InvalidSqlException(ex); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java index dc3199ba9c..5c3206a6e0 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/FileSyncExtractorService.java @@ -25,6 +25,7 @@ import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.common.TableConstants; +import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.file.FileSyncZipDataWriter; import org.jumpmind.symmetric.io.data.IDataProcessorListener; import org.jumpmind.symmetric.io.data.IDataWriter; @@ -57,8 +58,8 @@ public class FileSyncExtractorService extends DataExtractorService { private ITriggerRouterService triggerRouterService; private IExtensionService extensionService; - public FileSyncExtractorService(ISymmetricEngine engine) { - super(engine); + public FileSyncExtractorService(ISymmetricEngine engine, ISymmetricDialect extractSymmetricDialect) { + super(engine, extractSymmetricDialect); this.fileSyncService = engine.getFileSyncService(); this.nodeService = engine.getNodeService(); this.stagingManager = engine.getStagingManager(); diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java index 63a40f67ca..a0bfc94225 100644 --- a/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java +++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/TriggerRouterService.java @@ -46,6 +46,7 @@ import org.jumpmind.db.model.Column; import org.jumpmind.db.model.Database; import org.jumpmind.db.model.Table; +import org.jumpmind.db.platform.IDatabasePlatform; import org.jumpmind.db.sql.ISqlRowMapper; import org.jumpmind.db.sql.ISqlTransaction; import org.jumpmind.db.sql.Row; @@ -58,6 +59,7 @@ import org.jumpmind.symmetric.config.ITriggerCreationListener; import org.jumpmind.symmetric.config.TriggerFailureListener; import org.jumpmind.symmetric.config.TriggerSelector; +import org.jumpmind.symmetric.db.ISymmetricDialect; import org.jumpmind.symmetric.io.data.DataEventType; import org.jumpmind.symmetric.model.Channel; import org.jumpmind.symmetric.model.Lock; @@ -137,8 +139,9 @@ public class TriggerRouterService extends AbstractService implements ITriggerRou */ private Map historyMap = Collections.synchronizedMap(new HashMap()); - public TriggerRouterService(ISymmetricEngine engine) { + public TriggerRouterService(ISymmetricEngine engine, ISymmetricDialect extractSymmetricDialect) { super(engine.getParameterService(), engine.getSymmetricDialect()); + setExtractSymmetricDialect(extractSymmetricDialect); this.clusterService = engine.getClusterService(); this.configurationService = engine.getConfigurationService(); this.statisticManager = engine.getStatisticManager(); @@ -1403,13 +1406,16 @@ protected List getTriggersForCurrentNode() { protected Set getTablesForTrigger(Trigger trigger, List triggers, boolean useTableCache) { Set
tables = new HashSet
(); + + IDatabasePlatform sourcePlatform = getExtractPlatform(trigger.getSourceTableName()); + try { boolean ignoreCase = this.parameterService .is(ParameterConstants.DB_METADATA_IGNORE_CASE); List catalogNames = new ArrayList(); if (trigger.isSourceCatalogNameWildCarded()) { - List all = platform.getDdlReader().getCatalogNames(); + List all = sourcePlatform.getDdlReader().getCatalogNames(); for (String catalogName : all) { if (trigger.matchesCatalogName(catalogName, ignoreCase)) { catalogNames.add(catalogName); @@ -1420,7 +1426,7 @@ protected Set
getTablesForTrigger(Trigger trigger, List triggers } } else { if (isBlank(trigger.getSourceCatalogName())) { - catalogNames.add(platform.getDefaultCatalog()); + catalogNames.add(sourcePlatform.getDefaultCatalog()); } else { catalogNames.add(trigger.getSourceCatalogNameUnescaped()); } @@ -1429,7 +1435,7 @@ protected Set
getTablesForTrigger(Trigger trigger, List triggers for (String catalogName : catalogNames) { List schemaNames = new ArrayList(); if (trigger.isSourceSchemaNameWildCarded()) { - List all = platform.getDdlReader().getSchemaNames(catalogName); + List all = sourcePlatform.getDdlReader().getSchemaNames(catalogName); for (String schemaName : all) { if (trigger.matchesSchemaName(schemaName, ignoreCase)) { schemaNames.add(schemaName); @@ -1440,7 +1446,7 @@ protected Set
getTablesForTrigger(Trigger trigger, List triggers } } else { if (isBlank(trigger.getSourceSchemaName())) { - schemaNames.add(platform.getDefaultSchema()); + schemaNames.add(sourcePlatform.getDefaultSchema()); } else { schemaNames.add(trigger.getSourceSchemaNameUnescaped()); } @@ -1448,7 +1454,7 @@ protected Set
getTablesForTrigger(Trigger trigger, List triggers for (String schemaName : schemaNames) { if (trigger.isSourceTableNameWildCarded()) { - Database database = symmetricDialect.getPlatform().readDatabase( + Database database = sourcePlatform.readDatabase( catalogName, schemaName, new String[] { "TABLE" }); Table[] tableArray = database.getTables(); @@ -1463,7 +1469,7 @@ protected Set
getTablesForTrigger(Trigger trigger, List triggers } } } else { - Table table = symmetricDialect.getPlatform().getTableFromCache( + Table table = sourcePlatform.getTableFromCache( catalogName, schemaName, trigger.getSourceTableNameUnescaped(), !useTableCache); if (table != null) { diff --git a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/SymmetricEngineHolder.java b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/SymmetricEngineHolder.java index 0ed47f003e..2fddae6cf6 100644 --- a/symmetric-server/src/main/java/org/jumpmind/symmetric/web/SymmetricEngineHolder.java +++ b/symmetric-server/src/main/java/org/jumpmind/symmetric/web/SymmetricEngineHolder.java @@ -54,6 +54,7 @@ import org.jumpmind.symmetric.ISymmetricEngine; import org.jumpmind.symmetric.SymmetricAdmin; import org.jumpmind.symmetric.SymmetricException; +import org.jumpmind.symmetric.common.Constants; import org.jumpmind.symmetric.common.ParameterConstants; import org.jumpmind.symmetric.common.SystemConstants; import org.jumpmind.symmetric.model.Node; @@ -283,6 +284,17 @@ protected ISymmetricEngine create(String propertiesFile) { engine = new ServerSymmetricEngine(propertiesFile != null ? file : null, springContext, this); engine.setDeploymentType(deploymentType); + + String loadOnly = properties.getProperty(ParameterConstants.NODE_LOAD_ONLY); + String extractOnly = properties.getProperty(ParameterConstants.NODE_EXTRACT_ONLY); + String deploymentSubType = null; + if (loadOnly != null && loadOnly.equals("true")) { + deploymentSubType = Constants.DEPLOYMENT_SUB_TYPE_LOAD_ONLY; + } else if (extractOnly != null && extractOnly.equals("true")) { + deploymentSubType = Constants.DEPLOYMENT_SUB_TYPE_EXTRACT_ONLY; + } + engine.setDeploymentSubType(deploymentSubType); + synchronized (this) { if (!engines.containsKey(engine.getEngineName())) { engines.put(engine.getEngineName(), engine); @@ -341,6 +353,18 @@ public ISymmetricEngine install(Properties passedInProperties) throws Exception } } + String extractOnlyPassword = properties.getProperty(ParameterConstants.EXTRACT_ONLY_PROPERTY_PREFIX + BasicDataSourcePropertyConstants.DB_POOL_PASSWORD); + + if (StringUtils.isNotBlank(extractOnlyPassword) && !extractOnlyPassword.startsWith(SecurityConstants.PREFIX_ENC)) { + try { + ISecurityService service = SecurityServiceFactory.create(SecurityServiceType.CLIENT, properties); + properties.setProperty(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + BasicDataSourcePropertyConstants.DB_POOL_PASSWORD, + SecurityConstants.PREFIX_ENC + service.encrypt(extractOnlyPassword)); + } catch (Exception ex) { + log.warn("Could not encrypt extract only password", ex); + } + } + String engineName = validateRequiredProperties(properties); passedInProperties.setProperty(ParameterConstants.ENGINE_NAME, engineName);