Permalink
Browse files

Fix repeated read from LZOP stream which saw EOF.

  • Loading branch information...
1 parent cb82529 commit f85980af865fad797199721ae2c9947c37314edb Shevek committed Aug 23, 2011
@@ -104,7 +104,7 @@ public int read(byte[] b, int off, int len) throws IOException {
return len;
}
- private void logState(String when) {
+ protected void logState(String when) {
LOG.info("\n");
LOG.info(when + " Input buffer size=" + inputBuffer.length);
LOG.info(when + " Output buffer pos=" + outputBufferPos + "; length=" + outputBufferLen + "; size=" + outputBuffer.length);
@@ -119,6 +119,7 @@ private boolean fill() throws IOException {
}
protected boolean readBlock() throws IOException {
+ // logState("Before readBlock");
int outputBufferLength = readInt(true);
if (outputBufferLength == -1)
return false;
@@ -62,6 +62,7 @@
private final CRC32 c_crc32_d;
private final Adler32 c_adler32_c;
private final Adler32 c_adler32_d;
+ private boolean eof;
public LzopInputStream(InputStream in) throws IOException {
super(in, new LzoDecompressor1x());
@@ -70,6 +71,7 @@ public LzopInputStream(InputStream in) throws IOException {
this.c_crc32_d = ((flags & LzopConstants.F_CRC32_D) == 0) ? null : new CRC32();
this.c_adler32_c = ((flags & LzopConstants.F_ADLER32_C) == 0) ? null : new Adler32();
this.c_adler32_d = ((flags & LzopConstants.F_ADLER32_D) == 0) ? null : new Adler32();
+ this.eof = false;
// logState();
}
@@ -95,12 +97,13 @@ public int getUncompressedChecksumCount() {
return out;
}
- private void logState() {
- LOG.info("Flags = " + Integer.toHexString(flags));
- LOG.info("CRC32C = " + c_crc32_c);
- LOG.info("CRC32D = " + c_crc32_d);
- LOG.info("Adler32C = " + c_adler32_c);
- LOG.info("Adler32D = " + c_adler32_d);
+ /**
+  * Logs the superclass state and then the {@code flags} field in hexadecimal,
+  * prefixing every line with {@code when}.
+  *
+  * @param when label prepended to each logged line (e.g. "Before readBlock").
+  */
+ @Override
+ protected void logState(String when) {
+     super.logState(when);
+     LOG.info(when + " Flags = " + Integer.toHexString(flags));
+     // The checksum objects are not logged by default; re-enable when debugging.
+     // LOG.info(when + " CRC32C = " + c_crc32_c);
+     // LOG.info(when + " CRC32D = " + c_crc32_d);
+     // LOG.info(when + " Adler32C = " + c_adler32_c);
+     // LOG.info(when + " Adler32D = " + c_adler32_d);
}
/**
@@ -232,9 +235,15 @@ private void testChecksum(Checksum csum, int value, byte[] data, int off, int le
@Override
protected boolean readBlock() throws IOException {
+ // logState("Before readBlock");
+ if (eof)
+ return false;
int outputBufferLength = readInt(false);
- if (outputBufferLength == 0)
+ if (outputBufferLength == 0) {
+ // logState("After empty readBlock");
+ eof = true;
return false;
+ }
setOutputBufferSize(outputBufferLength);
int inputBufferLength = readInt(false);
setInputBufferSize(inputBufferLength);
@@ -247,6 +256,7 @@ protected boolean readBlock() throws IOException {
readBytes(outputBuffer, 0, outputBufferLength);
testChecksum(c_adler32_d, v_adler32_d, outputBuffer, 0, outputBufferLength);
testChecksum(c_crc32_d, v_crc32_d, outputBuffer, 0, outputBufferLength);
+ // logState("After uncompressed readBlock");
return true;
}
int v_adler32_c = readChecksum(c_adler32_c);
@@ -257,6 +267,7 @@ protected boolean readBlock() throws IOException {
decompress(outputBufferLength, inputBufferLength);
testChecksum(c_adler32_d, v_adler32_d, outputBuffer, 0, outputBufferLength);
testChecksum(c_crc32_d, v_crc32_d, outputBuffer, 0, outputBufferLength);
+ // logState("After compressed readBlock");
return true;
}
}
@@ -0,0 +1,71 @@
+/*
+ * To change this template, choose Tools | Templates
+ * and open the template in the editor.
+ */
+package org.anarres.lzo;
+
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileInputStream;
+import org.anarres.lzo.hadoop.codec.LzopInputStream;
+import org.apache.commons.io.IOUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Text;
+import org.apache.hadoop.mapred.LineRecordReader;
+import org.junit.Test;
+
+/**
+ * Reads the sample file {@code input.txt.lzo} from the test data directory
+ * through {@link LzopInputStream}: once by copying the whole stream into
+ * memory, and once record by record through a {@link LineRecordReader}.
+ *
+ * @author shevek
+ */
+public class HadoopLzopFileTest {
+
+    // Log under this test's own class name (previously another test's class was used).
+    private static final Log LOG = LogFactory.getLog(HadoopLzopFileTest.class);
+
+    /** Name of the LZOP sample file inside the test data directory. */
+    private static final String INPUT_FILE_NAME = "input.txt.lzo";
+
+    /**
+     * Copies the complete decompressed stream into a byte buffer and logs the
+     * elapsed time, the number of uncompressed bytes and the content itself.
+     *
+     * @throws Exception if the file cannot be opened or read.
+     */
+    @Test
+    public void testInputFile() throws Exception {
+        try {
+            File dir = LzopFileTest.getDataDirectory();
+            File file = new File(dir, INPUT_FILE_NAME);
+            FileInputStream fi = new FileInputStream(file);
+            try {
+                LzopInputStream ci = new LzopInputStream(fi);
+                // Initial capacity is only a guess; the buffer grows on demand.
+                ByteArrayOutputStream bo = new ByteArrayOutputStream((int) (file.length() * 2));
+                long start = System.currentTimeMillis();
+                IOUtils.copy(ci, bo);
+                long end = System.currentTimeMillis();
+                // Report the raw millisecond difference so the unit matches the label.
+                LOG.info("Uncompression took " + (end - start) + " ms");
+                LOG.info("Uncompressed data is " + bo.size() + " bytes.");
+                LOG.info("Uncompressed data is\n" + bo);
+            } finally {
+                // Release the file handle even when construction or the copy fails.
+                IOUtils.closeQuietly(fi);
+            }
+        } finally {
+            System.out.flush();
+            System.err.flush();
+            // NOTE(review): presumably gives asynchronous log output time to drain — confirm.
+            Thread.sleep(100);
+        }
+    }
+
+    /**
+     * Feeds the decompressed stream to a {@link LineRecordReader} and logs
+     * every key/value pair until the reader reports no further record.
+     *
+     * @throws Exception if the file cannot be opened or read; the failure is
+     *         logged before being rethrown.
+     */
+    @Test
+    public void testLineReader() throws Exception {
+        try {
+            File dir = LzopFileTest.getDataDirectory();
+            File file = new File(dir, INPUT_FILE_NAME);
+            FileInputStream fi = new FileInputStream(file);
+            try {
+                LzopInputStream ci = new LzopInputStream(fi);
+                // NOTE(review): the numeric arguments look like start offset, end offset
+                // and maximum line length — verify against the LineRecordReader API.
+                LineRecordReader reader = new LineRecordReader(ci, 0L, 9999L, 9999);
+                LongWritable key = new LongWritable();
+                Text value = new Text();
+                while (reader.next(key, value)) {
+                    LOG.info("key=" + key + "; value=" + value);
+                }
+            } finally {
+                // Release the file handle even when reading fails part-way.
+                IOUtils.closeQuietly(fi);
+            }
+        } catch (Exception e) {
+            LOG.info("Test failed", e);
+            throw e;
+        } finally {
+            System.out.flush();
+            System.err.flush();
+            // NOTE(review): presumably gives asynchronous log output time to drain — confirm.
+            Thread.sleep(100);
+        }
+    }
+}
@@ -16,7 +16,7 @@
*/
public class LzopFileTest {
- private File getDataDirectory() {
+ public static File getDataDirectory() {
String path = System.getProperty("test.data.dir");
File file = new File(path == null ? "test/data" : path);
assertTrue(file + " is a directory.", file.isDirectory());

0 comments on commit f85980a

Please sign in to comment.