Skip to content

Commit

Permalink
do ProtocolEvents fixing only when there is required fields missing i…
Browse files Browse the repository at this point in the history
…n the requested schema

https://issues.apache.org/jira/browse/PARQUET-61
This PR is trying to redo the https://github.com/apache/incubator-parquet-mr/pull/7

In this PR, it fixes the protocol event in a more precise condition:
Only when the requested schema missing some required fields that are present in the full schema

So even if there a projection, as long as the projection is not getting rid of the required field, the protocol events amender will not be called.

Could you take a look at this ? @dvryaboy @yan-qi

Author: Tianshuo Deng <tdeng@twitter.com>

Closes #28 from tsdeng/fix_protocol_when_required_field_missing and squashes the following commits:

ba778b9 [Tianshuo Deng] add continue for readability
d5639df [Tianshuo Deng] fix unused import
090e894 [Tianshuo Deng] format
13a609d [Tianshuo Deng] comment format
ef1fe58 [Tianshuo Deng] little refactor, remove the hasMissingRequiredFieldFromProjection method
7c2c158 [Tianshuo Deng] format
83a5655 [Tianshuo Deng] do ProtocolEvents fixing only when there is required fields missing in the requested schema
  • Loading branch information
tsdeng authored and rdblue committed Sep 18, 2014
1 parent 0e9f24b commit 2a0b165
Showing 1 changed file with 35 additions and 2 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -778,6 +778,7 @@ public void end() {
private final ParquetReadProtocol protocol;
private final GroupConverter structConverter;
private List<TProtocol> rootEvents = new ArrayList<TProtocol>();
private boolean missingRequiredFieldsInProjection = false;

/**
*
Expand All @@ -791,9 +792,36 @@ public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageT
this.thriftReader = thriftReader;
this.protocol = new ParquetReadProtocol();
this.thriftType = thriftType;
MessageType fullSchema = new ThriftSchemaConverter().convert(thriftType);
missingRequiredFieldsInProjection = hasMissingRequiredFieldInGroupType(requestedParquetSchema, fullSchema);
this.structConverter = new StructConverter(rootEvents, requestedParquetSchema, new ThriftField(name, (short)0, Requirement.REQUIRED, thriftType));
}

private boolean hasMissingRequiredFieldInGroupType(GroupType requested, GroupType fullSchema) {
for (Type field : fullSchema.getFields()) {

if (requested.containsField(field.getName())) {
Type requestedType = requested.getType(field.getName());
// if a field is in requested schema and the type of it is a group type, then do recursive check
if (!field.isPrimitive()) {
if (hasMissingRequiredFieldInGroupType(requestedType.asGroupType(), field.asGroupType())) {
return true;
} else {
continue;// check next field
}
}
} else {
if (field.getRepetition() == Type.Repetition.REQUIRED) {
return true; // if a field is missing in requested schema and it's required
} else {
continue; // the missing field is not required, then continue checking next field
}
}
}

return false;
}

/**
*
* {@inheritDoc}
Expand All @@ -802,8 +830,13 @@ public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageT
@Override
public T getCurrentRecord() {
try {
List<TProtocol> fixedEvents = new ProtocolEventsAmender(rootEvents).amendMissingRequiredFields(thriftType);
protocol.addAll(fixedEvents);
if (missingRequiredFieldsInProjection) {
List<TProtocol> fixedEvents = new ProtocolEventsAmender(rootEvents).amendMissingRequiredFields(thriftType);
protocol.addAll(fixedEvents);
} else {
protocol.addAll(rootEvents);
}

rootEvents.clear();
return thriftReader.readOneRecord(protocol);
} catch (TException e) {
Expand Down

0 comments on commit 2a0b165

Please sign in to comment.