From c68eac5684318564335bc824dfb8d6fa06c8f63f Mon Sep 17 00:00:00 2001 From: Stefan Richter Date: Tue, 14 Mar 2017 14:45:00 +0100 Subject: [PATCH] [FLINK-6044] Replace calls to InputStream#read(...) with the indended InputStream#readFully(...) --- .../TypeSerializerSerializationProxy.java | 2 +- .../common/typeutils/base/BigIntSerializer.java | 7 ++++--- .../core/memory/DataOutputViewStreamWrapper.java | 2 +- .../main/java/org/apache/flink/types/Record.java | 14 +++++++------- .../runtime/TestDataOutputSerializer.java | 10 +++++----- .../runtime/kryo/KryoClearedBufferTest.java | 5 +---- .../flink/runtime/util/DataOutputSerializer.java | 2 +- .../io/network/api/writer/RecordWriterTest.java | 2 +- .../flink/streaming/runtime/io/TestEvent.java | 2 +- 9 files changed, 22 insertions(+), 24 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java index cebd348307fce..cb8967b207a60 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/TypeSerializerSerializationProxy.java @@ -94,7 +94,7 @@ public void read(DataInputView in) throws IOException { // read in a way that allows the stream to recover from exceptions int serializerBytes = in.readInt(); byte[] buffer = new byte[serializerBytes]; - in.read(buffer); + in.readFully(buffer); try { typeSerializer = InstantiationUtil.deserializeObject(buffer, userClassLoader); } catch (ClassNotFoundException e) { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java index 73b2f5472d314..041165d56d82d 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/typeutils/base/BigIntSerializer.java @@ -18,12 +18,13 @@ package org.apache.flink.api.common.typeutils.base; -import java.io.IOException; -import java.math.BigInteger; import org.apache.flink.annotation.Internal; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataOutputView; +import java.io.IOException; +import java.math.BigInteger; + /** * Serializer for serializing/deserializing BigInteger values including null values. */ @@ -130,7 +131,7 @@ public static BigInteger readBigInteger(DataInputView source) throws IOException } } final byte[] bytes = new byte[len - 4]; - source.read(bytes); + source.readFully(bytes); return new BigInteger(bytes); } diff --git a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java index 9ec9c29d048fe..4e4553274ded0 100644 --- a/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java +++ b/flink-core/src/main/java/org/apache/flink/core/memory/DataOutputViewStreamWrapper.java @@ -57,7 +57,7 @@ public void write(DataInputView source, int numBytes) throws IOException { while (numBytes > 0) { int toCopy = Math.min(numBytes, tempBuffer.length); - source.read(tempBuffer, 0, toCopy); + source.readFully(tempBuffer, 0, toCopy); write(tempBuffer, 0, toCopy); numBytes -= toCopy; } diff --git a/flink-core/src/main/java/org/apache/flink/types/Record.java b/flink-core/src/main/java/org/apache/flink/types/Record.java index 9990ddfa3f649..c29675177523c 100644 --- a/flink-core/src/main/java/org/apache/flink/types/Record.java +++ b/flink-core/src/main/java/org/apache/flink/types/Record.java @@ -19,6 +19,12 @@ package org.apache.flink.types; +import org.apache.flink.annotation.Public; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemoryUtils; +import org.apache.flink.util.InstantiationUtil; + import java.io.DataInput; import java.io.DataOutput; import java.io.EOFException; @@ -27,12 +33,6 @@ import java.io.UTFDataFormatException; import java.nio.ByteOrder; -import org.apache.flink.annotation.Public; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemoryUtils; -import org.apache.flink.util.InstantiationUtil; - /** * The Record represents a multi-valued data record. @@ -1808,7 +1808,7 @@ public void write(DataInputView source, int numBytes) throws IOException { throw new IOException("Could not write " + numBytes + " bytes since the buffer is full."); } - source.read(this.memory,this.position, numBytes); + source.readFully(this.memory,this.position, numBytes); this.position += numBytes; } } diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java index 87be6dbfcb01f..d830a219b9eb7 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/TestDataOutputSerializer.java @@ -18,16 +18,16 @@ package org.apache.flink.api.java.typeutils.runtime; +import org.apache.flink.core.memory.DataInputView; +import org.apache.flink.core.memory.DataOutputView; +import org.apache.flink.core.memory.MemoryUtils; + import java.io.EOFException; import java.io.IOException; import java.io.UTFDataFormatException; import java.nio.ByteBuffer; import java.nio.ByteOrder; -import org.apache.flink.core.memory.DataInputView; -import org.apache.flink.core.memory.DataOutputView; -import org.apache.flink.core.memory.MemoryUtils; - public final class TestDataOutputSerializer implements DataOutputView { private byte[] buffer; @@ -301,7 +301,7 @@ public void write(DataInputView source, int numBytes) throws IOException { throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow."); } - source.read(this.buffer, this.position, numBytes); + source.readFully(this.buffer, this.position, numBytes); this.position += numBytes; } diff --git a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java index d85ff952a1108..8b420bcbd899e 100644 --- a/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/java/typeutils/runtime/kryo/KryoClearedBufferTest.java @@ -22,13 +22,11 @@ import com.esotericsoftware.kryo.Serializer; import com.esotericsoftware.kryo.io.Input; import com.esotericsoftware.kryo.io.Output; - import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.configuration.ConfigConstants; import org.apache.flink.core.memory.DataInputView; import org.apache.flink.core.memory.DataInputViewStreamWrapper; import org.apache.flink.core.memory.DataOutputView; - import org.junit.Assert; import org.junit.Test; @@ -36,7 +34,6 @@ import java.io.EOFException; import java.io.IOException; import java.io.Serializable; -import java.nio.charset.StandardCharsets; import java.util.Arrays; public class KryoClearedBufferTest { @@ -178,7 +175,7 @@ public void write(DataInputView source, int numBytes) throws IOException { byte[] tempBuffer = new byte[numBytes]; - source.read(tempBuffer); + source.readFully(tempBuffer); System.arraycopy(tempBuffer, 0, buffer, position, numBytes); diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java index 18940ede761a4..4f1cf77d05439 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DataOutputSerializer.java @@ -324,7 +324,7 @@ public void write(DataInputView source, int numBytes) throws IOException { throw new EOFException("Could not write " + numBytes + " bytes. Buffer overflow."); } - source.read(this.buffer, this.position, numBytes); + source.readFully(this.buffer, this.position, numBytes); this.position += numBytes; } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java index 7d83fb51b47c9..98d4f65dfadee 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/io/network/api/writer/RecordWriterTest.java @@ -542,7 +542,7 @@ public void write(DataOutputView out) throws IOException { @Override public void read(DataInputView in) throws IOException { - in.read(bytes); + in.readFully(bytes); } } diff --git a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java index 286477a1bc40b..9fcb7fe5e35fa 100644 --- a/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java +++ b/flink-streaming-java/src/test/java/org/apache/flink/streaming/runtime/io/TestEvent.java @@ -58,7 +58,7 @@ public void write(DataOutputView out) throws IOException { public void read(DataInputView in) throws IOException { this.magicNumber = in.readLong(); this.payload = new byte[in.readInt()]; - in.read(this.payload); + in.readFully(this.payload); } // ------------------------------------------------------------------------