Navigation Menu

Skip to content

Commit

Permalink
0005081: MongoDB converted to a load only node and upgraded driver to
Browse files Browse the repository at this point in the history
4.3.1
  • Loading branch information
joshahicks committed Oct 11, 2021
1 parent 181b6bf commit 2e91d15
Show file tree
Hide file tree
Showing 10 changed files with 143 additions and 139 deletions.
84 changes: 42 additions & 42 deletions symmetric-assemble/src/asciidoc/appendix/mongodb.ad
@@ -1,58 +1,58 @@

=== MongoDB

ifndef::pro[]
MongoDB is only support in the professional version of SymmetricDS.
endif::pro[]

ifdef::pro[]

Use `symadmin module install mongodb` to install driver files, or copy your own files into the `lib` sub-directory.

Since SymmetricDS is trigger based and there are not triggers in MongoDB data can only synchronized to a MongoDB instance. The runtime SymmetricDS
tables will also need to be installed in a full relational database to support integration with MongoDB.
MongoDB can be setup as a load only or a log based node.

TIP: The simplest solution to support MongoDB is to add a new node (see <<Add Node>>) that is connected to an H2 database to store all the SYM_* runtime tables.

The MongoDB data loader maps relational database rows to MongoDB documents in collections. To use the preconfigured MongoDB data loader,
you set the `data_loader_type` to *MongoDB* on a <<Channels, channel>>.
*Load Only* - Select this mode at setup if you only need to load data into Mongo (no capture).
*Log Based* - Select this mode if you need to capture changes in Mongo to be sent out to other nodes.

ifdef::pro[]
image::appendix/mongodb-new-channel.png[]
endif::pro[]

Tables that should be synchronized to MongoDB should be configured to use this channel.
==== Setup

ifdef::pro[]
image::appendix/mongodb-trigger.png[]
endif::pro[]
Obtain the connection url from MongoDB. On their dashboard select "Connect".

In order to point it to a MongoDB instance set the following properties in the engines properties file.
image::appendix/mongodb-setup-1.png[]

[source,properties]
----
mongodb.username=xxxx
mongodb.password=xxxx
mongodb.host=xxxx
mongodb.port=xxxx
mongodb.default.databasename=default
mongodb.url=
----

By default, the catalog or schema passed by SymmetricDS will be used for the MongoDB database name. The table passed by SymmetricDS
will be used as the MongoDB collection name. If the catalog or schema are not set, the default database name property is used as the
database name.

The _id of the MongoDB document will be the primary key of the database record. If the table has a composite primary key, then the
_id will be an embedded document that has name value pairs of the composite key. The body of the document will be name value pairs
of the table column name and table row value.

SymmetricDS uses the MongoDB Java Driver to upsert documents.

TIP: SymmetricDS transforms can be used to transform the data. If a complex mapping is required that is not supported by transforms, then
the `IDBObjectMapper` can be implemented and a new `MongoDataLoaderFactory` can be wired up
as an extension point.

==== Shared Clusters

Use the mongodb.url parameter to connect to a shared cluster in Mongo.
Select the "Connect your application"

image::appendix/mongodb-setup-2.png[]

Choose the Java driver to produce the proper connection string for driver used by SymmetricDS

image::appendix/mongodb-setup-3.png[]

Add a new node to SymmetricDS

image::appendix/mongodb-setup-4.png[]

On the advanced options you can setup bulk loading and a database to use. If no database is used it will create and use one named "symmetricds".

image::appendix/mongodb-setup-5.png[]


==== Supported Operations for CDC

SymmetricDS uses the change streams feature of MongoDB to monitor ("watch") for changes that occur in MongoDB. The change stream API though does not support old data.

*Updates* - In the case of updates only the final updated values are provided to the change stream. This is sufficient for most replications as long as conflict detection is not needed.

*Deletes* - When a delete occurs the only value provided through the change stream is the _id that was deleted. This represents MongoDBs identifier for this row (similar to a rowid on other platforms).

In order to fully support deletes all replicated tables must contain an _id column and the following parameter must be turned on.

[source,properties]
----
mongodb.url=mongodb://host1:27017,host2:27017
mongodb.use.mongo.ids=true
----

endif::pro[]


Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Sorry, something went wrong. Reload?
Sorry, we cannot display this file.
Sorry, this file is invalid so it cannot be displayed.
Expand Up @@ -447,6 +447,8 @@ private ParameterConstants() {
GOOGLE_BIG_QUERY_PROJECT_ID, GOOGLE_BIG_QUERY_SECURITY_CREDENTIALS_PATH };
public final static String HBASE_SITE_XML_PATH = "hbase.site.xml.path";
public final static String MONGO_DEFAULT_DATABASE = "mongodb.default.database";
public final static String MONGO_USE_MONGO_IDS = "mongodb.use.mongo.ids";
public final static String[] ALL_MONGODB_PARAMS = new String[] { MONGO_DEFAULT_DATABASE, MONGO_USE_MONGO_IDS };

public static Map<String, ParameterMetaData> getParameterMetaData() {
return parameterMetaData;
Expand Down
Expand Up @@ -36,7 +36,7 @@
import org.jumpmind.symmetric.service.ITriggerRouterService;
import org.jumpmind.util.FormatUtils;

class ColumnsAccordingToTriggerHistory {
public class ColumnsAccordingToTriggerHistory {
private Map<CacheKey, Table> cache = new HashMap<CacheKey, Table>();
private Node sourceNode;
private Node targetNode;
Expand Down
Expand Up @@ -59,27 +59,27 @@

public class SelectFromTableSource extends SelectFromSource {
private final Logger log = LoggerFactory.getLogger(getClass());
private OutgoingBatch outgoingBatch;
private List<SelectFromTableEvent> selectFromTableEventsToSend;
private SelectFromTableEvent currentInitialLoadEvent;
private ISqlReadCursor<Data> cursor;
private SimpleRouterContext routingContext;
private Node node;
private Set<Node> nodeSet;
private TriggerRouter triggerRouter;
private Map<String, IDataRouter> routers;
private IDataRouter dataRouter;
private ColumnsAccordingToTriggerHistory columnsAccordingToTriggerHistory;
private String overrideSelectSql;
private boolean initialLoadSelectUsed;
private boolean isSelfReferencingFk;
private int selfRefLevel;
private String selfRefParentColumnName;
private String selfRefChildColumnName;
private boolean isFirstRow;
private boolean isLobFirstPass;
private boolean isConfiguration;
private boolean isInitialLoadUseColumnTemplates;
protected OutgoingBatch outgoingBatch;
protected List<SelectFromTableEvent> selectFromTableEventsToSend;
protected SelectFromTableEvent currentInitialLoadEvent;
protected ISqlReadCursor<Data> cursor;
protected SimpleRouterContext routingContext;
protected Node node;
protected Set<Node> nodeSet;
protected TriggerRouter triggerRouter;
protected Map<String, IDataRouter> routers;
protected IDataRouter dataRouter;
protected ColumnsAccordingToTriggerHistory columnsAccordingToTriggerHistory;
protected String overrideSelectSql;
protected boolean initialLoadSelectUsed;
protected boolean isSelfReferencingFk;
protected int selfRefLevel;
protected String selfRefParentColumnName;
protected String selfRefChildColumnName;
protected boolean isFirstRow;
protected boolean isLobFirstPass;
protected boolean isConfiguration;
protected boolean isInitialLoadUseColumnTemplates;

public SelectFromTableSource(ISymmetricEngine engine, OutgoingBatch outgoingBatch, Batch batch, SelectFromTableEvent event) {
super(engine);
Expand Down
Expand Up @@ -70,81 +70,83 @@ public void selectedTabChange(SelectedTabChangeEvent event) {
addComponent(tabSheet);
Connection c = null;
try {
c = ((DataSource) db.getPlatform().getDataSource()).getConnection();
DatabaseMetaData metaData = c.getMetaData();
tabSheet.addTab(createTabData(createGridWithReflection(DatabaseMetaData.class, metaData)), "Meta Data");
tabSheet.addTab(createTabData(createGridWithReflection(Connection.class, c)), "Connection");
try {
ResultSet rs = null;
try {
rs = metaData.getClientInfoProperties();
} catch (SQLException e) {
log.debug("Could not create Client Info Properties tab", e.getMessage());
}
Grid<List<Object>> clientInfoProperties = CommonUiUtils.putResultsInGrid(rs, Integer.MAX_VALUE, false);
clientInfoProperties.setSizeFull();
tabSheet.addTab(createTabData(clientInfoProperties), "Client Info Properties");
} catch (AbstractMethodError e) {
log.debug("Could not create Client Info Properties tab", e);
}
try {
Grid<List<Object>> catalogs = CommonUiUtils.putResultsInGrid(metaData.getCatalogs(), Integer.MAX_VALUE, false);
catalogs.setSizeFull();
tabSheet.addTab(createTabData(catalogs), "Catalogs");
} catch (AbstractMethodError e) {
log.debug("Could not create Catalogs tab", e);
}
try {
Grid<List<Object>> schemas;
try {
schemas = CommonUiUtils.putResultsInGrid(metaData.getSchemas(), Integer.MAX_VALUE, false);
} catch (SQLException e) {
schemas = CommonUiUtils.putResultsInGrid(metaData.getSchemas("", null), Integer.MAX_VALUE, false);
}
schemas.setSizeFull();
tabSheet.addTab(createTabData(schemas), "Schemas");
} catch (AbstractMethodError e) {
log.debug("Could not create Schemas tab", e);
}
try {
Grid<List<Object>> tableTypes = CommonUiUtils.putResultsInGrid(metaData.getTableTypes(), Integer.MAX_VALUE, false);
tableTypes.setSizeFull();
tabSheet.addTab(createTabData(tableTypes), "Table Types");
} catch (AbstractMethodError e) {
log.debug("Could not create Table Types tab", e);
}
try {
Grid<List<Object>> dataTypes = CommonUiUtils.putResultsInGrid(metaData.getTypeInfo(), Integer.MAX_VALUE, false);
dataTypes.setSizeFull();
tabSheet.addTab(createTabData(dataTypes), "Data Types");
} catch (AbstractMethodError e) {
log.debug("Could not create Data Types tab", e);
}
try {
tabSheet.addTab(createTabData(createGridFromString(metaData.getNumericFunctions(), "Numeric Functions")), "Numeric Functions");
} catch (AbstractMethodError e) {
log.debug("Could not create Numeric Functions tab", e);
}
try {
tabSheet.addTab(createTabData(createGridFromString(metaData.getStringFunctions(), "String Functions")), "String Functions");
} catch (AbstractMethodError e) {
log.debug("Could not create String Functions tab", e);
}
try {
tabSheet.addTab(createTabData(createGridFromString(metaData.getSystemFunctions(), "System Functions")), "System Functions");
} catch (AbstractMethodError e) {
log.debug("Could not create System Functions tab", e);
}
try {
tabSheet.addTab(createTabData(createGridFromString(metaData.getTimeDateFunctions(), "Date/Time Functions")), "Date/Time Functions");
} catch (AbstractMethodError e) {
log.debug("Could not create Date/Time Functions tab", e);
}
try {
tabSheet.addTab(createTabData(createGridFromString(metaData.getSQLKeywords(), "Keywords")), "Keywords");
} catch (AbstractMethodError e) {
log.debug("Could not create Keywords tab", e);
}
if (db.getPlatform().getDataSource() != null) {
c = ((DataSource) db.getPlatform().getDataSource()).getConnection();
DatabaseMetaData metaData = c.getMetaData();
tabSheet.addTab(createTabData(createGridWithReflection(DatabaseMetaData.class, metaData)), "Meta Data");
tabSheet.addTab(createTabData(createGridWithReflection(Connection.class, c)), "Connection");
try {
ResultSet rs = null;
try {
rs = metaData.getClientInfoProperties();
} catch (SQLException e) {
log.debug("Could not create Client Info Properties tab", e.getMessage());
}
Grid<List<Object>> clientInfoProperties = CommonUiUtils.putResultsInGrid(rs, Integer.MAX_VALUE, false);
clientInfoProperties.setSizeFull();
tabSheet.addTab(createTabData(clientInfoProperties), "Client Info Properties");
} catch (AbstractMethodError e) {
log.debug("Could not create Client Info Properties tab", e);
}
try {
Grid<List<Object>> catalogs = CommonUiUtils.putResultsInGrid(metaData.getCatalogs(), Integer.MAX_VALUE, false);
catalogs.setSizeFull();
tabSheet.addTab(createTabData(catalogs), "Catalogs");
} catch (AbstractMethodError e) {
log.debug("Could not create Catalogs tab", e);
}
try {
Grid<List<Object>> schemas;
try {
schemas = CommonUiUtils.putResultsInGrid(metaData.getSchemas(), Integer.MAX_VALUE, false);
} catch (SQLException e) {
schemas = CommonUiUtils.putResultsInGrid(metaData.getSchemas("", null), Integer.MAX_VALUE, false);
}
schemas.setSizeFull();
tabSheet.addTab(createTabData(schemas), "Schemas");
} catch (AbstractMethodError e) {
log.debug("Could not create Schemas tab", e);
}
try {
Grid<List<Object>> tableTypes = CommonUiUtils.putResultsInGrid(metaData.getTableTypes(), Integer.MAX_VALUE, false);
tableTypes.setSizeFull();
tabSheet.addTab(createTabData(tableTypes), "Table Types");
} catch (AbstractMethodError e) {
log.debug("Could not create Table Types tab", e);
}
try {
Grid<List<Object>> dataTypes = CommonUiUtils.putResultsInGrid(metaData.getTypeInfo(), Integer.MAX_VALUE, false);
dataTypes.setSizeFull();
tabSheet.addTab(createTabData(dataTypes), "Data Types");
} catch (AbstractMethodError e) {
log.debug("Could not create Data Types tab", e);
}
try {
tabSheet.addTab(createTabData(createGridFromString(metaData.getNumericFunctions(), "Numeric Functions")), "Numeric Functions");
} catch (AbstractMethodError e) {
log.debug("Could not create Numeric Functions tab", e);
}
try {
tabSheet.addTab(createTabData(createGridFromString(metaData.getStringFunctions(), "String Functions")), "String Functions");
} catch (AbstractMethodError e) {
log.debug("Could not create String Functions tab", e);
}
try {
tabSheet.addTab(createTabData(createGridFromString(metaData.getSystemFunctions(), "System Functions")), "System Functions");
} catch (AbstractMethodError e) {
log.debug("Could not create System Functions tab", e);
}
try {
tabSheet.addTab(createTabData(createGridFromString(metaData.getTimeDateFunctions(), "Date/Time Functions")), "Date/Time Functions");
} catch (AbstractMethodError e) {
log.debug("Could not create Date/Time Functions tab", e);
}
try {
tabSheet.addTab(createTabData(createGridFromString(metaData.getSQLKeywords(), "Keywords")), "Keywords");
} catch (AbstractMethodError e) {
log.debug("Could not create Keywords tab", e);
}
}
} catch (SQLException e) {
log.error("", e);
} finally {
Expand Down

0 comments on commit 2e91d15

Please sign in to comment.