[FLINK-5480] Introduce user-provided hash for JobVertexes #3117

Closed
wants to merge 4 commits
Changes from all commits
@@ -18,6 +18,7 @@
package org.apache.flink.streaming.connectors.cassandra;

import com.datastax.driver.core.Cluster;
import org.apache.flink.annotation.PublicEvolving;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.api.common.typeutils.TypeSerializer;
import org.apache.flink.api.java.tuple.Tuple;
@@ -85,6 +86,7 @@ public CassandraSink<IN> name(String name) {
* @param uid The unique user-specified ID of this transformation.
* @return The operator with the specified ID.
*/
@PublicEvolving
public CassandraSink<IN> uid(String uid) {
if (useDataStreamSink) {
getSinkTransformation().setUid(uid);
@@ -94,6 +96,36 @@ public CassandraSink<IN> uid(String uid) {
return this;
}

/**
* Sets a user-provided hash for this operator. This will be used AS IS to create the JobVertexID.
*
[Review comment, Contributor, on a stray <p/> tag]: Not needed.

* <p>The user-provided hash is an alternative to the generated hashes, and is considered when identifying an
* operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
*
* <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
* needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
* assign a user-specified hash to intermediate nodes in an operator chain; trying to do so will make your job fail.
*
* <p>A use case for this is migration between Flink versions, or changing a job in a way that changes the
* automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
* obtained from old logs) can help to re-establish a lost mapping from states to their target operator.
*
* @param uidHash The user-provided hash for this operator. This will become the JobVertexID, which is shown in the
* logs and web UI.
* @return The operator with the user-provided hash.
*/
[Review comment, Contributor]: We don't annotate classes outside of some projects, but should we add the @PublicEvolving annotation here as well (also missing for uid in this class)?

@PublicEvolving
public CassandraSink<IN> setUidHash(String uidHash) {
if (useDataStreamSink) {
getSinkTransformation().setUidHash(uidHash);
} else {
getStreamTransformation().setUidHash(uidHash);
}
return this;
}
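
As an editorial illustration (not part of the PR): a minimal sketch of pinning a sink's JobVertexID via the new method. The stream, the query, and the hash value are assumptions; the hash must be the 32-character hex form of a JobVertexID, e.g. recovered from the logs of a previous job version.

DataStream<Tuple2<String, Integer>> events = ...;  // assumed input stream

CassandraSink.addSink(events)
    .setQuery("INSERT INTO example.events (id, counter) VALUES (?, ?);")
    .build()
    .setUidHash("bc764cd8ddf7a0cff126f51c16239658");  // hex of the old JobVertexID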

/**
* Sets the parallelism for this sink. The degree must be higher than zero.
*
@@ -130,7 +130,7 @@ private boolean generateNodeHash(
boolean isChainingEnabled) {

// Check for user-specified ID
String userSpecifiedHash = node.getTransformationId();
String userSpecifiedHash = node.getTransformationUID();

if (userSpecifiedHash == null) {
// Check that all input nodes have their hashes computed
@@ -192,7 +192,7 @@ private boolean generateNodeHash(
* Generates a hash from a user-specified ID.
*/
private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));

return hasher.hash().asBytes();
}
@@ -76,6 +76,32 @@ public DataStreamSink<T> uid(String uid) {
return this;
}

/**
* Sets a user-provided hash for this operator. This will be used AS IS to create the JobVertexID.
*
* <p>The user-provided hash is an alternative to the generated hashes, and is considered when identifying an
* operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
*
* <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
* needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
* assign a user-specified hash to intermediate nodes in an operator chain; trying to do so will make your job fail.
*
* <p>A use case for this is migration between Flink versions, or changing a job in a way that changes the
* automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
* obtained from old logs) can help to re-establish a lost mapping from states to their target operator.
*
* @param uidHash The user-provided hash for this operator. This will become the JobVertexID, which is shown in the
* logs and web UI.
* @return The operator with the user-provided hash.
*/
@PublicEvolving
public DataStreamSink<T> setUidHash(String uidHash) {
transformation.setUidHash(uidHash);
return this;
}

/**
* Sets the parallelism for this sink. The degree must be higher than zero.
*
@@ -87,6 +87,32 @@ public SingleOutputStreamOperator<T> uid(String uid) {
return this;
}

/**
* Sets a user-provided hash for this operator. This will be used AS IS to create the JobVertexID.
*
* <p>The user-provided hash is an alternative to the generated hashes, and is considered when identifying an
* operator through the default hash mechanics fails (e.g. because of changes between Flink versions).
*
* <p><strong>Important</strong>: this should be used as a workaround or for troubleshooting. The provided hash
* needs to be unique per transformation and job. Otherwise, job submission will fail. Furthermore, you cannot
* assign a user-specified hash to intermediate nodes in an operator chain; trying to do so will make your job fail.
*
* <p>A use case for this is migration between Flink versions, or changing a job in a way that changes the
* automatically generated hashes. In this case, providing the previous hashes directly through this method (e.g.
* obtained from old logs) can help to re-establish a lost mapping from states to their target operator.
*
* @param uidHash The user-provided hash for this operator. This will become the JobVertexID, which is shown in the
* logs and web UI.
* @return The operator with the user-provided hash.
*/
@PublicEvolving
public SingleOutputStreamOperator<T> setUidHash(String uidHash) {
transformation.setUidHash(uidHash);
return this;
}
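
The more common case is pinning the hash of a stateful operator. A brief sketch (hypothetical pipeline and user function; the hash is again the 32-character hex JobVertexID from the previous job):

DataStream<String> mapped = input
    .map(new MyStatefulMapper())                      // assumed user function
    .setUidHash("2e588ce1c86a9d46e2e85186773ce4fd");  // restores the old state-to-operator mapping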

/**
* Sets the parallelism for this operator. The degree must be 1 or more.
*
@@ -17,18 +17,6 @@

package org.apache.flink.streaming.api.graph;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

import org.apache.flink.annotation.Internal;
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.io.InputFormat;
@@ -41,6 +29,7 @@
import org.apache.flink.optimizer.plan.StreamingPlan;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.api.collector.selector.OutputSelector;
import org.apache.flink.streaming.api.environment.CheckpointConfig;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
@@ -49,7 +38,6 @@
import org.apache.flink.streaming.api.operators.StreamOperator;
import org.apache.flink.streaming.api.operators.StreamSource;
import org.apache.flink.streaming.api.operators.TwoInputStreamOperator;
import org.apache.flink.runtime.state.AbstractStateBackend;
import org.apache.flink.streaming.runtime.partitioner.ConfigurableStreamPartitioner;
import org.apache.flink.streaming.runtime.partitioner.ForwardPartitioner;
import org.apache.flink.streaming.runtime.partitioner.RebalancePartitioner;
@@ -63,6 +51,18 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.io.File;
import java.io.FileOutputStream;
import java.io.IOException;
import java.io.PrintWriter;
import java.util.ArrayList;
import java.util.Collection;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;

/**
* Class representing the streaming topology. It contains all the information
* necessary to build the jobgraph for the execution.
@@ -472,10 +472,17 @@ public void setInputFormat(Integer vertexID, InputFormat<?, ?> inputFormat) {
getStreamNode(vertexID).setInputFormat(inputFormat);
}

void setTransformationId(Integer nodeId, String transformationId) {
void setTransformationUID(Integer nodeId, String transformationId) {
StreamNode node = streamNodes.get(nodeId);
if (node != null) {
node.setTransformationUID(transformationId);
}
}

void setTransformationUserHash(Integer nodeId, String nodeHash) {
StreamNode node = streamNodes.get(nodeId);
if (node != null) {
node.setTransformationId(transformationId);
node.setUserHash(nodeHash);
}
}

@@ -208,7 +208,10 @@ private Collection<Integer> transform(StreamTransformation<?> transform) {
streamGraph.setBufferTimeout(transform.getId(), transform.getBufferTimeout());
}
if (transform.getUid() != null) {
streamGraph.setTransformationId(transform.getId(), transform.getUid());
streamGraph.setTransformationUID(transform.getId(), transform.getUid());
}
if (transform.getUserProvidedNodeHash() != null) {
streamGraph.setTransformationUserHash(transform.getId(), transform.getUserProvidedNodeHash());
}

return transformedIds;
@@ -148,7 +148,7 @@ private boolean generateNodeHash(
boolean isChainingEnabled) {

// Check for user-specified ID
String userSpecifiedHash = node.getTransformationId();
String userSpecifiedHash = node.getTransformationUID();

if (userSpecifiedHash == null) {
// Check that all input nodes have their hashes computed
@@ -210,7 +210,7 @@ private boolean generateNodeHash(
* Generates a hash from a user-specified ID.
*/
private byte[] generateUserSpecifiedHash(StreamNode node, Hasher hasher) {
hasher.putString(node.getTransformationId(), Charset.forName("UTF-8"));
hasher.putString(node.getTransformationUID(), Charset.forName("UTF-8"));

return hasher.hash().asBytes();
}
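
For reference, what this amounts to for a concrete uid, assuming the generator's hash function is Guava's 128-bit Murmur3 (as used elsewhere in this generator):

Hasher hasher = Hashing.murmur3_128(0).newHasher();
hasher.putString("my-operator-uid", Charset.forName("UTF-8"));
byte[] hash = hasher.hash().asBytes();  // 16 bytes, the same width as a JobVertexID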
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.flink.streaming.api.graph;

import org.apache.flink.util.StringUtils;

import java.util.HashMap;
import java.util.Map;

/**
* StreamGraphHasher that works with user-provided hashes. This is useful when we want to set (alternative) hashes
* explicitly, e.g. to provide a manual path to backwards compatibility between versions when the mechanism for
* generating hashes has changed in an incompatible way.
*/
public class StreamGraphUserHashHasher implements StreamGraphHasher {

@Override
[Review comment, Contributor]: empty lines above missing

public Map<Integer, byte[]> traverseStreamGraphAndGenerateHashes(StreamGraph streamGraph) {
HashMap<Integer, byte[]> hashResult = new HashMap<>();
for (StreamNode streamNode : streamGraph.getStreamNodes()) {

String userHash = streamNode.getUserHash();

[Review comment, Contributor]: empty line

if (null != userHash) {
for (StreamEdge inEdge : streamNode.getInEdges()) {
if (StreamingJobGraphGenerator.isChainable(inEdge, streamGraph)) {
throw new UnsupportedOperationException("Cannot assign user-specified hash "
+ "to intermediate node in chain. This will be supported in future "
+ "versions of Flink. As a work around start new chain at task "
+ streamNode.getOperatorName() + ".");
}
}

hashResult.put(streamNode.getId(), StringUtils.hexStringToByte(userHash));
}
}

return hashResult;
}
}
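
Since the map values come from StringUtils.hexStringToByte, the user hash must be the hex form of a 16-byte JobVertexID. A round-trip sketch (assuming JobVertexID exposes the byte[] constructor inherited from AbstractID):

byte[] bytes = StringUtils.hexStringToByte("bc764cd8ddf7a0cff126f51c16239658");
JobVertexID restored = new JobVertexID(bytes);  // the vertex ID that the user hash pins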
@@ -69,7 +69,8 @@ public class StreamNode implements Serializable {

private InputFormat<?, ?> inputFormat;

private String transformationId;
private String transformationUID;
private String userHash;

public StreamNode(StreamExecutionEnvironment env,
Integer id,
@@ -272,12 +273,20 @@ public void setStateKeySerializer(TypeSerializer<?> stateKeySerializer) {
this.stateKeySerializer = stateKeySerializer;
}

public String getTransformationId() {
return transformationId;
public String getTransformationUID() {
return transformationUID;
}

void setTransformationId(String transformationId) {
this.transformationId = transformationId;
void setTransformationUID(String transformationId) {
this.transformationUID = transformationId;
}

public String getUserHash() {
return userHash;
}

public void setUserHash(String userHash) {
this.userHash = userHash;
[Review comment, Contributor]: Since this is executed on the client before submitting the job, it might be helpful to do some early sanity checking here. The expected String is a hex representation of a JobVertexID.

}
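
A possible shape for that early check (a sketch only, not part of this PR; it assumes org.apache.flink.util.Preconditions is available here):

public void setUserHash(String userHash) {
    Preconditions.checkNotNull(userHash, "User hash must not be null.");
    Preconditions.checkArgument(userHash.matches("[0-9A-Fa-f]{32}"),
        "User hash must be the 32-character hex representation of a JobVertexID.");
    this.userHash = userHash;
}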

@Override
@@ -51,8 +51,8 @@
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
import java.util.List;
@@ -89,7 +89,7 @@ public class StreamingJobGraphGenerator {
public StreamingJobGraphGenerator(StreamGraph streamGraph) {
this.streamGraph = streamGraph;
this.defaultStreamGraphHasher = new StreamGraphHasherV2();
this.legacyStreamGraphHashers = Collections.<StreamGraphHasher>singletonList(new StreamGraphHasherV1());
this.legacyStreamGraphHashers = Arrays.asList(new StreamGraphHasherV1(), new StreamGraphUserHashHasher());
}

private void init() {
@@ -185,7 +185,7 @@ private List<StreamEdge> createChain(
List<StreamEdge> nonChainableOutputs = new ArrayList<StreamEdge>();

for (StreamEdge outEdge : streamGraph.getStreamNode(currentNodeId).getOutEdges()) {
if (isChainable(outEdge)) {
if (isChainable(outEdge, streamGraph)) {
chainableOutputs.add(outEdge);
} else {
nonChainableOutputs.add(outEdge);
@@ -426,7 +426,7 @@ private void connect(Integer headOfChain, StreamEdge edge) {
}
}

private boolean isChainable(StreamEdge edge) {
public static boolean isChainable(StreamEdge edge, StreamGraph streamGraph) {
StreamNode upStreamVertex = edge.getSourceVertex();
StreamNode downStreamVertex = edge.getTargetVertex();
