Permalink
Browse files

AVRO-1162. Java: Extend AvroKeyValueOutputFormat to support reflection. Contributed by Alexandre Normand.

git-svn-id: https://svn.apache.org/repos/asf/avro/trunk@1384600 13f79535-47bb-0310-9956-ffa450edef68
  • Loading branch information...
cutting committed Sep 13, 2012
1 parent 13bb8b2 commit a6cc5ca284ab09563228129dd700fb33dcd12d51
View
@@ -28,6 +28,9 @@ Avro 1.7.2 (unreleased)
AVRO-1129. C: Detect when avro_schema_decref frees schema.
(Maxim Pugachev via dcreager)
+ AVRO-1162. Java: Extend AvroKeyValueOutputFormat to support
+ reflection. (Alexandre Normand via cutting)
+
BUG FIXES
AVRO-1128. Java: Fix SpecificRecordBase#equals() to work for
@@ -27,8 +27,8 @@
import org.apache.avro.file.CodecFactory;
import org.apache.avro.file.DataFileWriter;
import org.apache.avro.generic.GenericData;
-import org.apache.avro.generic.GenericDatumWriter;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.reflect.ReflectDatumWriter;
import org.apache.hadoop.mapreduce.RecordWriter;
import org.apache.hadoop.mapreduce.TaskAttemptContext;
@@ -69,7 +69,7 @@ public AvroKeyValueRecordWriter(AvroDatumConverter<K, ?> keyConverter,
// Create an Avro container file and a writer to it.
mAvroFileWriter = new DataFileWriter<GenericRecord>(
- new GenericDatumWriter<GenericRecord>(mKeyValuePairSchema));
+ new ReflectDatumWriter<GenericRecord>(mKeyValuePairSchema));
mAvroFileWriter.setCodec(compressionCodec);
mAvroFileWriter.create(mKeyValuePairSchema, outputStream);
@@ -34,6 +34,8 @@
import org.apache.avro.hadoop.io.AvroKeyValue;
import org.apache.avro.io.DatumReader;
import org.apache.avro.mapred.AvroValue;
+import org.apache.avro.reflect.ReflectData;
+import org.apache.avro.reflect.ReflectDatumReader;
import org.apache.avro.specific.SpecificDatumReader;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.mapreduce.Job;
@@ -101,4 +103,56 @@ public void testWriteRecords() throws IOException {
assertFalse(avroFileReader.hasNext());
avroFileReader.close();
}
+
+ public static class R1 { // Plain holder class; testUsingReflection derives its Avro schema via ReflectData.
+ String attribute; // Set to "test" by testUsingReflection and compared again after the read-back.
+ }
+ /**
+  * Writes one (Text key, AvroValue&lt;R1&gt; value) pair through
+  * AvroKeyValueRecordWriter using a schema obtained from ReflectData, then
+  * reads the bytes back with a ReflectDatumReader and checks that the key
+  * and the value's attribute match what was written.
+  *
+  * @throws Exception if writing to or reading from the in-memory stream fails
+  */
+ @Test public void testUsingReflection() throws Exception {
+ Job job = new Job();
+ Schema schema = ReflectData.get().getSchema(R1.class);
+ AvroJob.setOutputValueSchema(job, schema);
+ // No expectations are recorded on the mock before replay().
+ TaskAttemptContext context = createMock(TaskAttemptContext.class);
+ replay(context);
+
+ R1 record = new R1();
+ record.attribute = "test";
+ AvroValue<R1> avroValue = new AvroValue<R1>(record);
+
+ ByteArrayOutputStream outputStream = new ByteArrayOutputStream();
+ AvroDatumConverterFactory factory =
+ new AvroDatumConverterFactory(job.getConfiguration());
+
+ AvroDatumConverter<Text, ?> keyConverter = factory.create(Text.class);
+
+ @SuppressWarnings("unchecked")
+ AvroDatumConverter<AvroValue<R1>, R1> valueConverter =
+ factory.create((Class<AvroValue<R1>>) avroValue.getClass());
+
+ AvroKeyValueRecordWriter<Text, AvroValue<R1>> writer =
+ new AvroKeyValueRecordWriter<Text, AvroValue<R1>>(keyConverter,
+ valueConverter, CodecFactory.nullCodec(), outputStream);
+
+ writer.write(new Text("reflectionData"), avroValue);
+ writer.close(context);
+
+ verify(context);
+
+ ByteArrayInputStream inputStream = new ByteArrayInputStream(outputStream.toByteArray());
+ Schema readerSchema = AvroKeyValue.getSchema(
+ Schema.create(Schema.Type.STRING), schema);
+ DatumReader<GenericRecord> datumReader =
+ new ReflectDatumReader<GenericRecord>(readerSchema);
+ DataFileStream<GenericRecord> avroFileReader =
+ new DataFileStream<GenericRecord>(inputStream, datumReader);
+ try {
+ // Verify that the first record was written.
+ assertTrue(avroFileReader.hasNext());
+
+ // Verify that the record holds the same data that we've written
+ AvroKeyValue<CharSequence, R1> firstRecord =
+ new AvroKeyValue<CharSequence, R1>(avroFileReader.next());
+ assertNotNull(firstRecord.get());
+ assertEquals("reflectionData", firstRecord.getKey().toString());
+ assertEquals(record.attribute, firstRecord.getValue().attribute);
+
+ // Only one pair was written, so the stream must be exhausted now
+ // (same closing check as testWriteRecords).
+ assertFalse(avroFileReader.hasNext());
+ } finally {
+ // Release the reader even when one of the assertions above fails.
+ avroFileReader.close();
+ }
+ }
}

0 comments on commit a6cc5ca

Please sign in to comment.