Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

'core' ORC extension #7138

Merged
merged 15 commits into from
Apr 9, 2019
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,7 @@
public class OrcHadoopInputRowParser implements InputRowParser<OrcStruct>
{
private final ParseSpec parseSpec;
private final ObjectFlattener<OrcStruct> groupFlattener;
private final ObjectFlattener<OrcStruct> orcStructFlattener;
private final MapInputRowParser parser;
private final boolean binaryAsString;

Expand All @@ -50,20 +50,20 @@ public OrcHadoopInputRowParser(
this.parseSpec = parseSpec;
this.binaryAsString = binaryAsString == null ? false : binaryAsString;
final JSONPathSpec flattenSpec;
if ((parseSpec instanceof OrcParseSpec)) {
if (parseSpec instanceof OrcParseSpec) {
flattenSpec = ((OrcParseSpec) parseSpec).getFlattenSpec();
} else {
flattenSpec = JSONPathSpec.DEFAULT;
}
this.groupFlattener = ObjectFlatteners.create(flattenSpec, new OrcStructFlattenerMaker(false));
this.orcStructFlattener = ObjectFlatteners.create(flattenSpec, new OrcStructFlattenerMaker(this.binaryAsString));
this.parser = new MapInputRowParser(parseSpec);
}

@NotNull
@Override
public List<InputRow> parseBatch(OrcStruct input)
{
return parser.parseBatch(groupFlattener.flatten(input));
return parser.parseBatch(orcStructFlattener.flatten(input));
}

@Override
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,8 @@

package org.apache.druid.data.input.orc;

import it.unimi.dsi.fastutil.objects.Object2IntMap;
import it.unimi.dsi.fastutil.objects.Object2IntOpenHashMap;
import org.apache.druid.java.util.common.DateTimes;
import org.apache.druid.java.util.common.StringUtils;
import org.apache.hadoop.hive.serde2.io.DateWritable;
Expand Down Expand Up @@ -49,6 +51,19 @@

public class OrcStructConverter
{
@Nonnull
private static List<Object> convertList(TypeDescription fieldDescription, OrcList orcList, boolean binaryAsString)
{
// if primitive list, convert primitives
TypeDescription listType = fieldDescription.getChildren().get(0);
if (listType.getCategory().isPrimitive()) {
return (List<Object>) orcList.stream()
.map(li -> convertPrimitive(listType, (WritableComparable) li, binaryAsString))
.collect(Collectors.toList());
}
return new ArrayList<Object>(orcList);
}

private static Map<Object, Object> convertMap(
TypeDescription fieldDescription,
OrcMap<? extends WritableComparable, ? extends WritableComparable> map,
Expand Down Expand Up @@ -126,6 +141,7 @@ private static Object convertPrimitive(TypeDescription fieldDescription, Writabl
}

private boolean binaryAsString;
private Object2IntMap<String> fieldIndexCache;

OrcStructConverter(boolean binaryAsString)
{
Expand All @@ -139,18 +155,46 @@ private static Object convertPrimitive(TypeDescription fieldDescription, Writabl
* primitive types will be extracted into an ingestion friendly state (e.g. 'int' and 'long'). Finally,
* if a field is not present, this method will return null.
*
* Note: "Union" types are not currently supported and will be returned as null
* Note: "Union" types are not currently supported and will be returned as null. Additionally, this method
* has a cache of field names to field index that is ONLY valid for the root level {@link OrcStruct}, and should
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This design is not foolproof enough- it's risky/errorprone, and a bit obtuse to read, because two methods called convertField with slightly different arguments have very different semantics. This one should be renamed to convertRootField and the javadoc should call out the restriction prominently, rather than in a side node.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Agree, will fix 👍

* not be used for nested {@link OrcStruct} fields of the root.
*/
@Nullable
Object convertField(OrcStruct struct, String fieldName)
{
// this cache is only valid for the root level, to skip the indexOf on fieldNames to get the fieldIndex.
TypeDescription schema = struct.getSchema();
int fieldIndex = schema.getFieldNames().indexOf(fieldName);
final List<String> fields = schema.getFieldNames();
if (fieldIndexCache == null) {
fieldIndexCache = new Object2IntOpenHashMap<>(fields.size());
for (int i = 0; i < fields.size(); i++) {
fieldIndexCache.put(fields.get(i), i);
}
}
WritableComparable wc = struct.getFieldValue(fieldName);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Variable is not used. I assume that calling struct.getFieldValue() doesn't have desirable side effects. I'll delete this line.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Deleted here: #7738

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Ouch, yeah this is not supposed to be there, I missed it and worse, it is sort of defeating the purpose of the field index cache since it's still causing indexOf to get called..

Thanks for fixing this, I'm going to open a PR to 0.15 branch to effectively backport this part of #7738 because this is a performance issue for data with lots of fields.


int fieldIndex = fieldIndexCache.getOrDefault(fieldName, -1);

return convertField(struct, fieldIndex);
}

/**
* Convert a orc struct field as though it were a map, by fieldIndex. Complex types will be transformed
* into java lists and maps when possible ({@link OrcStructConverter#convertList} and
* {@link OrcStructConverter#convertMap}), and
* primitive types will be extracted into an ingestion friendly state (e.g. 'int' and 'long'). Finally,
* if a field is not present, this method will return null.
*
* Note: "Union" types are not currently supported and will be returned as null
*/
@Nullable
Object convertField(OrcStruct struct, int fieldIndex)
{
if (fieldIndex < 0) {
return null;
}

TypeDescription schema = struct.getSchema();
TypeDescription fieldDescription = schema.getChildren().get(fieldIndex);
WritableComparable fieldValue = struct.getFieldValue(fieldIndex);

Expand All @@ -172,13 +216,13 @@ Object convertField(OrcStruct struct, String fieldName)
switch (fieldDescription.getCategory()) {
case LIST:
OrcList orcList = (OrcList) fieldValue;
return convertList(fieldDescription, orcList);
return convertList(fieldDescription, orcList, binaryAsString);
case MAP:
OrcMap map = (OrcMap) fieldValue;
return convertMap(fieldDescription, map, binaryAsString);
case STRUCT:
OrcStruct structMap = (OrcStruct) fieldValue;
return convertMap(structMap);
return convertStructToMap(structMap);
case UNION:
// sorry union types :(
default:
Expand All @@ -187,25 +231,13 @@ Object convertField(OrcStruct struct, String fieldName)
}
}

@Nonnull
private List<Object> convertList(TypeDescription fieldDescription, OrcList orcList)
{
// if primitive list, convert primitives
TypeDescription listType = fieldDescription.getChildren().get(0);
if (listType.getCategory().isPrimitive()) {
return (List<Object>) orcList.stream()
.map(li -> convertPrimitive(listType, (WritableComparable) li, binaryAsString))
.collect(Collectors.toList());
}
return new ArrayList<Object>(orcList);
}


private Map<String, Object> convertMap(OrcStruct map)
private Map<String, Object> convertStructToMap(OrcStruct map)
{
Map<String, Object> converted = new HashMap<>();
for (String key : map.getSchema().getFieldNames()) {
converted.put(key, convertField(map, key));
List<String> fieldNames = map.getSchema().getFieldNames();

for (int i = 0; i < fieldNames.size(); i++) {
converted.put(fieldNames.get(i), convertField(map, i));
}
return converted;
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -106,8 +106,9 @@ public Object getMapValue(final Object o, final String s)
} else if (o instanceof Map) {
return ((Map) o).get(s);
} else if (o instanceof OrcStruct) {
OrcStruct g = (OrcStruct) o;
return converter.convertField(g, s);
OrcStruct struct = (OrcStruct) o;
// get field by index since we have no way to know if this map is the root or not
return converter.convertField(struct, struct.getSchema().getFieldNames().indexOf(s));
}
throw new UnsupportedOperationException(o.getClass().getName());
}
Expand Down