Skip to content

Commit

Permalink
Merge pull request #28 from ivmaykov/master
Browse files Browse the repository at this point in the history
Added an option for LzoTextInputFormat to handle non-LZO files through TextInputFormat.
  • Loading branch information
rangadi committed Aug 26, 2011
2 parents 23e8370 + 0a0847f commit 3d19b14
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 27 deletions.
75 changes: 75 additions & 0 deletions src/java/com/hadoop/compression/lzo/LzoInputFormatCommon.java
@@ -0,0 +1,75 @@
/*
* This file is part of Hadoop-Gpl-Compression.
*
* Hadoop-Gpl-Compression is free software: you can redistribute it
* and/or modify it under the terms of the GNU General Public License
* as published by the Free Software Foundation, either version 3 of
* the License, or (at your option) any later version.
*
* Hadoop-Gpl-Compression is distributed in the hope that it will be
* useful, but WITHOUT ANY WARRANTY; without even the implied warranty
* of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
* GNU General Public License for more details.
*
* You should have received a copy of the GNU General Public License
* along with Hadoop-Gpl-Compression. If not, see
* <http://www.gnu.org/licenses/>.
*/

package com.hadoop.compression.lzo;

import org.apache.hadoop.conf.Configuration;

import com.hadoop.compression.lzo.LzoIndexer;
import com.hadoop.compression.lzo.LzopCodec;

/**
 * Shared constants and helpers used by both the old-API
 * ({@code com.hadoop.mapred.DeprecatedLzoTextInputFormat}) and new-API
 * ({@code com.hadoop.mapreduce.LzoTextInputFormat}) LZO text input formats,
 * so the two classes stay consistent in how they recognize LZO files and
 * read the "ignore non-LZO" configuration property.
 *
 * <p>This class is a static utility holder and is not instantiable.
 */
public class LzoInputFormatCommon {
  /**
   * The boolean property <code>lzo.text.input.format.ignore.nonlzo</code> tells
   * the LZO text input format whether it should silently ignore non-LZO input
   * files. When the property is true (which is the default), non-LZO files will
   * be silently ignored. When the property is false, non-LZO files will be
   * processed using the standard TextInputFormat.
   */
  public static final String IGNORE_NONLZO_KEY = "lzo.text.input.format.ignore.nonlzo";

  /**
   * Default value of the <code>lzo.text.input.format.ignore.nonlzo</code>
   * property (ignore non-LZO files).
   */
  public static final boolean DEFAULT_IGNORE_NONLZO = true;

  /**
   * Full extension for LZO index files (".lzo.index"), built from the codec's
   * data-file extension plus the index suffix so the three constants can never
   * drift apart.
   */
  public static final String FULL_LZO_INDEX_SUFFIX =
    LzopCodec.DEFAULT_LZO_EXTENSION + LzoIndex.LZO_INDEX_SUFFIX;

  // Utility class: suppress the default public constructor so it cannot be
  // instantiated (all members are static).
  private LzoInputFormatCommon() {
  }

  /**
   * @param conf the Configuration object
   * @return the value of the <code>lzo.text.input.format.ignore.nonlzo</code>
   *         property in <code>conf</code>, or <code>DEFAULT_IGNORE_NONLZO</code>
   *         if the property is not set.
   */
  public static boolean getIgnoreNonLzoProperty(Configuration conf) {
    return conf.getBoolean(IGNORE_NONLZO_KEY, DEFAULT_IGNORE_NONLZO);
  }

  /**
   * Checks if the given filename ends in ".lzo".
   *
   * @param filename filename to check.
   * @return true if the filename ends in ".lzo"
   */
  public static boolean isLzoFile(String filename) {
    return filename.endsWith(LzopCodec.DEFAULT_LZO_EXTENSION);
  }

  /**
   * Checks if the given filename ends in ".lzo.index".
   *
   * @param filename filename to check.
   * @return true if the filename ends in ".lzo.index"
   */
  public static boolean isLzoIndexFile(String filename) {
    return filename.endsWith(FULL_LZO_INDEX_SUFFIX);
  }
}
3 changes: 2 additions & 1 deletion src/java/com/hadoop/compression/lzo/LzopCodec.java
Expand Up @@ -43,6 +43,7 @@ public class LzopCodec extends LzoCodec {
public static final int LZOP_VERSION = 0x1010;
/** Latest version of lzop this should be compatible with */
public static final int LZOP_COMPAT_VERSION = 0x0940;
public static final String DEFAULT_LZO_EXTENSION = ".lzo";

@Override
public CompressionOutputStream createOutputStream(OutputStream out) throws IOException {
Expand Down Expand Up @@ -107,6 +108,6 @@ public Decompressor createDecompressor() {

@Override
public String getDefaultExtension() {
return ".lzo";
return DEFAULT_LZO_EXTENSION;
}
}
57 changes: 45 additions & 12 deletions src/java/com/hadoop/mapred/DeprecatedLzoTextInputFormat.java
Expand Up @@ -31,14 +31,16 @@
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapred.FileInputFormat;
import org.apache.hadoop.mapred.FileSplit;
import org.apache.hadoop.mapred.InputSplit;
import org.apache.hadoop.mapred.JobConf;
import org.apache.hadoop.mapred.JobConfigurable;
import org.apache.hadoop.mapred.RecordReader;
import org.apache.hadoop.mapred.Reporter;
import org.apache.hadoop.mapred.TextInputFormat;

import com.hadoop.compression.lzo.LzoIndex;
import com.hadoop.compression.lzo.LzoInputFormatCommon;
import com.hadoop.compression.lzo.LzopCodec;

/**
Expand All @@ -50,27 +52,40 @@
* com.hadoop.mapred.DeprecatedLzoTextInputFormat, not
* com.hadoop.mapreduce.LzoTextInputFormat. The classes attempt to be alike in
* every other respect.
*
* Note that to use this input format properly with hadoop-streaming, you should
* also set the property <code>stream.map.input.ignoreKey=true</code>. That will
* replicate the behavior of the default TextInputFormat by stripping off the byte
* offset keys from the input lines that get piped to the mapper process.
*
* See {@link LzoInputFormatCommon} for a description of the boolean property
* <code>lzo.text.input.format.ignore.nonlzo</code> and how it affects the
* behavior of this input format.
*/

@SuppressWarnings("deprecation")
public class DeprecatedLzoTextInputFormat extends FileInputFormat<LongWritable, Text> {
public static final String LZO_INDEX_SUFFIX = ".index";
public class DeprecatedLzoTextInputFormat extends TextInputFormat {
private final Map<Path, LzoIndex> indexes = new HashMap<Path, LzoIndex>();

@Override
protected FileStatus[] listStatus(JobConf conf) throws IOException {
List<FileStatus> files = new ArrayList<FileStatus>(Arrays.asList(super.listStatus(conf)));

String fileExtension = new LzopCodec().getDefaultExtension();
boolean ignoreNonLzo = LzoInputFormatCommon.getIgnoreNonLzoProperty(conf);

Iterator<FileStatus> it = files.iterator();
while (it.hasNext()) {
FileStatus fileStatus = it.next();
Path file = fileStatus.getPath();

if (!file.toString().endsWith(fileExtension)) {
// Get rid of non-LZO files.
it.remove();
if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
// Get rid of non-LZO files, unless the conf explicitly tells us to
// keep them.
// However, always skip over files that end with ".lzo.index", since
// they are not part of the input.
if (ignoreNonLzo || LzoInputFormatCommon.isLzoIndexFile(file.toString())) {
it.remove();
}
} else {
FileSystem fs = file.getFileSystem(conf);
LzoIndex index = LzoIndex.readIndex(fs, file);
Expand All @@ -83,8 +98,13 @@ protected FileStatus[] listStatus(JobConf conf) throws IOException {

@Override
protected boolean isSplitable(FileSystem fs, Path filename) {
LzoIndex index = indexes.get(filename);
return !index.isEmpty();
if (LzoInputFormatCommon.isLzoFile(filename.toString())) {
LzoIndex index = indexes.get(filename);
return !index.isEmpty();
} else {
// Delegate non-LZO files to the TextInputFormat base class.
return super.isSplitable(fs, filename);
}
}

@Override
Expand All @@ -97,6 +117,14 @@ public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
for (FileSplit fileSplit: splits) {
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);

if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
// non-LZO file, keep the input split as is.
result.add(fileSplit);
continue;
}

// LZO file, try to split if the .index file was found
LzoIndex index = indexes.get(file);
if (index == null) {
throw new IOException("Index not found for " + file);
Expand Down Expand Up @@ -124,8 +152,13 @@ public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
@Override
public RecordReader<LongWritable, Text> getRecordReader(InputSplit split,
JobConf conf, Reporter reporter) throws IOException {
reporter.setStatus(split.toString());
return new DeprecatedLzoLineRecordReader(conf, (FileSplit)split);
FileSplit fileSplit = (FileSplit) split;
if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) {
reporter.setStatus(split.toString());
return new DeprecatedLzoLineRecordReader(conf, (FileSplit)split);
} else {
// delegate non-LZO files to the TextInputFormat base class.
return super.getRecordReader(split, conf, reporter);
}
}

}
52 changes: 39 additions & 13 deletions src/java/com/hadoop/mapreduce/LzoTextInputFormat.java
Expand Up @@ -36,36 +36,45 @@
import org.apache.hadoop.mapreduce.JobContext;
import org.apache.hadoop.mapreduce.RecordReader;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
import org.apache.hadoop.mapreduce.lib.input.FileSplit;
import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;

import com.hadoop.compression.lzo.LzoIndex;
import com.hadoop.compression.lzo.LzoInputFormatCommon;
import com.hadoop.compression.lzo.LzopCodec;

/**
* An {@link InputFormat} for lzop compressed text files. Files are broken into
* lines. Either linefeed or carriage-return are used to signal end of line.
* Keys are the position in the file, and values are the line of text.
*
* See {@link LzoInputFormatCommon} for a description of the boolean property
* <code>lzo.text.input.format.ignore.nonlzo</code> and how it affects the
* behavior of this input format.
*/
public class LzoTextInputFormat extends FileInputFormat<LongWritable, Text> {

public class LzoTextInputFormat extends TextInputFormat {
private final Map<Path, LzoIndex> indexes = new HashMap<Path, LzoIndex>();

@Override
protected List<FileStatus> listStatus(JobContext job) throws IOException {
List<FileStatus> files = super.listStatus(job);

String fileExtension = new LzopCodec().getDefaultExtension();
Configuration conf = job.getConfiguration();
boolean ignoreNonLzo = LzoInputFormatCommon.getIgnoreNonLzoProperty(conf);

for (Iterator<FileStatus> iterator = files.iterator(); iterator.hasNext();) {
FileStatus fileStatus = iterator.next();
Path file = fileStatus.getPath();
FileSystem fs = file.getFileSystem(conf);

if (!file.toString().endsWith(fileExtension)) {
//get rid of non lzo files
iterator.remove();
if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
// Get rid of non-LZO files, unless the conf explicitly tells us to
// keep them.
// However, always skip over files that end with ".lzo.index", since
// they are not part of the input.
if (ignoreNonLzo || LzoInputFormatCommon.isLzoIndexFile(file.toString())) {
iterator.remove();
}
} else {
//read the index file
LzoIndex index = LzoIndex.readIndex(fs, file);
Expand All @@ -78,8 +87,13 @@ protected List<FileStatus> listStatus(JobContext job) throws IOException {

@Override
protected boolean isSplitable(JobContext context, Path filename) {
LzoIndex index = indexes.get(filename);
return !index.isEmpty();
if (LzoInputFormatCommon.isLzoFile(filename.toString())) {
LzoIndex index = indexes.get(filename);
return !index.isEmpty();
} else {
// Delegate non-LZO files to the TextInputFormat base class.
return super.isSplitable(context, filename);
}
}

@Override
Expand All @@ -92,10 +106,17 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {
List<InputSplit> result = new ArrayList<InputSplit>();

for (InputSplit genericSplit : splits) {
// load the index
FileSplit fileSplit = (FileSplit) genericSplit;
Path file = fileSplit.getPath();
FileSystem fs = file.getFileSystem(conf);

if (!LzoInputFormatCommon.isLzoFile(file.toString())) {
// non-LZO file, keep the input split as is.
result.add(fileSplit);
continue;
}

// LZO file, try to split if the .index file was found
LzoIndex index = indexes.get(file);
if (index == null) {
throw new IOException("Index not found for " + file);
Expand Down Expand Up @@ -123,8 +144,13 @@ public List<InputSplit> getSplits(JobContext job) throws IOException {

@Override
public RecordReader<LongWritable, Text> createRecordReader(InputSplit split,
TaskAttemptContext taskAttempt) throws IOException, InterruptedException {

return new LzoLineRecordReader();
TaskAttemptContext taskAttempt) {
FileSplit fileSplit = (FileSplit) split;
if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) {
return new LzoLineRecordReader();
} else {
// Delegate non-LZO files to the TextInputFormat base class.
return super.createRecordReader(split, taskAttempt);
}
}
}

0 comments on commit 3d19b14

Please sign in to comment.