Parquet hive implementation #28

Merged
merged 36 commits into from

5 participants

@mickaellcr
Collaborator

HiveSerDe, DeprecatedParquetInputFormat and DeprecatedParquetOutputFormat are necessary to create a Hive table.

What we have done:

  • Read and write primitive & complex types (including string)
  • Give Parquet the requestedSchema needed by the user
  • Works with HiveInputFormat or CombineHiveInputFormat
  • Unit tests (not full coverage)

Todo:

  • Increase the test coverage
...n/java/parquet/hive/DeprecatedParquetInputFormat.java
((54 lines not shown))
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
+/**
+ *
+ * A Parquet InputFormat for Hive (with the deprecated package mapred)
+ *
+ *
+ * @author Mickaël Lacour <m.lacour@criteo.com>
+ * @author Rémy Pecqueur <r.pecqueur@criteo.com>
+ *
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class DeprecatedParquetInputFormat extends FileInputFormat<Void, MapWritable> {
+
+ protected ParquetInputFormat<MapWritable> realInput;

Just for consistency: I think we have a 2-space indentation convention, and it looks like you're using 4. Can you retab to 2 so our editors don't start a re-whitespacing war? This setting should be getting auto-generated for the Parquet Eclipse/IntelliJ projects by mvn, I think. Maybe we need to fix something in that setup to enforce it.

@mickaellcr Collaborator

Done !

...n/java/parquet/hive/DeprecatedParquetInputFormat.java
((52 lines not shown))
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.hive.read.MapWritableReadSupport;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
+/**
+ *
+ * A Parquet InputFormat for Hive (with the deprecated package mapred)
+ *
+ *
+ * @author Mickaël Lacour <m.lacour@criteo.com>
+ * @author Rémy Pecqueur <r.pecqueur@criteo.com>
+ *
+ */
+@SuppressWarnings({ "unchecked", "rawtypes" })
+public class DeprecatedParquetInputFormat extends FileInputFormat<Void, MapWritable> {

why MapWritable? Is that a hive thing? I thought it doesn't require a map.

@mickaellcr Collaborator

MapWritable is a Hadoop thing (org.apache.hadoop.io.MapWritable). Hive needs something that implements Writable. Why did we choose a map? Because (maybe we are wrong) we need to map each field name to its value.

Depending on the split, the Parquet schema can be different. We need to access the right fields without dealing with an index. With an array, we might not have access to a given field, and we don't want to get stuck or have to add a fake field inside the array.
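For what it's worth, here is a minimal standalone sketch (not part of the patch) of the name-to-value lookup described above; the column name c_custkey is borrowed from the tests, everything else is made up:

import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.MapWritable;
import org.apache.hadoop.io.Text;
import org.apache.hadoop.io.Writable;

public class MapWritableRecordSketch {
  public static void main(final String[] args) {
    // One Hive row: each field is looked up by name, not by position,
    // so a split whose file schema lacks a column simply has no entry.
    final MapWritable row = new MapWritable();
    row.put(new Text("c_custkey"), new IntWritable(42));
    row.put(new Text("c_name"), new Text("customer#42"));

    // Field access by name, independent of column order in the split.
    final Writable custkey = row.get(new Text("c_custkey"));
    System.out.println(custkey); // prints 42
  }
}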

Does Hive not have some sort of a Tuple abstraction? That's how we do it with Pig.
Looking at the javadocs, it seems like maybe org.apache.hive.service.cli.Row (but I'm by no means a Hive expert). The reason I'm harping on this is that a Map is expensive in terms of memory; this can have performance implications on the Hive side.

@mickaellcr Collaborator

I will take a look at it, but it's not a Writable object. Map is expensive, you're right. Once we are done with the Hive implementation, we will try to use another structure, and I will ask for advice on the Hive developer mailing list.

Thx for your advice !

...in/java/parquet/hive/convert/HiveSchemaConverter.java
((72 lines not shown))
+ static private Type convertType(final String name, final TypeInfo typeInfo, final Repetition repetition) {
+ if (typeInfo.getCategory().equals(Category.PRIMITIVE)) {
+ if (typeInfo.equals(TypeInfoFactory.stringTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.BINARY, name);
+ } else if (typeInfo.equals(TypeInfoFactory.intTypeInfo) || typeInfo.equals(TypeInfoFactory.shortTypeInfo) || typeInfo.equals(TypeInfoFactory.byteTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT32, name);
+ } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT64, name);
+ } else if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.DOUBLE, name);
+ } else if (typeInfo.equals(TypeInfoFactory.floatTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.FLOAT, name);
+ } else if (typeInfo.equals(TypeInfoFactory.booleanTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.BOOLEAN, name);
+ } else {
+ throw new RuntimeException();

can we document what's not supported? I recall Hive having a lot of "primitive" types like Timestamp

parquet-hive/pom.xml
((46 lines not shown))
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>1.0.0</version>
+ </dependency>
+
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ <version>0.8.0</version>
@julienledem Owner

any reason this is not 0.9.0 or 0.10.0 ?

@mickaellcr Collaborator

Only because my environment uses Hive 0.8, but in a few days I will switch to Hive 0.10. I didn't want to bother with this for the first draft.

...n/java/parquet/hive/DeprecatedParquetInputFormat.java
((48 lines not shown))
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.hive.read.MapWritableReadSupport;
+import parquet.schema.MessageType;
+import parquet.schema.Type;
+
+/**
+ *
+ * A Parquet InputFormat for Hive (with the deprecated package mapred)
+ *
+ *
+ * @author Mickaël Lacour <m.lacour@criteo.com>
+ * @author Rémy Pecqueur <r.pecqueur@criteo.com>
+ *
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class DeprecatedParquetInputFormat extends FileInputFormat<Void, MapWritable> {
@mickaellcr Collaborator

Unfortunately not :/ By default Hive enables CombineHiveInputFormat. So

https://github.com/Parquet/parquet-mr/blob/master/parquet-hadoop/src/main/java/parquet/hadoop/mapred/DeprecatedParquetInputFormat.java#L87

our oldSplit is not necessarily a ParquetInputSplitWrapper but can be a FileSplit, and in that case you need to create a new ParquetInputSplit (and update the JobConf so it knows which columns it is looking for).

.../java/parquet/hive/DeprecatedParquetOutputFormat.java
((38 lines not shown))
+
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hive.convert.HiveSchemaConverter;
+import parquet.hive.write.MapWritableWriteSupport;
+
+/**
+ *
+ * A Parquet OutputFormat for Hive (with the deprecated package mapred)
+ *
+ *
+ * @author Mickaël Lacour <m.lacour@criteo.com>
+ * @author Rémy Pecqueur <r.pecqueur@criteo.com>
+ *
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class DeprecatedParquetOutputFormat extends FileOutputFormat<Void, MapWritable> implements HiveOutputFormat<Void, MapWritable> {
@julienledem Owner

Could you add a class in this package?
https://github.com/Parquet/parquet-mr/tree/master/parquet-hadoop/src/main/java/parquet/hadoop/mapred
That way it could be re-used for the Cascading integration.

...parquet/hive/convert/ArrayWritableGroupConverter.java
((50 lines not shown))
+ if (groupType.getFieldCount() == 2) {
+ final MapWritableGroupConverter intermediateConverter = new MapWritableGroupConverter(groupType, this, 0);
+ converters = new Converter[groupType.getFieldCount()];
+ converters[0] = getConverterFromDescription(groupType.getType(0), 0, intermediateConverter);
+ converters[1] = getConverterFromDescription(groupType.getType(1), 1, intermediateConverter);
+ } else if (groupType.getFieldCount() == 1) {
+ converters = new Converter[1];
+ converters[0] = getConverterFromDescription(groupType.getType(0), 0, this);
+ } else {
+ throw new RuntimeException("Invalid parquet hive schema: " + groupType);
+ }
+
+ }
+
+ final public ArrayWritable getCurrentArray() {
+ if (arrayWritable == null) {
@julienledem Owner

If the record contains lists of lists, you might not want to reuse this object.

...parquet/hive/convert/ArrayWritableGroupConverter.java
((75 lines not shown))
+ }
+
+ @Override
+ public void start() {
+ currentList.clear();
+ }
+
+ @Override
+ public void end() {
+ parent.set(index, getCurrentArray());
+ }
+
+ @Override
+ protected void set(final int index, final Writable value) {
+ if (index != 0) {
+ throw new RuntimeException("weee" + index);
@julienledem Owner

You can use ParquetDecodingException

@mickaellcr Collaborator

oupsss :) :D

...rc/main/java/parquet/hive/convert/ETypeConverter.java
((106 lines not shown))
+ @Override
+ void add(final Object value) {
+ // LOG.info("adding value " + value + " at index " + index);
+ parent.set(index, new BigDecimalWritable((BigDecimal) value));
+ }
+ });
+ }
+ },
+ EBINARY_CONVERTER(Binary.class) {
+
+ @Override
+ Converter getConverter(final Class<?> type, final int index, final HiveGroupConverter parent) {
+ return new FieldBinaryConverter(new ParentValueContainer() {
+ @Override
+ void add(final Object value) {
+ // LOG.info("adding value " + value + " at index " + index);
@julienledem Owner

You can leave those uncommented if you guard them with
if (DEBUG) LOG.debug(...)
See parquet.Log.LEVEL to turn them on.
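For reference, a small self-contained sketch of that guard pattern, assuming parquet.Log exposes a static DEBUG flag the way it exposes INFO; the class and values here are illustrative only:

import parquet.Log;

public class GuardedLoggingSketch {
  private static final Log LOG = Log.getLog(GuardedLoggingSketch.class);
  private static final boolean DEBUG = Log.DEBUG;

  public static void main(final String[] args) {
    final int index = 3;
    final Object value = "foo";
    // The string concatenation is only evaluated when debug logging is on,
    // which parquet.Log.LEVEL controls.
    if (DEBUG) {
      LOG.debug("adding value " + value + " at index " + index);
    }
  }
}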

...rc/main/java/parquet/hive/convert/ETypeConverter.java
((115 lines not shown))
+
+ @Override
+ Converter getConverter(final Class<?> type, final int index, final HiveGroupConverter parent) {
+ return new FieldBinaryConverter(new ParentValueContainer() {
+ @Override
+ void add(final Object value) {
+ // LOG.info("adding value " + value + " at index " + index);
+ parent.set(index, new BinaryWritable((Binary) value));
+ }
+ });
+ }
+
+ };
+ // private static final Log LOG = Log.getLog(ETypeConverter.class);
+
+ final Class<?> _type;
@julienledem Owner

just "type"? Also, the getter is private, so should the field be too?

...in/java/parquet/hive/convert/HiveSchemaConverter.java
((40 lines not shown))
+ *
+ *
+ * @author Rémy Pecqueur <r.pecqueur@criteo.com>
+ *
+ */
+public class HiveSchemaConverter {
+ private static final Log LOG = Log.getLog(HiveSchemaConverter.class);
+
+ static public MessageType convert(final List<String> columnNames, final List<TypeInfo> columnTypes) {
+ LOG.info(new MessageType("hive_schema", convertTypes(columnNames, columnTypes)));
+ return new MessageType("hive_schema", convertTypes(columnNames, columnTypes));
+ }
+
+ static private Type[] convertTypes(final List<String> columnNames, final List<TypeInfo> columnTypes) {
+ if (columnNames.size() != columnTypes.size()) {
+ throw new RuntimeException("Mismatched Hive columns and types");
@julienledem Owner

add more info, like the schema, to the error message

@mickaellcr Collaborator

Done

...in/java/parquet/hive/convert/HiveSchemaConverter.java
((72 lines not shown))
+ static private Type convertType(final String name, final TypeInfo typeInfo, final Repetition repetition) {
+ if (typeInfo.getCategory().equals(Category.PRIMITIVE)) {
+ if (typeInfo.equals(TypeInfoFactory.stringTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.BINARY, name);
+ } else if (typeInfo.equals(TypeInfoFactory.intTypeInfo) || typeInfo.equals(TypeInfoFactory.shortTypeInfo) || typeInfo.equals(TypeInfoFactory.byteTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT32, name);
+ } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT64, name);
+ } else if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.DOUBLE, name);
+ } else if (typeInfo.equals(TypeInfoFactory.floatTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.FLOAT, name);
+ } else if (typeInfo.equals(TypeInfoFactory.booleanTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.BOOLEAN, name);
+ } else {
+ throw new RuntimeException();
@julienledem Owner

error message

@mickaellcr Collaborator

Done

...ain/java/parquet/hive/convert/HiveGroupConverter.java
((26 lines not shown))
+ // private static final Log LOG = Log.getLog(HiveGroupConverter.class);
+
+ static protected Converter getConverterFromDescription(final Type type, final int index, final HiveGroupConverter parent) {
+ if (type == null) {
+ return null;
+ }
+
+ if (type.isPrimitive()) {
+ return ETypeConverter.getNewConverter(type.asPrimitiveType().getPrimitiveTypeName().javaType, index, parent);
+ } else {
+ if (type.asGroupType().getRepetition() == Repetition.REPEATED) {
+ // LOG.info("getting array converter " + type);
+ return new ArrayWritableGroupConverter(type.asGroupType(), parent, index);
+ } else {
+ // LOG.info("getting map converter " + type);
+ return new MapWritableGroupConverter(type.asGroupType(), parent, index);
@julienledem Owner

This is the case where we deal with STRUCT right?
Is this where you want to use something other than a map?

...a/parquet/hive/convert/MapWritableGroupConverter.java
((56 lines not shown))
+ this.index = index;
+ this.currentMap = new HashMap<Writable, Writable>();
+ converters = new Converter[this.groupType.getFieldCount()];
+
+ int i = 0;
+ for (final Type subtype : this.groupType.getFields()) {
+ converters[i] = getConverterFromDescription(subtype, i, this);
+ ++i;
+ }
+ }
+
+ final public MapWritable getCurrentMap() {
+ if (mapWritable == null) {
+ mapWritable = new MapWritable();
+ }
+ mapWritable.clear();
@julienledem Owner

reusing the MapWritable might be a problem if we have an ARRAY of STRUCTs

...in/java/parquet/hive/read/MapWritableReadSupport.java
((39 lines not shown))
+ *
+ */
+public class MapWritableReadSupport extends ReadSupport<MapWritable> {
+ // static final Log LOG = LogFactory.getLog(MapWritableReadSupport.class);
+
+ @Override
+ public parquet.hadoop.api.ReadSupport.ReadContext init(final Configuration configuration, final Map<String, String> keyValueMetaData, final MessageType fileSchema) {
+ final List<String> listColumns = (List<String>) StringUtils.getStringCollection(configuration.get("columns"));
+
+ MessageType requestedSchemaByUser = fileSchema;
+ final List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
+
+ if (indexColumnsWanted.isEmpty() == false) {
+ final List<Type> typeList = new ArrayList<Type>();
+ for(final Integer idx : indexColumnsWanted) {
+ typeList.add(fileSchema.getType(listColumns.get(idx)));
@julienledem Owner

Does Hive support projection push-down of nested fields? It looks like we only do the first level. (FYI: Pig only does the first level.)

@mickaellcr Collaborator

Only the first level. We can get the name of the field you are looking for, but not the whole tree. For example, if you are looking for struct_field.id_toto, you will only know that you want the column that contains struct_field, not just the id. So we will ask Parquet to read the whole struct. Very bad.

We found that one of the Hive objects gives access to the name 'id_toto', for example, but you don't know where it belongs. If you have many structs with that beautiful name, you will not be able to tell which one it comes from (no tree, just the last name).
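To make the limitation concrete, here is a minimal sketch (not the patch itself) of first-level-only projection: Hive hands over top-level column indexes, so selecting struct_field.id_toto still requests the whole struct group. The schema and the index list are made up for illustration:

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

import parquet.schema.MessageType;
import parquet.schema.MessageTypeParser;
import parquet.schema.Type;

public class FirstLevelProjectionSketch {
  public static void main(final String[] args) {
    // File schema with a nested struct; only top-level columns can be pruned.
    final MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message customer {\n"
      + "  optional int32 c_custkey;\n"
      + "  optional group struct_field {\n"
      + "    optional int32 id_toto;\n"
      + "    optional binary name;\n"
      + "  }\n"
      + "}");

    // Hive only exposes top-level column indexes (here: column 1, the struct),
    // so asking for struct_field.id_toto still pulls in the whole struct group.
    final List<Integer> wantedIndexes = Arrays.asList(1);
    final List<Type> requested = new ArrayList<Type>();
    for (final Integer idx : wantedIndexes) {
      requested.add(fileSchema.getFields().get(idx));
    }
    final MessageType requestedSchema = new MessageType("customer", requested);
    System.out.println(requestedSchema);
  }
}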

...va/parquet/hive/serde/MapWritableObjectInspector.java
((114 lines not shown))
+ return Category.STRUCT;
+ }
+
+ @Override
+ public String getTypeName() {
+ return typeInfo.getTypeName();
+ }
+
+ @Override
+ public List<? extends StructField> getAllStructFieldRefs() {
+ return fields;
+ }
+
+ @Override
+ public Object getStructFieldData(final Object data, final StructField fieldRef) {
+ if (data != null && data instanceof MapWritable) {
@julienledem Owner

SerDe noob questions:
It seems that the API expects the value to be returned, and can optionally pass in the previous object to be reused.
Correct?

@julienledem julienledem commented on the diff
...ava/parquet/hive/serde/ParquetHiveArrayInspector.java
((67 lines not shown))
+ if (data == null) {
+ return 0;
+ }
+
+ final Writable subObj = ((MapWritable) data).get(ParquetHiveSerDe.ARRAY);
+
+ if (subObj == null) {
+ return 0;
+ }
+
+ return ((ArrayWritable) subObj).get().length;
+ }
+
+ @Override
+ public List<?> getList(final Object data) {
+ if (data == null) {
@julienledem Owner

what is data?

@mickaellcr Collaborator

Data (in fact an ArrayWritable) is what the next() method returns.

...rc/main/java/parquet/hive/serde/ParquetHiveSerDe.java
((138 lines not shown))
+ }
+
+ @Override
+ public ObjectInspector getObjectInspector() throws SerDeException {
+ return objInspector;
+ }
+
+ @Override
+ public Class<? extends Writable> getSerializedClass() {
+ return MapWritable.class;
+ }
+
+ @Override
+ public Writable serialize(final Object obj, final ObjectInspector objInspector) throws SerDeException {
+ if (!objInspector.getCategory().equals(Category.STRUCT)) {
+ throw new SerDeException("Can only serialize a struct");
@julienledem Owner

please add the given category to the error message

@mickaellcr Collaborator

Done

...rc/main/java/parquet/hive/serde/ParquetHiveSerDe.java
((258 lines not shown))
+ case BYTE:
+ return new ByteWritable((byte) ((ByteObjectInspector) inspector).get(obj));
+ case DOUBLE:
+ return new DoubleWritable(((DoubleObjectInspector) inspector).get(obj));
+ case FLOAT:
+ return new FloatWritable(((FloatObjectInspector) inspector).get(obj));
+ case INT:
+ return new IntWritable(((IntObjectInspector) inspector).get(obj));
+ case LONG:
+ return new LongWritable(((LongObjectInspector) inspector).get(obj));
+ case SHORT:
+ return new ShortWritable((short) ((ShortObjectInspector) inspector).get(obj));
+ case STRING:
+ return new BinaryWritable(((StringObjectInspector) inspector).getPrimitiveJavaObject(obj));
+ default:
+ throw new SerDeException("Unknown primitive");
@julienledem Owner
  • add inspector.getPrimitiveCategory() to the error message
...rc/main/java/parquet/hive/serde/ParquetHiveSerDe.java
((280 lines not shown))
+ return createStruct(obj, (StructObjectInspector) inspector, null);
+ case LIST:
+ return createArray(obj, (ListObjectInspector) inspector);
+ case MAP:
+ return createMap(obj, (MapObjectInspector) inspector);
+ case PRIMITIVE:
+ return createPrimitive(obj, (PrimitiveObjectInspector) inspector);
+ default:
+ throw new SerDeException("Unknown data type" + inspector.getCategory());
+ }
+ }
+ //
+ @Override
+ public SerDeStats getSerDeStats() {
+ // must be different
+ assert (lastOperationSerialize != lastOperationDeserialize);
@julienledem Owner

you could also have 1 enum field LAST_OPERATION {SERIALIZE, DESERIALIZE}
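A minimal sketch of that suggestion, with hypothetical names; the real SerDe would of course return a SerDeStats object rather than a string:

// Hedged sketch of the single-enum-field idea; names are illustrative.
public class LastOperationSketch {

  enum LastOperation { SERIALIZE, DESERIALIZE }

  private LastOperation lastOperation;

  public void onSerialize()   { lastOperation = LastOperation.SERIALIZE; }
  public void onDeserialize() { lastOperation = LastOperation.DESERIALIZE; }

  public String getSerDeStats() {
    // One field cannot hold both states at once, so the
    // assert(lastOperationSerialize != lastOperationDeserialize) check goes away.
    if (lastOperation == null) {
      throw new IllegalStateException("neither serialize nor deserialize was called yet");
    }
    return "stats after " + lastOperation;
  }
}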

...in/java/parquet/hive/writable/BigDecimalWritable.java
((19 lines not shown))
+package parquet.hive.writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.math.BigDecimal;
+import java.math.BigInteger;
+
+import org.apache.hadoop.hive.serde2.ByteStream.Output;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils;
+import org.apache.hadoop.hive.serde2.lazybinary.LazyBinaryUtils.VInt;
+import org.apache.hadoop.io.WritableComparable;
+import org.apache.hadoop.io.WritableUtils;
+/**
+ *
+ * From HIVE 0.9 !!!!
@julienledem Owner

So it's not needed anymore?

@mickaellcr Collaborator

My bad... it's Hive 0.11:
https://issues.apache.org/jira/browse/HIVE-2693
or 0.10.0-cdh4.2.0

So I will keep this file for the moment. We're not ready for 0.11 yet (by which I mean: I didn't test it, and Cloudera is on 0.10 plus some patches).

@isnotinvain Owner

Can you update this to say it's from hive 0.11 instead of 0.9?

Do we need to add a header that says something along the lines of "this file is taken from Hive, here is their license, here's a link to their website, etc."? I'm not sure what the right way to cite this is, but we probably should.

@mickaellcr Collaborator

Yep ! My bad,

I will add something like

/**

...c/main/java/parquet/hive/writable/BinaryWritable.java
((14 lines not shown))
+ * limitations under the License.
+ */
+package parquet.hive.writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+
+import parquet.io.api.Binary;
+/**
+ *
+ * A Parquet InputFormat for Hive (with the deprecated package mapred)
@julienledem Owner

Fix description

...c/main/java/parquet/hive/writable/BinaryWritable.java
((21 lines not shown))
+import java.util.Arrays;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+
+import parquet.io.api.Binary;
+/**
+ *
+ * A Parquet InputFormat for Hive (with the deprecated package mapred)
+ *
+ *
+ * @author Mickaël Lacour <m.lacour@criteo.com>
+ * @author Rémy Pecqueur <r.pecqueur@criteo.com>
+ *
+ */
+public class BinaryWritable implements Writable {
@julienledem Owner

Would BytesWritable work ?

@mickaellcr Collaborator

Definitely yes! For the next version I will remove this class. Right now I'm keeping it; it's a wrapper around BytesWritable with two constructors (one for String and another for Binary).

(sorry didn't see this message before ...)
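For illustration, a hedged sketch of what such an interim wrapper could look like; the class and method names are made up, and it assumes parquet.io.api.Binary exposes its bytes via getBytes():

import java.io.DataInput;
import java.io.DataOutput;
import java.io.IOException;
import java.io.UnsupportedEncodingException;

import org.apache.hadoop.io.BytesWritable;
import org.apache.hadoop.io.Writable;

import parquet.io.api.Binary;

public class BinaryWritableSketch implements Writable {

  // Delegate storage and serialization to BytesWritable.
  private final BytesWritable bytes = new BytesWritable();

  public BinaryWritableSketch() { }

  // Convenience constructor for Hive STRING values.
  public BinaryWritableSketch(final String value) {
    try {
      final byte[] utf8 = value.getBytes("UTF-8");
      bytes.set(utf8, 0, utf8.length);
    } catch (final UnsupportedEncodingException e) {
      throw new RuntimeException(e); // UTF-8 is always supported
    }
  }

  // Convenience constructor for Parquet Binary values.
  public BinaryWritableSketch(final Binary value) {
    final byte[] raw = value.getBytes();
    bytes.set(raw, 0, raw.length);
  }

  // Note: BytesWritable.getBytes() may return a padded buffer; callers should
  // honor bytes.getLength() when copying.
  public byte[] getBytes() {
    return bytes.getBytes();
  }

  @Override
  public void write(final DataOutput out) throws IOException {
    bytes.write(out);
  }

  @Override
  public void readFields(final DataInput in) throws IOException {
    bytes.readFields(in);
  }
}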

...c/main/java/parquet/hive/write/MapWritableWriter.java
((70 lines not shown))
+ }
+ recordConsumer.startField(fieldName, field);
+
+
+ if (fieldType.isPrimitive()) {
+
+ writePrimitive(value);
+ } else {
+ recordConsumer.startGroup();
+
+ if (value instanceof MapWritable) {
+ writeMap((MapWritable) value, fieldType.asGroupType());
+ } else if (value instanceof ArrayWritable) {
+ writeArray((ArrayWritable) value, fieldType.asGroupType());
+ } else if (value != null) {
+ throw new RuntimeException("This should be an ArrayWritable or MapWritable: " + value);
@julienledem Owner

You can use ParquetEncodingException

@mickaellcr Collaborator

Done

...c/main/java/parquet/hive/write/MapWritableWriter.java
((122 lines not shown))
+ writeMap((MapWritable) subValue, subType.asGroupType());
+ recordConsumer.endGroup();
+ }
+ }
+ }
+ }
+ recordConsumer.endField(subType.getName(), field);
+ }
+
+ }
+
+ private void writePrimitive(final Writable value) {
+ if (value == null) {
+ return;
+ }
+ if (value instanceof DoubleWritable) {
@julienledem Owner

potentially a switch on type would be faster
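To illustrate the suggestion, a hedged sketch that dispatches on the field's Parquet primitive type (an enum, so switchable) instead of chained instanceof checks on the Writable; only a few cases are shown and the names are illustrative, not the patch itself:

import org.apache.hadoop.hive.serde2.io.DoubleWritable;
import org.apache.hadoop.io.IntWritable;
import org.apache.hadoop.io.LongWritable;
import org.apache.hadoop.io.Writable;

import parquet.io.api.RecordConsumer;
import parquet.schema.PrimitiveType.PrimitiveTypeName;

public class SwitchWritePrimitiveSketch {

  private final RecordConsumer recordConsumer;

  public SwitchWritePrimitiveSketch(final RecordConsumer recordConsumer) {
    this.recordConsumer = recordConsumer;
  }

  void writePrimitive(final PrimitiveTypeName typeName, final Writable value) {
    if (value == null) {
      return;
    }
    // Enum switch: one jump instead of a chain of instanceof checks.
    switch (typeName) {
      case INT32:
        recordConsumer.addInteger(((IntWritable) value).get());
        break;
      case INT64:
        recordConsumer.addLong(((LongWritable) value).get());
        break;
      case DOUBLE:
        recordConsumer.addDouble(((DoubleWritable) value).get());
        break;
      default:
        throw new IllegalArgumentException("Type not handled in this sketch: " + typeName);
    }
  }
}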

...va/parquet/hive/TestDeprecatedParquetInputFormat.java
((189 lines not shown))
+// System.out.println("obj : " + obj);
+ final MapWritable expected = mapData.get(((IntWritable) obj).get());
+// System.out.println("expected " + expected.entrySet().toString());
+// System.out.println("value " + value.entrySet().toString());
+
+ assertTrue(UtilitiesTestMethods.mapEquals(value, expected));
+ count++;
+ }
+
+ reader.close();
+
+ assertEquals("Number of lines found and data written don't match", count, mapData.size());
+
+ } catch (final Exception e) {
+ System.err.println("caught: " + e);
+ assertFalse(true);
@julienledem Owner

Just let the exception propagate; that way it will show up in the report.

...va/parquet/hive/TestDeprecatedParquetOuputFormat.java
((102 lines not shown))
+// int count = 0;
+// while (reader.next(key, value)) {
+// assertTrue(count < mapData.size());
+// assertTrue(key == null);
+// final Object obj = value.get(new Text("c_custkey"));
+// final MapWritable expected = mapData.get(((IntWritable) obj).get());
+//
+// assertTrue(UtilitiesTestMethods.mapEquals(value, expected));
+// count++;
+// }
+//
+// reader.close();
+
+// assertEquals("Number of lines found and data written don't match", count, mapData.size());
+
+ } catch (final Exception e) {
@julienledem Owner

just have the test method "throws Exception"
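A minimal sketch of the suggested shape (shown here with JUnit 4's @Test; if the existing tests extend TestCase, the throws clause works the same way). Names and counts are placeholders:

import static org.junit.Assert.assertEquals;

import org.junit.Test;

public class PropagateExceptionTestSketch {

  @Test
  public void testReadsBackWrittenData() throws Exception {
    // ...exercise the reader/writer here; any IOException or
    // InterruptedException fails the test and its stack trace shows up
    // directly in the report, no try/catch + assertFalse(true) needed...
    final int written = 0;
    final int read = 0;
    assertEquals("Number of lines found and data written don't match", written, read);
  }
}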

...hive/src/test/java/parquet/hive/TestParquetSerDe.java
((55 lines not shown))
+ // Data
+ final MapWritable map = new MapWritable();
+
+ map.put(new Text("abyte"), new ByteWritable((byte) 123));
+ map.put(new Text("ashort"), new ShortWritable((short) 456));
+ map.put(new Text("aint"), new IntWritable(789));
+ map.put(new Text("along"), new LongWritable(1000l));
+ map.put(new Text("adouble"), new DoubleWritable((double) 5.3));
+ map.put(new Text("astring"), new BinaryWritable("hive and hadoop and parquet. Big family."));
+
+ // Test
+ deserializeAndSerializeLazySimple(serDe, map);
+ System.out.println("test: testParquetHiveSerDe - OK");
+
+ } catch (final Throwable e) {
+ e.printStackTrace();
@julienledem Owner

the exception will show in the test report

@julienledem
Owner

This patch is getting huge! Thanks for the contribution guys.

...n/java/parquet/hive/DeprecatedParquetInputFormat.java
((69 lines not shown))
+public class DeprecatedParquetInputFormat extends FileInputFormat<Void, ArrayWritable> {
+
+ private static final Log LOG = Log.getLog(DeprecatedParquetInputFormat.class);
+ protected ParquetInputFormat<ArrayWritable> realInput;
+
+ public DeprecatedParquetInputFormat() {
+ this.realInput = new ParquetInputFormat<ArrayWritable>(DataWritableReadSupport.class);
+ }
+
+ public DeprecatedParquetInputFormat(final InputFormat<Void, ArrayWritable> realInputFormat) {
+ this.realInput = (ParquetInputFormat<ArrayWritable>) realInputFormat;
+ }
+
+ @Override
+ protected boolean isSplitable(final FileSystem fs, final Path filename) {
+ return false;
@julienledem Owner

does that have an impact?

@mickaellcr Collaborator

We removed it (in fact it was always true). It's used to split a block if necessary, but it was never called because we override the getSplits() method.

...n/java/parquet/hive/DeprecatedParquetInputFormat.java
((94 lines not shown))
+ final Path tmpPath = new Path((dirs[dirs.length - 1]).makeQualified(FileSystem.get(job)).toUri().getPath());
+ final JobConf cloneJobConf = manageJob.cloneJobAndInit(job, tmpPath);
+ final List<org.apache.hadoop.mapreduce.InputSplit> splits = realInput.getSplits(new JobContext(cloneJobConf, null));
+
+ if (splits == null) {
+ return null;
+ }
+
+ final InputSplit[] resultSplits = new InputSplit[splits.size()];
+ int i = 0;
+
+ for (final org.apache.hadoop.mapreduce.InputSplit split : splits) {
+ try {
+ resultSplits[i++] = new InputSplitWrapper((ParquetInputSplit) split);
+ } catch (final InterruptedException e) {
+ return null;
@julienledem Owner

throw an exception here?

...n/java/parquet/hive/DeprecatedParquetInputFormat.java
((108 lines not shown))
+ } catch (final InterruptedException e) {
+ return null;
+ }
+ }
+
+ return resultSplits;
+ }
+
+ @Override
+ public org.apache.hadoop.mapred.RecordReader<Void, ArrayWritable> getRecordReader(final org.apache.hadoop.mapred.InputSplit split,
+ final org.apache.hadoop.mapred.JobConf job, final org.apache.hadoop.mapred.Reporter reporter) throws IOException {
+ try {
+ return (RecordReader<Void, ArrayWritable>) new RecordReaderWrapper(realInput, split, job, reporter);
+ } catch (final InterruptedException e) {
+ e.printStackTrace();
+ return null;
@julienledem Owner

same here?

...n/java/parquet/hive/DeprecatedParquetInputFormat.java
((135 lines not shown))
+ // MapReduce instantiates this.
+ public InputSplitWrapper() {
+ super((Path) null, 0, 0, (String[]) null);
+ }
+
+ public InputSplitWrapper(final ParquetInputSplit realSplit) throws IOException, InterruptedException {
+ super(realSplit.getPath(), realSplit.getStart(), realSplit.getLength(), realSplit.getLocations());
+ this.realSplit = realSplit;
+ }
+
+ @Override
+ public long getLength() {
+ try {
+ return realSplit.getLength();
+ } catch (final InterruptedException e) {
+ return 0;
@julienledem Owner

exception?

...n/java/parquet/hive/DeprecatedParquetInputFormat.java
((137 lines not shown))
+ super((Path) null, 0, 0, (String[]) null);
+ }
+
+ public InputSplitWrapper(final ParquetInputSplit realSplit) throws IOException, InterruptedException {
+ super(realSplit.getPath(), realSplit.getStart(), realSplit.getLength(), realSplit.getLocations());
+ this.realSplit = realSplit;
+ }
+
+ @Override
+ public long getLength() {
+ try {
+ return realSplit.getLength();
+ } catch (final InterruptedException e) {
+ return 0;
+ } catch (final IOException e) {
+ return 0;
@julienledem Owner

exception?

...n/java/parquet/hive/DeprecatedParquetInputFormat.java
((249 lines not shown))
+ final long firstDataPage = block.getColumns().get(0).getFirstDataPageOffset();
+ if (firstDataPage >= hdfsBlock.getOffset() && firstDataPage < hdfsBlock.getOffset() + hdfsBlock.getLength()) {
+ splitGroup.add(block);
+ }
+ }
+
+ if (splitGroup.size() == 0) {
+ LOG.warn("Skipping split, could not find row group in: " + (FileSplit) oldSplit);
+ split = null;
+ } else {
+ split = new ParquetInputSplit(finalPath, hdfsBlock.getOffset(), hdfsBlock.getLength(), hdfsBlock.getHosts(), splitGroup, fileMetaData.getSchema().toString(),
+ requestedSchemaByUser.toString(), fileMetaData.getKeyValueMetaData());
+ }
+
+ } else {
+ throw new RuntimeException("Unknown split type");
@julienledem Owner

"Unknown split type " + oldSplit);

.../parquet/hive/serde/ArrayWritableObjectInspector.java
((188 lines not shown))
+
+ JavaStringBinaryObjectInspector() {
+ super(PrimitiveObjectInspectorUtils.stringTypeEntry);
+ }
+
+ @Override
+ public Text getPrimitiveWritableObject(final Object o) {
+ return o == null ? null : new Text(((BinaryWritable) o).getBytes());
+ }
+
+ @Override
+ public String getPrimitiveJavaObject(final Object o) {
+ try {
+ return new String(((BinaryWritable) o).getBytes(), "UTF-8");
+ } catch (final UnsupportedEncodingException e) {
+ return null;
@julienledem Owner

throw exception

@julienledem julienledem referenced this pull request
Closed

Hive SerDe #12

@mickaellcr
Collaborator

Done for all the exceptions.

...n/java/parquet/column/impl/ColumnReaderEmptyImpl.java
((21 lines not shown))
+import parquet.column.ColumnDescriptor;
+import parquet.column.ColumnReader;
+import parquet.column.Dictionary;
+import parquet.column.page.DictionaryPage;
+import parquet.column.page.Page;
+import parquet.column.page.PageReader;
+import parquet.column.values.ValuesReader;
+import parquet.column.values.ValuesType;
+import parquet.io.ParquetDecodingException;
+import parquet.io.api.Binary;
+import parquet.io.api.PrimitiveConverter;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
+
+/**
+ * ColumnReader implementation
@isnotinvain Owner

"ColumnReader implementation that does..."

Why does an empty column have to keep track of readValues?
Is this a column that returns N 0s?

@mickaellcr Collaborator

Yes it's to return N 0s

.../src/main/java/parquet/column/page/PageReadStore.java
@@ -36,8 +36,14 @@
/**
*
+ * @param descriptor the descriptor of the column
+ * @return true if the column exist in the schema
+ */
+ boolean isColumnExist(ColumnDescriptor descriptor);
@isnotinvain Owner

being picky here, but consider hasColumn or columnExists

@mickaellcr Collaborator

hasColumn ! Thx :)

@isnotinvain isnotinvain commented on the diff
parquet-hive/pom.xml
((30 lines not shown))
+ <repositories>
+ <repository>
+ <id>repository.cloudera.com</id>
+ <name>repository.cloudera.com-releases</name>
+ <url>https://repository.cloudera.com/artifactory/datanucleus</url>
+ </repository>
+ </repositories>
+
+ <properties>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>commons-lang</groupId>
+ <artifactId>commons-lang</artifactId>
+ <version>2.4</version>
@isnotinvain Owner

put the version in the parent pom?

I'm not too familiar with Maven, but I've seen it done that way before, and it seems easier to keep track of things that way.

@mickaellcr Collaborator

That's a good question. For me, the parent pom contains everything shared by the children, and that's it. If I need something specific to a submodule, I put it in that module's own pom.xml.

I don't know if that's good practice or not; I need advice on this.

As for the repository section, I added it to fix the Travis error; I'm waiting for a better solution if someone has one.

parquet-hive/pom.xml
((50 lines not shown))
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>com.twitter</groupId>
+ <artifactId>parquet-hadoop</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ <version>0.10.0</version>
@isnotinvain Owner

move version to parent pom?

parquet-hive/pom.xml
((55 lines not shown))
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hadoop</groupId>
+ <artifactId>hadoop-core</artifactId>
+ <version>${hadoop.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-serde</artifactId>
+ <version>0.10.0</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.hive</groupId>
+ <artifactId>hive-exec</artifactId>
+ <version>0.10.0</version>
@isnotinvain Owner

move version to parent pom?

@mickaellcr Collaborator

Hive is specific to parquet-hive, so I feel like keeping it in this pom.xml.

@julienledem Owner

When you have several dependencies that need to be in sync, you can add a properties section to your pom.
Here, hive-serde and hive-exec would share a hive-version property to simplify updating the version.

@mickaellcr Collaborator

yeap !

@isnotinvain isnotinvain commented on the diff
...n/java/parquet/hive/DeprecatedParquetInputFormat.java
((44 lines not shown))
+import parquet.hadoop.metadata.FileMetaData;
+import parquet.hadoop.metadata.ParquetMetadata;
+import parquet.hadoop.util.ContextUtil;
+import parquet.hive.read.DataWritableReadSupport;
+import parquet.schema.MessageTypeParser;
+
+/**
+ *
+ * A Parquet InputFormat for Hive (with the deprecated package mapred)
+ *
+ *
+ * @author Mickaël Lacour <m.lacour@criteo.com>
+ * @author Rémy Pecqueur <r.pecqueur@criteo.com>
+ *
+ */
+public class DeprecatedParquetInputFormat extends FileInputFormat<Void, ArrayWritable> {
@isnotinvain Owner

I'm not sure if this does what you need, but elephant-bird has wrappers for converting mapred input formats to mapreduce input formats:
https://github.com/kevinweil/elephant-bird/blob/master/core/src/main/java/com/twitter/elephantbird/mapred/input/DeprecatedInputFormatWrapper.java

That one assumes the object is reused, which is not true in Parquet's case. I did create a version of it for the Parquet/Cascading integration that does support creating a new object every time. It could potentially eliminate a lot of code here.

@isnotinvain Owner

Even if we don't do it in this iteration, a TODO would be nice. We can even move these wrappers to their own EB module to make it a very light dependency.

@mickaellcr Collaborator

I will add a TODO here, but there are some issues that don't allow us to create a simple wrapper. The big issue is the split: Hive creates a new split from the original ParquetInputSplit, but we lose the extra information. You cannot say that a ParquetInputSplit is just an InputSplit with a length, a path, a location and a start. It's more than that.

@julienledem Owner

Yes please add documentation in the code regarding this. We want to allow refactoring of this.

@isnotinvain isnotinvain commented on the diff
.../java/parquet/hive/DeprecatedParquetOutputFormat.java
((37 lines not shown))
+import parquet.hadoop.ParquetOutputFormat;
+import parquet.hadoop.util.ContextUtil;
+import parquet.hive.convert.HiveSchemaConverter;
+import parquet.hive.write.DataWritableWriteSupport;
+
+/**
+ *
+ * A Parquet OutputFormat for Hive (with the deprecated package mapred)
+ *
+ *
+ * @author Mickaël Lacour <m.lacour@criteo.com>
+ * @author Rémy Pecqueur <r.pecqueur@criteo.com>
+ *
+ */
+@SuppressWarnings({"unchecked", "rawtypes"})
+public class DeprecatedParquetOutputFormat extends FileOutputFormat<Void, ArrayWritable> implements HiveOutputFormat<Void, ArrayWritable> {
...-hive/src/main/java/parquet/hive/ManageJobConfig.java
((27 lines not shown))
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ *
+ * ManageJobConfig (from Hive file, need to be deleted after) quick workaround to init the job with column needed
@isnotinvain Owner

What do you mean "from hive file?"

@mickaellcr Collaborator

Changed the comment for this one :

...c/main/java/parquet/hive/writable/BinaryWritable.java
((15 lines not shown))
+ */
+package parquet.hive.writable;
+
+import java.io.DataInput;
+import java.io.DataOutput;
+import java.io.IOException;
+import java.util.Arrays;
+
+import org.apache.hadoop.io.BytesWritable;
+import org.apache.hadoop.io.Writable;
+
+import parquet.io.api.Binary;
+
+/**
+ *
+ * A Parquet InputFormat for Hive (with the deprecated package mapred)
@isnotinvain Owner

Not true :)

@mickaellcr Collaborator

:D

...java/parquet/hive/write/DataWritableWriteSupport.java
((12 lines not shown))
+package parquet.hive.write;
+
+import java.util.HashMap;
+
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.io.ArrayWritable;
+
+import parquet.hadoop.api.WriteSupport;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.MessageType;
+import parquet.schema.MessageTypeParser;
+
+/**
+ *
+ * A MapWritableWriteSupport
+ * TODO
@isnotinvain Owner

TODO what?

@mickaellcr Collaborator

Javadoc; an oversight on my part.

.../main/java/parquet/hive/write/DataWritableWriter.java
((18 lines not shown))
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+import org.apache.hadoop.io.Writable;
+import parquet.hive.writable.BigDecimalWritable;
+
+import parquet.hive.writable.BinaryWritable;
+import parquet.io.ParquetEncodingException;
+import parquet.io.api.Binary;
+import parquet.io.api.RecordConsumer;
+import parquet.schema.GroupType;
+import parquet.schema.Type;
+
+/**
+ *
+ * A MapWritableWriter TODO
@isnotinvain Owner

TODO what?

@mickaellcr Collaborator

Javadoc; an oversight on my part.

...n/java/parquet/column/impl/ColumnReaderEmptyImpl.java
((23 lines not shown))
+import parquet.column.Dictionary;
+import parquet.column.page.DictionaryPage;
+import parquet.column.page.Page;
+import parquet.column.page.PageReader;
+import parquet.column.values.ValuesReader;
+import parquet.column.values.ValuesType;
+import parquet.io.ParquetDecodingException;
+import parquet.io.api.Binary;
+import parquet.io.api.PrimitiveConverter;
+import parquet.schema.PrimitiveType.PrimitiveTypeName;
+import parquet.schema.PrimitiveType.PrimitiveTypeNameConverter;
+
+/**
+ * ColumnReader implementation
+ *
+ * @author Julien Le Dem
@julienledem Owner

you should update the javadoc.

This column reader generates default values for new columns to support schema evolution.
For a new optional or repeated field it will always return null.

I now realize that this will work only for flat schemas.
We can keep it for now and add a TODO to make it work for all schemas.
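A possible wording for that javadoc, drawn directly from the description above (illustrative, not authoritative):

/**
 * ColumnReader implementation for a column that is absent from the file:
 * to support schema evolution it generates a default value for each record,
 * always returning null for a new optional or repeated field.
 *
 * TODO: currently only correct for flat schemas; make it work for nested schemas.
 */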

...-column/src/main/java/parquet/io/MessageColumnIO.java
@@ -54,12 +54,16 @@
}
public <T> RecordReader<T> getRecordReader(PageReadStore columns, RecordMaterializer<T> recordMaterializer) {
- return new RecordReaderImplementation<T>(
- this,
- recordMaterializer,
- validating,
- new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType())
- );
+ if (leaves.size() > 0) {
+ return new RecordReaderImplementation<T>(
+ this,
+ recordMaterializer,
+ validating,
+ new ColumnReadStoreImpl(columns, recordMaterializer.getRootConverter(), getType()));
+ } else {
+ return new RecordReaderEmpty<T>(recordMaterializer);
@julienledem Owner

EmptyRecordReader

...n/java/parquet/hive/DeprecatedParquetInputFormat.java
((310 lines not shown))
+
+ private ParquetInputSplit getSplit(final InputSplit oldSplit, final JobConf conf) throws IOException {
+ ParquetInputSplit split;
+
+ if (oldSplit instanceof InputSplitWrapper) {
+ split = ((InputSplitWrapper) oldSplit).getRealSplit();
+ } else if (oldSplit instanceof FileSplit) {
+ final Path finalPath = ((FileSplit) oldSplit).getPath();
+ final JobConf cloneJob = manageJob.cloneJobAndInit(conf, finalPath.getParent());
+
+ final FileSystem fs = FileSystem.get(cloneJob);
+ final FileStatus status = fs.getFileStatus(finalPath);
+
+ final BlockLocation[] hdfsBlocks = fs.getFileBlockLocations(status, ((FileSplit) oldSplit).getStart(), oldSplit.getLength());
+ if (hdfsBlocks.length != 1) {
+ throw new RuntimeException("Should have exactly 1 HDFS block per split, got: " + hdfsBlocks.length);
@julienledem Owner

I don't understand why this is true

@Lordshinjo Collaborator

I actually found a better way to recreate the split without the need to access block level information.
(everything we need is actually in the FileSplit)

...n/java/parquet/hive/DeprecatedParquetInputFormat.java
((296 lines not shown))
+ if (arrValue.length != arrCurrent.length) {
+ throw new IOException("DeprecatedParquetHiveInput : size of object differs. Value size : " + arrValue.length + ", Current Object size : "
+ + arrCurrent.length);
+ } else {
+ throw new IOException("DeprecatedParquetHiveInput can not support RecordReaders that don't return same key & value & value is null");
+ }
+ }
+ }
+ return true;
+
+ } catch (final InterruptedException e) {
+ throw new IOException(e);
+ }
+ }
+
+ private ParquetInputSplit getSplit(final InputSplit oldSplit, final JobConf conf) throws IOException {
@julienledem Owner

Please document what this method is doing and why.

@julienledem julienledem commented on the diff
.../java/parquet/hive/DeprecatedParquetOutputFormat.java
((80 lines not shown))
+ List<String> columnNames;
+ List<TypeInfo> columnTypes;
+
+ if (columnNameProperty.length() == 0) {
+ columnNames = new ArrayList<String>();
+ } else {
+ columnNames = Arrays.asList(columnNameProperty.split(","));
+ }
+
+ if (columnTypeProperty.length() == 0) {
+ columnTypes = new ArrayList<TypeInfo>();
+ } else {
+ columnTypes = TypeInfoUtils.getTypeInfosFromTypeString(columnTypeProperty);
+ }
+
+ DataWritableWriteSupport.setSchema(HiveSchemaConverter.convert(columnNames, columnTypes), jc);
@julienledem Owner

Please document why we need this

@julienledem julienledem commented on the diff
.../java/parquet/hive/DeprecatedParquetOutputFormat.java
((96 lines not shown))
+ return new RecordWriterWrapper(realOutputFormat, jc, finalOutPath.toString(), progress);
+ }
+
+ private static class RecordWriterWrapper implements RecordWriter<Void, ArrayWritable>, org.apache.hadoop.hive.ql.exec.FileSinkOperator.RecordWriter {
+
+ private final org.apache.hadoop.mapreduce.RecordWriter<Void, ArrayWritable> realWriter;
+ private TaskAttemptContext taskContext;
+
+ RecordWriterWrapper(final OutputFormat<Void, ArrayWritable> realOutputFormat, final JobConf jobConf, final String name, final Progressable progress) throws IOException {
+ try {
+ // create a TaskInputOutputContext
+ TaskAttemptID taskAttemptID = TaskAttemptID.forName(jobConf.get("mapred.task.id"));
+ if (taskAttemptID == null) {
+ taskAttemptID = new TaskAttemptID();
+ }
+ taskContext = ContextUtil.newTaskAttemptContext(jobConf, taskAttemptID);
@julienledem Owner

Why is the TaskAttemptID allowed to be created if missing? That would probably lead to inconsistencies if it is different from the actual attempt id.

@julienledem julienledem commented on the diff
...-hive/src/main/java/parquet/hive/ManageJobConfig.java
((34 lines not shown))
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ *
+ * ManageJobConfig quick workaround to initialize the job with column needed
+ *
+ *
+ *
+ * @author Mickaël Lacour <m.lacour@criteo.com>
+ *
+ */
+public class ManageJobConfig {
@julienledem Owner

please add javadoc for those methods

@julienledem julienledem commented on the diff
...-hive/src/main/java/parquet/hive/ManageJobConfig.java
((27 lines not shown))
+import org.apache.hadoop.fs.Path;
+import org.apache.hadoop.hive.conf.HiveConf;
+import org.apache.hadoop.hive.ql.exec.Operator;
+import org.apache.hadoop.hive.ql.exec.TableScanOperator;
+import org.apache.hadoop.hive.ql.exec.Utilities;
+import org.apache.hadoop.hive.ql.plan.ExprNodeDesc;
+import org.apache.hadoop.hive.ql.plan.MapredWork;
+import org.apache.hadoop.hive.ql.plan.PartitionDesc;
+import org.apache.hadoop.hive.ql.plan.TableScanDesc;
+import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
+import org.apache.hadoop.mapred.JobConf;
+import org.apache.hadoop.util.StringUtils;
+
+/**
+ *
+ * ManageJobConfig quick workaround to initialize the job with column needed
@julienledem Owner

So this is the implementation of projection push down? Please add details on how it works

...rc/main/java/parquet/hive/convert/ETypeConverter.java
((19 lines not shown))
+
+import org.apache.hadoop.hive.serde2.io.DoubleWritable;
+import org.apache.hadoop.io.BooleanWritable;
+import org.apache.hadoop.io.FloatWritable;
+import org.apache.hadoop.io.IntWritable;
+import org.apache.hadoop.io.LongWritable;
+
+import parquet.hive.writable.BigDecimalWritable;
+import parquet.hive.writable.BinaryWritable;
+import parquet.io.api.Binary;
+import parquet.io.api.Converter;
+import parquet.io.api.PrimitiveConverter;
+
+/**
+ *
+ * TODO : doc + see classes below (duplicated code from julien)
@julienledem Owner

javadoc

...rc/main/java/parquet/hive/convert/ETypeConverter.java
((132 lines not shown))
+ abstract Converter getConverter(final Class<?> type, final int index, final HiveGroupConverter parent);
+
+ static public Converter getNewConverter(final Class<?> type, final int index, final HiveGroupConverter parent) {
+ for (final ETypeConverter eConverter : values()) {
+ if (eConverter.getType() == type) {
+ return eConverter.getConverter(type, index, parent);
+ }
+ }
+ throw new RuntimeException("Converter not found ... for type : " + type);
+ }
+
+ // TODO : Duplicate code with Julien, need to take a look after it works
+ /**
+ * handle string values
+ *
+ * @author Julien Le Dem
@julienledem Owner

javadoc

...rc/main/java/parquet/hive/convert/ETypeConverter.java
((153 lines not shown))
+
+ public FieldBinaryConverter(final ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ final public void addBinary(final Binary value) {
+ parent.add(value);
+ }
+
+ }
+
+ /**
+ * Handles doubles
+ *
+ * @author Julien Le Dem
@julienledem Owner

javadoc

...rc/main/java/parquet/hive/convert/ETypeConverter.java
((230 lines not shown))
+ final class FieldBooleanConverter extends PrimitiveConverter {
+
+ private final ParentValueContainer parent;
+
+ public FieldBooleanConverter(final ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addBoolean(final boolean value) {
+ parent.add(value);
+ }
+
+ }
+
+ // TODO NOT IMPLEMENTED YET !!!
@julienledem Owner

todo ?

...rc/main/java/parquet/hive/convert/ETypeConverter.java
((249 lines not shown))
+
+ public FieldBigDecimalConverter(final ParentValueContainer parent) {
+ this.parent = parent;
+ }
+
+ @Override
+ public void addLong(final long value) {
+ parent.add(value);
+ }
+
+ }
+
+ /**
+ * for converters to add their current value to their parent
+ *
+ * @author Julien Le Dem
@julienledem Owner

javadoc

...in/java/parquet/hive/convert/HiveSchemaConverter.java
((38 lines not shown))
+/**
+ *
+ * A HiveSchemaConverter
+ *
+ *
+ * @author Rémy Pecqueur <r.pecqueur@criteo.com>
+ *
+ */
+public class HiveSchemaConverter {
+
+ private static final Log LOG = Log.getLog(HiveSchemaConverter.class);
+
+ static public MessageType convert(final List<String> columnNames, final List<TypeInfo> columnTypes) {
+ final MessageType schema = new MessageType("hive_schema", convertTypes(columnNames, columnTypes));
+ if (Log.INFO) {
+ LOG.info(schema);
@julienledem Owner

debug?

...in/java/parquet/hive/convert/HiveSchemaConverter.java
((77 lines not shown))
+ static private Type convertType(final String name, final TypeInfo typeInfo, final Repetition repetition) {
+ if (typeInfo.getCategory().equals(Category.PRIMITIVE)) {
+ if (typeInfo.equals(TypeInfoFactory.stringTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.BINARY, name);
+ } else if (typeInfo.equals(TypeInfoFactory.intTypeInfo) || typeInfo.equals(TypeInfoFactory.shortTypeInfo) || typeInfo.equals(TypeInfoFactory.byteTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT32, name);
+ } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT64, name);
+ } else if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.DOUBLE, name);
+ } else if (typeInfo.equals(TypeInfoFactory.floatTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.FLOAT, name);
+ } else if (typeInfo.equals(TypeInfoFactory.booleanTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.BOOLEAN, name);
+ } else if (typeInfo.equals(TypeInfoFactory.binaryTypeInfo)) {
+ throw new NotImplementedException("Binary type not implemented");
@julienledem Owner

What's the binary type?
Is it a byte array or one bit?
Could we map it?

@mickaellcr Collaborator

Binary is a byte array. I will add a TODO to implement it.

@julienledem julienledem commented on the diff
...in/java/parquet/hive/convert/HiveSchemaConverter.java
((81 lines not shown))
+ } else if (typeInfo.equals(TypeInfoFactory.intTypeInfo) || typeInfo.equals(TypeInfoFactory.shortTypeInfo) || typeInfo.equals(TypeInfoFactory.byteTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT32, name);
+ } else if (typeInfo.equals(TypeInfoFactory.longTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.INT64, name);
+ } else if (typeInfo.equals(TypeInfoFactory.doubleTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.DOUBLE, name);
+ } else if (typeInfo.equals(TypeInfoFactory.floatTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.FLOAT, name);
+ } else if (typeInfo.equals(TypeInfoFactory.booleanTypeInfo)) {
+ return new PrimitiveType(repetition, PrimitiveTypeName.BOOLEAN, name);
+ } else if (typeInfo.equals(TypeInfoFactory.binaryTypeInfo)) {
+ throw new NotImplementedException("Binary type not implemented");
+ } else if (typeInfo.equals(TypeInfoFactory.timestampTypeInfo)) {
+ throw new NotImplementedException("Timestamp type not implemented");
+ } else if (typeInfo.equals(TypeInfoFactory.voidTypeInfo)) {
+ throw new NotImplementedException("Void type not implemented");
@julienledem Owner

FYI: the AVRO integration deals with void types.
We can skip it for now

@julienledem julienledem commented on the diff
...in/java/parquet/hive/convert/HiveSchemaConverter.java
((94 lines not shown))
+ throw new NotImplementedException("Timestamp type not implemented");
+ } else if (typeInfo.equals(TypeInfoFactory.voidTypeInfo)) {
+ throw new NotImplementedException("Void type not implemented");
+ } else if (typeInfo.equals(TypeInfoFactory.unknownTypeInfo)) {
+ throw new NotImplementedException("Unknown type not implemented");
+ } else {
+ throw new RuntimeException("Unknown type: " + typeInfo);
+ }
+ } else if (typeInfo.getCategory().equals(Category.LIST)) {
+ return convertArrayType(name, (ListTypeInfo) typeInfo);
+ } else if (typeInfo.getCategory().equals(Category.STRUCT)) {
+ return convertStructType(name, (StructTypeInfo) typeInfo);
+ } else if (typeInfo.getCategory().equals(Category.MAP)) {
+ return convertMapType(name, (MapTypeInfo) typeInfo);
+ } else if (typeInfo.getCategory().equals(Category.UNION)) {
+ throw new NotImplementedException("Union type not implemented");
@julienledem Owner

Would a union be mapped to a group with one field per type?
Not needed right away.

@Lordshinjo Collaborator

Yes, a union would be the same as a struct, with only one of the subfields filled.

@julienledem Owner

FYI, this pull request (#68) implements union mapping for Avro.
This can be in a future contribution if you want.
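Sketching that future mapping, under the assumption that each union alternative becomes one optional sub-field of a group, mirroring the struct conversion (the member_<i> field names are invented here):

      } else if (typeInfo.getCategory().equals(Category.UNION)) {
        // Assumed mapping: one optional field per alternative; only one of them
        // would be populated for any given record.
        // Requires org.apache.hadoop.hive.serde2.typeinfo.UnionTypeInfo and
        // parquet.schema.GroupType in the imports.
        final List<TypeInfo> alternatives = ((UnionTypeInfo) typeInfo).getAllUnionObjectTypeInfos();
        final Type[] unionFields = new Type[alternatives.size()];
        for (int i = 0; i < alternatives.size(); ++i) {
          unionFields[i] = convertType("member_" + i, alternatives.get(i), Repetition.OPTIONAL);
        }
        return new GroupType(repetition, name, unionFields);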

@julienledem julienledem commented on the diff
...n/java/parquet/hive/read/DataWritableReadSupport.java
((51 lines not shown))
+ * @param configuration needed to get the wanted columns
+ * @param keyValueMetaData // unused
+ * @param fileSchema parquet file schema
+ * @return the parquet ReadContext
+ */
+ @Override
+ public parquet.hadoop.api.ReadSupport.ReadContext init(final Configuration configuration, final Map<String, String> keyValueMetaData, final MessageType fileSchema) {
+ final String columns = configuration.get("columns");
+ final List<String> listColumns = ManageJobConfig.getColumns(columns);
+ final Map<String, String> contextMetadata = new HashMap<String, String>();
+
+ final List<Type> typeListTable = new ArrayList<Type>();
+ for (final String col : listColumns) {
+ if (fileSchema.containsField(col)) {
+ typeListTable.add(fileSchema.getType(col));
+ } else { // dummy type, should not be called
@julienledem Owner

should we throw an exception?

@Lordshinjo Collaborator

The dummy type is needed to make sure we have the correct schema size.
"should not be called" refers to the fact that we should get an EmptyRecordReader for this field.

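To make that concrete, a hedged sketch of the fallback branch (the placeholder type is chosen arbitrarily here): it only keeps the requested schema the same size as the Hive column list, and the corresponding empty reader is expected to produce nulls rather than read anything from the file:

      } else {
        // Column declared in Hive but absent from this file: add a placeholder so
        // the requested schema still has one entry per Hive column. No value is
        // ever read for it; the empty record reader should yield null instead.
        // Assumes parquet.schema.PrimitiveType and Type.Repetition are imported.
        typeListTable.add(new PrimitiveType(Repetition.OPTIONAL, PrimitiveTypeName.BINARY, col));
      }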
...va/parquet/hive/TestDeprecatedParquetInputFormat.java
((98 lines not shown))
+
+ public void testParquetHiveInputFormatWithSpecificSchemaRandomColumn() throws Exception {
+ final String schemaRequested = "message customer {\n"
+ + " optional int32 c_custkey;\n"
+ + " optional binary c_mktsegment;\n"
+ + "}";
+ readParquetHiveInputFormat(schemaRequested, new Integer[] {0, 6});
+ }
+
+ public void testParquetHiveInputFormatWithSpecificSchemaFirstColumn() throws Exception {
+ final String schemaRequested = "message customer {\n"
+ + " optional int32 c_custkey;\n"
+ + "}";
+ readParquetHiveInputFormat(schemaRequested, new Integer[] {0});
+ }
+// Need to improve the checking because we need the key for the map. // TODO
@julienledem Owner

todo?

...va/parquet/hive/TestDeprecatedParquetInputFormat.java
((106 lines not shown))
+
+ public void testParquetHiveInputFormatWithSpecificSchemaFirstColumn() throws Exception {
+ final String schemaRequested = "message customer {\n"
+ + " optional int32 c_custkey;\n"
+ + "}";
+ readParquetHiveInputFormat(schemaRequested, new Integer[] {0});
+ }
+// Need to improve the checking because we need the key for the map. // TODO
+// public void testParquetHiveInputFormatWithSpecificSchemaLastColumn() throws Exception {
+// final String schemaRequested = "message customer {\n"
+// + " optional binary c_comment;\n"
+// + "}";
+// readParquetHiveInputFormat(schemaRequested, new Integer[] {7});
+// }
+
+// Not working yet // TODO
@julienledem Owner

todo?

...va/parquet/hive/TestDeprecatedParquetInputFormat.java
((123 lines not shown))
+// final String schemaRequested = "message customer {\n"
+// + " optional int32 unknown;\n"
+// + "}";
+// readParquetHiveInputFormat(schemaRequested, new Integer[] {Integer.MIN_VALUE});
+// }
+//
+ @Override
+ protected void setUp() throws Exception {
+ //
+ // create job and filesystem and reporter and such.
+ //
+ mapData = new HashMap<Integer, ArrayWritable>();
+ conf = new Configuration();
+ job = new JobConf(conf);
+ fs = FileSystem.getLocal(conf);
+ dir = new Path("testdata/from_java/deprecatedoutputformat/");
@julienledem Owner

temporary test files should live under target/tests/...

.../src/test/java/parquet/hive/UtilitiesTestMethods.java
((66 lines not shown))
+ if (phone != null) {
+ arr[4] = new BinaryWritable(phone);
+ }
+ if (acctbal != null) {
+ arr[5] = new DoubleWritable(acctbal);
+ }
+ if (mktsegment != null) {
+ arr[6] = new BinaryWritable(mktsegment);
+ }
+ if (comment != null) {
+ arr[7] = new BinaryWritable(comment);
+ }
+
+ return new ArrayWritable(Writable.class, arr);
+ }
+// public static void readTestFile(Path testFile, Configuration configuration)
@julienledem Owner

still needed?

@julienledem
Owner

you can use mvn cobertura:cobertura to verify your test coverage if you'd like

@mickaellcr
Collaborator

thx for cobertura, it's nice :) It will be my next todo xD

...n/java/parquet/column/impl/ColumnReaderEmptyImpl.java
((10 lines not shown))
+ * OF ANY KIND, either express or implied. See the License for the specific language governing permissions and limitations under the License.
+ */
+package parquet.column.impl;
+
+import parquet.column.ColumnReader;
+import parquet.io.ParquetDecodingException;
+import parquet.io.api.Binary;
+
+/**
+ * Empty ColumnReader implementation
+ *
+ *
+ * This column reader generates default values for new columns to support schema evolution.
+ * For a new optional or repeated field it will always return null.
+ *
+ * TODO : Right now, this will work only for flat schemas, Need to make it work for all schemas.
@julienledem Owner

The change to ColumnIOFactory now in trunk should cover that.
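As a standalone illustration of the idea in that javadoc (not the real class, and deliberately not tied to the actual ColumnReader interface), a reader for an optional column that is absent from the file can simply report "no value" for every row:

/** Sketch only: stands in for an optional column that is absent from the file. */
public class MissingColumnSketch {
  private final long rowCount; // total number of rows in the row group
  private long consumed = 0;   // how many rows have been "read" so far

  public MissingColumnSketch(final long rowCount) {
    this.rowCount = rowCount;
  }

  /** Definition level 0 on an optional field means the value is null. */
  public int getCurrentDefinitionLevel() {
    return 0;
  }

  /** Advance to the next row without touching any page data. */
  public void consume() {
    ++consumed;
  }

  public boolean isFullyConsumed() {
    return consumed >= rowCount;
  }
}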

...ain/java/parquet/column/impl/ColumnReadStoreImpl.java
@@ -55,7 +55,11 @@ public ColumnReadStoreImpl(PageReadStore pageReadStore, GroupConverter recordCon
@Override
public ColumnReader getColumnReader(ColumnDescriptor path) {
- return newMemColumnReader(path, pageReadStore.getPageReader(path));
+ if (pageReadStore.hasColumn(path)) {
+ return newMemColumnReader(path, pageReadStore.getPageReader(path));
+ } else {
+ return new ColumnReaderEmptyImpl(pageReadStore.getRowCount());
@julienledem Owner

We can remove that now

@julienledem
Owner

So it looks like you just need to integrate the schema compatibility change and then we can merge?

mickaellcr and others added some commits
@mickaellcr mickaellcr Start implementation parquet for hive :
 - Read simple data (INT32, INT64, DOUBLE, FLOAT, BOOLEAN, BINARY -String)
 - Write simple data (INT32, INT64, FLOAT, DOUBLE, BOOLEAN, BINARY -String)
 - Read data works only if HiveInputFormat is set (not CombineHive)

TODO :

 - Support complex type (struct, map, array)
 - Support CombineHive
 - Unit test :)
c7a8eaf
@Lordshinjo Lordshinjo Remove any K,V from DeprecatedXXFormat 2471e51
@Lordshinjo Lordshinjo Add support for CombineHiveInputFormat
- This recreates the input splits by reading each file's metadata
- This is slower than being able to get our InputSplits directly,
but still faster than calling getSplits over and over again
fcc88f3
@Lordshinjo Lordshinjo Can write complex types 4fe18c5
@Lordshinjo Lordshinjo Can read some complex types
- Structs should be entirely good
- Array and maps do not seem to work correctly
b4d1c71
@mickaellcr mickaellcr Improve column reading
 - Trying to only read the right column (work in progress)
f5ca27b
@mickaellcr mickaellcr rename one parameter 07e54a5
@mickaellcr mickaellcr Some improvements on the hive implementation :
 - start to read complex types (struct done!, others in progress)
 - start to write complex types (struct done!, others maybe done :) )
 - try to give only the requested schema to parquet objects, but
we have some trouble with the way the readsupport object is initialized. Trying a few
workarounds with hive to force the jobConf to be updated (like they do for
RCFile in HiveInputFormat). Not working when getSplits is called because
we have no path (in progress).

Unit test :

 - Just a start: only one very, very tiny test is working, to test hiveschemaconverter
 - working on the SerDe testing (trying to create a parquet file first in unit test)

The code is not clean, nor optimized. 'Make it work, Make it right, Make it fast':
only on the first step :p
1ada3d2
@mickaellcr mickaellcr Indentation : retab to 2 spaces. Nothing else. ebf76d4
@mickaellcr mickaellcr Give selected columns to ParquetInputFormat : Done
 - no more hack
 - works with HiveInputFormat and HiveCombine
126da3c
@mickaellcr mickaellcr Add equals and hashcode methods to BinaryWritable 910e7cd
@mickaellcr mickaellcr Implement a basic version of the SerDeStats object for ParquetHiveSerDe 7f534d5
@mickaellcr mickaellcr Fix Short object for Hive (use short for short instead of byte :)) 6c219a5
@mickaellcr mickaellcr Add a simple unit test for ParquetSerDe bd826ec
@mickaellcr mickaellcr Add some unit test in order to test :
 - HiveSerDe : fix some bugs about long and byte data
 - DeprecatedParquetInputFormat : add StatsSerDe method
 - MapWritableWriter : fix bug if we start and close without adding values
 - UtilitiesTestMethods : almost everything is from parquet-pig. Very useful
 - TestDeprecated{Input,Output}Format : in order to test the hive stuff

Update pom.xml :

 - Hive-* 0.10 version
 - Add column for test purpose
33e0131
@mickaellcr mickaellcr Improve the pull request following advices from Julien 1dc42f0
@Lordshinjo Lordshinjo Add full support for array and map reading 74be528
@mickaellcr mickaellcr Add unit test for storage d615729
@mickaellcr mickaellcr update hadoop version f2d9e81
@mickaellcr mickaellcr Fix compile with abstract methods 4198153
@Lordshinjo Lordshinjo Fix CombineHive bug
- When getting splits from CombineHive, recalculate all possible
splits for the path, and only keep the correct one, to keep only
the correct blocks
cc6754c
@Lordshinjo Lordshinjo Fix more combine stuff cee774c
@Lordshinjo Lordshinjo Correct fix to CombineHive 174c26a
@mickaellcr mickaellcr Change MapWritable to ArrayWritable (performance improved!)
Fix the ugly fix in case of trouble while reading with combine hive
Refactor the unit tests
Add more unit tests
Specify all the unsupported formats (next: refactor this, because we have like 4 methods for it)
Fix the fact that we were reading the data twice (sorry :) )
Did some profiling with the unit tests
3ee49a5
@mickaellcr mickaellcr Remove unused parameters 8234945
@Lordshinjo Lordshinjo Fix the size of the value array
- Give the list of columns to the ReadSupport via the split
- The ReadSupport then gives the GroupConverter the Hive schema
converted in Parquet format
543cdc2
@mickaellcr mickaellcr Hadoop 2.0 compatibility, hive 0.10
 - Using ContextUtils to be able to launch with hadoop 2.0
 - Working with hive 0.10
 - Fix some issues with ArrayWritable to be able to reach any columns
with objectInspector
bb6e2ff
@Lordshinjo Lordshinjo Add metadata in ReadContext instead of Split
- Fix TaskAttemptContext in the deprecated output format
- Clean up a bit how we get the split in the deprecated input format
0ec089d
@Lordshinjo Lordshinjo Clean up ReadSupport init 82fff8c
@mickaellcr mickaellcr Manage count 0 3089e83
@mickaellcr mickaellcr Update with advices from Julien 21b0d97
@mickaellcr mickaellcr Improve speed for queries like count(0), in which we only need the number of lines
eccbba1
@mickaellcr mickaellcr Try to fix travis build bf7263b
@mickaellcr mickaellcr Minor changes
 - Documentation
 - Change the name of a method
26360f3
@Lordshinjo Lordshinjo Update getSplits in DeprecatedParquetInputFormat
- In the case of a FileSplit, do not get the blocks, and instead
compute directly from the split offset/length
- Add javadoc
2525587
@mickaellcr mickaellcr Code review
 - Add todo
 - Add javadoc
 - Rename class
 - Rename method
 - Improve tests
4cf6ae1
@mickaellcr mickaellcr commented on the diff
...-column/src/main/java/parquet/io/MessageColumnIO.java
((16 lines not shown))
}
+
public <T> RecordReader<T> getRecordReader(PageReadStore columns, RecordMaterializer<T> recordMaterializer,
UnboundRecordFilter unboundFilter) {
@mickaellcr Collaborator

@julienledem : Quick question

I noticed that you implemented a new getRecordReader for the unboundFilter. In case the unboundFilter is not null, do I need to use the same trick as the previous method with the : if (leaves.size() > 0) .... else EmptyRecordReader ?

@julienledem Owner

I'm not sure what the ramifications would be as the Filter would work with no columns at all.
Let's add a TODO in getRecordReader(PageReadStore columns, RecordMaterializer recordMaterializer, UnboundRecordFilter unboundFilter)
and punt on that for now, it is kind of incompatible.
You could also throw an exception in that case to make it clear.

@mickaellcr mickaellcr was assigned
@julienledem
Owner

LGTM
Congrats!

@julienledem julienledem merged commit 92c450a into Parquet:master
@julienledem julienledem referenced this pull request from a commit
@tsdeng tsdeng do ProtocolEvents fixing only when there is required fields missing in the requested schema


https://issues.apache.org/jira/browse/PARQUET-61
This PR is trying to redo the https://github.com/apache/incubator-parquet-mr/pull/7

In this PR, it fixes the protocol events under a more precise condition:
only when the requested schema is missing some required fields that are present in the full schema.

So even if there is a projection, as long as the projection does not get rid of a required field, the protocol events amender will not be called.

Could you take a look at this ? @dvryaboy @yan-qi

Author: Tianshuo Deng <tdeng@twitter.com>

Closes #28 from tsdeng/fix_protocol_when_required_field_missing and squashes the following commits:

ba778b9 [Tianshuo Deng] add continue for readability
d5639df [Tianshuo Deng] fix unused import
090e894 [Tianshuo Deng] format
13a609d [Tianshuo Deng] comment format
ef1fe58 [Tianshuo Deng] little refactor, remove the hasMissingRequiredFieldFromProjection method
7c2c158 [Tianshuo Deng] format
83a5655 [Tianshuo Deng] do ProtocolEvents fixing only when there is required fields missing in the requested schema
f8b06df
@abayer abayer referenced this pull request from a commit in cloudera/parquet-mr
@tsdeng tsdeng do ProtocolEvents fixing only when there is required fields missing in the requested schema
2a0b165