Skip to content

Commit

Permalink
Merge branch '3.12' of https://github.com/JumpMind/symmetric-ds.git i…
Browse files Browse the repository at this point in the history
…nto 3.12
  • Loading branch information
erilong committed Jun 24, 2020
2 parents 2f4bd8a + 3d8c067 commit c769e73
Show file tree
Hide file tree
Showing 9 changed files with 89 additions and 94 deletions.
2 changes: 1 addition & 1 deletion symmetric-client/build.gradle
Expand Up @@ -19,7 +19,7 @@ apply from: symAssembleDir + '/common.gradle'
compileOnly "org.apache.derby:derby:$derbyVersion"
compileOnly "org.hsqldb:hsqldb:$hsqldbVersion"
compileOnly "org.xerial:sqlite-jdbc:$sqliteVersion"
compileOnly "org.mongodb:mongo-java-driver:2.12.3"
compileOnly "org.mongodb:mongodb-driver:3.12.5"

compileOnly ("org.apache.hbase:hbase-client:1.3.6") {
exclude group: 'log4j'
Expand Down
Expand Up @@ -22,17 +22,16 @@

import java.util.Map;

import org.bson.Document;
import org.jumpmind.db.model.Table;

import com.mongodb.DBObject;

public interface IDBObjectMapper {

public DBObject mapToDBObject(Table table, Map<String, String> newData, Map<String, String> oldData,
Map<String, String> pkData, boolean mapKeyOnly);

public String mapToCollection(Table table);

public String mapToDatabase(Table table);

public Document mapToDocument(Table table, Map<String, String> newData, Map<String, String> oldData,
Map<String, String> pkData, boolean mapKeyOnly);

}
Expand Up @@ -20,14 +20,14 @@
*/
package org.jumpmind.symmetric.io;

import com.mongodb.DB;
import com.mongodb.MongoClient;
import com.mongodb.client.MongoDatabase;

public interface IMongoClientManager {

public MongoClient get();
public MongoClient getClient(String databaseName);

public DB getDB(String name);
public MongoDatabase getDB(String name);

public String getName();

Expand Down
Expand Up @@ -44,8 +44,6 @@
public class MongoDataLoaderFactory extends DefaultDataLoaderFactory implements
ISymmetricEngineAware, IBuiltInExtensionPoint {

protected ISymmetricEngine engine;

protected String typeName = "mongodb";

protected IDBObjectMapper objectMapper;
Expand Down
Expand Up @@ -22,19 +22,19 @@

import java.util.Map;

import org.bson.Document;
import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.writer.AbstractDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterConflictResolver;

import com.mongodb.CommandResult;
import com.mongodb.DB;
import com.mongodb.DBCollection;
import com.mongodb.DBObject;
import com.mongodb.WriteConcern;
import com.mongodb.WriteResult;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoDatabase;
import com.mongodb.client.model.ReplaceOptions;
import com.mongodb.client.result.DeleteResult;
import com.mongodb.client.result.UpdateResult;

/**
* The default mapping is that a catalog or schema or catalog.schema is mapped
Expand Down Expand Up @@ -96,22 +96,21 @@ protected void logFailureDetails(Throwable e, CsvData data, boolean logLastDmlDe
protected LoadStatus upsert(CsvData data) {
statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
try {
DB db = clientManager.getDB(objectMapper.mapToDatabase(this.targetTable));
DBCollection collection = db.getCollection(objectMapper
MongoDatabase db = clientManager.getDB(objectMapper.mapToDatabase(this.targetTable));
MongoCollection<Document> collection = db.getCollection(objectMapper
.mapToCollection(this.targetTable));
String[] columnNames = sourceTable.getColumnNames();
Map<String, String> newData = data
.toColumnNameValuePairs(columnNames, CsvData.ROW_DATA);
Map<String, String> oldData = data
.toColumnNameValuePairs(columnNames, CsvData.OLD_DATA);
Map<String, String> pkData = data.toKeyColumnValuePairs(this.sourceTable);
DBObject query = objectMapper
.mapToDBObject(sourceTable, newData, oldData, pkData, true);
DBObject object = objectMapper.mapToDBObject(sourceTable, newData, oldData, pkData,
Document query = objectMapper
.mapToDocument(sourceTable, newData, oldData, pkData, true);
Document object = objectMapper.mapToDocument(sourceTable, newData, oldData, pkData,
false);
WriteResult results = collection.update(query, object, true, false,
WriteConcern.ACKNOWLEDGED);
if (results.getN() == 1) {
UpdateResult results = collection.replaceOne(query, object, new ReplaceOptions().upsert(true));
if (results.getModifiedCount() == 1 || results.getUpsertedId() != null) {
return LoadStatus.SUCCESS;
} else {
throw new SymmetricException("Failed to write data: " + object.toString());
Expand All @@ -125,21 +124,21 @@ protected LoadStatus upsert(CsvData data) {
protected LoadStatus delete(CsvData data, boolean useConflictDetection) {
statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
try {
DB db = clientManager.getDB(objectMapper.mapToDatabase(this.targetTable));
DBCollection collection = db.getCollection(objectMapper
MongoDatabase db = clientManager.getDB(objectMapper.mapToDatabase(this.targetTable));
MongoCollection<Document> collection = db.getCollection(objectMapper
.mapToCollection(this.targetTable));
String[] columnNames = sourceTable.getColumnNames();
Map<String, String> newData = data
.toColumnNameValuePairs(columnNames, CsvData.ROW_DATA);
Map<String, String> oldData = data
.toColumnNameValuePairs(columnNames, CsvData.OLD_DATA);
Map<String, String> pkData = data.toKeyColumnValuePairs(this.sourceTable);
DBObject query = objectMapper
.mapToDBObject(sourceTable, newData, oldData, pkData, true);
WriteResult results = collection.remove(query, WriteConcern.ACKNOWLEDGED);
if (results.getN() != 1) {
Document query = objectMapper
.mapToDocument(sourceTable, newData, oldData, pkData, true);
DeleteResult results = collection.deleteOne(query);
if (results.getDeletedCount() != 1) {
log.warn("Attempted to remove a single object" + query.toString()
+ ". Instead removed: " + results.getN());
+ ". Instead removed: " + results.getDeletedCount());
}
return LoadStatus.SUCCESS;
} finally {
Expand All @@ -157,10 +156,10 @@ protected boolean create(CsvData data) {
protected boolean sql(CsvData data) {
statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
try {
DB db = clientManager.getDB(objectMapper.mapToDatabase(this.targetTable));
MongoDatabase db = clientManager.getDB(objectMapper.mapToDatabase(this.targetTable));
String command = data.getParsedData(CsvData.ROW_DATA)[0];
log.info("About to run command: {}", command);
CommandResult results = db.command(command);
Document results = db.runCommand(new Document(command, 1));
log.info("The results of the command were: {}", results);
} finally {
statistics.get(batch).stopTimer(DataWriterStatisticConstants.LOADMILLIS);
Expand Down
Expand Up @@ -26,21 +26,20 @@
import java.util.Map;
import java.util.Set;

import org.bson.Document;
import org.jumpmind.db.model.Table;

import com.mongodb.BasicDBObject;
import com.mongodb.DBObject;

public class SimpleDBObjectMapper implements IDBObjectMapper {

String defaultDatabaseName = "default";

public DBObject mapToDBObject(Table table, Map<String, String> newData,
Map<String, String> oldData, Map<String, String> pkData, boolean mapKeyOnly) {
public Document mapToDocument(Table table, Map<String, String> newData,
Map<String, String> oldData, Map<String, String> pkData, boolean mapKeyOnly)
{
if (mapKeyOnly) {
return buildWithKey(table, newData, oldData, pkData);
return buildDocumentWithKey(table, newData, oldData, pkData);
} else {
return buildWithKeyAndData(table, newData, oldData, pkData);
return buildDocumentWithKeyAndData(table, newData, oldData, pkData);
}
}

Expand All @@ -67,8 +66,9 @@ public String mapToDatabase(Table table) {
return mongoDatabaseName;
}

protected BasicDBObject buildWithKey(Table table, Map<String, String> newData,
Map<String, String> oldData, Map<String, String> pkData) {
protected Document buildDocumentWithKey(Table table, Map<String, String> newData,
Map<String, String> oldData, Map<String, String> pkData)
{
if (oldData == null || oldData.size() == 0) {
oldData = pkData;
}
Expand All @@ -77,35 +77,27 @@ protected BasicDBObject buildWithKey(Table table, Map<String, String> newData,
}

String[] keyNames = table.getPrimaryKeyColumnNames();

BasicDBObject object = new BasicDBObject();

// TODO support property to just let mongodb create ids?

Document document = new Document();
if (keyNames != null && keyNames.length > 0) {
if (keyNames.length == 1) {
object.put("_id", oldData.get(keyNames[0]));
} else {
BasicDBObject key = new BasicDBObject();
for (String keyName : keyNames) {
key.put(keyName, oldData.get(keyName));
}
object.put("_id", key);
for (String keyName : keyNames) {
document.put(keyName, oldData.get(keyName));
}
}

return object;
return document;
}

protected BasicDBObject buildWithKeyAndData(Table table, Map<String, String> newData,
Map<String, String> oldData, Map<String, String> pkData) {
BasicDBObject object = buildWithKey(table, newData, oldData, pkData);
protected Document buildDocumentWithKeyAndData(Table table, Map<String, String> newData,
Map<String, String> oldData, Map<String, String> pkData)
{
Document document = buildDocumentWithKey(table, newData, oldData, pkData);

Set<String> newDataKeys = newData.keySet();
for (String newDataKey : newDataKeys) {
object.put(newDataKey, newData.get(newDataKey));
document.put(newDataKey, newData.get(newDataKey));
}

return object;
return document;
}

public void setDefaultDatabaseName(String defaultDatabaseName) {
Expand Down
Expand Up @@ -20,17 +20,18 @@
*/
package org.jumpmind.symmetric.io;

import java.net.UnknownHostException;
import java.util.Arrays;
import java.util.HashMap;
import java.util.Map;

import org.jumpmind.symmetric.SymmetricException;
import org.jumpmind.symmetric.service.IParameterService;

import com.mongodb.DB;
import com.mongodb.MongoClient;
import com.mongodb.MongoClientURI;
import com.mongodb.MongoClientOptions;
import com.mongodb.MongoCredential;
import com.mongodb.ServerAddress;
import com.mongodb.WriteConcern;
import com.mongodb.client.MongoDatabase;

public class SimpleMongoClientManager implements IMongoClientManager {

Expand All @@ -44,60 +45,52 @@ public class SimpleMongoClientManager implements IMongoClientManager {
*/
protected final static Map<String, MongoClient> clients = new HashMap<String, MongoClient>();

protected DB currentDB;
protected MongoDatabase currentDB;

public SimpleMongoClientManager(IParameterService parameterService, String name) {
this.name = name;
this.parameterService = parameterService;
}

@Override
public synchronized MongoClient get() {
public synchronized MongoClient getClient(String databaseName) {
MongoClient client = clients.get(name);
if (client == null) {
int port = 27017;
String host = "localhost";
String host = "localhost";
if (parameterService != null) {
port = parameterService.getInt(name + MongoConstants.PORT, port);
host = parameterService.getString(name + MongoConstants.HOST, host);
}
String dbUrl = "mongodb://" + host + ":" + port;
String username = null;
char[] password = null;
if (parameterService != null) {
dbUrl = parameterService.getString(name + MongoConstants.URL, dbUrl);
username = parameterService.getString(this.name + MongoConstants.USERNAME, username);
String passwordString = parameterService.getString(this.name + MongoConstants.PASSWORD,
null);
if (passwordString != null) {
password = passwordString.toCharArray();
}
}
try {
client = new MongoClient(new MongoClientURI(dbUrl));
clients.put(name, client);
} catch (UnknownHostException e) {
throw new SymmetricException(e);
}
MongoCredential credential = null;
credential = MongoCredential.createCredential(username, databaseName, password);
client = new MongoClient(Arrays.asList(new ServerAddress(host, port)),
credential, new MongoClientOptions.Builder().build());
clients.put(name, client);
}
return client;
}

@Override
public synchronized DB getDB(String name) {
if (currentDB == null || !currentDB.getName().equals(name)) {
currentDB = get().getDB(name);
public synchronized MongoDatabase getDB(String databaseName) {
if (currentDB == null || !currentDB.getName().equals(databaseName)) {
currentDB = getClient(databaseName).getDatabase(databaseName);
/**
* TODO make this a property
*/
currentDB.setWriteConcern(WriteConcern.ACKNOWLEDGED);
String username = null;
char[] password = null;
if (parameterService != null) {
username = parameterService.getString(name + MongoConstants.USERNAME, username);
String passwordString = parameterService.getString(name + MongoConstants.PASSWORD,
null);
if (passwordString != null) {
password = passwordString.toCharArray();
}
}

if (username != null && !currentDB.authenticate(username, password)) {
throw new SymmetricException("Failed to authenticate with the mongo database: "
+ name);
}
currentDB.withWriteConcern(WriteConcern.ACKNOWLEDGED);
}
return currentDB;
}
Expand Down
Expand Up @@ -2128,6 +2128,19 @@ public Trigger mapRow(Row rs) {
if (!StringUtils.isBlank(text)) {
trigger.setCustomOnDeleteText(text);
}

text = rs.getString("custom_before_insert_text");
if (!StringUtils.isBlank(text)) {
trigger.setCustomBeforeInsertText(text);
}
text = rs.getString("custom_before_update_text");
if (!StringUtils.isBlank(text)) {
trigger.setCustomBeforeUpdateText(text);
}
text = rs.getString("custom_before_delete_text");
if (!StringUtils.isBlank(text)) {
trigger.setCustomBeforeDeleteText(text);
}

condition = rs.getString("external_select");
if (!StringUtils.isBlank(condition)) {
Expand Down
Expand Up @@ -79,6 +79,7 @@ public TriggerRouterServiceSqlMap(IDatabasePlatform platform,
+ " t.name_for_delete_trigger,t.name_for_insert_trigger,t.name_for_update_trigger, "
+ " t.sync_on_insert_condition,t.sync_on_update_condition,t.sync_on_delete_condition, "
+ " t.custom_on_insert_text,t.custom_on_update_text,t.custom_on_delete_text, "
+ " t.custom_before_insert_text,t.custom_before_update_text,t.custom_before_delete_text, "
+ " t.tx_id_expression,t.external_select,t.channel_expression, t.stream_row, "
+ " t.create_time as t_create_time, "
+ " t.last_update_time as t_last_update_time, t.last_update_by as t_last_update_by ");
Expand Down

0 comments on commit c769e73

Please sign in to comment.