NIFI-3704: Add PutDatabaseRecord processor #1677
Conversation
An end-to-end flow can be time-consuming to set up, so I have a template as a Gist here.
I've run your example Gist and also my custom flow.
I left a few comments. Please take a look.
        session.transfer(flowFile, REL_FAILURE);
    }
}
session.transfer(flowFile, REL_SUCCESS);
I got the following exception here:
org.apache.nifi.processor.exception.FlowFileHandlingException: StandardFlowFileRecord[uuid=....] is not the most recent version of this FlowFile within this session
When an incoming FlowFile does not have a sql value, the FlowFile is routed to REL_FAILURE above, but then also routed to REL_SUCCESS.
Oops! Good catch, will fix and add a unit test
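To illustrate the fix under discussion, here is a simplified, self-contained sketch (not the real NiFi API; the route method and relationship strings are illustrative): once a FlowFile is transferred to REL_FAILURE, the method must return so that the same FlowFile is not also transferred to REL_SUCCESS.

```java
import java.util.ArrayList;
import java.util.List;

class RoutingSketch {
    static final String REL_FAILURE = "failure";
    static final String REL_SUCCESS = "success";

    // Simplified model of the routing logic: after transferring to
    // REL_FAILURE we return immediately; without the early return the
    // same FlowFile would also be transferred to REL_SUCCESS, which in
    // a real session triggers FlowFileHandlingException.
    static List<String> route(String sql) {
        List<String> transfers = new ArrayList<>();
        if (sql == null || sql.isEmpty()) {
            transfers.add(REL_FAILURE);
            return transfers; // early return prevents the double transfer
        }
        // ... execute the statement here ...
        transfers.add(REL_SUCCESS);
        return transfers;
    }
}
```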
    .expressionLanguageSupported(true)
    .build();

static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
Is this BATCH_SIZE used anywhere? I couldn't find a usage of it.
Nope, another copy-paste error, will remove
static final AllowableValue IGNORE_UNMATCHED_FIELD = new AllowableValue("Ignore Unmatched Fields", "Ignore Unmatched Fields",
    "Any field in the document that cannot be mapped to a column in the database is ignored");
static final AllowableValue FAIL_UNMATCHED_FIELD = new AllowableValue("Fail", "Fail",
Other AllowableValues are named 'XXX Unmatched Fields' or 'XXX on Unmatched Columns', but this value is just 'Fail'. It would probably be better to be consistent.
I agree. I copied these from ConvertJSONToSQL, but I can improve this here.
    return;
}

final boolean includePrimaryKeys = UPDATE_TYPE.equals(statementType) && updateKeys == null;
When I tried to update a table which has a primary key column, I got the following, routing to failure:
org.apache.nifi.processor.exception.ProcessException: Table 'xxx' does not have a Primary Key and no Update Keys were specified
This UPDATE_TYPE.equals(statementType) should use equalsIgnoreCase, as other code does.
Agree, another copy-paste error on my part, will change
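The case-sensitivity issue is easy to demonstrate in isolation; this minimal sketch (class and method names are illustrative, not the processor's actual code) contrasts equals with equalsIgnoreCase for the statement type coming from a FlowFile attribute:

```java
class StatementTypeCheck {
    static final String UPDATE_TYPE = "UPDATE";

    // Case-sensitive comparison misses lower-case statement types
    // taken from FlowFile attributes.
    static boolean isUpdateStrict(String statementType) {
        return UPDATE_TYPE.equals(statementType);
    }

    // equalsIgnoreCase matches the statement type regardless of case,
    // consistent with how the rest of the code compares these values.
    static boolean isUpdate(String statementType) {
        return UPDATE_TYPE.equalsIgnoreCase(statementType);
    }
}
```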
Force-pushed 442d6ad to ee7a4c5
@mattyb149 Thanks for the updated commit. I confirmed that my comments are incorporated. It works as expected in most cases; however, I found a case which needs to be addressed. Let's say I have a table with a primary key like this:

create table tutorials_tbl(
    tutorial_id INT NOT NULL,
    tutorial_title VARCHAR(100) NOT NULL,
    tutorial_author VARCHAR(40) NOT NULL,
    submission_date DATE,
    PRIMARY KEY ( tutorial_id )
);

And insert some rows, then update a row, specifically updating its primary key:

update tutorials_tbl set tutorial_id = 110 where tutorial_id = 11;

The above update query generates the following JSON via CaptureChangeMySQL:

{ "type" : "update",
  "timestamp" : 1492648209000,
  "binlog_filename" : "mysql-server-bin.000004",
  "binlog_position" : 97152,
  "database" : "nifi_test",
  "table_name" : "tutorials_tbl",
  "table_id" : 222,
  "columns" : [ {
    "id" : 1, "name" : "tutorial_id", "column_type" : 4, "last_value" : 11, "value" : 110
  }, {
    "id" : 2, "name" : "tutorial_title", "column_type" : 12, "last_value" : "11th", "value" : "11th"
  }, {
    "id" : 3, "name" : "tutorial_author", "column_type" : 12, "last_value" : "koji", "value" : "koji"
  }, {
    "id" : 4, "name" : "submission_date", "column_type" : 91, "last_value" : null, "value" : null
  } ] }

which is flattened to the following record:

[ { "tutorial_id" : 110, "tutorial_title" : "11th", "tutorial_author" : "koji", "submission_date" : null } ]

Finally, PutDatabaseRecord generates an update SQL statement from this record; since the old key value (last_value) is lost, the WHERE clause uses the new primary key value and cannot match the existing row. We might be able to handle this by generating two delete and insert records in a NiFi flow, or by doing something smart in PutDatabaseRecord.
Hmm, would this be a problem with Update Keys in general, or only with Primary Keys when Update Keys are not specified? Either way, should we be including (and expecting) last_value when the statement type is update?
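A hypothetical sketch of why the generated statement cannot match the existing row: when only the new key value survives in the flattened record, both the SET and WHERE clauses end up using it (buildUpdate is an illustrative helper, not the processor's actual code, which sets all columns):

```java
class PkUpdateSketch {
    // Builds an UPDATE statement from a record that only carries the
    // new primary-key value. With the old value (last_value = 11) lost,
    // the WHERE clause uses the new value (110) and matches no row.
    static String buildUpdate(String table, String keyColumn, Object keyValue) {
        return "UPDATE " + table + " SET " + keyColumn + " = " + keyValue
                + " WHERE " + keyColumn + " = " + keyValue;
    }
}
```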
Force-pushed ee7a4c5 to f776d51
Hi @mattyb149 , I confirmed my previous comments are incorporated, and for the primary key updates, I think it's kind of a corner-case, so I'm fine with the updated documentation warning about it.
During my test, I found other things that I'd like to discuss with you. Please find the comments.
Also, I found that CaptureChangeMySQL does not capture the truncate table event. I raised a JIRA for this: https://issues.apache.org/jira/browse/NIFI-3728
Thanks!
    .description("Successfully created FlowFile from SQL query result set.")
    .build();

static final Relationship REL_RETRY = new Relationship.Builder()
REL_RETRY is not used. Do we still want this relationship?
Yes, I forgot to distinguish between SQLNonTransientException and other SQL exceptions, my intent is to handle errors the same way PutSQL does. Will update to use REL_RETRY when appropriate.
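A minimal sketch of the intended classification, assuming the same convention PutSQL uses (the relationshipFor helper is illustrative): a SQLNonTransientException indicates the statement will never succeed as written, so route to failure; other SQL exceptions may be transient, so route to retry.

```java
import java.sql.SQLException;
import java.sql.SQLNonTransientException;

class ErrorRouting {
    // Mirrors the PutSQL error-handling convention: non-transient SQL
    // errors (bad SQL, constraint definition problems) go to failure;
    // other SQL errors (timeouts, lost connections) may succeed on a
    // later attempt, so they go to retry.
    static String relationshipFor(SQLException e) {
        if (e instanceof SQLNonTransientException) {
            return "failure";
        }
        return "retry";
    }
}
```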
Should this processor support Rollback on Failure? If so, perhaps we should wait on #1658 until this is merged?
Great idea, enabling Rollback on Failure with PutDatabaseRecord would be helpful. Let's do that after we finalize the PutDatabaseRecord review process. I will rebase #1658 to include PutDatabaseRecord once this gets merged.
Object sql = currentRecord.getValue(sqlField);
if (sql != null && !StringUtils.isEmpty((String) sql)) {
    // Execute the statement as-is
    s.execute((String) sql);
In an end-to-end CDC flow, we would use CaptureChangeMySQL to stream database events into this PutDatabaseRecord, as your template shared on Gist does. Those database events include SQL statements such as begin and commit, and such statements are executed literally here.
I think it would be great if users could configure CaptureChangeMySQL whether to emit begin and commit events or not. These statements do not have significant meaning for synchronizing changes, since CaptureChangeMySQL emits a FlowFile per updated record; they only exist so that EnforceOrder can order events correctly.
If we can eliminate these events, we can minimize the number of FlowFiles, which would lead to better performance.
Also, (for MySQL at least) begin and commit are not database- or table-specific events. When I replicated changes from table A to table B using CaptureChangeMySQL and PutDatabaseRecord, I saw the following behavior and felt begin and commit are a little bit disturbing:
1. Insert a row into table A.
2. 3 events are emitted: begin, insert A and commit. CaptureChangeMySQL emits 3 FlowFiles.
3. PutDatabaseRecord executes 3 SQLs: begin, insert B and commit.
4. Another 3 events are emitted, derived from insert B via the MySQL bin-log: begin, insert B and commit.
5. Since I have configured CaptureChangeMySQL to only listen to table A, it emits only 2 FlowFiles: begin and commit.
6. PutDatabaseRecord executes the begin and commit SQL.
Do we have to take these "begin" and "commit" statements into account for a CDC flow?
I agree that it should be configurable by the user whether to include begin and commit directives. For step 5 above, the "begin" event is received before the table map event, and as you point out, it is not associated with a table-specific event, so it is not filtered out. I have written up NIFI-3730 to address this.
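A hypothetical sketch of the filtering behavior proposed here (illustrative only, not the actual CaptureChangeMySQL implementation): drop begin/commit events unless the user opts in, so only row-level changes become FlowFiles.

```java
import java.util.List;
import java.util.stream.Collectors;

class EventFilterSketch {
    // Optionally drops begin/commit events. When includeBeginCommit is
    // false, only row-level change events remain, minimizing the number
    // of FlowFiles emitted downstream.
    static List<String> filterEvents(List<String> eventTypes, boolean includeBeginCommit) {
        if (includeBeginCommit) {
            return eventTypes;
        }
        return eventTypes.stream()
                .filter(t -> !t.equalsIgnoreCase("begin") && !t.equalsIgnoreCase("commit"))
                .collect(Collectors.toList());
    }
}
```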
I reviewed and merged NIFI-3730, thanks!
Force-pushed 6090fd0 to 6ef1d31
@mattyb149 Thanks for updating this PR. I tested error scenarios to see how the failure and retry relationships work and added a few more comments. Please check.
        }
    }
} catch (final MalformedRecordException | SchemaNotFoundException | IOException e) {
    throw new ProcessException("Failed to determine schema of data records for " + flowFile, e);
This ProcessException will be caught by the outer catch block, which logs the exception and yields the context, but does not re-throw it nor route the incoming FlowFile anywhere. As a result, a FlowFileHandlingException is thrown. This is reproducible by specifying a non-existing schema name.
Confirmed that a FlowFile is routed to 'failure' when a schema is not found with the specified name. Thanks!
}

log.debug("Executing query {}", new Object[]{sqlHolder});
ps.executeBatch();
I assumed that this processor can be used not only for CDC use-cases, but also for simply ingesting multiple records as a single batch insertion. It works as expected for normal cases. However, when I tested with a FlowFile containing a mix of successful and failing records, a BatchUpdateException was thrown and the incoming FlowFile was routed to "retry". The database table ended up with the records except the failed ones.
This would cause data duplication if the same FlowFile is processed again, as it inserts the successful records again (possible depending on primary/unique key settings).
I put a NiFi template and SQLs in this Gist to reproduce this behavior.
I would expect this processor to roll back instead of partially committing records, or to create FlowFiles for each destination relationship containing the associated records. What do you think?
I think we should roll back the RDBMS session, add the exception details to a FlowFile attribute, then route it to 'retry' so that it can be analyzed and retried later. As an example, I got the following exception:
java.sql.BatchUpdateException: Duplicate entry '2' for key 'PRIMARY'
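A simplified model of the desired transactional behavior (illustrative only; a real implementation would use connection.setAutoCommit(false), ps.executeBatch(), and connection.rollback() on BatchUpdateException before routing the FlowFile to 'retry'): either all rows in the batch are applied or none are.

```java
import java.util.ArrayList;
import java.util.List;

class BatchRollbackSketch {
    // Applies all rows as one "transaction" against an in-memory table.
    // If any row fails (simulated here as a duplicate-key error), the
    // table is restored to its original state, modeling a rollback, and
    // false is returned so the caller can route to 'retry'.
    static boolean applyBatch(List<Integer> table, List<Integer> rows) {
        List<Integer> snapshot = new ArrayList<>(table);
        for (Integer row : rows) {
            if (table.contains(row)) { // simulate "Duplicate entry" failure
                table.clear();
                table.addAll(snapshot); // roll back: restore original state
                return false;
            }
            table.add(row);
        }
        return true; // commit
    }
}
```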
Confirmed that the RDBMS transaction is rolled back when any row fails to be updated. Thanks!
Force-pushed 6ef1d31 to e1db55a
@mattyb149 I reviewed the updates and tested the CDC scenario, which became much simpler without begin/commit and DDL events. The non-CDC scenario also seems to work as expected. +1 I will squash and merge this PR into master. Thanks for your tremendous effort in realizing the entire CDC flow!
Thank you for submitting a contribution to Apache NiFi.
In order to streamline the review of the contribution we ask you
to ensure the following steps have been taken:
For all changes:
Is there a JIRA ticket associated with this PR? Is it referenced
in the commit message?
Does your PR title start with NIFI-XXXX where XXXX is the JIRA number you are trying to resolve? Pay particular attention to the hyphen "-" character.
Has your PR been rebased against the latest commit within the target branch (typically master)?
Is your initial contribution a single, squashed commit?
For code changes:
For documentation related changes:
Note:
Please ensure that once the PR is submitted, you check travis-ci for build issues and submit an update to your PR as soon as possible.