Skip to content

Commit

Permalink
[FLINK-19001] Add more JavaDoc
Browse files Browse the repository at this point in the history
  • Loading branch information
igalshilman committed Aug 19, 2020
1 parent 09fa62e commit fb66deb
Show file tree
Hide file tree
Showing 2 changed files with 9 additions and 0 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@
*/
public final class StatefulFunctionDataStreamBuilder {

/** Creates a {@code StatefulFunctionDataStreamBuilder}. */
public static StatefulFunctionDataStreamBuilder builder(String pipelineName) {
FeedbackKey<Message> key = new FeedbackKey<>(pipelineName, 1);
return new StatefulFunctionDataStreamBuilder(key);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,12 +20,19 @@

import java.util.Map;
import java.util.Objects;
import org.apache.flink.annotation.Internal;
import org.apache.flink.statefun.sdk.io.EgressIdentifier;
import org.apache.flink.streaming.api.datastream.DataStream;

/**
* StatefulFunctionEgressStreams - this class holds a handle for every egress stream defined via
* {@link StatefulFunctionDataStreamBuilder#withEgressId(EgressIdentifier)}. see {@link
* #getDataStreamForEgressId(EgressIdentifier)}.
*/
public final class StatefulFunctionEgressStreams {
private final Map<EgressIdentifier<?>, DataStream<?>> egresses;

@Internal
StatefulFunctionEgressStreams(Map<EgressIdentifier<?>, DataStream<?>> egresses) {
this.egresses = Objects.requireNonNull(egresses);
}
Expand All @@ -45,6 +52,7 @@ public final class StatefulFunctionEgressStreams {
*/
@SuppressWarnings("unchecked")
public <T> DataStream<T> getDataStreamForEgressId(EgressIdentifier<T> id) {
Objects.requireNonNull(id);
DataStream<?> dataStream = egresses.get(id);
if (dataStream == null) {
throw new IllegalArgumentException("Unknown data stream for egress " + id);
Expand Down

0 comments on commit fb66deb

Please sign in to comment.