Skip to content
This repository has been archived by the owner on Jul 9, 2021. It is now read-only.

Commit

Permalink
SQOOP-1341: Sqoop Export Upsert for MySQL lacks batch support
Browse files Browse the repository at this point in the history
(Andy Skelton via Jarek Jarcec Cecho)
  • Loading branch information
Jarek Jarcec Cecho committed Jun 28, 2014
1 parent 462bd91 commit d03faf3
Showing 1 changed file with 39 additions and 8 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,10 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;

import java.io.IOException;
import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.util.List;

/**
* Output format for MySQL Update/insert functionality. We will use MySQL
Expand Down Expand Up @@ -63,7 +66,33 @@ public MySQLUpsertRecordWriter(TaskAttemptContext context)
* {@inheritDoc}
*/
@Override
protected String getUpdateStatement() {
protected PreparedStatement getPreparedStatement(
List<SqoopRecord> userRecords) throws SQLException {

PreparedStatement stmt = null;

// Synchronize on connection to ensure this does not conflict
// with the operations in the update thread.
Connection conn = getConnection();
synchronized (conn) {
stmt = conn.prepareStatement(getUpdateStatement(userRecords.size()));
}

// Inject the record parameters into the UPDATE and WHERE clauses. This
// assumes that the update key column is the last column serialized in
// by the underlying record. Our code auto-gen process for exports was
// responsible for taking care of this constraint.
int i = 0;
for (SqoopRecord record : userRecords) {
record.write(stmt, i);
i += columnNames.length;
}
stmt.addBatch();

return stmt;
}

protected String getUpdateStatement(int numRows) {
boolean first;
StringBuilder sb = new StringBuilder();
sb.append("INSERT INTO ");
Expand All @@ -80,14 +109,16 @@ protected String getUpdateStatement() {
}

sb.append(") VALUES(");
first = true;
for (int i = 0; i < columnNames.length; i++) {
if (first) {
first = false;
} else {
sb.append(", ");
for (int i = 0; i < numRows; i++) {
if (i > 0) {
sb.append("),(");
}
for (int j = 0; j < columnNames.length; j++) {
if (j > 0) {
sb.append(", ");
}
sb.append("?");
}
sb.append("?");
}

sb.append(") ON DUPLICATE KEY UPDATE ");
Expand Down

0 comments on commit d03faf3

Please sign in to comment.