0003880: Snowflake data loader

jumpmind-josh committed Mar 8, 2019
1 parent 88fb1f3 commit 27749d0aebbcec6da72c0409dff9c8c7e8b6da60
@@ -239,7 +239,9 @@ subprojects { subproject ->
}
provided "org.apache.kafka:kafka-clients:1.1.0"
provided "org.apache.avro:avro:1.8.2"
provided "io.confluent:kafka-avro-serializer:3.2.1"
provided ("io.confluent:kafka-avro-serializer:3.2.1") {
exclude group: 'com.fasterxml.jackson.core'
}

testCompile fileTree(dir: System.getProperty("user.home") + '/.symmetricds/lib', include: '*.jar')
testCompile "junit:junit:$junitVersion"
@@ -487,26 +487,26 @@ When locating a table, SymmetricDS uses the default catalog and schema unless th

|===

-include::ignite.ad[]
include::db2.ad[]
include::derby.ad[]
include::firebird.ad[]
include::greenplum.ad[]
include::h2.ad[]
include::hsqldb.ad[]
+include::ignite.ad[]
include::informix.ad[]
include::interbase.ad[]
include::kafka.ad[]
include::mariadb.ad[]
-include::mysql.ad[]
include::mongodb.ad[]
+include::mssqlserver.ad[]
+include::mysql.ad[]
include::oracle.ad[]
include::postgresql.ad[]
-include::sybase-sqlanywhere.ad[]
-include::mssqlserver.ad[]
-include::sqllite.ad[]
-include::sybase-ase.ad[]
include::redshift.ad[]
+include::snowflake.ad[]
+include::sqlite.ad[]
+include::sybase-ase.ad[]
+include::sybase-sqlanywhere.ad[]
include::teradata.ad[]
include::tibero.ad[]
@@ -35,6 +35,8 @@ continually loaded from SymmetricDS, the "vacuum" command can be run after commi
for the "batch commit" event, like this:

[source, JAVA]
----
+for (String tablename : context.getParsedTables().keySet()) {
+    engine.getSqlTemplate().update("vacuum " + tablename, new Object[] { } );
+}
-}
----
@@ -0,0 +1,100 @@
=== Snowflake

Send changes from your relational database to Snowflake.

==== Setup

Snowflake is only supported as a load only node in SymmetricDS. See <<Load Only Node>> for details on setting up a load only node.


ifdef::pro[]
Set up the Snowflake node by using the <<Add Node>> wizard and selecting Snowflake as the type.

image::appendix/snowflake-database-settings.png[]

After clicking Next, you can set up advanced options for your Snowflake node.

endif::pro[]

ifndef::pro[]

.Example properties to set up a Snowflake load only node
----
load.only=true
target.db.driver=net.snowflake.client.jdbc.SnowflakeDriver
target.db.url=jdbc:snowflake://<account_name>.snowflakecomputing.com/?db=<database_name>
target.db.user=<snowflake_user>
target.db.password=<snowflake_password>
----
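
The `<account_name>` and `<database_name>` placeholders come from your Snowflake account. As a purely illustrative example, a hypothetical account xy12345 loading a database named ANALYTICS would use:

.Hypothetical URL with the placeholders filled in (names are illustrative only)
----
target.db.url=jdbc:snowflake://xy12345.snowflakecomputing.com/?db=ANALYTICS
----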

endif::pro[]

==== Loading Data Into Snowflake

ifndef::pro[]
===== Set up reload channels for bulk loading

Update any reload channels that are used by the table triggers that capture changes and send them to Snowflake by setting the data_loader_type column to 'bulk'. It is also recommended to increase the max batch size so that larger CSV files are processed on reloads, rather than the default batch size of 10,000 rows.


.Example SQL to set up the main reload channel to use bulk loading and update the batch size
[source, SQL]
----
update sym_channel set data_loader_type='bulk', max_batch_size=500000 where channel_id='reload'
----
endif::pro[]

===== Choose a bulk load storage option

SymmetricDS will create and send CSV files to the desired storage location (see below) as part of the load. Once the CSV files have been uploaded to the selected storage area, Snowflake's COPY INTO command is used to load the data into Snowflake. Once the COPY INTO has completed, SymmetricDS removes the CSV files from the storage container.

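To make that flow concrete, the statements below are a hand-written sketch of the kind of SQL involved, assuming a hypothetical target table SALES, a hypothetical staged file batch-1234.csv, and the default internal stage name; the exact statements SymmetricDS generates may differ.

[source, SQL]
----
-- upload the batch CSV file to the stage (internal stage shown here)
PUT file:///tmp/symmetricds/batch-1234.csv @symmetricds;

-- bulk load the staged file into the target table
COPY INTO SALES FROM @symmetricds/batch-1234.csv
  FILE_FORMAT = (TYPE = CSV FIELD_OPTIONALLY_ENCLOSED_BY = '"');

-- remove the staged file once the load completes
REMOVE @symmetricds/batch-1234.csv;
----
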
.There are currently 3 supported storage options to stage the CSV files prior to loading into Snowflake
* Snowflake Managed (internal storage)
* AWS: S3
* Azure: Storage Account


SNOWFLAKE MANAGED:: Use a Snowflake managed internal stage (see the note on inspecting the stage after these storage options).
ifdef::pro[]
image::appendix/snowflake-advanced-settings-snowflake-managed.png[]
endif::pro[]
ifndef::pro[]
[source, properties]
----
snowflake.staging.type=SNOWFLAKE_INTERNAL
snowflake.internal.stage.name=<defaults to symmetricds>
----
endif::pro[]


AWS S3:: Use existing AWS S3 cloud storage.
ifdef::pro[]
image::appendix/snowflake-advanced-settings-aws-s3.png[]
endif::pro[]
ifndef::pro[]
[source, properties]
----
snowflake.staging.type=AWS_S3
cloud.bulk.load.s3.bucket=
cloud.bulk.load.s3.access.key=
cloud.bulk.load.s3.secret.key=
----
endif::pro[]


AZURE Storage Account:: Use an existing Azure Storage Account.
ifdef::pro[]
image::appendix/snowflake-advanced-settings-azure.png[]
endif::pro[]
ifndef::pro[]
[source, properties]
----
snowflake.staging.type=AZURE
cloud.bulk.load.azure.account.name=<storage account name>
cloud.bulk.load.azure.account.key=
cloud.bulk.load.azure.blob.container=<defaults to symmetricds>
cloud.bulk.load.azure.sas.token=
----
endif::pro[]
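
For the Snowflake managed option, the internal stage can be pre-created or inspected from any Snowflake session. The following is a hypothetical sketch assuming the default stage name symmetricds; adjust it to match snowflake.internal.stage.name.

[source, SQL]
----
-- create the internal stage if it does not already exist (the name here is an assumption)
CREATE STAGE IF NOT EXISTS symmetricds;

-- list any CSV files currently staged for loading
LIST @symmetricds;
----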
Binary files (4 images) not shown.
@@ -13,12 +13,21 @@ apply from: symAssembleDir + '/common.gradle'
provided "org.mongodb:mongo-java-driver:2.12.3"
provided "org.codehaus.mojo:animal-sniffer-annotations:$animalSnifferVersion"

provided ("com.amazonaws:aws-java-sdk:1.9.17") {
provided ("com.amazonaws:aws-java-sdk:1.11.510") {
exclude group: 'org.apache.httpcomponents'
exclude group: 'commons-logging'
+exclude group: 'com.fasterxml.jackson.core'
+exclude group: 'commons-codec'
}

provided ("com.microsoft.azure:azure-storage:8.1.0") {
exclude group: 'com.fasterxml.jackson.core'
}

provided ("net.snowflake:snowflake-jdbc:3.6.27") {
exclude group: 'com.fasterxml.jackson.core'
}

provided 'org.apache.httpcomponents:httpclient:4.5.4' // This is required by com.amazonaws:aws-java-sdk. It is called out here to upgrade the version because of a user's security concerns.

testCompile project(path: ':symmetric-util', configuration: 'testArtifacts')
@@ -27,6 +27,7 @@
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.db.ISymmetricDialect;
+import org.jumpmind.symmetric.io.RedshiftBulkDatabaseWriter;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
@@ -60,32 +61,9 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
List<IDatabaseWriterFilter> filters, List<IDatabaseWriterErrorHandler> errorHandlers,
List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {

int maxRowsBeforeFlush = parameterService.getInt("redshift.bulk.load.max.rows.before.flush", 100000);
long maxBytesBeforeFlush = parameterService.getLong("redshift.bulk.load.max.bytes.before.flush", 1000000000);
String bucket = parameterService.getString("redshift.bulk.load.s3.bucket");
String accessKey = parameterService.getString("redshift.bulk.load.s3.access.key");
String secretKey = parameterService.getString("redshift.bulk.load.s3.secret.key");
String appendToCopyCommand = parameterService.getString("redshift.append.to.copy.command");
String s3Endpoint = parameterService.getString("redshift.bulk.load.s3.endpoint");

try {
Class<?> dbWriterClass = Class.forName("org.jumpmind.symmetric.io.RedshiftBulkDatabaseWriter");
Constructor<?> dbWriterConstructor = dbWriterClass.getConstructor(new Class<?>[] {
IDatabasePlatform.class, IStagingManager.class, List.class,
List.class, Integer.TYPE, Long.TYPE, String.class,
String.class, String.class, String.class, String.class });
return (IDataWriter) dbWriterConstructor.newInstance(symmetricDialect.getPlatform(),
symmetricDialect.getTargetPlatform(), symmetricDialect.getTablePrefix(), stagingManager, filters, errorHandlers,
maxRowsBeforeFlush, maxBytesBeforeFlush, bucket, accessKey, secretKey, appendToCopyCommand, s3Endpoint);

} catch (Exception e) {
log.warn("Failed to create the redshift database writer. Check to see if all of the required jars have been added");
if (e instanceof RuntimeException) {
throw (RuntimeException)e;
} else {
throw new RuntimeException(e);
}
}
return new RedshiftBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
symmetricDialect.getTablePrefix(), stagingManager, filters, errorHandlers, parameterService);

}

public boolean isPlatformSupported(IDatabasePlatform platform) {
@@ -21,12 +21,10 @@

public class SnowflakeBulkDataLoaderFactory implements IDataLoaderFactory {

-private NativeJdbcExtractor jdbcExtractor;
private IStagingManager stagingManager;
private IParameterService parameterService;

public SnowflakeBulkDataLoaderFactory(ISymmetricEngine engine) {
-this.jdbcExtractor = JdbcUtils.getNativeJdbcExtractory();
this.stagingManager = engine.getStagingManager();
this.parameterService = engine.getParameterService();
}
@@ -40,10 +38,8 @@ public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetri
List<IDatabaseWriterFilter> filters, List<IDatabaseWriterErrorHandler> errorHandlers,
List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {

int maxRowsBeforeFlush = parameterService.getInt("snowflake.bulk.load.max.rows.before.flush", 100000);

return new SnowflakeBulkDatabaseWriter(symmetricDialect.getPlatform(), symmetricDialect.getTargetPlatform(),
symmetricDialect.getTablePrefix(), stagingManager);
symmetricDialect.getTablePrefix(), stagingManager, filters, errorHandlers, parameterService);
}

public boolean isPlatformSupported(IDatabasePlatform platform) {