DRILL-5797: Use Parquet new reader on all non-complex columns queries
okalinin authored and arina-ielchiieva committed Jul 11, 2018
1 parent e79db14 commit a321bf0
Showing 7 changed files with 391 additions and 36 deletions.
@@ -25,22 +25,21 @@
 import org.apache.drill.exec.ops.ExecutorFragmentContext;
 import org.apache.drill.exec.ops.OperatorContext;
 import org.apache.drill.exec.physical.impl.ScanBatch;
+import org.apache.drill.exec.proto.helper.QueryIdHelper;
 import org.apache.drill.exec.server.options.OptionManager;
 import org.apache.drill.exec.store.ColumnExplorer;
 import org.apache.drill.exec.store.RecordReader;
 import org.apache.drill.exec.store.dfs.DrillFileSystem;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.store.parquet.columnreaders.ParquetRecordReader;
 import org.apache.drill.exec.store.parquet2.DrillParquetReader;
 import org.apache.hadoop.conf.Configuration;
 import org.apache.hadoop.fs.Path;
 import org.apache.parquet.ParquetReadOptions;
-import org.apache.parquet.column.ColumnDescriptor;
 import org.apache.parquet.hadoop.CodecFactory;
 import org.apache.parquet.hadoop.ParquetFileReader;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
 import org.apache.parquet.hadoop.util.HadoopInputFile;
-import org.apache.parquet.schema.MessageType;
-import org.apache.parquet.schema.Type;
 
 import java.io.IOException;
 import java.util.ArrayList;
@@ -107,7 +106,10 @@ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRow
           ParquetReaderUtility.detectCorruptDates(footer, rowGroupScan.getColumns(), autoCorrectCorruptDates);
       logger.debug("Contains corrupt dates: {}", containsCorruptDates);
 
-      if (!context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER) && !isComplex(footer)) {
+      if (!context.getOptions().getBoolean(ExecConstants.PARQUET_NEW_RECORD_READER)
+          && !ParquetReaderUtility.containsComplexColumn(footer, rowGroupScan.getColumns())) {
+        logger.debug("Query {} qualifies for new Parquet reader",
+            QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()));
         readers.add(new ParquetRecordReader(context,
             rowGroup.getPath(),
             rowGroup.getRowGroupIndex(),
@@ -118,6 +120,8 @@ protected ScanBatch getBatch(ExecutorFragmentContext context, AbstractParquetRow
             rowGroupScan.getColumns(),
             containsCorruptDates));
       } else {
+        logger.debug("Query {} doesn't qualify for new reader, using old one",
+            QueryIdHelper.getQueryId(oContext.getFragmentContext().getHandle().getQueryId()));
         readers.add(new DrillParquetReader(context,
             footer,
             rowGroup,
@@ -161,22 +165,6 @@ private ParquetMetadata readFooter(Configuration conf, String path) throws IOExc
     }
   }
 
-  private boolean isComplex(ParquetMetadata footer) {
-    MessageType schema = footer.getFileMetaData().getSchema();
-
-    for (Type type : schema.getFields()) {
-      if (!type.isPrimitive()) {
-        return true;
-      }
-    }
-    for (ColumnDescriptor col : schema.getColumns()) {
-      if (col.getMaxRepetitionLevel() > 0) {
-        return true;
-      }
-    }
-    return false;
-  }
-
   /**
   * Helper class responsible for creating and managing DrillFileSystem.
   */
@@ -190,5 +178,4 @@ protected AbstractDrillFileSystemManager(OperatorContext operatorContext) {
 
     protected abstract DrillFileSystem get(Configuration config, String path) throws ExecutionSetupException;
   }
-
 }
@@ -20,7 +20,6 @@
 import com.google.common.collect.Sets;
 import org.apache.commons.codec.binary.Base64;
 import org.apache.drill.common.exceptions.UserException;
-import org.apache.drill.common.expression.PathSegment;
 import org.apache.drill.common.expression.SchemaPath;
 import org.apache.drill.common.types.TypeProtos;
 import org.apache.drill.common.types.Types;
@@ -41,8 +40,10 @@
 import org.apache.parquet.hadoop.metadata.ColumnChunkMetaData;
 import org.apache.parquet.hadoop.metadata.ColumnPath;
 import org.apache.parquet.hadoop.metadata.ParquetMetadata;
+import org.apache.parquet.schema.MessageType;
 import org.apache.parquet.schema.OriginalType;
 import org.apache.parquet.schema.PrimitiveType.PrimitiveTypeName;
+import org.apache.parquet.schema.Type;
 import org.joda.time.Chronology;
 import org.joda.time.DateTimeConstants;
 import org.apache.parquet.example.data.simple.NanoTime;
@@ -51,6 +52,7 @@
 
 import java.util.Arrays;
 import java.util.HashMap;
+import java.util.Iterator;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
@@ -140,15 +142,90 @@ public static int getIntFromLEBytes(byte[] input, int start) {
     return out;
   }
 
+  /**
+   * Map full schema paths in format `a`.`b`.`c` to respective SchemaElement objects.
+   *
+   * @param footer Parquet file metadata
+   * @return schema full path to SchemaElement map
+   */
   public static Map<String, SchemaElement> getColNameToSchemaElementMapping(ParquetMetadata footer) {
-    HashMap<String, SchemaElement> schemaElements = new HashMap<>();
+    Map<String, SchemaElement> schemaElements = new HashMap<>();
     FileMetaData fileMetaData = new ParquetMetadataConverter().toParquetMetadata(ParquetFileWriter.CURRENT_VERSION, footer);
-    for (SchemaElement se : fileMetaData.getSchema()) {
-      schemaElements.put(se.getName(), se);
+
+    Iterator<SchemaElement> iter = fileMetaData.getSchema().iterator();
+
+    // First element in collection is default `root` element. We skip it to maintain key in `a` format instead of `root`.`a`,
+    // and thus to avoid the need to cut it out again when comparing with SchemaPath string representation
+    if (iter.hasNext()) {
+      iter.next();
+    }
+    while (iter.hasNext()) {
+      addSchemaElementMapping(iter, new StringBuilder(), schemaElements);
     }
     return schemaElements;
   }
 
+  /**
+   * Populate full path to SchemaElement map by recursively traversing schema elements referenced by the given iterator
+   *
+   * @param iter file schema values iterator
+   * @param path parent schema element path
+   * @param schemaElements schema elements map to insert next iterator element into
+   */
+  private static void addSchemaElementMapping(Iterator<SchemaElement> iter, StringBuilder path,
+      Map<String, SchemaElement> schemaElements) {
+    SchemaElement schemaElement = iter.next();
+    path.append('`').append(schemaElement.getName().toLowerCase()).append('`');
+    schemaElements.put(path.toString(), schemaElement);
+
+    // for each element that has children we need to maintain remaining children count
+    // to exit current recursion level when no more children is left
+    int remainingChildren = schemaElement.getNum_children();
+
+    while (remainingChildren > 0 && iter.hasNext()) {
+      addSchemaElementMapping(iter, new StringBuilder(path).append('.'), schemaElements);
+      remainingChildren--;
+    }
+    return;
+  }
+
+  /**
+   * generate full path of the column in format `a`.`b`.`c`
+   *
+   * @param column ColumnDescriptor object
+   * @return full path in format `a`.`b`.`c`
+   */
+  public static String getFullColumnPath(ColumnDescriptor column) {
+    StringBuilder sb = new StringBuilder();
+    String[] path = column.getPath();
+    for (int i = 0; i < path.length; i++) {
+      sb.append("`").append(path[i].toLowerCase()).append("`").append(".");
+    }
+
+    // remove trailing dot
+    if (sb.length() > 0) {
+      sb.deleteCharAt(sb.length() - 1);
+    }
+
+    return sb.toString();
+  }
+
+  /**
+   * Map full column paths to all ColumnDescriptors in file schema
+   *
+   * @param footer Parquet file metadata
+   * @return column full path to ColumnDescriptor object map
+   */
+  public static Map<String, ColumnDescriptor> getColNameToColumnDescriptorMapping(ParquetMetadata footer) {
+    Map<String, ColumnDescriptor> colDescMap = new HashMap<>();
+    List<ColumnDescriptor> columns = footer.getFileMetaData().getSchema().getColumns();
+
+    for (ColumnDescriptor column : columns) {
+      colDescMap.put(getFullColumnPath(column), column);
+    }
+    return colDescMap;
+  }
+
   public static int autoCorrectCorruptedDate(int corruptedDate) {
     return (int) (corruptedDate - CORRECT_CORRUPT_DATE_SHIFT);
   }
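The helpers added above key both maps by the full lower-cased, backtick-quoted column path (`a`.`b`.`c`) rather than by the root segment name alone, which is what lets nested columns be resolved unambiguously later in this commit. A minimal sketch of the resulting key format, using an illustrative schema and class name that are not part of the commit and assuming drill-java-exec and parquet-column on the classpath:

import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class FullColumnPathExample {
  public static void main(String[] args) {
    // Illustrative Parquet schema: one flat column and one nested group.
    MessageType schema = MessageTypeParser.parseMessageType(
        "message root { required int32 id; required group Address { required binary City (UTF8); } }");

    // getFullColumnPath lower-cases every path segment and wraps it in backticks,
    // so keys line up with SchemaPath string output, e.g. `id` and `address`.`city`.
    for (ColumnDescriptor column : schema.getColumns()) {
      System.out.println(ParquetReaderUtility.getFullColumnPath(column));
    }
  }
}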
@@ -361,7 +438,6 @@ else if (Boolean.valueOf(isDateCorrect)) {
     }
   }
 
-
   /**
   * Detect corrupt date values by looking at the min/max values in the metadata.
   *
@@ -401,9 +477,9 @@ public static DateCorruptionStatus checkForCorruptDateValuesInStatistics(Parquet
       // creating a NameSegment makes sure we are using the standard code for comparing names,
       // currently it is all case-insensitive
       if (Utilities.isStarQuery(columns)
-          || new PathSegment.NameSegment(column.getPath()[0]).equals(schemaPath.getRootSegment())) {
+          || getFullColumnPath(column).equalsIgnoreCase(schemaPath.getUnIndexed().toString())) {
         int colIndex = -1;
-        ConvertedType convertedType = schemaElements.get(column.getPath()[0]).getConverted_type();
+        ConvertedType convertedType = schemaElements.get(getFullColumnPath(column)).getConverted_type();
         if (convertedType != null && convertedType.equals(ConvertedType.DATE)) {
           List<ColumnChunkMetaData> colChunkList = footer.getBlocks().get(rowGroupIndex).getColumns();
           for (int j = 0; j < colChunkList.size(); j++) {
@@ -525,4 +601,57 @@ public static TypeProtos.MajorType getType(PrimitiveTypeName type, OriginalType
     }
   }
 
+  /**
+   * Check whether any of columns in the given list is either nested or repetitive.
+   *
+   * @param footer Parquet file schema
+   * @param columns list of query SchemaPath objects
+   */
+  public static boolean containsComplexColumn(ParquetMetadata footer, List<SchemaPath> columns) {
+
+    MessageType schema = footer.getFileMetaData().getSchema();
+
+    if (Utilities.isStarQuery(columns)) {
+      for (Type type : schema.getFields()) {
+        if (!type.isPrimitive()) {
+          return true;
+        }
+      }
+      for (ColumnDescriptor col : schema.getColumns()) {
+        if (col.getMaxRepetitionLevel() > 0) {
+          return true;
+        }
+      }
+      return false;
+    } else {
+      Map<String, ColumnDescriptor> colDescMap = ParquetReaderUtility.getColNameToColumnDescriptorMapping(footer);
+      Map<String, SchemaElement> schemaElements = ParquetReaderUtility.getColNameToSchemaElementMapping(footer);
+
+      for (SchemaPath schemaPath : columns) {
+        // Schema path which is non-leaf is complex column
+        if (!schemaPath.isLeaf()) {
+          logger.trace("rowGroupScan contains complex column: {}", schemaPath.getUnIndexed().toString());
+          return true;
+        }
+
+        // following column descriptor lookup failure may mean two cases, depending on subsequent SchemaElement lookup:
+        // 1. success: queried column is complex, i.e. GroupType
+        // 2. failure: queried column is not in schema and thus is non-complex
+        ColumnDescriptor column = colDescMap.get(schemaPath.getUnIndexed().toString().toLowerCase());
+
+        if (column == null) {
+          SchemaElement schemaElement = schemaElements.get(schemaPath.getUnIndexed().toString().toLowerCase());
+          if (schemaElement != null) {
+            return true;
+          }
+        } else {
+          if (column.getMaxRepetitionLevel() > 0) {
+            logger.trace("rowGroupScan contains repetitive column: {}", schemaPath.getUnIndexed().toString());
+            return true;
+          }
+        }
+      }
+    }
+    return false;
+  }
 }
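For star queries containsComplexColumn still inspects the file schema directly, the way the removed isComplex helper did: any group field, or any column with a repetition level above zero, makes the row group ineligible for the fast reader. A rough, self-contained illustration of that rule on hand-built schemas — the schema strings and the helper below are examples, not code from this commit:

import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;
import org.apache.parquet.schema.Type;

public class ComplexSchemaCheckExample {

  // Mirrors the star-query branch of containsComplexColumn: a schema is "complex"
  // if any top-level field is a group or any column is repeated.
  static boolean hasComplexColumn(MessageType schema) {
    for (Type field : schema.getFields()) {
      if (!field.isPrimitive()) {
        return true;
      }
    }
    for (ColumnDescriptor col : schema.getColumns()) {
      if (col.getMaxRepetitionLevel() > 0) {
        return true;
      }
    }
    return false;
  }

  public static void main(String[] args) {
    MessageType flat = MessageTypeParser.parseMessageType(
        "message m { required int32 a; optional binary b (UTF8); }");
    MessageType nested = MessageTypeParser.parseMessageType(
        "message m { required int32 a; repeated group addresses { required binary city (UTF8); } }");

    System.out.println(hasComplexColumn(flat));   // false -> ParquetRecordReader can be used
    System.out.println(hasComplexColumn(nested)); // true  -> falls back to DrillParquetReader
  }
}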
@@ -29,6 +29,7 @@
 import org.apache.drill.exec.physical.impl.OutputMutator;
 import org.apache.drill.exec.record.MaterializedField;
 import org.apache.drill.exec.server.options.OptionManager;
+import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
 import org.apache.drill.exec.vector.ValueVector;
 import org.apache.drill.exec.vector.complex.RepeatedValueVector;
 import org.apache.parquet.column.ColumnDescriptor;
@@ -56,8 +57,8 @@ public ParquetColumnMetadata(ColumnDescriptor column) {
   }
 
   public void resolveDrillType(Map<String, SchemaElement> schemaElements, OptionManager options) {
-    se = schemaElements.get(column.getPath()[0]);
-    type = ParquetToDrillTypeConverter.toMajorType(column.getType(), se.getType_length(),
+    se = schemaElements.get(ParquetReaderUtility.getFullColumnPath(column));
+    type = ParquetToDrillTypeConverter.toMajorType(column.getType(), column.getTypeLength(),
         getDataMode(column), se, options);
     field = MaterializedField.create(toFieldName(column.getPath()).getLastSegment().getNameSegment().getPath(), type);
     length = getDataTypeLength();
@@ -130,7 +130,7 @@ private void loadParquetSchema() {
     for (ColumnDescriptor column : footer.getFileMetaData().getSchema().getColumns()) {
       ParquetColumnMetadata columnMetadata = new ParquetColumnMetadata(column);
       columnMetadata.resolveDrillType(schemaElements, options);
-      if (! fieldSelected(columnMetadata.field)) {
+      if (! columnSelected(column)) {
         continue;
       }
       selectedColumnMetadata.add(columnMetadata);
@@ -174,23 +174,22 @@ public BlockMetaData getRowGroupMetadata() {
   }
 
   /**
-   * Determine if a Parquet field is selected for the query. It is selected
+   * Determine if a Parquet column is selected for the query. It is selected
   * either if this is a star query (we want all columns), or the column
   * appears in the select list.
   *
-   * @param field the Parquet column expressed as as Drill field.
+   * @param column the Parquet column expressed as column descriptor
   * @return true if the column is to be included in the scan, false
   * if not
   */
-
-  private boolean fieldSelected(MaterializedField field) {
+  private boolean columnSelected(ColumnDescriptor column) {
     if (isStarQuery()) {
       return true;
     }
 
     int i = 0;
     for (SchemaPath expr : selectedCols) {
-      if (field.getName().equalsIgnoreCase(expr.getRootSegmentPath())) {
+      if (ParquetReaderUtility.getFullColumnPath(column).equalsIgnoreCase(expr.getUnIndexed().toString())) {
        columnsFound[i] = true;
        return true;
      }
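columnSelected (replacing fieldSelected above) matches a projected column against the full backtick-quoted path, case-insensitively, instead of only the root segment name. A hedged sketch of that comparison outside of the scan code, with an illustrative schema and projection and assuming the Drill and Parquet libraries on the classpath:

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.store.parquet.ParquetReaderUtility;
import org.apache.parquet.column.ColumnDescriptor;
import org.apache.parquet.schema.MessageType;
import org.apache.parquet.schema.MessageTypeParser;

public class ColumnSelectionExample {
  public static void main(String[] args) {
    MessageType schema = MessageTypeParser.parseMessageType(
        "message root { required group address { required binary city (UTF8); } }");
    ColumnDescriptor city = schema.getColumns().get(0);

    // A projection such as SELECT address.city FROM ...
    SchemaPath projected = SchemaPath.getCompoundPath("address", "city");

    // The same comparison columnSelected performs: full path, case-insensitive.
    boolean selected = ParquetReaderUtility.getFullColumnPath(city)
        .equalsIgnoreCase(projected.getUnIndexed().toString());
    System.out.println(selected); // expected: true
  }
}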
