Skip to content

Commit

Permalink
0001920: Redshift database dialect - add bullk loader
Browse files Browse the repository at this point in the history
  • Loading branch information
erilong committed Aug 25, 2014
1 parent f5d2885 commit f85dda2
Show file tree
Hide file tree
Showing 5 changed files with 275 additions and 0 deletions.
5 changes: 5 additions & 0 deletions symmetric-client/pom.xml
Expand Up @@ -158,6 +158,11 @@
<artifactId>jaybird</artifactId>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<scope>provided</scope>
</dependency>
<!-- Required for jaybird -->
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
Expand Down
@@ -0,0 +1,74 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.ext;

import java.util.List;

import org.jumpmind.db.platform.DatabaseNamesConstants;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.extension.IBuiltInExtensionPoint;
import org.jumpmind.symmetric.ISymmetricEngine;
import org.jumpmind.symmetric.db.ISymmetricDialect;
import org.jumpmind.symmetric.io.RedshiftBulkDatabaseWriter;
import org.jumpmind.symmetric.io.data.IDataWriter;
import org.jumpmind.symmetric.io.data.writer.Conflict;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterErrorHandler;
import org.jumpmind.symmetric.io.data.writer.IDatabaseWriterFilter;
import org.jumpmind.symmetric.io.data.writer.ResolvedData;
import org.jumpmind.symmetric.io.data.writer.TransformWriter;
import org.jumpmind.symmetric.load.IDataLoaderFactory;
import org.jumpmind.symmetric.service.IParameterService;

public class RedshiftBulkDataLoaderFactory implements IDataLoaderFactory, ISymmetricEngineAware, IBuiltInExtensionPoint {

private ISymmetricEngine engine;

public RedshiftBulkDataLoaderFactory() {
}

public String getTypeName() {
return "redshift_bulk";
}

public IDataWriter getDataWriter(String sourceNodeId, ISymmetricDialect symmetricDialect, TransformWriter transformWriter,
List<IDatabaseWriterFilter> filters, List<IDatabaseWriterErrorHandler> errorHandlers,
List<? extends Conflict> conflictSettings, List<ResolvedData> resolvedData) {

IParameterService param = engine.getParameterService();
int maxRowsBeforeFlush = param.getInt("redshift.bulk.load.max.rows.before.flush", 100000);
long maxBytesBeforeFlush = param.getLong("redshift.bulk.load.max.bytes.before.flush", 1000000000);
String bucket = param.getString("redshift.bulk.load.s3.bucket");
String accessKey = param.getString("redshift.bulk.load.s3.access.key");
String secretKey = param.getString("redshift.bulk.load.s3.secret.key");

return new RedshiftBulkDatabaseWriter(symmetricDialect.getPlatform(), engine.getStagingManager(), maxRowsBeforeFlush,
maxBytesBeforeFlush, bucket, accessKey, secretKey);
}

public void setSymmetricEngine(ISymmetricEngine engine) {
this.engine = engine;
}

public boolean isPlatformSupported(IDatabasePlatform platform) {
return DatabaseNamesConstants.REDSHIFT.equals(platform.getName());
}

}
@@ -0,0 +1,189 @@
/**
* Licensed to JumpMind Inc under one or more contributor
* license agreements. See the NOTICE file distributed
* with this work for additional information regarding
* copyright ownership. JumpMind Inc licenses this file
* to you under the GNU General Public License, version 3.0 (GPLv3)
* (the "License"); you may not use this file except in compliance
* with the License.
*
* You should have received a copy of the GNU General Public License,
* version 3.0 (GPLv3) along with this library; if not, see
* <http://www.gnu.org/licenses/>.
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.jumpmind.symmetric.io;

import java.sql.Connection;
import java.sql.SQLException;
import java.sql.Statement;

import org.jumpmind.db.model.Column;
import org.jumpmind.db.model.Table;
import org.jumpmind.db.platform.IDatabasePlatform;
import org.jumpmind.db.sql.JdbcSqlTransaction;
import org.jumpmind.symmetric.csv.CsvWriter;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.CsvUtils;
import org.jumpmind.symmetric.io.data.DataEventType;
import org.jumpmind.symmetric.io.data.writer.DataWriterStatisticConstants;
import org.jumpmind.symmetric.io.data.writer.DefaultDatabaseWriter;
import org.jumpmind.symmetric.io.stage.IStagedResource;
import org.jumpmind.symmetric.io.stage.IStagingManager;

import com.amazonaws.AmazonClientException;
import com.amazonaws.AmazonServiceException;
import com.amazonaws.auth.BasicAWSCredentials;
import com.amazonaws.services.s3.AmazonS3;
import com.amazonaws.services.s3.AmazonS3Client;

public class RedshiftBulkDatabaseWriter extends DefaultDatabaseWriter {

protected IStagingManager stagingManager;
protected IStagedResource stagedInputFile;
protected int loadedRows = 0;
protected long loadedBytes = 0;
protected boolean needsExplicitIds;
protected Table table = null;

protected int maxRowsBeforeFlush;
protected long maxBytesBeforeFlush;
private String bucket;
private String accessKey;
private String secretKey;

public RedshiftBulkDatabaseWriter(IDatabasePlatform platform, IStagingManager stagingManager, int maxRowsBeforeFlush,
long maxBytesBeforeFlush, String bucket, String accessKey, String secretKey) {
super(platform);
this.stagingManager = stagingManager;
this.maxRowsBeforeFlush = maxRowsBeforeFlush;
this.maxBytesBeforeFlush = maxBytesBeforeFlush;
this.bucket = bucket;
this.accessKey = accessKey;
this.secretKey = secretKey;
}

public boolean start(Table table) {
this.table = table;
if (super.start(table)) {
needsExplicitIds = false;
for (Column column : targetTable.getColumns()) {
if (column.isAutoIncrement()) {
needsExplicitIds = true;
break;
}
}

if (stagedInputFile == null) {
createStagingFile();
}
return true;
} else {
return false;
}
}

@Override
public void end(Table table) {
try {
flush();
stagedInputFile.close();
stagedInputFile.delete();
} finally {
super.end(table);
}
}

public void write(CsvData data) {
DataEventType dataEventType = data.getDataEventType();

switch (dataEventType) {
case INSERT:
statistics.get(batch).increment(DataWriterStatisticConstants.STATEMENTCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
try {
String[] parsedData = data.getParsedData(CsvData.ROW_DATA);
String formattedData = CsvUtils.escapeCsvData(parsedData, '\n', '"', CsvWriter.ESCAPE_MODE_DOUBLED, "\\N");
stagedInputFile.getWriter().write(formattedData);
loadedRows++;
loadedBytes += formattedData.getBytes().length;
} catch (Exception ex) {
throw getPlatform().getSqlTemplate().translate(ex);
} finally {
statistics.get(batch).stopTimer(DataWriterStatisticConstants.DATABASEMILLIS);
}
break;
case UPDATE:
case DELETE:
default:
flush();
super.write(data);
break;
}

if (loadedRows >= maxRowsBeforeFlush || loadedBytes >= maxBytesBeforeFlush) {
flush();
}
}

protected void flush() {
if (loadedRows > 0) {
stagedInputFile.close();
statistics.get(batch).startTimer(DataWriterStatisticConstants.DATABASEMILLIS);
AmazonS3 s3client = new AmazonS3Client(new BasicAWSCredentials(accessKey, secretKey));
String objectKey = stagedInputFile.getFile().getName();
try {
s3client.putObject(bucket, objectKey, stagedInputFile.getFile());
} catch (AmazonServiceException ase) {
log.error("Exception from AWS service: " + ase.getMessage());
} catch (AmazonClientException ace) {
log.error("Exception from AWS client: " + ace.getMessage());
}

try {
JdbcSqlTransaction jdbcTransaction = (JdbcSqlTransaction) transaction;
Connection c = jdbcTransaction.getConnection();
String sql = "COPY " + getTargetTable().getFullyQualifiedTableName() +
" (" + Table.getCommaDeliminatedColumns(table.getColumns()) +
") FROM 's3://" + bucket + "/" + objectKey +
"' CREDENTIALS 'aws_access_key_id=" + accessKey + ";aws_secret_access_key=" + secretKey +
"' CSV DATEFORMAT 'YYYY-MM-DD HH:MI:SS' " + (needsExplicitIds ? "EXPLICIT_IDS" : "");
Statement stmt = c.createStatement();

log.debug(sql);
stmt.execute(sql);
stmt.close();
transaction.commit();
} catch (SQLException ex) {
throw platform.getSqlTemplate().translate(ex);
} finally {
statistics.get(batch).stopTimer(DataWriterStatisticConstants.DATABASEMILLIS);
}

stagedInputFile.delete();
try {
s3client.deleteObject(bucket, objectKey);
} catch (AmazonServiceException ase) {
log.error("Exception from AWS service: " + ase.getMessage());
} catch (AmazonClientException ace) {
log.error("Exception from AWS client: " + ace.getMessage());
}

createStagingFile();
loadedRows = 0;
loadedBytes = 0;
}
}

protected void createStagingFile() {
stagedInputFile = stagingManager.create(0, "bulkloaddir", table.getName() + getBatch().getBatchId() + ".csv");
}

}
2 changes: 2 additions & 0 deletions symmetric-client/src/main/resources/symmetric-ext-points.xml
Expand Up @@ -36,5 +36,7 @@
<bean id="mssqlBulkLoaderFactory" class="org.jumpmind.symmetric.ext.MsSqlBulkDataLoaderFactory" />

<bean id="mysqlBulkLoaderFactory" class="org.jumpmind.symmetric.ext.MySqlBulkDataLoaderFactory" />

<bean id="redshiftBulkLoaderFactory" class="org.jumpmind.symmetric.ext.RedshiftBulkDataLoaderFactory" />

</beans>
5 changes: 5 additions & 0 deletions symmetric-parent/pom.xml
Expand Up @@ -939,6 +939,11 @@
<artifactId>sqlite-jdbc</artifactId>
<version>${version.sqlite}</version>
</dependency>
<dependency>
<groupId>com.amazonaws</groupId>
<artifactId>aws-java-sdk</artifactId>
<version>1.8.9.1</version>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-all</artifactId>
Expand Down

0 comments on commit f85dda2

Please sign in to comment.