Skip to content
Permalink
Browse files

0003856: Oracle bulk loader issue with a load only target node

  • Loading branch information...
jumpmind-josh committed Jan 14, 2019
1 parent b917fde commit 1bae33f9fb074604be9f1836069f03a57a042e53
@@ -78,66 +78,74 @@ public OracleBulkDatabaseWriter(IDatabasePlatform symmetricPlatform,

public boolean start(Table table) {
if (super.start(table)) {
if (isLoadOnly() && isSymmetricTable(targetTable.getNameLowerCase())) {
return true;
}
// TODO come up with smarter way to build procedures
buildBulkInsertProcedure(targetTable);

return true;
} else {
return false;
}
}

protected void bulkWrite(CsvData data) {
DataEventType dataEventType = data.getDataEventType();

if (lastEventType != null && !lastEventType.equals(dataEventType)) {
flush();
}

lastEventType = dataEventType;

boolean requiresFlush = false;
switch (dataEventType) {
case INSERT:
statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
if (filterBefore(data)) {
Object[] rowData = getPlatform()
.getObjectValues(batch.getBinaryEncoding(), getRowData(data, CsvData.ROW_DATA),
targetTable.getColumns(), false, writerSettings.isFitToColumn());

rowData = convertObjectValues(rowData, targetTable.getColumns());

for (int i = 0; i < rowData.length; i++) {

List<Object> columnList = null;
if (rowArrays.size() > i) {
columnList = rowArrays.get(i);
} else {
columnList = new ArrayList<Object>();
rowArrays.add(columnList);
}
columnList.add(rowData[i]);

if (columnList.size() >= maxRowsBeforeFlush) {
requiresFlush = true;
if (isLoadOnly() && isSymmetricTable(targetTable.getNameLowerCase())) {
writeDefault(data);
} else {
DataEventType dataEventType = data.getDataEventType();

if (lastEventType != null && !lastEventType.equals(dataEventType)) {
flush();
}

lastEventType = dataEventType;

boolean requiresFlush = false;
switch (dataEventType) {
case INSERT:
statistics.get(batch).increment(DataWriterStatisticConstants.ROWCOUNT);
statistics.get(batch).increment(DataWriterStatisticConstants.LINENUMBER);
if (filterBefore(data)) {
Object[] rowData = getPlatform()
.getObjectValues(batch.getBinaryEncoding(), getRowData(data, CsvData.ROW_DATA),
targetTable.getColumns(), false, writerSettings.isFitToColumn());

rowData = convertObjectValues(rowData, targetTable.getColumns());

for (int i = 0; i < rowData.length; i++) {

List<Object> columnList = null;
if (rowArrays.size() > i) {
columnList = rowArrays.get(i);
} else {
columnList = new ArrayList<Object>();
rowArrays.add(columnList);
}
columnList.add(rowData[i]);

if (columnList.size() >= maxRowsBeforeFlush) {
requiresFlush = true;
}
}
uncommittedCount++;
}
uncommittedCount++;
}
break;
case UPDATE:
case DELETE:
default:
break;
case UPDATE:
case DELETE:
default:
flush();
writeDefault(data);
break;
}

if (requiresFlush) {
flush();
writeDefault(data);
break;
}

if (requiresFlush) {
flush();
}

checkForEarlyCommit();
}

checkForEarlyCommit();
}

/**
@@ -333,16 +341,20 @@ protected void flush() {

@Override
public void end(Table table) {
flush();
super.end(table);
if (isLoadOnly() && isSymmetricTable(targetTable.getNameLowerCase())) {
super.end(table);
} else {
flush();
super.end(table);
}
}

protected void buildBulkDataType(int typeCode) {
String typeName = getTypeName(typeCode);
if (getPlatform().getSqlTemplate().queryForInt(
if (getPlatform("-1").getSqlTemplate().queryForInt(
"select count(*) from user_types where type_name=?", typeName) == 0) {
final String DDL = "create or replace type %s is table of %s";
getPlatform().getSqlTemplate().update(
getPlatform("-1").getSqlTemplate().update(
String.format(DDL, getTypeName(typeCode), getMappedType(typeCode)));
}
}
@@ -437,7 +449,7 @@ protected String buildProcedureName(String dmlAbbrev, Table table) {

protected void buildBulkInsertProcedure(Table table) {
String procedureName = buildProcedureName("i", table);
if (getPlatform().getSqlTemplate().queryForInt(
if (getPlatform("-1").getSqlTemplate().queryForInt(
"select count(*) from user_procedures where object_name=?", procedureName) == 0) {
List<Column> columns = getBulkLoadableColumns(table);
// needed for error codes

0 comments on commit 1bae33f

Please sign in to comment.
You can’t perform that action at this time.