From aec753c8829a5cf7cc975a70507126cb96abe096 Mon Sep 17 00:00:00 2001 From: Sebastian Kruse Date: Thu, 7 May 2015 13:34:12 +0200 Subject: [PATCH 1/4] [FLINK-1980] allowing users to decorate input streams * add a decorateInputStream() method as hook in FileInputFormat * provide a InputStreamFSInputWrapper to conveniently wrap InputStreams * base existing .deflate file support on these changes * add a test to verify the decoration --- .../flink/api/common/io/FileInputFormat.java | 28 ++++-- .../io/InflaterInputStreamFSInputWrapper.java | 33 +------ .../common/io/InputStreamFSInputWrapper.java | 60 +++++++++++++ .../api/common/io/FileInputFormatTest.java | 89 +++++++++++++++++-- 4 files changed, 167 insertions(+), 43 deletions(-) create mode 100644 flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index 82e4c54906d84..f4d5c355ae909 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -641,11 +641,7 @@ public void open(FileInputSplit fileSplit) throws IOException { try { this.stream = isot.waitForCompletion(); - // Wrap stream in a extracting (decompressing) stream if file ends with .deflate. - if(fileSplit.getPath().getName().endsWith(DEFLATE_SUFFIX)) { - this.stream = new InflaterInputStreamFSInputWrapper(stream); - } - + this.stream = decorateInputStream(this.stream, fileSplit); } catch (Throwable t) { throw new IOException("Error opening the Input Split " + fileSplit.getPath() + @@ -657,7 +653,27 @@ public void open(FileInputSplit fileSplit) throws IOException { this.stream.seek(this.splitStart); } } - + + /** + * This method allows to wrap/decorate the raw {@link FSDataInputStream} for a certain file split, e.g., for decoding. + * When overriding this method, also consider adapting {@link FileInputFormat#testForUnsplittable} if your + * stream decoration renders the input file unsplittable. Also consider calling existing superclass implementations. + * + * @param inputStream is the input stream to decorated + * @param fileSplit is the file split for which the input stream shall be decorated + * @return the decorated input stream + * @throws Throwable if the decoration fails + * @see org.apache.flink.api.common.io.InputStreamFSInputWrapper + */ + protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream, FileInputSplit fileSplit) throws Throwable { + // Wrap stream in a extracting (decompressing) stream if file ends with .deflate. + if (fileSplit.getPath().getName().endsWith(DEFLATE_SUFFIX)) { + return new InflaterInputStreamFSInputWrapper(stream); + } + + return inputStream; + } + /** * Closes the file input stream of the input format. */ diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java index c940cbcdc20fe..ac87535c0e164 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InflaterInputStreamFSInputWrapper.java @@ -19,41 +19,14 @@ package org.apache.flink.api.common.io; -import java.io.IOException; -import java.util.zip.InflaterInputStream; - import org.apache.flink.core.fs.FSDataInputStream; -public class InflaterInputStreamFSInputWrapper extends FSDataInputStream { +import java.util.zip.InflaterInputStream; - private InflaterInputStream inStream; +public class InflaterInputStreamFSInputWrapper extends InputStreamFSInputWrapper { public InflaterInputStreamFSInputWrapper(FSDataInputStream inStream) { - this.inStream = new InflaterInputStream(inStream); - } - - @Override - public void seek(long desired) throws IOException { - throw new UnsupportedOperationException("Compressed streams do not support the seek operation"); + super(new InflaterInputStream(inStream)); } - @Override - public long getPos() throws IOException { - throw new UnsupportedOperationException("Compressed streams do not support the getPos operation"); - } - - @Override - public int read() throws IOException { - return inStream.read(); - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - return inStream.read(b, off, len); - } - - @Override - public int read(byte[] b) throws IOException { - return inStream.read(b); - } } diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java new file mode 100644 index 0000000000000..3f6def47715cf --- /dev/null +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java @@ -0,0 +1,60 @@ +package org.apache.flink.api.common.io; + +import org.apache.flink.core.fs.FSDataInputStream; + +import java.io.IOException; +import java.io.InputStream; + +/** + * This class wraps an {@link java.io.InputStream} and exposes it as {@link org.apache.flink.core.fs.FSDataInputStream}. + *
+ * NB: {@link #seek(long)} and {@link #getPos()} are currently not supported. + */ +public class InputStreamFSInputWrapper extends FSDataInputStream { + + private final InputStream inStream; + + private long pos = 0; + + public InputStreamFSInputWrapper(InputStream inStream) { + this.inStream = inStream; + } + + @Override + public void seek(long desired) throws IOException { + if (desired < this.pos) { + throw new IllegalArgumentException("Wrapped InputStream: cannot search backwards."); + } else if (desired == pos) { + return; + } + + this.inStream.skip(desired - pos); + this.pos = desired; + } + + @Override + public long getPos() throws IOException { + return this.pos; + } + + @Override + public int read() throws IOException { + int read = inStream.read(); + if (read != -1) { + this.pos++; + } + return read; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int numReadBytes = inStream.read(b, off, len); + this.pos += numReadBytes; + return numReadBytes; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } +} diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java index 58154e50cd5be..5ec4aa829cdf7 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java @@ -18,24 +18,20 @@ package org.apache.flink.api.common.io; -import java.io.BufferedOutputStream; -import java.io.BufferedWriter; -import java.io.File; -import java.io.FileOutputStream; -import java.io.FileWriter; -import java.io.IOException; +import java.io.*; import java.net.URI; import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; +import org.apache.flink.core.fs.FSDataInputStream; import org.apache.flink.core.fs.FileInputSplit; import org.apache.flink.testutils.TestFileUtils; import org.apache.flink.types.IntValue; import org.junit.Assert; import org.junit.Test; -public class FileInputFormatTest { +public class FileInputFormatTest { @Test public void testGetStatisticsNonExistingFile() { @@ -339,6 +335,40 @@ public void testGetStatsIgnoredUnderscoreFiles() { Assert.fail(e.getMessage()); } } + + + @Test + public void testDecorateInputStream() throws IOException { + // create temporary file with 3 blocks + final File tempFile = File.createTempFile("input-stream-decoration-test", "tmp"); + tempFile.deleteOnExit(); + final int blockSize = 8; + final int numBlocks = 3; + FileOutputStream fileOutputStream = new FileOutputStream(tempFile); + for (int i = 0; i < blockSize * numBlocks; i++) { + fileOutputStream.write(new byte[]{1}); + } + fileOutputStream.close(); + + final Configuration config = new Configuration(); + + final FileInputFormat inputFormat = new MyDecoratedInputFormat(); + inputFormat.setFilePath(tempFile.toURI().toString()); + + inputFormat.configure(config); + + FileInputSplit[] inputSplits = inputFormat.createInputSplits(3); + + byte[] bytes = null; + for (FileInputSplit inputSplit : inputSplits) { + inputFormat.open(inputSplit); + while (!inputFormat.reachedEnd()) { + if ((bytes = inputFormat.nextRecord(bytes)) != null) { + Assert.assertArrayEquals(new byte[]{(byte) 0xFE}, bytes); + } + } + } + } // ------------------------------------------------------------------------ @@ -355,4 +385,49 @@ public IntValue nextRecord(IntValue record) throws IOException { return null; } } + + + private static final class MyDecoratedInputFormat extends FileInputFormat { + + private static final long serialVersionUID = 1L; + + @Override + public boolean reachedEnd() throws IOException { + return this.splitLength <= this.stream.getPos(); + } + + @Override + public byte[] nextRecord(byte[] reuse) throws IOException { + int read = this.stream.read(); + if (read == -1) throw new IllegalStateException(); + return new byte[] { (byte) read }; + } + + @Override + protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream, FileInputSplit fileSplit) throws Throwable { + inputStream = super.decorateInputStream(inputStream, fileSplit); + return new InputStreamFSInputWrapper(new InvertedInputStream(inputStream)); + } + + } + + private static final class InvertedInputStream extends InputStream { + + private final InputStream originalStream; + + private InvertedInputStream(InputStream originalStream) { + this.originalStream = originalStream; + } + + @Override + public int read() throws IOException { + int read = this.originalStream.read(); + return read == -1 ? -1 : (~read & 0xFF); + } + + @Override + public int available() throws IOException { + return this.originalStream.available(); + } + } } From f9ea55fb08b095939bf5b699d6e136e7c0064512 Mon Sep 17 00:00:00 2001 From: Sebastian Kruse Date: Fri, 8 May 2015 14:45:54 +0200 Subject: [PATCH 2/4] [FLINK-1980] fix style issues * use tabs instead of spaces * avoid asterisk imports * include license header --- .../flink/api/common/io/FileInputFormat.java | 38 ++--- .../common/io/InputStreamFSInputWrapper.java | 108 +++++++------ .../api/common/io/FileInputFormatTest.java | 142 +++++++++--------- 3 files changed, 156 insertions(+), 132 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java index f4d5c355ae909..ff8a5cc47c3cb 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/FileInputFormat.java @@ -654,25 +654,25 @@ public void open(FileInputSplit fileSplit) throws IOException { } } - /** - * This method allows to wrap/decorate the raw {@link FSDataInputStream} for a certain file split, e.g., for decoding. - * When overriding this method, also consider adapting {@link FileInputFormat#testForUnsplittable} if your - * stream decoration renders the input file unsplittable. Also consider calling existing superclass implementations. - * - * @param inputStream is the input stream to decorated - * @param fileSplit is the file split for which the input stream shall be decorated - * @return the decorated input stream - * @throws Throwable if the decoration fails - * @see org.apache.flink.api.common.io.InputStreamFSInputWrapper - */ - protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream, FileInputSplit fileSplit) throws Throwable { - // Wrap stream in a extracting (decompressing) stream if file ends with .deflate. - if (fileSplit.getPath().getName().endsWith(DEFLATE_SUFFIX)) { - return new InflaterInputStreamFSInputWrapper(stream); - } - - return inputStream; - } + /** + * This method allows to wrap/decorate the raw {@link FSDataInputStream} for a certain file split, e.g., for decoding. + * When overriding this method, also consider adapting {@link FileInputFormat#testForUnsplittable} if your + * stream decoration renders the input file unsplittable. Also consider calling existing superclass implementations. + * + * @param inputStream is the input stream to decorated + * @param fileSplit is the file split for which the input stream shall be decorated + * @return the decorated input stream + * @throws Throwable if the decoration fails + * @see org.apache.flink.api.common.io.InputStreamFSInputWrapper + */ + protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream, FileInputSplit fileSplit) throws Throwable { + // Wrap stream in a extracting (decompressing) stream if file ends with .deflate. + if (fileSplit.getPath().getName().endsWith(DEFLATE_SUFFIX)) { + return new InflaterInputStreamFSInputWrapper(stream); + } + + return inputStream; + } /** * Closes the file input stream of the input format. diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java index 3f6def47715cf..208a8731c985e 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java @@ -1,3 +1,21 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + package org.apache.flink.api.common.io; import org.apache.flink.core.fs.FSDataInputStream; @@ -12,49 +30,49 @@ */ public class InputStreamFSInputWrapper extends FSDataInputStream { - private final InputStream inStream; - - private long pos = 0; - - public InputStreamFSInputWrapper(InputStream inStream) { - this.inStream = inStream; - } - - @Override - public void seek(long desired) throws IOException { - if (desired < this.pos) { - throw new IllegalArgumentException("Wrapped InputStream: cannot search backwards."); - } else if (desired == pos) { - return; - } - - this.inStream.skip(desired - pos); - this.pos = desired; - } - - @Override - public long getPos() throws IOException { - return this.pos; - } - - @Override - public int read() throws IOException { - int read = inStream.read(); - if (read != -1) { - this.pos++; - } - return read; - } - - @Override - public int read(byte[] b, int off, int len) throws IOException { - int numReadBytes = inStream.read(b, off, len); - this.pos += numReadBytes; - return numReadBytes; - } - - @Override - public int read(byte[] b) throws IOException { - return read(b, 0, b.length); - } + private final InputStream inStream; + + private long pos = 0; + + public InputStreamFSInputWrapper(InputStream inStream) { + this.inStream = inStream; + } + + @Override + public void seek(long desired) throws IOException { + if (desired < this.pos) { + throw new IllegalArgumentException("Wrapped InputStream: cannot search backwards."); + } else if (desired == pos) { + return; + } + + this.inStream.skip(desired - pos); + this.pos = desired; + } + + @Override + public long getPos() throws IOException { + return this.pos; + } + + @Override + public int read() throws IOException { + int read = inStream.read(); + if (read != -1) { + this.pos++; + } + return read; + } + + @Override + public int read(byte[] b, int off, int len) throws IOException { + int numReadBytes = inStream.read(b, off, len); + this.pos += numReadBytes; + return numReadBytes; + } + + @Override + public int read(byte[] b) throws IOException { + return read(b, 0, b.length); + } } diff --git a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java index 5ec4aa829cdf7..7e3edc0a36a7f 100644 --- a/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java +++ b/flink-core/src/test/java/org/apache/flink/api/common/io/FileInputFormatTest.java @@ -18,9 +18,6 @@ package org.apache.flink.api.common.io; -import java.io.*; -import java.net.URI; - import org.apache.flink.api.common.io.FileInputFormat.FileBaseStatistics; import org.apache.flink.api.common.io.statistics.BaseStatistics; import org.apache.flink.configuration.Configuration; @@ -31,6 +28,15 @@ import org.junit.Assert; import org.junit.Test; +import java.io.BufferedOutputStream; +import java.io.BufferedWriter; +import java.io.File; +import java.io.FileOutputStream; +import java.io.FileWriter; +import java.io.IOException; +import java.io.InputStream; +import java.net.URI; + public class FileInputFormatTest { @Test @@ -337,38 +343,38 @@ public void testGetStatsIgnoredUnderscoreFiles() { } - @Test - public void testDecorateInputStream() throws IOException { - // create temporary file with 3 blocks - final File tempFile = File.createTempFile("input-stream-decoration-test", "tmp"); - tempFile.deleteOnExit(); - final int blockSize = 8; - final int numBlocks = 3; - FileOutputStream fileOutputStream = new FileOutputStream(tempFile); - for (int i = 0; i < blockSize * numBlocks; i++) { - fileOutputStream.write(new byte[]{1}); - } - fileOutputStream.close(); - - final Configuration config = new Configuration(); - - final FileInputFormat inputFormat = new MyDecoratedInputFormat(); - inputFormat.setFilePath(tempFile.toURI().toString()); - - inputFormat.configure(config); - - FileInputSplit[] inputSplits = inputFormat.createInputSplits(3); - - byte[] bytes = null; - for (FileInputSplit inputSplit : inputSplits) { - inputFormat.open(inputSplit); - while (!inputFormat.reachedEnd()) { - if ((bytes = inputFormat.nextRecord(bytes)) != null) { - Assert.assertArrayEquals(new byte[]{(byte) 0xFE}, bytes); - } - } - } - } + @Test + public void testDecorateInputStream() throws IOException { + // create temporary file with 3 blocks + final File tempFile = File.createTempFile("input-stream-decoration-test", "tmp"); + tempFile.deleteOnExit(); + final int blockSize = 8; + final int numBlocks = 3; + FileOutputStream fileOutputStream = new FileOutputStream(tempFile); + for (int i = 0; i < blockSize * numBlocks; i++) { + fileOutputStream.write(new byte[]{1}); + } + fileOutputStream.close(); + + final Configuration config = new Configuration(); + + final FileInputFormat inputFormat = new MyDecoratedInputFormat(); + inputFormat.setFilePath(tempFile.toURI().toString()); + + inputFormat.configure(config); + + FileInputSplit[] inputSplits = inputFormat.createInputSplits(3); + + byte[] bytes = null; + for (FileInputSplit inputSplit : inputSplits) { + inputFormat.open(inputSplit); + while (!inputFormat.reachedEnd()) { + if ((bytes = inputFormat.nextRecord(bytes)) != null) { + Assert.assertArrayEquals(new byte[]{(byte) 0xFE}, bytes); + } + } + } + } // ------------------------------------------------------------------------ @@ -387,47 +393,47 @@ public IntValue nextRecord(IntValue record) throws IOException { } - private static final class MyDecoratedInputFormat extends FileInputFormat { + private static final class MyDecoratedInputFormat extends FileInputFormat { - private static final long serialVersionUID = 1L; + private static final long serialVersionUID = 1L; - @Override - public boolean reachedEnd() throws IOException { - return this.splitLength <= this.stream.getPos(); - } + @Override + public boolean reachedEnd() throws IOException { + return this.splitLength <= this.stream.getPos(); + } - @Override - public byte[] nextRecord(byte[] reuse) throws IOException { - int read = this.stream.read(); - if (read == -1) throw new IllegalStateException(); - return new byte[] { (byte) read }; - } + @Override + public byte[] nextRecord(byte[] reuse) throws IOException { + int read = this.stream.read(); + if (read == -1) throw new IllegalStateException(); + return new byte[]{(byte) read}; + } - @Override - protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream, FileInputSplit fileSplit) throws Throwable { - inputStream = super.decorateInputStream(inputStream, fileSplit); - return new InputStreamFSInputWrapper(new InvertedInputStream(inputStream)); - } + @Override + protected FSDataInputStream decorateInputStream(FSDataInputStream inputStream, FileInputSplit fileSplit) throws Throwable { + inputStream = super.decorateInputStream(inputStream, fileSplit); + return new InputStreamFSInputWrapper(new InvertedInputStream(inputStream)); + } - } + } - private static final class InvertedInputStream extends InputStream { + private static final class InvertedInputStream extends InputStream { - private final InputStream originalStream; + private final InputStream originalStream; - private InvertedInputStream(InputStream originalStream) { - this.originalStream = originalStream; - } + private InvertedInputStream(InputStream originalStream) { + this.originalStream = originalStream; + } - @Override - public int read() throws IOException { - int read = this.originalStream.read(); - return read == -1 ? -1 : (~read & 0xFF); - } + @Override + public int read() throws IOException { + int read = this.originalStream.read(); + return read == -1 ? -1 : (~read & 0xFF); + } - @Override - public int available() throws IOException { - return this.originalStream.available(); - } - } + @Override + public int available() throws IOException { + return this.originalStream.available(); + } + } } From a572861683944054f4be3102a72435c2b640856b Mon Sep 17 00:00:00 2001 From: Sebastian Kruse Date: Fri, 8 May 2015 14:49:17 +0200 Subject: [PATCH 3/4] [FLINK-1980] handle end of streams appropriately in InputStreamFSInputWrapper * do not alter the stream position if the stream is at the end --- .../apache/flink/api/common/io/InputStreamFSInputWrapper.java | 4 +++- 1 file changed, 3 insertions(+), 1 deletion(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java index 208a8731c985e..19347aca2e5d9 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java @@ -67,7 +67,9 @@ public int read() throws IOException { @Override public int read(byte[] b, int off, int len) throws IOException { int numReadBytes = inStream.read(b, off, len); - this.pos += numReadBytes; + if (numReadBytes != -1) { + this.pos += numReadBytes; + } return numReadBytes; } From 84c8f0bae9b48a984c1421a5376001f4919f476d Mon Sep 17 00:00:00 2001 From: Sebastian Kruse Date: Tue, 12 May 2015 16:55:15 +0200 Subject: [PATCH 4/4] [FLINK-1980] respect return value of InputStream#skip() in seek(...) --- .../api/common/io/InputStreamFSInputWrapper.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java b/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java index 19347aca2e5d9..9cd7dbf14c97b 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/io/InputStreamFSInputWrapper.java @@ -20,6 +20,7 @@ import org.apache.flink.core.fs.FSDataInputStream; +import java.io.EOFException; import java.io.IOException; import java.io.InputStream; @@ -42,12 +43,15 @@ public InputStreamFSInputWrapper(InputStream inStream) { public void seek(long desired) throws IOException { if (desired < this.pos) { throw new IllegalArgumentException("Wrapped InputStream: cannot search backwards."); - } else if (desired == pos) { - return; } - this.inStream.skip(desired - pos); - this.pos = desired; + while (this.pos < desired) { + long numReadBytes = this.inStream.skip(desired - pos); + if (numReadBytes == -1) { + throw new EOFException("Unexpected EOF during forward seek."); + } + this.pos += numReadBytes; + } } @Override