Skip to content

Commit

Permalink
[FLINK-2976] [streaming-java, streaming-scala] Set JobVertexID based …
Browse files Browse the repository at this point in the history
…on stream node hash

[comments] Remove unused argument to method

[comments] Add more comments to stream node hashing

Add name to hash in order to detect swapped nodes when they have names

Improve error message on non-unique user-specified IDs

[comments] Add comment to stream node hashing
  • Loading branch information
uce committed Jan 11, 2016
1 parent a3c0185 commit ad7b21b
Show file tree
Hide file tree
Showing 11 changed files with 979 additions and 45 deletions.
Expand Up @@ -53,6 +53,23 @@ public DataStreamSink<T> name(String name) {
return this;
}

/**
* Sets an ID for this operator.
*
* <p>The specified ID is used to assign the same operator ID across job
* submissions (for example when starting a job from a savepoint).
*
* <p><strong>Important</strong>: this ID needs to be unique per
* transformation and job. Otherwise, job submission will fail.
*
* @param uid The unique user-specified ID of this transformation.
* @return The operator with the specified ID.
*/
public DataStreamSink<T> uid(String uid) {
transformation.setUid(uid);
return this;
}

/**
* Sets the parallelism for this sink. The degree must be higher than zero.
*
Expand Down
Expand Up @@ -62,6 +62,23 @@ public SingleOutputStreamOperator<T, O> name(String name){
return this;
}

/**
* Sets an ID for this operator.
*
* <p>The specified ID is used to assign the same operator ID across job
* submissions (for example when starting a job from a savepoint).
*
* <p><strong>Important</strong>: this ID needs to be unique per
* transformation and job. Otherwise, job submission will fail.
*
* @param uid The unique user-specified ID of this transformation.
* @return The operator with the specified ID.
*/
public SingleOutputStreamOperator<T, O> uid(String uid) {
transformation.setUid(uid);
return this;
}

/**
* Sets the parallelism for this operator. The degree must be 1 or more.
*
Expand Down
Expand Up @@ -420,6 +420,13 @@ public void setResourceStrategy(Integer vertexID, ResourceStrategy strategy) {
}
}

void setTransformationId(Integer nodeId, String transformationId) {
StreamNode node = streamNodes.get(nodeId);
if (node != null) {
node.setTransformationId(transformationId);
}
}

public StreamNode getStreamNode(Integer vertexID) {
return streamNodes.get(vertexID);
}
Expand Down
Expand Up @@ -182,6 +182,9 @@ private Collection<Integer> transform(StreamTransformation<?> transform) {
if (transform.getResourceStrategy() != StreamGraph.ResourceStrategy.DEFAULT) {
streamGraph.setResourceStrategy(transform.getId(), transform.getResourceStrategy());
}
if (transform.getUid() != null) {
streamGraph.setTransformationId(transform.getId(), transform.getUid());
}

return transformedIds;
}
Expand Down
Expand Up @@ -65,6 +65,8 @@ public class StreamNode implements Serializable {

private InputFormat<?, ?> inputFormat;

private String transformationId;

public StreamNode(StreamExecutionEnvironment env, Integer id, StreamOperator<?> operator,
String operatorName, List<OutputSelector<?>> outputSelector,
Class<? extends AbstractInvokable> jobVertexClass) {
Expand Down Expand Up @@ -242,6 +244,14 @@ public void setStateKeySerializer(TypeSerializer<?> stateKeySerializer) {
this.stateKeySerializer = stateKeySerializer;
}

String getTransformationId() {
return transformationId;
}

void setTransformationId(String transformationId) {
this.transformationId = transformationId;
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down

0 comments on commit ad7b21b

Please sign in to comment.