From 6386983239bd3024b395c865ec4fd33e232ca5a3 Mon Sep 17 00:00:00 2001 From: Bowen Li Date: Wed, 30 Aug 2017 09:35:03 -0700 Subject: [PATCH 1/4] FLINK-7422 Upgrade Kinesis Client Library (KCL) and AWS SDK in flink-connector-kinesis --- flink-connectors/flink-connector-kinesis/pom.xml | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/flink-connectors/flink-connector-kinesis/pom.xml b/flink-connectors/flink-connector-kinesis/pom.xml index c5a91b178c2a0..83934f64608be 100644 --- a/flink-connectors/flink-connector-kinesis/pom.xml +++ b/flink-connectors/flink-connector-kinesis/pom.xml @@ -33,8 +33,8 @@ under the License. flink-connector-kinesis_${scala.binary.version} flink-connector-kinesis - 1.10.71 - 1.6.2 + 1.11.171 + 1.8.1 0.12.5 From 3e60bc63fc1a9c884bdf3d3bd2fe869c1c631849 Mon Sep 17 00:00:00 2001 From: Bowen Li Date: Sat, 9 Sep 2017 00:03:13 -0700 Subject: [PATCH 2/4] FLINK-6549 Improve error message for type mismatches with side outputs --- .../flink/streaming/runtime/tasks/OperatorChain.java | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index b15f126db2c05..b0a96ec26ecb7 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -526,6 +526,15 @@ protected void pushToOperator(StreamRecord record) { StreamRecord copy = castRecord.copy(serializer.copy(castRecord.getValue())); operator.setKeyContextElement1(copy); operator.processElement(copy); + } catch (ClassCastException e) { + ClassCastException replace = new ClassCastException( + String.format("%s. Failed pushing OutputTag with id '%s' to operator. " + + "This can occur when multiple OutputTags with different types " + + "but identical names are being used.", + e.getMessage(), outputTag.getId())); + + throw new ExceptionInChainedOperatorException(replace); + } catch (Exception e) { throw new ExceptionInChainedOperatorException(e); } From c55adb7a4b1bd224b0578f0bad583fa58a42b3ae Mon Sep 17 00:00:00 2001 From: Bowen Li Date: Sat, 9 Sep 2017 00:06:17 -0700 Subject: [PATCH 3/4] add comment --- .../org/apache/flink/streaming/runtime/tasks/OperatorChain.java | 1 + 1 file changed, 1 insertion(+) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index b0a96ec26ecb7..7cbec8bd17476 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -527,6 +527,7 @@ protected void pushToOperator(StreamRecord record) { operator.setKeyContextElement1(copy); operator.processElement(copy); } catch (ClassCastException e) { + // Enrich error message ClassCastException replace = new ClassCastException( String.format("%s. Failed pushing OutputTag with id '%s' to operator. " + "This can occur when multiple OutputTags with different types " + From dc2e68202f7ba063fa8f2a0ea22cbe9de55f95b8 Mon Sep 17 00:00:00 2001 From: Bowen Li Date: Sat, 9 Sep 2017 00:14:34 -0700 Subject: [PATCH 4/4] fix grammer --- .../org/apache/flink/streaming/runtime/tasks/OperatorChain.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java index 7cbec8bd17476..38279822ec2d4 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/tasks/OperatorChain.java @@ -529,7 +529,7 @@ protected void pushToOperator(StreamRecord record) { } catch (ClassCastException e) { // Enrich error message ClassCastException replace = new ClassCastException( - String.format("%s. Failed pushing OutputTag with id '%s' to operator. " + + String.format("%s. Failed to push OutputTag with id '%s' to operator. " + "This can occur when multiple OutputTags with different types " + "but identical names are being used.", e.getMessage(), outputTag.getId()));