Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Refactor FilteredManifest and ManifestGroup #735

Merged
merged 5 commits into from Jan 27, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
78 changes: 38 additions & 40 deletions core/src/main/java/org/apache/iceberg/FilteredManifest.java
Expand Up @@ -20,7 +20,6 @@
package org.apache.iceberg;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
Expand Down Expand Up @@ -95,59 +94,33 @@ CloseableIterable<ManifestEntry> allEntries() {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
// ensure stats columns are present for metrics evaluation
boolean requireStatsProjection = requireStatsProjection();
Collection<String> projectColumns = requireStatsProjection ? withStatsColumns(columns) : columns;

return CloseableIterable.filter(
reader.entries(projection(fileSchema, projectColumns, caseSensitive)),
entry -> entry != null &&
evaluator.eval(entry.file().partition()) &&
metricsEvaluator.eval(entry.file()));

} else {
return reader.entries(projection(fileSchema, columns, caseSensitive));
}
}

CloseableIterable<ManifestEntry> liveEntries() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
(partFilter != null && partFilter != Expressions.alwaysTrue())) {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
entry -> entry != null &&
entry.status() != Status.DELETED &&
evaluator.eval(entry.file().partition()) &&
metricsEvaluator.eval(entry.file()));

} else {
return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
entry -> entry != null && entry.status() != Status.DELETED);
}
return CloseableIterable.filter(allEntries(), entry -> entry != null && entry.status() != Status.DELETED);
}

/**
* @return an Iterator of DataFile. Makes defensive copies of files before returning
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

*/
@Override
public Iterator<DataFile> iterator() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
(partFilter != null && partFilter != Expressions.alwaysTrue())) {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

// ensure stats columns are present for metrics evaluation
List<String> projectColumns = Lists.newArrayList(columns);
projectColumns.addAll(STATS_COLUMNS); // order doesn't matter

// if no stats columns were projected, drop them
boolean dropStats = Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();

return Iterators.transform(
Iterators.filter(reader.iterator(partFilter, projection(fileSchema, projectColumns, caseSensitive)),
input -> input != null &&
evaluator.eval(input.partition()) &&
metricsEvaluator.eval(input)),
dropStats ? DataFile::copyWithoutStats : DataFile::copy);

if (dropStats()) {
return CloseableIterable.transform(liveEntries(), e -> e.file().copyWithoutStats()).iterator();
} else {
return Iterators.transform(
reader.iterator(partFilter, projection(fileSchema, columns, caseSensitive)),
DataFile::copy);
return CloseableIterable.transform(liveEntries(), e -> e.file().copy()).iterator();
}
}

Expand Down Expand Up @@ -193,4 +166,29 @@ private InclusiveMetricsEvaluator metricsEvaluator() {
}
return lazyMetricsEvaluator;
}

private boolean requireStatsProjection() {
// Make sure we have all stats columns for metrics evaluator
return rowFilter != Expressions.alwaysTrue() &&
!columns.containsAll(ManifestReader.ALL_COLUMNS) &&
!columns.containsAll(STATS_COLUMNS);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I like how straight-forward this is now.

}

private boolean dropStats() {
  // Stats are dropped only when the caller's projection names no stats column at all;
  // a projection that already includes even some stats columns keeps them.
  if (rowFilter == Expressions.alwaysTrue()) {
    return false;
  }

  if (columns.containsAll(ManifestReader.ALL_COLUMNS)) {
    return false;
  }

  Set<String> requested = Sets.newHashSet(columns);
  return Sets.intersection(requested, STATS_COLUMNS).isEmpty();
}

private static Collection<String> withStatsColumns(Collection<String> columns) {
if (columns.containsAll(ManifestReader.ALL_COLUMNS)) {
return columns;
} else {
List<String> projectColumns = Lists.newArrayList(columns);
projectColumns.addAll(STATS_COLUMNS); // order doesn't matter
rdsr marked this conversation as resolved.
Show resolved Hide resolved
return projectColumns;
}
}
}
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/FindFiles.java
Expand Up @@ -197,8 +197,8 @@ public CloseableIterable<DataFile> collect() {
}

// when snapshot is not null
CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), snapshot.manifests(),
ops.current().specsById())
CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), snapshot.manifests())
.specsById(ops.current().specsById())
.filterData(rowFilter)
.filterFiles(fileFilter)
.filterPartitions(partitionFilter)
Expand Down
127 changes: 48 additions & 79 deletions core/src/main/java/org/apache/iceberg/ManifestGroup.java
Expand Up @@ -25,7 +25,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand All @@ -43,103 +42,65 @@ class ManifestGroup {

private final FileIO io;
private final Set<ManifestFile> manifests;
private final Map<Integer, PartitionSpec> specsById;
private final Expression dataFilter;
private final Expression fileFilter;
private final Expression partitionFilter;
private final boolean ignoreDeleted;
private final boolean ignoreExisting;
private final List<String> columns;
private final boolean caseSensitive;

private final LoadingCache<Integer, ManifestEvaluator> evalCache;
private Map<Integer, PartitionSpec> specsById;
private Expression dataFilter;
private Expression fileFilter;
private Expression partitionFilter;
private boolean ignoreDeleted;
private boolean ignoreExisting;
private List<String> columns;
private boolean caseSensitive;

ManifestGroup(FileIO io, Iterable<ManifestFile> manifests) {
this(io, manifests, null);
}

ManifestGroup(FileIO io, Iterable<ManifestFile> manifests, Map<Integer, PartitionSpec> specsById) {
this(io, Sets.newHashSet(manifests), specsById, Expressions.alwaysTrue(), Expressions.alwaysTrue(),
Expressions.alwaysTrue(), false, false, ImmutableList.of("*"), true);
}

private ManifestGroup(FileIO io, Set<ManifestFile> manifests, Map<Integer, PartitionSpec> specsById,
Expression dataFilter, Expression fileFilter, Expression partitionFilter,
boolean ignoreDeleted, boolean ignoreExisting, List<String> columns,
boolean caseSensitive) {
this.io = io;
this.manifests = manifests;
this.specsById = specsById;
this.dataFilter = dataFilter;
this.fileFilter = fileFilter;
this.partitionFilter = partitionFilter;
this.ignoreDeleted = ignoreDeleted;
this.ignoreExisting = ignoreExisting;
this.columns = columns;
this.caseSensitive = caseSensitive;
if (specsById == null) {
this.evalCache = null;
} else {
this.evalCache = Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
return ManifestEvaluator.forPartitionFilter(
Expressions.and(partitionFilter, Projections.inclusive(spec).project(dataFilter)),
spec, caseSensitive);
});
}
this.manifests = Sets.newHashSet(manifests);
this.dataFilter = Expressions.alwaysTrue();
this.fileFilter = Expressions.alwaysTrue();
this.partitionFilter = Expressions.alwaysTrue();
this.ignoreDeleted = false;
this.ignoreExisting = false;
this.columns = ImmutableList.of("*");
this.caseSensitive = true;
}

public ManifestGroup caseSensitive(boolean filterCaseSensitive) {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
ignoreDeleted, ignoreExisting, columns, filterCaseSensitive);
ManifestGroup specsById(Map<Integer, PartitionSpec> newSpecsById) {
this.specsById = newSpecsById;
return this;
}

public ManifestGroup filterData(Expression expr) {
return new ManifestGroup(
io, manifests, specsById, Expressions.and(dataFilter, expr), fileFilter, partitionFilter,
ignoreDeleted, ignoreExisting, columns, caseSensitive);
ManifestGroup filterData(Expression newDataFilter) {
this.dataFilter = Expressions.and(dataFilter, newDataFilter);
return this;
}

public ManifestGroup filterFiles(Expression expr) {
return new ManifestGroup(
io, manifests, specsById, dataFilter, Expressions.and(fileFilter, expr), partitionFilter,
ignoreDeleted, ignoreExisting, columns, caseSensitive);
ManifestGroup filterFiles(Expression newFileFilter) {
this.fileFilter = Expressions.and(fileFilter, newFileFilter);
return this;
}

public ManifestGroup filterPartitions(Expression expr) {
return new ManifestGroup(
io, manifests, specsById, dataFilter, fileFilter, Expressions.and(partitionFilter, expr),
ignoreDeleted, ignoreExisting, columns, caseSensitive);
ManifestGroup filterPartitions(Expression newPartitionFilter) {
this.partitionFilter = Expressions.and(partitionFilter, newPartitionFilter);
return this;
}

public ManifestGroup ignoreDeleted() {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter, true,
ignoreExisting, columns, caseSensitive);
ManifestGroup ignoreDeleted() {
this.ignoreDeleted = true;
return this;
}

public ManifestGroup ignoreDeleted(boolean shouldIgnoreDeleted) {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
shouldIgnoreDeleted, ignoreExisting, columns, caseSensitive);
ManifestGroup ignoreExisting() {
this.ignoreExisting = true;
return this;
}

public ManifestGroup ignoreExisting() {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
ignoreDeleted, true, columns, caseSensitive);
ManifestGroup select(List<String> newColumns) {
this.columns = Lists.newArrayList(newColumns);
return this;
}

public ManifestGroup ignoreExisting(boolean shouldIgnoreExisting) {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
ignoreDeleted, shouldIgnoreExisting, columns, caseSensitive);
}

public ManifestGroup select(List<String> columnNames) {
return new ManifestGroup(
io, manifests, specsById, dataFilter, fileFilter, partitionFilter, ignoreDeleted, ignoreExisting,
Lists.newArrayList(columnNames), caseSensitive);
}

public ManifestGroup select(String... columnNames) {
return select(Arrays.asList(columnNames));
ManifestGroup caseSensitive(boolean newCaseSensitive) {
this.caseSensitive = newCaseSensitive;
return this;
}

/**
Expand All @@ -151,6 +112,14 @@ public ManifestGroup select(String... columnNames) {
* @return a CloseableIterable of manifest entries.
*/
public CloseableIterable<ManifestEntry> entries() {
LoadingCache<Integer, ManifestEvaluator> evalCache = specsById == null ?
null : Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
return ManifestEvaluator.forPartitionFilter(
Expressions.and(partitionFilter, Projections.inclusive(spec).project(dataFilter)),
spec, caseSensitive);
});

Evaluator evaluator = new Evaluator(DataFile.getType(EMPTY_STRUCT), fileFilter, caseSensitive);

Iterable<ManifestFile> matchingManifests = evalCache == null ? manifests : Iterables.filter(manifests,
Expand Down
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
Expand Up @@ -226,11 +226,11 @@ CloseableIterable<ManifestEntry> entries(Schema fileProjection) {

@Override
public Iterator<DataFile> iterator() {
return iterator(alwaysTrue(), fileSchema);
return iterator(fileSchema);
}

// visible for use by PartialManifest
Iterator<DataFile> iterator(Expression partFilter, Schema fileProjection) {
Iterator<DataFile> iterator(Schema fileProjection) {
rdsr marked this conversation as resolved.
Show resolved Hide resolved
return Iterables.transform(Iterables.filter(
entries(fileProjection),
entry -> entry.status() != ManifestEntry.Status.DELETED),
Expand Down
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/ScanSummary.java
Expand Up @@ -218,7 +218,8 @@ private Map<String, PartitionMetrics> computeTopPartitionMetrics(
TopN<String, PartitionMetrics> topN = new TopN<>(
limit, throwIfLimited, Comparators.charSequences());

try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), manifests, ops.current().specsById())
try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), manifests)
.specsById(ops.current().specsById())
.filterData(rowFilter)
.ignoreDeleted()
.select(SCAN_SUMMARY_COLUMNS)
Expand Down
25 changes: 25 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestFindFiles.java
Expand Up @@ -19,11 +19,14 @@

package org.apache.iceberg;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.Set;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -56,6 +59,28 @@ public void testWithMetadataMatching() {
Assert.assertEquals(pathSet(FILE_A), pathSet(files));
}

// Appends a data file whose metrics give column 1 a value range of [1, 5], then checks that
// FindFiles.withRecordsMatching(id == 1) returns that file.
// NOTE(review): presumably field id 1 is the "id" column of the test schema; confirm against the test base class.
@Test
public void testWithRecordsMatching() {
table.newAppend()
.appendFile(DataFiles.builder(SPEC)
.withInputFile(Files.localInput("/path/to/data-e.parquet"))
.withPartitionPath("data_bucket=4")
.withMetrics(new Metrics(3L,
null, // no column sizes
ImmutableMap.of(1, 3L), // value count
ImmutableMap.of(1, 0L), // null count
ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1)), // lower bounds
ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 5)))) // upper bounds
.build())
.commit();

final Iterable<DataFile> files = FindFiles.in(table)
.withRecordsMatching(Expressions.equal("id", 1))
.collect();

// the filter value 1 lies within the recorded bounds, so the appended file must be returned
Assert.assertEquals(Sets.newHashSet("/path/to/data-e.parquet"), pathSet(files));
}

@Test
public void testInPartition() {
table.newAppend()
Expand Down