Skip to content

Commit

Permalink
0002092: Should be able to remove mongo and amazon jar files and the …
Browse files Browse the repository at this point in the history
…application should still work
  • Loading branch information
chenson42 committed Dec 8, 2014
1 parent bebaba4 commit 4da7d24
Show file tree
Hide file tree
Showing 6 changed files with 79 additions and 15 deletions.
Expand Up @@ -20,25 +20,29 @@
*/
package org.jumpmind.symmetric.ext;

import java.lang.reflect.Constructor;
import java.util.List;

import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.RedshiftBulkDatabaseWriter;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.load.IDataLoaderFactory;
import org.jumpmind.symmetric.service.IParameterService;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

public class RedshiftBulkDataLoaderFactory implements IDataLoaderFactory, ISymmetricEngineAware, IBuiltInExtensionPoint {

protected final Logger log = LoggerFactory.getLogger(getClass());

private ISymmetricEngine engine;

public RedshiftBulkDataLoaderFactory() {
Expand All @@ -60,16 +64,32 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
String secretKey = param.getString("redshift.bulk.load.s3.secret.key");
String appendToCopyCommand = param.getString("redshift.append.to.copy.command");

return new RedshiftBulkDatabaseWriter(symmetricDialect.getPlatform(), engine.getStagingManager(), filters, errorHandlers,
maxRowsBeforeFlush, maxBytesBeforeFlush, bucket, accessKey, secretKey, appendToCopyCommand);
try {
Class<?> dbWriterClass = Class.forName("org.jumpmind.symmetric.io.RedshiftBulkDatabaseWriter");
Constructor<?> dbWriterConstructor = dbWriterClass.getConstructor(new Class<?>[] {
IDatabasePlatform.class, IStagingManager.class, List.class,
List.class, Integer.TYPE, Long.TYPE, String.class,
String.class, String.class, String.class });
return (IDataWriter) dbWriterConstructor.newInstance(
symmetricDialect.getPlatform(), engine.getStagingManager(), filters, errorHandlers,
maxRowsBeforeFlush, maxBytesBeforeFlush, bucket, accessKey, secretKey, appendToCopyCommand);

} catch (Exception e) {
log.warn("Failed to create the mongo database writer. Check to see if all of the required jars have been added");
if (e instanceof RuntimeException) {
throw (RuntimeException)e;
} else {
throw new RuntimeException(e);
}
}
}

public void setSymmetricEngine(ISymmetricEngine engine) {
this.engine = engine;
}

public boolean isPlatformSupported(IDatabasePlatform platform) {
return DatabaseNamesConstants.REDSHIFT.equals(platform.getName());
return true;//DatabaseNamesConstants.REDSHIFT.equals(platform.getName());
}

}
Expand Up @@ -20,6 +20,8 @@
*/
package org.jumpmind.symmetric.io;

import java.lang.reflect.Constructor;
import java.lang.reflect.Method;
import java.util.List;

import org.jumpmind.db.platform.IDatabasePlatform;
Expand All @@ -29,12 +31,15 @@
import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.DefaultTransformWriterConflictResolver;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterConflictResolver;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.load.DefaultDataLoaderFactory;
import org.jumpmind.symmetric.service.IParameterService;

public class MongoDataLoaderFactory extends DefaultDataLoaderFactory implements
ISymmetricEngineAware, IBuiltInExtensionPoint {
Expand Down Expand Up @@ -65,12 +70,44 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
TransformWriter transformWriter, List<IDatabaseWriterFilter> filters,
List<IDatabaseWriterErrorHandler> errorHandlers,
List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {
if (objectMapper instanceof SimpleDBObjectMapper) {
((SimpleDBObjectMapper)objectMapper).setDefaultDatabaseName(parameterService.getString("mongodb.default.databasename", "default"));
try {
if (objectMapper == null) {
objectMapper = (IDBObjectMapper)Class.forName("org.jumpmind.symmetric.io.SimpleDBObjectMapper").newInstance();
}
Method method = objectMapper.getClass().getMethod("setDefaultDatabaseName", String.class);
if (method != null) {
method.invoke(objectMapper, parameterService
.getString("mongodb.default.databasename", "default"));
}
} catch (Exception e) {
log.debug("Failed to call setDefaultDatabaseName on mapper", e);
}

try {
Class<?> clientManagerClass = Class
.forName("org.jumpmind.symmetric.io.SimpleMongoClientManager");
Constructor<?> clientManagerConstrutor = clientManagerClass
.getConstructor(new Class<?>[] { IParameterService.class, String.class });
Class<?> dbWriterClass = Class.forName("org.jumpmind.symmetric.io.MongoDatabaseWriter");
Constructor<?> dbWriterConstructor = dbWriterClass.getConstructor(new Class<?>[] {
IDBObjectMapper.class, IMongoClientManager.class,
IDatabaseWriterConflictResolver.class, DatabaseWriterSettings.class });
Object clientManager = clientManagerConstrutor.newInstance(parameterService, typeName);
return (IDataWriter) dbWriterConstructor.newInstance(
objectMapper,
clientManager,
new DefaultTransformWriterConflictResolver(transformWriter),
buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings,
resolvedData));

} catch (Exception e) {
log.warn("Failed to create the mongo database writer. Check to see if all of the required jars have been added");
if (e instanceof RuntimeException) {
throw (RuntimeException)e;
} else {
throw new RuntimeException(e);
}
}
return new MongoDatabaseWriter(objectMapper, new SimpleMongoClientManager(parameterService, typeName),
new DefaultTransformWriterConflictResolver(transformWriter),
buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData));
}

@Override
Expand Down
Expand Up @@ -28,7 +28,6 @@
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterConflictResolver;
import org.jumpmind.symmetric.io.data.writer.AbstractDatabaseWriter.LoadStatus;

import com.mongodb.CommandResult;
import com.mongodb.DB;
Expand Down
3 changes: 0 additions & 3 deletions symmetric-client/src/main/resources/symmetric-ext-points.xml
Expand Up @@ -24,9 +24,6 @@

<bean id="mongoLoaderFactory" class="org.jumpmind.symmetric.io.MongoDataLoaderFactory">
<property name="typeName" value="mongodb" />
<property name="objectMapper">
<bean class="org.jumpmind.symmetric.io.SimpleDBObjectMapper" />
</property>
</bean>


Expand Down
Expand Up @@ -46,7 +46,7 @@

public class DefaultDataLoaderFactory implements IDataLoaderFactory, IBuiltInExtensionPoint {

private static final Logger log = LoggerFactory.getLogger(DefaultDataLoaderFactory.class);
protected final Logger log = LoggerFactory.getLogger(getClass());

protected IParameterService parameterService;

Expand Down
11 changes: 11 additions & 0 deletions symmetric-server/src/main/deploy/conf/symmetric-extensions.xml
Expand Up @@ -26,6 +26,17 @@
xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-3.0.xsd
http://www.springframework.org/schema/lang http://www.springframework.org/schema/lang/spring-lang-3.0.xsd" default-lazy-init="true">

<!--
Example of how to setup your own mongo data loader. If you want to provide your own mapping, then
implement an IDBObjectMapper and wire it in as the objectMapper
<bean id="myMongoLoaderFactory" class="org.jumpmind.symmetric.io.MongoDataLoaderFactory" >
<property name="typeName" value="mongodb" />
<property name="objectMapper" >
<bean class="org.jumpmind.symmetric.io.SimpleDBObjectMapper" />
</property>
</bean>
-->

<!--
This example shows how to publish XML messages to MqSeries. The following properties
would need to be added to symmetric.properties or another property file specified by the
Expand Down

0 comments on commit 4da7d24

Please sign in to comment.