diff --git a/symmetric-assemble/common.gradle b/symmetric-assemble/common.gradle
index db142f406b..bb013d0c22 100644
--- a/symmetric-assemble/common.gradle
+++ b/symmetric-assemble/common.gradle
@@ -294,6 +294,7 @@ subprojects { subproject ->
             provided "org.apache.geronimo.specs:geronimo-j2ee-connector_1.6_spec:1.0"
             provided ("com.datastax.cassandra:cassandra-driver-core:3.1.4") {
                 exclude group: 'org.slf4j'
+                exclude group: 'com.google.guava'
             }
             provided ("nl.cad:tps-parse:1.0.15-SNAPSHOT") {
                 exclude group: 'commons-lang', module: 'commons-lang'
@@ -309,6 +310,11 @@ subprojects { subproject ->
                 exclude group: 'org.slf4j'
             }
 
+            provided ('com.google.cloud:google-cloud-bigquery:1.99.0') {
+                exclude group: 'com.google.protobuf'
+            }
+
+
             testCompile fileTree(dir: System.getProperty("user.home") + '/.symmetricds/lib', include: '*.jar')
             testCompile "junit:junit:$junitVersion"
             testCompile "org.hamcrest:hamcrest-all:$hamcrestVersion"
diff --git a/symmetric-assemble/src/asciidoc/appendix/bigquery.ad b/symmetric-assemble/src/asciidoc/appendix/bigquery.ad
new file mode 100644
index 0000000000..ff5ddb6505
--- /dev/null
+++ b/symmetric-assemble/src/asciidoc/appendix/bigquery.ad
@@ -0,0 +1,75 @@
+=== Google Big Query
+
+Send changes from your relational database to Google's Big Query.
+
+==== Setup
+
+Big Query is only supported as a load only node in SymmetricDS. See <> for details on setting up a load only node in SymmetricDS.
+
+ifdef::pro[]
+Set up the Big Query node by using the <> wizard and selecting Big Query as the type.
+
+image::appendix/bigquery-database-settings.png[]
+
+After hitting next you can set up advanced options for your Big Query node.
+
+endif::pro[]
+
+ifndef::pro[]
+
+.Example properties to set up a Google Big Query load only node
+----
+load.only=true
+
+target.db.url=bigquery\://cloud.google.com
+target.db.driver=google
+----
+
+endif::pro[]
+
+==== Loading Data Into Big Query
+
+ifndef::pro[]
+===== Set up reload channels for bulk loading
+
+Update any reload channels that are used by the table triggers that capture changes destined for Big Query by setting the data_loader_type column to 'bulk'. It is also recommended to increase the max batch size so that larger CSV files are processed, rather than the default reload batch size of 10,000 rows.
+
+.Example SQL to set up the main reload channel to use bulk and also update the batch sizes.
+[source, SQL]
+----
+update sym_channel set data_loader_type='bulk', max_batch_size=500000 where channel_id='reload'
+----
+endif::pro[]
+
+===== Big Query Authentication
+
+Create a JSON credentials file through your Big Query account:
+
+https://cloud.google.com/docs/authentication/getting-started
+
+ifdef::pro[]
+
+Provide this file path in the advanced settings while setting up a Big Query node. The advanced settings also require a project ID and location for your Big Query project.
+
+====
+NOTE: You will need to use your Google Big Query dataset name in the target schema of the router that is used to route data to Big Query.
+====
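+
+For example, assuming a router with the ID 'corp_2_bigquery' routing to a dataset named 'my_dataset' (both names are placeholders for your own configuration), the dataset can be set as the target schema with:
+
+[source, SQL]
+----
+update sym_router set target_schema_name='my_dataset' where router_id='corp_2_bigquery'
+----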
+
+image::appendix/bigquery-advanced-settings.png[]
+endif::pro[]
+ifndef::pro[]
+[source, properties]
+----
+google.bigquery.project.id=
+google.bigquery.security.credentials.path=
+google.bigquery.location=
+----
+endif::pro[]
diff --git a/symmetric-assemble/src/asciidoc/appendix/databases.ad b/symmetric-assemble/src/asciidoc/appendix/databases.ad
index 5b0868dfff..5392b86330 100644
--- a/symmetric-assemble/src/asciidoc/appendix/databases.ad
+++ b/symmetric-assemble/src/asciidoc/appendix/databases.ad
@@ -22,6 +22,17 @@ by database.
 |Transactional DDL
 |Load Only
 
+|Big Query
+|All
+|N
+|N
+|N
+|Y
+|N
+|N
+|N
+|Y
+
 |DB2
 |9.5
 |N
@@ -486,7 +497,7 @@ When locating a table, SymmetricDS uses the default catalog and schema unless th
 |select sys_context('USERENV', 'CURRENT_SCHEMA') from dual
 |===
 
-
+include::bigquery.ad[]
 include::db2.ad[]
 include::derby.ad[]
 include::firebird.ad[]
diff --git a/symmetric-assemble/src/asciidoc/appendix/hbase.ad b/symmetric-assemble/src/asciidoc/appendix/hbase.ad
index 222a7e773f..3fd194fc73 100644
--- a/symmetric-assemble/src/asciidoc/appendix/hbase.ad
+++ b/symmetric-assemble/src/asciidoc/appendix/hbase.ad
@@ -1,7 +1,22 @@
 === HBase
 
-The HBase database is a load only database in SymmetricDS. It does require the phoenix jdbc driver though to utilize it. This driver should be downloaded and placed in the /lib folder of SymmetricDS and restarted.
+==== Empty HBase
 
-Tested with jar : phoenix-5.0.0-HBase-2.0-client.jar
+If you are setting up replication to HBase and the tables are not already present in HBase, SymmetricDS can create them through the Phoenix JDBC driver. This driver maintains some additional metadata about the tables so that they can be accessed using SQL through the JDBC driver.
+This configuration is set up as a <> in SymmetricDS. It requires the Phoenix JDBC driver: download the driver, place it in the /lib folder of SymmetricDS, and restart SymmetricDS.
+
+==== Existing HBase
+
+If you are setting up replication to an HBase database that already has tables present, follow the steps below.
+
+Set up a new H2 node that will contain all the SymmetricDS runtime tables. To do this, go through the <> setup, select type H2, and provide a name for the database (a new one is created locally if not present). This allows SymmetricDS to create runtime tables such as incoming_batch to maintain the replication.
+
+Next, set up a <> (or use the default channel) and set the `data_loader_type` to *hbase*.
+
+Finally, set the parameter that contains the path of your hbase-site.xml file, as shown in the example below:
+
+hbase.site.xml.path
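+
+For example, assuming the default channel is used and a typical hbase-site.xml location (both values are illustrative):
+
+[source, SQL]
+----
+update sym_channel set data_loader_type='hbase' where channel_id='default'
+----
+
+[source, properties]
+----
+hbase.site.xml.path=/opt/hbase/conf/hbase-site.xml
+----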
+
+All changes captured will now use the HBase data loader to load into an existing HBase table.
\ No newline at end of file
diff --git a/symmetric-assemble/src/asciidoc/appendix/snowflake.ad b/symmetric-assemble/src/asciidoc/appendix/snowflake.ad
index e6d9e39962..ab122543d9 100644
--- a/symmetric-assemble/src/asciidoc/appendix/snowflake.ad
+++ b/symmetric-assemble/src/asciidoc/appendix/snowflake.ad
@@ -4,7 +4,7 @@ Send changes from your relational database to Snowflake.
 
 ==== Setup
 
-Snowflake is only support as a load only node in SymmetricDS. See <> for details on setting up a load only node in SymmetricDS.
+Snowflake is only supported as a load only node in SymmetricDS. See <> for details on setting up a load only node in SymmetricDS.
 
 ifdef::pro[]
diff --git a/symmetric-assemble/src/asciidoc/images/appendix/bigquery-advanced-settings.png b/symmetric-assemble/src/asciidoc/images/appendix/bigquery-advanced-settings.png
new file mode 100644
index 0000000000..6fce8b58c3
Binary files /dev/null and b/symmetric-assemble/src/asciidoc/images/appendix/bigquery-advanced-settings.png differ
diff --git a/symmetric-assemble/src/asciidoc/images/appendix/bigquery-database-settings.png b/symmetric-assemble/src/asciidoc/images/appendix/bigquery-database-settings.png
new file mode 100644
index 0000000000..f3b4b9c548
Binary files /dev/null and b/symmetric-assemble/src/asciidoc/images/appendix/bigquery-database-settings.png differ
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java
index f05f6bf0db..f1b629ad4f 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/ClientSymmetricEngine.java
@@ -331,7 +331,7 @@ protected IDatabasePlatform createDatabasePlatform(TypedProperties properties) {
 
     public static IDatabasePlatform createDatabasePlatform(ApplicationContext springContext, TypedProperties properties,
             DataSource dataSource, boolean waitOnAvailableDatabase) {
-        return createDatabasePlatform(springContext, properties, dataSource, waitOnAvailableDatabase, false);
+        return createDatabasePlatform(springContext, properties, dataSource, waitOnAvailableDatabase, properties.is(ParameterConstants.NODE_LOAD_ONLY));
     }
 
     public static IDatabasePlatform createDatabasePlatform(ApplicationContext springContext, TypedProperties properties,
             DataSource dataSource, boolean waitOnAvailableDatabase, boolean isLoadOnly) {
@@ -344,6 +344,23 @@ public static IDatabasePlatform createDatabasePlatform(ApplicationContext spring
                 return new CassandraPlatform(createSqlTemplateSettings(properties), dbUrl.substring(12));
             } else if (dbDriver != null && dbDriver.contains("kafka")) {
                 return new KafkaPlatform(createSqlTemplateSettings(properties));
+            } else if (dbUrl != null && dbUrl.startsWith("bigquery://")) {
+                try {
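+                    // Build the BigQuery client from the engine properties: a service account
+                    // JSON credentials file plus explicit timeouts, since load jobs can be slow.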
+                    HttpTransportOptions transportOptions = BigQueryOptions.getDefaultHttpTransportOptions();
+                    transportOptions = transportOptions.toBuilder().setConnectTimeout(60000).setReadTimeout(60000)
+                            .build();
+
+                    BigQuery bigquery = BigQueryOptions.newBuilder()
+                            .setProjectId(properties.get(ParameterConstants.GOOGLE_BIG_QUERY_PROJECT_ID))
+                            .setLocation(properties.get(ParameterConstants.GOOGLE_BIG_QUERY_LOCATION, "US"))
+                            .setCredentials(ServiceAccountCredentials.fromStream(
+                                    new FileInputStream(properties.get(ParameterConstants.GOOGLE_BIG_QUERY_SECURITY_CREDENTIALS_PATH))))
+                            .setTransportOptions(transportOptions)
+                            .build().getService();
+                    return new BigQueryPlatform(createSqlTemplateSettings(properties), bigquery);
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
             }
         }
         String jndiName = properties.getProperty(ParameterConstants.DB_JNDI_NAME);
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/BigQueryDataLoaderFactory.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/BigQueryDataLoaderFactory.java
new file mode 100644
index 0000000000..660f8a466f
--- /dev/null
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/BigQueryDataLoaderFactory.java
@@ -0,0 +1,69 @@
+package org.jumpmind.symmetric.ext;
+
+import java.util.List;
+
+import org.jumpmind.db.platform.DatabaseNamesConstants;
+import org.jumpmind.db.platform.IDatabasePlatform;
+import org.jumpmind.db.platform.bigquery.BigQueryPlatform;
+import org.jumpmind.symmetric.ISymmetricEngine;
+import org.jumpmind.symmetric.db.ISymmetricDialect;
+import org.jumpmind.symmetric.io.BigQueryBulkDatabaseWriter;
+import org.jumpmind.symmetric.io.data.IDataWriter;
+import org.jumpmind.symmetric.io.data.writer.Conflict;
+import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
+import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
+import org.jumpmind.symmetric.io.data.writer.ResolvedData;
+import org.jumpmind.symmetric.io.data.writer.TransformWriter;
+import org.jumpmind.symmetric.io.stage.IStagingManager;
+import org.jumpmind.symmetric.load.AbstractDataLoaderFactory;
+import org.jumpmind.symmetric.load.IDataLoaderFactory;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.cloud.bigquery.BigQuery;
+
+public class BigQueryDataLoaderFactory extends AbstractDataLoaderFactory implements IDataLoaderFactory {
+
+    protected final Logger log = LoggerFactory.getLogger(getClass());
+
+    private IStagingManager stagingManager;
+
+    private BigQuery bigquery;
+
+    public BigQueryDataLoaderFactory(ISymmetricEngine engine) {
+        this.stagingManager = engine.getStagingManager();
+        this.parameterService = engine.getParameterService();
+        this.bigquery = ((BigQueryPlatform) engine.getSymmetricDialect().getTargetPlatform()).getBigQuery();
+    }
+
+    public String getTypeName() {
+        return "bigquery_bulk";
+    }
+
+    public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetricDialect,
+            TransformWriter transformWriter,
+            List<IDatabaseWriterFilter> filters, List<IDatabaseWriterErrorHandler> errorHandlers,
+            List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {
+
+        try {
+            return new BigQueryBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
+                    symmetricDialect.getTablePrefix(), stagingManager, filters, errorHandlers, parameterService,
+                    buildParameterDatabaseWritterSettings(), this.bigquery);
+        } catch (Exception e) {
+            log.warn("Failed to create the big query database writer.", e);
+            if (e instanceof RuntimeException) {
+                throw (RuntimeException) e;
+            } else {
+                throw new RuntimeException(e);
+            }
+        }
+    }
+
+    public boolean isPlatformSupported(IDatabasePlatform platform) {
+        return (DatabaseNamesConstants.BIGQUERY.equals(platform.getName()));
+    }
+
+}
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/BulkDataLoaderFactory.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/BulkDataLoaderFactory.java
index 6b8c996209..e5474a7f34 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/BulkDataLoaderFactory.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/BulkDataLoaderFactory.java
@@ -73,6 +73,9 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
         } else if (platformName != null && platformName.startsWith(DatabaseNamesConstants.SNOWFLAKE)) {
             return new SnowflakeBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
                     filters, errorHandlers, conflictSettings, resolvedData);
+        } else if (DatabaseNamesConstants.BIGQUERY.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
+            return new BigQueryDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
+                    filters, errorHandlers, conflictSettings, resolvedData);
         } else {
             return new JdbcBatchBulkDatabaseWriter(symmetricDialect.getPlatform(), platform, symmetricDialect.getTablePrefix(),
                     buildParameterDatabaseWritterSettings());
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/BigQueryBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/BigQueryBulkDatabaseWriter.java
new file mode 100644
index 0000000000..9d09b3b75d
--- /dev/null
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/BigQueryBulkDatabaseWriter.java
@@ -0,0 +1,84 @@
+package org.jumpmind.symmetric.io;
+
+import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.channels.Channels;
+import java.sql.SQLException;
+import java.util.List;
+
+import org.jumpmind.db.model.Table;
+import org.jumpmind.db.platform.IDatabasePlatform;
+import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
+import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
+import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
+import org.jumpmind.symmetric.io.stage.IStagingManager;
+import org.jumpmind.symmetric.service.IParameterService;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.FormatOptions;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobId;
+import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
+import com.google.cloud.bigquery.TableDataWriteChannel;
+import com.google.cloud.bigquery.TableId;
+import com.google.cloud.bigquery.WriteChannelConfiguration;
+import com.google.common.io.Files;
+
+public class BigQueryBulkDatabaseWriter extends CloudBulkDatabaseWriter {
+
+    BigQuery bigquery;
+
+    public BigQueryBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform,
+            String tablePrefix, IStagingManager stagingManager, List<IDatabaseWriterFilter> filters,
+            List<IDatabaseWriterErrorHandler> errorHandlers, IParameterService parameterService, DatabaseWriterSettings writerSettings,
+            BigQuery bq) throws FileNotFoundException, IOException {
+        super(symmetricPlatform, targetPlatform, tablePrefix, stagingManager, filters, errorHandlers, parameterService, writerSettings);
+        fieldTerminator = ",";
+        bigquery = bq;
+    }
+
+    @Override
+    public void loadToCloudDatabase() throws SQLException {
+        try {
+            File csvPath = this.stagedInputFile.getFile();
+
+            TableId tableId = TableId.of(this.targetTable.getSchema(), this.targetTable.getName());
+            WriteChannelConfiguration writeChannelConfiguration = WriteChannelConfiguration.newBuilder(tableId)
+                    .setFormatOptions(FormatOptions.csv()).setAutodetect(false).setDestinationTable(tableId).build();
+            // The location must be specified; other fields can be auto-detected.
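+            // The staged CSV file is streamed directly into a BigQuery load job via a
+            // TableDataWriteChannel, so no intermediate cloud storage bucket is required.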
+            JobId jobId = JobId.newBuilder()
+                    .setLocation(bigquery.getOptions().getLocation())
+                    .setProject(bigquery.getOptions().getProjectId()).build();
+
+            TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration);
+            // Write data to writer
+            OutputStream stream = Channels.newOutputStream(writer);
+            Files.copy(csvPath, stream);
+            stream.close();
+
+            // Get load job and wait for it to complete
+            Job job = writer.getJob();
+            job = job.waitFor();
+            LoadStatistics stats = job.getStatistics();
+            log.debug("BigQuery load job complete, output rows: {}", stats.getOutputRows());
+        } catch (Exception ex) {
+            throw getPlatform().getSqlTemplate().translate(ex);
+        }
+    }
+
+    @Override
+    protected Table lookupTableAtTarget(Table sourceTable) {
+        return sourceTable;
+    }
+
+    @Override
+    public void copyToCloudStorage() throws SQLException {
+    }
+
+    @Override
+    public void cleanUpCloudStorage() throws SQLException {
+    }
+}
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/CloudBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/CloudBulkDatabaseWriter.java
index b6b2d0644a..a82c542c56 100644
--- a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/CloudBulkDatabaseWriter.java
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/CloudBulkDatabaseWriter.java
@@ -314,7 +314,7 @@ public void bulkWrite(CsvData data) {
             default:
                 flush();
                 context.put(ContextConstants.CONTEXT_BULK_WRITER_TO_USE, "default");
-                super.write(data);
+                writeDefault(data);
                 break;
         }
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/HbaseDataLoaderFactory.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/HbaseDataLoaderFactory.java
new file mode 100644
index 0000000000..0e1c6016c3
--- /dev/null
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/HbaseDataLoaderFactory.java
@@ -0,0 +1,61 @@
+package org.jumpmind.symmetric.io;
+
+import java.util.List;
+
+import org.jumpmind.extension.IBuiltInExtensionPoint;
+import org.jumpmind.symmetric.ISymmetricEngine;
+import org.jumpmind.symmetric.common.ParameterConstants;
+import org.jumpmind.symmetric.db.ISymmetricDialect;
+import org.jumpmind.symmetric.ext.ISymmetricEngineAware;
+import org.jumpmind.symmetric.io.data.IDataWriter;
+import org.jumpmind.symmetric.io.data.writer.Conflict;
+import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
+import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
+import org.jumpmind.symmetric.io.data.writer.ResolvedData;
+import org.jumpmind.symmetric.io.data.writer.TransformWriter;
+import org.jumpmind.symmetric.load.DefaultDataLoaderFactory;
+
+public class HbaseDataLoaderFactory extends DefaultDataLoaderFactory implements
+        ISymmetricEngineAware, IBuiltInExtensionPoint {
+
+    protected ISymmetricEngine engine;
+
+    protected String typeName = "hbase";
+
+    protected String hbaseSiteXmlPath;
+
+    protected IDataWriter hbaseDataWriter;
+
+    public HbaseDataLoaderFactory() {
+        super();
+    }
+
+    @Override
+    public void setSymmetricEngine(ISymmetricEngine engine) {
+        this.engine = engine;
+        this.parameterService = engine.getParameterService();
+    }
+
+    @Override
+    public String getTypeName() {
+        return typeName;
+    }
+
+    public void setTypeName(String typeName) {
+        this.typeName = typeName;
+    }
+
+    @Override
+    public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetricDialect,
+            TransformWriter transformWriter, List<IDatabaseWriterFilter> filters,
+            List<IDatabaseWriterErrorHandler> errorHandlers,
+            List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {
+
+        if (hbaseDataWriter == null) {
+            this.hbaseSiteXmlPath = parameterService.getString(ParameterConstants.HBASE_SITE_XML_PATH);
+            this.hbaseDataWriter = new HbaseDatabaseWriter(this.hbaseSiteXmlPath);
+        }
+        return this.hbaseDataWriter;
+    }
+}
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/HbaseDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/HbaseDatabaseWriter.java
new file mode 100644
index 0000000000..5fa57941d4
--- /dev/null
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/HbaseDatabaseWriter.java
@@ -0,0 +1,129 @@
+package org.jumpmind.symmetric.io;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.hadoop.hbase.TableName;
+import org.apache.hadoop.hbase.client.Connection;
+import org.apache.hadoop.hbase.client.ConnectionFactory;
+import org.apache.hadoop.hbase.client.Delete;
+import org.apache.hadoop.hbase.client.Put;
+import org.apache.hadoop.hbase.client.Table;
+import org.jumpmind.db.model.Column;
+import org.jumpmind.symmetric.io.data.CsvData;
+import org.jumpmind.symmetric.io.data.writer.AbstractDatabaseWriter;
+
+public class HbaseDatabaseWriter extends AbstractDatabaseWriter {
+
+    private Configuration config;
+    private Connection connection;
+    private Table table;
+    private String hbaseSiteXmlPath;
+
+    public HbaseDatabaseWriter(String hbaseSiteXmlPath) {
+        this.hbaseSiteXmlPath = hbaseSiteXmlPath;
+    }
+
+    protected void setup() {
+        try {
+            if (config == null) {
+                config = HBaseConfiguration.create();
+                config.addResource(new Path(this.hbaseSiteXmlPath));
+            }
+            if (connection == null) {
+                log.debug("Establishing connection to HBase");
+                connection = ConnectionFactory.createConnection(config);
+            }
+            if (table == null) {
+                log.debug("Connected to HBase, now looking up table " + this.targetTable.getName());
+                table = connection.getTable(TableName.valueOf(this.targetTable.getName()));
+            }
+        } catch (IOException e) {
+            log.error("Unable to connect to HBase ", e);
+        }
+    }
+
+    protected LoadStatus put(CsvData data) {
+        try {
+            setup();
+            Put put = new Put(data.getPkData(this.targetTable)[0].getBytes());
+
+            String[] values = data.getParsedData(CsvData.ROW_DATA);
+            Column[] columns = sourceTable.getColumns();
+
+            List<Put> putList = new ArrayList<Put>();
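+            // Column names are expected in 'family:qualifier' form; columns without
+            // a column family prefix are skipped rather than written to HBase.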
+            for (int i = 0; i < columns.length; i++) {
+                if (columns[i].getName().contains(":")) {
+                    log.debug("Preparing put statement into Hbase.");
+                    String[] split = columns[i].getName().split(":");
+                    byte[] columnFamily = split[0].getBytes();
+                    byte[] columnName = split[1].getBytes();
+                    put.addColumn(columnFamily, columnName, values[i].getBytes());
+                }
+            }
+            // Add the assembled row once, after all of its columns have been set
+            putList.add(put);
+
+            log.debug("Put list for HBase complete with a size of " + putList.size());
+            table.put(putList);
+            log.debug("Put rows into HBase now closing connection");
+            table.close();
+            // Drop the cached reference so the next row re-opens the table
+            table = null;
+        } catch (IOException e) {
+            log.error("Unable to load data into HBase ", e);
+            throw new RuntimeException(e);
+        }
+
+        return LoadStatus.SUCCESS;
+    }
+
+    @Override
+    protected LoadStatus insert(CsvData data) {
+        return put(data);
+    }
+
+    @Override
+    protected LoadStatus delete(CsvData data, boolean useConflictDetection) {
+        setup();
+
+        String[] pkData = data.getParsedData(CsvData.PK_DATA);
+        if (pkData != null && pkData.length == 1) {
+            Delete delete = new Delete(pkData[0].getBytes());
+            try {
+                table.delete(delete);
+            } catch (IOException e) {
+                log.error("Unable to delete data for table " + this.targetTable.getName()
+                        + ", for primary key " + pkData[0]);
+                throw new RuntimeException(e);
+            }
+        }
+        return LoadStatus.SUCCESS;
+    }
+
+    @Override
+    protected LoadStatus update(CsvData data, boolean applyChangesOnly, boolean useConflictDetection) {
+        return put(data);
+    }
+
+    @Override
+    protected boolean create(CsvData data) {
+        return false;
+    }
+
+    @Override
+    protected boolean sql(CsvData data) {
+        return false;
+    }
+
+    @Override
+    protected void logFailureDetails(Throwable e, CsvData data, boolean logLastDmlDetails) {
+    }
+
+}
diff --git a/symmetric-client/src/main/resources/symmetric-ext-points.xml b/symmetric-client/src/main/resources/symmetric-ext-points.xml
index 8ff5217484..e41ca4e714 100644
--- a/symmetric-client/src/main/resources/symmetric-ext-points.xml
+++ b/symmetric-client/src/main/resources/symmetric-ext-points.xml
@@ -18,6 +18,10 @@
+
+
+
+
diff --git a/symmetric-core/build.gradle b/symmetric-core/build.gradle
index 12bae55290..110b68a7db 100644
--- a/symmetric-core/build.gradle
+++ b/symmetric-core/build.gradle
@@ -10,6 +10,7 @@ apply from: symAssembleDir + '/common.gradle'
     compile "com.fasterxml.jackson.core:jackson-databind:2.9.8"
     compile "com.google.code.gson:gson:2.8.5"
     compile "org.springframework:spring-core:$springVersion"
+    provided "org.apache.hbase:hbase-client:1.3.6"
 
     testCompile project(path: ':symmetric-util', configuration: 'testArtifacts')
    testCompile project(path: ':symmetric-jdbc', configuration: 'testArtifacts')
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java
index 977a700c7d..82684284f8 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/common/ParameterConstants.java
@@ -528,6 +528,13 @@ private ParameterConstants() {
     public final static String[] ALL_JDBC_PARAMS = new String[] { DB_FETCH_SIZE, DB_QUERY_TIMEOUT_SECS,
             JDBC_EXECUTE_BATCH_SIZE, JDBC_ISOLATION_LEVEL, JDBC_READ_STRINGS_AS_BYTES, TREAT_BINARY_AS_LOB_ENABLED,
             LOG_SLOW_SQL_THRESHOLD_MILLIS, LOG_SQL_PARAMETERS_INLINE };
 
+    public final static String GOOGLE_BIG_QUERY_MAX_ROWS_PER_RPC = "google.bigquery.max.rows.per.rpc";
+    public final static String GOOGLE_BIG_QUERY_LOCATION = "google.bigquery.location";
+    public final static String GOOGLE_BIG_QUERY_PROJECT_ID = "google.bigquery.project.id";
+    public final static String GOOGLE_BIG_QUERY_SECURITY_CREDENTIALS_PATH = "google.bigquery.security.credentials.path";
+
+    public final static String HBASE_SITE_XML_PATH = "hbase.site.xml.path";
+
     public static Map<String, ParameterMetaData> getParameterMetaData() {
         return parameterMetaData;
     }
diff --git a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java
index 24e4b1253a..89717154b6 100644
--- a/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java
+++ b/symmetric-core/src/main/java/org/jumpmind/symmetric/load/DefaultDataLoaderFactory.java
@@ -27,11 +27,13 @@
 import org.apache.commons.lang.StringUtils;
 import org.jumpmind.db.platform.IDatabasePlatform;
+import org.jumpmind.db.platform.bigquery.BigQueryPlatform;
 import org.jumpmind.db.sql.ISqlTransaction;
 import org.jumpmind.extension.IBuiltInExtensionPoint;
 import org.jumpmind.symmetric.common.ParameterConstants;
 import org.jumpmind.symmetric.db.ISymmetricDialect;
 import org.jumpmind.symmetric.io.data.IDataWriter;
+import org.jumpmind.symmetric.io.data.writer.BigQueryDatabaseWriter;
 import org.jumpmind.symmetric.io.data.writer.CassandraDatabaseWriter;
 import org.jumpmind.symmetric.io.data.writer.Conflict;
 import org.jumpmind.symmetric.io.data.writer.Conflict.PingBack;
@@ -47,6 +49,8 @@
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.cloud.bigquery.BigQuery;
+
 public class DefaultDataLoaderFactory extends AbstractDataLoaderFactory implements IDataLoaderFactory, IBuiltInExtensionPoint {
 
     protected final Logger log = LoggerFactory.getLogger(getClass());
@@ -86,6 +90,26 @@ public IDataWriter getDataWriter(final String sourceNodeId, final ISymmetricDial
             }
         }
 
+        if (symmetricDialect.getTargetPlatform().getClass().getSimpleName().equals("BigQueryPlatform")) {
+            try {
+                return new BigQueryDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
+                        symmetricDialect.getTablePrefix(), new DefaultTransformWriterConflictResolver(transformWriter),
+                        buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData),
+                        ((BigQueryPlatform) symmetricDialect.getTargetPlatform()).getBigQuery(),
+                        parameterService.getInt(ParameterConstants.GOOGLE_BIG_QUERY_MAX_ROWS_PER_RPC, 100));
+            } catch (Exception e) {
+                log.warn("Failed to create the big query database writer.", e);
+                if (e instanceof RuntimeException) {
+                    throw (RuntimeException) e;
+                } else {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+
         if (symmetricDialect.getTargetPlatform().getClass().getSimpleName().equals("KafkaPlatform")) {
             try {
                 if (filters == null) {
diff --git a/symmetric-core/src/main/resources/symmetric-default.properties b/symmetric-core/src/main/resources/symmetric-default.properties
index 9aa92a0066..fc7cc2d223 100644
--- a/symmetric-core/src/main/resources/symmetric-default.properties
+++ b/symmetric-core/src/main/resources/symmetric-default.properties
@@ -2745,6 +2745,32 @@ job.log.miner.period.time.ms=10000
 # Type: boolean
 postgres.security.definer=false
 
+# Google BigQuery parameter to control the number of rows sent through the insertAll
+# command while sending data into BigQuery.
+#
+# DatabaseOverridable: true
+# Tags: bigquery
+# Type: integer
+google.bigquery.max.rows.per.rpc=100
+
+# Google BigQuery location.
+#
+# DatabaseOverridable: true
+# Tags: bigquery
+google.bigquery.location=US
+
+# Google BigQuery project id.
+#
+# DatabaseOverridable: true
+# Tags: bigquery
+google.bigquery.project.id=
+
+# Google BigQuery security credentials path for the JSON file containing the credentials to connect to Big Query.
+#
+# DatabaseOverridable: true
+# Tags: bigquery
+google.bigquery.security.credentials.path=
+
 # Determines if the size of a LOB value should be checked before extracting to prevent
 # a JVM crash that can occur if the size of a LOB is bigger than the max size of a java array
 # 2^31 - 1
diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/DatabaseNamesConstants.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/DatabaseNamesConstants.java
index 0496ed0289..96dd7f8bfe 100644
--- a/symmetric-db/src/main/java/org/jumpmind/db/platform/DatabaseNamesConstants.java
+++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/DatabaseNamesConstants.java
@@ -59,6 +59,7 @@ private DatabaseNamesConstants() {
     public final static String KAFKA = "kafka";
     public final static String SNOWFLAKE = "snowflake";
     public final static String HBASE = "hbase";
+    public final static String BIGQUERY = "bigquery";
     public final static String HANA = "hdb";
 }
diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/bigquery/BigQueryDdlBuilder.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/bigquery/BigQueryDdlBuilder.java
new file mode 100644
index 0000000000..f2370effaa
--- /dev/null
+++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/bigquery/BigQueryDdlBuilder.java
@@ -0,0 +1,118 @@
+package org.jumpmind.db.platform.bigquery;
+
+import java.sql.Types;
+
+import org.jumpmind.db.model.Column;
+import org.jumpmind.db.model.Database;
+import org.jumpmind.db.model.IIndex;
+import org.jumpmind.db.model.Table;
+import org.jumpmind.db.platform.AbstractDdlBuilder;
+import org.jumpmind.db.platform.DatabaseNamesConstants;
+
+import com.google.cloud.bigquery.BigQuery;
+
+public class BigQueryDdlBuilder extends AbstractDdlBuilder {
+
+    BigQuery bigQuery;
+
+    public BigQueryDdlBuilder(BigQuery bq) {
+        super(DatabaseNamesConstants.BIGQUERY);
+        this.delimitedIdentifierModeOn = false;
+        this.bigQuery = bq;
+
+        getDatabaseInfo().setDelimitedIdentifiersSupported(false);
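+
+        // BigQuery Standard SQL has a narrow type set: most character and long
+        // binary types collapse to STRING, BINARY/BLOB to BYTES, and exact numerics
+        // to NUMERIC, with sizes and precision omitted from the generated DDL.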
+        databaseInfo.addNativeTypeMapping(Types.INTEGER, "INT64");
+        databaseInfo.addNativeTypeMapping(Types.BIT, "INT64");
+        databaseInfo.addNativeTypeMapping(Types.BIGINT, "NUMERIC");
+        databaseInfo.addNativeTypeMapping(Types.TINYINT, "NUMERIC");
+        databaseInfo.addNativeTypeMapping(Types.DOUBLE, "NUMERIC");
+        databaseInfo.addNativeTypeMapping(Types.DECIMAL, "NUMERIC");
+
+        databaseInfo.addNativeTypeMapping(Types.BINARY, "BYTES");
+        databaseInfo.addNativeTypeMapping(Types.BLOB, "BYTES");
+
+        databaseInfo.addNativeTypeMapping(Types.VARCHAR, "STRING");
+        databaseInfo.addNativeTypeMapping(Types.CHAR, "STRING");
+        databaseInfo.addNativeTypeMapping(Types.LONGVARBINARY, "STRING");
+        databaseInfo.addNativeTypeMapping(Types.LONGVARCHAR, "STRING");
+        databaseInfo.addNativeTypeMapping(Types.VARBINARY, "STRING");
+        databaseInfo.addNativeTypeMapping(Types.NVARCHAR, "STRING");
+        databaseInfo.addNativeTypeMapping(Types.CLOB, "STRING");
+
+        databaseInfo.setForeignKeysSupported(false);
+        databaseInfo.setPrimaryKeyEmbedded(true);
+        databaseInfo.setIndicesSupported(false);
+
+        databaseInfo.setHasSize(Integer.valueOf(Types.CHAR), false);
+        databaseInfo.setHasSize(Integer.valueOf(Types.VARCHAR), false);
+        databaseInfo.setHasSize(Integer.valueOf(Types.BINARY), false);
+        databaseInfo.setHasSize(Integer.valueOf(Types.VARBINARY), false);
+        databaseInfo.setHasSize(Integer.valueOf(Types.NCHAR), false);
+        databaseInfo.setHasSize(Integer.valueOf(Types.NVARCHAR), false);
+        databaseInfo.setHasSize(Integer.valueOf(Types.NUMERIC), false);
+        databaseInfo.setHasSize(Integer.valueOf(Types.DECIMAL), false);
+
+        databaseInfo.setHasPrecisionAndScale(Types.NUMERIC, false);
+        databaseInfo.setHasPrecisionAndScale(Types.DECIMAL, false);
+    }
+
+    @Override
+    protected void mergeOrRemovePlatformTypes(Database currentModel, Database desiredModel) {
+        super.mergeOrRemovePlatformTypes(currentModel, desiredModel);
+        for (Table table : desiredModel.getTables()) {
+            for (Column col : table.getColumns()) {
+                col.setPrimaryKey(false);
+                col.setAutoIncrement(false);
+                col.setRequired(false);
+                col.setDefaultValue(null);
+
+                if (col.getMappedTypeCode() == Types.CHAR) {
+                    col.setMappedTypeCode(Types.VARCHAR);
+                } else if (col.getMappedTypeCode() == Types.BIT) {
+                    col.setMappedTypeCode(Types.INTEGER);
+                } else if (col.getMappedTypeCode() == Types.LONGVARCHAR) {
+                    col.setMappedTypeCode(Types.VARCHAR);
+                } else if (col.getMappedTypeCode() == Types.BLOB) {
+                    col.setMappedTypeCode(Types.BINARY);
+                } else if (col.getMappedTypeCode() == Types.DECIMAL) {
+                    col.setMappedTypeCode(Types.NUMERIC);
+                }
+            }
+        }
+    }
+
+    @Override
+    protected void writePrimaryKeyStmt(Table table, Column[] primaryKeyColumns, StringBuilder ddl) {
+    }
+
+    @Override
+    protected void writeEmbeddedPrimaryKeysStmt(Table table, StringBuilder ddl) {
+    }
+
+    @Override
+    protected void writeExternalPrimaryKeysCreateStmt(Table table, Column[] primaryKeyColumns, StringBuilder ddl) {
+    }
+
+    @Override
+    protected void writeColumnAutoIncrementStmt(Table table, Column column, StringBuilder ddl) {
+    }
+
+    @Override
+    protected void writeEmbeddedIndexCreateStmt(Table table, IIndex index, StringBuilder ddl) {
+    }
+
+    @Override
+    protected void writeEmbeddedIndicesStmt(Table table, StringBuilder ddl) {
+    }
+
+    @Override
+    protected void writeColumnUniqueStmt(StringBuilder ddl) {
+    }
+
+    @Override
+    protected void writeColumnDefaultValueStmt(Table table, Column column, StringBuilder ddl) {
+    }
+
+}
diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/bigquery/BigQueryDdlReader.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/bigquery/BigQueryDdlReader.java
new file mode 100644
index 0000000000..7431a1cca6
--- /dev/null
+++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/bigquery/BigQueryDdlReader.java
@@ -0,0 +1,139 @@
+package org.jumpmind.db.platform.bigquery;
+
+import java.sql.Types;
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.List;
+import java.util.Set;
+
+import org.jumpmind.db.model.Column;
+import org.jumpmind.db.model.Database;
+import org.jumpmind.db.model.ForeignKey;
+import org.jumpmind.db.model.Table;
+import org.jumpmind.db.model.Trigger;
+import org.jumpmind.db.platform.IDdlReader;
+import org.jumpmind.db.sql.ISqlTransaction;
+import org.jumpmind.db.util.BinaryEncoding;
+import org.jumpmind.db.util.TableRow;
+
+import com.google.api.gax.paging.Page;
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.Dataset;
+import com.google.cloud.bigquery.LegacySQLTypeName;
+import com.google.cloud.bigquery.StandardTableDefinition;
+import com.google.cloud.bigquery.TableId;
+
+public class BigQueryDdlReader implements IDdlReader {
+
+    BigQuery bigQuery;
+
+    public BigQueryDdlReader(BigQuery bq) {
+        this.bigQuery = bq;
+    }
+
+    @Override
+    public Database readTables(String catalog, String schema, String[] tableTypes) {
+        return null;
+    }
+
+    @Override
+    public Table readTable(String catalog, String schema, String tableName) {
+        com.google.cloud.bigquery.Table bqTable = this.bigQuery.getTable(TableId.of(schema, tableName));
+        Table table = null;
+        if (bqTable != null && bqTable.getDefinition() instanceof StandardTableDefinition) {
+            StandardTableDefinition defn = (StandardTableDefinition) bqTable.getDefinition();
+            table = new Table(catalog, schema, tableName);
+
+            for (com.google.cloud.bigquery.Field bqField : defn.getSchema().getFields()) {
+                Column column = new Column(bqField.getName(), false, getTypeCode(bqField.getType()), 0, 0);
+                table.addColumn(column);
+            }
+        }
+        return table;
+    }
+
+    protected int getTypeCode(LegacySQLTypeName legacyType) {
+        int typeCode = Types.OTHER;
+
+        if (legacyType.equals(LegacySQLTypeName.INTEGER)) {
+            typeCode = Types.INTEGER;
+        } else if (legacyType.equals(LegacySQLTypeName.BOOLEAN)) {
+            typeCode = Types.BOOLEAN;
+        } else if (legacyType.equals(LegacySQLTypeName.BYTES)) {
+            typeCode = Types.BINARY;
+        } else if (legacyType.equals(LegacySQLTypeName.DATE)) {
+            typeCode = Types.DATE;
+        } else if (legacyType.equals(LegacySQLTypeName.DATETIME)) {
+            typeCode = Types.TIMESTAMP;
+        } else if (legacyType.equals(LegacySQLTypeName.FLOAT)) {
+            typeCode = Types.FLOAT;
+        } else if (legacyType.equals(LegacySQLTypeName.NUMERIC)) {
+            typeCode = Types.NUMERIC;
+        } else if (legacyType.equals(LegacySQLTypeName.STRING)) {
+            typeCode = Types.VARCHAR;
+        } else if (legacyType.equals(LegacySQLTypeName.TIMESTAMP)) {
+            typeCode = Types.TIMESTAMP;
+        }
+
+        return typeCode;
+    }
+
+    @Override
+    public List<String> getTableTypes() {
+        return null;
+    }
+
+    @Override
+    public List<String> getCatalogNames() {
+        return null;
+    }
+
+    @Override
+    public List<String> getSchemaNames(String catalog) {
+        List<String> schemas = new ArrayList<String>();
+        // Walk every page of the dataset listing; the page object itself must be
+        // advanced, otherwise the first page is skipped and the loop never ends.
+        Page<Dataset> page = this.bigQuery.listDatasets();
+        while (page != null) {
+            for (Dataset ds : page.getValues()) {
+                schemas.add(ds.getDatasetId().getDataset());
+            }
+            page = page.hasNextPage() ? page.getNextPage() : null;
+        }
+        return schemas;
+    }
+
+    @Override
+    public List<String> getTableNames(String catalog, String schema, String[] tableTypes) {
+        return null;
+    }
+
+    @Override
+    public List<String> getColumnNames(String catalog, String schema, String tableName) {
+        return null;
+    }
+
+    @Override
+    public List<Trigger> getTriggers(String catalog, String schema, String tableName) {
+        return null;
+    }
+
+    @Override
+    public Trigger getTriggerFor(Table table, String name) {
+        return null;
+    }
+
+    @Override
+    public Collection<ForeignKey> getExportedKeys(Table table) {
+        return null;
+    }
+
+    @Override
+    public List<TableRow> getExportedForeignTableRows(ISqlTransaction transaction, List<TableRow> tableRows, Set<TableRow> visited) {
+        return null;
+    }
+
+    @Override
+    public List<TableRow> getImportedForeignTableRows(List<TableRow> tableRows, Set<TableRow> visited, BinaryEncoding encoding) {
+        return null;
+    }
+}
diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/bigquery/BigQueryPlatform.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/bigquery/BigQueryPlatform.java
new file mode 100644
index 0000000000..d05dd22e2f
--- /dev/null
+++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/bigquery/BigQueryPlatform.java
@@ -0,0 +1,69 @@
+package org.jumpmind.db.platform.bigquery;
+
+import org.jumpmind.db.platform.AbstractDatabasePlatform;
+import org.jumpmind.db.platform.IDdlBuilder;
+import org.jumpmind.db.platform.IDdlReader;
+import org.jumpmind.db.sql.ISqlTemplate;
+import org.jumpmind.db.sql.SqlTemplateSettings;
+
+import com.google.cloud.bigquery.BigQuery;
+
+public class BigQueryPlatform extends AbstractDatabasePlatform {
+
+    ISqlTemplate sqlTemplate;
+    BigQuery bigquery;
+
+    public BigQueryPlatform(SqlTemplateSettings settings, BigQuery bigquery) {
+        super(settings);
+
+        this.bigquery = bigquery;
+        sqlTemplate = new BigQuerySqlTemplate(bigquery);
+        this.ddlBuilder = new BigQueryDdlBuilder(bigquery);
+        this.ddlReader = new BigQueryDdlReader(bigquery);
+    }
+
+    @Override
+    public String getName() {
+        return "bigquery";
+    }
+
+    @Override
+    public String getDefaultSchema() {
+        return null;
+    }
+
+    @Override
+    public String getDefaultCatalog() {
+        return null;
+    }
+
+    @Override
+    public <T> T getDataSource() {
+        return null;
+    }
+
+    @Override
+    public ISqlTemplate getSqlTemplate() {
+        return sqlTemplate;
+    }
+
+    @Override
+    public ISqlTemplate getSqlTemplateDirty() {
+        return null;
+    }
+
+    @Override
+    public IDdlBuilder getDdlBuilder() {
+        return this.ddlBuilder;
+    }
+
+    @Override
+    public IDdlReader getDdlReader() {
+        return new BigQueryDdlReader(this.bigquery);
+    }
+
+    public BigQuery getBigQuery() {
+        return bigquery;
+    }
+
+}
diff --git a/symmetric-db/src/main/java/org/jumpmind/db/platform/bigquery/BigQuerySqlTemplate.java b/symmetric-db/src/main/java/org/jumpmind/db/platform/bigquery/BigQuerySqlTemplate.java
new file mode 100644
index 0000000000..a55e7de3a3
--- /dev/null
+++ b/symmetric-db/src/main/java/org/jumpmind/db/platform/bigquery/BigQuerySqlTemplate.java
@@ -0,0 +1,65 @@
+package org.jumpmind.db.platform.bigquery;
+
+import static org.apache.commons.lang.StringUtils.isNotBlank;
+
+import java.util.UUID;
+
+import org.jumpmind.db.sql.AbstractJavaDriverSqlTemplate;
+import org.jumpmind.db.sql.ISqlResultsListener;
+import org.jumpmind.db.sql.ISqlStatementSource;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.Job;
+import com.google.cloud.bigquery.JobId;
+import com.google.cloud.bigquery.JobInfo;
+import com.google.cloud.bigquery.QueryJobConfiguration;
+
+public class BigQuerySqlTemplate extends AbstractJavaDriverSqlTemplate {
+
+    BigQuery bigquery;
+
+    public BigQuerySqlTemplate(BigQuery bq) {
+        bigquery = bq;
+    }
+
+    @Override
+    public String getDatabaseProductName() {
+        return "bigquery";
+    }
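+
+    // Each statement read from the source runs as its own synchronous BigQuery
+    // query job; any job error surfaces as a RuntimeException.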
+    @Override
+    public int update(boolean autoCommit, boolean failOnError, boolean failOnDrops, boolean failOnSequenceCreate, int commitRate,
+            ISqlResultsListener listener, ISqlStatementSource source) {
+
+        for (String statement = source.readSqlStatement(); statement != null; statement = source.readSqlStatement()) {
+            if (isNotBlank(statement)) {
+                try {
+                    QueryJobConfiguration queryConfig = QueryJobConfiguration.newBuilder(statement).build();
+
+                    JobId jobId = JobId.of(UUID.randomUUID().toString());
+                    Job queryJob = bigquery.create(JobInfo.newBuilder(queryConfig).setJobId(jobId).build());
+
+                    // Wait for the query to complete.
+                    queryJob = queryJob.waitFor();
+
+                    // Check for errors
+                    if (queryJob == null) {
+                        throw new RuntimeException("Job no longer exists");
+                    } else if (queryJob.getStatus().getError() != null) {
+                        // You can also look at queryJob.getStatus().getExecutionErrors() for all
+                        // errors, not just the latest one.
+                        throw new RuntimeException(queryJob.getStatus().getError().toString());
+                    }
+                } catch (InterruptedException e) {
+                    throw new RuntimeException(e);
+                }
+            }
+        }
+        return 1;
+    }
+}
diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/BigQueryDatabaseWriter.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/BigQueryDatabaseWriter.java
new file mode 100644
index 0000000000..32a09eaf90
--- /dev/null
+++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/BigQueryDatabaseWriter.java
@@ -0,0 +1,148 @@
+package org.jumpmind.symmetric.io.data.writer;
+
+import java.io.FileNotFoundException;
+import java.io.IOException;
+import java.util.Map;
+import java.util.stream.Collectors;
+import java.util.stream.IntStream;
+
+import org.jumpmind.db.model.Table;
+import org.jumpmind.db.platform.IDatabasePlatform;
+import org.jumpmind.db.sql.DmlStatement.DmlType;
+import org.jumpmind.symmetric.io.data.Batch;
+import org.jumpmind.symmetric.io.data.CsvData;
+import org.jumpmind.symmetric.io.data.DataEventType;
+
+import com.google.cloud.bigquery.BigQuery;
+import com.google.cloud.bigquery.InsertAllRequest;
+import com.google.cloud.bigquery.InsertAllRequest.RowToInsert;
+import com.google.cloud.bigquery.InsertAllResponse;
+import com.google.cloud.bigquery.TableId;
+
+public class BigQueryDatabaseWriter extends DynamicDefaultDatabaseWriter {
+
+    BigQuery bigquery;
+    InsertAllRequest.Builder insertAllRequestBuilder;
+    TableId currentTableId;
+
+    int maxRowsToInsertPerRPC;
+    int rowsAdded;
+    int updateCount;
+    int deleteCount;
+
+    public BigQueryDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform, String prefix,
+            IDatabaseWriterConflictResolver conflictResolver, DatabaseWriterSettings settings, BigQuery bq, int maxRowsPerRpc)
+            throws FileNotFoundException, IOException {
+        super(symmetricPlatform, targetPlatform, prefix, conflictResolver, settings);
+        this.maxRowsToInsertPerRPC = maxRowsPerRpc;
+        bigquery = bq;
+    }
+
+    @Override
+    protected void prepare() {
+        if (isSymmetricTable(this.targetTable != null ? this.targetTable.getName() : "")) {
+            super.prepare();
+        }
+    }
+
+    @Override
+    protected void prepare(String sql, CsvData data) {
+    }
+
+    /*
+     * Checks if a send SQL event was for the sym_node table. If so, the send SQL should run
+     * against the BigQuery tables; otherwise it is an internal SymmetricDS send SQL.
+     */
+    protected boolean isUserSendSql(String sql, CsvData data) {
+        return data.getDataEventType().equals(DataEventType.SQL)
+                && this.targetTable.getNameLowerCase().equals(this.getTablePrefix().toLowerCase() + "_node")
+                && !sql.toLowerCase().contains("from " + this.getTablePrefix().toLowerCase() + "_node");
+    }
+
+    @Override
+    public int prepareAndExecute(String sql, CsvData data) {
+        return 1;
+    }
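+
+    /*
+     * Inserts are buffered into an InsertAllRequest and flushed to BigQuery once
+     * maxRowsToInsertPerRPC rows have accumulated, and again at the end of each
+     * table; updates and deletes are only counted so they can be reported at the
+     * end of the batch.
+     */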
+    @Override
+    protected int execute(CsvData data, String[] values) {
+        if (isSymmetricTable(this.targetTable != null ? this.targetTable.getName() : "")) {
+            return super.execute(data, values);
+        }
+
+        if (this.currentDmlStatement.getDmlType().equals(DmlType.INSERT)) {
+            if (currentTableId == null) {
+                currentTableId = TableId.of(this.targetTable.getSchema(), this.targetTable.getName());
+            }
+
+            if (insertAllRequestBuilder == null) {
+                insertAllRequestBuilder = InsertAllRequest.newBuilder(currentTableId);
+            }
+
+            if (rowsAdded < maxRowsToInsertPerRPC) {
+                Map<String, Object> map = IntStream.range(0, this.currentDmlStatement.getColumns().length).boxed()
+                        .collect(Collectors.toMap(i -> this.currentDmlStatement.getColumns()[i].getName(), i -> (Object) values[i]));
+
+                insertAllRequestBuilder.addRow(RowToInsert.of(map));
+                rowsAdded++;
+            }
+
+            if (rowsAdded == maxRowsToInsertPerRPC) {
+                InsertAllResponse response = bigquery.insertAll(insertAllRequestBuilder.build());
+                if (response.hasErrors()) {
+                    log.warn("Errors while inserting rows into BigQuery: " + response.getInsertErrors());
+                }
+                rowsAdded = 0;
+                insertAllRequestBuilder = null;
+            }
+        } else if (this.currentDmlStatement.getDmlType().equals(DmlType.UPDATE)) {
+            updateCount++;
+        } else if (this.currentDmlStatement.getDmlType().equals(DmlType.DELETE)) {
+            deleteCount++;
+        }
+
+        return 1;
+    }
+
+    @Override
+    public void start(Batch batch) {
+        super.start(batch);
+        updateCount = 0;
+        deleteCount = 0;
+    }
+
+    @Override
+    public void end(Table table) {
+        super.end(table);
+
+        if (!isSymmetricTable(this.targetTable != null ? this.targetTable.getName() : "") && insertAllRequestBuilder != null) {
+            InsertAllResponse response = bigquery.insertAll(insertAllRequestBuilder.build());
+            if (response.hasErrors()) {
+                log.warn("Errors while inserting rows into BigQuery: " + response.getInsertErrors());
+            }
+            rowsAdded = 0;
+            insertAllRequestBuilder = null;
+            currentTableId = null;
+        }
+    }
+
+    @Override
+    public void end(Batch batch, boolean inError) {
+        super.end(batch, inError);
+
+        if (updateCount > 0 || deleteCount > 0) {
+            log.warn("Google BigQuery only supported for inserts, detected " + updateCount
+                    + " updates and " + deleteCount + " deletes, which will not be replicated.");
+        }
+    }
+
+    @Override
+    protected Table lookupTableAtTarget(Table sourceTable) {
+        if (sourceTable != null && isSymmetricTable(sourceTable.getName())) {
+            return super.lookupTableAtTarget(sourceTable);
+        }
+        return sourceTable;
+    }
+}
diff --git a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriterConflictResolver.java b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriterConflictResolver.java
index adda59f157..273e91b7e0 100644
--- a/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriterConflictResolver.java
+++ b/symmetric-io/src/main/java/org/jumpmind/symmetric/io/data/writer/DefaultDatabaseWriterConflictResolver.java
@@ -92,10 +92,10 @@ protected boolean isTimestampNewer(Conflict conflict, AbstractDatabaseWriter wri
             // If you are in this situation because of an instance where the conflict exists
             // because the row doesn't exist, then existing simply needs to be null
             if (existingStr != null) {
-//                int split = existingStr.lastIndexOf(" ");
-//                existingTs = FormatUtils.parseDate(existingStr.substring(0, split).trim(),
-//                        FormatUtils.TIMESTAMP_PATTERNS,
-//                        TimeZone.getTimeZone(existingStr.substring(split).trim()));
+//                int split = existingStr.lastIndexOf(" ");
+//                existingTs = FormatUtils.parseDate(existingStr.substring(0, split).trim(),
+//                        FormatUtils.TIMESTAMP_PATTERNS,
+//                        TimeZone.getTimeZone(existingStr.substring(split).trim()));
                 existingTs = FormatUtils.parseTimestampWithTimezone(existingStr,
                         FormatUtils.TIMESTAMP_WITH_TIMEZONE_PATTERNS);
             }
             // Get the loadingTs with timezone
diff --git a/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/hbase/HbasePlatform.java b/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/hbase/HbasePlatform.java
index c6b4670751..d48536ae64 100644
--- a/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/hbase/HbasePlatform.java
+++ b/symmetric-jdbc/src/main/java/org/jumpmind/db/platform/hbase/HbasePlatform.java
@@ -1,4 +1,4 @@
-package org.jumpmind.db.platform.hbase;
+ package org.jumpmind.db.platform.hbase;
 
 import javax.sql.DataSource;
diff --git a/symmetric-util/src/main/java/org/jumpmind/util/FormatUtils.java b/symmetric-util/src/main/java/org/jumpmind/util/FormatUtils.java
index ddddd649ce..bb64cc9be7 100644
--- a/symmetric-util/src/main/java/org/jumpmind/util/FormatUtils.java
+++ b/symmetric-util/src/main/java/org/jumpmind/util/FormatUtils.java
@@ -59,6 +59,10 @@ public final class FormatUtils {
     public static final FastDateFormat TIMESTAMP_FORMATTER = FastDateFormat
             .getInstance("yyyy-MM-dd HH:mm:ss.SSS");
 
+    public static final String[] TIMESTAMP_WITH_TIMEZONE_PATTERNS = {
+        "yyyy-MM-dd HH:mm:ss.n xxx"
+    };
+
     public static final FastDateFormat TIME_FORMATTER = FastDateFormat.getInstance("HH:mm:ss.SSS");
 
     /* special characters for wildcard triggers */