Skip to content

Commit

Permalink
Refactor FilteredManifest and ManifestGroup (#735)
Browse files Browse the repository at this point in the history
  • Loading branch information
rdsr authored and rdblue committed Jan 27, 2020
1 parent cc9d996 commit c73b657
Show file tree
Hide file tree
Showing 6 changed files with 117 additions and 124 deletions.
78 changes: 38 additions & 40 deletions core/src/main/java/org/apache/iceberg/FilteredManifest.java
Expand Up @@ -20,7 +20,6 @@
package org.apache.iceberg;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
Expand Down Expand Up @@ -95,59 +94,33 @@ CloseableIterable<ManifestEntry> allEntries() {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
// ensure stats columns are present for metrics evaluation
boolean requireStatsProjection = requireStatsProjection();
Collection<String> projectColumns = requireStatsProjection ? withStatsColumns(columns) : columns;

return CloseableIterable.filter(
reader.entries(projection(fileSchema, projectColumns, caseSensitive)),
entry -> entry != null &&
evaluator.eval(entry.file().partition()) &&
metricsEvaluator.eval(entry.file()));

} else {
return reader.entries(projection(fileSchema, columns, caseSensitive));
}
}

CloseableIterable<ManifestEntry> liveEntries() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
(partFilter != null && partFilter != Expressions.alwaysTrue())) {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
entry -> entry != null &&
entry.status() != Status.DELETED &&
evaluator.eval(entry.file().partition()) &&
metricsEvaluator.eval(entry.file()));

} else {
return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
entry -> entry != null && entry.status() != Status.DELETED);
}
return CloseableIterable.filter(allEntries(), entry -> entry != null && entry.status() != Status.DELETED);
}

/**
* @return an Iterator of DataFile. Makes defensive copies of files before returning
*/
@Override
public Iterator<DataFile> iterator() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
(partFilter != null && partFilter != Expressions.alwaysTrue())) {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

// ensure stats columns are present for metrics evaluation
List<String> projectColumns = Lists.newArrayList(columns);
projectColumns.addAll(STATS_COLUMNS); // order doesn't matter

// if no stats columns were projected, drop them
boolean dropStats = Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();

return Iterators.transform(
Iterators.filter(reader.iterator(partFilter, projection(fileSchema, projectColumns, caseSensitive)),
input -> input != null &&
evaluator.eval(input.partition()) &&
metricsEvaluator.eval(input)),
dropStats ? DataFile::copyWithoutStats : DataFile::copy);

if (dropStats()) {
return CloseableIterable.transform(liveEntries(), e -> e.file().copyWithoutStats()).iterator();
} else {
return Iterators.transform(
reader.iterator(partFilter, projection(fileSchema, columns, caseSensitive)),
DataFile::copy);
return CloseableIterable.transform(liveEntries(), e -> e.file().copy()).iterator();
}
}

Expand Down Expand Up @@ -193,4 +166,29 @@ private InclusiveMetricsEvaluator metricsEvaluator() {
}
return lazyMetricsEvaluator;
}

/**
 * Decides whether the stats columns have to be added to the projection so the
 * metrics evaluator has everything it needs.
 *
 * @return true only when a row filter other than alwaysTrue is set and the selected
 *         columns contain neither all manifest columns nor all stats columns
 */
private boolean requireStatsProjection() {
  // no row filter to evaluate, so no extra columns are needed
  if (rowFilter == Expressions.alwaysTrue()) {
    return false;
  }

  // everything is already selected
  if (columns.containsAll(ManifestReader.ALL_COLUMNS)) {
    return false;
  }

  // stats must be added unless the caller already selected all of them
  return !columns.containsAll(STATS_COLUMNS);
}

/**
 * Decides whether stats should be stripped from the files that are returned.
 *
 * <p>Stats are dropped only when the caller selected none of the stats columns;
 * if some stats columns were selected explicitly, stats are kept.
 *
 * @return true when a row filter other than alwaysTrue is set, not all manifest columns
 *         are selected, and the selected columns share nothing with the stats columns
 */
private boolean dropStats() {
  boolean hasRowFilter = rowFilter != Expressions.alwaysTrue();
  if (!hasRowFilter || columns.containsAll(ManifestReader.ALL_COLUMNS)) {
    return false;
  }

  // drop only if the caller asked for no stats column at all
  return Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
}

/**
 * Returns a column selection that also includes the stats columns.
 *
 * @param columns the column names selected by the caller
 * @return {@code columns} itself when it already contains all manifest columns; otherwise
 *         a new list holding {@code columns} followed by the stats columns
 */
private static Collection<String> withStatsColumns(Collection<String> columns) {
  // nothing to add when every column is already selected
  if (columns.containsAll(ManifestReader.ALL_COLUMNS)) {
    return columns;
  }

  // copy first so the caller's collection is not modified; order doesn't matter
  List<String> withStats = Lists.newArrayList(columns);
  withStats.addAll(STATS_COLUMNS);
  return withStats;
}
}
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/FindFiles.java
Expand Up @@ -197,8 +197,8 @@ public CloseableIterable<DataFile> collect() {
}

// when snapshot is not null
CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), snapshot.manifests(),
ops.current().specsById())
CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), snapshot.manifests())
.specsById(ops.current().specsById())
.filterData(rowFilter)
.filterFiles(fileFilter)
.filterPartitions(partitionFilter)
Expand Down
127 changes: 48 additions & 79 deletions core/src/main/java/org/apache/iceberg/ManifestGroup.java
Expand Up @@ -25,7 +25,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -43,103 +42,65 @@ class ManifestGroup {

private final FileIO io;
private final Set<ManifestFile> manifests;
private final Map<Integer, PartitionSpec> specsById;
private final Expression dataFilter;
private final Expression fileFilter;
private final Expression partitionFilter;
private final boolean ignoreDeleted;
private final boolean ignoreExisting;
private final List<String> columns;
private final boolean caseSensitive;

private final LoadingCache<Integer, ManifestEvaluator> evalCache;
private Map<Integer, PartitionSpec> specsById;
private Expression dataFilter;
private Expression fileFilter;
private Expression partitionFilter;
private boolean ignoreDeleted;
private boolean ignoreExisting;
private List<String> columns;
private boolean caseSensitive;

ManifestGroup(FileIO io, Iterable<ManifestFile> manifests) {
this(io, manifests, null);
}

ManifestGroup(FileIO io, Iterable<ManifestFile> manifests, Map<Integer, PartitionSpec> specsById) {
this(io, Sets.newHashSet(manifests), specsById, Expressions.alwaysTrue(), Expressions.alwaysTrue(),
Expressions.alwaysTrue(), false, false, ImmutableList.of("*"), true);
}

private ManifestGroup(FileIO io, Set<ManifestFile> manifests, Map<Integer, PartitionSpec> specsById,
Expression dataFilter, Expression fileFilter, Expression partitionFilter,
boolean ignoreDeleted, boolean ignoreExisting, List<String> columns,
boolean caseSensitive) {
this.io = io;
this.manifests = manifests;
this.specsById = specsById;
this.dataFilter = dataFilter;
this.fileFilter = fileFilter;
this.partitionFilter = partitionFilter;
this.ignoreDeleted = ignoreDeleted;
this.ignoreExisting = ignoreExisting;
this.columns = columns;
this.caseSensitive = caseSensitive;
if (specsById == null) {
this.evalCache = null;
} else {
this.evalCache = Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
return ManifestEvaluator.forPartitionFilter(
Expressions.and(partitionFilter, Projections.inclusive(spec).project(dataFilter)),
spec, caseSensitive);
});
}
this.manifests = Sets.newHashSet(manifests);
this.dataFilter = Expressions.alwaysTrue();
this.fileFilter = Expressions.alwaysTrue();
this.partitionFilter = Expressions.alwaysTrue();
this.ignoreDeleted = false;
this.ignoreExisting = false;
this.columns = ImmutableList.of("*");
this.caseSensitive = true;
}

public ManifestGroup caseSensitive(boolean filterCaseSensitive) {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
ignoreDeleted, ignoreExisting, columns, filterCaseSensitive);
ManifestGroup specsById(Map<Integer, PartitionSpec> newSpecsById) {
this.specsById = newSpecsById;
return this;
}

public ManifestGroup filterData(Expression expr) {
return new ManifestGroup(
io, manifests, specsById, Expressions.and(dataFilter, expr), fileFilter, partitionFilter,
ignoreDeleted, ignoreExisting, columns, caseSensitive);
ManifestGroup filterData(Expression newDataFilter) {
this.dataFilter = Expressions.and(dataFilter, newDataFilter);
return this;
}

public ManifestGroup filterFiles(Expression expr) {
return new ManifestGroup(
io, manifests, specsById, dataFilter, Expressions.and(fileFilter, expr), partitionFilter,
ignoreDeleted, ignoreExisting, columns, caseSensitive);
ManifestGroup filterFiles(Expression newFileFilter) {
this.fileFilter = Expressions.and(fileFilter, newFileFilter);
return this;
}

public ManifestGroup filterPartitions(Expression expr) {
return new ManifestGroup(
io, manifests, specsById, dataFilter, fileFilter, Expressions.and(partitionFilter, expr),
ignoreDeleted, ignoreExisting, columns, caseSensitive);
ManifestGroup filterPartitions(Expression newPartitionFilter) {
this.partitionFilter = Expressions.and(partitionFilter, newPartitionFilter);
return this;
}

public ManifestGroup ignoreDeleted() {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter, true,
ignoreExisting, columns, caseSensitive);
ManifestGroup ignoreDeleted() {
this.ignoreDeleted = true;
return this;
}

public ManifestGroup ignoreDeleted(boolean shouldIgnoreDeleted) {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
shouldIgnoreDeleted, ignoreExisting, columns, caseSensitive);
ManifestGroup ignoreExisting() {
this.ignoreExisting = true;
return this;
}

public ManifestGroup ignoreExisting() {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
ignoreDeleted, true, columns, caseSensitive);
ManifestGroup select(List<String> newColumns) {
this.columns = Lists.newArrayList(newColumns);
return this;
}

public ManifestGroup ignoreExisting(boolean shouldIgnoreExisting) {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
ignoreDeleted, shouldIgnoreExisting, columns, caseSensitive);
}

public ManifestGroup select(List<String> columnNames) {
return new ManifestGroup(
io, manifests, specsById, dataFilter, fileFilter, partitionFilter, ignoreDeleted, ignoreExisting,
Lists.newArrayList(columnNames), caseSensitive);
}

public ManifestGroup select(String... columnNames) {
return select(Arrays.asList(columnNames));
ManifestGroup caseSensitive(boolean newCaseSensitive) {
this.caseSensitive = newCaseSensitive;
return this;
}

/**
Expand All @@ -151,6 +112,14 @@ public ManifestGroup select(String... columnNames) {
* @return a CloseableIterable of manifest entries.
*/
public CloseableIterable<ManifestEntry> entries() {
LoadingCache<Integer, ManifestEvaluator> evalCache = specsById == null ?
null : Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
return ManifestEvaluator.forPartitionFilter(
Expressions.and(partitionFilter, Projections.inclusive(spec).project(dataFilter)),
spec, caseSensitive);
});

Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter, caseSensitive);

Iterable<ManifestFile> matchingManifests = evalCache == null ? manifests : Iterables.filter(manifests,
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
Expand Up @@ -226,11 +226,11 @@ CloseableIterable<ManifestEntry> entries(Schema fileProjection) {

@Override
public Iterator<DataFile> iterator() {
return iterator(alwaysTrue(), fileSchema);
return iterator(fileSchema);
}

// visible for use by PartialManifest
Iterator<DataFile> iterator(Expression partFilter, Schema fileProjection) {
Iterator<DataFile> iterator(Schema fileProjection) {
return Iterables.transform(Iterables.filter(
entries(fileProjection),
entry -> entry.status() != ManifestEntry.Status.DELETED),
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/ScanSummary.java
Expand Up @@ -218,7 +218,8 @@ private Map<String, PartitionMetrics> computeTopPartitionMetrics(
TopN<String, PartitionMetrics> topN = new TopN<>(
limit, throwIfLimited, Comparators.charSequences());

try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), manifests, ops.current().specsById())
try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), manifests)
.specsById(ops.current().specsById())
.filterData(rowFilter)
.ignoreDeleted()
.select(SCAN_SUMMARY_COLUMNS)
Expand Down
25 changes: 25 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestFindFiles.java
Expand Up @@ -19,11 +19,14 @@

package org.apache.iceberg;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.Set;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -56,6 +59,28 @@ public void testWithMetadataMatching() {
Assert.assertEquals(pathSet(FILE_A), pathSet(files));
}

@Test
public void testWithRecordsMatching() {
// Append a data file carrying column metrics, then check that a record-level filter selects it.
table.newAppend()
.appendFile(DataFiles.builder(SPEC)
.withInputFile(Files.localInput("/path/to/data-e.parquet"))
.withPartitionPath("data_bucket=4")
.withMetrics(new Metrics(3L,
null, // no column sizes
ImmutableMap.of(1, 3L), // value count
ImmutableMap.of(1, 0L), // null count
ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1)), // lower bounds
ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 5)))) // upper bounds
.build())
.commit();

final Iterable<DataFile> files = FindFiles.in(table)
.withRecordsMatching(Expressions.equal("id", 1))
.collect();

// only the newly appended file is expected to match
Assert.assertEquals(Sets.newHashSet("/path/to/data-e.parquet"), pathSet(files));
}

@Test
public void testInPartition() {
table.newAppend()
Expand Down

0 comments on commit c73b657

Please sign in to comment.