SAMZA-1423: Implement time series storage for joins and windows #303

Closed
wants to merge 16 commits into from
1 change: 1 addition & 0 deletions build.gradle
@@ -167,6 +167,7 @@ project(":samza-core_$scalaVersion") {
compile "org.scala-lang:scala-library:$scalaLibVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
testCompile project(":samza-api").sourceSets.test.output
testCompile project(":samza-kv-rocksdb_$scalaVersion")
Contributor

Why does core depend on kv-rocksdb? This looks like it will cause a circular dependency. I would prefer adding the test cases to kv-rocksdb to avoid this.

Contributor Author

That would mean that all tests for operators (window/join) that instantiate stores would now be part of samza-kv-rocksdb. Let's sync up offline, since the scope of that seems larger than this PR.

Also, I believe circular dependencies in testCompile tasks are fine in Gradle.

testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
testCompile "org.powermock:powermock-api-mockito:$powerMockVersion"
@@ -0,0 +1,39 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.storage.kv;

import java.util.Iterator;

/**
* An iterator that can be closed.
*
* <p> Implement close to free resources assigned to the iterator such as open file handles, persistent state etc.
Contributor

Prefer putting the paragraph tag on the preceding empty line for readability, here and elsewhere.

Contributor Author

Good find, I will fix it uniformly everywhere! :)

*
* @param <V> the type of value returned by this iterator
*/
public interface ClosableIterator<V> extends Iterator<V> {
Contributor

It seems like it would be better to leverage the Java Closeable interface so we can support AutoCloseable for try-with-resources:
https://docs.oracle.com/javase/7/docs/api/java/lang/AutoCloseable.html

Contributor Author

  1. While the AutoCloseable idiom is useful for file handles, it expects the caller to catch the checked Exception each time they call iterator.close(). In Samza we'd almost always want to shut down when there's an exception during iterator.close(), so a runtime exception makes more sense.

  2. Furthermore, iterator.close() on our existing KVStore API does not throw any checked Exception. I'd prefer to err on the side of consistency and refactor it later if there's enough consensus. Happy to discuss offline and follow up!
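
For reference while weighing the two positions above: Java allows a subinterface of AutoCloseable to redeclare close() without a throws clause, in which case try-with-resources needs no catch block. The following is only a sketch of that language rule, not what this PR does; `UncheckedCloseableIterator`, `TrackingIterator`, and `sum` are hypothetical names invented for illustration.

```java
import java.util.Arrays;
import java.util.Iterator;

public class CloseableIteratorSketch {

  /** Hypothetical variant: AutoCloseable, but close() declares no checked exception. */
  interface UncheckedCloseableIterator<V> extends Iterator<V>, AutoCloseable {
    @Override
    void close(); // narrows "void close() throws Exception" from AutoCloseable
  }

  /** Wraps a plain iterator and records whether close() was called. */
  static final class TrackingIterator<V> implements UncheckedCloseableIterator<V> {
    private final Iterator<V> delegate;
    private boolean closed = false;

    TrackingIterator(Iterator<V> delegate) {
      this.delegate = delegate;
    }

    @Override
    public boolean hasNext() {
      if (closed) throw new IllegalStateException("iterator is closed");
      return delegate.hasNext();
    }

    @Override
    public V next() {
      if (closed) throw new IllegalStateException("iterator is closed");
      return delegate.next();
    }

    @Override
    public void close() {
      closed = true; // a real implementation would release its resources here
    }

    boolean isClosed() {
      return closed;
    }
  }

  /** Sums the values and closes the iterator, even if iteration throws. */
  static int sum(TrackingIterator<Integer> it) {
    int total = 0;
    // No catch block is needed: the declared close() throws nothing checked.
    try (TrackingIterator<Integer> values = it) {
      while (values.hasNext()) {
        total += values.next();
      }
    }
    return total;
  }

  public static void main(String[] args) {
    TrackingIterator<Integer> it =
        new TrackingIterator<>(Arrays.asList(1, 2, 3).iterator());
    System.out.println(sum(it));        // prints 6
    System.out.println(it.isClosed());  // prints true
  }
}
```

A failure inside close() would still surface as a runtime exception here, which matches the shutdown behavior described in point 1.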

Contributor

Can we use KeyValueIterator instead?

Contributor Author

We agreed that since all accesses to the store are by a particular key anyway, there's no need to return a KVIterator. The user of the store knows the key when methods are invoked.


/**
* Closes this iterator and frees resources assigned to it.
*
* It is illegal to invoke {@link #next()} and {@link #hasNext()} after an iterator has been closed.
*/
public void close();
}
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.samza.operators.impl.store;

/**
* The store key used in the {@link TimeSeriesStore} to uniquely identify a row.
*/
public class TimeSeriesKey<K> {

private final K key;
private final long timestamp;

// allows a maximum of 2 Billion entries per window per key. maybe, long?
Contributor

int should be fine, can remove comment.

private final int seqNum;

public TimeSeriesKey(K k, long time, int seq) {
key = k;
timestamp = time;
seqNum = seq;
}

public K getKey() {
return key;
}

public long getTimestamp() {
return timestamp;
}

public int getSeqNum() {
return seqNum;
}

@Override
public int hashCode() {
int result = key != null ? key.hashCode() : 0;
result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
result = 31 * result + seqNum;
return result;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || !getClass().equals(o.getClass())) return false;

TimeSeriesKey<?> that = (TimeSeriesKey<?>) o;

if (timestamp != that.timestamp) return false;
Contributor

Good call.

if (seqNum != that.seqNum) return false;
return key != null ? key.equals(that.key) : that.key == null;
}

@Override
public String toString() {
return "TimeSeriesKey{" +
"key=" + key +
Contributor

Minor: Indentation. Maybe String.format?

Contributor Author

Will fix, thanks!

", timestamp=" + timestamp +
", seqNum=" + seqNum +
'}';
}
}
@@ -0,0 +1,88 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.samza.operators.impl.store;

import org.apache.samza.serializers.Serde;
import java.nio.ByteBuffer;

/**
* A {@link Serde} for {@link TimeSeriesKey}s.
*
* <p> This wraps the actual key's serde with serializers for timestamp and sequence number.
*
* A {@link TimeSeriesKeySerde} serializes a key as follows:
* +-------------------------+------------------+------------+
* | serialized-key bytes | timestamp | seq num |
* |(serialized by keySerde) | | |
* +-------------------------+------------------+------------+
* +---serialized key len----+-------8 bytes----+---4 bytes--+
*
* @param <K> the type of the wrapped key
*/
public class TimeSeriesKeySerde<K> implements Serde<TimeSeriesKey<K>> {

private static final int TIMESTAMP_SIZE = 8;
private static final int SEQNUM_SIZE = 4;

private final Serde<K> keySerde;

public TimeSeriesKeySerde(Serde<K> keySerde) {
this.keySerde = keySerde;
}

@Override
public byte[] toBytes(TimeSeriesKey<K> timeSeriesKey) {
K key = timeSeriesKey.getKey();
long timestamp = timeSeriesKey.getTimestamp();
int seqNum = timeSeriesKey.getSeqNum();

byte[] serializedKey = keySerde.toBytes(key);
int keySize = serializedKey == null ? 0 : serializedKey.length;
Contributor

Why do we allow null key bytes? Are there cases where timestamp + seq is sufficient?

Contributor Author

Null key bytes are needed to implement non-keyed tumbling and session windows. In that case, the start of the timestamp range uniquely identifies a window, and the start of the timestamp range combined with the sequence number uniquely identifies an entry in the store.


// append the timestamp and sequence number to the serialized key bytes
ByteBuffer buf = ByteBuffer.allocate(keySize + TIMESTAMP_SIZE + SEQNUM_SIZE);
if (serializedKey != null) {
buf.put(serializedKey);
}
buf.putLong(timestamp);
buf.putInt(seqNum);

return buf.array();
}

@Override
public TimeSeriesKey<K> fromBytes(byte[] timeSeriesKeyBytes) {
// First obtain the key bytes, and deserialize them. Later de-serialize the timestamp and sequence number
ByteBuffer buf = ByteBuffer.wrap(timeSeriesKeyBytes);
int keySize = timeSeriesKeyBytes.length - TIMESTAMP_SIZE - SEQNUM_SIZE;
K key = null;

if (keySize != 0) {
byte[] keyBytes = new byte[keySize];
buf.get(keyBytes);
key = keySerde.fromBytes(keyBytes);
}

long timeStamp = buf.getLong();
int seqNum = buf.getInt();

return new TimeSeriesKey<>(key, timeStamp, seqNum);
}
}
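
To make the byte layout documented in the Javadoc of TimeSeriesKeySerde concrete, here is a minimal standalone sketch of the same scheme. It is not the PR's code: it assumes a UTF-8 String key in place of the wrapped keySerde, and the class and method names (`TimeSeriesKeyLayoutSketch`, `encode`, `decodeKey`, and so on) are hypothetical.

```java
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;

public class TimeSeriesKeyLayoutSketch {
  static final int TIMESTAMP_SIZE = 8; // a long
  static final int SEQNUM_SIZE = 4;    // an int

  /** Encodes [key bytes][8-byte timestamp][4-byte seqNum]; a null key contributes zero bytes. */
  static byte[] encode(String key, long timestamp, int seqNum) {
    byte[] keyBytes = key == null ? new byte[0] : key.getBytes(StandardCharsets.UTF_8);
    ByteBuffer buf = ByteBuffer.allocate(keyBytes.length + TIMESTAMP_SIZE + SEQNUM_SIZE);
    buf.put(keyBytes).putLong(timestamp).putInt(seqNum);
    return buf.array();
  }

  /** Everything before the trailing 12 bytes is the key; zero key bytes decode to null. */
  static String decodeKey(byte[] bytes) {
    int keySize = bytes.length - TIMESTAMP_SIZE - SEQNUM_SIZE;
    return keySize == 0 ? null : new String(bytes, 0, keySize, StandardCharsets.UTF_8);
  }

  /** Reads the timestamp from its fixed offset counted from the end of the array. */
  static long decodeTimestamp(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getLong(bytes.length - TIMESTAMP_SIZE - SEQNUM_SIZE);
  }

  /** Reads the sequence number from the last four bytes. */
  static int decodeSeqNum(byte[] bytes) {
    return ByteBuffer.wrap(bytes).getInt(bytes.length - SEQNUM_SIZE);
  }

  public static void main(String[] args) {
    byte[] keyed = encode("user-1", 1500L, 7);
    System.out.println(keyed.length);            // prints 18 (6 key bytes + 8 + 4)
    System.out.println(decodeKey(keyed));        // prints user-1
    System.out.println(decodeTimestamp(keyed));  // prints 1500
    System.out.println(decodeSeqNum(keyed));     // prints 7

    byte[] unkeyed = encode(null, 42L, 0);       // the non-keyed window case
    System.out.println(unkeyed.length);          // prints 12
    System.out.println(decodeKey(unkeyed));      // prints null
  }
}
```

One consequence of the scheme worth noting: because the key length is inferred from the total length, a key that serializes to zero bytes is indistinguishable from a null key after a round trip.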
@@ -0,0 +1,74 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.samza.operators.impl.store;

import org.apache.samza.storage.kv.ClosableIterator;

/**
* A key-value store that allows entries to be queried and stored based on time ranges.
*
* Operations on the store can be invoked from multiple threads. Hence, implementations are expected to be thread-safe.
*
* @param <K> the type of key in the store
* @param <V> the type of value in the store
*/

Contributor

Minor: Delete newline.

public interface TimeSeriesStore<K, V> {
Contributor

Should V be something that extends TimestampedValue?

Contributor Author

Not really; <V> should be completely type-agnostic here for this to be a general-purpose API. However, the return type from get should be an Iterator<TimeStampedValue<V>> (as is already being done).


/**
* Insert a key and the value to the store with the provided timestamp.
Contributor

s/to the store/in the store.

*
* @param key the key to insert
* @param val the value to insert
* @param timeStamp the timestamp in milliseconds
*/
void put(K key, V val, Long timeStamp);
Contributor

'timestamp' everywhere.
'long' everywhere.


/**
* Returns an iterator over values for the given key in the provided time-range - [{@code startTimestamp}, {@code endTimestamp})
Contributor

"Iterator must be closed after use... " etc.

*
* Values returned by the iterator are ordered by their timestamp. Values with the same timestamp are
* returned in their order of insertion.
*
* @param key the key to look up in the store
* @param startTimestamp the start timestamp of the range, inclusive
* @param endTimeStamp the end timestamp of the range, exclusive
* @throws IllegalArgumentException when startTimeStamp &gt; endTimeStamp, or when either of them is negative
*/
ClosableIterator<TimeSeriesValue<V>> get(K key, Long startTimestamp, Long endTimeStamp);
Contributor

Do we need range(K k1, K k2)/all?

Contributor Author

There's currently no use case that requires a range scan across keys, or an iterator over all keys in the store. We agreed to add it should a scenario arise.


/**
* Removes all values for this key in the given time-range.
*
* @param key the key to look up in the store
* @param startTimestamp the start timestamp of the range, inclusive
* @param endTimeStamp the end timestamp of the range, exclusive
* @throws IllegalArgumentException when startTimeStamp &gt; endTimeStamp, or when either of them is negative
*/
void remove(K key, Long startTimestamp, Long endTimeStamp);

/**
* Closes this store.
*
* Use this to perform final clean-ups, release acquired resources etc.
*/
void close();
}
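
The contract described in the Javadoc above (inclusive start, exclusive end, ordering by timestamp and then by insertion) can be illustrated with a small in-memory sketch. This is an assumption-laden stand-in rather than the PR's RocksDB-backed implementation: the class name `InMemoryTimeSeriesSketch` is hypothetical, it is not thread-safe, and it returns a plain List instead of a ClosableIterator to stay short.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.NavigableMap;
import java.util.TreeMap;

public class InMemoryTimeSeriesSketch<K, V> {
  // key -> (timestamp -> values in insertion order); HashMap permits a null key
  private final Map<K, TreeMap<Long, List<V>>> rows = new HashMap<>();

  /** Stores the value under the key at the given timestamp. */
  public void put(K key, V value, long timestamp) {
    rows.computeIfAbsent(key, k -> new TreeMap<>())
        .computeIfAbsent(timestamp, t -> new ArrayList<>())
        .add(value);
  }

  /** Returns values in [start, end), ordered by timestamp, then by insertion order. */
  public List<V> get(K key, long start, long end) {
    List<V> result = new ArrayList<>();
    for (List<V> values : range(key, start, end).values()) {
      result.addAll(values);
    }
    return result;
  }

  /** Removes all values for the key in [start, end). */
  public void remove(K key, long start, long end) {
    range(key, start, end).clear(); // clearing the view removes from the backing map
  }

  /** Validates the range and returns a live view of the matching timestamps. */
  private NavigableMap<Long, List<V>> range(K key, long start, long end) {
    if (start < 0 || end < 0 || start > end) {
      throw new IllegalArgumentException("invalid range: [" + start + ", " + end + ")");
    }
    TreeMap<Long, List<V>> series = rows.get(key);
    if (series == null) {
      return new TreeMap<>();
    }
    return series.subMap(start, true, end, false);
  }

  public static void main(String[] args) {
    InMemoryTimeSeriesSketch<String, String> store = new InMemoryTimeSeriesSketch<>();
    store.put("a", "x", 10L);
    store.put("a", "y", 10L);
    store.put("a", "z", 20L);

    System.out.println(store.get("a", 10L, 20L));  // prints [x, y]
    System.out.println(store.get("a", 0L, 21L));   // prints [x, y, z]

    store.remove("a", 10L, 20L);
    System.out.println(store.get("a", 0L, 100L));  // prints [z]
  }
}
```

In the persistent version under review, the same ordering would presumably come from the serialized key layout (key bytes, then timestamp, then sequence number) rather than from a TreeMap.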