From 3dbf0775b65823d544d09262f11e4544576b06b6 Mon Sep 17 00:00:00 2001 From: jhkim Date: Fri, 5 Dec 2014 19:19:14 +0900 Subject: [PATCH 1/3] TAJO-1235: ByteBufLineReader can not read text line with CRLF --- .../tajo/storage/ByteBufInputChannel.java | 4 -- .../tajo/storage/text/ByteBufLineReader.java | 60 +++++++++++-------- .../storage/text/DelimitedLineReader.java | 7 +-- .../apache/tajo/storage/TestLineReader.java | 43 ++++++++++--- 4 files changed, 72 insertions(+), 42 deletions(-) diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java b/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java index b1b6d65b17..45fb1d8865 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/ByteBufInputChannel.java @@ -69,8 +69,4 @@ public int read(ByteBuffer dst) throws IOException { protected void implCloseChannel() throws IOException { IOUtils.cleanup(null, channel, inputStream); } - - public int available() throws IOException { - return inputStream.available(); - } } diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java index 86319e16fc..d2becb8115 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java @@ -32,10 +32,11 @@ public class ByteBufLineReader implements Closeable { private int bufferSize; private long readBytes; + private int startIndex; private boolean eof = false; private ByteBuf buffer; private final ByteBufInputChannel channel; - private final AtomicInteger tempReadBytes = new AtomicInteger(); + private final AtomicInteger lineReadBytes = new AtomicInteger(); private final LineSplitProcessor processor = new LineSplitProcessor(); public ByteBufLineReader(ByteBufInputChannel channel) { @@ -53,10 +54,6 @@ public long readBytes() { return readBytes - buffer.readableBytes(); } - public long available() throws IOException { - return channel.available() + buffer.readableBytes(); - } - @Override public void close() throws IOException { if (this.buffer.refCnt() > 0) { @@ -66,7 +63,7 @@ public void close() throws IOException { } public String readLine() throws IOException { - ByteBuf buf = readLineBuf(tempReadBytes); + ByteBuf buf = readLineBuf(lineReadBytes); if (buf != null) { return buf.toString(CharsetUtil.UTF_8); } @@ -77,23 +74,25 @@ private void fillBuffer() throws IOException { int tailBytes = 0; if (this.readBytes > 0) { + //startIndex = 0, readIndex = tailBytes length, writable = (buffer capacity - tailBytes) this.buffer.markReaderIndex(); - this.buffer.discardSomeReadBytes(); // compact the buffer + this.buffer.discardReadBytes(); // compact the buffer tailBytes = this.buffer.writerIndex(); if (!this.buffer.isWritable()) { // a line bytes is large than the buffer - BufferPool.ensureWritable(buffer, bufferSize); + BufferPool.ensureWritable(buffer, bufferSize * 2); this.bufferSize = buffer.capacity(); } + this.startIndex = 0; } boolean release = true; try { int readBytes = tailBytes; for (; ; ) { - int localReadBytes = buffer.writeBytes(channel, bufferSize - readBytes); + int localReadBytes = buffer.writeBytes(channel, this.bufferSize - readBytes); if (localReadBytes < 0) { - if (tailBytes == readBytes) { + if (buffer.isWritable()) { // no more bytes are in the channel eof = true; } @@ -106,9 +105,8 @@ private void fillBuffer() throws IOException { } this.readBytes += (readBytes - tailBytes); release = false; - if (!eof) { - this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail) - } + + this.buffer.readerIndex(this.buffer.readerIndex() + tailBytes); //skip past buffer (tail) } finally { if (release) { buffer.release(); @@ -120,24 +118,34 @@ private void fillBuffer() throws IOException { * Read a line terminated by one of CR, LF, or CRLF. */ public ByteBuf readLineBuf(AtomicInteger reads) throws IOException { - if(eof) return null; - - int startIndex = buffer.readerIndex(); - int readBytes; + int readBytes = 0; + int newlineLength = 0; //length of terminating newline int readable; - int newlineLength; //length of terminating newline + + this.startIndex = buffer.readerIndex(); loop: while (true) { readable = buffer.readableBytes(); if (readable <= 0) { - buffer.readerIndex(startIndex); + buffer.readerIndex(this.startIndex); fillBuffer(); //compact and fill buffer if (!buffer.isReadable()) { + reads.set(0); return null; } else { - if (!eof) startIndex = 0; // reset the line start position - else startIndex = buffer.readerIndex(); + //skip first newLine + if (processor.isPrevCharCR() && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) { + buffer.skipBytes(1); + if(eof && !buffer.isReadable()) { + reads.set(1); + return null; + } + + newlineLength++; + readBytes++; + startIndex = buffer.readerIndex(); + } } readable = buffer.readableBytes(); } @@ -147,19 +155,19 @@ public ByteBuf readLineBuf(AtomicInteger reads) throws IOException { //does not appeared terminating newline buffer.readerIndex(buffer.writerIndex()); // set to end buffer if(eof){ - readBytes = buffer.readerIndex() - startIndex; - newlineLength = 0; + readBytes += readable; break loop; } } else { buffer.readerIndex(endIndex + 1); - readBytes = buffer.readerIndex() - startIndex; + readBytes += (buffer.readerIndex() - startIndex); if (processor.isPrevCharCR() && buffer.isReadable() && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) { buffer.skipBytes(1); - newlineLength = 2; + readBytes++; + newlineLength += 2; } else { - newlineLength = 1; + newlineLength += 1; } break loop; } diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java index eb1929ecef..0efe03047f 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/DelimitedLineReader.java @@ -57,7 +57,7 @@ public class DelimitedLineReader implements Closeable { private long startOffset, end, pos; private boolean eof = true; private ByteBufLineReader lineReader; - private AtomicInteger tempReadBytes = new AtomicInteger(); + private AtomicInteger lineReadBytes = new AtomicInteger(); private FileFragment fragment; private Configuration conf; @@ -122,11 +122,10 @@ public ByteBuf readLine() throws IOException { return null; } - ByteBuf buf = lineReader.readLineBuf(tempReadBytes); + ByteBuf buf = lineReader.readLineBuf(lineReadBytes); + pos += lineReadBytes.get(); if (buf == null) { eof = true; - } else { - pos += tempReadBytes.get(); } if (!isCompressed() && getCompressedPosition() > end) { diff --git a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java index 4512d00a6d..bfaba043f9 100644 --- a/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java +++ b/tajo-storage/src/test/java/org/apache/tajo/storage/TestLineReader.java @@ -19,6 +19,7 @@ package org.apache.tajo.storage; import io.netty.buffer.ByteBuf; +import org.apache.hadoop.fs.FSDataOutputStream; import org.apache.hadoop.fs.FileStatus; import org.apache.hadoop.fs.FileSystem; import org.apache.hadoop.fs.Path; @@ -43,6 +44,7 @@ import java.io.File; import java.io.FileInputStream; import java.io.IOException; +import java.nio.charset.Charset; import java.util.concurrent.atomic.AtomicInteger; import static org.junit.Assert.*; @@ -84,18 +86,15 @@ public void testByteBufLineReader() throws IOException { FileStatus status = fs.getFileStatus(tablePath); ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(tablePath)); - assertEquals(status.getLen(), channel.available()); ByteBufLineReader reader = new ByteBufLineReader(channel); - assertEquals(status.getLen(), reader.available()); long totalRead = 0; int i = 0; AtomicInteger bytes = new AtomicInteger(); for(;;){ ByteBuf buf = reader.readLineBuf(bytes); - if(buf == null) break; - totalRead += bytes.get(); + if(buf == null) break; i++; } IOUtils.cleanup(null, reader, channel, fs); @@ -171,18 +170,15 @@ public void testByteBufLineReaderWithoutTerminating() throws IOException { String data = FileUtil.readTextFile(file); ByteBufInputChannel channel = new ByteBufInputChannel(new FileInputStream(file)); - - assertEquals(file.length(), channel.available()); ByteBufLineReader reader = new ByteBufLineReader(channel); - assertEquals(file.length(), reader.available()); long totalRead = 0; int i = 0; AtomicInteger bytes = new AtomicInteger(); for(;;){ ByteBuf buf = reader.readLineBuf(bytes); - if(buf == null) break; totalRead += bytes.get(); + if(buf == null) break; i++; } IOUtils.cleanup(null, reader); @@ -190,4 +186,35 @@ public void testByteBufLineReaderWithoutTerminating() throws IOException { assertEquals(file.length(), reader.readBytes()); assertEquals(data.split("\n").length, i); } + + @Test + public void testCRLFLine() throws IOException { + TajoConf conf = new TajoConf(); + Path testFile = new Path(CommonTestingUtil.getTestDir(TEST_PATH), "testCRLFLineText.txt"); + + FileSystem fs = testFile.getFileSystem(conf); + FSDataOutputStream outputStream = fs.create(testFile, true); + outputStream.write("0\r\n1\r\n".getBytes()); + outputStream.flush(); + IOUtils.closeStream(outputStream); + + ByteBufInputChannel channel = new ByteBufInputChannel(fs.open(testFile)); + ByteBufLineReader reader = new ByteBufLineReader(channel, BufferPool.directBuffer(2)); + FileStatus status = fs.getFileStatus(testFile); + + long totalRead = 0; + int i = 0; + AtomicInteger bytes = new AtomicInteger(); + for(;;){ + ByteBuf buf = reader.readLineBuf(bytes); + totalRead += bytes.get(); + if(buf == null) break; + String row = buf.toString(Charset.defaultCharset()); + assertEquals(i, Integer.parseInt(row)); + i++; + } + IOUtils.cleanup(null, reader); + assertEquals(status.getLen(), totalRead); + assertEquals(status.getLen(), reader.readBytes()); + } } From 63c3e575001d82b38d70930b56205cba0889d50d Mon Sep 17 00:00:00 2001 From: jhkim Date: Mon, 8 Dec 2014 19:30:27 +0900 Subject: [PATCH 2/3] fix the bug (file length == buffer size) --- .../java/org/apache/tajo/storage/text/ByteBufLineReader.java | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java index d2becb8115..959d5d2152 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java @@ -130,7 +130,7 @@ public ByteBuf readLineBuf(AtomicInteger reads) throws IOException { if (readable <= 0) { buffer.readerIndex(this.startIndex); fillBuffer(); //compact and fill buffer - if (!buffer.isReadable()) { + if (!buffer.isReadable() && buffer.writerIndex() == 0) { reads.set(0); return null; } else { @@ -155,7 +155,7 @@ public ByteBuf readLineBuf(AtomicInteger reads) throws IOException { //does not appeared terminating newline buffer.readerIndex(buffer.writerIndex()); // set to end buffer if(eof){ - readBytes += readable; + readBytes += buffer.readerIndex() - startIndex; break loop; } } else { From 6c89470a718b55f7723866fb0db0790266f9ab34 Mon Sep 17 00:00:00 2001 From: jhkim Date: Tue, 9 Dec 2014 11:30:29 +0900 Subject: [PATCH 3/3] add some comments for hard logic --- .../apache/tajo/storage/text/ByteBufLineReader.java | 12 ++++++++---- 1 file changed, 8 insertions(+), 4 deletions(-) diff --git a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java index 959d5d2152..2f742c64c2 100644 --- a/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java +++ b/tajo-storage/src/main/java/org/apache/tajo/storage/text/ByteBufLineReader.java @@ -93,7 +93,7 @@ private void fillBuffer() throws IOException { int localReadBytes = buffer.writeBytes(channel, this.bufferSize - readBytes); if (localReadBytes < 0) { if (buffer.isWritable()) { - // no more bytes are in the channel + //if read bytes is less than the buffer capacity, there is no more bytes in the channel eof = true; } break; @@ -118,7 +118,7 @@ private void fillBuffer() throws IOException { * Read a line terminated by one of CR, LF, or CRLF. */ public ByteBuf readLineBuf(AtomicInteger reads) throws IOException { - int readBytes = 0; + int readBytes = 0; // newline + text line bytes int newlineLength = 0; //length of terminating newline int readable; @@ -130,6 +130,8 @@ public ByteBuf readLineBuf(AtomicInteger reads) throws IOException { if (readable <= 0) { buffer.readerIndex(this.startIndex); fillBuffer(); //compact and fill buffer + + //if buffer.writerIndex() is zero, there is no bytes in buffer if (!buffer.isReadable() && buffer.writerIndex() == 0) { reads.set(0); return null; @@ -155,12 +157,14 @@ public ByteBuf readLineBuf(AtomicInteger reads) throws IOException { //does not appeared terminating newline buffer.readerIndex(buffer.writerIndex()); // set to end buffer if(eof){ - readBytes += buffer.readerIndex() - startIndex; + readBytes += (buffer.readerIndex() - startIndex); break loop; } } else { buffer.readerIndex(endIndex + 1); - readBytes += (buffer.readerIndex() - startIndex); + readBytes += (buffer.readerIndex() - startIndex); //past newline + text line + + //appeared terminating CRLF if (processor.isPrevCharCR() && buffer.isReadable() && buffer.getByte(buffer.readerIndex()) == LineSplitProcessor.LF) { buffer.skipBytes(1);