HIVE-15057: Nested column pruning: support all operators (Chao Sun, reviewed by Ferdinand Xu)
sunchao committed Dec 3, 2016
1 parent 2feaa5d commit a625bb0
Showing 16 changed files with 1,815 additions and 403 deletions.
72 changes: 66 additions & 6 deletions ql/src/java/org/apache/hadoop/hive/ql/exec/MapOperator.java
@@ -45,6 +45,7 @@
import org.apache.hadoop.hive.ql.plan.TableDesc;
import org.apache.hadoop.hive.ql.plan.TableScanDesc;
import org.apache.hadoop.hive.ql.plan.api.OperatorType;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
import org.apache.hadoop.hive.serde2.Deserializer;
import org.apache.hadoop.hive.serde2.SerDeException;
import org.apache.hadoop.hive.serde2.SerDeStats;
@@ -178,7 +179,6 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapOpCtx opCtx,
SerDeUtils.createOverlayedProperties(td.getProperties(), pd.getProperties());

Map<String, String> partSpec = pd.getPartSpec();

opCtx.tableName = String.valueOf(overlayedProps.getProperty("name"));
opCtx.partName = String.valueOf(partSpec);
opCtx.deserializer = pd.getDeserializer(hconf);
@@ -279,19 +279,20 @@ private MapOpCtx initObjectInspector(Configuration hconf, MapOpCtx opCtx,
* and P1's schema is the same as T's, whereas P2's schema is different from T's, conversion
* might be needed for both P1 and P2, since SettableOI might be needed for T
*/
private Map<TableDesc, StructObjectInspector> getConvertedOI(Configuration hconf)
private Map<TableDesc, StructObjectInspector> getConvertedOI(Map<String, Configuration> tableToConf)
throws HiveException {
Map<TableDesc, StructObjectInspector> tableDescOI =
new HashMap<TableDesc, StructObjectInspector>();
Set<TableDesc> identityConverterTableDesc = new HashSet<TableDesc>();

try {
Map<ObjectInspector, Boolean> oiSettableProperties = new HashMap<ObjectInspector, Boolean>();

for (Path onefile : conf.getPathToAliases().keySet()) {
PartitionDesc pd = conf.getPathToPartitionInfo().get(onefile);
TableDesc tableDesc = pd.getTableDesc();
Configuration hconf = tableToConf.get(tableDesc.getTableName());
Deserializer partDeserializer = pd.getDeserializer(hconf);

StructObjectInspector partRawRowObjectInspector;
boolean isAcid = AcidUtils.isTablePropertyTransactional(tableDesc.getProperties());
if (Utilities.isSchemaEvolutionEnabled(hconf, isAcid) && Utilities.isInputFileFormatSelfDescribing(pd)) {
@@ -329,6 +330,58 @@ else if (partRawRowObjectInspector.equals(tblRawRowObjectInspector)) {
return tableDescOI;
}

/**
* For each source table, combine the nested column pruning information from all its
* table scan descriptors and set it in a copy of the configuration. This is necessary
* because the property "READ_NESTED_COLUMN_PATH_CONF_STR" is set on a per-table
* basis, so a single shared configuration cannot serve all the tables.
*/
private Map<String, Configuration> cloneConfsForNestedColPruning(Configuration hconf) {
Map<String, Configuration> tableNameToConf = new HashMap<>();

for (Map.Entry<Path, ArrayList<String>> e : conf.getPathToAliases().entrySet()) {
List<String> aliases = e.getValue();
if (aliases == null || aliases.isEmpty()) {
continue;
}

String tableName = conf.getPathToPartitionInfo().get(e.getKey()).getTableName();
for (String alias: aliases) {
Operator<?> rootOp = conf.getAliasToWork().get(alias);
if (!(rootOp instanceof TableScanOperator)) {
continue;
}
TableScanDesc tableScanDesc = ((TableScanOperator) rootOp).getConf();
List<String> nestedColumnPaths = tableScanDesc.getNeededNestedColumnPaths();
if (nestedColumnPaths == null || nestedColumnPaths.isEmpty()) {
continue;
}
if (!tableNameToConf.containsKey(tableName)) {
Configuration clonedConf = new Configuration(hconf);
clonedConf.unset(ColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR);
tableNameToConf.put(tableName, clonedConf);
}
Configuration newConf = tableNameToConf.get(tableName);
ColumnProjectionUtils.appendNestedColumnPaths(newConf, nestedColumnPaths);
}
}

// Assign tables without nested column pruning info to the default conf
for (PartitionDesc pd : conf.getPathToPartitionInfo().values()) {
if (!tableNameToConf.containsKey(pd.getTableName())) {
tableNameToConf.put(pd.getTableName(), hconf);
}
}

for (PartitionDesc pd: conf.getAliasToPartnInfo().values()) {
if (!tableNameToConf.containsKey(pd.getTableName())) {
tableNameToConf.put(pd.getTableName(), hconf);
}
}

return tableNameToConf;
}
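To make the per-table cloning concrete, here is a minimal sketch of what cloneConfsForNestedColPruning builds when two hypothetical tables t1 and t2 need different nested paths (the table names and paths are illustrative, not from the patch; assumes imports of Configuration, Arrays, and ColumnProjectionUtils):

    // Per-table clone: drop any inherited paths, then append this table's own.
    Configuration base = new Configuration();

    Configuration t1Conf = new Configuration(base);
    t1Conf.unset(ColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR);
    ColumnProjectionUtils.appendNestedColumnPaths(t1Conf, Arrays.asList("s.a.b"));

    Configuration t2Conf = new Configuration(base);
    t2Conf.unset(ColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR);
    ColumnProjectionUtils.appendNestedColumnPaths(t2Conf, Arrays.asList("s.d"));

    // Each table's reader now sees only its own paths:
    // ColumnProjectionUtils.getNestedColumnPaths(t1Conf) -> {"s.a.b"}
    // ColumnProjectionUtils.getNestedColumnPaths(t2Conf) -> {"s.d"}

With a single shared conf, the paths appended for t1 and t2 would accumulate, and each reader would be asked for nested columns its table may not even have.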

/*
* This is the same as the setChildren method below but for empty tables.
* It takes care of the following:
@@ -339,15 +392,19 @@ else if (partRawRowObjectInspector.equals(tblRawRowObjectInspector)) {
public void initEmptyInputChildren(List<Operator<?>> children, Configuration hconf)
throws SerDeException, Exception {
setChildOperators(children);

Map<String, Configuration> tableNameToConf = cloneConfsForNestedColPruning(hconf);

for (Operator<?> child : children) {
TableScanOperator tsOp = (TableScanOperator) child;
StructObjectInspector soi = null;
PartitionDesc partDesc = conf.getAliasToPartnInfo().get(tsOp.getConf().getAlias());
Configuration newConf = tableNameToConf.get(partDesc.getTableDesc().getTableName());
Deserializer serde = partDesc.getTableDesc().getDeserializer();
partDesc.setProperties(partDesc.getProperties());
MapOpCtx opCtx = new MapOpCtx(tsOp.getConf().getAlias(), child, partDesc);
StructObjectInspector tableRowOI = (StructObjectInspector) serde.getObjectInspector();
initObjectInspector(hconf, opCtx, tableRowOI);
initObjectInspector(newConf, opCtx, tableRowOI);
soi = opCtx.rowObjectInspector;
child.getParentOperators().add(this);
childrenOpToOI.put(child, soi);
@@ -359,12 +416,15 @@ public void setChildren(Configuration hconf) throws Exception {
List<Operator<? extends OperatorDesc>> children =
new ArrayList<Operator<? extends OperatorDesc>>();

Map<TableDesc, StructObjectInspector> convertedOI = getConvertedOI(hconf);
Map<String, Configuration> tableNameToConf = cloneConfsForNestedColPruning(hconf);
Map<TableDesc, StructObjectInspector> convertedOI = getConvertedOI(tableNameToConf);

for (Map.Entry<Path, ArrayList<String>> entry : conf.getPathToAliases().entrySet()) {
Path onefile = entry.getKey();
List<String> aliases = entry.getValue();
PartitionDesc partDesc = conf.getPathToPartitionInfo().get(onefile);
TableDesc tableDesc = partDesc.getTableDesc();
Configuration newConf = tableNameToConf.get(tableDesc.getTableName());

for (String alias : aliases) {
Operator<? extends OperatorDesc> op = conf.getAliasToWork().get(alias);
@@ -381,7 +441,7 @@ public void setChildren(Configuration hconf) throws Exception {
}
MapOpCtx context = new MapOpCtx(alias, op, partDesc);
StructObjectInspector tableRowOI = convertedOI.get(partDesc.getTableDesc());
contexts.put(op, initObjectInspector(hconf, context, tableRowOI));
contexts.put(op, initObjectInspector(newConf, context, tableRowOI));

if (children.contains(op) == false) {
op.setParentOperators(new ArrayList<Operator<? extends OperatorDesc>>(1));
@@ -228,15 +228,16 @@ public static MessageType getProjectedSchema(
MessageType schema,
List<String> colNames,
List<Integer> colIndexes,
List<String> nestedColumnPaths) {
Set<String> nestedColumnPaths) {
List<Type> schemaTypes = new ArrayList<Type>();

Map<String, FieldNode> prunedCols = getPrunedNestedColumns(nestedColumnPaths);
for (Integer i : colIndexes) {
if (i < colNames.size()) {
if (i < schema.getFieldCount()) {
Type t = schema.getType(i);
if (!prunedCols.containsKey(t.getName())) {
String tn = t.getName().toLowerCase();
if (!prunedCols.containsKey(tn)) {
schemaTypes.add(schema.getType(i));
} else {
if (t.isPrimitive()) {
@@ -245,7 +246,7 @@ public static MessageType getProjectedSchema(
} else {
// For group type, we need to build the projected group type with required leaves
List<Type> g =
projectLeafTypes(Arrays.asList(t), Arrays.asList(prunedCols.get(t.getName())));
projectLeafTypes(Arrays.asList(t), Arrays.asList(prunedCols.get(tn)));
if (!g.isEmpty()) {
schemaTypes.addAll(g);
}
@@ -264,20 +265,19 @@

/**
* Return the columns which contain the required nested attribute levels
* e.g.
* Given struct a <x:int, y:int> and a is required while y is not, so the method will return a
* who contains the attribute x
* E.g., given struct a:<x:int, y:int> while 'x' is required and 'y' is not, the method will return
* a pruned struct for 'a' which only contains the attribute 'x'
*
* @param nestedColPaths the paths for required nested attribute
* @return column list contains required nested attribute
* @return a map from each column to its selected nested column paths; the map keys are all lower-cased.
*/
private static Map<String, FieldNode> getPrunedNestedColumns(List<String> nestedColPaths) {
private static Map<String, FieldNode> getPrunedNestedColumns(Set<String> nestedColPaths) {
Map<String, FieldNode> resMap = new HashMap<>();
if (nestedColPaths.isEmpty()) {
return resMap;
}
for (String s : nestedColPaths) {
String c = StringUtils.split(s, '.')[0];
String c = StringUtils.split(s, '.')[0].toLowerCase();
if (!resMap.containsKey(c)) {
FieldNode f = NestedColumnFieldPruningUtils.addNodeByPath(null, s);
resMap.put(c, f);
@@ -306,10 +306,10 @@ private static List<Type> projectLeafTypes(
}
Map<String, FieldNode> fieldMap = new HashMap<>();
for (FieldNode n : nodes) {
fieldMap.put(n.getFieldName(), n);
fieldMap.put(n.getFieldName().toLowerCase(), n);
}
for (Type type : types) {
String tn = type.getName();
String tn = type.getName().toLowerCase();

if (fieldMap.containsKey(tn)) {
FieldNode f = fieldMap.get(tn);
@@ -373,7 +373,7 @@ public org.apache.parquet.hadoop.api.ReadSupport.ReadContext init(InitContext co
contextMetadata.put(PARQUET_COLUMN_INDEX_ACCESS, String.valueOf(indexAccess));
this.hiveTypeInfo = TypeInfoFactory.getStructTypeInfo(columnNamesList, columnTypesList);

List<String> groupPaths = ColumnProjectionUtils.getNestedColumnPaths(configuration);
Set<String> groupPaths = ColumnProjectionUtils.getNestedColumnPaths(configuration);
List<Integer> indexColumnsWanted = ColumnProjectionUtils.getReadColumnIDs(configuration);
if (!ColumnProjectionUtils.isReadAllColumns(configuration) && !indexColumnsWanted.isEmpty()) {
MessageType requestedSchemaByUser = getProjectedSchema(tableSchema, columnNamesList,
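As a hedged illustration of the Parquet schema projection changed above (the file schema and the call are invented; getProjectedSchema is the static helper from these hunks, whose enclosing class is not named in this excerpt):

    // Hive lower-cases column names and nested paths, while the Parquet file
    // schema may be mixed-case, so the lookup must compare lower-cased names.
    MessageType fileSchema = MessageTypeParser.parseMessageType(
        "message hive_schema { optional group S { optional int32 X; optional int32 Y; } }");

    MessageType projected = getProjectedSchema(
        fileSchema,
        Arrays.asList("s"),                    // colNames, already lower-cased by Hive
        Arrays.asList(0),                      // colIndexes
        new HashSet<>(Arrays.asList("s.x")));  // nestedColumnPaths

    // projected keeps group S with only its leaf X; Y is pruned away.

Before this change, the exact-case lookup prunedCols.containsKey(t.getName()) would miss "S" entirely, so the whole column would be requested instead of the projected group.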
@@ -72,14 +72,20 @@ public ArrayWritableObjectInspector(boolean isRoot,
final String name = fieldNames.get(i);
final TypeInfo fieldInfo = fieldInfos.get(i);

StructFieldImpl field;
if (prunedTypeInfo != null && prunedTypeInfo.getAllStructFieldNames().indexOf(name) >= 0) {
int adjustedIndex = prunedTypeInfo.getAllStructFieldNames().indexOf(name);
TypeInfo prunedFieldInfo = prunedTypeInfo.getAllStructFieldTypeInfos().get(adjustedIndex);
field = new StructFieldImpl(name, getObjectInspector(fieldInfo, prunedFieldInfo), i, adjustedIndex);
} else {
StructFieldImpl field = null;
if (prunedTypeInfo != null) {
for (int idx = 0; idx < prunedTypeInfo.getAllStructFieldNames().size(); ++idx) {
if (prunedTypeInfo.getAllStructFieldNames().get(idx).equalsIgnoreCase(name)) {
TypeInfo prunedFieldInfo = prunedTypeInfo.getAllStructFieldTypeInfos().get(idx);
field = new StructFieldImpl(name, getObjectInspector(fieldInfo, prunedFieldInfo), i, idx);
break;
}
}
}
if (field == null) {
field = new StructFieldImpl(name, getObjectInspector(fieldInfo, null), i, i);
}

fields.add(field);
fieldsByName.put(name.toLowerCase(), field);
}
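The constructor change above swaps an exact indexOf lookup for a case-insensitive scan, so a field declared with mixed case in the table schema still finds its counterpart in the lower-cased pruned type info. The matching logic in isolation, with hypothetical names:

    List<String> prunedNames = Arrays.asList("a", "d");  // names in prunedTypeInfo
    String name = "D";                                   // field name from the table schema
    int adjustedIndex = -1;
    for (int idx = 0; idx < prunedNames.size(); ++idx) {
      if (prunedNames.get(idx).equalsIgnoreCase(name)) {
        adjustedIndex = idx;                             // 1: "D" matches "d"
        break;
      }
    }
    // adjustedIndex >= 0: the field survived pruning and is read from its
    // pruned position; otherwise it falls back to its original index.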
@@ -22,6 +22,7 @@

import com.google.common.base.Preconditions;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hive.ql.optimizer.FieldNode;
import org.apache.hadoop.hive.serde.serdeConstants;
import org.apache.hadoop.hive.serde2.AbstractSerDe;
import org.apache.hadoop.hive.serde2.ColumnProjectionUtils;
@@ -116,8 +117,9 @@ public final void initialize(final Configuration conf, final Properties tbl) thr
(StructTypeInfo) TypeInfoFactory.getStructTypeInfo(columnNames, columnTypes);
StructTypeInfo prunedTypeInfo = null;
if (conf != null) {
String prunedColumnPaths = conf.get(ColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR);
if (prunedColumnPaths != null) {
String rawPrunedColumnPaths = conf.get(ColumnProjectionUtils.READ_NESTED_COLUMN_PATH_CONF_STR);
if (rawPrunedColumnPaths != null) {
List<String> prunedColumnPaths = processRawPrunedPaths(rawPrunedColumnPaths);
prunedTypeInfo = pruneFromPaths(completeTypeInfo, prunedColumnPaths);
}
}
@@ -176,31 +178,44 @@ public SerDeStats getSerDeStats() {
return stats;
}

/**
* Given a string of raw pruned paths separated by ',', return a list of merged pruned paths.
* For instance, if 'prunedPaths' is "s.a, s, s", this returns ["s"].
*/
private static List<String> processRawPrunedPaths(String prunedPaths) {
List<FieldNode> fieldNodes = new ArrayList<>();
for (String p : prunedPaths.split(",")) {
fieldNodes = FieldNode.mergeFieldNodes(fieldNodes, FieldNode.fromPath(p));
}
List<String> prunedPathList = new ArrayList<>();
for (FieldNode fn : fieldNodes) {
prunedPathList.addAll(fn.toPaths());
}
return prunedPathList;
}
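A quick illustration of the merge (the first call restates the Javadoc example; the disjoint-roots case is an assumption about FieldNode.mergeFieldNodes, not taken from the patch):

    List<String> merged = processRawPrunedPaths("s.a,s,s");  // -> ["s"], since "s" subsumes "s.a"
    List<String> kept = processRawPrunedPaths("s.a,t.b");    // presumably ["s.a", "t.b"]: different roots don't merge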

/**
* Given a complete struct type info and pruned paths containing selected fields
* from the type info, return a pruned struct type info only with the selected fields.
*
* For instance, if 'originalTypeInfo' is: s:struct<a:struct<b:int, c:boolean>, d:string>
* and 'prunedPaths' is "s.a.b,s.d", then the result will be:
* and 'prunedPaths' is ["s.a.b", "s.d"], then the result will be:
* s:struct<a:struct<b:int>, d:string>
*
* @param originalTypeInfo the complete struct type info
* @param prunedPaths a list of strings representing the pruned paths
* @return the pruned struct type info
*/
private StructTypeInfo pruneFromPaths(
StructTypeInfo originalTypeInfo, String prunedPaths) {
private static StructTypeInfo pruneFromPaths(
StructTypeInfo originalTypeInfo, List<String> prunedPaths) {
PrunedStructTypeInfo prunedTypeInfo = new PrunedStructTypeInfo(originalTypeInfo);

String[] prunedPathList = prunedPaths.split(",");
for (String path : prunedPathList) {
for (String path : prunedPaths) {
pruneFromSinglePath(prunedTypeInfo, path);
}

return prunedTypeInfo.prune();
}

private void pruneFromSinglePath(PrunedStructTypeInfo prunedInfo, String path) {
private static void pruneFromSinglePath(PrunedStructTypeInfo prunedInfo, String path) {
Preconditions.checkArgument(prunedInfo != null,
"PrunedStructTypeInfo for path " + path + " should not be null");

@@ -212,7 +227,7 @@ private void pruneFromSinglePath(PrunedStructTypeInfo prunedInfo, String path) {
String fieldName = path.substring(0, index);
prunedInfo.markSelected(fieldName);
if (index < path.length()) {
pruneFromSinglePath(prunedInfo.children.get(fieldName), path.substring(index + 1));
pruneFromSinglePath(prunedInfo.getChild(fieldName), path.substring(index + 1));
}
}
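An illustrative trace of the recursion, assuming the elided lines set 'index' to the position of the first '.' in 'path', or to path.length() when there is none:

    // pruneFromSinglePath(rowInfo, "s.a.b")
    //   markSelected("s") -> recurse into getChild("s") with "a.b"
    //   markSelected("a") -> recurse into getChild("a") with "b"
    //   markSelected("b") -> index == path.length(), recursion stops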

@@ -228,16 +243,22 @@ private static class PrunedStructTypeInfo {
for (int i = 0; i < typeInfo.getAllStructFieldTypeInfos().size(); ++i) {
TypeInfo ti = typeInfo.getAllStructFieldTypeInfos().get(i);
if (ti.getCategory() == Category.STRUCT) {
this.children.put(typeInfo.getAllStructFieldNames().get(i),
this.children.put(typeInfo.getAllStructFieldNames().get(i).toLowerCase(),
new PrunedStructTypeInfo((StructTypeInfo) ti));
}
}
}

PrunedStructTypeInfo getChild(String fieldName) {
return children.get(fieldName.toLowerCase());
}

void markSelected(String fieldName) {
int index = typeInfo.getAllStructFieldNames().indexOf(fieldName);
if (index >= 0) {
selected[index] = true;
for (int i = 0; i < typeInfo.getAllStructFieldNames().size(); ++i) {
if (typeInfo.getAllStructFieldNames().get(i).equalsIgnoreCase(fieldName)) {
selected[i] = true;
break;
}
}
}

@@ -250,8 +271,8 @@ StructTypeInfo prune() {
String fn = oldNames.get(i);
if (selected[i]) {
newNames.add(fn);
if (children.containsKey(fn)) {
newTypes.add(children.get(fn).prune());
if (children.containsKey(fn.toLowerCase())) {
newTypes.add(children.get(fn.toLowerCase()).prune());
} else {
newTypes.add(oldTypes.get(i));
}
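Putting the serde pieces together, the pruning behaves as described in the pruneFromPaths Javadoc above. A small sketch (pruneFromPaths is private, so its call is shown as a comment; TypeInfoUtils is Hive's standard type-string parser):

    StructTypeInfo original = (StructTypeInfo) TypeInfoUtils.getTypeInfoFromTypeString(
        "struct<s:struct<a:struct<b:int,c:boolean>,d:string>>");
    // pruneFromPaths(original, Arrays.asList("s.a.b", "s.d"))
    //   -> struct<s:struct<a:struct<b:int>,d:string>>   ('c' is pruned away)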
