diff --git a/symmetric-client/pom.xml b/symmetric-client/pom.xml
index 49662bf6b8..9cc06f53a5 100644
--- a/symmetric-client/pom.xml
+++ b/symmetric-client/pom.xml
@@ -158,6 +158,11 @@
             <artifactId>jaybird</artifactId>
             <scope>provided</scope>
         </dependency>
+        <dependency>
+            <groupId>com.amazonaws</groupId>
+            <artifactId>aws-java-sdk</artifactId>
+            <scope>provided</scope>
+        </dependency>
         <dependency>
             <groupId>org.apache.geronimo.specs</groupId>
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/RedshiftBulkDataLoaderFactory.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/RedshiftBulkDataLoaderFactory.java
new file mode 100644
index 0000000000..e3ee036cad
--- /dev/null
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/ext/RedshiftBulkDataLoaderFactory.java
@@ -0,0 +1,74 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.ext;
+
+import java.util.List;
+
+import org.jumpmind.db.platform.DatabaseNamesConstants;
+import org.jumpmind.db.platform.IDatabasePlatform;
+import org.jumpmind.extension.IBuiltInExtensionPoint;
+import org.jumpmind.symmetric.ISymmetricEngine;
+import org.jumpmind.symmetric.db.ISymmetricDialect;
+import org.jumpmind.symmetric.io.RedshiftBulkDatabaseWriter;
+import org.jumpmind.symmetric.io.data.IDataWriter;
+import org.jumpmind.symmetric.io.data.writer.Conflict;
+import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
+import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
+import org.jumpmind.symmetric.io.data.writer.ResolvedData;
+import org.jumpmind.symmetric.io.data.writer.TransformWriter;
+import org.jumpmind.symmetric.load.IDataLoaderFactory;
+import org.jumpmind.symmetric.service.IParameterService;
+
+public class RedshiftBulkDataLoaderFactory implements IDataLoaderFactory, ISymmetricEngineAware, IBuiltInExtensionPoint {
+
+    private ISymmetricEngine engine;
+
+    public RedshiftBulkDataLoaderFactory() {
+    }
+
+    public String getTypeName() {
+        return "redshift_bulk";
+    }
+
+    public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetricDialect, TransformWriter transformWriter,
+            List<IDatabaseWriterFilter> filters, List<IDatabaseWriterErrorHandler> errorHandlers,
+            List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {
+
+        IParameterService param = engine.getParameterService();
+        int maxRowsBeforeFlush = param.getInt("redshift.bulk.load.max.rows.before.flush", 100000);
+        long maxBytesBeforeFlush = param.getLong("redshift.bulk.load.max.bytes.before.flush", 1000000000);
+        String bucket = param.getString("redshift.bulk.load.s3.bucket");
+        String accessKey = param.getString("redshift.bulk.load.s3.access.key");
+        String secretKey = param.getString("redshift.bulk.load.s3.secret.key");
+
+        return new RedshiftBulkDatabaseWriter(symmetricDialect.getPlatform(), engine.getStagingManager(), maxRowsBeforeFlush,
+                maxBytesBeforeFlush, bucket, accessKey, secretKey);
+    }
+
+    public void setSymmetricEngine(ISymmetricEngine engine) {
+        this.engine = engine;
+    }
+
+    public boolean isPlatformSupported(IDatabasePlatform platform) {
+        return DatabaseNamesConstants.REDSHIFT.equals(platform.getName());
+    }
+
+}
diff --git a/symmetric-client/src/main/java/org/jumpmind/symmetric/io/RedshiftBulkDatabaseWriter.java b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/RedshiftBulkDatabaseWriter.java
new file mode 100644
index 0000000000..585279726a
--- /dev/null
+++ b/symmetric-client/src/main/java/org/jumpmind/symmetric/io/RedshiftBulkDatabaseWriter.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to JumpMind Inc under one or more contributor
+ * license agreements. See the NOTICE file distributed
+ * with this work for additional information regarding
+ * copyright ownership. JumpMind Inc licenses this file
+ * to you under the GNU General Public License, version 3.0 (GPLv3)
+ * (the "License"); you may not use this file except in compliance
+ * with the License.
+ *
+ * You should have received a copy of the GNU General Public License,
+ * version 3.0 (GPLv3) along with this library; if not, see
+ * <http://www.gnu.org/licenses/>.
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.jumpmind.symmetric.io;
+
+import java.sql.Connection;
+import java.sql.SQLException;
+import java.sql.Statement;
+
+import org.jumpmind.db.model.Column;
+import org.jumpmind.db.model.Table;
+import org.jumpmind.db.platform.IDatabasePlatform;
+import org.jumpmind.db.sql.JdbcSqlTransaction;
+import org.jumpmind.symmetric.csv.CsvWriter;
+import org.jumpmind.symmetric.io.data.CsvData;
+import org.jumpmind.symmetric.io.data.CsvUtils;
+import org.jumpmind.symmetric.io.data.DataEventType;
+import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
+import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
+import org.jumpmind.symmetric.io.stage.IStagedResource;
+import org.jumpmind.symmetric.io.stage.IStagingManager;
+
+import com.amazonaws.AmazonClientException;
+import com.amazonaws.AmazonServiceException;
+import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.s3.AmazonS3;
+import com.amazonaws.services.s3.AmazonS3Client;
+
+public class RedshiftBulkDatabaseWriter extends DefaultDatabaseWriter {
+
+    protected IStagingManager stagingManager;
+    protected IStagedResource stagedInputFile;
+    protected int loadedRows = 0;
+    protected long loadedBytes = 0;
+    protected boolean needsExplicitIds;
+    protected Table table = null;
+
+    protected int maxRowsBeforeFlush;
+    protected long maxBytesBeforeFlush;
+    private String bucket;
+    private String accessKey;
+    private String secretKey;
+
+    public RedshiftBulkDatabaseWriter(IDatabasePlatform platform, IStagingManager stagingManager, int maxRowsBeforeFlush,
+            long maxBytesBeforeFlush, String bucket, String accessKey, String secretKey) {
+        super(platform);
+        this.stagingManager = stagingManager;
+        this.maxRowsBeforeFlush = maxRowsBeforeFlush;
+        this.maxBytesBeforeFlush = maxBytesBeforeFlush;
+        this.bucket = bucket;
+        this.accessKey = accessKey;
+        this.secretKey = secretKey;
+    }
+
+    public boolean start(Table table) {
+        this.table = table;
+        if (super.start(table)) {
+            needsExplicitIds = false;
+            for (Column column : targetTable.getColumns()) {
+                if (column.isAutoIncrement()) {
+                    needsExplicitIds = true;
+                    break;
+                }
+            }
+
+            if (stagedInputFile == null) {
+                createStagingFile();
+            }
+            return true;
+        } else {
+            return false;
+        }
+    }
+
+    @Override
+    public void end(Table table) {
+        try {
+            flush();
+            stagedInputFile.close();
+            stagedInputFile.delete();
+        } finally {
+            super.end(table);
+        }
+    }
+
+    public void write(CsvData data) {
+        DataEventType dataEventType = data.getDataEventType();
+
+        switch (dataEventType) {
+            case INSERT:
+                statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
+                statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
+                statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
+                try {
+                    String[] parsedData = data.getParsedData(CsvData.ROW_DATA);
+                    String formattedData = CsvUtils.escapeCsvData(parsedData, '\n', '"', CsvWriter.ESCAPE_MODE_DOUBLED, "\\N");
+                    stagedInputFile.getWriter().write(formattedData);
+                    loadedRows++;
+                    loadedBytes += formattedData.getBytes().length;
+                } catch (Exception ex) {
+                    throw getPlatform().getSqlTemplate().translate(ex);
+                } finally {
+                    statistics.get(batch).stopTimer(DataWriterStatisticConstants.DATABASEMILLIS);
+                }
+                break;
+            case UPDATE:
+            case DELETE:
+            default:
+                flush();
+                super.write(data);
+                break;
+        }
+
+        if (loadedRows >= maxRowsBeforeFlush || loadedBytes >= maxBytesBeforeFlush) {
+            flush();
+        }
+    }
+
+    protected void flush() {
+        if (loadedRows > 0) {
+            stagedInputFile.close();
+            statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
+            AmazonS3 s3client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
+            String objectKey = stagedInputFile.getFile().getName();
+            try {
+                s3client.putObject(bucket, objectKey, stagedInputFile.getFile());
+            } catch (AmazonServiceException ase) {
+                log.error("Exception from AWS service: " + ase.getMessage());
+            } catch (AmazonClientException ace) {
+                log.error("Exception from AWS client: " + ace.getMessage());
+            }
+
+            try {
+                JdbcSqlTransaction jdbcTransaction = (JdbcSqlTransaction) transaction;
+                Connection c = jdbcTransaction.getConnection();
"EXPLICIT_IDS" : ""); + Statement stmt = c.createStatement(); + + log.debug(sql); + stmt.execute(sql); + stmt.close(); + transaction.commit(); + } catch (SQLException ex) { + throw platform.getSqlTemplate().translate(ex); + } finally { + statistics.get(batch).stopTimer(DataWriterStatisticConstants.DATABASEMILLIS); + } + + stagedInputFile.delete(); + try { + s3client.deleteObject(bucket, objectKey); + } catch (AmazonServiceException ase) { + log.error("Exception from AWS service: " + ase.getMessage()); + } catch (AmazonClientException ace) { + log.error("Exception from AWS client: " + ace.getMessage()); + } + + createStagingFile(); + loadedRows = 0; + loadedBytes = 0; + } + } + + protected void createStagingFile() { + stagedInputFile = stagingManager.create(0, "bulkloaddir", table.getName() + getBatch().getBatchId() + ".csv"); + } + +} diff --git a/symmetric-client/src/main/resources/symmetric-ext-points.xml b/symmetric-client/src/main/resources/symmetric-ext-points.xml index f650aa824a..41003f5b75 100644 --- a/symmetric-client/src/main/resources/symmetric-ext-points.xml +++ b/symmetric-client/src/main/resources/symmetric-ext-points.xml @@ -36,5 +36,7 @@ + + \ No newline at end of file diff --git a/symmetric-parent/pom.xml b/symmetric-parent/pom.xml index aaff72a9ee..b7275f31c8 100644 --- a/symmetric-parent/pom.xml +++ b/symmetric-parent/pom.xml @@ -939,6 +939,11 @@ sqlite-jdbc ${version.sqlite} + + com.amazonaws + aws-java-sdk + 1.8.9.1 + org.mockito mockito-all