Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -641,22 +641,34 @@ private void executeDML(ProcessContext context, ProcessSession session, FlowFile
Record currentRecord;
List<Integer> fieldIndexes = sqlHolder.getFieldIndexes();

while ((currentRecord = recordParser.nextRecord()) != null) {
Object[] values = currentRecord.getValues();
if (values != null) {
if (fieldIndexes != null) {
for (int i = 0; i < fieldIndexes.size(); i++) {
ps.setObject(i + 1, values[fieldIndexes.get(i)]);
}
} else {
// If there's no index map, assume all values are included and set them in order
for (int i = 0; i < values.length; i++) {
ps.setObject(i + 1, values[i]);
}
}
ps.addBatch();
}
}
while ((currentRecord = recordParser.nextRecord()) != null) {
Object[] values = currentRecord.getValues();
if (values != null) {
if (fieldIndexes != null) {
for (int i = 0; i < fieldIndexes.size(); i++) {
// If DELETE type, insert the object twice because of the null check (see generateDelete for details)
if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
ps.setObject(i * 2 + 1, values[fieldIndexes.get(i)]);
ps.setObject(i * 2 + 2, values[fieldIndexes.get(i)]);
} else {
ps.setObject(i + 1, values[fieldIndexes.get(i)]);
}
}
} else {
// If there's no index map, assume all values are included and set them in order
for (int i = 0; i < values.length; i++) {
// If DELETE type, insert the object twice because of the null check (see generateDelete for details)
if (DELETE_TYPE.equalsIgnoreCase(statementType)) {
ps.setObject(i * 2 + 1, values[i]);
ps.setObject(i * 2 + 2, values[i]);
} else {
ps.setObject(i + 1, values[i]);
}
}
}
ps.addBatch();
}
}

log.debug("Executing query {}", new Object[]{sqlHolder});
ps.executeBatch();
Expand Down Expand Up @@ -935,14 +947,20 @@ SqlAndIncludedColumns generateDelete(final RecordSchema recordSchema, final Stri
sqlBuilder.append(" AND ");
}

String columnName;
if (settings.escapeColumnNames) {
sqlBuilder.append(tableSchema.getQuotedIdentifierString())
.append(desc.getColumnName())
.append(tableSchema.getQuotedIdentifierString());
columnName = tableSchema.getQuotedIdentifierString() + desc.getColumnName() + tableSchema.getQuotedIdentifierString();
} else {
sqlBuilder.append(desc.getColumnName());
columnName = desc.getColumnName();
}
sqlBuilder.append(" = ?");
// Need to build a null-safe construct for the WHERE clause, since we are using PreparedStatement and won't know if the values are null. If they are null,
// then the filter should be "column IS null" vs "column = null". Since we don't know whether the value is null, we can use the following construct (from NIFI-3742):
// (column = ? OR (column is null AND ? is null))
sqlBuilder.append("(");
sqlBuilder.append(columnName);
sqlBuilder.append(" = ? OR (");
sqlBuilder.append(columnName);
sqlBuilder.append(" is null AND ? is null))");
includedColumns.add(i);

}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -144,7 +144,7 @@ class TestPutDatabaseRecord {
assertEquals('INSERT INTO PERSONS (id, name, code) VALUES (?,?,?)',
generateInsert(schema, 'PERSONS', tableSchema, settings).sql)

assertEquals('DELETE FROM PERSONS WHERE id = ? AND name = ? AND code = ?',
assertEquals('DELETE FROM PERSONS WHERE (id = ? OR (id is null AND ? is null)) AND (name = ? OR (name is null AND ? is null)) AND (code = ? OR (code is null AND ? is null))',
generateDelete(schema, 'PERSONS', tableSchema, settings).sql)
}
}
Expand Down Expand Up @@ -608,6 +608,51 @@ class TestPutDatabaseRecord {
conn.close()
}

@Test
void testDeleteWithNulls() throws InitializationException, ProcessException, SQLException, IOException {
recreateTable("PERSONS", createPersons)
Connection conn = dbcp.getConnection()
Statement stmt = conn.createStatement()
stmt.execute("INSERT INTO PERSONS VALUES (1,'rec1', 101)")
stmt.execute("INSERT INTO PERSONS VALUES (2,'rec2', null)")
stmt.execute("INSERT INTO PERSONS VALUES (3,'rec3', 103)")
stmt.close()

final MockRecordParser parser = new MockRecordParser()
runner.addControllerService("parser", parser)
runner.enableControllerService(parser)

parser.addSchemaField("id", RecordFieldType.INT)
parser.addSchemaField("name", RecordFieldType.STRING)
parser.addSchemaField("code", RecordFieldType.INT)

parser.addRecord(2, 'rec2', null)

runner.setProperty(PutDatabaseRecord.RECORD_READER_FACTORY, 'parser')
runner.setProperty(PutDatabaseRecord.STATEMENT_TYPE, PutDatabaseRecord.DELETE_TYPE)
runner.setProperty(PutDatabaseRecord.TABLE_NAME, 'PERSONS')

runner.enqueue(new byte[0])
runner.run()

runner.assertTransferCount(PutDatabaseRecord.REL_SUCCESS, 1)
stmt = conn.createStatement()
final ResultSet rs = stmt.executeQuery('SELECT * FROM PERSONS')
assertTrue(rs.next())
assertEquals(1, rs.getInt(1))
assertEquals('rec1', rs.getString(2))
assertEquals(101, rs.getInt(3))
assertTrue(rs.next())
assertEquals(3, rs.getInt(1))
assertEquals('rec3', rs.getString(2))
assertEquals(103, rs.getInt(3))
assertFalse(rs.next())

stmt.close()
conn.close()
}


private void recreateTable(String tableName, String createSQL) throws ProcessException, SQLException {
final Connection conn = dbcp.getConnection()
final Statement stmt = conn.createStatement()
Expand Down