Skip to content
Permalink
Browse files

0003825: Oracle bulk loader using SQL*Loader sqlldr

  • Loading branch information...
elong
elong committed Dec 13, 2018
1 parent 46cdec1 commit a90a01b45860c2055c70b6a1a6b8293166805995
@@ -52,6 +52,12 @@

public class OracleBulkDatabaseWriter extends AbstractBulkDatabaseWriter {

protected final static String BLOB_START = "<symblob>";

protected final static String BLOB_END = "</symblob>";

protected final static String END_OF_LINE = "<symeol/>";

protected final Logger logger = LoggerFactory.getLogger(getClass());

protected IStagingManager stagingManager;
@@ -159,8 +165,11 @@ protected void createStagingFile() {
try {
OutputStream out = controlResource.getOutputStream();
out.write(("LOAD DATA\n").getBytes());
out.write(("INFILE '" + infile + "'\n").getBytes());
out.write(("BADFILE '" + baseName + ".bad'\n").getBytes());
out.write(("INFILE '" + infile + "'").getBytes());
if (hasBinaryType) {
out.write((" \"str '" + END_OF_LINE + "'\"").getBytes());
}
out.write(("\nBADFILE '" + baseName + ".bad'\n").getBytes());
out.write(("APPEND INTO TABLE " + targetTable.getName() + "\n").getBytes());

if (useIncomingStageFile) {
@@ -191,8 +200,7 @@ protected void createStagingFile() {
String local = column.getMappedTypeCode() == ColumnTypes.ORACLE_TIMESTAMPLTZ ? "LOCAL " : "";
columns.append(" TIMESTAMP" + scale + " WITH " + local + "TIME ZONE 'YYYY-MM-DD HH24:MI:SS.FF9 TZH:TZM'");
} else if (column.isOfBinaryType()) {
// TODO: use byte sequence instead?
columns.append(" ENCLOSED BY '<sym_blob>' AND '</sym_blob>'");
columns.append(" ENCLOSED BY '" + BLOB_START + "' AND '" + BLOB_END + "'");
}
}
columns.append(")\n");
@@ -246,19 +254,19 @@ protected void bulkWrite(CsvData data) {
if (i > 0) {
out.write(',');
}
out.write("<sym_blob>".getBytes());
out.write(BLOB_START.getBytes());
if (batch.getBinaryEncoding().equals(BinaryEncoding.HEX)) {
out.write(Hex.decodeHex(parsedData[i].toCharArray()));
} else if (batch.getBinaryEncoding().equals(BinaryEncoding.BASE64)) {
out.write(Base64.decodeBase64(parsedData[i].getBytes()));
}
out.write("</sym_blob>".getBytes());
out.write(BLOB_END.getBytes());
} else {
writer.write(parsedData[i], true);
writer.flush();
}
}
writer.endRecord();
writer.write(END_OF_LINE);
writer.close();
byteData = out.toByteArray();
} else {
@@ -317,11 +325,9 @@ protected void flush() {
int rc = process.waitFor();
if (rc == 2) {
if (!useIncomingStageFile) {
System.exit(1);
throw new RuntimeException("All or some rows were rejected.");
}
} else if (rc != 0) {
System.exit(1);
throw new RuntimeException("Process builder returned " + rc);
}

0 comments on commit a90a01b

Please sign in to comment.
You can’t perform that action at this time.