Skip to content

Commit

Permalink
DRILL-7199: Optimize population of metadata for non-interesting columns
Browse files Browse the repository at this point in the history
closes #1771
  • Loading branch information
dvjyothsna authored and sohami committed May 4, 2019
1 parent 7e0e583 commit 99707a3
Show file tree
Hide file tree
Showing 8 changed files with 165 additions and 23 deletions.
Expand Up @@ -51,6 +51,7 @@
import org.apache.drill.metastore.ColumnStatisticsKind;
import org.apache.drill.metastore.FileMetadata;
import org.apache.drill.metastore.LocationProvider;
import org.apache.drill.metastore.NonInterestingColumnsMetadata;
import org.apache.drill.metastore.PartitionMetadata;
import org.apache.drill.metastore.TableMetadata;
import org.apache.drill.metastore.TableStatisticsKind;
Expand Down Expand Up @@ -86,6 +87,7 @@ public abstract class AbstractGroupScanWithMetadata extends AbstractFileGroupSca
// partition metadata info: mixed partition values for all partition keys in the same list
protected List<PartitionMetadata> partitions;

protected NonInterestingColumnsMetadata nonInterestingColumnsMetadata;
protected List<SchemaPath> partitionColumns;
protected LogicalExpression filter;
protected List<SchemaPath> columns;
Expand Down Expand Up @@ -115,7 +117,7 @@ protected AbstractGroupScanWithMetadata(AbstractGroupScanWithMetadata that) {
this.partitionColumns = that.partitionColumns;
this.partitions = that.partitions;
this.files = that.files;

this.nonInterestingColumnsMetadata = that.nonInterestingColumnsMetadata;
this.fileSet = that.fileSet == null ? null : new HashSet<>(that.fileSet);
}

Expand Down Expand Up @@ -151,18 +153,29 @@ public boolean isMatchAllMetadata() {
*/
@Override
public long getColumnValueCount(SchemaPath column) {
long tableRowCount = (long) TableStatisticsKind.ROW_COUNT.getValue(getTableMetadata());
long tableRowCount, colNulls;
Long nulls;
ColumnStatistics columnStats = getTableMetadata().getColumnStatistics(column);
long colNulls;
ColumnStatistics nonInterestingColStats = null;
if (columnStats == null) {
nonInterestingColStats = getNonInterestingColumnsMetadata().getColumnStatistics(column);
}

if (columnStats != null) {
Long nulls = (Long) columnStats.getStatistic(ColumnStatisticsKind.NULLS_COUNT);
colNulls = nulls != null ? nulls : Statistic.NO_COLUMN_STATS;
tableRowCount = (long) TableStatisticsKind.ROW_COUNT.getValue(getTableMetadata());
} else if (nonInterestingColStats != null) {
tableRowCount = (long) TableStatisticsKind.ROW_COUNT.getValue(getNonInterestingColumnsMetadata());
} else {
return 0;
return 0; // returns 0 if the column doesn't exist in the table.
}

columnStats = columnStats != null ? columnStats : nonInterestingColStats;
nulls = (Long) columnStats.getStatistic(ColumnStatisticsKind.NULLS_COUNT);
colNulls = nulls != null ? nulls : Statistic.NO_COLUMN_STATS;

return Statistic.NO_COLUMN_STATS == tableRowCount
|| Statistic.NO_COLUMN_STATS == colNulls
? Statistic.NO_COLUMN_STATS : tableRowCount - colNulls;
|| Statistic.NO_COLUMN_STATS == colNulls
? Statistic.NO_COLUMN_STATS : tableRowCount - colNulls;
}

@Override
Expand Down Expand Up @@ -266,6 +279,7 @@ public AbstractGroupScanWithMetadata applyFilter(LogicalExpression filterExpr, U
filteredMetadata.withTable(getTableMetadata())
.withPartitions(getNextOrEmpty(getPartitionsMetadata()))
.withFiles(filesMap)
.withNonInterestingColumns(getNonInterestingColumnsMetadata())
.withMatching(false);
}

Expand Down Expand Up @@ -387,6 +401,7 @@ public GroupScan applyLimit(int maxRecords) {
.withTable(getTableMetadata())
.withPartitions(getPartitionsMetadata())
.withFiles(filesMap)
.withNonInterestingColumns(getNonInterestingColumnsMetadata())
.withMatching(matchAllMetadata)
.build();
}
Expand Down Expand Up @@ -520,6 +535,14 @@ protected List<PartitionMetadata> getPartitionsMetadata() {
return partitions;
}

@JsonIgnore
public NonInterestingColumnsMetadata getNonInterestingColumnsMetadata() {
if (nonInterestingColumnsMetadata == null) {
nonInterestingColumnsMetadata = metadataProvider.getNonInterestingColumnsMeta();
}
return nonInterestingColumnsMetadata;
}

/**
* This class is responsible for filtering different metadata levels.
*/
Expand All @@ -531,6 +554,7 @@ protected abstract static class GroupScanWithMetadataFilterer {
protected TableMetadata tableMetadata;
protected List<PartitionMetadata> partitions = Collections.emptyList();
protected Map<Path, FileMetadata> files = Collections.emptyMap();
protected NonInterestingColumnsMetadata nonInterestingColumnsMetadata;

// for the case when filtering is possible for partitions, but files count exceeds
// PARQUET_ROWGROUP_FILTER_PUSHDOWN_PLANNING_THRESHOLD, new group scan with at least filtered partitions
Expand Down Expand Up @@ -558,6 +582,11 @@ public GroupScanWithMetadataFilterer withPartitions(List<PartitionMetadata> part
return this;
}

public GroupScanWithMetadataFilterer withNonInterestingColumns(NonInterestingColumnsMetadata nonInterestingColumns) {
this.nonInterestingColumnsMetadata = nonInterestingColumns;
return this;
}

public GroupScanWithMetadataFilterer withFiles(Map<Path, FileMetadata> files) {
this.files = files;
return this;
Expand Down Expand Up @@ -729,6 +758,9 @@ public <T extends BaseMetadata> List<T> filterAndGetMetadata(Set<SchemaPath> sch
locationProvider.getLocation(), source.supportsFileImplicitColumns());
}

if (source.getNonInterestingColumnsMetadata() != null) {
columnsStatistics.putAll(source.getNonInterestingColumnsMetadata().getColumnsStatistics());
}
RowsMatch match = FilterEvaluatorUtils.matches(filterPredicate,
columnsStatistics, (long) metadata.getStatistic(TableStatisticsKind.ROW_COUNT),
metadata.getSchema(), schemaPathsInExpr);
Expand Down
Expand Up @@ -26,6 +26,7 @@
import org.apache.drill.metastore.ColumnStatisticsImpl;
import org.apache.drill.metastore.FileMetadata;
import org.apache.drill.metastore.FileTableMetadata;
import org.apache.drill.metastore.NonInterestingColumnsMetadata;
import org.apache.drill.metastore.PartitionMetadata;
import org.apache.drill.metastore.TableMetadata;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -86,6 +87,11 @@ public List<FileMetadata> getFilesForPartition(PartitionMetadata partition) {
return null;
}

@Override
public NonInterestingColumnsMetadata getNonInterestingColumnsMeta() {
return null;
}

public static class Builder implements SimpleFileTableMetadataProviderBuilder {
private String tableName;
private Path location;
Expand Down
Expand Up @@ -290,6 +290,7 @@ public AbstractGroupScanWithMetadata applyFilter(LogicalExpression filterExpr, U
builder.withRowGroups(rowGroupsMap)
.withTable(getTableMetadata())
.withPartitions(getNextOrEmpty(getPartitionsMetadata()))
.withNonInterestingColumns(getNonInterestingColumnsMetadata())
.withFiles(filesMap)
.withMatching(false);
}
Expand Down Expand Up @@ -363,6 +364,7 @@ public GroupScan applyLimit(int maxRecords) {
.withTable(getTableMetadata())
.withPartitions(getPartitionsMetadata())
.withFiles(qualifiedFiles)
.withNonInterestingColumns(getNonInterestingColumnsMetadata())
.withMatching(matchAllMetadata)
.build();
}
Expand Down Expand Up @@ -500,6 +502,7 @@ public AbstractParquetGroupScan build() {
newScan.files = files;
newScan.rowGroups = rowGroups;
newScan.matchAllMetadata = matchAllMetadata;
newScan.nonInterestingColumnsMetadata = nonInterestingColumnsMetadata;
// since builder is used when pruning happens, entries and fileSet should be expanded
if (!newScan.getFilesMetadata().isEmpty()) {
newScan.entries = newScan.getFilesMetadata().keySet().stream()
Expand Down
Expand Up @@ -24,6 +24,7 @@
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.metastore.BaseMetadata;
import org.apache.drill.metastore.ColumnStatisticsImpl;
import org.apache.drill.metastore.NonInterestingColumnsMetadata;
import org.apache.drill.metastore.StatisticsKind;
import org.apache.drill.metastore.TableMetadata;
import org.apache.drill.metastore.TableStatisticsKind;
Expand Down Expand Up @@ -88,6 +89,7 @@ public abstract class BaseParquetMetadataProvider implements ParquetMetadataProv
private TableMetadata tableMetadata;
private List<PartitionMetadata> partitions;
private Map<Path, FileMetadata> files;
private NonInterestingColumnsMetadata nonInterestingColumnsMetadata;

// whether metadata for row groups should be collected to create files, partitions and table metadata
private final boolean collectMetadata = false;
Expand Down Expand Up @@ -160,6 +162,7 @@ protected void init(BaseParquetMetadataProvider metadataProvider) throws IOExcep
TableMetadata tableMetadata = getTableMetadata();
getPartitionsMetadata();
getRowGroupsMeta();
getNonInterestingColumnsMeta();
this.tableMetadata = ParquetTableMetadataUtils.updateRowCount(tableMetadata, getRowGroupsMeta());
parquetTableMetadata = null;
}
Expand All @@ -178,9 +181,18 @@ public void initializeMetadata() throws IOException {
getFilesMetadata();
getPartitionsMetadata();
getRowGroupsMeta();
getNonInterestingColumnsMeta();
parquetTableMetadata = null;
}

@Override
public NonInterestingColumnsMetadata getNonInterestingColumnsMeta() {
if (nonInterestingColumnsMetadata == null) {
nonInterestingColumnsMetadata = ParquetTableMetadataUtils.getNonInterestingColumnsMeta(parquetTableMetadata);
}
return nonInterestingColumnsMetadata;
}

@Override
@SuppressWarnings("unchecked")
public TableMetadata getTableMetadata() {
Expand Down Expand Up @@ -235,7 +247,6 @@ public TableMetadata getTableMetadata() {
new ColumnStatisticsImpl(DrillStatsTable.getEstimatedColumnStats(statsTable, column),
ParquetTableMetadataUtils.getNaturalNullsFirstComparator()));
}
columnsStatistics.putAll(ParquetTableMetadataUtils.populateNonInterestingColumnsStats(columnsStatistics.keySet(), parquetTableMetadata));
}
tableMetadata = new FileTableMetadata(tableName, tableLocation, schema, columnsStatistics, tableStatistics,
-1L, "", partitionKeys);
Expand Down
Expand Up @@ -21,6 +21,7 @@
import org.apache.drill.exec.record.metadata.SchemaPathUtils;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.store.parquet.metadata.MetadataBase;
import org.apache.drill.metastore.NonInterestingColumnsMetadata;
import org.apache.drill.metastore.RowGroupMetadata;
import org.apache.drill.metastore.TableStatisticsKind;
import org.apache.drill.exec.expr.FilterBuilder;
Expand Down Expand Up @@ -62,7 +63,14 @@ public static RowsMatch evalFilter(LogicalExpression expr, MetadataBase.ParquetT
expr.<Set<SchemaPath>, Void, RuntimeException>accept(new FieldReferenceFinder(), null));

RowGroupMetadata rowGroupMetadata = new ArrayList<>(ParquetTableMetadataUtils.getRowGroupsMetadata(footer).values()).get(rowGroupIndex);
NonInterestingColumnsMetadata nonInterestingColumnsMetadata = ParquetTableMetadataUtils.getNonInterestingColumnsMeta(footer);
Map<SchemaPath, ColumnStatistics> columnsStatistics = rowGroupMetadata.getColumnsStatistics();

// Add column statistics of non-interesting columns if there are any
if (nonInterestingColumnsMetadata != null) {
columnsStatistics.putAll(nonInterestingColumnsMetadata.getColumnsStatistics());
}

columnsStatistics = ParquetTableMetadataUtils.addImplicitColumnsStatistics(columnsStatistics,
schemaPathsInExpr, Collections.emptyList(), options, rowGroupMetadata.getLocation(), true);

Expand Down
Expand Up @@ -35,6 +35,7 @@
import org.apache.drill.metastore.ColumnStatisticsImpl;
import org.apache.drill.metastore.ColumnStatisticsKind;
import org.apache.drill.metastore.FileMetadata;
import org.apache.drill.metastore.NonInterestingColumnsMetadata;
import org.apache.drill.metastore.PartitionMetadata;
import org.apache.drill.metastore.RowGroupMetadata;
import org.apache.drill.metastore.StatisticsKind;
Expand Down Expand Up @@ -195,7 +196,6 @@ public static <T extends BaseMetadata> Map<SchemaPath, ColumnStatistics> mergeCo
}
columnsStatistics.put(column, new ColumnStatisticsImpl(statisticsMap, statisticsList.iterator().next().getValueComparator()));
}
columnsStatistics.putAll(populateNonInterestingColumnsStats(columnsStatistics.keySet(), parquetTableMetadata));
return columnsStatistics;
}

Expand Down Expand Up @@ -287,27 +287,27 @@ public static Map<SchemaPath, ColumnStatistics> getRowGroupColumnStatistics(
statistics.put(ColumnStatisticsKind.NULLS_COUNT, nulls);
columnsStatistics.put(colPath, new ColumnStatisticsImpl(statistics, comparator));
}
columnsStatistics.putAll(populateNonInterestingColumnsStats(columnsStatistics.keySet(), tableMetadata));
return columnsStatistics;
}

/**
* Populates the non-interesting column's statistics
* @param schemaPaths columns paths which should be ignored
* Returns the non-interesting column's metadata
* @param parquetTableMetadata the source of column metadata for non-interesting column's statistics
* @return returns non-interesting column statistics map
* @return returns non-interesting columns metadata
*/
@SuppressWarnings("unchecked")
public static Map<SchemaPath, ColumnStatistics> populateNonInterestingColumnsStats(
Set<SchemaPath> schemaPaths, MetadataBase.ParquetTableMetadataBase parquetTableMetadata) {
public static NonInterestingColumnsMetadata getNonInterestingColumnsMeta(MetadataBase.ParquetTableMetadataBase parquetTableMetadata) {
Map<SchemaPath, ColumnStatistics> columnsStatistics = new HashMap<>();
if (parquetTableMetadata instanceof Metadata_V4.ParquetTableMetadata_v4) {
ConcurrentHashMap<Metadata_V4.ColumnTypeMetadata_v4.Key, Metadata_V4.ColumnTypeMetadata_v4 > columnTypeInfoMap =
((Metadata_V4.ParquetTableMetadata_v4) parquetTableMetadata).getColumnTypeInfoMap();
if ( columnTypeInfoMap == null ) { return columnsStatistics; } // in some cases for runtime pruning
ConcurrentHashMap<Metadata_V4.ColumnTypeMetadata_v4.Key, Metadata_V4.ColumnTypeMetadata_v4> columnTypeInfoMap =
((Metadata_V4.ParquetTableMetadata_v4) parquetTableMetadata).getColumnTypeInfoMap();

if (columnTypeInfoMap == null) {
return new NonInterestingColumnsMetadata(columnsStatistics);
} // in some cases for runtime pruning

for (Metadata_V4.ColumnTypeMetadata_v4 columnTypeMetadata : columnTypeInfoMap.values()) {
SchemaPath schemaPath = SchemaPath.getCompoundPath(columnTypeMetadata.name);
if (!schemaPaths.contains(schemaPath)) {
if (!columnTypeMetadata.isInteresting) {
SchemaPath schemaPath = SchemaPath.getCompoundPath(columnTypeMetadata.name);
Map<StatisticsKind, Object> statistics = new HashMap<>();
statistics.put(ColumnStatisticsKind.NULLS_COUNT, Statistic.NO_COLUMN_STATS);
PrimitiveType.PrimitiveTypeName primitiveType = columnTypeMetadata.primitiveType;
Expand All @@ -316,8 +316,9 @@ public static Map<SchemaPath, ColumnStatistics> populateNonInterestingColumnsSta
columnsStatistics.put(schemaPath, new ColumnStatisticsImpl<>(statistics, comparator));
}
}
return new NonInterestingColumnsMetadata(columnsStatistics);
}
return columnsStatistics;
return new NonInterestingColumnsMetadata(columnsStatistics);
}

/**
Expand Down
Expand Up @@ -19,6 +19,7 @@

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.metastore.FileMetadata;
import org.apache.drill.metastore.NonInterestingColumnsMetadata;
import org.apache.drill.metastore.PartitionMetadata;
import org.apache.drill.metastore.TableMetadata;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -85,4 +86,10 @@ public interface TableMetadataProvider {
* @return list of {@link FileMetadata} instances which belongs to specified partitions
*/
List<FileMetadata> getFilesForPartition(PartitionMetadata partition);

/**
* Returns {@link NonInterestingColumnsMetadata} instance which provides metadata for non-interesting columns.
* @return {@link NonInterestingColumnsMetadata} instance
*/
NonInterestingColumnsMetadata getNonInterestingColumnsMeta();
}

0 comments on commit 99707a3

Please sign in to comment.