[FLINK-9592][flink-connector-filesystem] added ability to hook file state changing #6824

Closed


@kent2171 kent2171 commented Oct 11, 2018

What is the purpose of the change:

This pull request adds the ability to hook into the moment a file changes state.

Brief change log:

  • when a file is moved from the inProgress to the pending state, the list of pre-configured hooks is called
  • when a file is moved from the pending to the final state, the list of pre-configured hooks is called

Verifying this change:

The following tests verify that the hooks are called at the proper time:

  • testThatOnInProgressToPendingCallbackIsFiredWhenFilesAreMovedToPendingStateByTimeout
  • testThatOnInProgressToPendingCallbackIsFiredWhenFilesAreMovedToPendingStateBySize
  • testThatOnInProgressToPendingCallbackIsFiredWhenFunctionIsClosed
  • testThatOnPendingToFinalCallbackIsFiredWhenCheckpointingIsCompleted

Does this pull request potentially affect one of the following parts:

  • Dependencies (does it add or upgrade a dependency): (no)
  • The public API, i.e., is any changed class annotated with @Public(Evolving): (no)
  • The serializers: (no)
  • The runtime per-record code paths (performance sensitive): (no)
  • Anything that affects deployment or recovery: JobManager (and its components), Checkpointing, Yarn/Mesos, ZooKeeper: (no)
  • The S3 file system connector: (yes)

Documentation:

  • Does this pull request introduce a new feature? (yes)
  • If yes, how is the feature documented? Usage information about this feature was added to the description of org.apache.flink.streaming.connectors.fs.bucketing.BucketingSink class

@kent2171 kent2171 changed the title FLINK-9592 [flink-connector-filesystem] added ability to add hooks for file state changing FLINK-9592 [flink-connector-filesystem] added ability to hook file state changing Oct 11, 2018
@pnowojski pnowojski changed the title FLINK-9592 [flink-connector-filesystem] added ability to hook file state changing [FLINK-9592][flink-connector-filesystem] added ability to hook file state changing Oct 11, 2018
@kl0u
Contributor

kl0u commented Oct 12, 2018

Hi @kent2171 ,

I had a look at the PR. I also wrote the same comment at the associated JIRA but I also include it here.

In general, as I said earlier, I like the idea of having Callbacks to notify when a file changes state.
As far as the design/implementation of the current PR is concerned, the following are my comments:

  1. The FileStateChangedCallback seems to be pretty limiting, and probably designed with a specific use case in mind. It assumes that the user would like to do something with the underlying file system when the file changes state (e.g. write a special file). But other use cases may need to do a REST call, or update a DB, or in general communicate with another system.

Given the above, I would suggest that the function should have an open() and a close() method, which are called once and are responsible for allocating and freeing resources. The open() should potentially take the flinkConfig as an argument and initialize any long-lived resources, e.g. connections to databases, a connection to the filesystem, etc., and the close() should be responsible for freeing them. This will allow the sink to accommodate a broader variety of use cases. As for the methods themselves, I do not yet have a definite answer on what should be included as arguments, but I would also include a Context as an argument. This will future-proof the methods, as we will be able to add things to the Context if we want to expose more in the future, rather than deprecating the existing API and creating a new one.

  2. IMPORTANT CONSIDERATIONS to keep in mind: all this is a "best-effort" reporting of state changes, as, for example, if a failure happens after transitioning a file to its "final" state, but before calling the hook, then you will never get the notification. This behavior is aligned with Flink's metric system, where metrics are not checkpointed. In our case though, the scenario described above is more tricky to accommodate as we are talking about integration with external systems.
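
To make the shape suggested in point 1 concrete, here is a minimal sketch in plain Java. It is only an illustration, not the PR's code or any existing Flink API: the `Context` accessors, the `RecordingCallback` class, and the use of a `Map<String, String>` as a stand-in for the Flink configuration are all assumptions. The method names mirror the two transitions described in the PR.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;

public class FileStateHooksSketch {

    /** Hypothetical context; accessors can be added later without changing the callback signatures. */
    interface Context {
        String path();
        long timestamp();
    }

    /** Hypothetical callback with the open()/close() lifecycle suggested in the review. */
    interface FileStateChangedCallback {
        // Called once; the map is an assumed stand-in for the Flink configuration.
        void open(Map<String, String> config);
        void onInProgressToPending(Context context);
        void onPendingToFinal(Context context);
        // Called once; releases whatever open() allocated.
        void close();
    }

    /** Example implementation that records events in memory instead of calling an external system. */
    static final class RecordingCallback implements FileStateChangedCallback {
        final List<String> events = new ArrayList<>();

        @Override public void open(Map<String, String> config) { events.add("open"); }
        @Override public void onInProgressToPending(Context context) { events.add("pending:" + context.path()); }
        @Override public void onPendingToFinal(Context context) { events.add("final:" + context.path()); }
        @Override public void close() { events.add("close"); }
    }

    /** Builds a simple immutable Context for the given file path and timestamp. */
    static Context context(final String path, final long timestamp) {
        return new Context() {
            @Override public String path() { return path; }
            @Override public long timestamp() { return timestamp; }
        };
    }

    public static void main(String[] args) {
        RecordingCallback callback = new RecordingCallback();
        callback.open(Collections.<String, String>emptyMap());
        // A sink would invoke each hook after the file has changed state, so a
        // failure between the transition and the call loses the notification.
        callback.onInProgressToPending(context("/tmp/part-0-0", 1L));
        callback.onPendingToFinal(context("/tmp/part-0-0", 2L));
        callback.close();
        System.out.println(callback.events);
    }
}
```

Passing a `Context` rather than individual arguments means new information can be exposed later by adding accessors, without breaking existing implementations of the callback.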

Let me know what you think about the above!


@kl0u kl0u left a comment

I expressed my concerns in the comment above.

@kent2171
Author

Hi @kl0u , thx for your time

  1. totally agree with you
  2. also agree, that's a very important note

I also have a question: the latest releases include a StreamingFileSink. Does that mean BucketingSink will be deprecated soon, and that any hooks we need should be implemented in StreamingFileSink instead of BucketingSink?

thx !

@kl0u
Contributor

kl0u commented Oct 15, 2018

Hi @kent2171 ! I am not so sure if the BucketingSink is going to be deprecated soon. The reason is that the new StreamingFileSink for now requires newer Hadoop versions.

But specifically for this new feature, I would say to implement it on top of the new StreamingFileSink, as this is definitely going to be the main filesystem sink in the future.

@kent2171
Author

ok, will return with the proposal, thx @kl0u

@kent2171 kent2171 closed this Oct 15, 2018
@kl0u
Contributor

kl0u commented Oct 15, 2018

Perfect @kent2171 ! Looking forward to your design proposal.
