Permalink
Browse files

previous commit was incomplete. This should be part of it.

  • Loading branch information...
1 parent febf96b commit f011c246878b3fc4f0b71e1c01052add3b3fca12 Raghu Angadi committed Jul 27, 2011
View
2 src/java/com/hadoop/compression/lzo/LzopCodec.java
@@ -69,7 +69,7 @@ public CompressionOutputStream createIndexedOutputStream(OutputStream out,
LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.valueOf(
getConf().get(LZO_COMPRESSOR_KEY, LzoCompressor.CompressionStrategy.LZO1X_1.name()));
int bufferSize = getConf().getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE);
- return new LzopIndexedOutputStream(out, indexOut, compressor, bufferSize, strategy);
+ return new LzopOutputStream(out, indexOut, compressor, bufferSize, strategy);
}
@Override
View
48 src/java/com/hadoop/compression/lzo/LzopOutputStream.java
@@ -18,6 +18,8 @@
package com.hadoop.compression.lzo;
+import java.io.DataOutputStream;
+import java.io.FilterOutputStream;
import java.io.IOException;
import java.io.OutputStream;
import java.util.zip.Adler32;
@@ -26,13 +28,11 @@
import org.apache.hadoop.io.compress.CompressorStream;
import org.apache.hadoop.io.compress.Compressor;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
public class LzopOutputStream extends CompressorStream {
- private static final Log LOG = LogFactory.getLog(LzopOutputStream.class);
final int MAX_INPUT_SIZE;
+ protected DataOutputStream indexOut;
+ private CountingOutputStream cout;
/**
* Write an lzop-compatible header to the OutputStream provided.
@@ -76,16 +76,19 @@ protected static void writeLzopHeader(OutputStream out,
}
}
- public LzopOutputStream(OutputStream out, Compressor compressor,
- int bufferSize, LzoCompressor.CompressionStrategy strategy)
- throws IOException {
- super(out, compressor, bufferSize);
+ public LzopOutputStream(OutputStream out, DataOutputStream indexOut,
+ Compressor compressor, int bufferSize,
+ LzoCompressor.CompressionStrategy strategy)
+ throws IOException {
+ super(new CountingOutputStream(out), compressor, bufferSize);
+ this.cout = (CountingOutputStream) this.out;
+ this.indexOut = indexOut;
int overhead = strategy.name().contains("LZO1") ?
(bufferSize >> 4) + 64 + 3 : (bufferSize >> 3) + 128 + 3;
MAX_INPUT_SIZE = bufferSize - overhead;
- writeLzopHeader(out, strategy);
+ writeLzopHeader(this.out, strategy);
}
/**
@@ -97,6 +100,9 @@ public void close() throws IOException {
finish();
out.write(new byte[]{ 0, 0, 0, 0 });
out.close();
+ if (indexOut != null) {
+ indexOut.close();
+ }
closed = true;
}
}
@@ -171,6 +177,11 @@ public void finish() throws IOException {
protected void compress() throws IOException {
int len = compressor.compress(buffer, 0, buffer.length);
if (len > 0) {
+ // new lzo block. write current position to index file.
+ if (indexOut != null) {
+ indexOut.writeLong(cout.bytesWritten);
+ }
+
rawWriteInt((int)compressor.getBytesRead());
// If the compressed buffer is actually larger than the uncompressed buffer,
@@ -196,4 +207,23 @@ private void rawWriteInt(int v) throws IOException {
out.write((v >>> 8) & 0xFF);
out.write((v >>> 0) & 0xFF);
}
+
+ /* keeps count of number of bytes written. */
+ private static class CountingOutputStream extends FilterOutputStream {
+ public CountingOutputStream(OutputStream out) {
+ super(out);
+ }
+
+ long bytesWritten = 0;
+
+ public void write(byte[] b, int off, int len) throws IOException {
+ out.write(b, off, len);
+ bytesWritten += len;
+ }
+
+ public void write(int b) throws IOException {
+ out.write(b);
+ bytesWritten++;
+ }
+ }
}
View
29 src/test/com/hadoop/compression/lzo/TestLzopOutputStream.java
@@ -20,18 +20,21 @@
import java.io.BufferedWriter;
import java.io.BufferedReader;
+import java.io.DataOutputStream;
import java.io.File;
import java.io.FileOutputStream;
import java.io.FileInputStream;
import java.io.IOException;
-import java.io.OutputStreamWriter;
import java.io.InputStreamReader;
import java.security.NoSuchAlgorithmException;
import junit.framework.TestCase;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
/**
* Test the LzoOutputFormat, make sure that it can write files of different sizes and read them back in
@@ -47,11 +50,15 @@
private final String mediumFile = "1000.txt";
private final String smallFile = "100.txt";
private final String issue20File = "issue20-lzop.txt";
+ private FileSystem localFs;
@Override
protected void setUp() throws Exception {
super.setUp();
inputDataPath = System.getProperty("test.build.data", "data");
+ Configuration conf = new Configuration();
+ conf.set("io.compression.codecs", LzopCodec.class.getName());
+ localFs = FileSystem.getLocal(conf).getRaw();
}
/**
@@ -121,6 +128,7 @@ private void runTest(String filename) throws IOException,
File textFile = new File(inputDataPath, filename);
File lzoFile = new File(inputDataPath, filename + new LzopCodec().getDefaultExtension());
File lzoOutFile = new File(inputDataPath, "output_" + filename + new LzopCodec().getDefaultExtension());
+ File lzoIndexFile = new File(lzoOutFile.getAbsolutePath() + LzoIndex.LZO_INDEX_SUFFIX);
if (lzoOutFile.exists()) {
lzoOutFile.delete();
}
@@ -136,7 +144,9 @@ private void runTest(String filename) throws IOException,
int lzoBufferSize = 256 * 1024;
LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.LZO1X_1;
LzoCompressor lzoCompressor = new LzoCompressor(strategy, lzoBufferSize);
- LzopOutputStream lzoOut = new LzopOutputStream(new FileOutputStream(lzoOutFile.getAbsolutePath()), lzoCompressor, lzoBufferSize, strategy);
+ LzopOutputStream lzoOut = new LzopOutputStream(new FileOutputStream(lzoOutFile),
+ new DataOutputStream(new FileOutputStream(lzoIndexFile)),
+ lzoCompressor, lzoBufferSize, strategy);
// Now read line by line and stream out..
String textLine;
@@ -178,5 +188,20 @@ private void runTest(String filename) throws IOException,
lzoBr.close();
textBr2.close();
+
+ // verify the index file:
+
+ Path lzoOutPath = new Path(lzoOutFile.getAbsolutePath());
+ LzoIndex lzoIndex = LzoIndex.readIndex(localFs, lzoOutPath);
+
+ // create offline index to compare.
+ assertTrue(lzoIndexFile.delete());
+ LzoIndex.createIndex(localFs, lzoOutPath);
+ LzoIndex expectedIndex = LzoIndex.readIndex(localFs, lzoOutPath);
+
+ assertEquals(lzoIndex.getNumberOfBlocks(), expectedIndex.getNumberOfBlocks());
+ for (int i=0; i<lzoIndex.getNumberOfBlocks(); i++) {
+ assertEquals(lzoIndex.getPosition(i), expectedIndex.getPosition(i));
+ }
}
}

0 comments on commit f011c24

Please sign in to comment.