-
Notifications
You must be signed in to change notification settings - Fork 2.7k
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
NIFI-11402 - PutBigQuery fix for case sensitivity and error handling #7140
Conversation
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for the bugfix. I proposed a unit test that could be added to the PutBigQueryTest to validate the change. Other than that it is +1 from my side.
@@ -303,6 +301,7 @@ private void finishProcessing(ProcessSession session, FlowFile flowFile, StreamW | |||
flowFile = session.putAttribute(flowFile, BigQueryAttributes.JOB_NB_RECORDS_ATTR, isBatch() ? "0" : String.valueOf(appendSuccessCount.get() * recordBatchCount)); | |||
session.penalize(flowFile); | |||
session.transfer(flowFile, REL_FAILURE); | |||
error.set(null); // set error to null for next execution |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Thanks for catching this. What do you think about adding a unit test covering this scenario (failing before the change and passing after it)
@Test
void testNextFlowFileProcessedWhenIntermittentErrorResolved() {
when(writeClient.createWriteStream(isA(CreateWriteStreamRequest.class))).thenReturn(writeStream);
TableSchema myTableSchema = mockTableSchema(FIELD_1_NAME, TableFieldSchema.Type.STRING, FIELD_2_NAME, TableFieldSchema.Type.STRING);
when(writeStream.getTableSchema()).thenReturn(myTableSchema);
when(streamWriter.append(isA(ProtoRows.class), isA(Long.class)))
.thenReturn(ApiFutures.immediateFailedFuture(new StatusRuntimeException(Status.INTERNAL)))
.thenReturn(ApiFutures.immediateFuture(AppendRowsResponse.newBuilder().setAppendResult(mock(AppendRowsResponse.AppendResult.class)).build()));
runner.setProperty(PutBigQuery.RETRY_COUNT, "0");
runner.enqueue(csvContentWithLines(1));
runner.enqueue(csvContentWithLines(1));
runner.run(2);
verify(streamWriter, times(2)).append(any(ProtoRows.class), anyLong());
runner.assertQueueEmpty();
runner.assertTransferCount(PutBigQuery.REL_FAILURE, 1);
runner.assertTransferCount(PutBigQuery.REL_SUCCESS, 1);
}
@@ -434,6 +433,9 @@ private static Map<String, Object> convertMapRecord(Map<String, Object> map) { | |||
Map<String, Object> result = new HashMap<>(); | |||
for (String key : map.keySet()) { | |||
Object obj = map.get(key); | |||
// BigQuery is not case sensitive on the column names but the protobuf message | |||
// expect all column names to be lower case | |||
key = key.toLowerCase(); |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
This is good to know. Thanks.
Thanks for the review @bejancsaba and thanks for providing a unit test, definitely agree that's much better with one, I added it to my PR. Much appreciated. |
This closes #7140. Signed-off-by: Csaba Bejan <bejan.csaba@gmail.com>
Thanks @pvillard31 looks good, merged and backported to support/nifi-1.x as well |
Summary
NIFI-11402
Fix case sensitivity issue, by forcing lower case on the field names.
Fix error handling by setting the error to null after the error has been processed. Otherwise, any subsequent execution would keep failing with the same error and the user would have to create a new instance of the processor.
Tracking
Please complete the following tracking steps prior to pull request creation.
Issue Tracking
Pull Request Tracking
NIFI-00000
NIFI-00000
Pull Request Formatting
main
branchVerification
Please indicate the verification steps performed prior to pull request creation.
Build
mvn clean install -P contrib-check
Licensing
LICENSE
andNOTICE
filesDocumentation