Skip to content

Commit

Permalink
DRILL-7089: Implement caching for TableMetadataProvider at query level and adapt statistics to use Drill metastore API
Browse files Browse the repository at this point in the history

closes #1728
  • Loading branch information
vvysotskyi authored and sohami committed Apr 10, 2019
1 parent 579ebca commit 455cee7
Show file tree
Hide file tree
Showing 50 changed files with 1,528 additions and 506 deletions.
Expand Up @@ -49,17 +49,17 @@ public HiveParquetTableMetadataProvider(List<ReadEntryWithPath> entries,
HivePartitionHolder hivePartitionHolder,
HiveStoragePlugin hiveStoragePlugin,
ParquetReaderConfig readerConfig) throws IOException {
super(entries, readerConfig, null, null);
super(entries, readerConfig);
this.hiveStoragePlugin = hiveStoragePlugin;
this.hivePartitionHolder = hivePartitionHolder;

init();
init(null);
}

public HiveParquetTableMetadataProvider(HiveStoragePlugin hiveStoragePlugin,
List<HiveMetadataProvider.LogicalInputSplit> logicalInputSplits,
ParquetReaderConfig readerConfig) throws IOException {
super(readerConfig, null, null, null);
super(null, readerConfig);
this.hiveStoragePlugin = hiveStoragePlugin;
this.hivePartitionHolder = new HivePartitionHolder();

Expand All @@ -81,7 +81,7 @@ public HiveParquetTableMetadataProvider(HiveStoragePlugin hiveStoragePlugin,
hivePartitionHolder.add(pathString, partition.getValues());
}
}
init();
init(null);
}

public HivePartitionHolder getHivePartitionHolder() {
Expand Down
Expand Up @@ -246,10 +246,10 @@ private ColumnStatistics<T> evalCastFunc(FunctionHolderExpression holderExpr, Co
}

public static class MinMaxStatistics<V> implements ColumnStatistics<V> {
private V minVal;
private V maxVal;
private final V minVal;
private final V maxVal;
private final Comparator<V> valueComparator;
private long nullsCount;
private Comparator<V> valueComparator;

public MinMaxStatistics(V minVal, V maxVal, Comparator<V> valueComparator) {
this.minVal = minVal;
Expand Down Expand Up @@ -283,6 +283,11 @@ public boolean containsStatistic(StatisticsKind statisticsKind) {
}
}

// NOTE(review): this holder stores concrete min/max/null-count values, so it
// unconditionally reports every statistic kind as exact — confirm this is the
// intended contract for all StatisticsKind values passed by callers.
@Override
public boolean containsExactStatistics(StatisticsKind statisticsKind) {
return true;
}

@Override
public Comparator<V> getValueComparator() {
return valueComparator;
Expand Down
Expand Up @@ -22,6 +22,7 @@
import java.util.Iterator;
import java.util.List;

import org.apache.drill.metastore.TableMetadata;
import org.apache.drill.shaded.guava.com.google.common.collect.Lists;

import org.apache.drill.common.expression.LogicalExpression;
Expand Down Expand Up @@ -190,4 +191,14 @@ public LogicalExpression getFilter() {
public GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities, FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager) {
return null;
}

// This scan exposes no metastore-backed metadata provider; callers must
// handle the null return.
@Override
public TableMetadataProvider getMetadataProvider() {
return null;
}

// Likewise no table-level metadata is available from this scan; callers must
// null-check the result.
@Override
public TableMetadata getTableMetadata() {
return null;
}
}
Expand Up @@ -38,6 +38,7 @@
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.SchemaPathUtils;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.exec.store.ColumnExplorer;
import org.apache.drill.exec.store.dfs.FileSelection;
Expand Down Expand Up @@ -181,6 +182,11 @@ public LogicalExpression getFilter() {
return filter;
}

// Exposes the provider backing this scan's metadata; the field is populated
// elsewhere in the enclosing class (presumably at construction — confirm).
@Override
public TableMetadataProvider getMetadataProvider() {
return metadataProvider;
}

// Replaces the filter expression associated with this scan (e.g. after
// filter push-down); no validation is performed here.
public void setFilter(LogicalExpression filter) {
this.filter = filter;
}
Expand Down Expand Up @@ -280,7 +286,7 @@ public FilterPredicate getFilterPredicate(LogicalExpression filterExpr,
FunctionImplementationRegistry functionImplementationRegistry,
OptionManager optionManager,
boolean omitUnsupportedExprs) {
TupleMetadata types = getColumnMetadata();
TupleMetadata types = getSchema();
if (types == null) {
throw new UnsupportedOperationException("At least one schema source should be available.");
}
Expand Down Expand Up @@ -311,8 +317,14 @@ public FilterPredicate getFilterPredicate(LogicalExpression filterExpr,
return FilterBuilder.buildFilterPredicate(materializedFilter, constantBoundaries, udfUtilities, omitUnsupportedExprs);
}

protected TupleMetadata getColumnMetadata() {
return getTableMetadata().getSchema().copy();
/**
 * Builds a defensive deep copy of the table schema so callers cannot mutate
 * the schema held by the cached table metadata.
 *
 * @return fresh {@link TupleSchema} containing a copy of every column
 */
@JsonIgnore
public TupleSchema getSchema() {
TupleSchema schemaCopy = new TupleSchema();
getTableMetadata().getSchema().forEach(column -> schemaCopy.addColumn(column.copy()));
return schemaCopy;
}

// limit push down methods start
Expand Down Expand Up @@ -365,14 +377,14 @@ public GroupScan applyLimit(int maxRecords) {
protected static <T extends BaseMetadata & LocationProvider> Map<Path, T> pruneForPartitions(Map<Path, T> metadataToPrune, List<PartitionMetadata> filteredPartitionMetadata) {
Map<Path, T> prunedFiles = new LinkedHashMap<>();
if (metadataToPrune != null) {
for (Map.Entry<Path, T> entry : metadataToPrune.entrySet()) {
metadataToPrune.forEach((path, metadata) -> {
for (PartitionMetadata filteredPartition : filteredPartitionMetadata) {
if (filteredPartition.getLocations().contains(entry.getKey())) {
prunedFiles.put(entry.getKey(), entry.getValue());
if (filteredPartition.getLocations().contains(path)) {
prunedFiles.put(path, metadata);
break;
}
}
}
});
}

return prunedFiles;
Expand Down Expand Up @@ -468,7 +480,8 @@ protected Map<Path, FileMetadata> getFilesMetadata() {
return files;
}

protected TableMetadata getTableMetadata() {
@Override
public TableMetadata getTableMetadata() {
if (tableMetadata == null) {
tableMetadata = metadataProvider.getTableMetadata();
}
Expand Down
@@ -0,0 +1,79 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.base;

import org.apache.drill.exec.planner.common.DrillStatsTable;
import org.apache.drill.exec.record.metadata.schema.SchemaProvider;
import org.apache.drill.exec.store.parquet.ParquetTableMetadataProviderImpl;

/**
 * Implementation of {@link MetadataProviderManager} which uses file system providers and returns
 * builders for file system based {@link TableMetadataProvider} instances.
 */
public class FileSystemMetadataProviderManager implements MetadataProviderManager {

  private SchemaProvider schemaProvider;
  private DrillStatsTable statsProvider;
  private TableMetadataProvider tableMetadataProvider;

  /**
   * Static factory producing a fresh, empty manager.
   *
   * @return new {@link FileSystemMetadataProviderManager} typed as the interface
   */
  public static MetadataProviderManager getMetadataProviderManager() {
    return new FileSystemMetadataProviderManager();
  }

  @Override
  public SchemaProvider getSchemaProvider() {
    return schemaProvider;
  }

  @Override
  public void setSchemaProvider(SchemaProvider schemaProvider) {
    this.schemaProvider = schemaProvider;
  }

  @Override
  public DrillStatsTable getStatsProvider() {
    return statsProvider;
  }

  @Override
  public void setStatsProvider(DrillStatsTable statsProvider) {
    this.statsProvider = statsProvider;
  }

  @Override
  public TableMetadataProvider getTableMetadataProvider() {
    return tableMetadataProvider;
  }

  @Override
  public void setTableMetadataProvider(TableMetadataProvider tableMetadataProvider) {
    this.tableMetadataProvider = tableMetadataProvider;
  }

  @Override
  public TableMetadataProviderBuilder builder(MetadataProviderKind kind) {
    // Dispatch on the requested provider kind; an unrecognized kind yields
    // null, which callers are expected to handle.
    if (kind == MetadataProviderKind.PARQUET_TABLE) {
      return new ParquetTableMetadataProviderImpl.Builder(this);
    }
    if (kind == MetadataProviderKind.SCHEMA_STATS_ONLY) {
      return new SimpleFileTableMetadataProvider.Builder(this);
    }
    return null;
  }
}
Expand Up @@ -29,6 +29,7 @@
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.proto.CoordinationProtos.DrillbitEndpoint;
import org.apache.drill.exec.server.options.OptionManager;
import org.apache.drill.metastore.TableMetadata;
import org.apache.drill.shaded.guava.com.google.common.collect.ImmutableList;
import org.apache.hadoop.fs.Path;

Expand Down Expand Up @@ -154,4 +155,14 @@ public interface GroupScan extends Scan, HasAffinity {
GroupScan applyFilter(LogicalExpression filterExpr, UdfUtilities udfUtilities,
FunctionImplementationRegistry functionImplementationRegistry, OptionManager optionManager);

/**
 * Returns {@link TableMetadataProvider} instance which is used for providing metadata for current {@link GroupScan}.
 *
 * @return {@link TableMetadataProvider} instance the source of metadata
 */
@JsonIgnore
TableMetadataProvider getMetadataProvider();

/**
 * Returns {@link TableMetadata} instance which provides metadata for the table scanned by
 * this {@link GroupScan}; implementations may return {@code null} when no metadata is available.
 *
 * @return table metadata for this group scan, or {@code null}
 */
@JsonIgnore
TableMetadata getTableMetadata();
}
@@ -0,0 +1,59 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.drill.exec.physical.base;

import org.apache.drill.exec.planner.common.DrillStatsTable;
import org.apache.drill.exec.record.metadata.schema.SchemaProvider;

/**
 * Base interface for passing and obtaining {@link SchemaProvider}, {@link DrillStatsTable} and
 * {@link TableMetadataProvider}, responsible for creating required
 * {@link TableMetadataProviderBuilder} which constructs required {@link TableMetadataProvider}
 * based on specified providers.
 */
public interface MetadataProviderManager {

/**
 * Returns the statistics source for the table, or {@code null} if none was set.
 *
 * @return the {@link DrillStatsTable} statistics provider
 */
DrillStatsTable getStatsProvider();

/**
 * Sets the statistics source to be used when constructing a {@link TableMetadataProvider}.
 *
 * @param statsProvider statistics provider to use
 */
void setStatsProvider(DrillStatsTable statsProvider);

/**
 * Returns the schema source for the table, or {@code null} if none was set.
 *
 * @return the {@link SchemaProvider} schema provider
 */
SchemaProvider getSchemaProvider();

/**
 * Sets the schema source to be used when constructing a {@link TableMetadataProvider}.
 *
 * @param schemaProvider schema provider to use
 */
void setSchemaProvider(SchemaProvider schemaProvider);

/**
 * Returns the previously constructed {@link TableMetadataProvider}, or {@code null}
 * if one has not been set yet.
 *
 * @return cached table metadata provider
 */
TableMetadataProvider getTableMetadataProvider();

/**
 * Caches a constructed {@link TableMetadataProvider} on this manager for reuse.
 *
 * @param tableMetadataProvider provider instance to cache
 */
void setTableMetadataProvider(TableMetadataProvider tableMetadataProvider);

/**
 * Returns builder responsible for constructing required {@link TableMetadataProvider} instances
 * based on specified providers.
 *
 * @param kind kind of {@link TableMetadataProvider} whose builder should be obtained
 * @return builder responsible for constructing required {@link TableMetadataProvider}
 */
TableMetadataProviderBuilder builder(MetadataProviderKind kind);

/**
 * Kinds of {@link TableMetadataProvider} whose builder should be obtained.
 */
enum MetadataProviderKind {
PARQUET_TABLE,
SCHEMA_STATS_ONLY
}
}
Expand Up @@ -46,6 +46,13 @@ public interface ParquetMetadataProvider extends TableMetadataProvider {
*/
List<RowGroupMetadata> getRowGroupsMeta();

/**
* Returns list of file paths which belong to current table.
*
* @return list of file paths
*/
List<Path> getLocations();

/**
* Returns multimap of {@link RowGroupMetadata} instances which provides metadata for specific row group and its columns mapped to their locations.
*
Expand Down

0 comments on commit 455cee7

Please sign in to comment.