Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

NIFI-4152 Initial commit of ListenTCPRecord #1987

Closed
wants to merge 2 commits into from

Conversation

bbende
Copy link
Contributor

@bbende bbende commented Jul 6, 2017

Adds a ListenTCPRecord processor which can read records from the InputStream of a TCP connection.

Copy link
Contributor

@pvillard31 pvillard31 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Overall it LGTM, left few comments. Also can you rebase against master so that I can run few tests with this new processor?

public class ListenTCPRecord extends AbstractProcessor {

static final PropertyDescriptor PORT = new PropertyDescriptor
.Builder().name("Port")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

displayName() is missing on few properties


static final PropertyDescriptor MAX_CONNECTIONS = new PropertyDescriptor.Builder()
.name("Max Number of TCP Connections")
.description("The maximum number of concurrent TCP connections to accept.")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should we add in the property description the comment you added in the capability description?
"In cases where clients are keeping a connection open, the concurrent tasks for the processor should be adjusted to match the Max Number of TCP Connections allowed, so that there is a task processing each connection."

@bbende
Copy link
Contributor Author

bbende commented Jul 31, 2017

@pvillard31 thanks for reviewing, I just rebased against master and made your suggested changes, let me know of anything else, thanks!

@pvillard31
Copy link
Contributor

Hey @bbende, just built this PR and did some tests.

The template I used is here:
https://gist.github.com/pvillard31/5ecea5932bf70fc622e30be8512601b6

Then I send messages using nc:

$> nc localhost 9876
2016-11-08 21:24:23,029 FINE Yellow
2016-11-08 21:24:23,029 INFO Test Message 1
2016-11-08 21:24:23,029 WARN Red
2016-11-08 21:24:23,029 ERROR Green
2016-11-08 21:24:23,029 FATAL Blue
...

Observations:

  • If I start nc but don't send any message, it'll fail after the read timeout with the following message (and kill my nc connection):
ListenTCPRecord[id=aefaaba3-015d-1000-cecb-dd76899c8193] Error processing records: null: java.net.SocketTimeoutException

I'm not sure if we want to raise a bulletin alert if the source is not sending any message. Thoughts?

  • If I start nc and send messages, but less than the maximum number of records per flow file (1000), then it'll generate a flow file only after 30 seconds without receiving any message (and it'll generate a bulletin).

  • If I start nc and keep sending messages, it'll generate flow files containing exactly the number set for the property "record batch size".

While I understand this is the intended behaviour, I'm not sure if this is clear enough in the description of the processor. In particular, raising a bulletin and killing the connection in case no data is received could seem weird (and it's not similar to the way ListenTCP is working).

Instead of killing the connection, what about just generating a flow file with the amount data available at this moment? Or change the level of the log message to info? And probably improve the message to let the user knows that the connection has been closed because no data has been received since X seconds? Thoughts?

@bbende
Copy link
Contributor Author

bbende commented Aug 7, 2017

@pvillard31 thanks for trying it out... I made some changes so that it should only produce a bulletin when there is an error other than a read timeout, and on read timeouts I made it leave it the connection open and requeue it to try again, so it will only close the connection on other errors.

Also, I found a bug in PutTCP that I introduced in another PR, so I fixed that here as well.

Copy link
Contributor

@pvillard31 pvillard31 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, merged to master, thanks for the changes @bbende

@pvillard31
Copy link
Contributor

Forgot the magic words after rebasing... Can you close the PR @bbende? sorry about that.

@bbende
Copy link
Contributor Author

bbende commented Aug 8, 2017

Thanks! closing...

@bbende bbende closed this Aug 8, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
2 participants