PARQUET-7: [thrift] avoid thrift amender if all fields are optional. #7

Closed
@@ -18,9 +18,9 @@
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

import org.apache.thrift.TException;
import org.apache.thrift.protocol.TField;
import org.apache.thrift.protocol.TList;
@@ -778,7 +778,12 @@ public void end() {
private final ParquetReadProtocol protocol;
private final GroupConverter structConverter;
private List<TProtocol> rootEvents = new ArrayList<TProtocol>();
boolean hasRequiredFields = false;

//TODO(dmitriy): make this expire things
Contributor

Why expire? Is it possible for this to change during the lifetime of the JVM?
Or do you want to avoid the map growing too big? Isn't a Parquet schema inherently tied to a single Thrift schema? Do we load more than one schema in the same JVM? If it's just one schema, then this shouldn't grow without bound, right?
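
If bounding the cache does turn out to be necessary, one option is sketched below. This is only an illustration, not part of the PR: it uses an access-ordered `LinkedHashMap` with `removeEldestEntry` to evict the least recently used entry. The class name `BoundedCacheSketch`, the helper `newLruCache`, the cap of 2, and the `String` keys (standing in for `ThriftType.StructType`) are all assumptions made for the example.

```java
import java.util.Collections;
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical sketch; none of these names exist in the PR.
class BoundedCacheSketch {

  // Builds a synchronized, access-ordered map that drops its least recently
  // used entry as soon as it holds more than maxEntries entries.
  static <K, V> Map<K, V> newLruCache(final int maxEntries) {
    return Collections.synchronizedMap(new LinkedHashMap<K, V>(16, 0.75f, true) {
      @Override
      protected boolean removeEldestEntry(Map.Entry<K, V> eldest) {
        return size() > maxEntries;
      }
    });
  }

  public static void main(String[] args) {
    // String keys stand in for ThriftType.StructType; the cap of 2 is arbitrary.
    Map<String, Boolean> cache = newLruCache(2);
    cache.put("StructA", true);
    cache.put("StructB", false);
    cache.get("StructA");        // touch StructA, so StructB is now the eldest
    cache.put("StructC", false); // exceeds the cap and evicts StructB
    System.out.println(cache.keySet()); // prints [StructA, StructC]
  }
}
```

If only one Thrift schema is ever loaded per JVM, as the questions above suggest, a plain map stays small and this extra machinery would not be needed.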

// we probably want to cache this since we might have to keep re-examining the same struct in different
// instances of the same converter
private static Map<ThriftType.StructType, Boolean> hasRequiredFieldCache = new HashMap<ThriftType.StructType, Boolean>();
/**
*
* @param thriftReader the class responsible for instantiating the final object and read from the protocol
@@ -791,9 +796,54 @@ public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageT
this.thriftReader = thriftReader;
this.protocol = new ParquetReadProtocol();
this.thriftType = thriftType;
if (!hasRequiredFieldCache.containsKey(thriftType)) {
hasRequiredFieldCache.put(thriftType, somethingIsRequiredInStruct(thriftType));
}
this.hasRequiredFields = hasRequiredFieldCache.get(thriftType);
this.structConverter = new StructConverter(rootEvents, requestedParquetSchema, new ThriftField(name, (short)0, Requirement.REQUIRED, thriftType));
}

Contributor

Maybe we could push the logic in this PR (including the cache) into ProtocolEventsAmender.amendMissingRequiredFields(), and have it return an empty list here?

Then you don't need an if / else here, and you can make amendMissingRequiredFields short circuit.

Also, what if the cache were instead a Map<ThriftType.StructType, List<TProtocol>>?

Contributor

Agreed. Caching in the amender means we could even do it at a finer granularity. Let me think...
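
One possible reading of the suggestion in this thread is sketched below, as an illustration only. It moves the cached check into the amending method itself, so the caller needs no if / else. Everything here is a stand-in: `ShortCircuitAmenderSketch` and `StructStub` are hypothetical, events are plain strings rather than `TProtocol`, the precomputed `anyRequired` flag replaces the real recursive walk, and returning the input list unchanged (rather than an empty list) is an assumption about what "short circuit" should mean.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Hypothetical stand-in for ProtocolEventsAmender; not the real class.
class ShortCircuitAmenderSketch {

  // Hypothetical stand-in for ThriftType.StructType.
  static final class StructStub {
    final String name;
    final boolean anyRequired; // the real code would compute this by walking the struct
    StructStub(String name, boolean anyRequired) {
      this.name = name;
      this.anyRequired = anyRequired;
    }
  }

  // Cache lives in the amender instead of the record converter.
  private static final Map<String, Boolean> HAS_REQUIRED =
      new ConcurrentHashMap<String, Boolean>();

  static List<String> amendMissingRequiredFields(List<String> events, StructStub struct) {
    Boolean hasRequired = HAS_REQUIRED.get(struct.name);
    if (hasRequired == null) {
      hasRequired = struct.anyRequired;
      HAS_REQUIRED.put(struct.name, hasRequired);
    }
    if (!hasRequired) {
      return events; // short circuit: nothing can be missing, so nothing to amend
    }
    // Placeholder for the real amendment pass over the events.
    List<String> fixed = new ArrayList<String>(events);
    fixed.add("<default for missing required field>");
    return fixed;
  }

  public static void main(String[] args) {
    List<String> events = Arrays.asList("structBegin", "structEnd");
    StructStub allOptional = new StructStub("AllOptional", false);
    StructStub withRequired = new StructStub("WithRequired", true);
    System.out.println(amendMissingRequiredFields(events, allOptional) == events);
    System.out.println(amendMissingRequiredFields(events, withRequired).size());
  }
}
```

With this shape the caller always does the same three steps (amend, add to the protocol, clear), and the all-optional case costs one map lookup.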

private boolean fieldIsRequired(ThriftField thriftField) {
boolean isRequired = (thriftField.getRequirement() == ThriftField.Requirement.REQUIRED);
if (isRequired) return true;

ThriftType elementType = thriftField.getType();
ThriftTypeID elementFieldTypeID = elementType.getType();
switch (elementFieldTypeID) {
case STRUCT:
return somethingIsRequiredInStruct((ThriftType.StructType) elementType);
case LIST:
return somethingIsRequiredInList((ThriftType.ListType) elementType);
case MAP:
return somethingIsRequiredInMap((ThriftType.MapType) elementType);
case SET:
return somethingIsRequiredInSet((ThriftType.SetType) elementType);
default:
return false;
Contributor

Should this be an exception? Is it valid to land here?

Contributor

This is correct, since the method starts with "if (isRequired) return true;".
When it hits the default case, the field is a primitive type and isRequired is false, so it should just return false.
To make the intention of the code clearer, maybe @dvryaboy can change it to "return isRequired".

}
}

private boolean somethingIsRequiredInList(ThriftType.ListType thriftType) {
return fieldIsRequired(thriftType.getValues());
}
private boolean somethingIsRequiredInSet(ThriftType.SetType thriftType) {
return fieldIsRequired(thriftType.getValues());
}

private boolean somethingIsRequiredInMap(ThriftType.MapType thriftType) {
return fieldIsRequired(thriftType.getKey()) || fieldIsRequired(thriftType.getValue());
}

private boolean somethingIsRequiredInStruct(ThriftType.StructType thriftType) {
boolean isRequired = false;
Iterator<ThriftField> childrenIter = thriftType.getChildren().iterator();
while (!isRequired && childrenIter.hasNext()) {
ThriftField field = childrenIter.next();
isRequired = fieldIsRequired(field);
}
return isRequired;
}
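
The recursive check above can be condensed for illustration. The sketch below is a simplified model, not the PR's code: the hypothetical `Node` class stands in for both `ThriftField` and `ThriftType`, treating a primitive as a node with no children and a struct, list, set, or map as a node whose children are its fields, element, or key and value. It also shows why falling through for primitives is valid: an optional field with no children simply has nothing required inside it.

```java
import java.util.Arrays;
import java.util.List;

// Hypothetical, simplified model of the required-field walk.
class RequiredFieldCheckSketch {

  // Stand-in for ThriftField plus its ThriftType.
  static final class Node {
    final boolean required;
    final List<Node> children; // empty for primitives
    Node(boolean required, Node... children) {
      this.required = required;
      this.children = Arrays.asList(children);
    }
  }

  // True if the field itself, or anything nested inside it, is required.
  static boolean fieldIsRequired(Node field) {
    if (field.required) {
      return true;
    }
    // Optional primitive: no children, so the loop below returns false.
    return somethingIsRequired(field.children);
  }

  // Early-return loop, equivalent to the iterator loop guarded by !isRequired.
  static boolean somethingIsRequired(List<Node> children) {
    for (Node child : children) {
      if (fieldIsRequired(child)) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    Node optionalLeaf = new Node(false);
    Node requiredLeaf = new Node(true);
    Node allOptional = new Node(false, optionalLeaf, new Node(false, optionalLeaf));
    Node nestedRequired = new Node(false, optionalLeaf, new Node(false, requiredLeaf));
    System.out.println(fieldIsRequired(allOptional));    // prints false
    System.out.println(fieldIsRequired(nestedRequired)); // prints true
  }
}
```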

/**
*
* {@inheritDoc}
@@ -802,10 +852,16 @@ public ThriftRecordConverter(ThriftReader<T> thriftReader, String name, MessageT
@Override
public T getCurrentRecord() {
try {
- List<TProtocol> fixedEvents = new ProtocolEventsAmender(rootEvents).amendMissingRequiredFields(thriftType);
- protocol.addAll(fixedEvents);
- rootEvents.clear();
- return thriftReader.readOneRecord(protocol);
+ if (hasRequiredFields) {
+ List<TProtocol> fixedEvents = new ProtocolEventsAmender(rootEvents).amendMissingRequiredFields(thriftType);
+ protocol.addAll(fixedEvents);
+ rootEvents.clear();
+ return thriftReader.readOneRecord(protocol);
+ } else {
+ protocol.addAll(rootEvents);
+ rootEvents.clear();
+ return thriftReader.readOneRecord(protocol);
+ }
} catch (TException e) {
throw new ParquetDecodingException("Could not read thrift object from protocol", e);
}
@@ -49,7 +49,7 @@ public void readStructEnd() throws TException {
public void readFieldEnd() throws TException {
}
};
- List createdEvents = new ArrayList<TProtocol>();
+ List<TProtocol> createdEvents = new ArrayList<TProtocol>();

public List<TProtocol> createProtocolEventsForField(ThriftField missingField) {
TProtocol fieldBegin = new ReadFieldBeginProtocol(missingField);