Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

SAMZA-1423: Implement time series storage for joins and windows #303

Closed
wants to merge 16 commits into from
1 change: 1 addition & 0 deletions build.gradle
Expand Up @@ -178,6 +178,7 @@ project(":samza-core_$scalaVersion") {
compile "org.scala-lang:scala-library:$scalaLibVersion"
compile "org.slf4j:slf4j-api:$slf4jVersion"
testCompile project(":samza-api").sourceSets.test.output
testCompile project(":samza-kv-rocksdb_$scalaVersion")
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why core depends on kv-rocksDb? Seems this will cause circular dependency. I would prefer the test cases added to kv-rocksdb to avoid this.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That would mean that all tests for operators (window/ join) that instantiate stores would now be a part of samza-kv-rocksdb. Let's sync up offline since the scope of that seems to be larger than this PR.

Also, I believe circular dependencies in testCompile tasks are fine in gradle.

testCompile "junit:junit:$junitVersion"
testCompile "org.mockito:mockito-all:$mockitoVersion"
testCompile "org.powermock:powermock-api-mockito:$powerMockVersion"
Expand Down
@@ -0,0 +1,40 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/

package org.apache.samza.storage.kv;

import java.util.Iterator;

/**
* An iterator that must be closed.
*
* <p>
* Implement close to free resources assigned to the iterator such as open file handles, persistent state etc.
*
* @param <V> the type of value returned by this iterator
*/
public interface ClosableIterator<V> extends Iterator<V> {

/**
* Closes this iterator and frees resources assigned to it.
*
* It is illegal to invoke {@link #next()} and {@link #hasNext()} after an iterator has been closed.
*/
public void close();
}
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.samza.operators.impl.store;

/**
* The store key used in the {@link TimeSeriesStore} to uniquely identify a row.
*/
public class TimeSeriesKey<K> {

// version for backwards compatibility
private static final byte VERSION = 0x00;
private final K key;
private final long timestamp;

private final long seqNum;

public TimeSeriesKey(K k, long time, long seq) {
key = k;
timestamp = time;
seqNum = seq;
}

public K getKey() {
return key;
}

public long getTimestamp() {
return timestamp;
}

public byte getVersion() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

seems a little bit weird that it's a member fn to return the static val.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks, will fix it!

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@xinyuiscool I accidentally committed this without addressing this feedback. However, I ended up doing a follow-up patch that addresses your comment - 92894e5

return VERSION;
}

@Override
public boolean equals(Object o) {
if (this == o) return true;
if (o == null || getClass() != o.getClass()) return false;

TimeSeriesKey<?> that = (TimeSeriesKey<?>) o;

if (timestamp != that.timestamp) return false;
if (seqNum != that.seqNum) return false;
return key != null ? key.equals(that.key) : that.key == null;
}

@Override
public int hashCode() {
int result = key != null ? key.hashCode() : 0;
result = 31 * result + (int) (timestamp ^ (timestamp >>> 32));
result = 31 * result + (int) (seqNum ^ (seqNum >>> 32));
return result;
}

public long getSeqNum() {
return seqNum;
}

@Override
public String toString() {
return String.format("TimeSeriesKey {key: %s timestamp: %s seqNum: %s}", key, timestamp, seqNum);
}
}
@@ -0,0 +1,96 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/
package org.apache.samza.operators.impl.store;

import org.apache.samza.SamzaException;
import org.apache.samza.serializers.Serde;

import java.nio.ByteBuffer;

/**
* A {@link Serde} for {@link TimeSeriesKey}s.
*
* <p>
* This wraps the actual key's serde with serializers for timestamp, version number and sequence number.
*
* A {@link TimeSeriesKeySerde} serializes a key as follows:
* +-------------------------+------------------+----------------+------------------+
* | serialized-key bytes | timestamp | version (0) | seqNum |
* |(serialized by keySerde) | | | |
* +-------------------------+------------------+----------------+------------------+
* +---serialized key len----+-------8 bytes----+---1 byte-------+---7 bytes---------+
*
* @param <K> the type of the wrapped key
*/
public class TimeSeriesKeySerde<K> implements Serde<TimeSeriesKey<K>> {

private static final long SEQUENCE_NUM_MASK = 0x00ffffffffffffffL;
private static final int TIMESTAMP_SIZE = 8;
private static final int SEQNUM_SIZE = 8;

private final Serde<K> keySerde;

public TimeSeriesKeySerde(Serde<K> keySerde) {
this.keySerde = keySerde;
}

@Override
public byte[] toBytes(TimeSeriesKey<K> timeSeriesKey) {
K key = timeSeriesKey.getKey();
long timestamp = timeSeriesKey.getTimestamp();
long seqNum = timeSeriesKey.getSeqNum();

byte[] serializedKey = keySerde.toBytes(key);
int keySize = serializedKey == null ? 0 : serializedKey.length;

// append the timestamp and sequence number to the serialized key bytes
ByteBuffer buf = ByteBuffer.allocate(keySize + TIMESTAMP_SIZE + SEQNUM_SIZE);
if (serializedKey != null) {
buf.put(serializedKey);
}
buf.putLong(timestamp);
buf.putLong(seqNum & SEQUENCE_NUM_MASK);

return buf.array();
}

@Override
public TimeSeriesKey<K> fromBytes(byte[] timeSeriesKeyBytes) {
// First obtain the key bytes, and deserialize them. Later de-serialize the timestamp and sequence number
ByteBuffer buf = ByteBuffer.wrap(timeSeriesKeyBytes);
int keySize = timeSeriesKeyBytes.length - TIMESTAMP_SIZE - SEQNUM_SIZE;
K key = null;

if (keySize != 0) {
byte[] keyBytes = new byte[keySize];
buf.get(keyBytes);
key = keySerde.fromBytes(keyBytes);
}

long timeStamp = buf.getLong();
long seqNum = buf.getLong();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This relies on version being 0. Safer to do the right thing here now.
Q: What should happen if the version doesn't match the field in the class? Should we throw here?

long version = seqNum & ~SEQUENCE_NUM_MASK;

if (version != 0) {
throw new SamzaException("Version is not zero. Sequence number: " + seqNum);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This check should be version != TimeSeriesKey.VERSION.

}
return new TimeSeriesKey(key, timeStamp, seqNum);
}
}
@@ -0,0 +1,80 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*
*/

package org.apache.samza.operators.impl.store;

import org.apache.samza.storage.kv.ClosableIterator;

/**
* A key-value store that allows entries to be queried and stored based on time ranges.
*
* Operations on the store can be invoked from multiple threads. Hence, implementations are expected to be thread-safe.
*
* @param <K> the type of key in the store
* @param <V> the type of value in the store
*/
public interface TimeSeriesStore<K, V> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

should V be something extends fromTimestampedValue?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Not really, <V> should be completely agnostic to types here for this to be a general purpose API. However, the return type from get should be an Iterator<TimeStampedValue<V>> (as it's being done)


/**
* Insert a key and the value in the store with the provided timestamp.
*
* @param key the key to insert
* @param val the value to insert
* @param timestamp the timestamp in milliseconds
*/
void put(K key, V val, long timestamp);

/**
* Returns an iterator over values for the given key in the provided time-range - [{@code startTimestamp}, {@code endTimestamp})
*
* Values returned by the iterator are ordered by their timestamp. Values with the same timestamp are
* returned in their order of insertion.
*
* <p> The iterator <b>must</b> be closed after use by calling {@link #close}. Not doing so will result in memory leaks.
*
* @param key the key to look up in the store
* @param startTimestamp the start timestamp of the range, inclusive
* @param endTimestamp the end timestamp of the range, exclusive
* @throws IllegalArgumentException when startTimeStamp &gt; endTimestamp, or when either of them is negative
*/
ClosableIterator<TimestampedValue<V>> get(K key, long startTimestamp, long endTimestamp);

/**
* Removes all values for this key in the given time-range.
*
* @param key the key to look up in the store
* @param startTimestamp the start timestamp of the range, inclusive
* @param endTimeStamp the end timestamp of the range, exclusive
* @throws IllegalArgumentException when startTimeStamp &gt; endTimeStamp, or when either of them is negative
*/
void remove(K key, long startTimestamp, long endTimeStamp);

/**
* Flushes this time series store, if applicable.
*/
void flush();

/**
* Closes this store.
*
* Use this to perform final clean-ups, release acquired resources etc.
*/
void close();
}