0004190: Google Big Query Support
jumpmind-josh committed Dec 10, 2019
1 parent d682f40 commit b248b08
Showing 15 changed files with 792 additions and 3 deletions.
4 changes: 4 additions & 0 deletions symmetric-assemble/common.gradle
@@ -271,6 +271,7 @@ subprojects { subproject ->
provided "org.apache.geronimo.specs:geronimo-j2ee-connector_1.6_spec:1.0"
provided ("com.datastax.cassandra:cassandra-driver-core:3.1.4") {
exclude group: 'org.slf4j'
exclude group: 'com.google.guava'
}
provided ("nl.cad:tps-parse:1.0.15-SNAPSHOT") {
exclude group: 'commons-lang', module: 'commons-lang'
@@ -286,6 +287,9 @@ subprojects { subproject ->
exclude group: 'org.slf4j'
}

provided 'com.google.cloud:google-cloud-bigquery:1.99.0'


testCompile fileTree(dir: System.getProperty("user.home") + '/.symmetricds/lib', include: '*.jar')
testCompile "junit:junit:$junitVersion"
testCompile "org.hamcrest:hamcrest-all:$hamcrestVersion"
org.jumpmind.symmetric.ClientSymmetricEngine
@@ -24,6 +24,7 @@
import static org.apache.commons.lang.StringUtils.isNotBlank;

import java.io.File;
import java.io.FileInputStream;
import java.io.StringReader;
import java.lang.reflect.Constructor;
import java.nio.charset.Charset;
@@ -46,6 +47,7 @@
import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.JdbcDatabasePlatformFactory;
import org.jumpmind.db.platform.bigquery.BigQueryPlatform;
import org.jumpmind.db.platform.cassandra.CassandraPlatform;
import org.jumpmind.db.platform.generic.GenericJdbcDatabasePlatform;
import org.jumpmind.db.platform.kafka.KafkaPlatform;
@@ -84,6 +86,11 @@
import org.springframework.jndi.JndiObjectFactoryBean;
import org.xml.sax.InputSource;

import com.google.auth.oauth2.ServiceAccountCredentials;
import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.BigQueryOptions;
import com.google.cloud.http.HttpTransportOptions;

/**
* Represents the client portion of a SymmetricDS engine. This class can be used
* to embed SymmetricDS into another application.
@@ -305,7 +312,7 @@ protected IDatabasePlatform createDatabasePlatform(TypedProperties properties) {

public static IDatabasePlatform createDatabasePlatform(ApplicationContext springContext, TypedProperties properties,
DataSource dataSource, boolean waitOnAvailableDatabase) {
return createDatabasePlatform(springContext, properties, dataSource, waitOnAvailableDatabase, false);
return createDatabasePlatform(springContext, properties, dataSource, waitOnAvailableDatabase, properties.is(ParameterConstants.NODE_LOAD_ONLY));
}
public static IDatabasePlatform createDatabasePlatform(ApplicationContext springContext, TypedProperties properties,
DataSource dataSource, boolean waitOnAvailableDatabase, boolean isLoadOnly) {
@@ -318,6 +325,23 @@ public static IDatabasePlatform createDatabasePlatform(ApplicationContext spring
return new CassandraPlatform(createSqlTemplateSettings(properties), dbUrl.substring(12));
} else if (dbDriver != null && dbDriver.contains("kafka")) {
return new KafkaPlatform(createSqlTemplateSettings(properties));
} else if (dbUrl != null && dbUrl.startsWith("bigquery://")) {
try {
HttpTransportOptions transportOptions = BigQueryOptions.getDefaultHttpTransportOptions();
transportOptions = transportOptions.toBuilder().setConnectTimeout(60000).setReadTimeout(60000)
.build();

BigQuery bigquery = BigQueryOptions.newBuilder()
.setProjectId(properties.get(ParameterConstants.GOOGLE_BIG_QUERY_PROJECT_ID))
.setLocation(properties.get(ParameterConstants.GOOGLE_BIG_QUERY_LOCATION, "US"))
.setCredentials(ServiceAccountCredentials.fromStream(
new FileInputStream(properties.get(ParameterConstants.GOOGLE_BIG_QUERY_SECURITY_CREDENTIALS_PATH))))
.setTransportOptions(transportOptions)
.build().getService();
return new BigQueryPlatform(createSqlTemplateSettings(properties), bigquery);
} catch (Exception e) {
throw new RuntimeException(e);
}
}
}
String jndiName = properties.getProperty(ParameterConstants.DB_JNDI_NAME);
@@ -528,7 +552,17 @@ protected void checkLoadOnly() {
for (String prop : kafkaProperties) {
properties.put(prop, parameterService.getString(prop));
}


String[] bigQueryProperties = new String[] {
ParameterConstants.GOOGLE_BIG_QUERY_PROJECT_ID,
ParameterConstants.GOOGLE_BIG_QUERY_LOCATION,
ParameterConstants.GOOGLE_BIG_QUERY_SECURITY_CREDENTIALS_PATH,
};

for (String prop : bigQueryProperties) {
properties.put(prop, parameterService.getString(prop));
}

IDatabasePlatform targetPlatform = createDatabasePlatform(null, properties, null, true, true);
DataSource loadDataSource = targetPlatform.getDataSource();
if (targetPlatform instanceof GenericJdbcDatabasePlatform) {
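For reference, the wiring above implies a load-only target engine configured along these lines. This is a minimal sketch, assuming the standard SymmetricDS load-only properties (load.only, target.db.url) together with the google.bigquery.* parameters added in this commit; the project id and credentials path are placeholder values:

    load.only=true
    target.db.url=bigquery://
    google.bigquery.project.id=my-gcp-project
    google.bigquery.location=US
    google.bigquery.security.credentials.path=/path/to/service-account.json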
org.jumpmind.symmetric.ext.BigQueryDataLoaderFactory (new file)
@@ -0,0 +1,69 @@
package org.jumpmind.symmetric.ext;

import java.util.List;

import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.bigquery.BigQueryPlatform;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.BigQueryBulkDatabaseWriter;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.load.AbstractDataLoaderFactory;
import org.jumpmind.symmetric.load.IDataLoaderFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.bigquery.BigQuery;

public class BigQueryDataLoaderFactory extends AbstractDataLoaderFactory implements IDataLoaderFactory {
protected final Logger log = LoggerFactory.getLogger(getClass());

private IStagingManager stagingManager;

private BigQuery bigquery;

public BigQueryDataLoaderFactory(ISymmetricEngine engine) {
this.stagingManager = engine.getStagingManager();
this.parameterService = engine.getParameterService();
this.bigquery = ((BigQueryPlatform) engine.getSymmetricDialect().getTargetPlatform()).getBigQuery();

}

public String getTypeName() {
return "bigquery_bulk";
}

public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetricDialect,
TransformWriter transformWriter,
List<IDatabaseWriterFilter> filters, List<IDatabaseWriterErrorHandler> errorHandlers,
List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {

try {
return new BigQueryBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
symmetricDialect.getTablePrefix(), stagingManager, filters, errorHandlers, parameterService,
buildParameterDatabaseWritterSettings(), this.bigquery);

} catch (Exception e) {
log.warn("Failed to create the BigQuery database writer.", e);
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new RuntimeException(e);
}
}
}

public boolean isPlatformSupported(IDatabasePlatform platform) {
return (DatabaseNamesConstants.BIGQUERY.equals(platform.getName()));
}

}
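The type name "bigquery_bulk" returned above is the value a channel's data_loader_type must carry for SymmetricDS to route that channel's batches through this factory. A hypothetical example, assuming the standard sym_channel configuration table:

    update sym_channel set data_loader_type = 'bigquery_bulk' where channel_id = 'default';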
@@ -72,6 +72,9 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
engine.getSymmetricDialect().getTargetPlatform().getName().startsWith(DatabaseNamesConstants.SNOWFLAKE)) {
return new SnowflakeBulkDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else if (DatabaseNamesConstants.BIGQUERY.equals(engine.getSymmetricDialect().getTargetPlatform().getName())) {
return new BigQueryDataLoaderFactory(engine).getDataWriter(sourceNodeId, symmetricDialect, transformWriter,
filters, errorHandlers, conflictSettings, resolvedData);
} else {
return new JdbcBatchBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
symmetricDialect.getTablePrefix(), buildParameterDatabaseWritterSettings());
org.jumpmind.symmetric.io.BigQueryBulkDatabaseWriter (new file)
@@ -0,0 +1,84 @@
package org.jumpmind.symmetric.io;


import java.io.File;
import java.io.FileNotFoundException;
import java.io.IOException;
import java.io.OutputStream;
import java.nio.channels.Channels;
import java.sql.SQLException;
import java.util.List;

import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.symmetric.io.data.writer.DatabaseWriterSettings;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.stage.IStagingManager;
import org.jumpmind.symmetric.service.IParameterService;

import com.google.cloud.bigquery.BigQuery;
import com.google.cloud.bigquery.FormatOptions;
import com.google.cloud.bigquery.Job;
import com.google.cloud.bigquery.JobId;
import com.google.cloud.bigquery.JobStatistics.LoadStatistics;
import com.google.cloud.bigquery.TableDataWriteChannel;
import com.google.cloud.bigquery.TableId;
import com.google.cloud.bigquery.WriteChannelConfiguration;
import com.google.common.io.Files;


public class BigQueryBulkDatabaseWriter extends CloudBulkDatabaseWriter {

BigQuery bigquery;

public BigQueryBulkDatabaseWriter(IDatabasePlatform symmetricPlatform, IDatabasePlatform targetPlatform,
String tablePrefix, IStagingManager stagingManager, List<IDatabaseWriterFilter> filters,
List<IDatabaseWriterErrorHandler> errorHandlers, IParameterService parameterService, DatabaseWriterSettings writerSettings,
BigQuery bq) throws FileNotFoundException, IOException {
super(symmetricPlatform, targetPlatform, tablePrefix, stagingManager, filters, errorHandlers, parameterService, writerSettings);
fieldTerminator = ",";
bigquery = bq;
}

@Override
public void loadToCloudDatabase() throws SQLException {
try {
File csvPath = this.stagedInputFile.getFile();

TableId tableId = TableId.of(this.targetTable.getSchema(), this.targetTable.getName());
WriteChannelConfiguration writeChannelConfiguration =
WriteChannelConfiguration.newBuilder(tableId).setFormatOptions(FormatOptions.csv()).setAutodetect(false).setDestinationTable(tableId).build();
// The location must be specified; other fields can be auto-detected.
JobId jobId = JobId.newBuilder()
.setLocation(bigquery.getOptions().getLocation())
.setProject(bigquery.getOptions().getProjectId()).build();

TableDataWriteChannel writer = bigquery.writer(jobId, writeChannelConfiguration);
// Write data to writer
OutputStream stream = Channels.newOutputStream(writer);
Files.copy(csvPath, stream);
stream.close();

// Get load job
Job job = writer.getJob();
job = job.waitFor();
LoadStatistics stats = job.getStatistics(); // load statistics are fetched but not currently used
} catch (Exception ex) {
throw getPlatform().getSqlTemplate().translate(ex);
}
}

@Override
protected Table lookupTableAtTarget(Table sourceTable) {
return sourceTable;
}

@Override
public void copyToCloudStorage() throws SQLException {
}

@Override
public void cleanUpCloudStorage() throws SQLException {
}
}
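loadToCloudDatabase() streams the staged CSV through a TableDataWriteChannel and waits for the resulting load job, but it never inspects the job status. Below is a minimal standalone sketch of the same load-job pattern with explicit error checking; the dataset, table, and file names are hypothetical, and credentials are assumed to come from the default environment:

    import java.io.OutputStream;
    import java.nio.channels.Channels;
    import java.nio.file.Files;
    import java.nio.file.Paths;

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.FormatOptions;
    import com.google.cloud.bigquery.Job;
    import com.google.cloud.bigquery.TableDataWriteChannel;
    import com.google.cloud.bigquery.TableId;
    import com.google.cloud.bigquery.WriteChannelConfiguration;

    public class BigQueryCsvLoadCheck {
        public static void main(String[] args) throws Exception {
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
            TableId tableId = TableId.of("my_dataset", "my_table"); // hypothetical target
            WriteChannelConfiguration config = WriteChannelConfiguration.newBuilder(tableId)
                    .setFormatOptions(FormatOptions.csv())
                    .build();
            TableDataWriteChannel writer = bigquery.writer(config);
            // Stream the staged CSV into the load channel.
            try (OutputStream stream = Channels.newOutputStream(writer)) {
                Files.copy(Paths.get("/tmp/staged.csv"), stream);
            }
            // Block until the load job completes, then surface any load error.
            Job job = writer.getJob().waitFor();
            if (job == null || job.getStatus().getError() != null) {
                throw new RuntimeException("BigQuery load failed: "
                        + (job == null ? "job no longer exists" : job.getStatus().getError()));
            }
        }
    }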
@@ -314,7 +314,7 @@ public void bulkWrite(CsvData data) {
default:
flush();
context.put(ContextConstants.CONTEXT_BULK_WRITER_TO_USE, "default");
super.write(data);
writeDefault(data);
break;
}

org.jumpmind.symmetric.common.ParameterConstants
@@ -515,6 +515,11 @@ private ParameterConstants() {

public final static String POSTGRES_SECURITY_DEFINER = "postgres.security.definer";

public final static String GOOGLE_BIG_QUERY_MAX_ROWS_PER_RPC = "google.bigquery.max.rows.per.rpc";
public final static String GOOGLE_BIG_QUERY_LOCATION = "google.bigquery.location";
public final static String GOOGLE_BIG_QUERY_PROJECT_ID = "google.bigquery.project.id";
public final static String GOOGLE_BIG_QUERY_SECURITY_CREDENTIALS_PATH = "google.bigquery.security.credentials.path";

public static Map<String, ParameterMetaData> getParameterMetaData() {
return parameterMetaData;
}
org.jumpmind.symmetric.load.DefaultDataLoaderFactory
@@ -27,11 +27,13 @@

import org.apache.commons.lang.StringUtils;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.platform.bigquery.BigQueryPlatform;
import org.jumpmind.db.sql.ISqlTransaction;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.common.ParameterConstants;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.BigQueryDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.CassandraDatabaseWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.Conflict.PingBack;
@@ -47,6 +49,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import com.google.cloud.bigquery.BigQuery;

public class DefaultDataLoaderFactory extends AbstractDataLoaderFactory implements IDataLoaderFactory, IBuiltInExtensionPoint {

protected final Logger log = LoggerFactory.getLogger(getClass());
@@ -86,6 +90,26 @@ public IDataWriter getDataWriter(final String sourceNodeId, final ISymmetricDial
}
}

if (symmetricDialect.getTargetPlatform().getClass().getSimpleName().equals("BigQueryPlatform")) {
try {
return new BigQueryDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
symmetricDialect.getTablePrefix(), new DefaultTransformWriterConflictResolver(transformWriter),
buildDatabaseWriterSettings(filters, errorHandlers, conflictSettings, resolvedData),
((BigQueryPlatform) symmetricDialect.getTargetPlatform()).getBigQuery(),
parameterService.getInt(ParameterConstants.GOOGLE_BIG_QUERY_MAX_ROWS_PER_RPC, 100));

} catch (Exception e) {
log.warn("Failed to create the BigQuery database writer.", e);
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
} else {
throw new RuntimeException(e);
}
}
}

if (symmetricDialect.getTargetPlatform().getClass().getSimpleName().equals("KafkaPlatform")) {
try {
if (filters == null) {
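The non-bulk path above hands google.bigquery.max.rows.per.rpc to a BigQueryDatabaseWriter (its source is not part of this diff), which, per the property documentation below, batches rows through BigQuery's insertAll streaming API. A rough sketch of that insertAll pattern; the table and row contents are hypothetical:

    import java.util.HashMap;
    import java.util.Map;

    import com.google.cloud.bigquery.BigQuery;
    import com.google.cloud.bigquery.BigQueryOptions;
    import com.google.cloud.bigquery.InsertAllRequest;
    import com.google.cloud.bigquery.InsertAllResponse;
    import com.google.cloud.bigquery.TableId;

    public class BigQueryInsertAllSketch {
        public static void main(String[] args) {
            BigQuery bigquery = BigQueryOptions.getDefaultInstance().getService();
            Map<String, Object> row = new HashMap<>();
            row.put("id", 1);
            row.put("name", "example");
            InsertAllRequest request = InsertAllRequest.newBuilder(TableId.of("my_dataset", "my_table"))
                    .addRow(row) // batch up to google.bigquery.max.rows.per.rpc rows per request
                    .build();
            InsertAllResponse response = bigquery.insertAll(request);
            if (response.hasErrors()) {
                throw new RuntimeException("insertAll failed: " + response.getInsertErrors());
            }
        }
    }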
26 changes: 26 additions & 0 deletions symmetric-core/src/main/resources/symmetric-default.properties
@@ -2655,6 +2655,32 @@ snapshot.file.include.hostname=false
# Type: boolean
postgres.security.definer=false

# Google BigQuery parameter controlling the maximum number of rows sent through each
# insertAll request while streaming data into BigQuery.
#
# DatabaseOverridable: true
# Tags: bigquery
# Type: integer
google.bigquery.max.rows.per.rpc=100

# Google BigQuery location (for example, US).
#
# DatabaseOverridable: true
# Tags: bigquery
google.bigquery.location=US

# Google BigQuery project id.
#
# DatabaseOverridable: true
# Tags: bigquery
google.bigquery.project.id=

# Path to the JSON file containing the service account credentials used to connect to Google BigQuery.
#
# DatabaseOverridable: true
# Tags: bigquery
google.bigquery.security.credentials.path=

# Determines if the size of a LOB value should be checked before extracting to prevent
# a JVM crash that can occur if the size of a LOB is bigger than the max size of a java array
# 2^31 - 1
org.jumpmind.db.platform.DatabaseNamesConstants
@@ -59,5 +59,6 @@ private DatabaseNamesConstants() {
public final static String KAFKA = "kafka";
public final static String SNOWFLAKE = "snowflake";
public final static String HBASE = "hbase";
public final static String BIGQUERY = "bigquery";

}
