From 341b3403d58337b87a8caffd87bc6a0f63292ea2 Mon Sep 17 00:00:00 2001 From: Nico Kruber Date: Mon, 4 Sep 2017 17:21:52 +0200 Subject: [PATCH] [FLINK-7774][network] fix not clearing deserializers on closing an input --- .../runtime/io/network/api/reader/AbstractRecordReader.java | 1 + .../apache/flink/streaming/runtime/io/StreamInputProcessor.java | 1 + .../flink/streaming/runtime/io/StreamTwoInputProcessor.java | 1 + 3 files changed, 3 insertions(+) diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java index c5aeef79c43d5..29f2b6d6a3fac 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/io/network/api/reader/AbstractRecordReader.java @@ -120,6 +120,7 @@ public void clearBuffers() { if (buffer != null && !buffer.isRecycled()) { buffer.recycle(); } + deserializer.clear(); } } } diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java index 263077dcecfbd..609f8b89eff46 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamInputProcessor.java @@ -264,6 +264,7 @@ public void cleanup() throws IOException { if (buffer != null && !buffer.isRecycled()) { buffer.recycle(); } + deserializer.clear(); } // cleanup the barrier handler resources diff --git a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java index a25540d179419..78741471b1bc5 100644 --- a/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java +++ b/flink-streaming-java/src/main/java/org/apache/flink/streaming/runtime/io/StreamTwoInputProcessor.java @@ -329,6 +329,7 @@ public void cleanup() throws IOException { if (buffer != null && !buffer.isRecycled()) { buffer.recycle(); } + deserializer.clear(); } // cleanup the barrier handler resources