Permalink
Browse files

1) Added the boolean option "lzo.text.input.format.ignore.nonlzo" (default
is true). The option is to be used with the DeprecatedLzoTextInputFormat and
LzoTextInputFormat input format classes.

When true, it causes all files that don't end in ".lzo" to be silently dropped
from the input set.

When false, it will keep files that don't end in ".lzo", and will process them
with TextInputFormat (however, files that end in ".lzo.index" will still be
ignored). This makes it possible to process a mix of LZO and non-LZO files
with a single MR job, which in turn makes it much easier to perform an online
upgrade to LZO compression in a production system without incurring downtime.

It also makes it possible to reprocess ranges of log files that span the
pre-LZO / post-LZO boundary in a single MR job.

2) Added unit test for the above feature to TestLzoTextInputFormat.

3) Added a public LzopCodec.DEFAULT_LZO_EXTENSION constant.
  • Loading branch information...
1 parent 23e8370 commit c4d41c326f07a33efcfdc81f0f03aa499282f205 Ilya Maykov committed Aug 17, 2011
@@ -0,0 +1,75 @@
+/*
+ * This file is part of Hadoop-Gpl-Compression.
+ *
+ * Hadoop-Gpl-Compression is free software: you can redistribute it
+ * and/or modify it under the terms of the GNU General Public License
+ * as published by the Free Software Foundation, either version 3 of
+ * the License, or (at your option) any later version.
+ *
+ * Hadoop-Gpl-Compression is distributed in the hope that it will be
+ * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
+ * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
+ * GNU General Public License for more details.
+ *
+ * You should have received a copy of the GNU General Public License
+ * along with Hadoop-Gpl-Compression. If not, see
+ * <http://www.gnu.org/licenses/>.
+ */
+
+package com.hadoop.compression.lzo;
+
+import org.apache.hadoop.conf.Configuration;
+
+import com.hadoop.compression.lzo.LzoIndexer;
+import com.hadoop.compression.lzo.LzopCodec;
+
+public class LzoInputFormatCommon {
+ /**
+ * Configuration key of the boolean property
+ * <code>lzo.text.input.format.ignore.nonlzo</code>, which tells the LZO text
+ * input format whether it should silently ignore non-LZO input files. When
+ * the property is true (which is the default), non-LZO files will be
+ * silently ignored. When the property is false, non-LZO files will be
+ * processed using the standard TextInputFormat.
+ */
+ public static final String IGNORE_NONLZO_KEY = "lzo.text.input.format.ignore.nonlzo";
+ /**
+ * Default value of the <code>lzo.text.input.format.ignore.nonlzo</code>
+ * property; {@link #getIgnoreNonLzoProperty(Configuration)} falls back to
+ * it when the property is not set.
+ */
+ public static final boolean DEFAULT_IGNORE_NONLZO = true;
+ /**
+ * Full extension for LZO index files (".lzo.index"), built by appending
+ * <code>LzoIndex.LZO_INDEX_SUFFIX</code> to
+ * <code>LzopCodec.DEFAULT_LZO_EXTENSION</code>.
+ */
+ public static final String FULL_LZO_INDEX_SUFFIX =
+ LzopCodec.DEFAULT_LZO_EXTENSION + LzoIndex.LZO_INDEX_SUFFIX;
+
+ /**
+ * Reads the <code>lzo.text.input.format.ignore.nonlzo</code> property.
+ *
+ * @param conf the Configuration object
+ * @return the value of the <code>lzo.text.input.format.ignore.nonlzo</code>
+ * property in <code>conf</code>, or <code>DEFAULT_IGNORE_NONLZO</code>
+ * if the property is not set.
+ */
+ public static boolean getIgnoreNonLzoProperty(Configuration conf) {
+ return conf.getBoolean(IGNORE_NONLZO_KEY, DEFAULT_IGNORE_NONLZO);
+ }
+
+ /**
+ * Checks if the given filename ends in ".lzo". This is a plain,
+ * case-sensitive suffix match on the name; the file itself is not examined.
+ * Names ending in ".lzo.index" therefore do not count as LZO files.
+ *
+ * @param filename filename to check; must not be null.
+ * @return true if the filename ends in ".lzo"
+ */
+ public static boolean isLzoFile(String filename) {
+ return filename.endsWith(LzopCodec.DEFAULT_LZO_EXTENSION);
+ }
+
+ /**
+ * Checks if the given filename ends in ".lzo.index". This is a plain,
+ * case-sensitive suffix match against <code>FULL_LZO_INDEX_SUFFIX</code>;
+ * the file itself is not examined.
+ *
+ * @param filename filename to check; must not be null.
+ * @return true if the filename ends in ".lzo.index"
+ */
+ public static boolean isLzoIndexFile(String filename) {
+ return filename.endsWith(FULL_LZO_INDEX_SUFFIX);
+ }
+}
@@ -43,6 +43,7 @@
public static final int LZOP_VERSION = 0x1010;
/** Latest version of lzop this should be compatible with */
public static final int LZOP_COMPAT_VERSION = 0x0940;
+ public static final String DEFAULT_LZO_EXTENSION = ".lzo";
@Override
public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
@@ -107,6 +108,6 @@ public Decompressor createDecompressor() {
@Override
public String getDefaultExtension() {
- return ".lzo";
+ return DEFAULT_LZO_EXTENSION;
}
}
@@ -35,10 +35,13 @@
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
+import org.apache.hadoop.mapred.TextInputFormat;
import com.hadoop.compression.lzo.LzoIndex;
+import com.hadoop.compression.lzo.LzoInputFormatCommon;
import com.hadoop.compression.lzo.LzopCodec;
/**
@@ -50,27 +53,45 @@
* com.hadoop.mapred.DeprecatedLzoTextInputFormat, not
* com.hadoop.mapreduce.LzoTextInputFormat. The classes attempt to be alike in
* every other respect.
+ *
+ * See {@link LzoInputFormatCommon} for a description of the boolean property
+ * <code>lzo.text.input.format.ignore.nonlzo</code> and how it affects the
+ * behavior of this input format.
*/
@SuppressWarnings("deprecation")
-public class DeprecatedLzoTextInputFormat extends FileInputFormat<LongWritable, Text> {
- public static final String LZO_INDEX_SUFFIX = ".index";
+public class DeprecatedLzoTextInputFormat extends FileInputFormat<LongWritable, Text>
+ implements JobConfigurable {
+ // TextInputFormat.isSplitable() is protected, so this private subclass exposes
+ // it through a public wrapper method. The class is static because it never
+ // uses the enclosing input format's state and so needs no hidden reference to
+ // the outer instance.
+ private static class WrappedTextInputFormat extends TextInputFormat {
+ public boolean isSplitableWrapper(FileSystem fs, Path file) {
+ return isSplitable(fs, file);
+ }
+ }
+
private final Map<Path, LzoIndex> indexes = new HashMap<Path, LzoIndex>();
+ private final WrappedTextInputFormat textInputFormat = new WrappedTextInputFormat();
@Override
protected FileStatus[] listStatus(JobConf conf) throws IOException {
List<FileStatus> files = new ArrayList<FileStatus>(Arrays.asList(super.listStatus(conf)));
- String fileExtension = new LzopCodec().getDefaultExtension();
+ boolean ignoreNonLzo = LzoInputFormatCommon.getIgnoreNonLzoProperty(conf);
Iterator<FileStatus> it = files.iterator();
while (it.hasNext()) {
FileStatus fileStatus = it.next();
Path file = fileStatus.getPath();
- if (!file.toString().endsWith(fileExtension)) {
- // Get rid of non-LZO files.
- it.remove();
+ if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
+ // Get rid of non-LZO files, unless the conf explicitly tells us to
+ // keep them.
+ // However, always skip over files that end with ".lzo.index", since
+ // they are not part of the input.
+ if (ignoreNonLzo || LzoInputFormatCommon.isLzoIndexFile(file.toString())) {
+ it.remove();
+ }
} else {
FileSystem fs = file.getFileSystem(conf);
LzoIndex index = LzoIndex.readIndex(fs, file);
@@ -83,8 +104,13 @@
@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
- LzoIndex index = indexes.get(filename);
- return !index.isEmpty();
+ if (LzoInputFormatCommon.isLzoFile(filename.toString())) {
+ LzoIndex index = indexes.get(filename);
+ return !index.isEmpty();
+ } else {
+ // Delegate non-LZO files to TextInputFormat.
+ return textInputFormat.isSplitableWrapper(fs, filename);
+ }
}
@Override
@@ -97,6 +123,14 @@ protected boolean isSplitable(FileSystem fs, Path filename) {
for (FileSplit fileSplit: splits) {
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
+
+ if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
+ // non-LZO file, keep the input split as is.
+ result.add(fileSplit);
+ continue;
+ }
+
+ // LZO file, try to split if the .index file was found
LzoIndex index = indexes.get(file);
if (index == null) {
throw new IOException("Index not found for " + file);
@@ -124,8 +158,18 @@ protected boolean isSplitable(FileSystem fs, Path filename) {
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
JobConf conf, Reporter reporter) throws IOException {
- reporter.setStatus(split.toString());
- return new DeprecatedLzoLineRecordReader(conf, (FileSplit)split);
+ FileSplit fileSplit = (FileSplit) split;
+ if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) {
+ reporter.setStatus(split.toString());
+ return new DeprecatedLzoLineRecordReader(conf, (FileSplit)split);
+ } else {
+ // delegate non-LZO files to TextInputFormat
+ return textInputFormat.getRecordReader(split, conf, reporter);
+ }
}
+ @Override
+ public void configure(JobConf conf) {
+ textInputFormat.configure(conf); // forward the job conf to the TextInputFormat delegate that handles non-LZO files
+ }
}
@@ -38,34 +38,53 @@
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
import com.hadoop.compression.lzo.LzoIndex;
+import com.hadoop.compression.lzo.LzoInputFormatCommon;
import com.hadoop.compression.lzo.LzopCodec;
/**
* An {@link InputFormat} for lzop compressed text files. Files are broken into
* lines. Either linefeed or carriage-return are used to signal end of line.
* Keys are the position in the file, and values are the line of text.
+ *
+ * See {@link LzoInputFormatCommon} for a description of the boolean property
+ * <code>lzo.text.input.format.ignore.nonlzo</code> and how it affects the
+ * behavior of this input format.
*/
public class LzoTextInputFormat extends FileInputFormat<LongWritable, Text> {
+ // TextInputFormat.isSplitable() is protected, so this private subclass exposes
+ // it through a public wrapper method. The class is static because it never
+ // uses the enclosing input format's state and so needs no hidden reference to
+ // the outer instance.
+ private static class WrappedTextInputFormat extends TextInputFormat {
+ public boolean isSplitableWrapper(JobContext context, Path file) {
+ return isSplitable(context, file);
+ }
+ }
private final Map<Path, LzoIndex> indexes = new HashMap<Path, LzoIndex>();
+ private final WrappedTextInputFormat textInputFormat = new WrappedTextInputFormat();
@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
List<FileStatus> files = super.listStatus(job);
- String fileExtension = new LzopCodec().getDefaultExtension();
Configuration conf = job.getConfiguration();
+ boolean ignoreNonLzo = LzoInputFormatCommon.getIgnoreNonLzoProperty(conf);
for (Iterator<FileStatus> iterator = files.iterator(); iterator.hasNext();) {
FileStatus fileStatus = iterator.next();
Path file = fileStatus.getPath();
FileSystem fs = file.getFileSystem(conf);
- if (!file.toString().endsWith(fileExtension)) {
- //get rid of non lzo files
- iterator.remove();
+ if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
+ // Get rid of non-LZO files, unless the conf explicitly tells us to
+ // keep them.
+ // However, always skip over files that end with ".lzo.index", since
+ // they are not part of the input.
+ if (ignoreNonLzo || LzoInputFormatCommon.isLzoIndexFile(file.toString())) {
+ iterator.remove();
+ }
} else {
//read the index file
LzoIndex index = LzoIndex.readIndex(fs, file);
@@ -78,8 +97,13 @@
@Override
protected boolean isSplitable(JobContext context, Path filename) {
- LzoIndex index = indexes.get(filename);
- return !index.isEmpty();
+ if (LzoInputFormatCommon.isLzoFile(filename.toString())) {
+ LzoIndex index = indexes.get(filename);
+ return !index.isEmpty();
+ } else {
+ // Delegate non-LZO files to TextInputFormat.
+ return textInputFormat.isSplitableWrapper(context, filename);
+ }
}
@Override
@@ -92,10 +116,17 @@ protected boolean isSplitable(JobContext context, Path filename) {
List<InputSplit> result = new ArrayList<InputSplit>();
for (InputSplit genericSplit : splits) {
- // load the index
FileSplit fileSplit = (FileSplit) genericSplit;
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);
+
+ if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
+ // non-LZO file, keep the input split as is.
+ result.add(fileSplit);
+ continue;
+ }
+
+ // LZO file, try to split if the .index file was found
LzoIndex index = indexes.get(file);
if (index == null) {
throw new IOException("Index not found for " + file);
@@ -124,7 +155,11 @@ protected boolean isSplitable(JobContext context, Path filename) {
@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
TaskAttemptContext taskAttempt) throws IOException, InterruptedException {
-
- return new LzoLineRecordReader();
+ FileSplit fileSplit = (FileSplit) split;
+ if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) {
+ return new LzoLineRecordReader();
+ } else {
+ return textInputFormat.createRecordReader(split, taskAttempt);
+ }
}
}
Oops, something went wrong.

0 comments on commit c4d41c3

Please sign in to comment.