Skip to content
Permalink
Browse files

0003825: Oracle bulk loader using SQL*Loader sqlldr

  • Loading branch information...
elong
elong committed Dec 13, 2018
1 parent f72a317 commit 46cdec1869ba3aa8f5dfe8b271714d427b9b809c
@@ -23,7 +23,6 @@
import java.io.BufferedReader;
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.IOException;
import java.io.InputStreamReader;
import java.io.OutputStream;
import java.io.OutputStreamWriter;
@@ -40,6 +39,7 @@
import org.jumpmind.db.util.BinaryEncoding;
import org.jumpmind.symmetric.common.Constants;
import org.jumpmind.symmetric.csv.CsvWriter;
import org.jumpmind.symmetric.io.data.Batch;
import org.jumpmind.symmetric.io.data.CsvData;
import org.jumpmind.symmetric.io.data.CsvUtils;
import org.jumpmind.symmetric.io.data.DataEventType;
@@ -126,6 +126,10 @@ public boolean start(Table table) {
}
if (dataResource == null) {
createStagingFile();
if (useIncomingStageFile) {
rows = 1;
return false;
}
}
return true;
} else {
@@ -135,7 +139,8 @@ public boolean start(Table table) {

protected void createStagingFile() {
long batchId = getBatch().getBatchId();
controlResource = stagingManager.create("bulkloaddir", StringUtils.leftPad(batchId + "-ctl", 14, "0"));
String baseName = StringUtils.leftPad(batchId + "-ctl", 14, "0");
controlResource = stagingManager.create("bulkloaddir", baseName);
String infile = null;
useIncomingStageFile = false;

@@ -153,7 +158,10 @@ protected void createStagingFile() {

try {
OutputStream out = controlResource.getOutputStream();
out.write(("LOAD DATA\nINFILE '" + infile + "'\nAPPEND INTO TABLE " + targetTable.getName() + "\n").getBytes());
out.write(("LOAD DATA\n").getBytes());
out.write(("INFILE '" + infile + "'\n").getBytes());
out.write(("BADFILE '" + baseName + ".bad'\n").getBytes());
out.write(("APPEND INTO TABLE " + targetTable.getName() + "\n").getBytes());

if (useIncomingStageFile) {
out.write("WHEN (01:06 = 'insert')\n".getBytes());
@@ -183,6 +191,7 @@ protected void createStagingFile() {
String local = column.getMappedTypeCode() == ColumnTypes.ORACLE_TIMESTAMPLTZ ? "LOCAL " : "";
columns.append(" TIMESTAMP" + scale + " WITH " + local + "TIME ZONE 'YYYY-MM-DD HH24:MI:SS.FF9 TZH:TZM'");
} else if (column.isOfBinaryType()) {
// TODO: use byte sequence instead?
columns.append(" ENCLOSED BY '<sym_blob>' AND '</sym_blob>'");
}
}
@@ -202,13 +211,19 @@ public void end(Table table) {
super.end(table);
}
}

@Override
public void end(Batch batch, boolean inError) {
try {
if (!inError) {
flush();
}
} finally {
super.end(batch, inError);
}
}

protected void bulkWrite(CsvData data) {
if (useIncomingStageFile) {
rows++;
// TODO: throw an exception that causes reading of incoming stage file to be skipped
return;
}
DataEventType dataEventType = data.getDataEventType();

switch (dataEventType) {
@@ -272,7 +287,9 @@ protected void bulkWrite(CsvData data) {

protected void flush() {
if (rows > 0) {
dataResource.close();
if (!useIncomingStageFile) {
dataResource.close();
}
statistics.get(batch).startTimer(DataWriterStatisticConstants.LOADMILLIS);
try {
File parentDir = controlResource.getFile().getParentFile();
@@ -288,29 +305,24 @@ protected void flush() {
ProcessBuilder pb = new ProcessBuilder(cmd);
pb.directory(parentDir);
pb.redirectErrorStream(true);
Process process = null;
try {
process = pb.start();

BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line = null;
while ((line = reader.readLine()) != null) {
if (!line.equals("")) {
logger.info("SQL*Loader: {}", line);
}
}
reader.close();
Process process = pb.start();
BufferedReader reader = new BufferedReader(new InputStreamReader(process.getInputStream()));
String line = null;
while ((line = reader.readLine()) != null) {
if (!line.equals("")) {
logger.info("SQL*Loader: {}", line);
}
}

int rc = process.waitFor();
if (rc == 2) {
if (!useIncomingStageFile) {
throw new RuntimeException("All or some rows were rejected.");
}
} else if (rc != 0) {
throw new RuntimeException("Process builder returned " + rc);
}
} catch (IOException e) {
throw new RuntimeException(e);
int rc = process.waitFor();
if (rc == 2) {
if (!useIncomingStageFile) {
System.exit(1);
throw new RuntimeException("All or some rows were rejected.");
}
} else if (rc != 0) {
System.exit(1);
throw new RuntimeException("Process builder returned " + rc);
}

if (!useIncomingStageFile) {
@@ -321,6 +333,9 @@ protected void flush() {
new File(absFile.getPath().replace(".create", ".log")).delete();
controlResource.delete();
} catch (Exception e) {
if (e instanceof RuntimeException) {
throw (RuntimeException) e;
}
throw new RuntimeException(e);
} finally {
statistics.get(batch).stopTimer(DataWriterStatisticConstants.LOADMILLIS);

0 comments on commit 46cdec1

Please sign in to comment.
You can’t perform that action at this time.