From 18da5d0edfa95337b69d02fc8f87b709d3f98c16 Mon Sep 17 00:00:00 2001 From: twalthr Date: Wed, 7 May 2014 02:54:29 +0200 Subject: [PATCH] The Hadoop Compatibility has been refactored and extended to support the new Java API. --- .../mapred/HadoopInputFormat.java | 287 +++++++++++++++ .../mapred/HadoopOutputFormat.java | 164 +++++++++ .../mapred/example/WordCount.java | 115 ++++++ .../{ => mapred/record}/HadoopDataSink.java | 10 +- .../{ => mapred/record}/HadoopDataSource.java | 12 +- .../record/HadoopRecordInputFormat.java} | 29 +- .../record/HadoopRecordOutputFormat.java} | 24 +- .../datatypes/DefaultHadoopTypeConverter.java | 2 +- .../DefaultStratosphereTypeConverter.java | 2 +- .../datatypes/HadoopFileOutputCommitter.java} | 4 +- .../datatypes/HadoopTypeConverter.java | 2 +- .../datatypes/StratosphereTypeConverter.java | 2 +- .../datatypes/WritableComparableWrapper.java | 2 +- .../record}/datatypes/WritableWrapper.java | 2 +- .../datatypes/WritableWrapperConverter.java | 2 +- .../record}/example/WordCount.java | 17 +- .../example/WordCountWithOutputFormat.java} | 17 +- .../mapred/utils/HadoopUtils.java | 84 +++++ .../wrapper/HadoopDummyProgressable.java} | 4 +- .../wrapper/HadoopDummyReporter.java} | 4 +- .../wrapper/HadoopInputSplit.java} | 8 +- .../mapreduce/HadoopInputFormat.java | 337 ++++++++++++++++++ .../mapreduce/HadoopOutputFormat.java | 204 +++++++++++ .../mapreduce/example/WordCount.java | 114 ++++++ .../mapreduce/utils/HadoopUtils.java | 80 +++++ .../mapreduce/wrapper/HadoopInputSplit.java | 86 +++++ .../mapred/HadoopInputOutputITCase.java} | 44 ++- .../HadoopRecordInputOutputITCase.java} | 8 +- .../mapreduce/HadoopInputOutputITCase.java | 40 +++ 29 files changed, 1623 insertions(+), 83 deletions(-) create mode 100644 stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/HadoopInputFormat.java create mode 100644 
stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/HadoopOutputFormat.java create mode 100644 stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/example/WordCount.java rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{ => mapred/record}/HadoopDataSink.java (91%) rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{ => mapred/record}/HadoopDataSource.java (89%) rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{HadoopInputFormatWrapper.java => mapred/record/HadoopRecordInputFormat.java} (79%) rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{HadoopOutputFormatWrapper.java => mapred/record/HadoopRecordOutputFormat.java} (81%) rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{ => mapred/record}/datatypes/DefaultHadoopTypeConverter.java (97%) rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{ => mapred/record}/datatypes/DefaultStratosphereTypeConverter.java (98%) rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{FileOutputCommitterWrapper.java => mapred/record/datatypes/HadoopFileOutputCommitter.java} (97%) rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{ => mapred/record}/datatypes/HadoopTypeConverter.java (95%) rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{ => mapred/record}/datatypes/StratosphereTypeConverter.java (95%) rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{ => mapred/record}/datatypes/WritableComparableWrapper.java (95%) rename 
stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{ => mapred/record}/datatypes/WritableWrapper.java (96%) rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{ => mapred/record}/datatypes/WritableWrapperConverter.java (96%) rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{ => mapred/record}/example/WordCount.java (92%) rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{example/WordCountWithHadoopOutputFormat.java => mapred/record/example/WordCountWithOutputFormat.java} (91%) create mode 100644 stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/utils/HadoopUtils.java rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{DummyHadoopProgressable.java => mapred/wrapper/HadoopDummyProgressable.java} (89%) rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{DummyHadoopReporter.java => mapred/wrapper/HadoopDummyReporter.java} (93%) rename stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/{HadoopInputSplitWrapper.java => mapred/wrapper/HadoopInputSplit.java} (91%) create mode 100644 stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/HadoopInputFormat.java create mode 100644 stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/HadoopOutputFormat.java create mode 100644 stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/example/WordCount.java create mode 100644 stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/utils/HadoopUtils.java create mode 100644 
stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java rename stratosphere-addons/hadoop-compatibility/src/{main/java/eu/stratosphere/hadoopcompatibility/HadoopConfiguration.java => test/java/eu/stratosphere/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java} (52%) rename stratosphere-addons/hadoop-compatibility/src/test/java/eu/stratosphere/test/hadoopcompatibility/{HadoopInputOutputTest.java => mapred/record/HadoopRecordInputOutputITCase.java} (86%) create mode 100644 stratosphere-addons/hadoop-compatibility/src/test/java/eu/stratosphere/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/HadoopInputFormat.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/HadoopInputFormat.java new file mode 100644 index 0000000000000..882f4a3db7fa7 --- /dev/null +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/HadoopInputFormat.java @@ -0,0 +1,287 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ **********************************************************************************************************************/ + +package eu.stratosphere.hadoopcompatibility.mapred; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileInputFormat; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.RecordReader; +import org.apache.hadoop.util.ReflectionUtils; + +import eu.stratosphere.api.common.io.FileInputFormat.FileBaseStatistics; +import eu.stratosphere.api.common.io.InputFormat; +import eu.stratosphere.api.common.io.statistics.BaseStatistics; +import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.api.java.typeutils.ResultTypeQueryable; +import eu.stratosphere.api.java.typeutils.TupleTypeInfo; +import eu.stratosphere.api.java.typeutils.WritableTypeInfo; +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.core.fs.FileStatus; +import eu.stratosphere.core.fs.FileSystem; +import eu.stratosphere.core.fs.Path; +import eu.stratosphere.hadoopcompatibility.mapred.utils.HadoopUtils; +import eu.stratosphere.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; +import eu.stratosphere.hadoopcompatibility.mapred.wrapper.HadoopInputSplit; +import eu.stratosphere.types.TypeInformation; + +public class HadoopInputFormat implements InputFormat, HadoopInputSplit>, ResultTypeQueryable> { + + private static final long serialVersionUID = 1L; + + private static final Log LOG = LogFactory.getLog(HadoopInputFormat.class); + + private org.apache.hadoop.mapred.InputFormat mapredInputFormat; + private Class keyClass; + private Class valueClass; + private JobConf jobConf; + + public transient K key; + public transient V value; + + public RecordReader recordReader; + private transient 
boolean fetched = false; + private transient boolean hasNext; + + public HadoopInputFormat() { + super(); + } + + public HadoopInputFormat(org.apache.hadoop.mapred.InputFormat mapredInputFormat, Class key, Class value, JobConf job) { + super(); + this.mapredInputFormat = mapredInputFormat; + this.keyClass = key; + this.valueClass = value; + HadoopUtils.mergeHadoopConf(job); + this.jobConf = job; + } + + public void setJobConf(JobConf job) { + this.jobConf = job; + } + + public org.apache.hadoop.mapred.InputFormat getHadoopInputFormat() { + return mapredInputFormat; + } + + public void setHadoopInputFormat(org.apache.hadoop.mapred.InputFormat mapredInputFormat) { + this.mapredInputFormat = mapredInputFormat; + } + + public JobConf getJobConf() { + return jobConf; + } + + // -------------------------------------------------------------------------------------------- + // InputFormat + // -------------------------------------------------------------------------------------------- + + @Override + public void configure(Configuration parameters) { + // nothing to do + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { + // only gather base statistics for FileInputFormats + if(!(mapredInputFormat instanceof FileInputFormat)) { + return null; + } + + final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ? 
+ (FileBaseStatistics) cachedStats : null; + + try { + final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(this.jobConf); + + return getFileStats(cachedFileStats, paths, new ArrayList(1)); + } catch (IOException ioex) { + if (LOG.isWarnEnabled()) { + LOG.warn("Could not determine statistics due to an io error: " + + ioex.getMessage()); + } + } + catch (Throwable t) { + if (LOG.isErrorEnabled()) { + LOG.error("Unexpected problen while getting the file statistics: " + + t.getMessage(), t); + } + } + + // no statistics available + return null; + } + + @Override + public HadoopInputSplit[] createInputSplits(int minNumSplits) + throws IOException { + org.apache.hadoop.mapred.InputSplit[] splitArray = mapredInputFormat.getSplits(jobConf, minNumSplits); + HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length]; + for(int i=0;i getInputSplitType() { + return HadoopInputSplit.class; + } + + @Override + public void open(HadoopInputSplit split) throws IOException { + this.recordReader = this.mapredInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter()); + key = this.recordReader.createKey(); + value = this.recordReader.createValue(); + this.fetched = false; + } + + @Override + public boolean reachedEnd() throws IOException { + if(!fetched) { + fetchNext(); + } + return !hasNext; + } + + private void fetchNext() throws IOException { + hasNext = this.recordReader.next(key, value); + fetched = true; + } + + @Override + public Tuple2 nextRecord(Tuple2 record) throws IOException { + if(!fetched) { + fetchNext(); + } + if(!hasNext) { + return null; + } + record.f0 = key; + record.f1 = value; + fetched = false; + return record; + } + + @Override + public void close() throws IOException { + this.recordReader.close(); + } + + // -------------------------------------------------------------------------------------------- + // Helper methods + // 
-------------------------------------------------------------------------------------------- + + private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths, + ArrayList files) throws IOException { + + long latestModTime = 0L; + + // get the file info and check whether the cached statistics are still valid. + for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) { + + final Path filePath = new Path(hadoopPath.toUri()); + final FileSystem fs = FileSystem.get(filePath.toUri()); + + final FileStatus file = fs.getFileStatus(filePath); + latestModTime = Math.max(latestModTime, file.getModificationTime()); + + // enumerate all files and check their modification time stamp. + if (file.isDir()) { + FileStatus[] fss = fs.listStatus(filePath); + files.ensureCapacity(files.size() + fss.length); + + for (FileStatus s : fss) { + if (!s.isDir()) { + files.add(s); + latestModTime = Math.max(s.getModificationTime(), latestModTime); + } + } + } else { + files.add(file); + } + } + + // check whether the cached statistics are still valid, if we have any + if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) { + return cachedStats; + } + + // calculate the whole length + long len = 0; + for (FileStatus s : files) { + len += s.getLen(); + } + + // sanity check + if (len <= 0) { + len = BaseStatistics.SIZE_UNKNOWN; + } + + return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); + } + + // -------------------------------------------------------------------------------------------- + // Custom serialization methods + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(mapredInputFormat.getClass().getName()); + out.writeUTF(keyClass.getName()); + out.writeUTF(valueClass.getName()); + jobConf.write(out); + } + + 
@SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + String hadoopInputFormatClassName = in.readUTF(); + String keyClassName = in.readUTF(); + String valueClassName = in.readUTF(); + if(jobConf == null) { + jobConf = new JobConf(); + } + jobConf.readFields(in); + try { + this.mapredInputFormat = (org.apache.hadoop.mapred.InputFormat) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate the hadoop input format", e); + } + try { + this.keyClass = (Class) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader()); + } catch (Exception e) { + throw new RuntimeException("Unable to find key class.", e); + } + try { + this.valueClass = (Class) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader()); + } catch (Exception e) { + throw new RuntimeException("Unable to find value class.", e); + } + ReflectionUtils.setConf(mapredInputFormat, jobConf); + } + + // -------------------------------------------------------------------------------------------- + // ResultTypeQueryable + // -------------------------------------------------------------------------------------------- + + @Override + public TypeInformation> getProducedType() { + return new TupleTypeInfo>(new WritableTypeInfo((Class) keyClass), new WritableTypeInfo((Class) valueClass)); + } +} diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/HadoopOutputFormat.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/HadoopOutputFormat.java new file mode 100644 index 0000000000000..849c7011dca39 --- /dev/null +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/HadoopOutputFormat.java @@ -0,0 +1,164 @@ 
+/*********************************************************************************************************************** + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.hadoopcompatibility.mapred; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapred.FileOutputCommitter; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.RecordWriter; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.mapred.TaskAttemptID; +import org.apache.hadoop.util.ReflectionUtils; + +import eu.stratosphere.api.common.io.OutputFormat; +import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.hadoopcompatibility.mapred.utils.HadoopUtils; +import eu.stratosphere.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable; +import eu.stratosphere.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; + + +public class HadoopOutputFormat implements OutputFormat> { + + private static final long serialVersionUID = 1L; + + public JobConf jobConf; + public 
org.apache.hadoop.mapred.OutputFormat mapredOutputFormat; + public transient RecordWriter recordWriter; + public transient FileOutputCommitter fileOutputCommitter; + private transient TaskAttemptContext context; + private transient JobContext jobContext; + + public HadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat mapredOutputFormat, JobConf job) { + super(); + this.mapredOutputFormat = mapredOutputFormat; + HadoopUtils.mergeHadoopConf(job); + this.jobConf = job; + } + + public void setJobConf(JobConf job) { + this.jobConf = job; + } + + public JobConf getJobConf() { + return jobConf; + } + + public org.apache.hadoop.mapred.OutputFormat getHadoopOutputFormat() { + return mapredOutputFormat; + } + + public void setHadoopOutputFormat(org.apache.hadoop.mapred.OutputFormat mapredOutputFormat) { + this.mapredOutputFormat = mapredOutputFormat; + } + + // -------------------------------------------------------------------------------------------- + // OutputFormat + // -------------------------------------------------------------------------------------------- + + @Override + public void configure(Configuration parameters) { + // nothing to do + } + + /** + * create the temporary output file for hadoop RecordWriter. + * @param taskNumber The number of the parallel instance. + * @param numTasks The number of parallel tasks. 
+ * @throws IOException + */ + @Override + public void open(int taskNumber, int numTasks) throws IOException { + if (Integer.toString(taskNumber + 1).length() > 6) { + throw new IOException("Task id too large."); + } + + TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" + + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") + + Integer.toString(taskNumber + 1) + + "_0"); + + try { + this.context = HadoopUtils.instantiateTaskAttemptContext(this.jobConf, taskAttemptID); + } catch (Exception e) { + throw new RuntimeException(e); + } + + this.jobConf.set("mapred.task.id", taskAttemptID.toString()); + // for hadoop 2.2 + this.jobConf.set("mapreduce.task.attempt.id", taskAttemptID.toString()); + + this.fileOutputCommitter = new FileOutputCommitter(); + + try { + this.jobContext = HadoopUtils.instantiateJobContext(this.jobConf, new JobID()); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + this.fileOutputCommitter.setupJob(jobContext); + + this.recordWriter = this.mapredOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable()); + } + + @Override + public void writeRecord(Tuple2 record) throws IOException { + this.recordWriter.write(record.f0, record.f1); + } + + /** + * commit the task by moving the output file out from the temporary directory. 
+ * @throws IOException + */ + @Override + public void close() throws IOException { + this.recordWriter.close(new HadoopDummyReporter()); + + if (this.fileOutputCommitter.needsTaskCommit(this.context)) { + this.fileOutputCommitter.commitTask(this.context); + } + this.fileOutputCommitter.commitJob(this.jobContext); + } + + // -------------------------------------------------------------------------------------------- + // Custom serialization methods + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(mapredOutputFormat.getClass().getName()); + jobConf.write(out); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + String hadoopOutputFormatName = in.readUTF(); + if(jobConf == null) { + jobConf = new JobConf(); + } + jobConf.readFields(in); + try { + this.mapredOutputFormat = (org.apache.hadoop.mapred.OutputFormat) Class.forName(hadoopOutputFormatName, true, Thread.currentThread().getContextClassLoader()).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate the hadoop output format", e); + } + ReflectionUtils.setConf(mapredOutputFormat, jobConf); + } +} diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/example/WordCount.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/example/WordCount.java new file mode 100644 index 0000000000000..54160bf9cd66e --- /dev/null +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/example/WordCount.java @@ -0,0 +1,115 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) 
+ * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ +package eu.stratosphere.hadoopcompatibility.mapred.example; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.TextInputFormat; +import org.apache.hadoop.mapred.TextOutputFormat; + +import eu.stratosphere.api.java.DataSet; +import eu.stratosphere.api.java.ExecutionEnvironment; +import eu.stratosphere.api.java.aggregation.Aggregations; +import eu.stratosphere.api.java.functions.FlatMapFunction; +import eu.stratosphere.api.java.functions.MapFunction; +import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.hadoopcompatibility.mapred.HadoopInputFormat; +import eu.stratosphere.hadoopcompatibility.mapred.HadoopOutputFormat; +import eu.stratosphere.util.Collector; + + + +/** + * Implements a word count which takes the input file and counts the number of + * occurrences of each word in the file and writes the result back to disk. + * + * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to + * common Java types for better usage in a Stratosphere job and how to use Hadoop Output Formats. 
+ */ +@SuppressWarnings("serial") +public class WordCount { + + public static void main(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: WordCount "); + return; + } + + final String inputPath = args[0]; + final String outputPath = args[1]; + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(1); + + // Set up the Hadoop Input Format + HadoopInputFormat hadoopInputFormat = new HadoopInputFormat(new TextInputFormat(), LongWritable.class, Text.class, new JobConf()); + TextInputFormat.addInputPath(hadoopInputFormat.getJobConf(), new Path(inputPath)); + + // Create a Stratosphere job with it + DataSet> text = env.createInput(hadoopInputFormat); + + // Tokenize the line and convert from Writable "Text" to String for better handling + DataSet> words = text.flatMap(new Tokenizer()); + + // Sum up the words + DataSet> result = words.groupBy(0).aggregate(Aggregations.SUM, 1); + + // Convert String back to Writable "Text" for use with Hadoop Output Format + DataSet> hadoopResult = result.map(new HadoopDatatypeMapper()); + + // Set up Hadoop Output Format + HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new TextOutputFormat(), new JobConf()); + hadoopOutputFormat.getJobConf().set("mapred.textoutputformat.separator", " "); + TextOutputFormat.setOutputPath(hadoopOutputFormat.getJobConf(), new Path(outputPath)); + + // Output & Execute + hadoopResult.output(hadoopOutputFormat); + env.execute("Word Count"); + } + + /** + * Splits a line into words and converts Hadoop Writables into normal Java data types. 
+ */ + public static final class Tokenizer extends FlatMapFunction, Tuple2> { + + @Override + public void flatMap(Tuple2 value, Collector> out) { + // normalize and split the line + String line = value.f1.toString(); + String[] tokens = line.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2(token, 1)); + } + } + } + } + + /** + * Converts Java data types to Hadoop Writables. + */ + public static final class HadoopDatatypeMapper extends MapFunction, Tuple2> { + + @Override + public Tuple2 map(Tuple2 value) throws Exception { + return new Tuple2(new Text(value.f0), new IntWritable(value.f1)); + } + + } + +} diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopDataSink.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/HadoopDataSink.java similarity index 91% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopDataSink.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/HadoopDataSink.java index 62a7ebaafb6a7..2d0e0526a72e2 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopDataSink.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/HadoopDataSink.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility; +package eu.stratosphere.hadoopcompatibility.mapred.record; import java.util.List; @@ -26,8 +26,8 @@ import eu.stratosphere.api.java.record.operators.GenericDataSink; import eu.stratosphere.api.common.operators.Operator; import eu.stratosphere.compiler.contextcheck.Validatable; -import eu.stratosphere.hadoopcompatibility.datatypes.DefaultStratosphereTypeConverter; -import eu.stratosphere.hadoopcompatibility.datatypes.StratosphereTypeConverter; +import eu.stratosphere.hadoopcompatibility.mapred.record.datatypes.DefaultStratosphereTypeConverter; +import eu.stratosphere.hadoopcompatibility.mapred.record.datatypes.StratosphereTypeConverter; /** * The HadoopDataSink is a generic wrapper for all Hadoop OutputFormats. @@ -40,7 +40,7 @@ * * Note that it is possible to provide custom data type converter. * - * The HadoopDataSink provides a default converter: {@link eu.stratosphere.hadoopcompatibility.datatypes.DefaultStratosphereTypeConverter} + * The HadoopDataSink provides a default converter: {@link eu.stratosphere.hadoopcompatibility.mapred.record.datatypes.DefaultStratosphereTypeConverter} **/ public class HadoopDataSink extends GenericDataSink implements Validatable { @@ -68,7 +68,7 @@ public HadoopDataSink(OutputFormat hadoopFormat, Operator input, Cl @SuppressWarnings("deprecation") public HadoopDataSink(OutputFormat hadoopFormat, JobConf jobConf, String name, List> input, StratosphereTypeConverter conv, Class keyClass, Class valueClass) { - super(new HadoopOutputFormatWrapper(hadoopFormat, jobConf, conv),input, name); + super(new HadoopRecordOutputFormat(hadoopFormat, jobConf, conv),input, name); Preconditions.checkNotNull(hadoopFormat); Preconditions.checkNotNull(jobConf); this.name = name; diff --git 
a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopDataSource.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/HadoopDataSource.java similarity index 89% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopDataSource.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/HadoopDataSource.java index fe1f36576bd26..f8f212012df80 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopDataSource.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/HadoopDataSource.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. **********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility; +package eu.stratosphere.hadoopcompatibility.mapred.record; import org.apache.hadoop.mapred.InputFormat; @@ -20,8 +20,10 @@ import com.google.common.base.Preconditions; import eu.stratosphere.api.java.record.operators.GenericDataSource; -import eu.stratosphere.hadoopcompatibility.datatypes.DefaultHadoopTypeConverter; -import eu.stratosphere.hadoopcompatibility.datatypes.HadoopTypeConverter; +import eu.stratosphere.hadoopcompatibility.mapred.record.datatypes.DefaultHadoopTypeConverter; +import eu.stratosphere.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter; + + /** * The HadoopDataSource is a generic wrapper for all Hadoop InputFormats. @@ -39,7 +41,7 @@ * * DefaultHadoopTypeConverter: Converts the standard hadoop types (longWritable, Text) to Stratosphere's standard types. 
* */ -public class HadoopDataSource extends GenericDataSource> { +public class HadoopDataSource extends GenericDataSource> { private static String DEFAULT_NAME = ""; @@ -53,7 +55,7 @@ public class HadoopDataSource extends GenericDataSource hadoopFormat, JobConf jobConf, String name, HadoopTypeConverter conv) { - super(new HadoopInputFormatWrapper(hadoopFormat, jobConf, conv),name); + super(new HadoopRecordInputFormat(hadoopFormat, jobConf, conv),name); Preconditions.checkNotNull(hadoopFormat); Preconditions.checkNotNull(jobConf); Preconditions.checkNotNull(conv); diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputFormatWrapper.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java similarity index 79% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputFormatWrapper.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java index 968b746b62e9f..7c47aa8119f15 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputFormatWrapper.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/HadoopRecordInputFormat.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility; +package eu.stratosphere.hadoopcompatibility.mapred.record; import java.io.IOException; import java.io.ObjectInputStream; @@ -24,10 +24,13 @@ import eu.stratosphere.api.common.io.InputFormat; import eu.stratosphere.api.common.io.statistics.BaseStatistics; import eu.stratosphere.configuration.Configuration; -import eu.stratosphere.hadoopcompatibility.datatypes.HadoopTypeConverter; +import eu.stratosphere.hadoopcompatibility.mapred.record.datatypes.HadoopTypeConverter; +import eu.stratosphere.hadoopcompatibility.mapred.utils.HadoopUtils; +import eu.stratosphere.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; +import eu.stratosphere.hadoopcompatibility.mapred.wrapper.HadoopInputSplit; import eu.stratosphere.types.Record; -public class HadoopInputFormatWrapper implements InputFormat { +public class HadoopRecordInputFormat implements InputFormat { private static final long serialVersionUID = 1L; @@ -41,16 +44,16 @@ public class HadoopInputFormatWrapper implements InputFormat hadoopInputFormat, JobConf job, HadoopTypeConverter conv) { + public HadoopRecordInputFormat(org.apache.hadoop.mapred.InputFormat hadoopInputFormat, JobConf job, HadoopTypeConverter conv) { super(); this.hadoopInputFormat = hadoopInputFormat; this.hadoopInputFormatName = hadoopInputFormat.getClass().getName(); this.converter = conv; - HadoopConfiguration.mergeHadoopConf(job); + HadoopUtils.mergeHadoopConf(job); this.jobConf = job; } @@ -65,24 +68,24 @@ public BaseStatistics getStatistics(BaseStatistics cachedStatistics) throws IOEx } @Override - public HadoopInputSplitWrapper[] createInputSplits(int minNumSplits) + public HadoopInputSplit[] createInputSplits(int minNumSplits) throws IOException { org.apache.hadoop.mapred.InputSplit[] splitArray = hadoopInputFormat.getSplits(jobConf, minNumSplits); - HadoopInputSplitWrapper[] 
hiSplit = new HadoopInputSplitWrapper[splitArray.length]; + HadoopInputSplit[] hiSplit = new HadoopInputSplit[splitArray.length]; for(int i=0;i getInputSplitType() { - return HadoopInputSplitWrapper.class; + public Class getInputSplitType() { + return HadoopInputSplit.class; } @Override - public void open(HadoopInputSplitWrapper split) throws IOException { - this.recordReader = this.hadoopInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new DummyHadoopReporter()); + public void open(HadoopInputSplit split) throws IOException { + this.recordReader = this.hadoopInputFormat.getRecordReader(split.getHadoopInputSplit(), jobConf, new HadoopDummyReporter()); key = this.recordReader.createKey(); value = this.recordReader.createValue(); this.fetched = false; diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopOutputFormatWrapper.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java similarity index 81% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopOutputFormatWrapper.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java index 93a203f4ff66d..edd02a154dc75 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopOutputFormatWrapper.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/HadoopRecordOutputFormat.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility; +package eu.stratosphere.hadoopcompatibility.mapred.record; import java.io.IOException; import java.io.ObjectInputStream; @@ -24,11 +24,15 @@ import eu.stratosphere.api.common.io.OutputFormat; import eu.stratosphere.configuration.Configuration; -import eu.stratosphere.hadoopcompatibility.datatypes.StratosphereTypeConverter; +import eu.stratosphere.hadoopcompatibility.mapred.record.datatypes.HadoopFileOutputCommitter; +import eu.stratosphere.hadoopcompatibility.mapred.record.datatypes.StratosphereTypeConverter; +import eu.stratosphere.hadoopcompatibility.mapred.utils.HadoopUtils; +import eu.stratosphere.hadoopcompatibility.mapred.wrapper.HadoopDummyProgressable; +import eu.stratosphere.hadoopcompatibility.mapred.wrapper.HadoopDummyReporter; import eu.stratosphere.types.Record; -public class HadoopOutputFormatWrapper implements OutputFormat { +public class HadoopRecordOutputFormat implements OutputFormat { private static final long serialVersionUID = 1L; @@ -42,15 +46,15 @@ public class HadoopOutputFormatWrapper implements OutputFormat { public StratosphereTypeConverter converter; - public FileOutputCommitterWrapper fileOutputCommitterWrapper; + public HadoopFileOutputCommitter fileOutputCommitterWrapper; - public HadoopOutputFormatWrapper(org.apache.hadoop.mapred.OutputFormat hadoopFormat, JobConf job, StratosphereTypeConverter conv) { + public HadoopRecordOutputFormat(org.apache.hadoop.mapred.OutputFormat hadoopFormat, JobConf job, StratosphereTypeConverter conv) { super(); this.hadoopOutputFormat = hadoopFormat; this.hadoopOutputFormatName = hadoopFormat.getClass().getName(); this.converter = conv; - this.fileOutputCommitterWrapper = new FileOutputCommitterWrapper(); - HadoopConfiguration.mergeHadoopConf(job); + this.fileOutputCommitterWrapper = new HadoopFileOutputCommitter(); + 
HadoopUtils.mergeHadoopConf(job); this.jobConf = job; } @@ -74,7 +78,7 @@ public void open(int taskNumber, int numTasks) throws IOException { } else { throw new IOException("task id too large"); } - this.recordWriter = this.hadoopOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new DummyHadoopProgressable()); + this.recordWriter = this.hadoopOutputFormat.getRecordWriter(null, this.jobConf, Integer.toString(taskNumber + 1), new HadoopDummyProgressable()); } @@ -91,7 +95,7 @@ public void writeRecord(Record record) throws IOException { */ @Override public void close() throws IOException { - this.recordWriter.close(new DummyHadoopReporter()); + this.recordWriter.close(new HadoopDummyReporter()); if (this.fileOutputCommitterWrapper.needsTaskCommit(this.jobConf, TaskAttemptID.forName(this.jobConf.get("mapred.task.id")))) { this.fileOutputCommitterWrapper.commitTask(this.jobConf, TaskAttemptID.forName(this.jobConf.get("mapred.task.id"))); } @@ -124,7 +128,7 @@ private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundE } ReflectionUtils.setConf(hadoopOutputFormat, jobConf); converter = (StratosphereTypeConverter) in.readObject(); - fileOutputCommitterWrapper = (FileOutputCommitterWrapper) in.readObject(); + fileOutputCommitterWrapper = (HadoopFileOutputCommitter) in.readObject(); } diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultHadoopTypeConverter.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java similarity index 97% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultHadoopTypeConverter.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java index 
42f81de72c5ce..3832772415718 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultHadoopTypeConverter.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/DefaultHadoopTypeConverter.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. **********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility.datatypes; +package eu.stratosphere.hadoopcompatibility.mapred.record.datatypes; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.ByteWritable; diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultStratosphereTypeConverter.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/DefaultStratosphereTypeConverter.java similarity index 98% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultStratosphereTypeConverter.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/DefaultStratosphereTypeConverter.java index 9dbe318db33e0..a1850ccf114f2 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/DefaultStratosphereTypeConverter.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/DefaultStratosphereTypeConverter.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility.datatypes; +package eu.stratosphere.hadoopcompatibility.mapred.record.datatypes; import org.apache.hadoop.io.BooleanWritable; import org.apache.hadoop.io.ByteWritable; diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/FileOutputCommitterWrapper.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java similarity index 97% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/FileOutputCommitterWrapper.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java index 23a4605c3dd21..8f46c00e18061 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/FileOutputCommitterWrapper.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/HadoopFileOutputCommitter.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. **********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility; +package eu.stratosphere.hadoopcompatibility.mapred.record.datatypes; import java.io.IOException; import java.io.Serializable; @@ -31,7 +31,7 @@ * as input parameter. However JobContext class is package private, and in Hadoop 2.2.0 it's public. * This class takes {@link org.apache.hadoop.mapred.JobConf} as input instead of JobContext in order to setup and commit tasks. 
*/ -public class FileOutputCommitterWrapper extends FileOutputCommitter implements Serializable { +public class HadoopFileOutputCommitter extends FileOutputCommitter implements Serializable { private static final long serialVersionUID = 1L; diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/HadoopTypeConverter.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java similarity index 95% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/HadoopTypeConverter.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java index d27941717000b..83c14e6a8f966 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/HadoopTypeConverter.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/HadoopTypeConverter.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility.datatypes; +package eu.stratosphere.hadoopcompatibility.mapred.record.datatypes; import java.io.Serializable; diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/StratosphereTypeConverter.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/StratosphereTypeConverter.java similarity index 95% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/StratosphereTypeConverter.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/StratosphereTypeConverter.java index 180024809414d..27d710dc847db 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/StratosphereTypeConverter.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/StratosphereTypeConverter.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility.datatypes; +package eu.stratosphere.hadoopcompatibility.mapred.record.datatypes; import java.io.Serializable; diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableComparableWrapper.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java similarity index 95% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableComparableWrapper.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java index 90787a47e649a..767f53955fd73 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableComparableWrapper.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/WritableComparableWrapper.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility.datatypes; +package eu.stratosphere.hadoopcompatibility.mapred.record.datatypes; import org.apache.hadoop.io.WritableComparable; diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableWrapper.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java similarity index 96% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableWrapper.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java index f84d9c0a4a980..d74eb74d2abc3 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableWrapper.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/WritableWrapper.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility.datatypes; +package eu.stratosphere.hadoopcompatibility.mapred.record.datatypes; import java.io.DataInput; import java.io.DataOutput; diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableWrapperConverter.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java similarity index 96% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableWrapperConverter.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java index 0831a8df1b425..2a42c518003e1 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/datatypes/WritableWrapperConverter.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/datatypes/WritableWrapperConverter.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility.datatypes; +package eu.stratosphere.hadoopcompatibility.mapred.record.datatypes; import org.apache.hadoop.io.Writable; import org.apache.hadoop.io.WritableComparable; diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/example/WordCount.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/example/WordCount.java similarity index 92% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/example/WordCount.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/example/WordCount.java index 204a80bd1d463..e135ed5f36ed9 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/example/WordCount.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/example/WordCount.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility.example; +package eu.stratosphere.hadoopcompatibility.mapred.record.example; import java.io.Serializable; import java.util.Iterator; @@ -26,17 +26,17 @@ import eu.stratosphere.api.common.Plan; import eu.stratosphere.api.common.Program; import eu.stratosphere.api.common.ProgramDescription; -import eu.stratosphere.api.java.record.operators.FileDataSink; import eu.stratosphere.api.java.record.functions.FunctionAnnotation.ConstantFields; import eu.stratosphere.api.java.record.functions.MapFunction; import eu.stratosphere.api.java.record.functions.ReduceFunction; import eu.stratosphere.api.java.record.io.CsvOutputFormat; +import eu.stratosphere.api.java.record.operators.FileDataSink; import eu.stratosphere.api.java.record.operators.MapOperator; import eu.stratosphere.api.java.record.operators.ReduceOperator; import eu.stratosphere.api.java.record.operators.ReduceOperator.Combinable; import eu.stratosphere.client.LocalExecutor; -import eu.stratosphere.hadoopcompatibility.HadoopDataSource; -import eu.stratosphere.hadoopcompatibility.datatypes.WritableWrapperConverter; +import eu.stratosphere.hadoopcompatibility.mapred.record.HadoopDataSource; +import eu.stratosphere.hadoopcompatibility.mapred.record.datatypes.WritableWrapperConverter; import eu.stratosphere.types.IntValue; import eu.stratosphere.types.Record; import eu.stratosphere.types.StringValue; @@ -44,7 +44,14 @@ /** * Implements a word count which takes the input file and counts the number of - * the occurrences of each word in the file. + * the occurrences of each word in the file. + * + *

+ * + * Note: This example uses the outdated Record API. + * It is recommended to use the new Java API. + * + * @see eu.stratosphere.hadoopcompatibility.mapred.example.WordCount */ public class WordCount implements Program, ProgramDescription { diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/example/WordCountWithHadoopOutputFormat.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java similarity index 91% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/example/WordCountWithHadoopOutputFormat.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java index 08a989d001734..77191900e9870 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/example/WordCountWithHadoopOutputFormat.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/record/example/WordCountWithOutputFormat.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility.example; +package eu.stratosphere.hadoopcompatibility.mapred.record.example; import java.io.Serializable; import java.util.Iterator; @@ -35,8 +35,8 @@ import eu.stratosphere.api.java.record.operators.ReduceOperator; import eu.stratosphere.api.java.record.operators.ReduceOperator.Combinable; import eu.stratosphere.client.LocalExecutor; -import eu.stratosphere.hadoopcompatibility.HadoopDataSink; -import eu.stratosphere.hadoopcompatibility.HadoopDataSource; +import eu.stratosphere.hadoopcompatibility.mapred.record.HadoopDataSink; +import eu.stratosphere.hadoopcompatibility.mapred.record.HadoopDataSource; import eu.stratosphere.types.IntValue; import eu.stratosphere.types.Record; import eu.stratosphere.types.StringValue; @@ -45,9 +45,16 @@ /** * Implements a word count which takes the input file and counts the number of * the occurrences of each word in the file. + * + *

+ * + * Note: This example uses the outdated Record API. + * It is recommended to use the new Java API. + * + * @see eu.stratosphere.hadoopcompatibility.mapred.example.WordCount */ @SuppressWarnings("serial") -public class WordCountWithHadoopOutputFormat implements Program, ProgramDescription { +public class WordCountWithOutputFormat implements Program, ProgramDescription { /** * Converts a Record containing one string in to multiple string/integer pairs. @@ -143,7 +150,7 @@ public String getDescription() { public static void main(String[] args) throws Exception { - WordCountWithHadoopOutputFormat wc = new WordCountWithHadoopOutputFormat(); + WordCountWithOutputFormat wc = new WordCountWithOutputFormat(); if (args.length < 3) { System.err.println(wc.getDescription()); diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/utils/HadoopUtils.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/utils/HadoopUtils.java new file mode 100644 index 0000000000000..18b30da41f2f5 --- /dev/null +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/utils/HadoopUtils.java @@ -0,0 +1,84 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.hadoopcompatibility.mapred.utils; + +import java.lang.reflect.Constructor; +import java.util.Map; + +import org.apache.hadoop.mapred.JobConf; +import org.apache.hadoop.mapred.JobContext; +import org.apache.hadoop.mapred.JobID; +import org.apache.hadoop.mapred.TaskAttemptContext; +import org.apache.hadoop.mapred.TaskAttemptID; + +import eu.stratosphere.runtime.fs.hdfs.DistributedFileSystem; + + +public class HadoopUtils { + + /** + * Merge HadoopConfiguration into JobConf. This is necessary for the HDFS configuration. + */ + public static void mergeHadoopConf(JobConf jobConf) { + org.apache.hadoop.conf.Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration(); + for (Map.Entry e : hadoopConf) { + jobConf.set(e.getKey(), e.getValue()); + } + } + + public static JobContext instantiateJobContext(JobConf jobConf, JobID jobId) throws Exception { + try { + // for Hadoop 1.xx + Class clazz = null; + if(!TaskAttemptContext.class.isInterface()) { + clazz = Class.forName("org.apache.hadoop.mapred.JobContext", true, Thread.currentThread().getContextClassLoader()); + } + // for Hadoop 2.xx + else { + clazz = Class.forName("org.apache.hadoop.mapred.JobContextImpl", true, Thread.currentThread().getContextClassLoader()); + } + Constructor constructor = clazz.getDeclaredConstructor(JobConf.class, org.apache.hadoop.mapreduce.JobID.class); + // for Hadoop 1.xx + constructor.setAccessible(true); + JobContext context = (JobContext) constructor.newInstance(jobConf, jobId); + + return context; + } + catch(Exception e) { + throw new Exception("Could not create instance of JobContext.", e); + } + } + + public static TaskAttemptContext instantiateTaskAttemptContext(JobConf jobConf, TaskAttemptID taskAttemptID) throws Exception 
{ + try { + // for Hadoop 1.xx + Class clazz = null; + if(!TaskAttemptContext.class.isInterface()) { + clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContext", true, Thread.currentThread().getContextClassLoader()); + } + // for Hadoop 2.xx + else { + clazz = Class.forName("org.apache.hadoop.mapred.TaskAttemptContextImpl", true, Thread.currentThread().getContextClassLoader()); + } + Constructor constructor = clazz.getDeclaredConstructor(JobConf.class, TaskAttemptID.class); + // for Hadoop 1.xx + constructor.setAccessible(true); + TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(jobConf, taskAttemptID); + return context; + } + catch(Exception e) { + throw new Exception("Could not create instance of TaskAttemptContext.", e); + } + } +} diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/DummyHadoopProgressable.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java similarity index 89% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/DummyHadoopProgressable.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java index 11c8606495a15..26827bb0af79f 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/DummyHadoopProgressable.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/wrapper/HadoopDummyProgressable.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility; +package eu.stratosphere.hadoopcompatibility.mapred.wrapper; import org.apache.hadoop.util.Progressable; @@ -19,7 +19,7 @@ * This is a dummy progress * */ -public class DummyHadoopProgressable implements Progressable { +public class HadoopDummyProgressable implements Progressable { @Override public void progress() { diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/DummyHadoopReporter.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java similarity index 93% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/DummyHadoopReporter.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java index 823217b8442f7..130b40295cdc5 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/DummyHadoopReporter.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/wrapper/HadoopDummyReporter.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility; +package eu.stratosphere.hadoopcompatibility.mapred.wrapper; import org.apache.hadoop.mapred.Counters.Counter; import org.apache.hadoop.mapred.InputSplit; @@ -21,7 +21,7 @@ * This is a dummy progress monitor / reporter * */ -public class DummyHadoopReporter implements Reporter { +public class HadoopDummyReporter implements Reporter { @Override public void progress() { diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputSplitWrapper.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java similarity index 91% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputSplitWrapper.java rename to stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java index 936d5c88f06fb..5eb185d8bf2a5 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopInputSplitWrapper.java +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapred/wrapper/HadoopInputSplit.java @@ -11,7 +11,7 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.hadoopcompatibility; +package eu.stratosphere.hadoopcompatibility.mapred.wrapper; import java.io.DataInput; import java.io.DataOutput; @@ -23,7 +23,7 @@ import eu.stratosphere.core.io.InputSplit; -public class HadoopInputSplitWrapper implements InputSplit { +public class HadoopInputSplit implements InputSplit { public transient org.apache.hadoop.mapred.InputSplit hadoopInputSplit; public JobConf jobConf; @@ -36,12 +36,12 @@ public org.apache.hadoop.mapred.InputSplit getHadoopInputSplit() { } - public HadoopInputSplitWrapper() { + public HadoopInputSplit() { super(); } - public HadoopInputSplitWrapper(org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) { + public HadoopInputSplit(org.apache.hadoop.mapred.InputSplit hInputSplit, JobConf jobconf) { this.hadoopInputSplit = hInputSplit; this.hadoopInputSplitTypeName = hInputSplit.getClass().getName(); this.jobConf=jobconf; diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/HadoopInputFormat.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/HadoopInputFormat.java new file mode 100644 index 0000000000000..2e847bdb3b2b3 --- /dev/null +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/HadoopInputFormat.java @@ -0,0 +1,337 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. 
You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.hadoopcompatibility.mapreduce; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.ArrayList; +import java.util.List; + +import org.apache.commons.logging.Log; +import org.apache.commons.logging.LogFactory; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordReader; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.input.FileInputFormat; + +import eu.stratosphere.api.common.io.FileInputFormat.FileBaseStatistics; +import eu.stratosphere.api.common.io.InputFormat; +import eu.stratosphere.api.common.io.statistics.BaseStatistics; +import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.api.java.typeutils.ResultTypeQueryable; +import eu.stratosphere.api.java.typeutils.TupleTypeInfo; +import eu.stratosphere.api.java.typeutils.WritableTypeInfo; +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.core.fs.FileStatus; +import eu.stratosphere.core.fs.FileSystem; +import eu.stratosphere.core.fs.Path; +import eu.stratosphere.hadoopcompatibility.mapreduce.utils.HadoopUtils; +import eu.stratosphere.hadoopcompatibility.mapreduce.wrapper.HadoopInputSplit; +import 
eu.stratosphere.types.TypeInformation; + +public class HadoopInputFormat implements InputFormat, HadoopInputSplit>, ResultTypeQueryable> { + + private static final long serialVersionUID = 1L; + + private static final Log LOG = LogFactory.getLog(HadoopInputFormat.class); + + private org.apache.hadoop.mapreduce.InputFormat mapreduceInputFormat; + private Class keyClass; + private Class valueClass; + private org.apache.hadoop.conf.Configuration configuration; + + public transient RecordReader recordReader; + private boolean fetched = false; + private boolean hasNext; + + public HadoopInputFormat() { + super(); + } + + public HadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat mapreduceInputFormat, Class key, Class value, Job job) { + super(); + this.mapreduceInputFormat = mapreduceInputFormat; + this.keyClass = key; + this.valueClass = value; + this.configuration = job.getConfiguration(); + HadoopUtils.mergeHadoopConf(configuration); + } + + public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) { + this.configuration = configuration; + } + + public org.apache.hadoop.mapreduce.InputFormat getHadoopInputFormat() { + return this.mapreduceInputFormat; + } + + public void setHadoopInputFormat(org.apache.hadoop.mapreduce.InputFormat mapreduceInputFormat) { + this.mapreduceInputFormat = mapreduceInputFormat; + } + + public org.apache.hadoop.conf.Configuration getConfiguration() { + return this.configuration; + } + + // -------------------------------------------------------------------------------------------- + // InputFormat + // -------------------------------------------------------------------------------------------- + + @Override + public void configure(Configuration parameters) { + // nothing to do + } + + @Override + public BaseStatistics getStatistics(BaseStatistics cachedStats) throws IOException { + // only gather base statistics for FileInputFormats + if(!(mapreduceInputFormat instanceof FileInputFormat)) { + return null; + } + + 
JobContext jobContext = null; + try { + jobContext = HadoopUtils.instantiateJobContext(configuration, null); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + final FileBaseStatistics cachedFileStats = (cachedStats != null && cachedStats instanceof FileBaseStatistics) ? + (FileBaseStatistics) cachedStats : null; + + try { + final org.apache.hadoop.fs.Path[] paths = FileInputFormat.getInputPaths(jobContext); + + return getFileStats(cachedFileStats, paths, new ArrayList(1)); + } catch (IOException ioex) { + if (LOG.isWarnEnabled()) { + LOG.warn("Could not determine statistics due to an io error: " + + ioex.getMessage()); + } + } + catch (Throwable t) { + if (LOG.isErrorEnabled()) { + LOG.error("Unexpected problen while getting the file statistics: " + + t.getMessage(), t); + } + } + + // no statistics available + return null; + } + + /** + * Asks the wrapped Hadoop input format for its splits and wraps each one in a {@link HadoopInputSplit}, + * numbering the wrappers by their position in the returned list. + * + * @param minNumSplits value stored under "mapreduce.input.fileinputformat.split.minsize" before the splits + * are requested. NOTE(review): the key name suggests a minimum split size, not a split count - confirm. + * @return one wrapper per Hadoop split, in the order Hadoop returned them + * @throws IOException if the Hadoop input format is interrupted while computing the splits + */ + @Override + public HadoopInputSplit[] createInputSplits(int minNumSplits) + throws IOException { + configuration.setInt("mapreduce.input.fileinputformat.split.minsize", minNumSplits); + + JobContext jobContext = null; + try { + jobContext = HadoopUtils.instantiateJobContext(configuration, new JobID()); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + // typed list: the wrapper constructor needs a Hadoop InputSplit, not Object + List<org.apache.hadoop.mapreduce.InputSplit> splits; + try { + splits = this.mapreduceInputFormat.getSplits(jobContext); + } catch (InterruptedException e) { + throw new IOException("Could not get Splits.", e); + } + HadoopInputSplit[] hadoopInputSplits = new HadoopInputSplit[splits.size()]; + + for(int i = 0; i < hadoopInputSplits.length; i++){ + hadoopInputSplits[i] = new HadoopInputSplit(splits.get(i), jobContext); + // without this every split reports the default split number 0 + hadoopInputSplits[i].setSplitNumber(i); + } + return hadoopInputSplits; + } + + @Override + public Class getInputSplitType() { + return HadoopInputSplit.class; + } + + @Override + public void open(HadoopInputSplit split) throws IOException { + TaskAttemptContext context = null; + try { + context = HadoopUtils.instantiateTaskAttemptContext(configuration, new TaskAttemptID()); + } + catch(Exception e) { + throw new
RuntimeException(e); + } + + try { + this.recordReader = this.mapreduceInputFormat + .createRecordReader(split.getHadoopInputSplit(), context); + this.recordReader.initialize(split.getHadoopInputSplit(), context); + } catch (InterruptedException e) { + throw new IOException("Could not create RecordReader.", e); + } finally { + this.fetched = false; + } + } + + @Override + public boolean reachedEnd() throws IOException { + if(!this.fetched) { + fetchNext(); + } + return !this.hasNext; + } + + private void fetchNext() throws IOException { + try { + this.hasNext = this.recordReader.nextKeyValue(); + } catch (InterruptedException e) { + throw new IOException("Could not fetch next KeyValue pair.", e); + } finally { + this.fetched = true; + } + } + + @Override + public Tuple2 nextRecord(Tuple2 record) throws IOException { + if(!this.fetched) { + fetchNext(); + } + if(!this.hasNext) { + return null; + } + try { + record.f0 = this.recordReader.getCurrentKey(); + record.f1 = this.recordReader.getCurrentValue(); + } catch (InterruptedException e) { + throw new IOException("Could not get KeyValue pair.", e); + } + this.fetched = false; + + return record; + } + + @Override + public void close() throws IOException { + this.recordReader.close(); + } + + // -------------------------------------------------------------------------------------------- + // Helper methods + // -------------------------------------------------------------------------------------------- + + private FileBaseStatistics getFileStats(FileBaseStatistics cachedStats, org.apache.hadoop.fs.Path[] hadoopFilePaths, + ArrayList files) throws IOException { + + long latestModTime = 0L; + + // get the file info and check whether the cached statistics are still valid. 
+ for(org.apache.hadoop.fs.Path hadoopPath : hadoopFilePaths) { + + final Path filePath = new Path(hadoopPath.toUri()); + final FileSystem fs = FileSystem.get(filePath.toUri()); + + final FileStatus file = fs.getFileStatus(filePath); + latestModTime = Math.max(latestModTime, file.getModificationTime()); + + // enumerate all files and check their modification time stamp. + if (file.isDir()) { + FileStatus[] fss = fs.listStatus(filePath); + files.ensureCapacity(files.size() + fss.length); + + for (FileStatus s : fss) { + if (!s.isDir()) { + files.add(s); + latestModTime = Math.max(s.getModificationTime(), latestModTime); + } + } + } else { + files.add(file); + } + } + + // check whether the cached statistics are still valid, if we have any + if (cachedStats != null && latestModTime <= cachedStats.getLastModificationTime()) { + return cachedStats; + } + + // calculate the whole length + long len = 0; + for (FileStatus s : files) { + len += s.getLen(); + } + + // sanity check + if (len <= 0) { + len = BaseStatistics.SIZE_UNKNOWN; + } + + return new FileBaseStatistics(latestModTime, len, BaseStatistics.AVG_RECORD_BYTES_UNKNOWN); + } + + // -------------------------------------------------------------------------------------------- + // Custom serialization methods + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(this.mapreduceInputFormat.getClass().getName()); + out.writeUTF(this.keyClass.getName()); + out.writeUTF(this.valueClass.getName()); + this.configuration.write(out); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + String hadoopInputFormatClassName = in.readUTF(); + String keyClassName = in.readUTF(); + String valueClassName = in.readUTF(); + + org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); + 
configuration.readFields(in); + + if(this.configuration == null) { + this.configuration = configuration; + } + + try { + this.mapreduceInputFormat = (org.apache.hadoop.mapreduce.InputFormat) Class.forName(hadoopInputFormatClassName, true, Thread.currentThread().getContextClassLoader()).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate the hadoop input format", e); + } + try { + this.keyClass = (Class) Class.forName(keyClassName, true, Thread.currentThread().getContextClassLoader()); + } catch (Exception e) { + throw new RuntimeException("Unable to find key class.", e); + } + try { + this.valueClass = (Class) Class.forName(valueClassName, true, Thread.currentThread().getContextClassLoader()); + } catch (Exception e) { + throw new RuntimeException("Unable to find value class.", e); + } + } + + // -------------------------------------------------------------------------------------------- + // ResultTypeQueryable + // -------------------------------------------------------------------------------------------- + + @Override + public TypeInformation> getProducedType() { + return new TupleTypeInfo>(new WritableTypeInfo((Class) keyClass), new WritableTypeInfo((Class) valueClass)); + } +} diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/HadoopOutputFormat.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/HadoopOutputFormat.java new file mode 100644 index 0000000000000..2ec82de6c2334 --- /dev/null +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/HadoopOutputFormat.java @@ -0,0 +1,204 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2014 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the 
"License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ + +package eu.stratosphere.hadoopcompatibility.mapreduce; + +import java.io.IOException; +import java.io.ObjectInputStream; +import java.io.ObjectOutputStream; +import java.util.regex.Matcher; +import java.util.regex.Pattern; + +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.FileSystem; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.RecordWriter; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; +import org.apache.hadoop.mapreduce.lib.output.FileOutputCommitter; + +import eu.stratosphere.api.common.io.OutputFormat; +import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.configuration.Configuration; +import eu.stratosphere.hadoopcompatibility.mapreduce.utils.HadoopUtils; + + +public class HadoopOutputFormat implements OutputFormat> { + + private static final long serialVersionUID = 1L; + + private org.apache.hadoop.conf.Configuration configuration; + private org.apache.hadoop.mapreduce.OutputFormat mapreduceOutputFormat; + private transient RecordWriter recordWriter; + private transient FileOutputCommitter fileOutputCommitter; + private transient TaskAttemptContext context; + + public 
HadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat mapreduceOutputFormat, Job job) { + super(); + this.mapreduceOutputFormat = mapreduceOutputFormat; + this.configuration = job.getConfiguration(); + HadoopUtils.mergeHadoopConf(configuration); + } + + public void setConfiguration(org.apache.hadoop.conf.Configuration configuration) { + this.configuration = configuration; + } + + public org.apache.hadoop.conf.Configuration getConfiguration() { + return this.configuration; + } + + public org.apache.hadoop.mapreduce.OutputFormat getHadoopOutputFormat() { + return this.mapreduceOutputFormat; + } + + public void setHadoopOutputFormat(org.apache.hadoop.mapreduce.OutputFormat mapreduceOutputFormat) { + this.mapreduceOutputFormat = mapreduceOutputFormat; + } + + // -------------------------------------------------------------------------------------------- + // OutputFormat + // -------------------------------------------------------------------------------------------- + + @Override + public void configure(Configuration parameters) { + // nothing to do + } + + /** + * create the temporary output file for hadoop RecordWriter. + * @param taskNumber The number of the parallel instance. + * @param numTasks The number of parallel tasks. 
+ * @throws IOException + */ + @Override + public void open(int taskNumber, int numTasks) throws IOException { + if (Integer.toString(taskNumber + 1).length() > 6) { + throw new IOException("Task id too large."); + } + + // for hadoop 2.2 + this.configuration.set("mapreduce.output.basename", "tmp"); + + TaskAttemptID taskAttemptID = TaskAttemptID.forName("attempt__0000_r_" + + String.format("%" + (6 - Integer.toString(taskNumber + 1).length()) + "s"," ").replace(" ", "0") + + Integer.toString(taskNumber + 1) + + "_0"); + + try { + this.context = HadoopUtils.instantiateTaskAttemptContext(this.configuration, taskAttemptID); + } catch (Exception e) { + throw new RuntimeException(e); + } + this.configuration.set("mapred.task.id", taskAttemptID.toString()); + // for hadoop 2.2 + this.configuration.set("mapreduce.task.attempt.id", taskAttemptID.toString()); + + this.fileOutputCommitter = new FileOutputCommitter(new Path(this.configuration.get("mapred.output.dir")), context); + + try { + this.fileOutputCommitter.setupJob(HadoopUtils.instantiateJobContext(this.configuration, new JobID())); + } + catch (Exception e) { + throw new RuntimeException(e); + } + + // compatible for hadoop 2.2.0, the temporary output directory is different from hadoop 1.2.1 + this.configuration.set("mapreduce.task.output.dir", this.fileOutputCommitter.getWorkPath().toString()); + + try { + this.recordWriter = this.mapreduceOutputFormat.getRecordWriter(this.context); + } catch (InterruptedException e) { + throw new IOException("Could not create RecordWriter.", e); + } + } + + + @Override + public void writeRecord(Tuple2 record) throws IOException { + try { + this.recordWriter.write(record.f0, record.f1); + } catch (InterruptedException e) { + throw new IOException("Could not write Record.", e); + } + } + + /** + * commit the task by moving the output file out from the temporary directory. 
+ * @throws IOException + */ + @SuppressWarnings("deprecation") + @Override + public void close() throws IOException { + try { + this.recordWriter.close(this.context); + } catch (InterruptedException e) { + throw new IOException("Could not close RecordReader.", e); + } + + if (this.fileOutputCommitter.needsTaskCommit(this.context)) { + this.fileOutputCommitter.commitTask(this.context); + } + this.fileOutputCommitter.commitJob(this.context); + + // rename tmp-* files to final name + FileSystem fs = FileSystem.get(this.configuration); + + Path outputPath = new Path(this.configuration.get("mapred.output.dir")); + + final Pattern p = Pattern.compile("tmp-(.)-([0-9]+)"); + + // isDirectory does not work in hadoop 1 + if(fs.getFileStatus(outputPath).isDir()) { + FileStatus[] files = fs.listStatus(outputPath); + + for(FileStatus f : files) { + Matcher m = p.matcher(f.getPath().getName()); + if(m.matches()) { + int part = Integer.valueOf(m.group(2)); + fs.rename(f.getPath(), new Path(outputPath.toString()+"/"+part)); + } + } + } + } + + // -------------------------------------------------------------------------------------------- + // Custom serialization methods + // -------------------------------------------------------------------------------------------- + + private void writeObject(ObjectOutputStream out) throws IOException { + out.writeUTF(this.mapreduceOutputFormat.getClass().getName()); + this.configuration.write(out); + } + + @SuppressWarnings("unchecked") + private void readObject(ObjectInputStream in) throws IOException, ClassNotFoundException { + String hadoopOutputFormatClassName = in.readUTF(); + + org.apache.hadoop.conf.Configuration configuration = new org.apache.hadoop.conf.Configuration(); + configuration.readFields(in); + + if(this.configuration == null) { + this.configuration = configuration; + } + + try { + this.mapreduceOutputFormat = (org.apache.hadoop.mapreduce.OutputFormat) Class.forName(hadoopOutputFormatClassName, true, 
Thread.currentThread().getContextClassLoader()).newInstance(); + } catch (Exception e) { + throw new RuntimeException("Unable to instantiate the hadoop output format", e); + } + } +} \ No newline at end of file diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/example/WordCount.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/example/WordCount.java new file mode 100644 index 0000000000000..b7a37c795838a --- /dev/null +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/example/WordCount.java @@ -0,0 +1,114 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ **********************************************************************************************************************/ +package eu.stratosphere.hadoopcompatibility.mapreduce.example; + +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.io.IntWritable; +import org.apache.hadoop.io.LongWritable; +import org.apache.hadoop.io.Text; +import org.apache.hadoop.mapreduce.Job; +import org.apache.hadoop.mapreduce.lib.input.TextInputFormat; +import org.apache.hadoop.mapreduce.lib.output.TextOutputFormat; + +import eu.stratosphere.api.java.DataSet; +import eu.stratosphere.api.java.ExecutionEnvironment; +import eu.stratosphere.api.java.aggregation.Aggregations; +import eu.stratosphere.api.java.functions.FlatMapFunction; +import eu.stratosphere.api.java.functions.MapFunction; +import eu.stratosphere.api.java.tuple.Tuple2; +import eu.stratosphere.hadoopcompatibility.mapreduce.HadoopInputFormat; +import eu.stratosphere.hadoopcompatibility.mapreduce.HadoopOutputFormat; +import eu.stratosphere.util.Collector; + +/** + * Implements a word count which takes the input file and counts the number of + * occurrences of each word in the file and writes the result back to disk. + * + * This example shows how to use Hadoop Input Formats, how to convert Hadoop Writables to + * common Java types for better usage in a Stratosphere job and how to use Hadoop Output Formats. 
+ */ +@SuppressWarnings("serial") +public class WordCount { + + public static void main(String[] args) throws Exception { + if (args.length < 2) { + System.err.println("Usage: WordCount "); + return; + } + + final String inputPath = args[0]; + final String outputPath = args[1]; + + final ExecutionEnvironment env = ExecutionEnvironment.getExecutionEnvironment(); + env.setDegreeOfParallelism(1); + + // Set up the Hadoop Input Format + Job job = Job.getInstance(); + HadoopInputFormat hadoopInputFormat = new HadoopInputFormat(new TextInputFormat(), LongWritable.class, Text.class, job); + TextInputFormat.addInputPath(job, new Path(inputPath)); + + // Create a Stratosphere job with it + DataSet> text = env.createInput(hadoopInputFormat); + + // Tokenize the line and convert from Writable "Text" to String for better handling + DataSet> words = text.flatMap(new Tokenizer()); + + // Sum up the words + DataSet> result = words.groupBy(0).aggregate(Aggregations.SUM, 1); + + // Convert String back to Writable "Text" for use with Hadoop Output Format + DataSet> hadoopResult = result.map(new HadoopDatatypeMapper()); + + // Set up Hadoop Output Format + HadoopOutputFormat hadoopOutputFormat = new HadoopOutputFormat(new TextOutputFormat(), job); + hadoopOutputFormat.getConfiguration().set("mapreduce.output.textoutputformat.separator", " "); + TextOutputFormat.setOutputPath(job, new Path(outputPath)); + + // Output & Execute + hadoopResult.output(hadoopOutputFormat); + env.execute("Word Count"); + } + + /** + * Splits a line into words and converts Hadoop Writables into normal Java data types. 
+ */ + public static final class Tokenizer extends FlatMapFunction, Tuple2> { + + @Override + public void flatMap(Tuple2 value, Collector> out) { + // normalize and split the line + String line = value.f1.toString(); + String[] tokens = line.toLowerCase().split("\\W+"); + + // emit the pairs + for (String token : tokens) { + if (token.length() > 0) { + out.collect(new Tuple2(token, 1)); + } + } + } + } + + /** + * Converts Java data types to Hadoop Writables. + */ + public static final class HadoopDatatypeMapper extends MapFunction, Tuple2> { + + @Override + public Tuple2 map(Tuple2 value) throws Exception { + return new Tuple2(new Text(value.f0), new IntWritable(value.f1)); + } + + } + +} diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/utils/HadoopUtils.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/utils/HadoopUtils.java new file mode 100644 index 0000000000000..e46aa37e56f99 --- /dev/null +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/utils/HadoopUtils.java @@ -0,0 +1,80 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. 
+ **********************************************************************************************************************/ +package eu.stratosphere.hadoopcompatibility.mapreduce.utils; + +import java.lang.reflect.Constructor; +import java.util.Map; + +import org.apache.hadoop.conf.Configuration; +import org.apache.hadoop.mapreduce.JobContext; +import org.apache.hadoop.mapreduce.JobID; +import org.apache.hadoop.mapreduce.TaskAttemptContext; +import org.apache.hadoop.mapreduce.TaskAttemptID; + +import eu.stratosphere.runtime.fs.hdfs.DistributedFileSystem; + +/** + * Static helpers for the Hadoop mapreduce wrappers: merging the HDFS configuration into a Hadoop + * configuration and reflectively creating job and task-attempt contexts. + */ +public class HadoopUtils { + + /** + * Merge HadoopConfiguration into Configuration. This is necessary for the HDFS configuration. + * + * @param configuration receives every key/value pair of the configuration returned by + * DistributedFileSystem.getHadoopConfiguration(); keys that are already set are overwritten + */ + public static void mergeHadoopConf(Configuration configuration) { + Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration(); + + for (Map.Entry<String, String> e : hadoopConf) { + configuration.set(e.getKey(), e.getValue()); + } + } + + /** + * Creates a JobContext by reflection, choosing the concrete class by whether JobContext is an + * interface in the Hadoop version on the classpath. + * + * @param configuration passed to the (Configuration, JobID) constructor of the chosen class + * @param jobId passed to the same constructor + * @return the newly created context + * @throws Exception if the class cannot be loaded or instantiated; the original failure is the cause + */ + public static JobContext instantiateJobContext(Configuration configuration, JobID jobId) throws Exception { + try { + Class<?> clazz; + // Hadoop 2.xx: JobContext is an interface, the concrete class is JobContextImpl + if(JobContext.class.isInterface()) { + clazz = Class.forName("org.apache.hadoop.mapreduce.task.JobContextImpl", true, Thread.currentThread().getContextClassLoader()); + } + // Hadoop 1.xx: JobContext itself is the concrete class + else { + clazz = Class.forName("org.apache.hadoop.mapreduce.JobContext", true, Thread.currentThread().getContextClassLoader()); + } + Constructor<?> constructor = clazz.getConstructor(Configuration.class, JobID.class); + JobContext context = (JobContext) constructor.newInstance(configuration, jobId); + + return context; + } + catch(Exception e) { + // keep the reflective failure as cause so the real reason is not lost + throw new Exception("Could not create instance of JobContext.", e); + } + } + + /** + * Creates a TaskAttemptContext by reflection, choosing the concrete class the same way as + * instantiateJobContext does. + * + * @param configuration passed to the (Configuration, TaskAttemptID) constructor of the chosen class + * @param taskAttemptID passed to the same constructor + * @return the newly created context + * @throws Exception if the class cannot be loaded or instantiated; the original failure is the cause + */ + public static TaskAttemptContext instantiateTaskAttemptContext(Configuration configuration, TaskAttemptID taskAttemptID) throws Exception { + try { + Class<?> clazz; + // Hadoop 2.xx: the context types are interfaces, the concrete class is TaskAttemptContextImpl + if(JobContext.class.isInterface()) { + clazz =
Class.forName("org.apache.hadoop.mapreduce.task.TaskAttemptContextImpl"); + } + // Hadoop 1.xx: TaskAttemptContext itself is the concrete class + else { + clazz = Class.forName("org.apache.hadoop.mapreduce.TaskAttemptContext"); + } + Constructor<?> constructor = clazz.getConstructor(Configuration.class, TaskAttemptID.class); + TaskAttemptContext context = (TaskAttemptContext) constructor.newInstance(configuration, taskAttemptID); + + return context; + } + catch(Exception e) { + // keep the reflective failure as cause so the real reason is not lost + throw new Exception("Could not create instance of TaskAttemptContext.", e); + } + } +} diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java new file mode 100644 index 0000000000000..656339b6bb6c7 --- /dev/null +++ b/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/mapreduce/wrapper/HadoopInputSplit.java @@ -0,0 +1,86 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License.
+ **********************************************************************************************************************/ + +package eu.stratosphere.hadoopcompatibility.mapreduce.wrapper; + +import java.io.DataInput; +import java.io.DataOutput; +import java.io.IOException; + +import org.apache.hadoop.io.Writable; +import org.apache.hadoop.io.WritableFactories; +import org.apache.hadoop.mapreduce.JobContext; + +import eu.stratosphere.core.io.InputSplit; + + +public class HadoopInputSplit implements InputSplit { + + public transient org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit; + public transient JobContext jobContext; + + private int splitNumber; + + public org.apache.hadoop.mapreduce.InputSplit getHadoopInputSplit() { + return mapreduceInputSplit; + } + + + public HadoopInputSplit() { + super(); + } + + + public HadoopInputSplit(org.apache.hadoop.mapreduce.InputSplit mapreduceInputSplit, JobContext jobContext) { + if(!(mapreduceInputSplit instanceof Writable)) { + throw new IllegalArgumentException("InputSplit must implement Writable interface."); + } + this.mapreduceInputSplit = mapreduceInputSplit; + this.jobContext = jobContext; + } + + @Override + public void write(DataOutput out) throws IOException { + out.writeInt(this.splitNumber); + out.writeUTF(this.mapreduceInputSplit.getClass().getName()); + Writable w = (Writable) this.mapreduceInputSplit; + w.write(out); + } + + @Override + public void read(DataInput in) throws IOException { + this.splitNumber=in.readInt(); + String className = in.readUTF(); + + if(this.mapreduceInputSplit == null) { + try { + Class inputSplit = + Class.forName(className).asSubclass(org.apache.hadoop.io.Writable.class); + this.mapreduceInputSplit = (org.apache.hadoop.mapreduce.InputSplit) WritableFactories.newInstance(inputSplit); + } + catch (Exception e) { + throw new RuntimeException("Unable to create InputSplit", e); + } + } + ((Writable)this.mapreduceInputSplit).readFields(in); + } + + @Override + public int 
getSplitNumber() { + return this.splitNumber; + } + + public void setSplitNumber(int splitNumber) { + this.splitNumber = splitNumber; + } +} diff --git a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopConfiguration.java b/stratosphere-addons/hadoop-compatibility/src/test/java/eu/stratosphere/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java similarity index 52% rename from stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopConfiguration.java rename to stratosphere-addons/hadoop-compatibility/src/test/java/eu/stratosphere/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java index 350871daa6796..44f7ea78a2079 100644 --- a/stratosphere-addons/hadoop-compatibility/src/main/java/eu/stratosphere/hadoopcompatibility/HadoopConfiguration.java +++ b/stratosphere-addons/hadoop-compatibility/src/test/java/eu/stratosphere/test/hadoopcompatibility/mapred/HadoopInputOutputITCase.java @@ -10,25 +10,31 @@ * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the * specific language governing permissions and limitations under the License. **********************************************************************************************************************/ +package eu.stratosphere.test.hadoopcompatibility.mapred; -package eu.stratosphere.hadoopcompatibility; +import eu.stratosphere.hadoopcompatibility.mapred.example.WordCount; +import eu.stratosphere.test.testdata.WordCountData; +import eu.stratosphere.test.util.JavaProgramTestBase; -import java.util.Map; - -import org.apache.hadoop.mapred.JobConf; - -import eu.stratosphere.runtime.fs.hdfs.DistributedFileSystem; - -/** - * merge hadoopConf into jobConf. 
This is necessary for the hdfs configuration - - */ - -public class HadoopConfiguration { - public static void mergeHadoopConf(JobConf jobConf) { - org.apache.hadoop.conf.Configuration hadoopConf = DistributedFileSystem.getHadoopConfiguration(); - for (Map.Entry e : hadoopConf) { - jobConf.set(e.getKey(), e.getValue()); - } +public class HadoopInputOutputITCase extends JavaProgramTestBase { + + protected String textPath; + protected String resultPath; + + + @Override + protected void preSubmit() throws Exception { + textPath = createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1"); + } + + @Override + protected void testProgram() throws Exception { + WordCount.main(new String[] { textPath, resultPath }); } -} +} \ No newline at end of file diff --git a/stratosphere-addons/hadoop-compatibility/src/test/java/eu/stratosphere/test/hadoopcompatibility/HadoopInputOutputTest.java b/stratosphere-addons/hadoop-compatibility/src/test/java/eu/stratosphere/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java similarity index 86% rename from stratosphere-addons/hadoop-compatibility/src/test/java/eu/stratosphere/test/hadoopcompatibility/HadoopInputOutputTest.java rename to stratosphere-addons/hadoop-compatibility/src/test/java/eu/stratosphere/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java index 1ba1c8bbd2782..6ca97969fe09e 100644 --- a/stratosphere-addons/hadoop-compatibility/src/test/java/eu/stratosphere/test/hadoopcompatibility/HadoopInputOutputTest.java +++ b/stratosphere-addons/hadoop-compatibility/src/test/java/eu/stratosphere/test/hadoopcompatibility/mapred/record/HadoopRecordInputOutputITCase.java @@ -11,17 +11,17 @@ * specific language governing permissions and limitations under the License. 
**********************************************************************************************************************/ -package eu.stratosphere.test.hadoopcompatibility; +package eu.stratosphere.test.hadoopcompatibility.mapred.record; import eu.stratosphere.api.common.Plan; -import eu.stratosphere.hadoopcompatibility.example.WordCountWithHadoopOutputFormat; +import eu.stratosphere.hadoopcompatibility.mapred.record.example.WordCountWithOutputFormat; import eu.stratosphere.test.testdata.WordCountData; import eu.stratosphere.test.util.RecordAPITestBase; /** * test the hadoop inputformat and outputformat for stratosphere */ -public class HadoopInputOutputTest extends RecordAPITestBase { +public class HadoopRecordInputOutputITCase extends RecordAPITestBase { protected String textPath; protected String resultPath; protected String counts; @@ -36,7 +36,7 @@ protected void preSubmit() throws Exception { @Override protected Plan getTestJob() { //WordCountWithHadoopOutputFormat takes hadoop TextInputFormat as input and output file in hadoop TextOutputFormat - WordCountWithHadoopOutputFormat wc = new WordCountWithHadoopOutputFormat(); + WordCountWithOutputFormat wc = new WordCountWithOutputFormat(); return wc.getPlan("1", textPath, resultPath); } diff --git a/stratosphere-addons/hadoop-compatibility/src/test/java/eu/stratosphere/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java b/stratosphere-addons/hadoop-compatibility/src/test/java/eu/stratosphere/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java new file mode 100644 index 0000000000000..fe80b6667451e --- /dev/null +++ b/stratosphere-addons/hadoop-compatibility/src/test/java/eu/stratosphere/test/hadoopcompatibility/mapreduce/HadoopInputOutputITCase.java @@ -0,0 +1,40 @@ +/*********************************************************************************************************************** + * Copyright (C) 2010-2013 by the Stratosphere project (http://stratosphere.eu) + * + * Licensed under the 
Apache License, Version 2.0 (the "License"); you may not use this file except in compliance with + * the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software distributed under the License is distributed on + * an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the License for the + * specific language governing permissions and limitations under the License. + **********************************************************************************************************************/ +package eu.stratosphere.test.hadoopcompatibility.mapreduce; + +import eu.stratosphere.hadoopcompatibility.mapreduce.example.WordCount; +import eu.stratosphere.test.testdata.WordCountData; +import eu.stratosphere.test.util.JavaProgramTestBase; + +public class HadoopInputOutputITCase extends JavaProgramTestBase { + + protected String textPath; + protected String resultPath; + + + @Override + protected void preSubmit() throws Exception { + textPath = createTempFile("text.txt", WordCountData.TEXT); + resultPath = getTempDirPath("result"); + } + + @Override + protected void postSubmit() throws Exception { + compareResultsByLinesInMemory(WordCountData.COUNTS, resultPath + "/1"); + } + + @Override + protected void testProgram() throws Exception { + WordCount.main(new String[] { textPath, resultPath }); + } +} \ No newline at end of file