Merge remote-tracking branch 'origin/3.10' into 3.11
Conflicts:
	symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java
	symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java
	symmetric-core/src/main/java/org/jumpmind/symmetric/service/impl/RouterService.java
	symmetric-db/src/main/java/org/jumpmind/db/platform/DatabaseNamesConstants.java
	symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriterConflictResolver.java
jumpmind-josh committed Jan 21, 2020
2 parents e4e4309 + 5902094 commit d4a37e9
Showing 28 changed files with 1,087 additions and 11 deletions.
6 changes: 6 additions & 0 deletions symmetric-assemble/common.gradle
@@ -294,6 +294,7 @@ subprojects { subproject ->
provided "org.apache.geronimo.specs:geronimo-j2ee-connector_1.6_spec:1.0"
provided ("com.datastax.cassandra:cassandra-driver-core:3.1.4") {
exclude group: 'org.slf4j'
exclude group: 'com.google.guava'
}
provided ("nl.cad:tps-parse:1.0.15-SNAPSHOT") {
exclude group: 'commons-lang', module: 'commons-lang'
@@ -309,6 +310,11 @@ subprojects { subproject ->
exclude group: 'org.slf4j'
}

provided ('com.google.cloud:google-cloud-bigquery:1.99.0') {
exclude group: 'com.google.protobuf'
}


testCompile fileTree(dir: System.getProperty("user.home") + '/.symmetricds/lib', include: '*.jar')
testCompile "junit:junit:$junitVersion"
testCompile "org.hamcrest:hamcrest-all:$hamcrestVersion"
75 changes: 75 additions & 0 deletions symmetric-assemble/src/asciidoc/appendix/bigquery.ad
@@ -0,0 +1,75 @@
=== Google Big Query

Send changes from your relational database to Google's Big Query.

==== Setup

Big Query is only supported as a load only node in SymmetricDS. See <<Load Only Node>> for details on setting up a load only node.

ifdef::pro[]
Set up the Big Query node by using the <<Add Node>> wizard and selecting Big Query as the type.

image::appendix/bigquery-database-settings.png[]

After clicking Next, you can set up advanced options for your Big Query node.

endif::pro[]

ifndef::pro[]

.Example properties to set up a Google Big Query load only node
----
load.only=true

target.db.url=bigquery\://cloud.google.com
target.db.driver=google
----

endif::pro[]

==== Loading Data Into Big Query



ifndef::pro[]
===== Set up reload channels for bulk loading

Update any reload channels that will be used by the table triggers that capture changes destined for Big Query, setting the data_loader_type column to 'bulk'. It is also recommended to increase the max batch size so that larger CSV files are processed on reloads, rather than the default of 10,000 rows.


.Example SQL to set up the main reload channel to use bulk and update the batch size
[source, SQL]
----
update sym_channel set data_loader_type='bulk', max_batch_size=500000 where channel_id='reload'
----
endif::pro[]

===== Big Query Authentication

Create a JSON credentials file through your Google Cloud account:

https://cloud.google.com/docs/authentication/getting-started
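
For reference, assuming a service account with BigQuery access already exists, a key file can be generated with the gcloud CLI; the account, project, and output path below are placeholders:

----
gcloud iam service-accounts keys create /path/to/credentials.json \
    --iam-account=symmetricds-loader@my-project.iam.gserviceaccount.com
----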


ifdef::pro[]

Provide this file path in the advanced settings while setting up a Big Query node. The advanced settings also require a project ID and location for your Big Query project.

====
NOTE: You will need to use your Google Big Query dataset name as the target schema of the router that is used to route data to Big Query.
====

image::appendix/bigquery-advanced-settings-snowflake-managed.png[]
endif::pro[]
ifndef::pro[]
[source, properties]
----
google.bigquery.project.id=<Google Big Query Project ID>
google.bigquery.security.credentials.path=<Local Path to JSON Credentials File>
google.bigquery.location=<Google Big Query Location: defaults to US>

----
endif::pro[]
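
Whether configured through the wizard or the properties file, the router that sends data to Big Query must use the Big Query dataset name as its target schema. A minimal sketch, assuming a dataset named my_dataset and an existing router with a router_id of 'default_router' (both values are placeholders):

[source, SQL]
----
update sym_router set target_schema_name='my_dataset' where router_id='default_router'
----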

13 changes: 12 additions & 1 deletion symmetric-assemble/src/asciidoc/appendix/databases.ad
@@ -22,6 +22,17 @@ by database.
|Transactional DDL
|Load Only

|Big Query
|All
|N
|N
|N
|Y
|N
|N
|N
|Y

|DB2
|9.5
|N
@@ -486,7 +497,7 @@ When locating a table, SymmetricDS uses the default catalog and schema unless th
|select sys_context('USERENV', 'CURRENT_SCHEMA') from dual

|===

include::bigquery.ad[]
include::db2.ad[]
include::derby.ad[]
include::firebird.ad[]
19 changes: 17 additions & 2 deletions symmetric-assemble/src/asciidoc/appendix/hbase.ad
@@ -1,7 +1,22 @@

=== HBase

The HBase database is a load only database in SymmetricDS. It does require the phoenix jdbc driver though to utilize it. This driver should be downloaded and placed in the /lib folder of SymmetricDS and restarted.
==== Empty HBase

Tested with jar : phoenix-5.0.0-HBase-2.0-client.jar
If you are setting up replication to HBase and the tables are not already present in HBase, SymmetricDS can create them through the Phoenix JDBC driver. This driver maintains additional metadata about the tables so that they can be accessed with SQL through the JDBC driver.

This configuration is set up as a <<Load Only Node>> in SymmetricDS. It requires the Phoenix JDBC driver: download the driver, place it in the /lib folder of SymmetricDS, and restart SymmetricDS.

==== Existing HBase

If you are setting up replication to an HBase database that already has tables present, you will need to follow the steps below.

Set up a new H2 node that will contain all the SymmetricDS runtime tables. To do this, go through the <<Add Node>> setup, select type H2, and provide a name for the database (a new database is created locally if one is not present). This allows SymmetricDS to create runtime tables such as incoming_batch to maintain the replication.

Next, set up a <<Channels, channel>> (or use the default channel) and set the `data_loader_type` to *hbase*.

Finally, set a parameter that contains the path to your hbase-site.xml file:

hbase.site.xml.path

All changes captured will now use the HBase data loader to load into an existing HBase table.
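
The following is a sketch of the last two steps, assuming the default channel is used; the channel id and file path are placeholders:

[source, SQL]
----
update sym_channel set data_loader_type='hbase' where channel_id='default'
----

.Example engine property pointing at the HBase site configuration
----
hbase.site.xml.path=/path/to/hbase-site.xml
----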
2 changes: 1 addition & 1 deletion symmetric-assemble/src/asciidoc/appendix/snowflake.ad
@@ -4,7 +4,7 @@ Send changes from your relational database to Snowflake.

==== Setup

Snowflake is only support as a load only node in SymmetricDS. See <<Load Only Node >> for details on setting up a load only node in SymmetricDS.
Snowflake is only supported as a load only node in SymmetricDS. See <<Load Only Node >> for details on setting up a load only node in SymmetricDS.


ifdef::pro[]
@@ -331,7 +331,7 @@ protected IDatabasePlatform createDatabasePlatform(TypedProperties properties) {

public static IDatabasePlatform createDatabasePlatform(ApplicationContext springContext, TypedProperties properties,
DataSource dataSource, boolean waitOnAvailableDatabase) {
return createDatabasePlatform(springContext, properties, dataSource, waitOnAvailableDatabase, false);
return createDatabasePlatform(springContext, properties, dataSource, waitOnAvailableDatabase, properties.is(ParameterConstants.NODE_LOAD_ONLY));
}
public static IDatabasePlatform createDatabasePlatform(ApplicationContext springContext, TypedProperties properties,
DataSource dataSource, boolean waitOnAvailableDatabase, boolean isLoadOnly) {
@@ -344,6 +344,23 @@ public static IDatabasePlatform createDatabasePlatform(ApplicationContext spring
return new CassandraPlatform(createSqlTemplateSettings(properties), dbUrl.substring(12));
} else if (dbDriver != null && dbDriver.contains("kafka")) {
return new KafkaPlatform(createSqlTemplateSettings(properties));
} else if (dbUrl != null && dbUrl.startsWith("bigquery://")) {
try {
HttpTransportOptions transportOptions = BigQueryOptions.getDefaultHttpTransportOptions();
transportOptions = transportOptions.toBuilder().setConnectTimeout(60000).setReadTimeout(60000)
.build();

BigQuery bigquery = BigQueryOptions.newBuilder()
.setProjectId(properties.get(ParameterConstants.GOOGLE_BIG_QUERY_PROJECT_ID))
.setLocation(properties.get(ParameterConstants.GOOGLE_BIG_QUERY_LOCATION, "US"))
.setCredentials(ServiceAccountCredentials.fromStream(
new FileInputStream(properties.get(ParameterConstants.GOOGLE_BIG_QUERY_SECURITY_CREDENTIALS_PATH))))
.setTransportOptions(transportOptions)
.build().getService();
return new BigQueryPlatform(createSqlTemplateSettings(properties), bigquery);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
String jndiName = properties.getProperty(ParameterConstants.DB_JNDI_NAME);
@@ -0,0 +1,69 @@
package org.jumpmind.symmetric.ext;

import java.util.List;

import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.bigquery.BigQueryPlatform;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.BigQueryBulkDatabaseWriter;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.load.AbstractDataLoaderFactory;
import org.jumpmind.symmetric.load.IDataLoaderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.bigquery.BigQuery;

public class BigQueryDataLoaderFactory extends AbstractDataLoaderFactory implements IDataLoaderFactory {
protected final Logger log = LoggerFactory.getLogger(getClass());

private IStagingManager stagingManager;

private BigQuery bigquery;

public BigQueryDataLoaderFactory(ISymmetricEngine engine) {
this.stagingManager = engine.getStagingManager();
this.parameterService = engine.getParameterService();
this.bigquery = ((BigQueryPlatform) engine.getSymmetricDialect().getTargetPlatform()).getBigQuery();

}

public String getTypeName() {
return "bigquery_bulk";
}

public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetricDialect,
TransformWriter transformWriter,
List<IDatabaseWriterFilter> filters, List<IDatabaseWriterErrorHandler> errorHandlers,
List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {

try {
return new BigQueryBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
symmetricDialect.getTablePrefix(), stagingManager, filters, errorHandlers, parameterService,
buildParameterDatabaseWritterSettings(), this.bigquery);

} catch (Exception e) {
log.warn(
"Failed to create the big query database writer.",
e);
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new RuntimeException(e);
}
}
}

public boolean isPlatformSupported(IDatabasePlatform platform) {
return (DatabaseNamesConstants.BIGQUERY.equals(platform.getName()));
}

}
@@ -73,6 +73,9 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
} else if (platformName != null && platformName.startsWith(DatabaseNamesConstants.SNOWFLAKE)) {
return new SnowflakeBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else if (DatabaseNamesConstants.BIGQUERY.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
return new BigQueryDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else {
return new JdbcBatchBulkDatabaseWriter(symmetricDialect.getPlatform(), platform,
symmetricDialect.getTablePrefix(), buildParameterDatabaseWritterSettings());
@@ -0,0 +1,84 @@
package org.jumpmind.symmetric.io;


import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.sql.SQLException;
import java.util.List;

import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.service.IParameterService;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.io.Files;


public class BigQueryBulkDatabaseWriter extends CloudBulkDatabaseWriter {

BigQuery bigquery;

public BigQueryBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform,
String tablePrefix, IStagingManager stagingManager, List<IDatabaseWriterFilter> filters,
List<IDatabaseWriterErrorHandler> errorHandlers, IParameterService parameterService, DatabaseWriterSettings writerSettings,
BigQuery bq) throws FileNotFoundException, IOException {
super(symmetricPlatform, targetPlatform, tablePrefix, stagingManager, filters, errorHandlers, parameterService, writerSettings);
fieldTerminator = ",";
bigquery = bq;
}

@Override
public void loadToCloudDatabase() throws SQLException {
try {
File csvPath = this.stagedInputFile.getFile();

TableId tableId = TableId.of(this.targetTable.getSchema(), this.targetTable.getName());
WriteChannelConfiguration writeChannelConfiguration =
WriteChannelConfiguration.newBuilder(tableId).setFormatOptions(FormatOptions.csv()).setAutodetect(false).setDestinationTable(tableId).build();
// The location must be specified; other fields can be auto-detected.
JobId jobId = JobId.newBuilder()
.setLocation(bigquery.getOptions().getLocation())
.setProject(bigquery.getOptions().getProjectId()).build();

TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration);
// Write data to writer
OutputStream stream = Channels.newOutputStream(writer);
Files.copy(csvPath, stream);
stream.close();

// Get load job
Job job = writer.getJob();
job = job.waitFor();
LoadStatistics stats = job.getStatistics();
} catch (Exception ex) {
throw getPlatform().getSqlTemplate().translate(ex);
}
}

@Override
protected Table lookupTableAtTarget(Table sourceTable) {
return sourceTable;
}

@Override
public void copyToCloudStorage() throws SQLException {
}

@Override
public void cleanUpCloudStorage() throws SQLException {
}
}
@@ -314,7 +314,7 @@ public void bulkWrite(CsvData data) {
default:
flush();
context.put(ContextConstants.CONTEXT_BULK_WRITER_TO_USE, "default");
super.write(data);
writeDefault(data);
break;
}

