Skip to content

Commit

Permalink
SAMZA-1056: Added wiring for High Level API state stores, their serdes and changelogs.
Browse files Browse the repository at this point in the history

Provided join operator access to durable state stores.

Author: Prateek Maheshwari <pmaheshw@linkedin.com>

Reviewers: Jagadish Venkatraman <jagadish@apache.org>

Closes #309 from prateekm/operator-store-wiring
  • Loading branch information
prateekm committed Oct 4, 2017
1 parent ad80cf9 commit a671288
Show file tree
Hide file tree
Showing 39 changed files with 940 additions and 397 deletions.
1 change: 1 addition & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -22,6 +22,7 @@ docs/_site
build
**/bin
samza-test/state
state/
docs/learn/documentation/*/api/javadocs
docs/learn/documentation/*/rest/javadocs
.DS_Store
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.samza.operators.windows.Window;
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;

import java.time.Duration;
import java.util.ArrayList;
Expand Down Expand Up @@ -128,13 +129,17 @@ public interface MessageStream<M> {
* @param otherStream the other {@link MessageStream} to be joined with
* @param joinFn the function to join messages from this and the other {@link MessageStream}
* @param ttl the ttl for messages in each stream
* @param keySerde the serde for the join key
* @param messageSerde the serde for messages in this stream
* @param otherMessageSerde the serde for messages in the other stream
* @param <K> the type of join key
* @param <JM> the type of messages in the other stream
* @param <OM> the type of messages resulting from the {@code joinFn}
* @param <OM> the type of messages in the other stream
* @param <JM> the type of messages resulting from the {@code joinFn}
* @return the joined {@link MessageStream}
*/
<K, JM, OM> MessageStream<OM> join(MessageStream<JM> otherStream,
JoinFunction<? extends K, ? super M, ? super JM, ? extends OM> joinFn, Duration ttl);
<K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl);

/**
* Merges all {@code otherStreams} with this {@link MessageStream}.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,39 +20,34 @@
package org.apache.samza.operators.functions;

/**
 * Allows custom handling of watermarks.
 */
public interface WatermarkFunction {

/**
 * Processes the input watermark coming from upstream operators.
 * This allows custom watermark handling, such as triggering events or propagating it downstream.
 *
 * @param watermark input watermark
 */
void processWatermark(long watermark);

/**
 * Returns the output watermark. This function will be invoked immediately after either
 * of the following events:
 *
 * <ol>
 * <li> Return from the transform function, e.g. {@link FlatMapFunction}.
 * <li> Return from the {@link #processWatermark} function.
 * </ol>
 *
 * Note: If the transform function returns a collection of messages, the output watermark
 * will be emitted after the output collection has been propagated to downstream operators.
 * This might delay the watermark propagation, which will cause more buffering and might
 * have a performance impact.
 *
 * @return output watermark, or null if the output watermark should not be updated.
 * Samza guarantees that the same watermark value will only be emitted once.
 */
Long getOutputWatermark();

}
Original file line number Diff line number Diff line change
Expand Up @@ -23,8 +23,8 @@
import org.apache.samza.checkpoint.OffsetManager;
import org.apache.samza.job.model.JobModel;
import org.apache.samza.metrics.ReadableMetricsRegistry;
import org.apache.samza.storage.StorageEngine;
import org.apache.samza.storage.TaskStorageManager;
import org.apache.samza.storage.kv.KeyValueStore;
import org.apache.samza.system.StreamMetadataCache;
import org.apache.samza.system.SystemStreamPartition;
import org.apache.samza.task.TaskContext;
Expand Down Expand Up @@ -79,9 +79,9 @@ public Set<SystemStreamPartition> getSystemStreamPartitions() {
}

@Override
public StorageEngine getStore(String storeName) {
public KeyValueStore getStore(String storeName) {
if (storageManager != null) {
return storageManager.apply(storeName);
return (KeyValueStore) storageManager.apply(storeName);
} else {
LOG.warn("No store found for name: {}", storeName);
return null;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,12 @@
public interface DistributedLockWithState {

/**
 * Try to acquire the lock, but first check if the state flag is set. If it is set, return false.
 * If the flag is not set and the lock is acquired, return true.
 *
 * @param timeout Duration of the lock acquisition timeout.
 * @param unit Time unit of the timeout defined above.
 * @return true if lock is acquired successfully, false if state is already set.
 * @throws TimeoutException if the lock could not be acquired.
 */
boolean lockIfNotSet(long timeout, TimeUnit unit) throws TimeoutException;

Expand Down
63 changes: 47 additions & 16 deletions samza-core/src/main/java/org/apache/samza/execution/JobNode.java
Original file line number Diff line number Diff line change
Expand Up @@ -33,13 +33,15 @@
import org.apache.samza.config.JobConfig;
import org.apache.samza.config.MapConfig;
import org.apache.samza.config.SerializerConfig;
import org.apache.samza.config.StorageConfig;
import org.apache.samza.config.StreamConfig;
import org.apache.samza.config.TaskConfig;
import org.apache.samza.operators.StreamGraphImpl;
import org.apache.samza.operators.spec.InputOperatorSpec;
import org.apache.samza.operators.spec.JoinOperatorSpec;
import org.apache.samza.operators.spec.OperatorSpec;
import org.apache.samza.operators.spec.OutputStreamImpl;
import org.apache.samza.operators.spec.StatefulOperatorSpec;
import org.apache.samza.operators.spec.WindowOperatorSpec;
import org.apache.samza.operators.util.MathUtils;
import org.apache.samza.serializers.Serde;
Expand Down Expand Up @@ -129,6 +131,14 @@ public JobConfig generateConfig(String executionPlanJson) {
}
}

streamGraph.getAllOperatorSpecs().forEach(opSpec -> {
if (opSpec instanceof StatefulOperatorSpec) {
((StatefulOperatorSpec) opSpec).getStoreDescriptors()
.forEach(sd -> configs.putAll(sd.getStorageConfigs()));
// store key and message serdes are configured separately in #addSerdeConfigs
}
});

configs.put(CONFIG_INTERNAL_EXECUTION_PLAN, executionPlanJson);

// write input/output streams to configs
Expand Down Expand Up @@ -168,32 +178,42 @@ public JobConfig generateConfig(String executionPlanJson) {
*
* @param configs the configs to add serialized serde instances and stream serde configs to
*/
protected void addSerdeConfigs(Map<String, String> configs) {
void addSerdeConfigs(Map<String, String> configs) {
// collect all key and msg serde instances for streams
Map<String, Serde> keySerdes = new HashMap<>();
Map<String, Serde> msgSerdes = new HashMap<>();
Map<String, Serde> streamKeySerdes = new HashMap<>();
Map<String, Serde> streamMsgSerdes = new HashMap<>();
Map<StreamSpec, InputOperatorSpec> inputOperators = streamGraph.getInputOperators();
inEdges.forEach(edge -> {
String streamId = edge.getStreamSpec().getId();
InputOperatorSpec inputOperatorSpec = inputOperators.get(edge.getStreamSpec());
Serde keySerde = inputOperatorSpec.getKeySerde();
Serde valueSerde = inputOperatorSpec.getValueSerde();
keySerdes.put(streamId, keySerde);
msgSerdes.put(streamId, valueSerde);
streamKeySerdes.put(streamId, inputOperatorSpec.getKeySerde());
streamMsgSerdes.put(streamId, inputOperatorSpec.getValueSerde());
});
Map<StreamSpec, OutputStreamImpl> outputStreams = streamGraph.getOutputStreams();
outEdges.forEach(edge -> {
String streamId = edge.getStreamSpec().getId();
OutputStreamImpl outputStream = outputStreams.get(edge.getStreamSpec());
Serde keySerde = outputStream.getKeySerde();
Serde valueSerde = outputStream.getValueSerde();
keySerdes.put(streamId, keySerde);
msgSerdes.put(streamId, valueSerde);
streamKeySerdes.put(streamId, outputStream.getKeySerde());
streamMsgSerdes.put(streamId, outputStream.getValueSerde());
});

// collect all key and msg serde instances for stores
Map<String, Serde> storeKeySerdes = new HashMap<>();
Map<String, Serde> storeMsgSerdes = new HashMap<>();
streamGraph.getAllOperatorSpecs().forEach(opSpec -> {
if (opSpec instanceof StatefulOperatorSpec) {
((StatefulOperatorSpec) opSpec).getStoreDescriptors().forEach(storeDescriptor -> {
storeKeySerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getKeySerde());
storeMsgSerdes.put(storeDescriptor.getStoreName(), storeDescriptor.getMsgSerde());
});
}
});

// for each unique serde instance, generate a unique name and serialize to config
HashSet<Serde> serdes = new HashSet<>(keySerdes.values());
serdes.addAll(msgSerdes.values());
// for each unique stream or store serde instance, generate a unique name and serialize to config
HashSet<Serde> serdes = new HashSet<>(streamKeySerdes.values());
serdes.addAll(streamMsgSerdes.values());
serdes.addAll(storeKeySerdes.values());
serdes.addAll(storeMsgSerdes.values());
SerializableSerde<Serde> serializableSerde = new SerializableSerde<>();
Base64.Encoder base64Encoder = Base64.getEncoder();
Map<Serde, String> serdeUUIDs = new HashMap<>();
Expand All @@ -205,17 +225,28 @@ protected void addSerdeConfigs(Map<String, String> configs) {
});

// set key and msg serdes for streams to the serde names generated above
keySerdes.forEach((streamId, serde) -> {
streamKeySerdes.forEach((streamId, serde) -> {
String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId);
String keySerdeConfigKey = streamIdPrefix + StreamConfig.KEY_SERDE();
configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
});

msgSerdes.forEach((streamId, serde) -> {
streamMsgSerdes.forEach((streamId, serde) -> {
String streamIdPrefix = String.format(StreamConfig.STREAM_ID_PREFIX(), streamId);
String valueSerdeConfigKey = streamIdPrefix + StreamConfig.MSG_SERDE();
configs.put(valueSerdeConfigKey, serdeUUIDs.get(serde));
});

// set key and msg serdes for stores to the serde names generated above
storeKeySerdes.forEach((storeName, serde) -> {
String keySerdeConfigKey = String.format(StorageConfig.KEY_SERDE(), storeName);
configs.put(keySerdeConfigKey, serdeUUIDs.get(serde));
});

storeMsgSerdes.forEach((storeName, serde) -> {
String msgSerdeConfigKey = String.format(StorageConfig.MSG_SERDE(), storeName);
configs.put(msgSerdeConfigKey, serdeUUIDs.get(serde));
});
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.samza.operators.windows.WindowPane;
import org.apache.samza.operators.windows.internal.WindowInternal;
import org.apache.samza.serializers.KVSerde;
import org.apache.samza.serializers.Serde;

import java.time.Duration;
import java.util.Collection;
Expand Down Expand Up @@ -112,15 +113,16 @@ public <K, WV> MessageStream<WindowPane<K, WV>> window(Window<M, K, WV> window)
}

@Override
public <K, JM, TM> MessageStream<TM> join(MessageStream<JM> otherStream,
JoinFunction<? extends K, ? super M, ? super JM, ? extends TM> joinFn, Duration ttl) {
OperatorSpec<?, JM> otherOpSpec = ((MessageStreamImpl<JM>) otherStream).getOperatorSpec();
JoinOperatorSpec<K, M, JM, TM> joinOpSpec =
OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec,
(JoinFunction<K, M, JM, TM>) joinFn, ttl.toMillis(), this.graph.getNextOpId());
public <K, OM, JM> MessageStream<JM> join(MessageStream<OM> otherStream,
JoinFunction<? extends K, ? super M, ? super OM, ? extends JM> joinFn,
Serde<K> keySerde, Serde<M> messageSerde, Serde<OM> otherMessageSerde, Duration ttl) {
OperatorSpec<?, OM> otherOpSpec = ((MessageStreamImpl<OM>) otherStream).getOperatorSpec();
JoinOperatorSpec<K, M, OM, JM> joinOpSpec =
OperatorSpecs.createJoinOperatorSpec(this.operatorSpec, otherOpSpec, (JoinFunction<K, M, OM, JM>) joinFn,
keySerde, messageSerde, otherMessageSerde, ttl.toMillis(), this.graph.getNextOpId());

this.operatorSpec.registerNextOperatorSpec(joinOpSpec);
otherOpSpec.registerNextOperatorSpec((OperatorSpec<JM, ?>) joinOpSpec);
otherOpSpec.registerNextOperatorSpec((OperatorSpec<OM, ?>) joinOpSpec);

return new MessageStreamImpl<>(this.graph, joinOpSpec);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -18,21 +18,22 @@
*/
package org.apache.samza.operators.functions;

import org.apache.samza.operators.impl.store.TimestampedValue;
import org.apache.samza.storage.kv.KeyValueStore;

/**
* An internal function that maintains state and join logic for one side of a two-way join.
*/
public interface PartialJoinFunction<K, M, JM, RM> extends InitableFunction, ClosableFunction {
public interface PartialJoinFunction<K, M, OM, JM> extends InitableFunction, ClosableFunction {

/**
* Joins a message in this stream with a message from another stream.
*
* @param m message from this input stream
* @param jm message from the other input stream
* @param om message from the other input stream
* @return the joined message in the output stream
*/
RM apply(M m, JM jm);
JM apply(M m, OM om);

/**
* Gets the key for the input message.
Expand All @@ -47,23 +48,6 @@ public interface PartialJoinFunction<K, M, JM, RM> extends InitableFunction, Clo
*
* @return the key value store containing the state for this stream
*/
KeyValueStore<K, PartialJoinMessage<M>> getState();
KeyValueStore<K, TimestampedValue<M>> getState();

class PartialJoinMessage<M> {
private final M message;
private final long receivedTimeMs;

public PartialJoinMessage(M message, long receivedTimeMs) {
this.message = message;
this.receivedTimeMs = receivedTimeMs;
}

public M getMessage() {
return message;
}

public long getReceivedTimeMs() {
return receivedTimeMs;
}
}
}
Loading

0 comments on commit a671288

Please sign in to comment.