Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account-related emails.

Already on GitHub? Sign in to your account

Refactor FilteredManifest and ManifestGroup #735

Merged
merged 5 commits into from Jan 27, 2020
Merged
Show file tree
Hide file tree
Changes from 2 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
3 changes: 2 additions & 1 deletion core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Expand Up @@ -161,9 +161,10 @@ private void cacheChanges() {
// read only manifests that were created by this snapshot
Iterable<ManifestFile> changedManifests = Iterables.filter(manifests(),
manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(io, changedManifests)
try (CloseableIterable<ManifestEntry> entries = ManifestGroup.builder(io, changedManifests)
.ignoreExisting()
.select(ManifestReader.ALL_COLUMNS)
.build()
.entries()) {
for (ManifestEntry entry : entries) {
switch (entry.status()) {
Expand Down
6 changes: 4 additions & 2 deletions core/src/main/java/org/apache/iceberg/FileHistory.java
Expand Up @@ -98,10 +98,12 @@ public Iterable<ManifestEntry> build() {
manifest -> manifest.snapshotId() == null || matchingIds.contains(manifest.snapshotId()));

// a manifest group will only read each manifest once
ManifestGroup group = new ManifestGroup(((HasTableOperations) table).operations().io(), manifests);
ManifestGroup group = ManifestGroup.builder(((HasTableOperations) table).operations().io(), manifests)
.select(HISTORY_COLUMNS)
.build();

List<ManifestEntry> results = Lists.newArrayList();
try (CloseableIterable<ManifestEntry> entries = group.select(HISTORY_COLUMNS).entries()) {
try (CloseableIterable<ManifestEntry> entries = group.entries()) {
// TODO: replace this with an IN predicate
CharSequenceWrapper locationWrapper = CharSequenceWrapper.wrap(null);
for (ManifestEntry entry : entries) {
Expand Down
76 changes: 35 additions & 41 deletions core/src/main/java/org/apache/iceberg/FilteredManifest.java
Expand Up @@ -20,7 +20,6 @@
package org.apache.iceberg;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
Expand Down Expand Up @@ -95,60 +94,38 @@ CloseableIterable<ManifestEntry> allEntries() {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
// ensure stats columns are present for metrics evaluation
boolean requireStatsProjection = requireStatsProjection();
Collection<String> projectColumns = requireStatsProjection ? withStatsColumns(columns) : columns;

CloseableIterable<ManifestEntry> entries = CloseableIterable.filter(
reader.entries(projection(fileSchema, projectColumns, caseSensitive)),
entry -> entry != null &&
evaluator.eval(entry.file().partition()) &&
metricsEvaluator.eval(entry.file()));

// note that columns itself could have stats projected, that's ok.
// We only drop stats if we have forced stats projection.
return CloseableIterable.transform(entries, e -> requireStatsProjection ? e.copyWithoutStats() : e);
rdsr marked this conversation as resolved.
Show resolved Hide resolved
} else {
return reader.entries(projection(fileSchema, columns, caseSensitive));
}
}

CloseableIterable<ManifestEntry> liveEntries() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
(partFilter != null && partFilter != Expressions.alwaysTrue())) {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
entry -> entry != null &&
entry.status() != Status.DELETED &&
evaluator.eval(entry.file().partition()) &&
metricsEvaluator.eval(entry.file()));

} else {
return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
entry -> entry != null && entry.status() != Status.DELETED);
}
return CloseableIterable.filter(allEntries(), entry -> entry != null && entry.status() != Status.DELETED);
}

/**
* @return an Iterator of DataFile. Makes defensive copies of files before returning
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1

*/
@Override
public Iterator<DataFile> iterator() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
(partFilter != null && partFilter != Expressions.alwaysTrue())) {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

// ensure stats columns are present for metrics evaluation
List<String> projectColumns = Lists.newArrayList(columns);
projectColumns.addAll(STATS_COLUMNS); // order doesn't matter

// if no stats columns were projected, drop them
boolean dropStats = Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();

return Iterators.transform(
Iterators.filter(reader.iterator(partFilter, projection(fileSchema, projectColumns, caseSensitive)),
input -> input != null &&
evaluator.eval(input.partition()) &&
metricsEvaluator.eval(input)),
dropStats ? DataFile::copyWithoutStats : DataFile::copy);

} else {
return Iterators.transform(
reader.iterator(partFilter, projection(fileSchema, columns, caseSensitive)),
DataFile::copy);
}
// If requireStatsProjection is true, we don't create a defensive
// copy of manifest entry as it was already done by allEntries()
boolean requireStatsProjection = requireStatsProjection();
rdsr marked this conversation as resolved.
Show resolved Hide resolved
return CloseableIterable.transform(liveEntries(),
e -> requireStatsProjection ? e.file() : e.copy().file()).iterator();
}

@Override
Expand Down Expand Up @@ -193,4 +170,21 @@ private InclusiveMetricsEvaluator metricsEvaluator() {
}
return lazyMetricsEvaluator;
}

/**
 * Decides whether the stats columns have to be added to the projection so that the row filter
 * can be evaluated against file metrics.
 * <p>
 * Stats are only forced in when there is a row filter to evaluate and the caller's projection
 * neither selects all columns nor already contains any stats column.
 *
 * @return true if stats columns must be projected in addition to the requested columns
 */
private boolean requireStatsProjection() {
  // A null row filter means "no filter", the same way the filter checks elsewhere in this class
  // treat it; without this guard a null filter would be reported as requiring stats.
  boolean hasRowFilter = rowFilter != null && rowFilter != Expressions.alwaysTrue();
  return hasRowFilter &&
      // projected columns do not contain stats column
      !columns.containsAll(ManifestReader.ALL_COLUMNS) &&
      Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
}

/**
 * Returns a projection that includes the stats columns.
 * <p>
 * If the given columns already contain every entry of ManifestReader.ALL_COLUMNS, they are
 * returned as-is. Otherwise the columns are copied into a new list and STATS_COLUMNS is
 * appended to that copy; the input collection is not modified.
 *
 * @param columns the column names requested by the caller
 * @return the given columns, or a new list with STATS_COLUMNS appended
 */
private static Collection<String> withStatsColumns(Collection<String> columns) {
if (columns.containsAll(ManifestReader.ALL_COLUMNS)) {
// all columns are already selected, nothing to add
return columns;
} else {
List<String> projectColumns = Lists.newArrayList(columns);
projectColumns.addAll(STATS_COLUMNS); // order doesn't matter
rdsr marked this conversation as resolved.
Show resolved Hide resolved
return projectColumns;
}
}
}
5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/iceberg/FindFiles.java
Expand Up @@ -197,13 +197,14 @@ public CloseableIterable<DataFile> collect() {
}

// when snapshot is not null
CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), snapshot.manifests(),
ops.current().specsById())
CloseableIterable<ManifestEntry> entries = ManifestGroup.builder(ops.io(), snapshot.manifests())
.specsById(ops.current().specsById())
.filterData(rowFilter)
.filterFiles(fileFilter)
.filterPartitions(partitionFilter)
.ignoreDeleted()
.caseSensitive(caseSensitive)
.build()
.entries();

return CloseableIterable.transform(entries, entry -> entry.file().copyWithoutStats());
Expand Down
139 changes: 75 additions & 64 deletions core/src/main/java/org/apache/iceberg/ManifestGroup.java
Expand Up @@ -25,7 +25,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -54,15 +53,6 @@ class ManifestGroup {

private final LoadingCache<Integer, ManifestEvaluator> evalCache;

ManifestGroup(FileIO io, Iterable<ManifestFile> manifests) {
this(io, manifests, null);
}

ManifestGroup(FileIO io, Iterable<ManifestFile> manifests, Map<Integer, PartitionSpec> specsById) {
this(io, Sets.newHashSet(manifests), specsById, Expressions.alwaysTrue(), Expressions.alwaysTrue(),
Expressions.alwaysTrue(), false, false, ImmutableList.of("*"), true);
}

private ManifestGroup(FileIO io, Set<ManifestFile> manifests, Map<Integer, PartitionSpec> specsById,
Expression dataFilter, Expression fileFilter, Expression partitionFilter,
boolean ignoreDeleted, boolean ignoreExisting, List<String> columns,
Expand All @@ -83,65 +73,12 @@ private ManifestGroup(FileIO io, Set<ManifestFile> manifests, Map<Integer, Parti
this.evalCache = Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
return ManifestEvaluator.forPartitionFilter(
Expressions.and(partitionFilter, Projections.inclusive(spec).project(dataFilter)),
Expressions.and(partitionFilter, Projections.inclusive(spec, caseSensitive).project(dataFilter)),
spec, caseSensitive);
});
}
}

public ManifestGroup caseSensitive(boolean filterCaseSensitive) {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
ignoreDeleted, ignoreExisting, columns, filterCaseSensitive);
}

public ManifestGroup filterData(Expression expr) {
return new ManifestGroup(
io, manifests, specsById, Expressions.and(dataFilter, expr), fileFilter, partitionFilter,
ignoreDeleted, ignoreExisting, columns, caseSensitive);
}

public ManifestGroup filterFiles(Expression expr) {
return new ManifestGroup(
io, manifests, specsById, dataFilter, Expressions.and(fileFilter, expr), partitionFilter,
ignoreDeleted, ignoreExisting, columns, caseSensitive);
}

public ManifestGroup filterPartitions(Expression expr) {
return new ManifestGroup(
io, manifests, specsById, dataFilter, fileFilter, Expressions.and(partitionFilter, expr),
ignoreDeleted, ignoreExisting, columns, caseSensitive);
}

public ManifestGroup ignoreDeleted() {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter, true,
ignoreExisting, columns, caseSensitive);
}

public ManifestGroup ignoreDeleted(boolean shouldIgnoreDeleted) {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
shouldIgnoreDeleted, ignoreExisting, columns, caseSensitive);
}

public ManifestGroup ignoreExisting() {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
ignoreDeleted, true, columns, caseSensitive);
}

public ManifestGroup ignoreExisting(boolean shouldIgnoreExisting) {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
ignoreDeleted, shouldIgnoreExisting, columns, caseSensitive);
}

public ManifestGroup select(List<String> columnNames) {
return new ManifestGroup(
io, manifests, specsById, dataFilter, fileFilter, partitionFilter, ignoreDeleted, ignoreExisting,
Lists.newArrayList(columnNames), caseSensitive);
}

public ManifestGroup select(String... columnNames) {
return select(Arrays.asList(columnNames));
}

/**
* Returns an iterable for manifest entries in the set of manifests.
* <p>
Expand Down Expand Up @@ -204,4 +141,78 @@ public CloseableIterable<ManifestEntry> entries() {

return CloseableIterable.concat(readers);
}

/**
 * Creates a {@link Builder} used to configure and build a ManifestGroup.
 *
 * @param io the FileIO handed to the builder
 * @param manifests the manifest files the group is built from
 * @return a new Builder with default settings
 */
static Builder builder(FileIO io, Iterable<ManifestFile> manifests) {
return new Builder(io, manifests);
}

/**
 * Mutable builder for ManifestGroup.
 * <p>
 * Defaults set by the constructor: all three filters are alwaysTrue(), ignoreDeleted and
 * ignoreExisting are false, the selected columns are "*", and caseSensitive is true.
 * specsById is left unset (null) unless {@link #specsById(Map)} is called.
 * Every setter mutates this builder and returns it for chaining.
 */
static class Builder {
rdsr marked this conversation as resolved.
Show resolved Hide resolved
private final FileIO io;
private final Set<ManifestFile> manifests;
private Map<Integer, PartitionSpec> specsById;
private Expression dataFilter;
private Expression fileFilter;
private Expression partitionFilter;
private boolean ignoreDeleted;
private boolean ignoreExisting;
private List<String> columns;
private boolean caseSensitive;

// Copies the given manifests into a new set and initializes every option to its default.
Builder(FileIO io, Iterable<ManifestFile> manifests) {
this.io = io;
this.manifests = Sets.newHashSet(manifests);
this.dataFilter = Expressions.alwaysTrue();
this.fileFilter = Expressions.alwaysTrue();
this.partitionFilter = Expressions.alwaysTrue();
this.ignoreDeleted = false;
this.ignoreExisting = false;
this.columns = ImmutableList.of("*");
this.caseSensitive = true;
}

// Sets the partition specs by id; replaces any previously set map.
Builder specsById(Map<Integer, PartitionSpec> newSpecsById) {
this.specsById = newSpecsById;
return this;
}

// ANDs the given expression with the data filter accumulated so far.
Builder filterData(Expression newDataFilter) {
this.dataFilter = Expressions.and(dataFilter, newDataFilter);
return this;
}

// ANDs the given expression with the file filter accumulated so far.
Builder filterFiles(Expression newFileFilter) {
this.fileFilter = Expressions.and(fileFilter, newFileFilter);
return this;
}

// ANDs the given expression with the partition filter accumulated so far.
Builder filterPartitions(Expression newPartitionFilter) {
this.partitionFilter = Expressions.and(partitionFilter, newPartitionFilter);
return this;
}

// Turns on the ignoreDeleted flag passed to the ManifestGroup.
Builder ignoreDeleted() {
this.ignoreDeleted = true;
return this;
}

// Turns on the ignoreExisting flag passed to the ManifestGroup.
Builder ignoreExisting() {
this.ignoreExisting = true;
return this;
}

// Replaces the selected columns with a copy of the given list.
Builder select(List<String> newColumns) {
this.columns = Lists.newArrayList(newColumns);
return this;
}

// Sets the caseSensitive flag passed to the ManifestGroup.
Builder caseSensitive(boolean newCaseSensitive) {
this.caseSensitive = newCaseSensitive;
return this;
}

// Creates the ManifestGroup from the current state of this builder.
ManifestGroup build() {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter, ignoreDeleted,
ignoreExisting, columns, caseSensitive);
}
}
}
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
Expand Up @@ -226,11 +226,11 @@ CloseableIterable<ManifestEntry> entries(Schema fileProjection) {

@Override
public Iterator<DataFile> iterator() {
return iterator(alwaysTrue(), fileSchema);
return iterator(fileSchema);
}

// visible for use by PartialManifest
Iterator<DataFile> iterator(Expression partFilter, Schema fileProjection) {
Iterator<DataFile> iterator(Schema fileProjection) {
rdsr marked this conversation as resolved.
Show resolved Hide resolved
return Iterables.transform(Iterables.filter(
entries(fileProjection),
entry -> entry.status() != ManifestEntry.Status.DELETED),
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/ScanSummary.java
Expand Up @@ -218,10 +218,12 @@ private Map<String, PartitionMetrics> computeTopPartitionMetrics(
TopN<String, PartitionMetrics> topN = new TopN<>(
limit, throwIfLimited, Comparators.charSequences());

try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), manifests, ops.current().specsById())
try (CloseableIterable<ManifestEntry> entries = ManifestGroup.builder(ops.io(), manifests)
.specsById(ops.current().specsById())
.filterData(rowFilter)
.ignoreDeleted()
.select(SCAN_SUMMARY_COLUMNS)
.build()
.entries()) {

PartitionSpec spec = table.spec();
Expand Down
25 changes: 25 additions & 0 deletions core/src/test/java/org/apache/iceberg/TestFindFiles.java
Expand Up @@ -19,11 +19,14 @@

package org.apache.iceberg;

import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.Set;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
import org.junit.Test;

Expand Down Expand Up @@ -56,6 +59,28 @@ public void testWithMetadataMatching() {
Assert.assertEquals(pathSet(FILE_A), pathSet(files));
}

// Appends a file whose metrics for field id 1 carry bounds [1, 5], then checks that
// FindFiles.withRecordsMatching(id == 1) returns that file.
@Test
public void testWithRecordsMatching() {
table.newAppend()
.appendFile(DataFiles.builder(SPEC)
.withInputFile(Files.localInput("/path/to/data-e.parquet"))
.withPartitionPath("data_bucket=4")
.withMetrics(new Metrics(3L,
null, // no column sizes
ImmutableMap.of(1, 3L), // value count
ImmutableMap.of(1, 0L), // null count
ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 1)), // lower bounds
ImmutableMap.of(1, Conversions.toByteBuffer(Types.IntegerType.get(), 5)))) // upper bounds
.build())
.commit();

final Iterable<DataFile> files = FindFiles.in(table)
.withRecordsMatching(Expressions.equal("id", 1))
.collect();

Assert.assertEquals(Sets.newHashSet("/path/to/data-e.parquet"), pathSet(files));
}

@Test
public void testInPartition() {
table.newAppend()
Expand Down