Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-1423: Implement time series storage for joins and windows #303

Closed
wants to merge 16 commits into from

Conversation

vjagadish1989
Copy link
Contributor

Notable changes:

  • New interface for storing and retrieving time series data.
  • New store and serde implementation for use in windows and joins

Pending:

  • Documentation, and minor clean-ups
  • Wire-up of stores from ExecutionPlanner
  • Usage of the store to implement various windows and joins

@vjagadish1989
Copy link
Contributor Author

@prateekm @nickpan47 @jmakes @xinyuiscool - Please review :-)

Copy link
Contributor

@jmakes jmakes left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'll try to take a deeper look later, but here were a couple things that stood out at first glance.

*
* @param <V> the type of value returned by this iterator
*/
public interface ClosableIterator<V> extends Iterator<V> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like it would be better to leverage the java Closeable interface so we can support AutoCloseable for try-with-resources
https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  1. While the AutoClosable idiom is useful for file handles, it expects the caller to catch the checked Exception each time they call iterator.close(). Almost always, In Samza we 'd want to shutdown when there's an exception during iterator.close(), So, a runtime exception makes more sense.

  2. Furthermore, iterator.close() on our existing KVStore API does not throw any checked Exception. I'd prefer to lean on the side of consistency, and refactor it later if there's enough consensus. Happy to discuss offline, and follow-up!

*/
public class TimeSeriesValue<V> {
private final V value;
private final Long timestamp;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this timestamp always match the timestamp in the key? If so, does it need to be in both the key and the value?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There are no such requirements.

  • On the write side, you'd call put(k, v, timestamp)
  • On the query side, you ask the store to give you all values for a key within a timestamp range. The return value is an iterator that returns the value along with the timestamp they were added to the store.

There's no need to store the timestamp in both the key and the value.

/**
* An iterator that can be closed.
*
* <p> Implement close to free resources assigned to the iterator such as open file handles, persistent state etc.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Prefer putting the paragraph tag on the preceding empty line for readability, here and elsewhere.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

good find , I will fix it uniformly everywhere! :)

private final K key;
private final long timestamp;

// allows a maximum of 2 Billion entries per window per key. maybe, long?
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

int should be fine, can remove comment.

@Override
public String toString() {
return "TimeSeriesKey{" +
"key=" + key +
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Indentation. Maybe String.format?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Wil fix, thanks!

int seqNum = timeSeriesKey.getSeqNum();

byte[] serializedKey = keySerde.toBytes(key);
int keySize = serializedKey == null ? 0 : serializedKey.length;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why do we allow null key bytes? Are there cases where timestamp + seq is sufficient?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Null bytes for key are needed to implement non-keyed tumbling and session windows. Basically, in that case, the start range of the timestamp will uniquely identify a window. The start range of the timestamp in combination with the sequence number will uniquely identify an entry in the store

* @param <K> the type of key in the store
* @param <V> the type of value in the store
*/

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Delete newline.

}

@Override
public TimeSeriesValue<V> next() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this return the key as well? See question about range.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Since all access to the store are based on a key K and a timestamp, it's not necessary for the key to be returned again as a part of the iterator.

*
* @param <V> the type of the value
*/
public class TimeSeriesValue<V> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use the KV class from the serde PR when it's available?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Well, I ended up renaming the TimeSeriesValue<V> to TimestampedValue<V>.

In general, prefer new classes (with their own semantic meanings) over KV<K, V> or Pair<K,V>.

* Since timestamps are at the granularity of milliseconds, multiple entries added in the same
* millisecond are distinguished by a monotonically increasing sequence number.
*/
private int seqNum = 0;
Copy link
Contributor

@prateekm prateekm Sep 26, 2017

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There's a potential issue in this implementation with ordering of values for the same key in the same millisecond when the seqNum wraps.

I think we should be OK though: We get 2G seq nums before we wrap. Given that we have 8 byte timestamp + 4 byte seqNum overhead per key, this is at the very minimum (12 bytes * 2 G) = 24 GB for a store with empty keys and values. Even an (int, int) key value store would have 20 * 2 = 40 GB storage at a minimum (there's additional rocksDB ttl timestamp and write amplification overhead), and this is per TaskInstance.

Would prefer changing this to an AtomicInteger instead of a int + synchronization.

Just throwing it out there since we were also considering storing a version number with the key: with a 4 byte overhead per key, we can use 7 LSB from an AtomicLong seq num + a 1 byte version num to solve both issues.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thank you for the suggestion.

After discussion we agreed the following:

  • The wrapping is a non-issue if we use an atomic long
  • We'll use a version for the MSB, and use the next 7 bytes for storing the long value in the sequence number


TimeSeriesKey<?> that = (TimeSeriesKey<?>) o;

if (timestamp != that.timestamp) return false;
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good call.

*
* @param <V> the type of value returned by this iterator
*/
public interface ClosableIterator<V> extends Iterator<V> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we use KeyValueIterator instead?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We agreed that since all accesses to the store are by a particular key anyway, there's no need to return a KVIterator. The user of the store knows the key when methods are invoked.

}

@Override
public void remove(K key, Long startTimestamp, Long endTimeStamp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add a remove(K key) too?

Copy link
Contributor

@prateekm prateekm left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM, with one question about version mismatch handling. Thanks!

return seqNum;
}


Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete extra newline.

public class TimeSeriesKeySerde<K> implements Serde<TimeSeriesKey<K>> {

private static final long SEQUENCE_NUM_MASK = 0x00ffffffffffffffL;

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Delete newline.

}

long timeStamp = buf.getLong();
long seqNum = buf.getLong();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This relies on version being 0. Safer to do the right thing here now.
Q: What should happen if the version doesn't match the field in the class? Should we throw here?

* @param key the key to look up in the store
* @throws IllegalArgumentException when startTimeStamp &gt; endTimeStamp, or when either of them is negative
*/
void remove(K key);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe add the same version for get? Or can remove this one too, now that I know that we don't need to use this for join.

* +----------------------+--------8 bytes----+----1 bytes-+---------7 bytes--------+----value size----------
* +----------------------------------STORE KEY-------------------------------------+---STORE VAL-----------+
* </pre>
* An 8 byte timestamp, a one byte version and a 7 byte sequence number are appended to the provided key and this
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: Indent lines 66-69 by -1.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks for catching this!

* <p>
* A {@link TimeSeriesStoreImpl} can be backed by persistent stores like rocksDB, in-memory stores, change-logged
* stores, cached stores (or any combination of these).
*
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Minor: What I meant to say was that we don't need newline if the p tag is on its own line (here and everywhere else).

/**
* An immutable pair of a value, and its corresponding timestamp.
*
* <p> Iterators on {@link TimeSeriesStore}s always return {@link TimestampedValue}s
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can remove this sentence.

* An 8 byte timestamp, a one byte version and a 7 byte sequence number are appended to the provided key and this
* combination is used as the key in the k-v store. The provided value is stored as is.
*
* <p> This class is thread-safe and concurrent reads/writes are expected.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Does the KVStore API/impl provide read after write consistency across threads as well? I.e. do we have any guarantees about when a value written by T1 will be visible to T2?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The RocksDb docs state that the store is "thread-safe". We'll have to find out more on whether they mean:

  • Concurrent operations on the store will not corrupt state
  • Operations on the store "happen-before" other operations? That directly translates to visibility across threads.

[2] is a much stronger guarantee on [1]. I'd expect us to support [2]

* Returns an iterator over values for the given key in the provided time-range - [{@code startTimestamp}, {@code endTimestamp})
*
* Values returned by the iterator are ordered by their timestamp. Values with the same timestamp are
* returned in their order of insertion. The iterator must be closed after use by calling {@link #close}.
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

IMHO the "Iterator must be closed" warning is worth putting on a newline and putting a <b> around must. Forgetting to do so will lead to memory leaks.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1 for the extra emphasis

Copy link
Contributor

@xinyuiscool xinyuiscool left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LGTM. Have a few comments.

@@ -167,6 +167,7 @@ project(":samza-core_$scalaVersion") {
compile "org.scala-lang:scala-library:$scalaLibVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
testCompile project(":samza-api").sourceSets.test.output
testCompile project(":samza-kv-rocksdb_$scalaVersion")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why core depends on kv-rocksDb? Seems this will cause circular dependency. I would prefer the test cases added to kv-rocksdb to avoid this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would mean that all tests for operators (window/ join) that instantiate stores would now be a part of samza-kv-rocksdb. Let's sync up offline since the scope of that seems to be larger than this PR.

Also, I believe circular dependencies in testCompile tasks are fine in gradle.

return timestamp;
}

public byte getVersion() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems a little bit weird that it's a member fn to return the static val.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, will fix it!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xinyuiscool I accidentally committed this without addressing this feedback. However, I ended up doing a follow-up patch that addresses your comment - 92894e5

* @param <K> the type of key in the store
* @param <V> the type of value in the store
*/
public interface TimeSeriesStore<K, V> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should V be something extends fromTimestampedValue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, <V> should be completely agnostic to types here for this to be a general purpose API. However, the return type from get should be an Iterator<TimeStampedValue<V>> (as it's being done)

}

@Override
public ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long endTimestamp) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why not close the iterator automatically when it reaches to the end? Now we have to remind user to close it after using.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

  • Closing iterators automatically means that iterator.hasNext() is stateful - The invocation of hasNext immediately after the iterator has been closed will return an InvalidStateException. It also forces that hasNext can only return false once.

  • Alternately, If we choose to return false during hasNext(), we should have a way to distinguish these 2 cases:
    false returned due to no elements to iterate upon
    false returned due to iterator being closed.

For these reasons, and to maintain consistency with our already existing iterator APIs that are built on top of RocksDb, I'd prefer to avoid auto-closing iterators.

}
}

private static class TimeSeriesStoreIterator<K, V> implements ClosableIterator<TimestampedValue<V>> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

as mentioned above, is it possible to automatically close it when it reaches the end?

@asfgit asfgit closed this in 56d564c Oct 2, 2017
long version = seqNum & ~SEQUENCE_NUM_MASK;

if (version != 0) {
throw new SamzaException("Version is not zero. Sequence number: " + seqNum);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check should be version != TimeSeriesKey.VERSION.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
5 participants