Skip to content

Commit

Permalink
DRILL-7098: File Metadata Metastore Plugin
Browse files Browse the repository at this point in the history
  • Loading branch information
vdiravka committed Apr 17, 2019
1 parent 494c206 commit 0358d65
Show file tree
Hide file tree
Showing 39 changed files with 253 additions and 108 deletions.
5 changes: 5 additions & 0 deletions exec/java-exec/pom.xml
Expand Up @@ -286,6 +286,11 @@
<artifactId>vector</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>org.apache.drill.metastore</groupId>
<artifactId>drill-metastore-api</artifactId>
<version>${project.version}</version>
</dependency>
<dependency>
<groupId>com.beust</groupId>
<artifactId>jcommander</artifactId>
Expand Down
Expand Up @@ -18,7 +18,7 @@
package org.apache.drill.exec.expr;

import org.apache.drill.exec.expr.stat.RowsMatch;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.impl.statistics.Statistic;
import org.apache.drill.metastore.ColumnStatistics;
import org.apache.drill.metastore.ColumnStatisticsKind;
import org.apache.drill.shaded.guava.com.google.common.base.Preconditions;
Expand Down Expand Up @@ -76,7 +76,7 @@ static boolean isNullOrEmpty(ColumnStatistics stat) {
|| !stat.containsStatistic(ColumnStatisticsKind.MIN_VALUE)
|| !stat.containsStatistic(ColumnStatisticsKind.MAX_VALUE)
|| !stat.containsStatistic(ColumnStatisticsKind.NULLS_COUNT)
|| (long) stat.getStatistic(ColumnStatisticsKind.NULLS_COUNT) == GroupScan.NO_COLUMN_STATS;
|| (long) stat.getStatistic(ColumnStatisticsKind.NULLS_COUNT) == Statistic.NO_COLUMN_STATS;
}

/**
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.drill.exec.expr.fn.FunctionImplementationRegistry;
import org.apache.drill.exec.expr.stat.RowsMatch;
import org.apache.drill.exec.ops.UdfUtilities;
import org.apache.drill.exec.physical.impl.statistics.Statistic;
import org.apache.drill.exec.planner.physical.PlannerSettings;
import org.apache.drill.exec.record.MaterializedField;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
Expand Down Expand Up @@ -151,13 +152,13 @@ public long getColumnValueCount(SchemaPath column) {
long colNulls;
if (columnStats != null) {
Long nulls = (Long) columnStats.getStatistic(ColumnStatisticsKind.NULLS_COUNT);
colNulls = nulls != null ? nulls : GroupScan.NO_COLUMN_STATS;
colNulls = nulls != null ? nulls : Statistic.NO_COLUMN_STATS;
} else {
return 0;
}
return GroupScan.NO_COLUMN_STATS == tableRowCount
|| GroupScan.NO_COLUMN_STATS == colNulls
? GroupScan.NO_COLUMN_STATS : tableRowCount - colNulls;
return Statistic.NO_COLUMN_STATS == tableRowCount
|| Statistic.NO_COLUMN_STATS == colNulls
? Statistic.NO_COLUMN_STATS : tableRowCount - colNulls;
}

@Override
Expand Down Expand Up @@ -339,7 +340,7 @@ public GroupScan applyLimit(int maxRecords) {
GroupScanWithMetadataFilterer prunedMetadata = getFilterer();
if (getTableMetadata() != null) {
long tableRowCount = (long) TableStatisticsKind.ROW_COUNT.getValue(getTableMetadata());
if (tableRowCount == NO_COLUMN_STATS || tableRowCount <= maxRecords) {
if (tableRowCount == Statistic.NO_COLUMN_STATS || tableRowCount <= maxRecords) {
logger.debug("limit push down does not apply, since total number of rows [{}] is less or equal to the required [{}].",
tableRowCount, maxRecords);
return null;
Expand Down Expand Up @@ -404,7 +405,7 @@ protected <T extends BaseMetadata> List<T> limitMetadata(Collection<T> metadataL
int currentRowCount = 0;
for (T metadata : metadataList) {
long rowCount = (long) TableStatisticsKind.ROW_COUNT.getValue(metadata);
if (rowCount == NO_COLUMN_STATS) {
if (rowCount == Statistic.NO_COLUMN_STATS) {
return null;
} else if (currentRowCount + rowCount <= maxRecords) {
currentRowCount += rowCount;
Expand Down
Expand Up @@ -46,8 +46,6 @@ public interface GroupScan extends Scan, HasAffinity {
*/
List<SchemaPath> ALL_COLUMNS = ImmutableList.of(SchemaPath.STAR_COLUMN);

long NO_COLUMN_STATS = -1;

void applyAssignments(List<DrillbitEndpoint> endpoints) throws PhysicalOperatorSetupException;

SubScan getSpecificScan(int minorFragmentId) throws ExecutionSetupException;
Expand Down
Expand Up @@ -124,7 +124,7 @@ public SimpleFileTableMetadataProviderBuilder withSchema(TupleMetadata schema) {

@Override
@SuppressWarnings("unchecked")
public TableMetadataProvider build() throws IOException {
public TableMetadataProvider build() {
SchemaProvider schemaProvider = metadataProviderManager.getSchemaProvider();
TableMetadataProvider source = metadataProviderManager.getTableMetadataProvider();
if (source == null) {
Expand Down
Expand Up @@ -19,7 +19,7 @@

import org.apache.drill.exec.vector.complex.MapVector;

public abstract class AbstractMergedStatistic extends Statistic implements MergedStatistic {
public abstract class AbstractMergedStatistic implements MergedStatistic, Statistic {
protected String name;
protected String inputName;
protected double samplePercent;
Expand Down
Expand Up @@ -31,8 +31,8 @@
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.logical.FormatPluginConfig;

import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.impl.statistics.Statistic;
import org.apache.drill.exec.planner.common.CountToDirectScanUtils;
import org.apache.drill.exec.planner.common.DrillRelOptUtil;

Expand Down Expand Up @@ -288,7 +288,7 @@ private Map<String, Long> collectCounts(PlannerSettings settings, Metadata_V4.Me

Metadata_V4.ColumnTypeMetadata_v4 columnMetadata = metadataSummary.getColumnTypeInfo(new Metadata_V4.ColumnTypeMetadata_v4.Key(simplePath));

if (columnMetadata == null || columnMetadata.totalNullCount == GroupScan.NO_COLUMN_STATS) {
if (columnMetadata == null || columnMetadata.totalNullCount == Statistic.NO_COLUMN_STATS) {
// if column stats is not available don't apply this rule, return empty counts
return ImmutableMap.of();
} else {
Expand Down
Expand Up @@ -33,6 +33,7 @@
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.ScanStats;
import org.apache.drill.exec.physical.impl.statistics.Statistic;
import org.apache.drill.exec.planner.logical.DrillAggregateRel;
import org.apache.drill.exec.planner.logical.DrillProjectRel;
import org.apache.drill.exec.planner.logical.DrillScanRel;
Expand Down Expand Up @@ -202,7 +203,7 @@ private Map<String, Long> collectCounts(PlannerSettings settings, DrillAggregate
}

cnt = oldGrpScan.getColumnValueCount(simplePath);
if (cnt == GroupScan.NO_COLUMN_STATS) {
if (cnt == Statistic.NO_COLUMN_STATS) {
// if column stats is not available don't apply this rule, return empty counts
return ImmutableMap.of();
}
Expand Down
Expand Up @@ -64,8 +64,7 @@ public MapColumnMetadata(MapColumnMetadata from) {
mapSchema = (TupleSchema) from.mapSchema.copy();
}

public MapColumnMetadata(String name, DataMode mode,
TupleSchema mapSchema) {
public MapColumnMetadata(String name, DataMode mode, TupleSchema mapSchema) {
super(name, MinorType.MAP, mode);
if (mapSchema == null) {
this.mapSchema = new TupleSchema();
Expand Down
Expand Up @@ -19,6 +19,10 @@

import java.util.List;

import org.apache.drill.common.exceptions.DrillRuntimeException;
import org.apache.drill.common.expression.PathSegment;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.common.types.TypeProtos.DataMode;
import org.apache.drill.common.types.TypeProtos.MajorType;
import org.apache.drill.common.types.TypeProtos.MinorType;
Expand Down Expand Up @@ -203,4 +207,37 @@ private static ColumnMetadata newDecimal(String name, MinorType type, DataMode m
.build();
return new PrimitiveColumnMetadata(field);
}

/**
* Adds column with specified schema path and type into specified {@code TupleMetadata schema}.
*
* @param schema tuple schema where column should be added
* @param schemaPath schema path of the column which should be added
* @param type type of the column which should be added
*/
public static void addColumnMetadata(TupleMetadata schema, SchemaPath schemaPath, TypeProtos.MajorType type) {
PathSegment.NameSegment colPath = schemaPath.getUnIndexed().getRootSegment();
ColumnMetadata colMetadata;

while (!colPath.isLastPath()) {
colMetadata = schema.metadata(colPath.getPath());
if (colMetadata == null) {
colMetadata = MetadataUtils.newMap(colPath.getPath(), null);
schema.addColumn(colMetadata);
}
if (!colMetadata.isMap()) {
throw new DrillRuntimeException(String.format("Expected map, but was %s", colMetadata.majorType()));
}

schema = colMetadata.mapSchema();
colPath = (PathSegment.NameSegment) colPath.getChild();
}

colMetadata = schema.metadata(colPath.getPath());
if (colMetadata == null) {
schema.addColumn(new PrimitiveColumnMetadata(MaterializedField.create(colPath.getPath(), type)));
} else if (!colMetadata.majorType().equals(type)) {
throw new DrillRuntimeException(String.format("Types mismatch: existing type: %s, new type: %s", colMetadata.majorType(), type));
}
}
}
Expand Up @@ -24,6 +24,7 @@
import org.apache.drill.common.expression.ExpressionStringBuilder;
import org.apache.drill.exec.physical.base.AbstractGroupScanWithMetadata;
import org.apache.drill.exec.physical.base.ParquetMetadataProvider;
import org.apache.drill.exec.physical.impl.statistics.Statistic;
import org.apache.drill.metastore.BaseMetadata;
import org.apache.drill.metastore.LocationProvider;
import org.apache.drill.metastore.PartitionMetadata;
Expand Down Expand Up @@ -319,7 +320,7 @@ public GroupScan applyLimit(int maxRecords) {
maxRecords = Math.max(maxRecords, 1); // Make sure it request at least 1 row -> 1 rowGroup.
if (getTableMetadata() != null) {
long tableRowCount = (long) TableStatisticsKind.ROW_COUNT.getValue(getTableMetadata());
if (tableRowCount == NO_COLUMN_STATS || tableRowCount <= maxRecords) {
if (tableRowCount == Statistic.NO_COLUMN_STATS || tableRowCount <= maxRecords) {
logger.debug("limit push down does not apply, since total number of rows [{}] is less or equal to the required [{}].",
tableRowCount, maxRecords);
return null;
Expand Down
Expand Up @@ -17,9 +17,10 @@
*/
package org.apache.drill.exec.store.parquet;

import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.base.ParquetMetadataProvider;
import org.apache.drill.exec.physical.impl.statistics.Statistic;
import org.apache.drill.exec.planner.common.DrillStatsTable;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.metastore.BaseMetadata;
import org.apache.drill.metastore.ColumnStatisticsImpl;
Expand Down Expand Up @@ -190,12 +191,12 @@ public TableMetadata getTableMetadata() {

if (this.schema == null) {
schema = new TupleSchema();
fields.forEach((schemaPath, majorType) -> SchemaPathUtils.addColumnMetadata(schema, schemaPath, majorType));
fields.forEach((schemaPath, majorType) -> MetadataUtils.addColumnMetadata(schema, schemaPath, majorType));
} else {
// merges specified schema with schema from table
fields.forEach((schemaPath, majorType) -> {
if (SchemaPathUtils.getColumnMetadata(schemaPath, schema) == null) {
SchemaPathUtils.addColumnMetadata(schema, schemaPath, majorType);
MetadataUtils.addColumnMetadata(schema, schemaPath, majorType);
}
});
}
Expand Down Expand Up @@ -306,8 +307,8 @@ public List<PartitionMetadata> getPartitionsMetadata() {
statistics.put(ColumnStatisticsKind.MIN_VALUE, partitionKey);
statistics.put(ColumnStatisticsKind.MAX_VALUE, partitionKey);

statistics.put(ColumnStatisticsKind.NULLS_COUNT, GroupScan.NO_COLUMN_STATS);
statistics.put(TableStatisticsKind.ROW_COUNT, GroupScan.NO_COLUMN_STATS);
statistics.put(ColumnStatisticsKind.NULLS_COUNT, Statistic.NO_COLUMN_STATS);
statistics.put(TableStatisticsKind.ROW_COUNT, Statistic.NO_COLUMN_STATS);
columnsStatistics.put(partitionColumn,
new ColumnStatisticsImpl<>(statistics,
ParquetTableMetadataUtils.getComparator(getParquetGroupScanStatistics().getTypeForColumn(partitionColumn).getMinorType())));
Expand Down
Expand Up @@ -20,7 +20,7 @@
import org.apache.commons.lang3.mutable.MutableLong;
import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.physical.impl.statistics.Statistic;
import org.apache.drill.exec.record.metadata.ColumnMetadata;
import org.apache.drill.exec.record.metadata.SchemaPathUtils;
import org.apache.drill.metastore.BaseMetadata;
Expand Down Expand Up @@ -109,10 +109,10 @@ public void collect(List<T> metadataList) {
previousCount = emptyCount;
}
Long nullsNum = (Long) statistics.getStatistic(ColumnStatisticsKind.NULLS_COUNT);
if (previousCount.longValue() != GroupScan.NO_COLUMN_STATS && nullsNum != null && nullsNum != GroupScan.NO_COLUMN_STATS) {
if (previousCount.longValue() != Statistic.NO_COLUMN_STATS && nullsNum != null && nullsNum != Statistic.NO_COLUMN_STATS) {
previousCount.add(localRowCount - nullsNum);
} else {
previousCount.setValue(GroupScan.NO_COLUMN_STATS);
previousCount.setValue(Statistic.NO_COLUMN_STATS);
}
ColumnMetadata columnMetadata = SchemaPathUtils.getColumnMetadata(schemaPath, metadata.getSchema());
TypeProtos.MajorType majorType = columnMetadata != null ? columnMetadata.majorType() : null;
Expand Down Expand Up @@ -207,7 +207,7 @@ private boolean hasSingleValue(ColumnStatistics columnStatistics, long rowCount)

private boolean isSingleVal(ColumnStatistics columnStatistics, long rowCount) {
Long numNulls = (Long) columnStatistics.getStatistic(ColumnStatisticsKind.NULLS_COUNT);
if (numNulls != null && numNulls != GroupScan.NO_COLUMN_STATS) {
if (numNulls != null && numNulls != Statistic.NO_COLUMN_STATS) {
Object min = columnStatistics.getStatistic(ColumnStatisticsKind.MIN_VALUE);
Object max = columnStatistics.getStatistic(ColumnStatisticsKind.MAX_VALUE);
if (min != null) {
Expand Down
Expand Up @@ -19,8 +19,8 @@

import org.apache.drill.common.expression.SchemaPath;
import org.apache.drill.common.types.TypeProtos;
import org.apache.drill.exec.physical.base.GroupScan;
import org.apache.drill.exec.record.metadata.SchemaPathUtils;
import org.apache.drill.exec.physical.impl.statistics.Statistic;
import org.apache.drill.exec.record.metadata.MetadataUtils;
import org.apache.drill.exec.record.metadata.TupleMetadata;
import org.apache.drill.exec.record.metadata.TupleSchema;
import org.apache.drill.exec.resolver.TypeCastRules;
Expand Down Expand Up @@ -154,7 +154,7 @@ public static RowGroupMetadata getRowGroupMetadata(MetadataBase.ParquetTableMeta
Map<SchemaPath, TypeProtos.MajorType> columns = getRowGroupFields(tableMetadata, rowGroupMetadata);

TupleSchema schema = new TupleSchema();
columns.forEach((schemaPath, majorType) -> SchemaPathUtils.addColumnMetadata(schema, schemaPath, majorType));
columns.forEach((schemaPath, majorType) -> MetadataUtils.addColumnMetadata(schema, schemaPath, majorType));

return new RowGroupMetadata(
schema, columnsStatistics, rowGroupStatistics, rowGroupMetadata.getHostAffinity(), rgIndexInFile, location);
Expand Down Expand Up @@ -274,7 +274,7 @@ private static Map<SchemaPath, ColumnStatistics> getRowGroupColumnStatistics(

Long nulls = column.getNulls();
if (!column.isNumNullsSet() || nulls == null) {
nulls = GroupScan.NO_COLUMN_STATS;
nulls = Statistic.NO_COLUMN_STATS;
}
PrimitiveType.PrimitiveTypeName primitiveType = getPrimitiveTypeName(tableMetadata, column);
OriginalType originalType = getOriginalType(tableMetadata, column);
Expand Down Expand Up @@ -306,7 +306,7 @@ public static Map<SchemaPath, ColumnStatistics> populateNonInterestingColumnsSta
SchemaPath schemaPath = SchemaPath.getCompoundPath(columnTypeMetadata.name);
if (!schemaPaths.contains(schemaPath)) {
Map<StatisticsKind, Object> statistics = new HashMap<>();
statistics.put(ColumnStatisticsKind.NULLS_COUNT, GroupScan.NO_COLUMN_STATS);
statistics.put(ColumnStatisticsKind.NULLS_COUNT, Statistic.NO_COLUMN_STATS);
PrimitiveType.PrimitiveTypeName primitiveType = columnTypeMetadata.primitiveType;
OriginalType originalType = columnTypeMetadata.originalType;
Comparator comparator = getComparator(primitiveType, originalType);
Expand Down
Expand Up @@ -17,7 +17,6 @@
*/
package org.apache.drill.exec.store.parquet.metadata;

import org.apache.drill.shaded.guava.com.google.common.collect.Lists;
import org.apache.drill.common.util.DrillVersionInfo;
import org.apache.hadoop.fs.Path;

Expand All @@ -44,10 +43,9 @@ public class MetadataPathUtils {
*/
public static List<Path> convertToAbsolutePaths(List<Path> paths, String baseDir) {
if (!paths.isEmpty()) {
List<Path> absolutePaths = Lists.newArrayList();
List<Path> absolutePaths = new ArrayList<>();
for (Path relativePath : paths) {
Path absolutePath = (relativePath.isAbsolute()) ? relativePath
: new Path(baseDir, relativePath);
Path absolutePath = (relativePath.isAbsolute()) ? relativePath : new Path(baseDir, relativePath);
absolutePaths.add(absolutePath);
}
return absolutePaths;
Expand All @@ -62,9 +60,10 @@ public static List<Path> convertToAbsolutePaths(List<Path> paths, String baseDir
* @param baseDir base parent directory
* @return list of files with absolute paths
*/
public static List<? extends ParquetFileMetadata> convertToFilesWithAbsolutePaths(List<? extends ParquetFileMetadata> files, String baseDir) {
public static List<? extends ParquetFileMetadata> convertToFilesWithAbsolutePaths(
List<? extends ParquetFileMetadata> files, String baseDir) {
if (!files.isEmpty()) {
List<ParquetFileMetadata> filesWithAbsolutePaths = Lists.newArrayList();
List<ParquetFileMetadata> filesWithAbsolutePaths = new ArrayList<>();
for (ParquetFileMetadata file : files) {
Path relativePath = file.getPath();
ParquetFileMetadata fileWithAbsolutePath = null;
Expand Down Expand Up @@ -97,7 +96,7 @@ public static ParquetTableMetadata_v4 createMetadataWithRelativePaths(
for (Path directory : tableMetadataWithAbsolutePaths.getDirectories()) {
directoriesWithRelativePaths.add(relativize(baseDir, directory));
}
List<ParquetFileMetadata_v4> filesWithRelativePaths = Lists.newArrayList();
List<ParquetFileMetadata_v4> filesWithRelativePaths = new ArrayList<>();
for (ParquetFileMetadata_v4 file : (List<ParquetFileMetadata_v4>) tableMetadataWithAbsolutePaths.getFiles()) {
filesWithRelativePaths.add(new ParquetFileMetadata_v4(
relativize(baseDir, file.getPath()), file.length, file.rowGroups));
Expand Down

0 comments on commit 0358d65

Please sign in to comment.