Implement ProcessingTimeTimeout for RocksDBStateStore #1

Closed
chitralverma opened this Issue Sep 24, 2018 · 5 comments

Comments

@chitralverma (Contributor) commented Sep 24, 2018

Extending the capabilities of this state store, I propose a feature for state expiration/timeout based on processing time. This feature is available when you do stateful aggregations using Spark's FlatMapGroupsWithState (FMGWS) API.

FMGWS is not very flexible in terms of options and usage, and its timeouts require the query to progress in order to time out keys. With the addition of this feature to the RocksDB state store, key expiration is truly decoupled from the query engine in Structured Streaming, with the DB itself taking care of TTLs.

Will raise a pull request for this shortly, along with certain test scenarios.

@chermenin chermenin assigned chermenin and chitralverma and unassigned chermenin Oct 23, 2018

@chermenin (Owner) commented Oct 23, 2018

@chitralverma Could you let me know the progress of implementing this feature, please?

@chitralverma (Contributor) commented Oct 27, 2018

@chermenin Apologies for the late response. I implemented this feature using RocksDB's TTL DB. Although it works as expected, it comes with certain caveats. The way it works is described below, from the official documentation:

- (int32_t)Timestamp(creation) is suffixed to values in Put internally
- Expired TTL values are deleted in compaction only: (Timestamp + ttl < time_now)
- Get/Iterator may return expired entries (compaction has not run on them yet)
- A different TTL may be used during different Opens
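To make the compaction caveat above concrete, here is a toy model (plain Python, not the actual RocksDB API) of how a TTL DB behaves: each Put suffixes a creation timestamp to the value, Get does not check expiry, and expired entries are dropped only when compaction runs. The class name and in-memory dict are illustrative stand-ins.

```python
import time

class ToyTtlDb:
    """Toy model of TTL-DB semantics: expiry is enforced only at compaction."""

    def __init__(self, ttl_seconds):
        self.ttl = ttl_seconds
        self.data = {}  # key -> (value, creation timestamp)

    def put(self, key, value, now=None):
        # Creation timestamp is stored alongside the value, as in a TTL DB.
        self.data[key] = (value, now if now is not None else time.time())

    def get(self, key):
        # Mirrors the caveat above: expired entries may still be returned,
        # because nothing here checks the deadline.
        entry = self.data.get(key)
        return entry[0] if entry else None

    def compact(self, now=None):
        # Only compaction drops entries with Timestamp + ttl < time_now.
        now = now if now is not None else time.time()
        self.data = {k: v for k, v in self.data.items()
                     if v[1] + self.ttl >= now}

db = ToyTtlDb(ttl_seconds=60)
db.put("k", "v", now=0)
print(db.get("k"))   # "v"
# 100 seconds later the entry is past its TTL, but Get still sees it...
print(db.get("k"))   # "v" (stale!)
db.compact(now=100)  # ...until compaction finally drops it
print(db.get("k"))   # None
```

This is exactly why testing against the real TTL DB is non-deterministic: correctness of expiry depends on when compaction happens to run.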

So, you see, expiration is bound to compaction. To maintain throughput and performance, RocksDB creates multiple files which are compacted based on certain constraints. Even though a compaction filter may be manually defined, entries will not be evicted until the DB compacts, and over-compaction degrades performance. This makes the features of the TTL DB only partially usable and testing of the process non-deterministic.

An alternative I thought of is to create an in-memory collection of keys with their respective deadlines, which is referenced and updated on every get and set. This collection takes precedence over the compaction-based expiration provided by the TTL DB, ensuring that no expired entries are returned while maintaining the performance of state store operations.
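The alternative above could be sketched as follows. This is a hypothetical illustration in Python, not the actual pull request code: the class name, the backing dict (standing in for RocksDB), and the parameter names are all assumptions.

```python
import time

class DeadlineTrackedStore:
    """Sketch: an in-memory key -> deadline map layered over a backing store."""

    def __init__(self, ttl_seconds, backing=None):
        self.ttl = ttl_seconds
        self.backing = backing if backing is not None else {}  # stand-in for RocksDB
        self.deadlines = {}  # in-memory collection: key -> expiry deadline

    def set(self, key, value, now=None):
        now = now if now is not None else time.time()
        self.backing[key] = value
        self.deadlines[key] = now + self.ttl  # deadline refreshed on every set

    def get(self, key, now=None):
        now = now if now is not None else time.time()
        deadline = self.deadlines.get(key)
        if deadline is None or now >= deadline:
            # Expired or unknown: evict eagerly instead of waiting for
            # compaction, so a stale entry is never returned to the caller.
            self.backing.pop(key, None)
            self.deadlines.pop(key, None)
            return None
        return self.backing[key]

store = DeadlineTrackedStore(ttl_seconds=60)
store.set("k", "v", now=0)
print(store.get("k", now=30))  # "v"
print(store.get("k", now=61))  # None (deadline passed; entry evicted)
```

Expiration is now decided at read time by the deadline map, at the cost of keeping one entry per key in memory.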

The caveat of the alternative approach is that it moves back closer to the default implementation provided by Spark, i.e. the HDFSStateStore, which maintains in-memory maps.

I have the alternative approach ready, but do let me know your thoughts. Thanks.

@chermenin (Owner) commented Oct 29, 2018

@chitralverma So, I've looked at your comment and I'd like to say that I like your approach. Even if we keep the keys in memory, it will be better than keeping everything there as implemented in HDFSStateStoreProvider. Anyway, can you submit PRs for both of the approaches you described? I think they can be good alternatives to each other (more performance with more memory used, or vice versa). We can describe all the limitations in the documentation and users can decide for themselves what to use. What do you think?

@chitralverma (Contributor) commented Nov 11, 2018

@chermenin I've raised pull request #2 for you to review.
Also, there's been a Travis failure due to the oraclejdk-10 deprecation.

Thanks,

@chitralverma (Contributor) commented Dec 10, 2018

Merged. Closing the issue.
