Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

Already on GitHub? Sign in to your account

Filtered Reader Implementation, Avro Specific Support #68

Closed
wants to merge 11 commits into
from
View
@@ -16,6 +16,7 @@
<url>https://github.com/Parquet/parquet-mr</url>
<properties>
+ <avro.version>1.7.4</avro.version>
</properties>
<dependencies>
@@ -37,7 +38,7 @@
<dependency>
<groupId>org.apache.avro</groupId>
<artifactId>avro</artifactId>
- <version>1.7.3</version>
+ <version>${avro.version}</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
@@ -107,6 +108,44 @@
</execution>
</executions>
</plugin>
+ <plugin>
+ <groupId>org.apache.avro</groupId>
+ <artifactId>avro-maven-plugin</artifactId>
+ <version>${avro.version}</version>
+ <executions>
+ <execution>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>idl-protocol</goal>
+ </goals>
+ <configuration>
+ <sourceDirectory>${project.basedir}/src/test/resources</sourceDirectory>
+ <outputDirectory>${project.build.directory}/generated-test-sources</outputDirectory>
+ <stringType>String</stringType>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <!-- Ensure that the specific classes are available during test compile but not included in jar -->
+ <groupId>org.codehaus.mojo</groupId>
+ <artifactId>build-helper-maven-plugin</artifactId>
+ <version>1.8</version>
+ <executions>
+ <execution>
+ <id>add-test-sources</id>
+ <phase>generate-test-sources</phase>
+ <goals>
+ <goal>add-test-source</goal>
+ </goals>
+ <configuration>
+ <sources>
+ <source>${project.build.directory}/generated-test-sources</source>
+ </sources>
+ </configuration>
+ </execution>
+ </executions>
+ </plugin>
</plugins>
</build>
@@ -22,6 +22,9 @@
import org.apache.avro.generic.GenericArray;
import org.apache.avro.generic.GenericData;
import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
+import org.apache.avro.specific.SpecificData;
+import parquet.Preconditions;
import parquet.io.api.Binary;
import parquet.io.api.Converter;
import parquet.io.api.GroupConverter;
@@ -30,26 +33,28 @@
import parquet.schema.MessageType;
import parquet.schema.Type;
-class AvroGenericRecordConverter extends GroupConverter {
+class AvroIndexedRecordConverter extends GroupConverter {
private final ParentValueContainer parent;
- protected GenericData.Record currentRecord;
+ protected IndexedRecord currentRecord;
private final Converter[] converters;
private final GroupType parquetSchema;
private final Schema avroSchema;
+ private final Class<? extends IndexedRecord> specificClass;
- public AvroGenericRecordConverter(MessageType parquetSchema, Schema avroSchema) {
+ public AvroIndexedRecordConverter(MessageType parquetSchema, Schema avroSchema) {
this(null, parquetSchema, avroSchema);
}
- public AvroGenericRecordConverter(ParentValueContainer parent, GroupType
+ public AvroIndexedRecordConverter(ParentValueContainer parent, GroupType
parquetSchema, Schema avroSchema) {
this.parent = parent;
this.parquetSchema = parquetSchema;
this.avroSchema = avroSchema;
int schemaSize = parquetSchema.getFieldCount();
this.converters = new Converter[schemaSize];
+ this.specificClass = SpecificData.get().getClass(avroSchema);
int index = 0; // parquet ignores Avro nulls, so index may differ
for (int avroIndex = 0; avroIndex < avroSchema.getFields().size(); avroIndex++) {
Schema.Field field = avroSchema.getFields().get(avroIndex);
@@ -62,7 +67,7 @@ public AvroGenericRecordConverter(ParentValueContainer parent, GroupType
converters[index] = newConverter(fieldSchema, type, new ParentValueContainer() {
@Override
void add(Object value) {
- AvroGenericRecordConverter.this.set(finalAvroIndex, value);
+ AvroIndexedRecordConverter.this.set(finalAvroIndex, value);
}
});
index++;
@@ -86,13 +91,15 @@ private static Converter newConverter(Schema schema, Type type,
} else if (schema.getType().equals(Schema.Type.STRING)) {
return new FieldStringConverter(parent);
} else if (schema.getType().equals(Schema.Type.RECORD)) {
- return new AvroGenericRecordConverter(parent, type.asGroupType(), schema);
+ return new AvroIndexedRecordConverter(parent, type.asGroupType(), schema);
} else if (schema.getType().equals(Schema.Type.ENUM)) {
- return new FieldStringConverter(parent);
+ return new FieldEnumConverter(parent,schema);
} else if (schema.getType().equals(Schema.Type.ARRAY)) {
- return new GenericArrayConverter(parent, type, schema);
+ return new AvroArrayConverter(parent, type, schema);
} else if (schema.getType().equals(Schema.Type.MAP)) {
return new MapConverter(parent, type, schema);
+ } else if (schema.getType().equals(Schema.Type.UNION)) {
+ return new AvroUnionConverter(parent, type, schema);
} else if (schema.getType().equals(Schema.Type.FIXED)) {
return new FieldFixedConverter(parent, schema);
}
@@ -111,7 +118,10 @@ public Converter getConverter(int fieldIndex) {
@Override
public void start() {
- this.currentRecord = new GenericData.Record(avroSchema);
+ // Should do the right thing whether it is generic or specific
+ this.currentRecord = (this.specificClass == null) ?
+ new GenericData.Record(avroSchema) :
+ (IndexedRecord)SpecificData.newInstance(specificClass, avroSchema);
}
@Override
@@ -121,7 +131,7 @@ public void end() {
}
}
- GenericRecord getCurrentRecord() {
+ IndexedRecord getCurrentRecord() {
return currentRecord;
}
@@ -239,6 +249,26 @@ final public void addBinary(Binary value) {
}
+ static final class FieldEnumConverter extends PrimitiveConverter {
+
+ private final ParentValueContainer parent;
+ private final Class<? extends Enum> enumClass;
+
+ public FieldEnumConverter(ParentValueContainer parent, Schema enumSchema) {
+ this.parent = parent;
+ this.enumClass = SpecificData.get().getClass(enumSchema);
+ }
+
+ @Override
+ final public void addBinary(Binary value) {
+ Object enumValue = value.toStringUsingUTF8();
+ if (enumClass != null) {
@julienledem

julienledem Jul 8, 2013

Owner

in what case is enumClass null ?
Please add a comment about this

+ enumValue = (Enum.valueOf(enumClass,(String)enumValue));
+ }
+ parent.add(enumValue);
+ }
+ }
+
static final class FieldFixedConverter extends PrimitiveConverter {
private final ParentValueContainer parent;
@@ -256,14 +286,14 @@ final public void addBinary(Binary value) {
}
- static final class GenericArrayConverter<T> extends GroupConverter {
+ static final class AvroArrayConverter<T> extends GroupConverter {
private final ParentValueContainer parent;
private final Schema avroSchema;
private final Converter converter;
private GenericArray<T> array;
- public GenericArrayConverter(ParentValueContainer parent, Type parquetSchema,
+ public AvroArrayConverter(ParentValueContainer parent, Type parquetSchema,
Schema avroSchema) {
this.parent = parent;
this.avroSchema = avroSchema;
@@ -294,6 +324,51 @@ public void end() {
}
}
+ static final class AvroUnionConverter<T> extends GroupConverter {
+
+ private final ParentValueContainer parent;
+ private final Converter[] memberConverters;
+ private Object memberValue = null;
+
+ public AvroUnionConverter(ParentValueContainer parent, Type parquetSchema,
+ Schema avroSchema) {
+ this.parent = parent;
+ GroupType parquetGroup = parquetSchema.asGroupType();
+ this.memberConverters = new Converter[ parquetGroup.getFieldCount()];
+
+ int parquetIndex = 0;
+ for (int index = 0; index < avroSchema.getTypes().size(); index++) {
+ Schema memberSchema = avroSchema.getTypes().get(index);
+ if (!memberSchema.getType().equals(Schema.Type.NULL)) {
+ Type memberType = parquetGroup.getType(parquetIndex);
+ memberConverters[parquetIndex] = newConverter(memberSchema, memberType, new ParentValueContainer() {
+ @Override
+ void add(Object value) {
+ Preconditions.checkArgument(memberValue==null, "Union is resolving to more than one type");
+ memberValue = value;
+ }
+ });
+ parquetIndex++; // Note for nulls the parquetIndex id not increased
+ }
+ }
+ }
+
+ @Override
+ public Converter getConverter(int fieldIndex) {
+ return memberConverters[fieldIndex];
+ }
+
+ @Override
+ public void start() {
+ memberValue = null;
+ }
+
+ @Override
+ public void end() {
+ parent.add(memberValue);
+ }
+ }
+
static final class MapConverter<V> extends GroupConverter {
private final ParentValueContainer parent;
@@ -15,14 +15,14 @@
*/
package parquet.avro;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import parquet.avro.AvroReadSupport;
import parquet.hadoop.ParquetInputFormat;
/**
* A Hadoop {@link org.apache.hadoop.mapreduce.InputFormat} for Parquet files.
*/
-public class AvroParquetInputFormat extends ParquetInputFormat<GenericRecord> {
+public class AvroParquetInputFormat extends ParquetInputFormat<IndexedRecord> {
public AvroParquetInputFormat() {
super(AvroReadSupport.class);
}
@@ -16,7 +16,7 @@
package parquet.avro;
import org.apache.avro.Schema;
-import org.apache.avro.generic.GenericRecord;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.mapreduce.Job;
import parquet.avro.AvroWriteSupport;
import parquet.hadoop.ParquetOutputFormat;
@@ -25,7 +25,7 @@
/**
* A Hadoop {@link org.apache.hadoop.mapreduce.OutputFormat} for Parquet files.
*/
-public class AvroParquetOutputFormat extends ParquetOutputFormat<GenericRecord> {
+public class AvroParquetOutputFormat extends ParquetOutputFormat<IndexedRecord> {
@dvryaboy

dvryaboy Jun 23, 2013

Contributor

Briefly looking at the Avro APIs (I am not fluent with Avro), this seems to be a change to a less specific interface -- allow indexing by index, and do not require (but still allow, the user would have to cast) indexing by name. Correct?
Is that likely to mess up existing users? Should we provide some convenience method that does the casting for them so they don't have to deal with it? I don't feel strongly one way or another, just want to hear your and other Avro people's thoughts.

@JacobMetcalf

JacobMetcalf Jun 29, 2013

Contributor

Basically Avro Generic provides serialization for dynamic data structures. Avro Specific, on the other hand, generates typed Java (and other language) objects from the schema at compile time, with getters, setters, and built-in SerDe. This is similar to Protobuf - I don't know Thrift that well. IndexedRecord is the parent class of both GenericRecord and SpecificRecord, so by moving the support up a level I aim to cover both communities.

AvroParquetReader is a Java generic so I assume Avro Generic users would be able to instantiate an AvroParquetReader&lt;GenericRecord&gt; and Avro Specific users an AvroParquetReader&lt;MySpecificRecord&gt;, for example. If you are suggesting I inherit from this to create an AvroGenericParquetReader we could do this for convenience.

Would be happy to adapt to Tom White or anyone else's input on the matter.

@dvryaboy

dvryaboy Jun 30, 2013

Contributor

@tomwhite can you comment?

@JacobMetcalf

JacobMetcalf Jul 1, 2013

Contributor

@tomwhite also note I have introduced support for unions with multiple types. Would appreciate some feedback on this.

@tomwhite

tomwhite Jul 2, 2013

Contributor

Looks good. Can you add a test to TestAvroSchemaConverter for this. I noticed that you removed the test for a failure, but having a positive test would be good.

@JacobMetcalf

JacobMetcalf Jul 6, 2013

Contributor

Good point - added. Note that since there is no underlying representation of unions in parquet I have modelled this as separate "memberN" sub-records. This results in a sparsely populated column set but is about the only option I see.

We use unions heavily to replicate inheritance - so I would draw parallels between what I am doing here and a one-table-per-class-hierarchy strategy in an ORM.

@julienledem

julienledem Jul 8, 2013

Owner

@JacobMetcalf Using a group with one field per type in the union (minus null) sounds good to me. Only one field being non-null. Parquet stores nulls efficiently so sparse columns are not a problem.

public static void setSchema(Job job, Schema schema) {
AvroWriteSupport.setSchema(ContextUtil.getConfiguration(job), schema);
@@ -16,16 +16,23 @@
package parquet.avro;
import java.io.IOException;
+
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
+import parquet.filter.UnboundRecordFilter;
import parquet.hadoop.ParquetReader;
import parquet.hadoop.api.ReadSupport;
/**
* Read Avro records from a Parquet file.
*/
-public class AvroParquetReader<T> extends ParquetReader<T> {
+public class AvroParquetReader<T extends IndexedRecord> extends ParquetReader<T> {
public AvroParquetReader(Path file) throws IOException {
super(file, (ReadSupport<T>) new AvroReadSupport());
}
+
+ public AvroParquetReader(Path file, UnboundRecordFilter recordFilter ) throws IOException {
+ super(file, (ReadSupport<T>) new AvroReadSupport(), recordFilter);
+ }
}
@@ -17,6 +17,7 @@
import java.io.IOException;
import org.apache.avro.Schema;
+import org.apache.avro.generic.IndexedRecord;
import org.apache.hadoop.fs.Path;
import parquet.hadoop.ParquetWriter;
import parquet.hadoop.api.WriteSupport;
@@ -25,7 +26,7 @@
/**
* Write Avro records to a Parquet file.
*/
-public class AvroParquetWriter<T> extends ParquetWriter<T> {
+public class AvroParquetWriter<T extends IndexedRecord> extends ParquetWriter<T> {
@dvryaboy

dvryaboy Jun 23, 2013

Contributor

does this limit where this Writer is applicable?

@JacobMetcalf

JacobMetcalf Jun 29, 2013

Contributor

Avro Generic and Specific records extend IndexedRecord. If you look at the original version of AvroWriteSupport its write method was expecting a GenericRecord so I believe this a) expands the range of possibilities b) Is more explicit about what is actually supported.

Would appreciate Tom White's views on this however.

@tomwhite

tomwhite Jul 2, 2013

Contributor

Changing from GenericRecord to IndexedRecord is an improvement - nice work Jacob! +1

It would be nice to support Avro Reflect, but that can be done separately.

/** Create a new {@link AvroParquetWriter}.
*
@@ -39,21 +40,38 @@
public AvroParquetWriter(Path file, Schema avroSchema,
CompressionCodecName compressionCodecName, int blockSize,
int pageSize) throws IOException {
+ super(file, (WriteSupport<T>)new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema), avroSchema),
+ compressionCodecName, blockSize, pageSize);
+ }
+
+ /** Create a new {@link AvroParquetWriter}.
+ *
+ * @param file The file name to write to.
+ * @param avroSchema The schema to write with.
+ * @param compressionCodecName Compression code to use, or CompressionCodecName.UNCOMPRESSED
+ * @param blockSize HDFS block size
+ * @param pageSize See parquet write up. Blocks are subdivided into pages for alignment and other purposes.
+ * @param enableDictionary Whether to use a dictionary to compress columns.
+ * @throws IOException
+ */
+ public AvroParquetWriter(Path file, Schema avroSchema,
+ CompressionCodecName compressionCodecName, int blockSize,
+ int pageSize, boolean enableDictionary) throws IOException {
super(file, (WriteSupport<T>)
- new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema), avroSchema),
- compressionCodecName, blockSize, pageSize);
+ new AvroWriteSupport(new AvroSchemaConverter().convert(avroSchema),avroSchema),
+ compressionCodecName, blockSize, pageSize, enableDictionary, false);
}
/** Create a new {@link AvroParquetWriter}. The default block size is 50 MB.The default
* page size is 1 MB. Default compression is no compression. (Inherited from {@link ParquetWriter})
*
- * @param file
- * @param avroSchema
+ * @param file The file name to write to.
+ * @param avroSchema The schema to write with.
* @throws IOException
*/
public AvroParquetWriter(Path file, Schema avroSchema) throws IOException {
this(file, avroSchema, CompressionCodecName.UNCOMPRESSED,
- DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
+ DEFAULT_BLOCK_SIZE, DEFAULT_PAGE_SIZE);
}
}
Oops, something went wrong.