Keys are AvroContainerFileHeader objects which contain the Avro container file header.
+ * Values are AvroContainerFileBlock objects which contain decompressed Avro container file blocks,
+ * which consist of a number of binary-encoded objects.
+ */
+public class AvroEncodedInputFormat
+ extends FileInputFormat {
+
+ /**
+ * Utility static method for Hadoop Map/Reduce jobs.
+ *
+ * Constructs a DataFileStream given an Avro container file header and one block.
+ */
+ public static DataFileStream stream(
+ DatumReader reader, AvroContainerFileHeader key, AvroContainerFileBlock value)
+ throws IOException {
+ return new DataFileStream(new AvroBlockInputStream(key, value), reader);
+ }
+
+ /**
+ * Utility static method for Hadoop Map/Reduce jobs.
+ *
+ * Constructs a DataFileStream given an Avro container file header and some blocks.
+ */
+ public static DataFileStream stream(
+ DatumReader reader, AvroContainerFileHeader key, Iterable values)
+ throws IOException {
+ return new DataFileStream(new AvroBlockInputStream(key, values), reader);
+ }
+
+ /**
+ * {@inheritDoc}
+ */
+ @Override
+ public RecordReader createRecordReader(
+ InputSplit split, TaskAttemptContext context) throws IOException, InterruptedException {
+ return new AvroEncodedRecordReader();
+ }
+
+ static class AvroBlockInputStream extends InputStream {
+
+ private InputStream stream;
+ private Iterator iterator;
+
+ AvroBlockInputStream(AvroContainerFileHeader header, AvroContainerFileBlock value) {
+ this.stream = bytesWritableToStream(header.unwrap());
+ this.iterator = Collections.singletonList(value).iterator();
+ }
+
+ AvroBlockInputStream(AvroContainerFileHeader header, Iterable values) {
+ this.stream = bytesWritableToStream(header.unwrap());
+ this.iterator = values.iterator();
+ }
+
+ private void update() throws IOException {
+ if (stream.available() == 0 && iterator.hasNext()) {
+ stream = bytesWritableToStream(iterator.next().unwrap());
+ }
+ }
+
+ private InputStream bytesWritableToStream(BytesWritable bw) {
+ return new ByteArrayInputStream(bw.getBytes(), 0, bw.getLength());
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ update();
+ return stream.read(b, off, len);
+ }
+
+ @Override
+ public int read() throws IOException {
+ update();
+ return stream.read();
+ }
+
+ @Override
+ public int available() throws IOException {
+ return stream.available();
+ }
+
+ @Override
+ public long skip(long n) throws IOException {
+ return stream.skip(n);
+ }
+
+ @Override
+ public void close() throws IOException {
+ stream.close();
+ stream = new ByteArrayInputStream(new byte[0]);
+ iterator = Collections.emptyList().iterator();
+ super.close();
+ }
+ }
+}
+
diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedOutputFormat.java b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedOutputFormat.java
new file mode 100644
index 00000000000..f02abbe0834
--- /dev/null
+++ b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedOutputFormat.java
@@ -0,0 +1,75 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import org.apache.avro.Schema;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.hadoop.io.AvroSerialization;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+
+/**
+ * FileOutputFormat for writing Avro container files from objects that are already encoded.
+ *
+ * Each key is a BytesWritable holding a sequence of binary-encoded objects; each value is a
+ * LongWritable holding the number of objects in that key.
+ *
+ * The encoding is not validated against the writer schema declared in the job configuration;
+ * keeping the two consistent is the caller's responsibility.
+ */
+public class AvroEncodedOutputFormat
+    extends AvroOutputFormatBase {
+
+  /**
+   * {@inheritDoc}
+   *
+   * @throws IOException if no writer schema is configured for the job
+   */
+  @Override
+  public RecordWriter getRecordWriter(TaskAttemptContext context)
+      throws IOException, InterruptedException {
+    final Configuration jobConf = context.getConfiguration();
+    final GenericData model = AvroSerialization.createDataModel(jobConf);
+
+    // Default to the job output key schema; a map-only job (zero reduce tasks) uses the
+    // map output key schema instead whenever one has been set.
+    Schema schema = AvroJob.getOutputKeySchema(jobConf);
+    if (context.getNumReduceTasks() == 0) {
+      final Schema mapSchema = AvroJob.getMapOutputKeySchema(jobConf);
+      schema = (mapSchema == null) ? schema : mapSchema;
+    }
+    if (schema == null) {
+      throw new IOException(
+          "AvroEncodedOutputFormat requires an output schema. " +
+          "Use AvroJob.setOutputKeySchema().");
+    }
+
+    return new AvroEncodedRecordWriter(
+        schema,
+        model,
+        getCompressionCodec(context),
+        getAvroFileOutputStream(context),
+        getSyncInterval(context));
+  }
+}
diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedRecordReader.java b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedRecordReader.java
new file mode 100644
index 00000000000..e851e4ac0de
--- /dev/null
+++ b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedRecordReader.java
@@ -0,0 +1,95 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.AvroContainerFileBlock;
+import org.apache.avro.mapred.AvroContainerFileHeader;
+import org.apache.hadoop.mapreduce.InputSplit;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+/**
+ * Record reader whose key is the file header and whose value is one decompressed block of
+ * binary-encoded objects, read from an input split covering a chunk of an Avro container file.
+ */
+public class AvroEncodedRecordReader
+ extends AvroRecordReaderBase {
+
+ private BlockDataFileReader fileReader; // assigned in createAvroFileReader()
+ private AvroContainerFileHeader fileHeader; // built from the file's schema in initialize()
+
+ public AvroEncodedRecordReader() {
+ super(null); // NOTE(review): presumably means "no reader schema" — confirm in AvroRecordReaderBase
+ }
+
+ /** {@inheritDoc} Creates a BlockDataFileReader and also keeps it in {@code fileReader}. */
+ @Override
+ protected DataFileReader createAvroFileReader(
+ SeekableInput input, DatumReader datumReader)
+ throws IOException {
+ return fileReader = new BlockDataFileReader(input, datumReader);
+ }
+
+ /** {@inheritDoc} Also captures the file's schema in the header object used as the key. */
+ @Override
+ public void initialize(InputSplit inputSplit, TaskAttemptContext context)
+ throws IOException, InterruptedException {
+ super.initialize(inputSplit, context); // NOTE(review): expected to call createAvroFileReader() — confirm
+ fileHeader = new AvroContainerFileHeader(fileReader.getSchema());
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public AvroContainerFileHeader getCurrentKey() {
+ return fileHeader; // the same header object is returned for every block
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public AvroContainerFileBlock getCurrentValue() {
+ return fileReader.block; // one shared instance, refilled by each next() call
+ }
+
+ static class BlockDataFileReader extends DataFileReader { // reads whole blocks instead of records
+
+ AvroContainerFileBlock block = new AvroContainerFileBlock();
+
+ BlockDataFileReader(SeekableInput sin, DatumReader reader)
+ throws IOException {
+ super(sin, reader);
+ }
+
+ @Override
+ public IndexedRecord next(IndexedRecord indexedRecord) throws IOException {
+ ByteBuffer bb = nextBlock(); // bytes of the next block
+ final long count = getBlockCount(); // number of objects in that block
+ final int pos = bb.arrayOffset() + bb.position(); // start of the remaining bytes in the backing array
+ final int len = bb.remaining();
+ block.set(count, bb.array(), pos, len); // NOTE(review): unclear from here whether set() copies the bytes
+ blockFinished(); // NOTE(review): presumably marks the block consumed so the next call advances — confirm
+ return null; // no record is decoded; callers read the data through the block field
+ }
+ }
+}
diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedRecordWriter.java b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedRecordWriter.java
new file mode 100644
index 00000000000..423e9f31080
--- /dev/null
+++ b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroEncodedRecordWriter.java
@@ -0,0 +1,81 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileWriter;
+import org.apache.avro.generic.GenericData;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.RecordWriter;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+
+import java.io.IOException;
+import java.io.OutputStream;
+import java.nio.ByteBuffer;
+
+/**
+ * Writes binary-encoded Avro records to an Avro container file output stream.
+ */
+public class AvroEncodedRecordWriter
+ extends RecordWriter
+ implements Syncable {
+
+ private final DataFileWriter fileWriter;
+
+ @SuppressWarnings("unchecked")
+ public AvroEncodedRecordWriter(Schema writerSchema,
+ GenericData dataModel,
+ CodecFactory compressionCodec,
+ OutputStream outputStream,
+ int syncInterval)
+ throws IOException {
+ fileWriter = new DataFileWriter(dataModel.createDatumWriter(writerSchema));
+ fileWriter.setCodec(compressionCodec);
+ fileWriter.setSyncInterval(syncInterval);
+ fileWriter.create(writerSchema, outputStream);
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public long sync() throws IOException {
+ return fileWriter.sync();
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void write(BytesWritable encodedObjects, LongWritable objectsCount) throws IOException {
+ long count = objectsCount.get();
+ if (count <= 0) {
+ throw new IOException("AvroEncodedRecordWriter requires non-negative object count.");
+ }
+ while (--count > 0) {
+ fileWriter.appendEncoded(ByteBuffer.wrap(new byte[0]));
+ }
+ fileWriter.appendEncoded(
+ ByteBuffer.wrap(encodedObjects.getBytes(), 0, encodedObjects.getLength()));
+ }
+
+ /** {@inheritDoc} */
+ @Override
+ public void close(TaskAttemptContext context) throws IOException {
+ fileWriter.close();
+ }
+}
diff --git a/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedInputAndOutputFormats.java b/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedInputAndOutputFormats.java
new file mode 100644
index 00000000000..1548c66389d
--- /dev/null
+++ b/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedInputAndOutputFormats.java
@@ -0,0 +1,205 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.generic.GenericDatumWriter;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.io.BinaryEncoder;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.io.DatumWriter;
+import org.apache.avro.io.EncoderFactory;
+import org.apache.avro.mapred.AvroContainerFileBlock;
+import org.apache.avro.mapred.AvroContainerFileHeader;
+import org.apache.avro.mapred.AvroKey;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.reflect.ReflectDatumReader;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.FileStatus;
+import org.apache.hadoop.fs.FileSystem;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.NullWritable;
+import org.apache.hadoop.mapreduce.*;
+import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayOutputStream;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.assertNotNull;
+
+public class TestAvroEncodedInputAndOutputFormats {
+
+ /** A temporary directory for test data. */
+ @Rule
+ public TemporaryFolder tmpFolder = new TemporaryFolder();
+
+ /**
+ * Verifies that a non-null record reader can be created, and the key/value types are
+ * as expected.
+ */
+ @Test
+ public void testCreateRecordReader() throws IOException, InterruptedException {
+ // Set up the job configuration.
+ Job job = new Job();
+ Configuration conf = job.getConfiguration();
+
+ FileSplit inputSplit = createMock(FileSplit.class);
+ TaskAttemptContext context = createMock(TaskAttemptContext.class);
+ expect(context.getConfiguration()).andReturn(conf).anyTimes();
+
+ replay(inputSplit);
+ replay(context);
+
+ AvroEncodedInputFormat inputFormat = new AvroEncodedInputFormat();
+ @SuppressWarnings("unchecked")
+ RecordReader recordReader =
+ inputFormat.createRecordReader(inputSplit, context);
+ assertNotNull(inputFormat);
+ recordReader.close();
+
+ verify(inputSplit);
+ verify(context);
+ }
+
+ private static class TestMapper extends
+ Mapper, NullWritable> {
+
+ private AvroKey mAvroKey = new AvroKey();
+
+ @Override
+ protected void map(AvroContainerFileHeader key, AvroContainerFileBlock value, Context context)
+ throws IOException, InterruptedException {
+
+ assert TextStats.getClassSchema().equals(key.getWriterSchema());
+ assert value.getObjectCount() > 0;
+
+ DatumReader datumReader = new ReflectDatumReader(TextStats.class);
+ for (TextStats ts : AvroEncodedInputFormat.stream(datumReader, key, value)) {
+ mAvroKey.datum(ts);
+ context.write(mAvroKey, NullWritable.get());
+ }
+ }
+ }
+
+ private Job createTestJob(String outputLabel) throws Exception {
+ Job job = new Job();
+ FileInputFormat.setInputPaths(job, new Path(getClass()
+ .getResource("/org/apache/avro/mapreduce/mapreduce-test-input.avro")
+ .toURI().toString()));
+ job.setInputFormatClass(AvroEncodedInputFormat.class);
+ Path outputPath = new Path(tmpFolder.getRoot().getPath() + "/out-" + outputLabel);
+ FileOutputFormat.setOutputPath(job, outputPath);
+ return job;
+ }
+
+ /** Checks that the results from the MapReduce were as expected. */
+ private void validateTestJobResults(Job job) throws Exception {
+ Path outputPath = FileOutputFormat.getOutputPath(job);
+ FileSystem fileSystem = FileSystem.get(job.getConfiguration());
+ FileStatus[] outputFiles = fileSystem.globStatus(outputPath.suffix("/part-*"));
+ Assert.assertEquals(1, outputFiles.length);
+
+ DataFileReader reader = new DataFileReader(
+ new FsInput(outputFiles[0].getPath(), job.getConfiguration()),
+ new SpecificDatumReader());
+ Map counts = new HashMap();
+ for (TextStats record : reader) {
+ counts.put(record.name.toString(), record.count);
+ }
+ reader.close();
+
+ Assert.assertEquals(3, counts.get("apple").intValue());
+ Assert.assertEquals(2, counts.get("banana").intValue());
+ Assert.assertEquals(1, counts.get("carrot").intValue());
+ }
+
+ /**
+ * Verifies that the AvroEncodedInputFormat behaves as expected.
+ */
+ @Test
+ public void testInputFormat() throws Exception {
+ Job job = createTestJob("test-avro-encoded-input-format");
+
+ job.setMapperClass(TestMapper.class);
+ AvroJob.setMapOutputKeySchema(job, TextStats.getClassSchema());
+ job.setNumReduceTasks(0);
+ job.setOutputFormatClass(AvroKeyOutputFormat.class);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ validateTestJobResults(job);
+ }
+
+ private static class TestReducer
+ extends Reducer {
+
+ @Override
+ protected void reduce(
+ AvroContainerFileHeader key, Iterable values, Context context)
+ throws IOException, InterruptedException {
+
+ assert TextStats.getClassSchema().equals(key.getWriterSchema());
+
+ DatumReader datumReader = new ReflectDatumReader(TextStats.class);
+ DatumWriter datumWriter =
+ new GenericDatumWriter(TextStats.getClassSchema());
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ BinaryEncoder encoder = null;
+ for (TextStats ts : AvroEncodedInputFormat.stream(datumReader, key, values)) {
+ outputStream.reset();
+ encoder = EncoderFactory.get().directBinaryEncoder(outputStream, encoder);
+ datumWriter.write(ts, encoder);
+ context.write(new BytesWritable(outputStream.toByteArray()), new LongWritable(1L));
+ }
+ }
+ }
+
+ /**
+ * Verifies that the AvroEncodedOutputFormat behaves as expected, assuming that
+ * AvroEncodedInputFormat behaves as expected.
+ */
+ @Test
+ public void testOutputFormat() throws Exception {
+ Job job = createTestJob("test-avro-encoded-output-format");
+
+ job.setMapOutputKeyClass(AvroContainerFileHeader.class);
+ job.setMapOutputValueClass(AvroContainerFileBlock.class);
+ job.setReducerClass(TestReducer.class);
+ AvroJob.setOutputKeySchema(job, TextStats.getClassSchema());
+ job.setOutputKeyClass(BytesWritable.class);
+ job.setOutputValueClass(LongWritable.class);
+ job.setOutputFormatClass(AvroEncodedOutputFormat.class);
+
+ Assert.assertTrue(job.waitForCompletion(true));
+
+ validateTestJobResults(job);
+ }
+}
diff --git a/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedRecordReader.java b/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedRecordReader.java
new file mode 100644
index 00000000000..874f7cc6b86
--- /dev/null
+++ b/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedRecordReader.java
@@ -0,0 +1,173 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import org.apache.avro.Schema;
+import org.apache.avro.SchemaBuilder;
+import org.apache.avro.file.SeekableFileInput;
+import org.apache.avro.file.SeekableInput;
+import org.apache.avro.generic.GenericDatumReader;
+import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.GenericRecordBuilder;
+import org.apache.avro.io.BinaryDecoder;
+import org.apache.avro.io.DecoderFactory;
+import org.apache.avro.mapred.AvroContainerFileBlock;
+import org.apache.avro.mapred.AvroContainerFileHeader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.mapreduce.RecordReader;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.apache.hadoop.mapreduce.lib.input.FileSplit;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.File;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
+
+public class TestAvroEncodedRecordReader {
+
+ /**
+ * A temporary directory for test data.
+ */
+ @Rule
+ public TemporaryFolder mTempDir = new TemporaryFolder();
+
+ /**
+ * Verifies that the file header and a block of encoded records can be read, and that progress is reported correctly.
+ */
+ @Test
+ public void testReadRecords() throws IOException, InterruptedException {
+
+ // Create the test avro file input with three records.
+ Schema schema = SchemaBuilder.builder().record("stats").fields()
+ .name("counts").type(Schema.create(Schema.Type.INT)).noDefault()
+ .name("name").type(Schema.create(Schema.Type.STRING)).noDefault()
+ .endRecord();
+
+ GenericRecord firstInputRecord = new GenericRecordBuilder(schema)
+ .set("counts", 3)
+ .set("name", "apple")
+ .build();
+
+ GenericRecord secondInputRecord = new GenericRecordBuilder(schema)
+ .set("counts", 2)
+ .set("name", "banana")
+ .build();
+
+ GenericRecord thirdInputRecord = new GenericRecordBuilder(schema)
+ .set("counts", 1)
+ .set("name", "carrot")
+ .build();
+
+ final SeekableInput avroFileInput = new SeekableFileInput(
+ AvroFiles.createFile(
+ new File(mTempDir.getRoot(), "myInputFile.avro"),
+ schema,
+ firstInputRecord,
+ secondInputRecord,
+ thirdInputRecord));
+
+ // Create the record reader over the avro input file.
+ RecordReader recordReader
+ = new AvroEncodedRecordReader() {
+ @Override
+ protected SeekableInput createSeekableInput(Configuration conf, Path path)
+ throws IOException {
+ return avroFileInput; // always hand back the local test file, whatever path is requested
+ }
+ };
+
+ // Set up the job configuration.
+ Configuration conf = new Configuration();
+
+ // Create a mock input split for this record reader.
+ FileSplit inputSplit = createMock(FileSplit.class);
+ expect(inputSplit.getPath()).andReturn(new Path("/path/to/an/avro/file")).anyTimes(); // placeholder path; see the override above
+ expect(inputSplit.getStart()).andReturn(0L).anyTimes();
+ expect(inputSplit.getLength()).andReturn(avroFileInput.length()).anyTimes();
+
+ // Create a mock task attempt context for this record reader.
+ TaskAttemptContext context = createMock(TaskAttemptContext.class);
+ expect(context.getConfiguration()).andReturn(conf).anyTimes();
+
+ // Initialize the record reader.
+ replay(inputSplit);
+ replay(context);
+ recordReader.initialize(inputSplit, context);
+
+ assertEquals("Progress should be zero before any records are read",
+ 0.0f, recordReader.getProgress(), 0.0f);
+
+ // Variables to hold the current key (file header) and value (block).
+ AvroContainerFileHeader key;
+ AvroContainerFileBlock value;
+
+ // Read the first key/value pair: the file header plus the block holding all three records.
+ assertTrue("Expected at least one record", recordReader.nextKeyValue());
+ key = recordReader.getCurrentKey();
+ value = recordReader.getCurrentValue();
+
+ assertNotNull("First record had null key", key);
+ assertNotNull("First record had null value", value);
+
+ assertEquals(schema, key.getWriterSchema());
+
+ assertEquals(3L, value.getObjectCount());
+
+ ByteBuffer encodedObjects = value.getEncodedObjects();
+ assertEquals(23, encodedObjects.remaining()); // NOTE(review): presumably the encoded size of the three records — confirm
+ assertEquals(0x1422132F, encodedObjects.hashCode()); // NOTE(review): pins the exact bytes via ByteBuffer.hashCode() — confirm value
+
+ assertEquals("Progress should be complete (1 out of 1 records processed)",
+ 1.0f, recordReader.getProgress(), 0.0f);
+
+ GenericDatumReader datumReader = new GenericDatumReader(schema);
+ InputStream encodedStream = value.getEncodedObjectStream();
+ BinaryDecoder decoder = DecoderFactory.get().directBinaryDecoder(encodedStream, null);
+ GenericRecord record = datumReader.read(null, decoder);
+ assertEquals(3, record.get("counts"));
+ assertEquals("apple", record.get("name").toString());
+
+ record = datumReader.read(record, decoder);
+ assertEquals(2, record.get("counts"));
+ assertEquals("banana", record.get("name").toString());
+
+ record = datumReader.read(record, decoder);
+ assertEquals(1, record.get("counts"));
+ assertEquals("carrot", record.get("name").toString());
+
+ assertEquals(0, encodedStream.available());
+
+ // There should be no more blocks: all three records were in the single block read above.
+ assertFalse("Expected only 1 record", recordReader.nextKeyValue());
+
+ // Close the record reader.
+ recordReader.close();
+
+ // Verify the expected calls on the mocks.
+ verify(inputSplit);
+ verify(context);
+ }
+}
diff --git a/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedRecordWriter.java b/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedRecordWriter.java
new file mode 100644
index 00000000000..193d460ebed
--- /dev/null
+++ b/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroEncodedRecordWriter.java
@@ -0,0 +1,154 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.avro.mapreduce;
+
+import org.apache.avro.Schema;
+import org.apache.avro.file.CodecFactory;
+import org.apache.avro.file.DataFileConstants;
+import org.apache.avro.file.DataFileReader;
+import org.apache.avro.file.DataFileStream;
+import org.apache.avro.generic.GenericData;
+import org.apache.avro.io.DatumReader;
+import org.apache.avro.mapred.FsInput;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.specific.SpecificDatumReader;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.mapreduce.TaskAttemptContext;
+import org.junit.Rule;
+import org.junit.Test;
+import org.junit.rules.TemporaryFolder;
+
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.File;
+import java.io.FileOutputStream;
+import java.io.InputStream;
+import java.io.IOException;
+
+import static org.easymock.EasyMock.*;
+import static org.junit.Assert.*;
+
+public class TestAvroEncodedRecordWriter {
+
+ /**
+ * A temporary directory for test data.
+ */
+ @Rule
+ public TemporaryFolder mTempDir = new TemporaryFolder();
+
+ /**
+ * Verifies that two pre-encoded ints written in one call with an object count of 2 are read back as two records.
+ */
+ @Test
+ public void testWrite() throws IOException {
+ Schema writerSchema = Schema.create(Schema.Type.INT);
+ GenericData dataModel = new ReflectData();
+ CodecFactory compressionCodec = CodecFactory.deflateCodec(6);
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ TaskAttemptContext context = createMock(TaskAttemptContext.class);
+
+ replay(context);
+
+ // Write an avro container file with two records: 1 and 2.
+ AvroEncodedRecordWriter recordWriter = new AvroEncodedRecordWriter(
+ writerSchema,
+ dataModel,
+ compressionCodec,
+ outputStream,
+ DataFileConstants.DEFAULT_SYNC_INTERVAL);
+
+ BytesWritable encodedObjects = new BytesWritable(new byte[2]);
+ encodedObjects.getBytes()[0] = 2; // zig-zag varint encoding of the int 1
+ encodedObjects.getBytes()[1] = 4; // zig-zag varint encoding of the int 2
+ LongWritable objectsCount = new LongWritable(2L);
+ recordWriter.write(encodedObjects, objectsCount);
+ recordWriter.close(context);
+
+ verify(context);
+
+ // Verify that the file was written as expected.
+ InputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+ Schema readerSchema = Schema.create(Schema.Type.INT);
+ DatumReader datumReader = new SpecificDatumReader(readerSchema);
+ DataFileStream dataFileReader = new DataFileStream(inputStream, datumReader);
+
+ assertTrue(dataFileReader.hasNext()); // Record 1.
+ assertEquals(1, dataFileReader.next().intValue());
+ assertTrue(dataFileReader.hasNext()); // Record 2.
+ assertEquals(2, dataFileReader.next().intValue());
+ assertFalse(dataFileReader.hasNext()); // No more records.
+
+ dataFileReader.close();
+ }
+
+ /**
+ * Verifies that AvroEncodedRecordWriter performs as expected with syncable outputs.
+ */
+ @Test
+ public void testSyncableWrite() throws IOException {
+ Schema writerSchema = Schema.create(Schema.Type.INT);
+ GenericData dataModel = new ReflectData();
+ CodecFactory compressionCodec = CodecFactory.deflateCodec(6);
+ FileOutputStream outputStream =
+ new FileOutputStream(new File(mTempDir.getRoot(), "temp.avro"));
+ TaskAttemptContext context = createMock(TaskAttemptContext.class);
+
+ replay(context);
+
+ // Write an avro container file with two records, 1 and 2, taking a sync position before each.
+ AvroEncodedRecordWriter recordWriter = new AvroEncodedRecordWriter(
+ writerSchema,
+ dataModel,
+ compressionCodec,
+ outputStream,
+ DataFileConstants.DEFAULT_SYNC_INTERVAL);
+
+ BytesWritable encodedObjects = new BytesWritable(new byte[1]); // reused for both writes
+ LongWritable objectsCount = new LongWritable(1L);
+ long positionOne = recordWriter.sync(); // sync position taken before record 1
+ encodedObjects.getBytes()[0] = 2; // zig-zag varint encoding of the int 1
+ recordWriter.write(encodedObjects, objectsCount);
+ long positionTwo = recordWriter.sync(); // sync position taken before record 2
+ encodedObjects.getBytes()[0] = 4; // zig-zag varint encoding of the int 2
+ recordWriter.write(encodedObjects, objectsCount);
+ recordWriter.close(context);
+
+ verify(context);
+
+ // Verify that the file was written as expected.
+ Configuration conf = new Configuration();
+ conf.set("fs.default.name", "file:///"); // resolve the path below against the local file system
+ Path avroFile = new Path(new File(mTempDir.getRoot(), "temp.avro").toString());
+ DataFileReader dataFileReader =
+ new DataFileReader(new FsInput(avroFile, conf), new SpecificDatumReader());
+
+ dataFileReader.seek(positionTwo); // seek to the later position first, then back to the earlier one
+ assertTrue(dataFileReader.hasNext()); // Record 2.
+ assertEquals(2, (Object) dataFileReader.next());
+
+ dataFileReader.seek(positionOne);
+ assertTrue(dataFileReader.hasNext()); // Record 1.
+ assertEquals(1, (Object) dataFileReader.next());
+
+ dataFileReader.close();
+ }
+}