From a6cc5ca284ab09563228129dd700fb33dcd12d51 Mon Sep 17 00:00:00 2001 From: Doug Cutting Date: Thu, 13 Sep 2012 23:40:10 +0000 Subject: [PATCH] AVRO-1162. Java: Extend AvroKeyValueOutputFormat to support reflection. Contributed by Alexandre Normand. git-svn-id: https://svn.apache.org/repos/asf/avro/trunk@1384600 13f79535-47bb-0310-9956-ffa450edef68 --- CHANGES.txt | 3 ++ .../mapreduce/AvroKeyValueRecordWriter.java | 4 +- .../TestAvroKeyValueRecordWriter.java | 54 +++++++++++++++++++ 3 files changed, 59 insertions(+), 2 deletions(-) diff --git a/CHANGES.txt b/CHANGES.txt index 1fd44d8bc5e..c2fe090bc37 100644 --- a/CHANGES.txt +++ b/CHANGES.txt @@ -28,6 +28,9 @@ Avro 1.7.2 (unreleased) AVRO-1129. C: Detect when avro_schema_decref frees schema. (Maxim Pugachev via dcreager) + AVRO-1162. Java: Extend AvroKeyValueOutputFormat to support + reflection. (Alexandre Normand via cutting) + BUG FIXES AVRO-1128. Java: Fix SpecificRecordBase#equals() to work for diff --git a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java index 77c6e2a3423..983a8497408 100644 --- a/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java +++ b/lang/java/mapred/src/main/java/org/apache/avro/mapreduce/AvroKeyValueRecordWriter.java @@ -27,8 +27,8 @@ import org.apache.avro.file.CodecFactory; import org.apache.avro.file.DataFileWriter; import org.apache.avro.generic.GenericData; -import org.apache.avro.generic.GenericDatumWriter; import org.apache.avro.generic.GenericRecord; +import org.apache.avro.reflect.ReflectDatumWriter; import org.apache.hadoop.mapreduce.RecordWriter; import org.apache.hadoop.mapreduce.TaskAttemptContext; @@ -69,7 +69,7 @@ public AvroKeyValueRecordWriter(AvroDatumConverter keyConverter, // Create an Avro container file and a writer to it. mAvroFileWriter = new DataFileWriter( - new GenericDatumWriter(mKeyValuePairSchema)); + new ReflectDatumWriter(mKeyValuePairSchema)); mAvroFileWriter.setCodec(compressionCodec); mAvroFileWriter.create(mKeyValuePairSchema, outputStream); diff --git a/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java b/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java index 4739a063d59..03bfd8ba586 100644 --- a/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java +++ b/lang/java/mapred/src/test/java/org/apache/avro/mapreduce/TestAvroKeyValueRecordWriter.java @@ -34,6 +34,8 @@ import org.apache.avro.hadoop.io.AvroKeyValue; import org.apache.avro.io.DatumReader; import org.apache.avro.mapred.AvroValue; +import org.apache.avro.reflect.ReflectData; +import org.apache.avro.reflect.ReflectDatumReader; import org.apache.avro.specific.SpecificDatumReader; import org.apache.hadoop.io.Text; import org.apache.hadoop.mapreduce.Job; @@ -101,4 +103,56 @@ public void testWriteRecords() throws IOException { assertFalse(avroFileReader.hasNext()); avroFileReader.close(); } + + public static class R1 { + String attribute; + } + @Test public void testUsingReflection() throws Exception { + Job job = new Job(); + Schema schema = ReflectData.get().getSchema(R1.class); + AvroJob.setOutputValueSchema(job, schema); + TaskAttemptContext context = createMock(TaskAttemptContext.class); + replay(context); + + R1 record = new R1(); + record.attribute = "test"; + AvroValue avroValue = new AvroValue(record); + + ByteArrayOutputStream outputStream = new ByteArrayOutputStream(); + AvroDatumConverterFactory factory = + new AvroDatumConverterFactory(job.getConfiguration()); + + AvroDatumConverter keyConverter = factory.create(Text.class); + + @SuppressWarnings("unchecked") + AvroDatumConverter, R1> valueConverter = + factory.create((Class>) avroValue.getClass()); + + AvroKeyValueRecordWriter> writer = + new AvroKeyValueRecordWriter>(keyConverter, + valueConverter, CodecFactory.nullCodec(), outputStream); + + writer.write(new Text("reflectionData"), avroValue); + writer.close(context); + + verify(context); + + ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray()); + Schema readerSchema = AvroKeyValue.getSchema( + Schema.create(Schema.Type.STRING), schema); + DatumReader datumReader = + new ReflectDatumReader(readerSchema); + DataFileStream avroFileReader = + new DataFileStream(inputStream, datumReader); + + // Verify that the first record was written. + assertTrue(avroFileReader.hasNext()); + + // Verify that the record holds the same data that we've written + AvroKeyValue firstRecord = + new AvroKeyValue(avroFileReader.next()); + assertNotNull(firstRecord.get()); + assertEquals("reflectionData", firstRecord.getKey().toString()); + assertEquals(record.attribute, firstRecord.getValue().attribute); + } }