Skip to content

Commit

Permalink
0002296: MySQL bulk loader produced invalid data file if the first
Browse files Browse the repository at this point in the history
column in table is binary.
0002297: MySQL bulk loader should deliminate column names in load
command
  • Loading branch information
chenson42 committed Oct 22, 2015
1 parent 57223b8 commit c58879b
Showing 1 changed file with 28 additions and 4 deletions.
Expand Up @@ -128,21 +128,27 @@ public void write(CsvData data) {
writer.setForceQualifier(true);
writer.setNullString("\\N");
Column[] columns = targetTable.getColumns();
boolean lastWasBinary = false;
for (int i = 0; i < columns.length; i++) {
if (columns[i].isOfBinaryType() && parsedData[i] != null) {
if (i > 0) {
out.write(',');
out.write(',');
}
out.write('"');
if (batch.getBinaryEncoding().equals(BinaryEncoding.HEX)) {
out.write(escape(Hex.decodeHex(parsedData[i].toCharArray())));
out.write(escape(Hex.decodeHex(parsedData[i].toCharArray())));
} else if (batch.getBinaryEncoding().equals(BinaryEncoding.BASE64)) {
out.write(escape(Base64.decodeBase64(parsedData[i].getBytes())));
out.write(escape(Base64.decodeBase64(parsedData[i].getBytes())));
}
out.write('"');
lastWasBinary = true;
} else {
if (lastWasBinary) {
out.write(',');
}
writer.write(parsedData[i], true);
writer.flush();
lastWasBinary = false;
}
}
writer.endRecord();
Expand Down Expand Up @@ -190,7 +196,7 @@ protected void flush() {
(isReplace ? "REPLACE " : "IGNORE ") + "INTO TABLE " +
this.getTargetTable().getQualifiedTableName(quote, catalogSeparator, schemaSeparator) +
" FIELDS TERMINATED BY ',' OPTIONALLY ENCLOSED BY '\"' ESCAPED BY '\\\\' LINES TERMINATED BY '\\n' STARTING BY ''" +
" (" + Table.getCommaDeliminatedColumns(table.getColumns()) + ")";
" (" + getCommaDeliminatedColumns(table.getColumns()) + ")";
Statement stmt = c.createStatement();

//TODO: clean this up, deal with errors, etc.?
Expand All @@ -209,6 +215,24 @@ protected void flush() {
loadedBytes = 0;
}
}

protected String getCommaDeliminatedColumns(Column[] cols) {
DatabaseInfo dbInfo = platform.getDatabaseInfo();
String quote = dbInfo.getDelimiterToken();
StringBuilder columns = new StringBuilder();
if (cols != null && cols.length > 0) {
for (Column column : cols) {
columns.append(quote);
columns.append(column.getName());
columns.append(quote);
columns.append(",");
}
columns.replace(columns.length() - 1, columns.length(), "");
return columns.toString();
} else {
return " ";
}
}

protected byte[] escape(byte[] byteData) {
ArrayList<Integer> indexes = new ArrayList<Integer>();
Expand Down

0 comments on commit c58879b

Please sign in to comment.