Skip to content

Commit

Permalink
0004041: Extract only nodes
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Nov 11, 2019
1 parent 0c38a31 commit 04d46f5
Show file tree
Hide file tree
Showing 22 changed files with 182 additions and 308 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -82,7 +82,6 @@ public AndroidSymmetricEngine(String registrationUrl, String externalId, String
this.databaseHelper = databaseHelper;
this.androidContext = androidContext;
init();
this.symmetricDialect.setTargetPlatform(this.symmetricDialect.getPlatform());
}

@Override
Expand Down Expand Up @@ -229,18 +228,4 @@ public IMonitorService getMonitorService() {
throw new NotImplementedException();
}

@Override
public ISymmetricDialect getSymmetricDialect() {
return this.symmetricDialect;
}

@Override
protected ISymmetricDialect checkExtractOnly() {
return getSymmetricDialect();
}

@Override
public ISymmetricDialect getExtractSymmetricDialect() {
return this.symmetricDialect;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,6 @@
import org.apache.commons.dbcp.BasicDataSource;
import org.apache.commons.io.FileUtils;
import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.JdbcDatabasePlatformFactory;
import org.jumpmind.db.platform.cassandra.CassandraPlatform;
Expand Down Expand Up @@ -250,7 +249,6 @@ protected void init() {
"Failed to initialize the extension points. Please fix the problem and restart the server.",
ex);
}
checkLoadOnly();
} catch (RuntimeException ex) {
destroy();
throw ex;
Expand Down Expand Up @@ -296,6 +294,34 @@ protected ISymmetricDialect createSymmetricDialect() {
return new JdbcSymmetricDialectFactory(parameterService, platform).create();
}

@Override
protected ISymmetricDialect createTargetDialect() {
if (parameterService.is(ParameterConstants.NODE_LOAD_ONLY, false)) {
TypedProperties properties = new TypedProperties();
String prefix = ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX;
copyProperties(properties, prefix, BasicDataSourcePropertyConstants.ALL_PROPS);
copyProperties(properties, prefix, ParameterConstants.ALL_JDBC_PARAMS);
copyProperties(properties, "", ParameterConstants.ALL_KAFKA_PARAMS);

IDatabasePlatform targetPlatform = createDatabasePlatform(null, properties, null, true, true);

if (targetPlatform instanceof GenericJdbcDatabasePlatform) {
targetPlatform.getDatabaseInfo().setNotNullColumnsSupported(parameterService.is(prefix +
ParameterConstants.CREATE_TABLE_NOT_NULL_COLUMNS, true));
}

return new JdbcSymmetricDialectFactory(parameterService, targetPlatform).create();
} else {
return getSymmetricDialect();
}
}

private void copyProperties(TypedProperties properties, String prefix, String[] parameterNames) {
for (String name : parameterNames) {
properties.put(name, parameterService.getString(prefix + name));
}
}

@Override
protected IDatabasePlatform createDatabasePlatform(TypedProperties properties) {
IDatabasePlatform platform = createDatabasePlatform(springContext, properties, dataSource,
Expand All @@ -311,15 +337,15 @@ public static IDatabasePlatform createDatabasePlatform(ApplicationContext spring
DataSource dataSource, boolean waitOnAvailableDatabase, boolean isLoadOnly) {
log.info("Initializing connection to database");
if (dataSource == null) {
if (isLoadOnly) {
String dbUrl = properties.get(BasicDataSourcePropertyConstants.DB_POOL_URL);
String dbDriver = properties.get(BasicDataSourcePropertyConstants.DB_POOL_DRIVER);
if (dbUrl != null && dbUrl.startsWith("cassandra://")) {
return new CassandraPlatform(createSqlTemplateSettings(properties), dbUrl.substring(12));
} else if (dbDriver != null && dbDriver.contains("kafka")) {
return new KafkaPlatform(createSqlTemplateSettings(properties));
}
}
if (isLoadOnly) {
String dbUrl = properties.get(BasicDataSourcePropertyConstants.DB_POOL_URL);
String dbDriver = properties.get(BasicDataSourcePropertyConstants.DB_POOL_DRIVER);
if (dbUrl != null && dbUrl.startsWith("cassandra://")) {
return new CassandraPlatform(createSqlTemplateSettings(properties), dbUrl.substring(12));
} else if (dbDriver != null && dbDriver.contains("kafka")) {
return new KafkaPlatform(createSqlTemplateSettings(properties));
}
}
String jndiName = properties.getProperty(ParameterConstants.DB_JNDI_NAME);
if (StringUtils.isNotBlank(jndiName)) {
try {
Expand Down Expand Up @@ -495,95 +521,6 @@ public int compare(File o1, File o2) {
});
return files;
}

protected ISymmetricDialect checkExtractOnly() {
if (parameterService.is(ParameterConstants.NODE_EXTRACT_ONLY, false)) {
TypedProperties properties = new TypedProperties();
for (String prop : BasicDataSourcePropertyConstants.allProps ) {
properties.put(prop, parameterService.getString(ParameterConstants.EXTRACT_ONLY_PROPERTY_PREFIX + prop));
}

String[] sqlTemplateProperties = new String[] {
ParameterConstants.DB_FETCH_SIZE,
ParameterConstants.DB_QUERY_TIMEOUT_SECS,
ParameterConstants.JDBC_EXECUTE_BATCH_SIZE,
ParameterConstants.JDBC_ISOLATION_LEVEL,
ParameterConstants.JDBC_READ_STRINGS_AS_BYTES,
ParameterConstants.TREAT_BINARY_AS_LOB_ENABLED,
ParameterConstants.LOG_SLOW_SQL_THRESHOLD_MILLIS,
ParameterConstants.LOG_SQL_PARAMETERS_INLINE
};
for (String prop : sqlTemplateProperties) {
properties.put(prop, parameterService.getString(ParameterConstants.EXTRACT_ONLY_PROPERTY_PREFIX + prop));
}

IDatabasePlatform extractPlatform = createDatabasePlatform(null, properties, null, true, true);
JdbcSymmetricDialectFactory dialectFactory = new JdbcSymmetricDialectFactory(parameterService, extractPlatform);
return dialectFactory.create();
}
else {
return getSymmetricDialect();
}
}

protected void checkLoadOnly() {
if (parameterService.is(ParameterConstants.NODE_LOAD_ONLY, false)) {

TypedProperties properties = new TypedProperties();
for (String prop : BasicDataSourcePropertyConstants.allProps ) {
properties.put(prop, parameterService.getString(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + prop));
}

String[] sqlTemplateProperties = new String[] {
ParameterConstants.DB_FETCH_SIZE,
ParameterConstants.DB_QUERY_TIMEOUT_SECS,
ParameterConstants.JDBC_EXECUTE_BATCH_SIZE,
ParameterConstants.JDBC_ISOLATION_LEVEL,
ParameterConstants.JDBC_READ_STRINGS_AS_BYTES,
ParameterConstants.TREAT_BINARY_AS_LOB_ENABLED,
ParameterConstants.LOG_SLOW_SQL_THRESHOLD_MILLIS,
ParameterConstants.LOG_SQL_PARAMETERS_INLINE
};
for (String prop : sqlTemplateProperties) {
properties.put(prop, parameterService.getString(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + prop));
}

String[] kafkaProperties = new String[] {
ParameterConstants.KAFKA_PRODUCER,
ParameterConstants.KAFKA_MESSAGE_BY,
ParameterConstants.KAFKA_TOPIC_BY,
ParameterConstants.KAFKA_FORMAT
};

for (String prop : kafkaProperties) {
properties.put(prop, parameterService.getString(prop));
}

IDatabasePlatform targetPlatform = createDatabasePlatform(null, properties, null, true, true);
DataSource loadDataSource = targetPlatform.getDataSource();
if (targetPlatform instanceof GenericJdbcDatabasePlatform) {
if (targetPlatform.getName() == null || targetPlatform.getName().equals(DatabaseNamesConstants.GENERIC)) {
String name = null;
try {
String nameVersion[] = JdbcDatabasePlatformFactory.determineDatabaseNameVersionSubprotocol(loadDataSource);
name = (String.format("%s%s", nameVersion[0], nameVersion[1]).toLowerCase());
}
catch (Exception e) {
log.info("Unable to determine database name and version, " + e.getMessage());
}
if (name == null) {
name = DatabaseNamesConstants.GENERIC;
}
((GenericJdbcDatabasePlatform) targetPlatform).setName(name);
}
targetPlatform.getDatabaseInfo().setNotNullColumnsSupported(parameterService.is(ParameterConstants.LOAD_ONLY_PROPERTY_PREFIX + ParameterConstants.CREATE_TABLE_NOT_NULL_COLUMNS, true));
}
getSymmetricDialect().setTargetPlatform(targetPlatform);
}
else {
getSymmetricDialect().setTargetPlatform(getSymmetricDialect().getPlatform());
}
}

public ApplicationContext getSpringContext() {
return springContext;
Expand All @@ -596,17 +533,5 @@ public File snapshot() {
public IMonitorService getMonitorService() {
return monitorService;
}

@Override
public ISymmetricDialect getSymmetricDialect() {
return this.symmetricDialect;
}

@Override
public ISymmetricDialect getExtractSymmetricDialect() {
return this.extractSymmetricDialect;
}



}
Original file line number Diff line number Diff line change
Expand Up @@ -39,43 +39,44 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
dataLoaderFactories.put(factory.getTypeName(), factory);
}

IDatabasePlatform platform = engine.getTargetDialect().getPlatform();
String platformName = platform.getName();

if (engine.getParameterService().is(ParameterConstants.JDBC_EXECUTE_BULK_BATCH_OVERRIDE, false)) {
return new JdbcBatchBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
return new JdbcBatchBulkDatabaseWriter(symmetricDialect.getPlatform(), platform,
symmetricDialect.getTablePrefix(), buildParameterDatabaseWritterSettings());
} else if (DatabaseNamesConstants.MYSQL.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
} else if (DatabaseNamesConstants.MYSQL.equals(platformName)) {
return new MySqlBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else if (DatabaseNamesConstants.MSSQL2000.equals(engine.getSymmetricDialect().getTargetPlatform().getName())
|| DatabaseNamesConstants.MSSQL2005.equals(engine.getSymmetricDialect().getTargetPlatform().getName())
|| DatabaseNamesConstants.MSSQL2008.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
} else if (DatabaseNamesConstants.MSSQL2000.equals(platformName)
|| DatabaseNamesConstants.MSSQL2005.equals(platformName)
|| DatabaseNamesConstants.MSSQL2008.equals(platformName)) {
return new MsSqlBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else if (DatabaseNamesConstants.ORACLE.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
} else if (DatabaseNamesConstants.ORACLE.equals(platformName)) {
return new OracleBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else if (DatabaseNamesConstants.TIBERO.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
} else if (DatabaseNamesConstants.TIBERO.equals(platformName)) {
return new TiberoBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else if (DatabaseNamesConstants.POSTGRESQL.equals(engine.getSymmetricDialect().getTargetPlatform().getName())
|| DatabaseNamesConstants.POSTGRESQL95.equals(engine.getSymmetricDialect().getTargetPlatform().getName())
|| DatabaseNamesConstants.GREENPLUM.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
} else if (DatabaseNamesConstants.POSTGRESQL.equals(platformName)
|| DatabaseNamesConstants.POSTGRESQL95.equals(platformName)
|| DatabaseNamesConstants.GREENPLUM.equals(platformName)) {
return new PostgresBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else if (DatabaseNamesConstants.REDSHIFT.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
} else if (DatabaseNamesConstants.REDSHIFT.equals(platformName)) {
return new RedshiftBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else if (engine.getSymmetricDialect().getTargetPlatform().getName() != null &&
engine.getSymmetricDialect().getTargetPlatform().getName().startsWith(DatabaseNamesConstants.TERADATA)) {
} else if (platformName != null && platformName.startsWith(DatabaseNamesConstants.TERADATA)) {
return new TeradataBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else if (engine.getSymmetricDialect().getTargetPlatform().getName() != null &&
engine.getSymmetricDialect().getTargetPlatform().getName().startsWith(DatabaseNamesConstants.SNOWFLAKE)) {
} else if (platformName != null && platformName.startsWith(DatabaseNamesConstants.SNOWFLAKE)) {
return new SnowflakeBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else {
return new JdbcBatchBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
return new JdbcBatchBulkDatabaseWriter(symmetricDialect.getPlatform(), platform,
symmetricDialect.getTablePrefix(), buildParameterDatabaseWritterSettings());
}
}
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -171,8 +171,6 @@ abstract public class AbstractSymmetricEngine implements ISymmetricEngine {

protected ISymmetricDialect symmetricDialect;

protected ISymmetricDialect extractSymmetricDialect;

protected INodeService nodeService;

protected IConfigurationService configurationService;
Expand Down Expand Up @@ -252,8 +250,6 @@ abstract public class AbstractSymmetricEngine implements ISymmetricEngine {
abstract protected ITypedPropertiesFactory createTypedPropertiesFactory();

abstract protected IDatabasePlatform createDatabasePlatform(TypedProperties properties);

abstract protected ISymmetricDialect checkExtractOnly();

protected boolean registerEngine = true;

Expand Down Expand Up @@ -346,12 +342,11 @@ protected void init() {


this.symmetricDialect = createSymmetricDialect();
this.extractSymmetricDialect = checkExtractOnly();
this.symmetricDialect.setTargetDialect(createTargetDialect());

this.extensionService = createExtensionService();
this.extensionService.refresh();
this.symmetricDialect.setExtensionService(extensionService);
this.extractSymmetricDialect.setExtensionService(extensionService);
this.parameterService.setExtensionService(extensionService);
this.contextService = new ContextService(parameterService, symmetricDialect);

Expand All @@ -361,7 +356,7 @@ protected void init() {
this.nodeService = new NodeService(this);
this.configurationService = new ConfigurationService(parameterService, symmetricDialect,
nodeService);
this.dataService = new DataService(this, extensionService, extractSymmetricDialect);
this.dataService = new DataService(this, extensionService);
this.clusterService = createClusterService();
this.statisticService = new StatisticService(parameterService, symmetricDialect);
this.statisticManager = createStatisticManager();
Expand All @@ -374,14 +369,14 @@ protected void init() {
this.loadFilterService = new LoadFilterService(parameterService, symmetricDialect,
configurationService);
this.groupletService = new GroupletService(this);
this.triggerRouterService = new TriggerRouterService(this, extractSymmetricDialect);
this.triggerRouterService = new TriggerRouterService(this);
this.outgoingBatchService = new OutgoingBatchService(parameterService, symmetricDialect,
nodeService, configurationService, sequenceService, clusterService, extensionService);
this.routerService = buildRouterService();
this.nodeCommunicationService = buildNodeCommunicationService(clusterService, nodeService, parameterService, configurationService, symmetricDialect);
this.incomingBatchService = new IncomingBatchService(parameterService, symmetricDialect, clusterService);
this.initialLoadService = new InitialLoadService(this);
this.dataExtractorService = new DataExtractorService(this, extractSymmetricDialect);
this.dataExtractorService = new DataExtractorService(this);
this.transportManager = new TransportManagerFactory(this).create();
this.offlineTransportManager = new TransportManagerFactory(this).create(Constants.PROTOCOL_FILE);
this.dataLoaderService = new DataLoaderService(this);
Expand All @@ -400,7 +395,7 @@ protected void init() {
nodeService, dataLoaderService, clusterService, nodeCommunicationService,
configurationService, extensionService, offlineTransportManager);
this.fileSyncService = buildFileSyncService();
this.fileSyncExtractorService = new FileSyncExtractorService(this, extractSymmetricDialect);
this.fileSyncExtractorService = new FileSyncExtractorService(this);
this.mailService = new MailService(parameterService, symmetricDialect);

String updateServiceClassName = properties.get(ParameterConstants.UPDATE_SERVICE_CLASS);
Expand Down Expand Up @@ -465,6 +460,10 @@ protected INodeCommunicationService buildNodeCommunicationService(IClusterServic

abstract protected ISymmetricDialect createSymmetricDialect();

protected ISymmetricDialect createTargetDialect() {
return getSymmetricDialect();
}

abstract protected IExtensionService createExtensionService();

abstract protected IJobManager createJobManager();
Expand Down Expand Up @@ -1307,7 +1306,17 @@ public IUpdateService getUpdateService() {
public String getNodeId() {
return getNodeService().findIdentityNodeId();
}


@Override
public ISymmetricDialect getSymmetricDialect() {
return symmetricDialect;
}

@Override
public ISymmetricDialect getTargetDialect() {
return symmetricDialect.getTargetDialect();
}

@Override
public String toString() {
return "Engine " + getNodeId() + " " + super.toString();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -241,9 +241,9 @@ public interface ISymmetricEngine {
public IDataService getDataService();

public ISymmetricDialect getSymmetricDialect();

public ISymmetricDialect getExtractSymmetricDialect();

public ISymmetricDialect getTargetDialect();
public IJobManager getJobManager();

public IOutgoingBatchService getOutgoingBatchService();
Expand Down
Loading

0 comments on commit 04d46f5

Please sign in to comment.