[NiFi-3973] Add PutKudu Processor for ingesting data to Kudu (2nd) #2020
Conversation
this is a continuation of work from #1874

protected KuduClient kuduClient = null;
protected KuduTable kuduTable = null;
protected KuduSession kuduSession = null;
Is a Kudu client/table/session object tolerant of multithreading? If someone gives this processor more than one thread it will be using these same objects concurrently as-is. If they do not allow multithreading you'll want to have these objects pooled, or made on demand, or restrict the processor to serial execution only.
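If the session object turns out not to be thread-safe (as later confirmed in this thread for KuduSession), one common way to address this review comment is a session per worker thread. A minimal sketch, using a placeholder Session class rather than the real KuduSession:

```java
import java.util.concurrent.atomic.AtomicInteger;

public class PerThreadSessionDemo {
    static final AtomicInteger created = new AtomicInteger();

    // Placeholder for a thread-unsafe client session such as KuduSession.
    static class Session {
        final int id = created.incrementAndGet();
    }

    // Each worker thread lazily creates, then reuses, its own Session;
    // threads never share a session instance.
    static final ThreadLocal<Session> SESSION = ThreadLocal.withInitial(Session::new);

    public static void main(String[] args) throws InterruptedException {
        Runnable work = () -> System.out.println(
                Thread.currentThread().getName() + " -> session " + SESSION.get().id);
        Thread a = new Thread(work, "worker-1");
        Thread b = new Thread(work, "worker-2");
        a.start(); b.start();
        a.join(); b.join();
    }
}
```

Each thread sees a distinct session id, while repeated calls from the same thread return the same instance.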
kuduClient = (kuduClient == null) ? getKuduConnection(kuduMasters) : kuduClient;
kuduTable = (kuduTable == null) ? kuduClient.openTable(tableName) : kuduTable;
kuduSession = (kuduSession == null) ? kuduClient.newSession() : kuduSession;
What if the session were to die while the processor was executing? Do these objects represent connections? If yes, we probably need to create them lazily/on-demand during an actual process execution rather than in onScheduled. If not, and the connection were to die, then the processor would require manual restarting.
@joewitt, thanks for the good points. I looked at PutCassandra, PutHBase, and PutKite to build this PutKudu, and so forgot to move those objects to onTrigger. Following your suggestion, with PutParquet as an example, I just fixed it. Please review and let me know what else I should adjust.
Will have to take more of a look. As a general practice to help reviewers, it is probably best during the review/feedback phase to just add new commits rather than force-push on top of them. This helps to see/discuss progress rather than starting over. No biggie, just bringing it up to help you help your reviewers. At this stage I need an instance of Kudu to test against. Did some looking into it last night and it looked like it was going to be a bit more involved than I had time for. So hopefully myself or someone else familiar with Kudu can help more soon. You've done some really nice work in here for sure. As for other processors with similar patterns, the gotcha can be whether their clients auto-reconnect, are thread safe, etc. Each situation can differ quite a lot. Thanks
@joewitt, my apologies again. I'm still new to the process, so I messed it up a bit. I did some manual tests with two different Kudu environments, with a CSV file as a source. Just in case you or other reviewers ask about a GetKudu processor: I will add one after this PutKudu gets approved.
Hi @joewitt, there is a sandbox with a Kudu instance (and related components) that you can use to quickly spin up a VM and test the processor: https://kudu.apache.org/docs/quickstart.html
row.addBinary(colIdx, record.getAsString(colName).getBytes());
break;
case INT8:
case INT16:
Could we write this as a short using row.addShort(..)?
Would be useful to allow users to Upsert as well as Insert. This would ideally be configurable via the processor's properties.
case INT8:
case INT16:

I had to throw an exception here, because the RecordReader doesn't have getAsShort(), only getAsInt(), and if I use getAsInt() directly for INT8 and INT16 it will throw an exception.
You can cast it from int to short before calling addShort(..). If the user is using an int field to store a value within the range of a short then it's safe to cast. If the user is storing something outside the range of a short then the cast will silently truncate the value. I think this is a better approach since it allows users to utilize the INT8 and INT16 column types if they can guarantee that their values stay within those ranges.
final Map<String, String> attributes = new HashMap<String, String>();
final AtomicReference<Throwable> exceptionHolder = new AtomicReference<>(null);
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final KuduClient kuduClient = getKuduConnection(kuduMasters);
Could we instead create the client during an OnEnabled method so that we don't build a new KuduClient for each incoming FlowFile?
He had it in onEnabled previously. I encouraged moving to a lazy creation model because otherwise, if the connection breaks, there was no logic to restore it without manual intervention. It should not happen per FlowFile either, though, of course. Some sort of connection pooling would be appropriate. I don't know enough about the Kudu connection client logic to have better advice there, but onEnabled is not great and per-FlowFile is probably not either.
Ah okay, Joe. Good point, I'm not sure if the client auto-reconnects or not. This might be worth investigating @cammachusa ... or we can leave it as it is for now.
We may need to at least close the client at the end then.
The client itself is already a "lazy connection" model of sorts -- it'll manage its own connection pooling, is threadsafe, etc, so no need to do any additional pooling or reconnects on exceptions.
thanks, @toddlipcon . So it's safe to create it once and reuse it?
@rickysaltzer and @toddlipcon, since KuduClient (the connection) is threadsafe, we can put it in onScheduled (onEnabled). But I don't know if I should put KuduTable and KuduSession in onScheduled as well? Leaving them in onTrigger means that for every FlowFile it has to open the table and create a new session. What would you recommend?
KuduClient and KuduTable are safe to create once and not recreate. KuduSession is not thread safe, though, so each individual thread that is doing work should create one and close it when it is done. These are relatively lightweight, though (they don't require new network connections or anything).
final RecordReaderFactory recordReaderFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final KuduClient kuduClient = getKuduConnection(kuduMasters);
final KuduTable kuduTable = getKuduTable(kuduClient, tableName);
final KuduSession kuduSession = getKuduSession(kuduClient);
It would be nice if the user could set our session to ignore duplicate rows (e.g. insert-ignore) rather than updating/upserting them. A new property called Insert Operation could have Insert, Insert-Ignore, and Upsert. If Insert-Ignore is set, then it sets setIgnoreAllDuplicateRows when it creates the session. Further, if Upsert is set, then it creates a new Upsert operation instead of Insert.
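A rough sketch of the property behavior proposed above; the Session class here is only a stand-in for KuduSession and its setIgnoreAllDuplicateRows flag, and the configure helper is an illustrative name, not code from the PR:

```java
public class SessionConfigDemo {
    // Placeholder for KuduSession; the real one would come from kuduClient.newSession().
    static class Session {
        boolean ignoreAllDuplicateRows;
    }

    // Map the Insert Operation property value to session configuration:
    // Insert-Ignore flips the duplicate-row flag; Insert and Upsert leave it off
    // (Upsert would additionally create Upsert operations instead of Insert ones).
    static Session configure(String insertOperation) {
        Session session = new Session();
        if ("Insert-Ignore".equals(insertOperation)) {
            session.ignoreAllDuplicateRows = true;
        }
        return session;
    }

    public static void main(String[] args) {
        System.out.println(configure("Insert-Ignore").ignoreAllDuplicateRows);
        System.out.println(configure("Insert").ignoreAllDuplicateRows);
    }
}
```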
Added some first pass comments. Overall it looks great! Thanks again for your PR.
Looks like I messed up the history again. I just pushed a new commit to the branch, and then the comments were gone :(
@@ -84,6 +86,14 @@
        .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
        .build();

protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
        .name("INSERT OPERATION")
Let's just capitalize the words instead of each letter.
protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
        .name("INSERT OPERATION")
        .description("Specify operation for this processor. Insert-Ignore will ignore duplicated rows")
        .allowableValues("Insert", "Insert-Ignore", "Upsert")
It's generally best practice for us to use an Enum instead of a list of strings when using allowableValues; it mainly lends itself to cleaner-looking code down the road.
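The enum approach suggested here could look roughly like the following; the OperationType name and its labels mirror the allowable values discussed in this thread, but the exact shape is an assumption, not the PR's code. It also shows the OperationType.values() trick mentioned later in the review:

```java
import java.util.Arrays;

public class OperationTypeDemo {
    // Assumed enum mirroring the processor's allowable values.
    enum OperationType {
        INSERT("Insert"), INSERT_IGNORE("Insert-Ignore"), UPSERT("Upsert");

        private final String label;

        OperationType(String label) { this.label = label; }

        @Override
        public String toString() { return label; }
    }

    // values() yields every constant, so the allowable-values list
    // can never drift out of sync with the enum definition.
    static String[] allowableValues() {
        return Arrays.stream(OperationType.values())
                     .map(OperationType::toString)
                     .toArray(String[]::new);
    }

    public static void main(String[] args) {
        System.out.println(String.join(", ", allowableValues()));
    }
}
```

The property builder could then pass allowableValues() instead of repeating each string literal.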
skipHeadLine = context.getProperty(SKIP_HEAD_LINE).asBoolean();
operation = context.getProperty(INSERT_OPERATION).getValue();

kuduClient = getKuduConnection(kuduMasters);
Since @toddlipcon confirmed that Kudu uses lazy connections, we should instead revert back to using OnEnabled so that we only create the connection once when the processor is enabled.
Added some additional comments
Hi @joewitt and @rickysaltzer, I am following PutParquet to implement PutKudu, so it has the same dependencies as PutParquet's, and the build succeeds locally. But the build on Travis always fails. I realized this morning that the reason is that the Record Reader is not properly referenced and deployed for my PutKudu (I deployed it manually to test PutKudu first). I checked the log, and here is the message: "ClassNotFoundException: org.apache.nifi.serialization.RecordReaderFactory"
@joewitt might be the best person to answer @cammachusa's question regarding Travis. I seem to recall that it can be finicky, but I'm not 100% sure of the current state of CI stability.
@joewitt, would you please help? I couldn't start NiFi; it keeps saying "ClassNotFoundException: org.apache.nifi.serialization.RecordReaderFactory". I can build it without any error, but starting NiFi fails. Some of my dependencies on the Record Reader may not be correct, or something is missing? Thanks
Hi @rickysaltzer, I have figured out the Record Reader reference issue and fixed it. Would you please continue reviewing the code?
I will continue reviewing later today or tomorrow; thanks for your patience.
protected static final PropertyDescriptor INSERT_OPERATION = new PropertyDescriptor.Builder()
        .name("Insert Operation")
        .description("Specify operation for this processor. Insert-Ignore will ignore duplicated rows")
        .allowableValues(OperationType.INSERT.toString(), OperationType.INSERT_IGNORE.toString(), OperationType.UPSERT.toString())
Little trick to note: you can use OperationType.values(). You don't need to change it, though.
👍 It makes the code clean. Why not :-)
break;
case INT8:
case INT16:
short temp = Short.parseShort(record.getAsInt(colName).toString());
It's a bit inefficient to convert an int to a String and then parse it back to a short. Instead you can just cast it like:

short temp = (short) record.getAsInt(colName)

If the value is out of range of a short, the cast simply truncates it. If you parse the value as a String instead, an exception can be thrown (java.lang.NumberFormatException).
I tried that already, but it throws an error: "error: incompatible types: Integer cannot be converted to short"
Now I understand why: getAsInt() returns an Integer object. So what about (short) record.getAsInt(colName).intValue()?
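For reference, a small self-contained demonstration of the unbox-then-narrow cast discussed here. ShortCastDemo and toShort are illustrative names, not part of the PR:

```java
public class ShortCastDemo {
    // getAsInt(colName) returns an Integer, which cannot be cast directly
    // to short; unbox it first with intValue(), then narrow.
    static short toShort(Integer value) {
        return (short) value.intValue();
    }

    public static void main(String[] args) {
        System.out.println(toShort(123));    // in range: value preserved
        System.out.println(toShort(70000));  // out of range: truncated to the low 16 bits
    }
}
```

Note that a narrowing cast never throws: an out-of-range value is silently truncated (70000 becomes 4464), which is why range guarantees are on the user.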
protected KuduClient kuduClient;
protected KuduTable kuduTable;

@OnScheduled
I still think we should be doing this OnEnabled since Kudu manages the connections for us. If we do it OnScheduled, then we end up creating the client every time the processor is scheduled.
Sorry, I forgot to change it.
A couple things left to address, and then I will pull down locally to test.
I've got your changes locally to do some testing, thanks!
Still seeing this error - what did you do to fix it?
Sorry @rickysaltzer, I was in a rush, so I didn't run the tests when switching to onEnabled. Interestingly, OnEnabled is not being called, so the objects defined in it are all null when running the test cases. Maybe I should have both OnEnabled (for KuduClient and KuduTable) and OnScheduled for the rest of the variables. Any advice?
@cammachusa thanks! I think maybe you need to change the arguments given to your method once you use
Hi @rickysaltzer, switching to ConfigurationContext still doesn't work; everything is still null. I noticed that none of the Put/Get processors (e.g. PutHBase, PutParquet, PutCassandra) use OnEnabled; they use OnScheduled instead. As I understand it, OnScheduled only gets kicked off once per schedule (which can be every 5 or 10 minutes, depending on the user's configuration), but onTrigger will be triggered several times depending on how many FlowFiles there are in one schedule. @joewitt may have meant OnScheduled, not OnEnabled, because I never used OnEnabled in my PutKudu (recall his comments a couple of weeks ago about the thread safety of the Kudu client).
@rickysaltzer any comments?
Hey @cammachusa - I will take a look today; I've been traveling.
@rickysaltzer, a reminder :-)
@cammachusa thanks for the reminder :) - I've been a bit busy due to business travel.
I've made some changes that allow the tests to run successfully. I'm going to test this out in an actual NiFi installation and then provide you my changes. Unfortunately this may not happen today due to my schedule.
@cammachusa it seems I can't update this PR with my own code. My changes are below so you can merge them in and then push them again. I was able to get a sample pipeline working locally, which is great. I want to do a little bit more testing before we merge it in.
In the last commit I switched to onScheduled already; I don't know why you aren't seeing it :( . Thanks for your review and code @rickysaltzer. There are some code style check errors; I'm on them now.
@cammachusa I'm having a hard time getting your new commits... as well as even understanding how I got them in the first place. @joewitt am I missing something regarding getting the new commits? I'm running a fetch on the repo located here on GitHub.
False alarm, I got it now :)
@rickysaltzer, hahaha. When will you close this PR? I am waiting for your review every day :-)
I am going to go through my final rounds of testing, so by the end of this week, I promise :)
@cammachusa - I updated my branch with a rebase. Will you pull it down, make sure things are good for you, and then force push it to this review? Thanks!! Link: rickysaltzer@1101ffc
@rickysaltzer, I looked at your branch, and it's identical to mine, so sure, it will work. Just to clarify, what do you mean by "pull it down, ... and force push it to this review"? Does that mean pull your branch, merge it into mine, and push to the pull request? Or pull your branch and push it to this PR, to clean up all the previous commits into just one? If the second, I haven't done a cross push before; can you help with which git commands I should run to "cross push" and get your branch?
You could force push my commit to this PR and that'll be fine, too.
@rickysaltzer, I don't know how to do that; can you give me some detailed guidance? (from getting your commit to pushing it to this PR)
Hey @cammachusa sorry for the gap in time, been pretty busy. I don't know if this is the best way to do it, but here's a shot :) First reset your
Add my repo as a remote
Fetch my repo and branches
Merge my rebase onto your branch.
Make sure you see the following commit for your latest
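The commands behind the steps above (lost in the page extraction) might look roughly like the following; the remote name ricky, the fork URL, and the branch name NIFI-3973 are assumptions for illustration:

```shell
# Placeholders: reviewer's fork URL and the PR branch name.
REVIEWER_REPO=https://github.com/rickysaltzer/nifi.git
BRANCH=NIFI-3973

git remote add ricky "$REVIEWER_REPO"   # add the reviewer's repo as a remote
git fetch ricky                         # fetch their branches
git reset --hard "ricky/$BRANCH"        # replace your branch with their rebased one
git log -1 --oneline                    # confirm HEAD is the expected commit
git push --force origin "$BRANCH"       # rewrite the PR branch on your fork
```

The force push is required because a rebase rewrites history, so the PR branch on your fork no longer fast-forwards.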
@rickysaltzer thanks for the guide :-) Working on it now
@@ -31,7 +31,7 @@
 <bundle>
     <group>org.apache.nifi</group>
     <artifact>nifi-standard-nar</artifact>
-    <version>1.3.0-SNAPSHOT</version>
+    <version>1.4.0-SNAPSHOT</version>
I'm reverting this... not sure why this was changed.
@@ -33,7 +33,7 @@
 <bundle>
     <group>org.apache.nifi</group>
     <artifact>nifi-standard-nar</artifact>
-    <version>1.3.0-SNAPSHOT</version>
+    <version>1.4.0-SNAPSHOT</version>
Same here
revert fingerprint version back to 1.3
@rickysaltzer, I thought the version should be consistent everywhere, so... Anyway, I reverted it to 1.3 for the files above and rebased to have one commit. Can we try to close this PR by the weekend :-)? It has been a long... time :)
I committed the changes below. Thanks for your patience, and I hope to see another contribution in the future! Ricky
Thanks @rickysaltzer, I didn't mean to rush you :-) It's just that I have to report to my boss, since it's the end of the sprint. It was nice working with you, and I will surely contribute more to the community.
@cammach @cammachusa please close this PR since it has been merged. |
@cammachusa did you happen to write the GetKudu processor that you were talking about in this thread? Thx! Greg |