Merge remote branch 'kw/master'

commit ceb643f15057b0b5c2664233db33062bf49cb8c6 (2 parents: 47d4714 + 3d19b14)
Author: Todd Lipcon (toddlipcon)
1  .archive-version
@@ -0,0 +1 @@
+$Format:%H$
1  .gitattributes
@@ -0,0 +1 @@
+.archive-version export-subst
2  .gitignore
@@ -15,11 +15,13 @@
 
 *~
 .classpath
+.idea
 .project
 .settings
 .svn
 build/
 bin/
+out/
 *.ipr
 *.iml
 *.iws
2  README.md
@@ -5,7 +5,7 @@ Hadoop-LZO is a project to bring splittable LZO compression to Hadoop. LZO is a
 
 ### Origins
 
-This project builds off the great work done at [code.google.com/p/hadoop-gpl-compression](code.google.com/p/hadoop-gpl-compression). As of issue 41, the differences in this codebase are the following.
+This project builds off the great work done at [http://code.google.com/p/hadoop-gpl-compression](http://code.google.com/p/hadoop-gpl-compression). As of issue 41, the differences in this codebase are the following.
 
 - it fixes a few bugs in hadoop-gpl-compression -- notably, it allows the decompressor to read small or uncompressable lzo files, and also fixes the compressor to follow the lzo standard when compressing small or uncompressible chunks. it also fixes a number of inconsistenly caught and thrown exception cases that can occur when the lzo writer gets killed mid-stream, plus some other smaller issues (see commit log).
 - it adds the ability to work with Hadoop streaming via the com.apache.hadoop.mapred.DeprecatedLzoTextInputFormat class
2  build.xml
@@ -28,7 +28,7 @@
 
   <property name="Name" value="Hadoop GPL Compression"/>
   <property name="name" value="hadoop-lzo"/>
-  <property name="version" value="0.4.10"/>
+  <property name="version" value="0.4.13"/>
   <property name="final.name" value="${name}-${version}"/>
   <property name="year" value="2008"/>
 
28 src/get_build_revision.sh
@@ -1,7 +1,27 @@
-#!/bin/sh
+#!/bin/bash
 
-if [ -z "$BUILD_REVISION" ]; then
-  git rev-parse HEAD
-else
+# Allow user to specify - this is done by packages
+if [ -n "$BUILD_REVISION" ]; then
   echo $BUILD_REVISION
+  exit
 fi
+
+# If we're in git, use that
+BUILD_REVISION=$(git rev-parse HEAD 2>/dev/null)
+if [ -n "$BUILD_REVISION" ]; then
+  echo $BUILD_REVISION
+  exit
+fi
+
+# Otherwise try to use the .archive-version file which
+# is filled in by git exports (eg github downloads)
+BIN=$(dirname ${BASH_SOURCE:-0})
+BUILD_REVISION=$(cat $BIN/../.archive-version 2>/dev/null)
+
+if [[ "$BUILD_REVISION" != *Format* ]]; then
+  echo "$BUILD_REVISION"
+  exit
+fi
+
+# Give up
+echo "Unknown build revision"
31 src/java/com/hadoop/compression/lzo/LzoCodec.java
@@ -46,17 +46,21 @@
 
   public static final String LZO_COMPRESSOR_KEY = "io.compression.codec.lzo.compressor";
   public static final String LZO_DECOMPRESSOR_KEY = "io.compression.codec.lzo.decompressor";
+  public static final String LZO_COMPRESSION_LEVEL_KEY = "io.compression.codec.lzo.compression.level";
   public static final String LZO_BUFFER_SIZE_KEY = "io.compression.codec.lzo.buffersize";
   public static final int DEFAULT_LZO_BUFFER_SIZE = 256 * 1024;
   public static final int MAX_BLOCK_SIZE = 64*1024*1024;
+  public static final int UNDEFINED_COMPRESSION_LEVEL = -999; // Constant from LzoCompressor.c
 
 
   private Configuration conf;
 
+  @Override
   public void setConf(Configuration conf) {
     this.conf = conf;
   }
 
+  @Override
   public Configuration getConf() {
     return conf;
   }
@@ -86,6 +90,7 @@ public Configuration getConf() {
    * else <code>false</code>
    */
   public static boolean isNativeLzoLoaded(Configuration conf) {
+    assert conf != null : "Configuration cannot be null!";
     return nativeLzoLoaded && conf.getBoolean("hadoop.native.lib", true);
   }
 
@@ -100,10 +105,12 @@ public static String getRevisionHash() {
     }
   }
 
+  @Override
   public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
     return createOutputStream(out, createCompressor());
   }
 
+  @Override
   public CompressionOutputStream createOutputStream(OutputStream out,
       Compressor compressor) throws IOException {
     // Ensure native-lzo library is loaded & initialized
@@ -141,6 +148,7 @@ public CompressionOutputStream createOutputStream(OutputStream out,
         compressionOverhead);
   }
 
+  @Override
   public Class<? extends Compressor> getCompressorType() {
     // Ensure native-lzo library is loaded & initialized
     if (!isNativeLzoLoaded(conf)) {
@@ -149,8 +157,10 @@ public CompressionOutputStream createOutputStream(OutputStream out,
     return LzoCompressor.class;
   }
 
+  @Override
   public Compressor createCompressor() {
     // Ensure native-lzo library is loaded & initialized
+    assert conf != null : "Configuration cannot be null! You must call setConf() before creating a compressor.";
     if (!isNativeLzoLoaded(conf)) {
       throw new RuntimeException("native-lzo library not available");
     }
@@ -158,11 +168,13 @@ public Compressor createCompressor() {
     return new LzoCompressor(conf);
   }
 
+  @Override
   public CompressionInputStream createInputStream(InputStream in)
       throws IOException {
     return createInputStream(in, createDecompressor());
   }
 
+  @Override
   public CompressionInputStream createInputStream(InputStream in,
       Decompressor decompressor)
       throws IOException {
@@ -174,6 +186,7 @@ public CompressionInputStream createInputStream(InputStream in,
         conf.getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE));
   }
 
+  @Override
   public Class<? extends Decompressor> getDecompressorType() {
     // Ensure native-lzo library is loaded & initialized
     if (!isNativeLzoLoaded(conf)) {
@@ -182,6 +195,7 @@ public CompressionInputStream createInputStream(InputStream in,
     return LzoDecompressor.class;
   }
 
+  @Override
   public Decompressor createDecompressor() {
     // Ensure native-lzo library is loaded & initialized
     if (!isNativeLzoLoaded(conf)) {
@@ -197,37 +211,54 @@ public Decompressor createDecompressor() {
    * Get the default filename extension for this kind of compression.
    * @return the extension including the '.'
    */
+  @Override
   public String getDefaultExtension() {
     return ".lzo_deflate";
   }
 
   static LzoCompressor.CompressionStrategy getCompressionStrategy(Configuration conf) {
+    assert conf != null : "Configuration cannot be null!";
     return LzoCompressor.CompressionStrategy.valueOf(
         conf.get(LZO_COMPRESSOR_KEY,
             LzoCompressor.CompressionStrategy.LZO1X_1.name()));
   }
 
   static LzoDecompressor.CompressionStrategy getDecompressionStrategy(Configuration conf) {
+    assert conf != null : "Configuration cannot be null!";
     return LzoDecompressor.CompressionStrategy.valueOf(
         conf.get(LZO_DECOMPRESSOR_KEY,
             LzoDecompressor.CompressionStrategy.LZO1X.name()));
   }
 
+  static int getCompressionLevel(Configuration conf) {
+    assert conf != null : "Configuration cannot be null!";
+    return conf.getInt(LZO_COMPRESSION_LEVEL_KEY, UNDEFINED_COMPRESSION_LEVEL);
+  }
+
   static int getBufferSize(Configuration conf) {
+    assert conf != null : "Configuration cannot be null!";
     return conf.getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE);
   }
 
   public static void setCompressionStrategy(Configuration conf,
       LzoCompressor.CompressionStrategy strategy) {
+    assert conf != null : "Configuration cannot be null!";
     conf.set(LZO_COMPRESSOR_KEY, strategy.name());
   }
 
   public static void setDecompressionStrategy(Configuration conf,
       LzoDecompressor.CompressionStrategy strategy) {
+    assert conf != null : "Configuration cannot be null!";
     conf.set(LZO_DECOMPRESSOR_KEY, strategy.name());
   }
 
+  public static void setCompressionLevel(Configuration conf, int compressionLevel) {
+    assert conf != null : "Configuration cannot be null!";
+    conf.setInt(LZO_COMPRESSION_LEVEL_KEY, compressionLevel);
+  }
+
   public static void setBufferSize(Configuration conf, int bufferSize) {
+    assert conf != null : "Configuration cannot be null!";
    conf.setInt(LZO_BUFFER_SIZE_KEY, bufferSize);
   }
 
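The new compression-level knob above only changes the output for the lzo1x_999/lzo1y_999 compressors (see the strstr() check in the LzoCompressor.c hunk further down); for every other strategy the native code keeps its built-in default. A minimal sketch of how a client might opt in — the LZO1X_999 strategy name is assumed from the existing CompressionStrategy enum, and level 9 is purely illustrative:

import org.apache.hadoop.conf.Configuration;
import com.hadoop.compression.lzo.LzoCodec;
import com.hadoop.compression.lzo.LzoCompressor;

public class CompressionLevelSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // Compression levels only take effect for the lzo1x_999/lzo1y_999
    // compressors; other strategies ignore the level and use their defaults.
    LzoCodec.setCompressionStrategy(conf,
        LzoCompressor.CompressionStrategy.LZO1X_999);
    LzoCodec.setCompressionLevel(conf, 9); // illustrative; omit to keep UNDEFINED_COMPRESSION_LEVEL

    LzoCodec codec = new LzoCodec();
    codec.setConf(conf); // must be called before createCompressor()
  }
}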
9 src/java/com/hadoop/compression/lzo/LzoCompressor.java
@@ -57,6 +57,7 @@
   private int workingMemoryBufLen = 0;  // The length of 'working memory' buf.
   @SuppressWarnings("unused")
   private ByteBuffer workingMemoryBuf;  // The 'working memory' for lzo.
+  private int lzoCompressionLevel;
 
   /**
    * Used when the user doesn't specify a configuration. We cache a single
@@ -209,9 +210,10 @@ public void reinit(Configuration conf) {
       conf = defaultConfiguration;
     }
     LzoCompressor.CompressionStrategy strategy = LzoCodec.getCompressionStrategy(conf);
+    int compressionLevel = LzoCodec.getCompressionLevel(conf);
     int bufferSize = LzoCodec.getBufferSize(conf);
 
-    init(strategy, bufferSize);
+    init(strategy, compressionLevel, bufferSize);
   }
 
   /**
@@ -221,7 +223,7 @@ public void reinit(Configuration conf) {
    * @param directBufferSize size of the direct buffer to be used.
    */
   public LzoCompressor(CompressionStrategy strategy, int directBufferSize) {
-    init(strategy, directBufferSize);
+    init(strategy, LzoCodec.UNDEFINED_COMPRESSION_LEVEL, directBufferSize);
   }
 
   /**
@@ -254,8 +256,9 @@ private ByteBuffer realloc(ByteBuffer buf, int newSize) {
     return ByteBuffer.allocateDirect(newSize);
   }
 
-  private void init(CompressionStrategy strategy, int directBufferSize) {
+  private void init(CompressionStrategy strategy, int compressionLevel, int directBufferSize) {
     this.strategy = strategy;
+    this.lzoCompressionLevel = compressionLevel;
     this.directBufferSize = directBufferSize;
 
     uncompressedDirectBuf = realloc(uncompressedDirectBuf, directBufferSize);
75 src/java/com/hadoop/compression/lzo/LzoInputFormatCommon.java
@@ -0,0 +1,75 @@
+/*
+ * This file is part of Hadoop-Gpl-Compression.
+ *
+ * Hadoop-Gpl-Compression is free software: you can redistribute it
+ * and/or modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Hadoop-Gpl-Compression is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Hadoop-Gpl-Compression. If not, see
+ * <http://www.gnu.org/licenses/>.
+ */
+
+package com.hadoop.compression.lzo;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.hadoop.compression.lzo.LzoIndexer;
+import com.hadoop.compression.lzo.LzopCodec;
+
+public class LzoInputFormatCommon {
+  /**
+   * The boolean property <code>lzo.text.input.format.ignore.nonlzo</code> tells
+   * the LZO text input format whether it should silently ignore non-LZO input
+   * files. When the property is true (which is the default), non-LZO files will
+   * be silently ignored. When the property is false, non-LZO files will be
+   * processed using the standard TextInputFormat.
+   */
+  public static final String IGNORE_NONLZO_KEY = "lzo.text.input.format.ignore.nonlzo";
+  /**
+   * Default value of the <code>lzo.text.input.format.ignore.nonlzo</code>
+   * property.
+   */
+  public static final boolean DEFAULT_IGNORE_NONLZO = true;
+  /**
+   * Full extension for LZO index files (".lzo.index").
+   */
+  public static final String FULL_LZO_INDEX_SUFFIX =
+    LzopCodec.DEFAULT_LZO_EXTENSION + LzoIndex.LZO_INDEX_SUFFIX;
+
+  /**
+   * @param conf the Configuration object
+   * @return the value of the <code>lzo.text.input.format.ignore.nonlzo</code>
+   *         property in <code>conf</code>, or <code>DEFAULT_IGNORE_NONLZO</code>
+   *         if the property is not set.
+   */
+  public static boolean getIgnoreNonLzoProperty(Configuration conf) {
+    return conf.getBoolean(IGNORE_NONLZO_KEY, DEFAULT_IGNORE_NONLZO);
+  }
+
+  /**
+   * Checks if the given filename ends in ".lzo".
+   *
+   * @param filename filename to check.
+   * @return true if the filename ends in ".lzo"
+   */
+  public static boolean isLzoFile(String filename) {
+    return filename.endsWith(LzopCodec.DEFAULT_LZO_EXTENSION);
+  }
+
+  /**
+   * Checks if the given filename ends in ".lzo.index".
+   *
+   * @param filename filename to check.
+   * @return true if the filename ends in ".lzo.index"
+   */
+  public static boolean isLzoIndexFile(String filename) {
+    return filename.endsWith(FULL_LZO_INDEX_SUFFIX);
+  }
+}
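These helpers are static and side-effect free; a quick sketch of what callers can expect from them (file names illustrative):

import org.apache.hadoop.conf.Configuration;
import com.hadoop.compression.lzo.LzoInputFormatCommon;

public class NameCheckSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // true unless lzo.text.input.format.ignore.nonlzo is explicitly set false
    System.out.println(LzoInputFormatCommon.getIgnoreNonLzoProperty(conf));
    System.out.println(LzoInputFormatCommon.isLzoFile("part-r-00000.lzo"));            // true
    System.out.println(LzoInputFormatCommon.isLzoIndexFile("part-r-00000.lzo.index")); // true
    System.out.println(LzoInputFormatCommon.isLzoFile("part-r-00001"));                // false
  }
}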
17 src/java/com/hadoop/compression/lzo/LzopCodec.java
@@ -18,6 +18,7 @@
 
 package com.hadoop.compression.lzo;
 
+import java.io.DataOutputStream;
 import java.io.IOException;
 import java.io.InputStream;
 import java.io.OutputStream;
@@ -42,22 +43,34 @@
   public static final int LZOP_VERSION = 0x1010;
   /** Latest verion of lzop this should be compatible with */
   public static final int LZOP_COMPAT_VERSION = 0x0940;
+  public static final String DEFAULT_LZO_EXTENSION = ".lzo";
 
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
     return createOutputStream(out, createCompressor());
   }
 
+  public CompressionOutputStream createIndexedOutputStream(OutputStream out,
+      DataOutputStream indexOut)
+      throws IOException {
+    return createIndexedOutputStream(out, indexOut, createCompressor());
+  }
+
   @Override
   public CompressionOutputStream createOutputStream(OutputStream out,
       Compressor compressor) throws IOException {
+    return createIndexedOutputStream(out, null, compressor);
+  }
+
+  public CompressionOutputStream createIndexedOutputStream(OutputStream out,
+      DataOutputStream indexOut, Compressor compressor) throws IOException {
     if (!isNativeLzoLoaded(getConf())) {
       throw new RuntimeException("native-lzo library not available");
     }
     LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.valueOf(
         getConf().get(LZO_COMPRESSOR_KEY, LzoCompressor.CompressionStrategy.LZO1X_1.name()));
     int bufferSize = getConf().getInt(LZO_BUFFER_SIZE_KEY, DEFAULT_LZO_BUFFER_SIZE);
-    return new LzopOutputStream(out, compressor, bufferSize, strategy);
+    return new LzopOutputStream(out, indexOut, compressor, bufferSize, strategy);
   }
 
@@ -95,6 +108,6 @@ public Decompressor createDecompressor() {
 
   @Override
   public String getDefaultExtension() {
-    return ".lzo";
+    return DEFAULT_LZO_EXTENSION;
   }
 }
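With the new createIndexedOutputStream() overloads, a writer can emit the .lzo file and its block index in a single pass instead of re-reading the compressed output afterwards. A minimal sketch, assuming the native-lzo library is loadable; file names are illustrative:

import java.io.DataOutputStream;
import java.io.FileOutputStream;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.io.compress.CompressionOutputStream;
import com.hadoop.compression.lzo.LzopCodec;

public class IndexedWriteSketch {
  public static void main(String[] args) throws Exception {
    LzopCodec codec = new LzopCodec();
    codec.setConf(new Configuration()); // required before creating streams

    // Compressed data and its block index are written side by side.
    CompressionOutputStream out = codec.createIndexedOutputStream(
        new FileOutputStream("data.txt.lzo"),
        new DataOutputStream(new FileOutputStream("data.txt.lzo.index")));
    out.write("hello, lzo\n".getBytes("UTF-8"));
    out.close(); // also closes the index stream, per LzopOutputStream.close()
  }
}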
50 src/java/com/hadoop/compression/lzo/LzopOutputStream.java
@@ -18,6 +18,8 @@
 
 package com.hadoop.compression.lzo;
 
+import java.io.DataOutputStream;
+import java.io.FilterOutputStream;
 import java.io.IOException;
 import java.io.OutputStream;
 import java.util.zip.Adler32;
@@ -26,13 +28,11 @@
 import org.apache.hadoop.io.compress.CompressorStream;
 import org.apache.hadoop.io.compress.Compressor;
 
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
 public class LzopOutputStream extends CompressorStream {
-  private static final Log LOG = LogFactory.getLog(LzopOutputStream.class);
 
   final int MAX_INPUT_SIZE;
+  protected DataOutputStream indexOut;
+  private CountingOutputStream cout;
 
   /**
    * Write an lzop-compatible header to the OutputStream provided.
@@ -79,13 +79,22 @@ protected static void writeLzopHeader(OutputStream out,
   public LzopOutputStream(OutputStream out, Compressor compressor,
       int bufferSize, LzoCompressor.CompressionStrategy strategy)
       throws IOException {
-    super(out, compressor, bufferSize);
+    this(out, null, compressor, bufferSize, strategy);
+  }
 
+  public LzopOutputStream(OutputStream out, DataOutputStream indexOut,
+      Compressor compressor, int bufferSize,
+      LzoCompressor.CompressionStrategy strategy)
+      throws IOException {
+    super(new CountingOutputStream(out), compressor, bufferSize);
+
+    this.cout = (CountingOutputStream) this.out;
+    this.indexOut = indexOut;
     int overhead = strategy.name().contains("LZO1") ?
         (bufferSize >> 4) + 64 + 3 : (bufferSize >> 3) + 128 + 3;
     MAX_INPUT_SIZE = bufferSize - overhead;
 
-    writeLzopHeader(out, strategy);
+    writeLzopHeader(this.out, strategy);
   }
 
   /**
@@ -97,6 +106,9 @@ public void close() throws IOException {
       finish();
       out.write(new byte[]{ 0, 0, 0, 0 });
       out.close();
+      if (indexOut != null) {
+        indexOut.close();
+      }
       closed = true;
     }
   }
@@ -171,13 +183,18 @@ public void finish() throws IOException {
   protected void compress() throws IOException {
     int len = compressor.compress(buffer, 0, buffer.length);
     if (len > 0) {
+      // new lzo block. write current position to index file.
+      if (indexOut != null) {
+        indexOut.writeLong(cout.bytesWritten);
+      }
+
       rawWriteInt((int)compressor.getBytesRead());
 
       // If the compressed buffer is actually larger than the uncompressed buffer,
       // the LZO specification says that we should write the uncompressed bytes rather
       // than the compressed bytes. The decompressor understands this because both sizes
       // get written to the stream.
-      if (compressor.getBytesRead() < compressor.getBytesWritten()) {
+      if (compressor.getBytesRead() <= compressor.getBytesWritten()) {
         // Compression actually increased the size of the buffer, so write the uncompressed bytes.
         byte[] uncompressed = ((LzoCompressor)compressor).uncompressedBytes();
         rawWriteInt(uncompressed.length);
@@ -196,4 +213,23 @@ private void rawWriteInt(int v) throws IOException {
     out.write((v >>> 8) & 0xFF);
     out.write((v >>> 0) & 0xFF);
   }
+
+  /* keeps count of number of bytes written. */
+  private static class CountingOutputStream extends FilterOutputStream {
+    public CountingOutputStream(OutputStream out) {
+      super(out);
+    }
+
+    long bytesWritten = 0;
+
+    public void write(byte[] b, int off, int len) throws IOException {
+      out.write(b, off, len);
+      bytesWritten += len;
+    }
+
+    public void write(int b) throws IOException {
+      out.write(b);
+      bytesWritten++;
+    }
+  }
 }
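As the compress() hunk shows, the index stream receives one writeLong() per LZO block, recording the block's starting byte offset in the compressed file, so the resulting .lzo.index file is simply a sequence of big-endian longs. A small sketch that dumps such an index (class name and argument handling are illustrative):

import java.io.DataInputStream;
import java.io.EOFException;
import java.io.FileInputStream;

public class DumpLzoIndexSketch {
  public static void main(String[] args) throws Exception {
    // args[0]: path to a .lzo.index file written by LzopOutputStream
    DataInputStream in = new DataInputStream(new FileInputStream(args[0]));
    try {
      while (true) {
        System.out.println(in.readLong()); // byte offset of the next lzo block
      }
    } catch (EOFException end) {
      // normal termination: no more block offsets
    } finally {
      in.close();
    }
  }
}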
57 src/java/com/hadoop/mapred/DeprecatedLzoTextInputFormat.java
@@ -31,14 +31,16 @@
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
 import org.apache.hadoop.mapred.FileSplit;
 import org.apache.hadoop.mapred.InputSplit;
 import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
 import org.apache.hadoop.mapred.RecordReader;
 import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
 
 import com.hadoop.compression.lzo.LzoIndex;
+import com.hadoop.compression.lzo.LzoInputFormatCommon;
 import com.hadoop.compression.lzo.LzopCodec;
 
 /**
@@ -50,27 +52,40 @@
  * com.hadoop.mapred.DeprecatedLzoTextInputFormat, not
  * com.hadoop.mapreduce.LzoTextInputFormat. The classes attempt to be alike in
  * every other respect.
+ *
+ * Note that to use this input format properly with hadoop-streaming, you should
+ * also set the property <code>stream.map.input.ignoreKey=true</code>. That will
+ * replicate the behavior of the default TextInputFormat by stripping off the byte
+ * offset keys from the input lines that get piped to the mapper process.
+ *
+ * See {@link LzoInputFormatCommon} for a description of the boolean property
+ * <code>lzo.text.input.format.ignore.nonlzo</code> and how it affects the
+ * behavior of this input format.
  */
 
 @SuppressWarnings("deprecation")
-public class DeprecatedLzoTextInputFormat extends FileInputFormat<LongWritable, Text> {
-  public static final String LZO_INDEX_SUFFIX = ".index";
+public class DeprecatedLzoTextInputFormat extends TextInputFormat {
   private final Map<Path, LzoIndex> indexes = new HashMap<Path, LzoIndex>();
 
   @Override
   protected FileStatus[] listStatus(JobConf conf) throws IOException {
     List<FileStatus> files = new ArrayList<FileStatus>(Arrays.asList(super.listStatus(conf)));
 
-    String fileExtension = new LzopCodec().getDefaultExtension();
+    boolean ignoreNonLzo = LzoInputFormatCommon.getIgnoreNonLzoProperty(conf);
 
     Iterator<FileStatus> it = files.iterator();
     while (it.hasNext()) {
       FileStatus fileStatus = it.next();
       Path file = fileStatus.getPath();
 
-      if (!file.toString().endsWith(fileExtension)) {
-        // Get rid of non-LZO files.
-        it.remove();
+      if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
+        // Get rid of non-LZO files, unless the conf explicitly tells us to
+        // keep them.
+        // However, always skip over files that end with ".lzo.index", since
+        // they are not part of the input.
+        if (ignoreNonLzo || LzoInputFormatCommon.isLzoIndexFile(file.toString())) {
+          it.remove();
+        }
       } else {
         FileSystem fs = file.getFileSystem(conf);
         LzoIndex index = LzoIndex.readIndex(fs, file);
@@ -83,8 +98,13 @@
 
   @Override
   protected boolean isSplitable(FileSystem fs, Path filename) {
-    LzoIndex index = indexes.get(filename);
-    return !index.isEmpty();
+    if (LzoInputFormatCommon.isLzoFile(filename.toString())) {
+      LzoIndex index = indexes.get(filename);
+      return !index.isEmpty();
+    } else {
+      // Delegate non-LZO files to the TextInputFormat base class.
+      return super.isSplitable(fs, filename);
+    }
   }
 
   @Override
@@ -97,6 +117,14 @@ protected boolean isSplitable(FileSystem fs, Path filename) {
     for (FileSplit fileSplit: splits) {
       Path file = fileSplit.getPath();
       FileSystem fs = file.getFileSystem(conf);
+
+      if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
+        // non-LZO file, keep the input split as is.
+        result.add(fileSplit);
+        continue;
+      }
+
+      // LZO file, try to split if the .index file was found
       LzoIndex index = indexes.get(file);
       if (index == null) {
         throw new IOException("Index not found for " + file);
@@ -124,8 +152,13 @@ protected boolean isSplitable(FileSystem fs, Path filename) {
   @Override
   public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
       JobConf conf, Reporter reporter) throws IOException {
-    reporter.setStatus(split.toString());
-    return new DeprecatedLzoLineRecordReader(conf, (FileSplit)split);
+    FileSplit fileSplit = (FileSplit) split;
+    if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) {
+      reporter.setStatus(split.toString());
+      return new DeprecatedLzoLineRecordReader(conf, (FileSplit)split);
+    } else {
+      // delegate non-LZO files to the TextInputFormat base class.
+      return super.getRecordReader(split, conf, reporter);
+    }
   }
-
 }
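For the old mapred API, wiring this up means pointing the job at DeprecatedLzoTextInputFormat and, if mixed inputs are wanted, flipping the new property; streaming users would additionally pass -D stream.map.input.ignoreKey=true, as the javadoc above notes. A hedged sketch, with mapper/reducer setup and paths left as placeholders:

import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.JobConf;
import com.hadoop.mapred.DeprecatedLzoTextInputFormat;

public class OldApiLzoJobSketch {
  public static void main(String[] args) throws Exception {
    JobConf conf = new JobConf(OldApiLzoJobSketch.class);
    conf.setInputFormat(DeprecatedLzoTextInputFormat.class);
    // false = also process non-LZO files via the TextInputFormat fallback;
    // the default (true) silently drops them.
    conf.setBoolean("lzo.text.input.format.ignore.nonlzo", false);
    FileInputFormat.setInputPaths(conf, new Path(args[0]));
    // ... set mapper, reducer, output path, then JobClient.runJob(conf) ...
  }
}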
52 src/java/com/hadoop/mapreduce/LzoTextInputFormat.java
@@ -36,36 +36,45 @@
 import org.apache.hadoop.mapreduce.JobContext;
 import org.apache.hadoop.mapreduce.RecordReader;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
-import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
 import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 
 import com.hadoop.compression.lzo.LzoIndex;
+import com.hadoop.compression.lzo.LzoInputFormatCommon;
 import com.hadoop.compression.lzo.LzopCodec;
 
 /**
  * An {@link InputFormat} for lzop compressed text files. Files are broken into
  * lines. Either linefeed or carriage-return are used to signal end of line.
  * Keys are the position in the file, and values are the line of text.
+ *
+ * See {@link LzoInputFormatCommon} for a description of the boolean property
+ * <code>lzo.text.input.format.ignore.nonlzo</code> and how it affects the
+ * behavior of this input format.
  */
-public class LzoTextInputFormat extends FileInputFormat<LongWritable, Text> {
-
+public class LzoTextInputFormat extends TextInputFormat {
   private final Map<Path, LzoIndex> indexes = new HashMap<Path, LzoIndex>();
 
   @Override
   protected List<FileStatus> listStatus(JobContext job) throws IOException {
     List<FileStatus> files = super.listStatus(job);
 
-    String fileExtension = new LzopCodec().getDefaultExtension();
     Configuration conf = job.getConfiguration();
+    boolean ignoreNonLzo = LzoInputFormatCommon.getIgnoreNonLzoProperty(conf);
 
     for (Iterator<FileStatus> iterator = files.iterator(); iterator.hasNext();) {
       FileStatus fileStatus = iterator.next();
       Path file = fileStatus.getPath();
       FileSystem fs = file.getFileSystem(conf);
 
-      if (!file.toString().endsWith(fileExtension)) {
-        //get rid of non lzo files
-        iterator.remove();
+      if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
+        // Get rid of non-LZO files, unless the conf explicitly tells us to
+        // keep them.
+        // However, always skip over files that end with ".lzo.index", since
+        // they are not part of the input.
+        if (ignoreNonLzo || LzoInputFormatCommon.isLzoIndexFile(file.toString())) {
+          iterator.remove();
+        }
       } else {
         //read the index file
         LzoIndex index = LzoIndex.readIndex(fs, file);
@@ -78,8 +87,13 @@
 
   @Override
   protected boolean isSplitable(JobContext context, Path filename) {
-    LzoIndex index = indexes.get(filename);
-    return !index.isEmpty();
+    if (LzoInputFormatCommon.isLzoFile(filename.toString())) {
+      LzoIndex index = indexes.get(filename);
+      return !index.isEmpty();
+    } else {
+      // Delegate non-LZO files to the TextInputFormat base class.
+      return super.isSplitable(context, filename);
+    }
   }
 
   @Override
@@ -92,10 +106,17 @@ protected boolean isSplitable(JobContext context, Path filename) {
     List<InputSplit> result = new ArrayList<InputSplit>();
 
     for (InputSplit genericSplit : splits) {
-      // load the index
       FileSplit fileSplit = (FileSplit) genericSplit;
       Path file = fileSplit.getPath();
       FileSystem fs = file.getFileSystem(conf);
+
+      if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
+        // non-LZO file, keep the input split as is.
+        result.add(fileSplit);
+        continue;
+      }
+
+      // LZO file, try to split if the .index file was found
       LzoIndex index = indexes.get(file);
       if (index == null) {
         throw new IOException("Index not found for " + file);
@@ -123,8 +144,13 @@ protected boolean isSplitable(JobContext context, Path filename) {
 
   @Override
   public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
-      TaskAttemptContext taskAttempt) throws IOException, InterruptedException {
-
-    return new LzoLineRecordReader();
+      TaskAttemptContext taskAttempt) {
+    FileSplit fileSplit = (FileSplit) split;
+    if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) {
+      return new LzoLineRecordReader();
+    } else {
+      // Delegate non-LZO files to the TextInputFormat base class.
+      return super.createRecordReader(split, taskAttempt);
+    }
   }
 }
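The new-API counterpart is symmetric; a minimal sketch mirroring the test setup further down (paths and job name illustrative):

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.mapreduce.Job;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import com.hadoop.mapreduce.LzoTextInputFormat;

public class NewApiLzoJobSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // Process non-LZO files alongside LZO files instead of skipping them.
    conf.setBoolean("lzo.text.input.format.ignore.nonlzo", false);

    Job job = new Job(conf, "lzo-and-plain-text"); // same ctor style as the tests
    job.setInputFormatClass(LzoTextInputFormat.class);
    FileInputFormat.setInputPaths(job, new Path(args[0]));
    // ... configure mapper/reducer/output, then job.waitForCompletion(true) ...
  }
}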
19 src/native/impl/lzo/LzoCompressor.c
@@ -35,6 +35,7 @@ typedef struct {
 
 #define UNDEFINED_COMPRESSION_LEVEL -999
 
+// Default compression level used when user supplies no value.
 static lzo_compressor lzo_compressors[] = {
   /** lzo1 compressors */
   /* 0 */ {"lzo1_compress", LZO1_MEM_COMPRESS, UNDEFINED_COMPRESSION_LEVEL},
@@ -112,6 +113,7 @@
 static jfieldID LzoCompressor_uncompressedDirectBufLen;
 static jfieldID LzoCompressor_compressedDirectBuf;
 static jfieldID LzoCompressor_directBufferSize;
 static jfieldID LzoCompressor_lzoCompressor;
+static jfieldID LzoCompressor_lzoCompressionLevel;
 static jfieldID LzoCompressor_workingMemoryBufLen;
 static jfieldID LzoCompressor_workingMemoryBuf;
 
@@ -144,6 +146,8 @@ Java_com_hadoop_compression_lzo_LzoCompressor_initIDs(
                                                     "directBufferSize", "I");
   LzoCompressor_lzoCompressor = (*env)->GetFieldID(env, class,
                                                    "lzoCompressor", "J");
+  LzoCompressor_lzoCompressionLevel = (*env)->GetFieldID(env, class,
+                                                         "lzoCompressionLevel", "I");
   LzoCompressor_workingMemoryBufLen = (*env)->GetFieldID(env, class,
                                                          "workingMemoryBufLen", "I");
   LzoCompressor_workingMemoryBuf = (*env)->GetFieldID(env, class,
@@ -189,7 +193,6 @@ Java_com_hadoop_compression_lzo_LzoCompressor_init(
   // Save the compressor-function into LzoCompressor_lzoCompressor
   (*env)->SetIntField(env, this, LzoCompressor_workingMemoryBufLen,
                       lzo_compressors[compressor].wrkmem);
-
   return;
 }
 
@@ -218,6 +221,13 @@ Java_com_hadoop_compression_lzo_LzoCompressor_compressBytesDirect(
   lzo_uint compressed_direct_buf_len = (*env)->GetIntField(env, this,
                                                            LzoCompressor_directBufferSize);
 
+  // Prefer the user defined compression level.
+  int compression_level = (*env)->GetIntField(env, this,
+                                              LzoCompressor_lzoCompressionLevel);
+  if (UNDEFINED_COMPRESSION_LEVEL == compression_level) {
+    compression_level = lzo_compressors[compressor].compression_level;
+  }
+
   jobject working_memory_buf = (*env)->GetObjectField(env, this,
                                                       LzoCompressor_workingMemoryBuf);
 
@@ -256,12 +266,17 @@ Java_com_hadoop_compression_lzo_LzoCompressor_compressBytesDirect(
   // Compress
   lzo_uint no_compressed_bytes = compressed_direct_buf_len;
   int rv = 0;
-  int compression_level = lzo_compressors[compressor].compression_level;
   if (compression_level == UNDEFINED_COMPRESSION_LEVEL) {
     lzo_compress_t fptr = (lzo_compress_t) FUNC_PTR(lzo_compressor_funcptr);
     rv = fptr(uncompressed_bytes, uncompressed_direct_buf_len,
               compressed_bytes, &no_compressed_bytes,
              workmem);
+  } else if (strstr(lzo_compressor_function, "lzo1x_999")
+             || strstr(lzo_compressor_function, "lzo1y_999")) {
+    // Compression levels are only available in these codecs.
+    rv = lzo1x_999_compress_level(uncompressed_bytes, uncompressed_direct_buf_len,
+                                  compressed_bytes, &no_compressed_bytes,
+                                  workmem, NULL, 0, 0, compression_level);
   } else {
     lzo_compress2_t fptr = (lzo_compress2_t) FUNC_PTR(lzo_compressor_funcptr);
     rv = fptr(uncompressed_bytes, uncompressed_direct_buf_len,
49 src/test/com/hadoop/compression/lzo/TestLzopOutputStream.java
@@ -20,11 +20,11 @@
 
 import java.io.BufferedWriter;
 import java.io.BufferedReader;
+import java.io.DataOutputStream;
 import java.io.File;
 import java.io.FileOutputStream;
 import java.io.FileInputStream;
 import java.io.IOException;
-import java.io.OutputStreamWriter;
 import java.io.InputStreamReader;
 import java.security.NoSuchAlgorithmException;
@@ -32,6 +32,9 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
 
 /**
  * Test the LzoOutputFormat, make sure that it can write files of different sizes and read them back in
@@ -46,11 +49,16 @@
   private final String bigFile = "100000.txt";
   private final String mediumFile = "1000.txt";
   private final String smallFile = "100.txt";
+  private final String issue20File = "issue20-lzop.txt";
+  private FileSystem localFs;
 
   @Override
   protected void setUp() throws Exception {
     super.setUp();
     inputDataPath = System.getProperty("test.build.data", "data");
+    Configuration conf = new Configuration();
+    conf.set("io.compression.codecs", LzopCodec.class.getName());
+    localFs = FileSystem.getLocal(conf).getRaw();
   }
 
   /**
@@ -85,6 +93,25 @@ public void testSmallFile() throws NoSuchAlgorithmException, IOException,
   }
 
   /**
+   * The LZO specification says that we should write the uncompressed bytes
+   * rather than the compressed bytes if the compressed buffer is actually
+   * larger ('&gt;') than the uncompressed buffer.
+   *
+   * To conform to the standard, this means we have to write the uncompressed
+   * bytes also when they have exactly the same size as the compressed bytes.
+   * (the '==' in '&lt;=').
+   *
+   * The input data of this test is known to compress to the same size as the
+   * uncompressed data. Hence we verify that we handle the boundary condition
+   * correctly.
+   *
+   */
+  public void testIssue20File() throws NoSuchAlgorithmException, IOException,
+      InterruptedException {
+    runTest(issue20File);
+  }
+
+  /**
    * Test that reading an lzo-compressed file produces the same lines as reading the equivalent
    * flat file. The test opens both the compressed and flat file, successively reading each
    * line by line and comparing.
@@ -101,6 +128,7 @@ private void runTest(String filename) throws IOException,
     File textFile = new File(inputDataPath, filename);
     File lzoFile = new File(inputDataPath, filename + new LzopCodec().getDefaultExtension());
     File lzoOutFile = new File(inputDataPath, "output_" + filename + new LzopCodec().getDefaultExtension());
+    File lzoIndexFile = new File(lzoOutFile.getAbsolutePath() + LzoIndex.LZO_INDEX_SUFFIX);
     if (lzoOutFile.exists()) {
       lzoOutFile.delete();
     }
@@ -116,7 +144,9 @@ private void runTest(String filename) throws IOException,
     int lzoBufferSize = 256 * 1024;
     LzoCompressor.CompressionStrategy strategy = LzoCompressor.CompressionStrategy.LZO1X_1;
     LzoCompressor lzoCompressor = new LzoCompressor(strategy, lzoBufferSize);
-    LzopOutputStream lzoOut = new LzopOutputStream(new FileOutputStream(lzoOutFile.getAbsolutePath()), lzoCompressor, lzoBufferSize, strategy);
+    LzopOutputStream lzoOut = new LzopOutputStream(new FileOutputStream(lzoOutFile),
+        new DataOutputStream(new FileOutputStream(lzoIndexFile)),
+        lzoCompressor, lzoBufferSize, strategy);
 
     // Now read line by line and stream out..
     String textLine;
@@ -158,5 +188,20 @@ private void runTest(String filename) throws IOException,
 
     lzoBr.close();
     textBr2.close();
+
+    // verify the index file:
+
+    Path lzoOutPath = new Path(lzoOutFile.getAbsolutePath());
+    LzoIndex lzoIndex = LzoIndex.readIndex(localFs, lzoOutPath);
+
+    // create offline index to compare.
+    assertTrue(lzoIndexFile.delete());
+    LzoIndex.createIndex(localFs, lzoOutPath);
+    LzoIndex expectedIndex = LzoIndex.readIndex(localFs, lzoOutPath);
+
+    assertEquals(lzoIndex.getNumberOfBlocks(), expectedIndex.getNumberOfBlocks());
+    for (int i=0; i<lzoIndex.getNumberOfBlocks(); i++) {
+      assertEquals(lzoIndex.getPosition(i), expectedIndex.getPosition(i));
+    }
   }
 }
120 src/test/com/hadoop/mapreduce/TestLzoTextInputFormat.java
@@ -21,6 +21,7 @@
 import java.io.IOException;
 import java.security.MessageDigest;
 import java.security.NoSuchAlgorithmException;
+import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
 import java.util.Random;
@@ -31,6 +32,7 @@
 import org.apache.commons.logging.LogFactory;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.FSDataOutputStream;
 import org.apache.hadoop.fs.Path;
 import org.apache.hadoop.io.LongWritable;
 import org.apache.hadoop.io.Text;
@@ -41,11 +43,13 @@
 import org.apache.hadoop.mapreduce.RecordWriter;
 import org.apache.hadoop.mapreduce.TaskAttemptContext;
 import org.apache.hadoop.mapreduce.TaskAttemptID;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
 import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
 import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat;
 
 import com.hadoop.compression.lzo.GPLNativeCodeLoader;
 import com.hadoop.compression.lzo.LzoIndex;
+import com.hadoop.compression.lzo.LzoInputFormatCommon;
 import com.hadoop.compression.lzo.LzopCodec;
 
 /**
@@ -156,7 +160,7 @@ private void runTest(boolean testWithIndex, int charsToOutput) throws IOException
     localFs.delete(outputDir, true);
     localFs.mkdirs(outputDir);
 
-    Job job = new Job(conf);
+    Job job = new Job(conf);
     TextOutputFormat.setCompressOutput(job, true);
     TextOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
     TextOutputFormat.setOutputPath(job, outputDir);
@@ -263,4 +267,118 @@ private int fillText(char[] chars, Random r, int charsMax, Text text) {
     return stringLength;
   }
 
+  public void testIgnoreNonLzoTrue()
+      throws IOException, InterruptedException, NoSuchAlgorithmException {
+    runTestIgnoreNonLzo(true, OUTPUT_BIG, true);
+    runTestIgnoreNonLzo(true, OUTPUT_SMALL, true);
+    runTestIgnoreNonLzo(false, OUTPUT_BIG, true);
+    runTestIgnoreNonLzo(false, OUTPUT_SMALL, true);
+  }
+
+  public void testIgnoreNonLzoFalse()
+      throws IOException, InterruptedException, NoSuchAlgorithmException {
+    runTestIgnoreNonLzo(true, OUTPUT_BIG, false);
+    runTestIgnoreNonLzo(true, OUTPUT_SMALL, false);
+    runTestIgnoreNonLzo(false, OUTPUT_BIG, false);
+    runTestIgnoreNonLzo(false, OUTPUT_SMALL, false);
+  }
+
+  private void runTestIgnoreNonLzo(boolean testWithIndex, int charsToOutput,
+      boolean ignoreNonLzo) throws IOException, InterruptedException, NoSuchAlgorithmException {
+    if (!GPLNativeCodeLoader.isNativeCodeLoaded()) {
+      LOG.warn("Cannot run this test without the native lzo libraries");
+      return;
+    }
+
+    Configuration conf = new Configuration();
+    conf.setLong("fs.local.block.size", charsToOutput / 2);
+    // reducing block size to force a split of the tiny file
+    conf.set("io.compression.codecs", LzopCodec.class.getName());
+    conf.setBoolean(LzoInputFormatCommon.IGNORE_NONLZO_KEY, ignoreNonLzo);
+
+    FileSystem localFs = FileSystem.getLocal(conf);
+    localFs.delete(outputDir, true);
+    localFs.mkdirs(outputDir);
+
+    // Create a non-LZO input file and put it alongside the LZO files.
+    Path nonLzoFile = new Path(outputDir, "part-r-00001");
+    localFs.createNewFile(nonLzoFile);
+    FSDataOutputStream outputStream = localFs.create(nonLzoFile);
+    outputStream.writeBytes("key1\tvalue1\nkey2\tvalue2\nkey3\tvalue3\n");
+    outputStream.close();
+
+    Job job = new Job(conf);
+    TextOutputFormat.setCompressOutput(job, true);
+    TextOutputFormat.setOutputCompressorClass(job, LzopCodec.class);
+    TextOutputFormat.setOutputPath(job, outputDir);
+
+    TaskAttemptContext attemptContext = new TaskAttemptContext(job.getConfiguration(),
+        new TaskAttemptID("123", 0, false, 1, 2));
+
+    // create some input data
+    byte[] expectedMd5 = createTestInput(outputDir, localFs, attemptContext, charsToOutput);
+
+    if (testWithIndex) {
+      Path lzoFile = new Path(outputDir, lzoFileName);
+      LzoIndex.createIndex(localFs, lzoFile);
+    }
+
+    LzoTextInputFormat inputFormat = new LzoTextInputFormat();
+    TextInputFormat.setInputPaths(job, outputDir);
+
+    // verify we have the right number of input splits
+    List<InputSplit> is = inputFormat.getSplits(job);
+    int numExpectedLzoSplits = 0;
+    int numExpectedNonLzoSplits = 0;
+    int numActualLzoSplits = 0;
+    int numActualNonLzoSplits = 0;
+    if (!ignoreNonLzo) {
+      numExpectedNonLzoSplits += 1;
+    }
+    if (testWithIndex && OUTPUT_BIG == charsToOutput) {
+      numExpectedLzoSplits += 3;
+    } else {
+      numExpectedLzoSplits += 1;
+    }
+    assertEquals(numExpectedLzoSplits + numExpectedNonLzoSplits, is.size());
+
+    // Verify that we have the right number of each kind of split and the right
+    // data inside the splits.
+    List<String> expectedNonLzoLines = new ArrayList<String>();
+    if (!ignoreNonLzo) {
+      expectedNonLzoLines.add("key1\tvalue1");
+      expectedNonLzoLines.add("key2\tvalue2");
+      expectedNonLzoLines.add("key3\tvalue3");
+    }
+    List<String> actualNonLzoLines = new ArrayList<String>();
+    for (InputSplit inputSplit : is) {
+      FileSplit fileSplit = (FileSplit) inputSplit;
+      Path file = fileSplit.getPath();
+      RecordReader<LongWritable, Text> rr = inputFormat.createRecordReader(
+          inputSplit, attemptContext);
+      rr.initialize(inputSplit, attemptContext);
+      if (LzoInputFormatCommon.isLzoFile(file.toString())) {
+        numActualLzoSplits += 1;
+
+        while (rr.nextKeyValue()) {
+          Text value = rr.getCurrentValue();
+
+          md5.update(value.getBytes(), 0, value.getLength());
+        }
+
+        rr.close();
+      } else {
+        numActualNonLzoSplits += 1;
+
+        while (rr.nextKeyValue()) {
+          actualNonLzoLines.add(rr.getCurrentValue().toString());
+        }
+      }
+    }
+    localFs.close();
+    assertEquals(numExpectedLzoSplits, numActualLzoSplits);
+    assertEquals(numExpectedNonLzoSplits, numActualNonLzoSplits);
+    assertTrue(Arrays.equals(expectedMd5, md5.digest()));
+    assertEquals(expectedNonLzoLines, actualNonLzoLines);
+  }
 }
6 src/test/data/issue20-lzop.txt
@@ -0,0 +1,6 @@
+0.5 74 25425
+0.9 200 25384
+0.95 203 4
+0.98 211 2
+0.99 219 3
+0.995 240 5
