Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-1691: Support get iterable from KeyValueStore #492

Closed
wants to merge 2 commits into from

Conversation

xinyuiscool
Copy link
Contributor

Right now for KeyValueStore we have a range query to return an iterator. For usage in BEAM, we need a iterable which will 1) create the snapshot when called, and 2) create an iterator when needed. Add the iterate() function in KeyValueStore to support it. It's implemented as follows:

  1. for rocksDb, it will create the iterator when it's called, which will has a snapshot of the elements. Then every time when the iterator is needed, we will seek the iterator from beginning;

  2. for inMemoryDb, it will create the snapshot submap when iterate() is called. The submap is an iterable and it can return a new iterator when needed.


package org.apache.samza.storage.kv;

public interface KeyValueIterable<K, V> extends Iterable<Entry<K, V>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does this need a "close" method so that you can close the internal iterator (e.g. tell internal RocksDB iterator to clean up snapshot)?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good question. I was thinking about this when I did the change. The reason that I didn't add the close method in KeyValueIterable is that the KeyValueIterator itself already has a close() method in the API, so it should be the responsibility of KeyValueIterator to close it (might even be confusing if we also have another close() here). Think of this as a factory class that creates the iterator, but close() should be on the iterator itself.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I agree that KeyValueIterator should have the logic for closing the RocksDB snapshot. However, someone needs to call KeyValueIterator.close when it is done being used. If the KeyValueIterable uses the KeyValueIterator, then KeyValueIterable needs to provide a way for someone to call KeyValueIterator.close, or KeyValueIterable needs to call KeyValueIterator.close automatically.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I believe when the users will get hold of KeyValueIterable instances when they call getIterator() from the iterable. So they can close the iterators by themselves.

@vjagadish
Copy link
Contributor

vjagadish commented May 2, 2018

@xinyuiscool : Can you please provide a bit more background on why we need the new API for BEAM? Instead, could the iterator be lazy-loaded where it is used?

@xinyuiscool
Copy link
Contributor Author

@vjagadish : sure. BEAM needs an Iterable interface so that it can create iterators when it's needed (can happen multiple times too). The iterator needs to be set to be the starting position from the snapshot when the iterable was created. So for rocksDb, I reset the position when returning the iterator (no concurrency within a task allowed here), and for inmemory, I creates a submap so I can create new iterator for it every time.

Copy link
Contributor

@sborya sborya left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

few clarifying questions and suggestions.

store.close();
}

private byte[] genKey(ByteArrayOutputStream outputStream, String prefix, int i) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need to pass outputStream to this method. It is just used for generation of byte arr, isn't it?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it's easier for me to reuse the outputStream (and the buffer with it), right :)?

store.close();
}

private byte[] genKey(ByteArrayOutputStream outputStream, String prefix, int i) throws Exception {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

same as in the other test.

val list : util.ArrayList[K] = new util.ArrayList[K]()
list.add(from)
list.add(to)
logAccess(DBOperation.ITERATE, serializeKeys(list), store.iterate(from, to))
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why do you need this list? is it the name of the iterator?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, I believe this is for logging every access of the store to a stream.

@@ -160,4 +160,11 @@ class KeyValueStorageEngine[K, V](
}

override def getStoreProperties: StoreProperties = storeProperties

override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
updateTimer(metrics.iterateNs) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

not sure how this works..

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Scala magic :). It's called currying, I finally remembered the name of it :)

@@ -114,4 +114,7 @@ class LoggedStore[K, V](
store.close
}

override def iterate(from: K, to: K): KeyValueIterable[K, V] = {
store.iterate(from, to)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

where this iterate comes from?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I added it to the general interface of KeyValueIterable.

Copy link
Contributor

@vjagadish vjagadish left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @xinyuiscool for the PR.. mostly questions so that I understand this better.

* @return an iterable for the specified key range.
* @throws NullPointerException if null is used for {@code from} or {@code to}.
*/
default KeyValueIterable<K, V> iterate(K from, K to) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. It'd be great to explicitly clarify on the javadocs that this is a snapshot of the store.
  2. Additionally, the name iterate does not make it obvious that it's a point-in-time snapshot and that it's immutable once created. Personally, I would prefer to explicitly specify the API as snapshot():
default KeyValueIterable<K, V> snapshot(K from, K to); 

or even better:

default KeyValueSnapshot<K,V> snapshot(K from, K to);

and KeyValueSnapshot exposes a single iterator() API

KeyValueIterator<K, V> newIterator();

Let me know what you think!
3. Java docs should tie this in context with range() and provide usage examples.
My expectation on usage-semantics of the two APIs would be:
range == Iterate the store once;
snapshot == obtain an immutable snapshot that can be iterated over independently multiple times.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Iterable is a factory class which creates iterator, that's a java standard so I don't want to mess with it. I don't think it's a good idea at all to call it snapshot which implements a java iterable interface. Since it's not returning the snapshot, I don't want to call this function snapshot either. I am willing to take other names if iterate() is not clear enough.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we snapshot the iterator when the iterable is created from this function, and
the iteration results is guaranteed to reflect the snapshot if only one iterator is in use at a time.

@xinyuiscool : The docs are a bit under-specified since we are mixing implementation notes and API-semantics.

  1. What is the expectation on the Iterable returned by each call to store.iterate(from, to) when the underlying store is modified between the calls - are modifications reflected?

  2. Does iterable.iterator() return a new iterator each time it is called?

  3. Does every iterator returned by iterable.iterator() see the exact same view of the store? traverse from "fromIndex" to "toIndex"?

  4. Can iterators regress? Currently, calling iterable.iterator() will reset the cursor in some cases. I don't think this behavior is desirable. Please do check with @nickpan47 on this as well.

@@ -203,6 +204,18 @@ class RocksDbKeyValueStore(
new RocksDbIterator(iter)
}

def iterate(from: Array[Byte], to: Array[Byte]): KeyValueIterable[Array[Byte], Array[Byte]] = {
//snapshot the iterator
val iter : RocksDbRangeIterator = range(from, to).asInstanceOf[RocksDbRangeIterator]
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Could we leak the actual iterator here? Often, compactions and clean-ups will be delayed until the iterators are closed. So, would be good to cleanly tear this down.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

sorry, I don't know what you mean. An iterable is a way to create iterator as Java doc here: https://docs.oracle.com/javase/8/docs/api/java/lang/Iterable.html. I believe it's the responsibility of the holder of the iterator who should do the close(). Iterable is the factory, why is it concerned about the compaction and clearnup?

new KeyValueIterable[Array[Byte], Array[Byte]] {
def iterator(): KeyValueIterator[Array[Byte], Array[Byte]] = {
// reset to the beginning
iter.seek(from)
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I do have a concern with re-purposing the Iterator returned by range() and using it to mint an Iterable here. For example, Would thread-2 calling getIterator end-up resetting the cursor for thread-1? Please let me know.

KeyValueIterable iterable = store.iterate();
thread-1:
Iterator i1 = iterable.getIterator();
iterate using i1;
thread-2:
Iterator i2 = iterable.getIterator();
iterate using i2;

Alternately, RocksDb has a top-level notion of a 'snapshot()'. Does the JNI binding for RocksDb expose it? IIUC, it seems to be achieving exactly what we want.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

NO multithreading in a task in this use case.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I didn't know about snapshot. Right now when I created the iterator, it does the snapshot at that point and it fits the purpose (single thread rocksDb access). To use snapshot, I think I need to do some benchmarking to understand the performance aspect of it. I am not comfortable using it directly.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Looks like we should be able to make this logic work for normal use cases, basically the logic should be:

if (!iter.isClosed() && iter.next() == false) {
// if the cached iterator already goes to the end, we can reset it for reuse
iter.seek(from)
iter
} else {
// in case the cached iter is not done or already closed, create a new one.
range(from, to)
}

Looks like this solution should be okey. As a side note, Flink does similar things for their entries() iterable: https://github.com/apache/flink/blob/master/flink-state-backends/flink-statebackend-rocksdb/src/main/java/org/apache/flink/contrib/streaming/state/RocksDBMapState.java, line 161.

@xinyuiscool
Copy link
Contributor Author

Seems from the rocksDb doc, snapshot is way more expensive than iterator:

A Snapshot API allows an application to create a point-in-time view of a database. The Get and Iterator APIs can be used to read data from a specified snapshot. In a sense, a Snapshot and an Iterator both provide a point-in-time view of the database, but their implementations are different. Short-lived/foreground scans are best done via an iterator while long-running/background scans are better done via a snapshot. An iterator keeps a reference count on all underlying files that correspond to that point-in-time-view of the database - these files are not deleted until the Iterator is released. A snapshot, on the other hand, does not prevent file deletions; instead the compaction process understands the existence of snapshots and promises never to delete a key that is visible in any existing snapshot.

Snapshots are not persisted across database restarts: a reload of the RocksDB library (via a server restart) releases all pre-existing snapshots.

Intead of reusing the same iterator all the time, the current impl will detect multiple live iterators and
create new copies. It still keep the same guarentee that the iterator reflects the snapshot if we only have
one iterator in use.
@asfgit asfgit closed this in fa49e72 May 7, 2018
@@ -112,4 +112,14 @@ class InMemoryKeyValueStore(val metrics: KeyValueStoreMetrics = new KeyValueStor
}
found
}

override def iterate(from: Array[Byte], to: Array[Byte]): KeyValueIterable[Array[Byte], Array[Byte]] = {
// snapshot the iterable
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: we are snapshotting the store here, and not the iterable.

* @return an iterable for the specified key range.
* @throws NullPointerException if null is used for {@code from} or {@code to}.
*/
default KeyValueIterable<K, V> iterate(K from, K to) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note that we snapshot the iterator when the iterable is created from this function, and
the iteration results is guaranteed to reflect the snapshot if only one iterator is in use at a time.

@xinyuiscool : The docs are a bit under-specified since we are mixing implementation notes and API-semantics.

  1. What is the expectation on the Iterable returned by each call to store.iterate(from, to) when the underlying store is modified between the calls - are modifications reflected?

  2. Does iterable.iterator() return a new iterator each time it is called?

  3. Does every iterator returned by iterable.iterator() see the exact same view of the store? traverse from "fromIndex" to "toIndex"?

  4. Can iterators regress? Currently, calling iterable.iterator() will reset the cursor in some cases. I don't think this behavior is desirable. Please do check with @nickpan47 on this as well.

@vjagadish
Copy link
Contributor

vjagadish commented May 9, 2018

To close the loop on this PR. Had an offline discussion with @xinyuiscool, @nickpan47 and @prateekm and we agreed on the following:

  • The need for the BEAM runner is to return an Iterable whose iterators are not impacted by mutations to the underlying store.
  • We will rename the API to snapshot() to actually indicate what its semantics is.
  • We will return an immutable, point-in-time snapshot of the store. We will leverage the RocksDb's db.snapshot() instead of re-purposing db.iterator(). This will also ensure that the behavior is consistent across in-memory and rocks-db stores. The follow-up PR is in https://github.com/apache/samza/pull/510/files .

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants