Skip to content

Commit

Permalink
0004041: Extract only nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
jumpmind-josh committed Jul 12, 2019
1 parent a9ac67a commit 95219cb
Show file tree
Hide file tree
Showing 14 changed files with 172 additions and 35 deletions.
Expand Up @@ -233,4 +233,9 @@ public IMonitorService getMonitorService() {
public ISymmetricDialect getSymmetricDialect() {
return this.symmetricDialect;
}

@Override
protected ISymmetricDialect checkExtractOnly() {
return getSymmetricDialect();
}
}
Expand Up @@ -496,6 +496,36 @@ public int compare(File o1, File o2) {
return files;
}

protected ISymmetricDialect checkExtractOnly() {
if (parameterService.is(ParameterConstants.NODE_EXTRACT_ONLY, false)) {
TypedProperties properties = new TypedProperties();
for (String prop : BasicDataSourcePropertyConstants.allProps ) {
properties.put(prop, parameterService.getString(ParameterConstants.EXTRACT_ONLY_PROPERTY_PREFIX + prop));
}

String[] sqlTemplateProperties = new String[] {
ParameterConstants.DB_FETCH_SIZE,
ParameterConstants.DB_QUERY_TIMEOUT_SECS,
ParameterConstants.JDBC_EXECUTE_BATCH_SIZE,
ParameterConstants.JDBC_ISOLATION_LEVEL,
ParameterConstants.JDBC_READ_STRINGS_AS_BYTES,
ParameterConstants.TREAT_BINARY_AS_LOB_ENABLED,
ParameterConstants.LOG_SLOW_SQL_THRESHOLD_MILLIS,
ParameterConstants.LOG_SQL_PARAMETERS_INLINE
};
for (String prop : sqlTemplateProperties) {
properties.put(prop, parameterService.getString(ParameterConstants.EXTRACT_ONLY_PROPERTY_PREFIX + prop));
}

IDatabasePlatform extractPlatform = createDatabasePlatform(null, properties, null, true, true);
JdbcSymmetricDialectFactory dialectFactory = new JdbcSymmetricDialectFactory(parameterService, extractPlatform);
return dialectFactory.create();
}
else {
return getSymmetricDialect();
}
}

protected void checkLoadOnly() {
if (parameterService.is(ParameterConstants.NODE_LOAD_ONLY, false)) {

Expand Down Expand Up @@ -571,4 +601,12 @@ public IMonitorService getMonitorService() {
public ISymmetricDialect getSymmetricDialect() {
return this.symmetricDialect;
}

@Override
public ISymmetricDialect getExtractSymmetricDialect() {
return this.extractSymmetricDialect;
}



}
Expand Up @@ -169,6 +169,8 @@ abstract public class AbstractSymmetricEngine implements ISymmetricEngine {

protected ISymmetricDialect symmetricDialect;

protected ISymmetricDialect extractSymmetricDialect;

protected INodeService nodeService;

protected IConfigurationService configurationService;
Expand Down Expand Up @@ -247,6 +249,8 @@ abstract public class AbstractSymmetricEngine implements ISymmetricEngine {

abstract protected IDatabasePlatform createDatabasePlatform(TypedProperties properties);

abstract protected ISymmetricDialect checkExtractOnly();

protected boolean registerEngine = true;

protected AbstractSymmetricEngine(boolean registerEngine) {
Expand Down Expand Up @@ -336,11 +340,14 @@ protected void init() {
this.platform.setClearCacheModelTimeoutInMs(parameterService
.getLong(ParameterConstants.CACHE_TIMEOUT_TABLES_IN_MS));


this.symmetricDialect = createSymmetricDialect();
this.extractSymmetricDialect = checkExtractOnly();

this.extensionService = createExtensionService();
this.extensionService.refresh();
this.symmetricDialect.setExtensionService(extensionService);
this.extractSymmetricDialect.setExtensionService(extensionService);
this.parameterService.setExtensionService(extensionService);

this.bandwidthService = new BandwidthService(parameterService);
Expand All @@ -349,7 +356,7 @@ protected void init() {
this.nodeService = new NodeService(this);
this.configurationService = new ConfigurationService(parameterService, symmetricDialect,
nodeService);
this.dataService = new DataService(this, extensionService);
this.dataService = new DataService(this, extensionService, extractSymmetricDialect);
this.clusterService = createClusterService();
this.statisticService = new StatisticService(parameterService, symmetricDialect);
this.statisticManager = createStatisticManager();
Expand All @@ -362,13 +369,13 @@ protected void init() {
this.loadFilterService = new LoadFilterService(parameterService, symmetricDialect,
configurationService);
this.groupletService = new GroupletService(this);
this.triggerRouterService = new TriggerRouterService(this);
this.triggerRouterService = new TriggerRouterService(this, extractSymmetricDialect);
this.outgoingBatchService = new OutgoingBatchService(parameterService, symmetricDialect,
nodeService, configurationService, sequenceService, clusterService, extensionService);
this.routerService = buildRouterService();
this.nodeCommunicationService = buildNodeCommunicationService(clusterService, nodeService, parameterService, configurationService, symmetricDialect);
this.incomingBatchService = new IncomingBatchService(parameterService, symmetricDialect, clusterService);
this.dataExtractorService = new DataExtractorService(this);
this.dataExtractorService = new DataExtractorService(this, extractSymmetricDialect);
this.transportManager = new TransportManagerFactory(this).create();
this.offlineTransportManager = new TransportManagerFactory(this).create(Constants.PROTOCOL_FILE);
this.dataLoaderService = new DataLoaderService(this);
Expand All @@ -387,7 +394,7 @@ protected void init() {
nodeService, dataLoaderService, clusterService, nodeCommunicationService,
configurationService, extensionService, offlineTransportManager);
this.fileSyncService = buildFileSyncService();
this.fileSyncExtractorService = new FileSyncExtractorService(this);
this.fileSyncExtractorService = new FileSyncExtractorService(this, extractSymmetricDialect);
this.mailService = new MailService(parameterService, symmetricDialect);
this.contextService = new ContextService(parameterService, symmetricDialect);

Expand Down
Expand Up @@ -242,6 +242,8 @@ public interface ISymmetricEngine {

public ISymmetricDialect getSymmetricDialect();

public ISymmetricDialect getExtractSymmetricDialect();

public IJobManager getJobManager();

public IOutgoingBatchService getOutgoingBatchService();
Expand Down
Expand Up @@ -77,6 +77,8 @@ private Constants() {

public static final String DEPLOYMENT_SUB_TYPE_LOAD_ONLY = "load-only";

public static final String DEPLOYMENT_SUB_TYPE_EXTRACT_ONLY = "extract-only";

/**
* Use this value for the router_id in {@link DataEvent} if the router is unknown.
*/
Expand Down
Expand Up @@ -470,13 +470,17 @@ private ParameterConstants() {

public final static String NODE_LOAD_ONLY = "load.only";

public final static String NODE_EXTRACT_ONLY = "extract.only";

public final static String MYSQL_TINYINT_DDL_TO_BOOLEAN = "mysql.tinyint.ddl.to.boolean";
public final static String MYSQL_BULK_LOAD_MAX_ROWS_BEFORE_FLUSH = "mysql.bulk.load.max.rows.before.flush";
public final static String MYSQL_BULK_LOAD_MAX_BYTES_BEFORE_FLUSH = "mysql.bulk.load.max.bytes.before.flush";
public final static String MYSQL_BULK_LOAD_LOCAL = "mysql.bulk.load.local";
public final static String MYSQL_BULK_LOAD_REPLACE = "mysql.bulk.load.replace";

public static final String LOAD_ONLY_PROPERTY_PREFIX = "target.";

public static final String EXTRACT_ONLY_PROPERTY_PREFIX = "extract.";

public final static String KAFKA_PRODUCER = "kafka.producer";
public final static String KAFKA_FORMAT = "kafka.format";
Expand Down
Expand Up @@ -68,9 +68,10 @@ public void heartbeat(Node me) {
|| me.getBatchToSendCount() != outgoingUnsentCount) {
log.info("Some attribute(s) of node changed. Recording changes");
me.setDeploymentType(engine.getDeploymentType());
me.setDeploymentSubType(engine.getDeploymentSubType());
me.setSymmetricVersion(Version.version());
me.setDatabaseType(symmetricDialect.getName());
me.setDatabaseVersion(symmetricDialect.getVersion());
me.setDatabaseType(engine.getExtractSymmetricDialect().getName());
me.setDatabaseVersion(engine.getExtractSymmetricDialect().getVersion());
me.setBatchInErrorCount(outgoingErrorCount);
me.setBatchToSendCount(outgoingUnsentCount);
me.setSchemaVersion(parameterService.getString(ParameterConstants.SCHEMA_VERSION));
Expand Down
Expand Up @@ -104,7 +104,14 @@ public Node(Properties properties) {
setSyncUrl(properties.getProperty(ParameterConstants.SYNC_URL));
setSchemaVersion(properties.getProperty(ParameterConstants.SCHEMA_VERSION));
String loadOnly = properties.getProperty(ParameterConstants.NODE_LOAD_ONLY);
setDeploymentSubType(loadOnly != null && loadOnly.equals("true") ? Constants.DEPLOYMENT_SUB_TYPE_LOAD_ONLY : null);
String extractOnly = properties.getProperty(ParameterConstants.NODE_EXTRACT_ONLY);
String deploymentSubType = null;
if (loadOnly != null && loadOnly.equals("true")) {
deploymentSubType = Constants.DEPLOYMENT_SUB_TYPE_LOAD_ONLY;
} else if (extractOnly != null && extractOnly.equals("true")) {
deploymentSubType = Constants.DEPLOYMENT_SUB_TYPE_EXTRACT_ONLY;
}
this.deploymentSubType = deploymentSubType;
}

public Node(IParameterService parameterService, ISymmetricDialect symmetricDialect) {
Expand Down
Expand Up @@ -33,6 +33,7 @@

import org.apache.commons.lang.StringUtils;
import org.apache.commons.lang.exception.ExceptionUtils;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.ISqlTemplate;
import org.jumpmind.db.sql.ISqlTransaction;
Expand Down Expand Up @@ -69,6 +70,14 @@ abstract public class AbstractService implements IService {

protected IDatabasePlatform platform;

protected ISymmetricDialect extractSymmetricDialect;

protected IDatabasePlatform extractPlatform;

protected ISqlTemplate extractSqlTemplate;

protected ISqlTemplate extractSqlTemplateDirty;

protected String tablePrefix;

private ISqlMap sqlMap;
Expand Down Expand Up @@ -370,6 +379,33 @@ protected void assertNotNull(Object o, String message) {
}
}

protected boolean isSymmetricTable(String tableName) {
return tableName.toUpperCase().startsWith(this.tablePrefix.toUpperCase());
}

protected IDatabasePlatform getExtractPlatform(String tableName) {
return isSymmetricTable(tableName) ? symmetricDialect.getPlatform() : extractSymmetricDialect.getPlatform();
}

protected boolean isSymmetricTable(Table table) {
return table.getNameLowerCase().startsWith(this.tablePrefix.toLowerCase());
}

protected IDatabasePlatform getExtractPlatform(Table table) {
return isSymmetricTable(table) ? symmetricDialect.getPlatform() : extractSymmetricDialect.getPlatform();
}

public ISymmetricDialect getExtractSymmetricDialect() {
return extractSymmetricDialect == null ? symmetricDialect : extractSymmetricDialect;
}

public void setExtractSymmetricDialect(ISymmetricDialect extractSymmetricDialect) {
this.extractPlatform = extractSymmetricDialect.getPlatform();
this.extractSqlTemplate = extractSymmetricDialect.getPlatform().getSqlTemplate();
this.extractSqlTemplateDirty = extractSymmetricDialect.getPlatform().getSqlTemplateDirty();
this.extractSymmetricDialect = extractSymmetricDialect;
}



}
Expand Up @@ -86,6 +86,7 @@
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.common.TableConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.Batch.BatchType;
import org.jumpmind.symmetric.io.data.CsvConstants;
Expand Down Expand Up @@ -214,8 +215,9 @@ protected enum ExtractMode { FOR_SYM_CLIENT, FOR_PAYLOAD_CLIENT, EXTRACT_ONLY };

private CustomizableThreadFactory threadPoolFactory;

public DataExtractorService(ISymmetricEngine engine) {
public DataExtractorService(ISymmetricEngine engine, ISymmetricDialect extractSymmetricDialect) {
super(engine.getParameterService(), engine.getSymmetricDialect());
setExtractSymmetricDialect(extractSymmetricDialect);
this.outgoingBatchService = engine.getOutgoingBatchService();
this.routerService = engine.getRouterService();
this.dataService = engine.getDataService();
Expand Down Expand Up @@ -1783,14 +1785,14 @@ protected Table lookupAndOrderColumnsAccordingToTriggerHistory(String routerId,
String tableName = triggerHistory.getSourceTableName();
Table table = null;
if (useDatabaseDefinition) {
table = platform.getTableFromCache(catalogName, schemaName, tableName, false);
table = getExtractPlatform(tableName).getTableFromCache(catalogName, schemaName, tableName, false);

if (table != null && table.getColumnCount() < triggerHistory.getParsedColumnNames().length) {
/*
* If the column count is less than what trigger history reports, then
* chances are the table cache is out of date.
*/
table = platform.getTableFromCache(catalogName, schemaName, tableName, true);
table = getExtractPlatform(tableName).getTableFromCache(catalogName, schemaName, tableName, true);
}

if (table != null) {
Expand Down Expand Up @@ -2889,7 +2891,7 @@ protected void startNewCursor(final TriggerHistory triggerHistory,
if (selfRefLevel == 0) {
selectSql += selfRefParentColumnName + " is null or " + selfRefParentColumnName + " = " + selfRefChildColumnName + " ";
} else {
DatabaseInfo info = symmetricDialect.getPlatform().getDatabaseInfo();
DatabaseInfo info = extractSymmetricDialect.getPlatform().getDatabaseInfo();
String tableName = Table.getFullyQualifiedTableName(sourceTable.getCatalog(), sourceTable.getSchema(),
sourceTable.getName(), info.getDelimiterToken(), info.getCatalogSeparator(), info.getSchemaSeparator());
String refSql= "select " + selfRefChildColumnName + " from " + tableName +
Expand All @@ -2907,14 +2909,14 @@ protected void startNewCursor(final TriggerHistory triggerHistory,

Channel channel = configurationService.getChannel(triggerRouter.getTrigger().getReloadChannelId());

if (channel.isReloadFlag() && symmetricDialect.isInitialLoadTwoPassLob(this.sourceTable)) {
if (channel.isReloadFlag() && extractSymmetricDialect.isInitialLoadTwoPassLob(this.sourceTable)) {
channel = new Channel();
channel.setContainsBigLob(!this.isLobFirstPass);
selectSql = symmetricDialect.getInitialLoadTwoPassLobSql(selectSql, this.sourceTable, this.isLobFirstPass);
selectSql = extractSymmetricDialect.getInitialLoadTwoPassLobSql(selectSql, this.sourceTable, this.isLobFirstPass);
log.info("Querying {} pass LOB for table {}: {}", (this.isLobFirstPass ? "first" : "second"), sourceTable.getName(), selectSql);
}

String sql = symmetricDialect.createInitialLoadSqlFor(
String sql = extractSymmetricDialect.createInitialLoadSqlFor(
this.currentInitialLoadEvent.getNode(), triggerRouter, sourceTable, triggerHistory, channel, selectSql);

for (IReloadVariableFilter filter : extensionService.getExtensionPointList(IReloadVariableFilter.class)) {
Expand All @@ -2923,21 +2925,21 @@ protected void startNewCursor(final TriggerHistory triggerHistory,

final String initialLoadSql = sql;
final int expectedCommaCount = triggerHistory.getParsedColumnNames().length - 1;
final boolean selectedAsCsv = symmetricDialect.getParameterService().is(
final boolean selectedAsCsv = extractSymmetricDialect.getParameterService().is(
ParameterConstants.INITIAL_LOAD_CONCAT_CSV_IN_SQL_ENABLED);
final boolean objectValuesWillNeedEscaped = !symmetricDialect.getTriggerTemplate()
final boolean objectValuesWillNeedEscaped = !extractSymmetricDialect.getTriggerTemplate()
.useTriggerTemplateForColumnTemplatesDuringInitialLoad();
final boolean[] isColumnPositionUsingTemplate = symmetricDialect.getColumnPositionUsingTemplate(sourceTable, triggerHistory);
final boolean[] isColumnPositionUsingTemplate = extractSymmetricDialect.getColumnPositionUsingTemplate(sourceTable, triggerHistory);
log.debug(sql);

this.cursor = sqlTemplate.queryForCursor(initialLoadSql, new ISqlRowMapper<Data>() {
this.cursor = extractSqlTemplate.queryForCursor(initialLoadSql, new ISqlRowMapper<Data>() {
public Data mapRow(Row row) {
String csvRow = null;
if (selectedAsCsv) {
csvRow = row.stringValue();
} else if (objectValuesWillNeedEscaped) {
csvRow = platform.getCsvStringValue(
symmetricDialect.getBinaryEncoding(), sourceTable.getColumns(),
csvRow = extractPlatform.getCsvStringValue(
extractSymmetricDialect.getBinaryEncoding(), sourceTable.getColumns(),
row, isColumnPositionUsingTemplate);
} else {
csvRow = row.csvValue();
Expand Down

0 comments on commit 95219cb

Please sign in to comment.