Permalink
Browse files

Per code review feedback from Raghu Angadi, removed the LzoStreamingI…

…nputFormat.
  • Loading branch information...
Ilya Maykov
Ilya Maykov committed Aug 23, 2011
1 parent 806e494 commit dbfd677af2ea2543bf418d91d0474e5099fee166
@@ -1,105 +0,0 @@
-/*
- * This file is part of Hadoop-Gpl-Compression.
- *
- * Hadoop-Gpl-Compression is free software: you can redistribute it
- * and/or modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation, either version 3 of
- * the License, or (at your option) any later version.
- *
- * Hadoop-Gpl-Compression is distributed in the hope that it will be
- * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Hadoop-Gpl-Compression. If not, see
- * <http://www.gnu.org/licenses/>.
- */
-
-package com.hadoop.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.fs.FileStatus;
-import org.apache.hadoop.fs.FileSystem;
-import org.apache.hadoop.fs.Path;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileInputFormat;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.InputSplit;
-import org.apache.hadoop.mapred.JobConf;
-import org.apache.hadoop.mapred.JobConfigurable;
-import org.apache.hadoop.mapred.KeyValueTextInputFormat;
-import org.apache.hadoop.mapred.RecordReader;
-import org.apache.hadoop.mapred.Reporter;
-
-import com.hadoop.compression.lzo.LzoInputFormatCommon;
-
-/**
- * This class conforms to the old (org.apache.hadoop.mapred.*) hadoop API style
- * which is deprecated but still required in places. Streaming, for example,
- * does a check that the given input format is a descendant of
- * org.apache.hadoop.mapred.InputFormat, which any InputFormat-derived class
- * from the new API fails. In order for streaming to work, you must use
- * com.hadoop.mapred.LzoStreamingInputFormat.
- *
- * See {@link LzoInputFormatCommon} for a description of the boolean property
- * <code>lzo.text.input.format.ignore.nonlzo</code> and how it affects the
- * behavior of this input format.
-*/
-
-@SuppressWarnings("deprecation")
-public class LzoStreamingInputFormat extends FileInputFormat<Text, Text>
- implements JobConfigurable {
-
- // Wrapper around DeprecatedLzoTextInputFormat that exposes a couple
- // protected methods so we can delegate to them.
- private class WrappedDeprecatedLzoTextInputFormat extends DeprecatedLzoTextInputFormat {
- public boolean isSplitableWrapper(FileSystem fs, Path filename) {
- return isSplitable(fs, filename);
- }
-
- public FileStatus[] listStatusWrapper(JobConf conf) throws IOException {
- return listStatus(conf);
- }
- }
-
- // This class delegates most calls to either DeprecatedLzoTextInputFormat
- // (listStatus, getSplits, isSplitable) or KeyValueTextInputFormat
- // (getRecordReader for non-LZO files).
- private final WrappedDeprecatedLzoTextInputFormat lzoTextInputFormat =
- new WrappedDeprecatedLzoTextInputFormat();
- private final KeyValueTextInputFormat kvTextInputFormat =
- new KeyValueTextInputFormat();
-
- @Override
- public void configure(JobConf conf) {
- lzoTextInputFormat.configure(conf);
- kvTextInputFormat.configure(conf);
- }
-
- @Override
- protected FileStatus[] listStatus(JobConf conf) throws IOException {
- // Delegate to DeprecatedLzoTextInputFormat
- return lzoTextInputFormat.listStatusWrapper(conf);
- }
-
- @Override
- public InputSplit[] getSplits(JobConf conf, int numSplits) throws IOException {
- // Delegate to DeprecatedLzoTextInputFormat
- return lzoTextInputFormat.getSplits(conf, numSplits);
- }
-
- public RecordReader<Text, Text> getRecordReader(InputSplit split,
- JobConf conf, Reporter reporter) throws IOException {
-
- FileSplit fileSplit = (FileSplit) split;
- if (LzoInputFormatCommon.isLzoFile(fileSplit.getPath().toString())) {
- reporter.setStatus(split.toString());
- return new LzoStreamingLineRecordReader(conf, (FileSplit)split);
- } else {
- // delegate non-LZO files to KeyValueTextInputFormat
- return kvTextInputFormat.getRecordReader(split, conf, reporter);
- }
- }
-}
@@ -1,140 +0,0 @@
-/*
- * This file is part of Hadoop-Gpl-Compression.
- *
- * Hadoop-Gpl-Compression is free software: you can redistribute it
- * and/or modify it under the terms of the GNU General Public License
- * as published by the Free Software Foundation, either version 3 of
- * the License, or (at your option) any later version.
- *
- * Hadoop-Gpl-Compression is distributed in the hope that it will be
- * useful, but WITHOUT ANY WARRANTY; without even the implied warranty
- * of MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
- * GNU General Public License for more details.
- *
- * You should have received a copy of the GNU General Public License
- * along with Hadoop-Gpl-Compression. If not, see
- * <http://www.gnu.org/licenses/>.
- */
-
-package com.hadoop.mapred;
-
-import java.io.IOException;
-
-import org.apache.hadoop.conf.Configuration;
-import org.apache.hadoop.io.LongWritable;
-import org.apache.hadoop.io.Text;
-import org.apache.hadoop.mapred.FileSplit;
-import org.apache.hadoop.mapred.RecordReader;
-
-/**
- * This class treats a line in the input as a key/value pair separated by a
- * separator character. The separator can be specified in config file
- * under the attribute name key.value.separator.in.input.line. The default
- * separator is the tab character ('\t').
- *
- * Note: this class is basically a copy of
- * {@link org.apache.hadoop.mapred.KeyValueLineRecordReader}, except that it
- * uses {@link DeprecatedLzoLineRecordReader} as the internal line reader.
- */
-@SuppressWarnings("deprecation")
-public class LzoStreamingLineRecordReader implements RecordReader<Text, Text> {
- private final DeprecatedLzoLineRecordReader lzoLineRecordReader;
- private byte separator = (byte) '\t';
- private LongWritable dummyKey;
- private Text innerValue;
-
- public Class getKeyClass() {
- return Text.class;
- }
-
- @Override
- public Text createKey() {
- return new Text();
- }
-
- @Override
- public Text createValue() {
- return new Text();
- }
-
- public LzoStreamingLineRecordReader(Configuration job, FileSplit split)
- throws IOException {
-
- lzoLineRecordReader = new DeprecatedLzoLineRecordReader(job, split);
- dummyKey = lzoLineRecordReader.createKey();
- innerValue = lzoLineRecordReader.createValue();
- String sepStr = job.get("key.value.separator.in.input.line", "\t");
- this.separator = (byte) sepStr.charAt(0);
- }
-
- /**
- * Note: copied from org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader,
- * which is a library class that's missing from the hadoop jar that's in the
- * lib/ directory. Maybe it should be added.
- */
- public static int findSeparator(byte[] utf, int start, int length,
- byte sep) {
- for (int i = start; i < (start + length); i++) {
- if (utf[i] == sep) {
- return i;
- }
- }
- return -1;
- }
-
- /** Read key/value pair in a line. */
- @Override
- public synchronized boolean next(Text key, Text value)
- throws IOException {
- byte[] line = null;
- int lineLen = -1;
- if (lzoLineRecordReader.next(dummyKey, innerValue)) {
- line = innerValue.getBytes();
- lineLen = innerValue.getLength();
- } else {
- return false;
- }
- if (line == null)
- return false;
- int pos = findSeparator(line, 0, lineLen, this.separator);
- LzoStreamingLineRecordReader.setKeyValue(key, value, line, lineLen, pos);
- return true;
- }
-
- @Override
- public float getProgress() throws IOException {
- return lzoLineRecordReader.getProgress();
- }
-
- @Override
- public synchronized long getPos() throws IOException {
- return lzoLineRecordReader.getPos();
- }
-
- @Override
- public synchronized void close() throws IOException {
- lzoLineRecordReader.close();
- }
-
- /**
- * Note: copied from org.apache.hadoop.mapreduce.lib.input.KeyValueLineRecordReader,
- * which is a library class that's missing from the hadoop jar that's in the
- * lib/ directory. Maybe it should be added.
- */
- private static void setKeyValue(Text key, Text value, byte[] line,
- int lineLen, int pos) {
- if (pos == -1) {
- key.set(line, 0, lineLen);
- value.set("");
- } else {
- int keyLen = pos;
- byte[] keyBytes = new byte[keyLen];
- System.arraycopy(line, 0, keyBytes, 0, keyLen);
- int valLen = lineLen - keyLen - 1;
- byte[] valBytes = new byte[valLen];
- System.arraycopy(line, pos + 1, valBytes, 0, valLen);
- key.set(keyBytes);
- value.set(valBytes);
- }
- }
-}

0 comments on commit dbfd677

Please sign in to comment.