Skip to content

Commit

Permalink
[Refactor]: Remove duplication form FilteredManifest
Browse files Browse the repository at this point in the history
  • Loading branch information
rdsr committed Jan 13, 2020
1 parent 20fa2c4 commit d8627c9
Show file tree
Hide file tree
Showing 8 changed files with 175 additions and 116 deletions.
7 changes: 4 additions & 3 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Expand Up @@ -161,17 +161,18 @@ private void cacheChanges() {
// read only manifests that were created by this snapshot
Iterable<ManifestFile> changedManifests = Iterables.filter(manifests(),
manifest -> Objects.equal(manifest.snapshotId(), snapshotId));
try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(io, changedManifests)
try (CloseableIterable<ManifestEntry> entries = ManifestGroup.builder(io, changedManifests)
.ignoreExisting()
.select(ManifestReader.ALL_COLUMNS)
.build()
.entries()) {
for (ManifestEntry entry : entries) {
switch (entry.status()) {
case ADDED:
adds.add(entry.file().copy());
adds.add(entry.file());
break;
case DELETED:
deletes.add(entry.file().copyWithoutStats());
deletes.add(entry.file());
break;
default:
throw new IllegalStateException(
Expand Down
8 changes: 5 additions & 3 deletions core/src/main/java/org/apache/iceberg/FileHistory.java
Expand Up @@ -98,15 +98,17 @@ public Iterable<ManifestEntry> build() {
manifest -> manifest.snapshotId() == null || matchingIds.contains(manifest.snapshotId()));

// a manifest group will only read each manifest once
ManifestGroup group = new ManifestGroup(((HasTableOperations) table).operations().io(), manifests);
ManifestGroup group = ManifestGroup.builder(((HasTableOperations) table).operations().io(), manifests)
.select(HISTORY_COLUMNS)
.build();

List<ManifestEntry> results = Lists.newArrayList();
try (CloseableIterable<ManifestEntry> entries = group.select(HISTORY_COLUMNS).entries()) {
try (CloseableIterable<ManifestEntry> entries = group.entries()) {
// TODO: replace this with an IN predicate
CharSequenceWrapper locationWrapper = CharSequenceWrapper.wrap(null);
for (ManifestEntry entry : entries) {
if (entry != null && locations.contains(locationWrapper.set(entry.file().path()))) {
results.add(entry.copy());
results.add(entry);
}
}
} catch (IOException e) {
Expand Down
68 changes: 27 additions & 41 deletions core/src/main/java/org/apache/iceberg/FilteredManifest.java
Expand Up @@ -95,60 +95,31 @@ CloseableIterable<ManifestEntry> allEntries() {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
// ensure stats columns are present for metrics evaluation
Collection<String> projectColumns = withStatsColumns(columns);

CloseableIterable<ManifestEntry> entries = CloseableIterable.filter(
reader.entries(projection(fileSchema, projectColumns, caseSensitive)),
entry -> entry != null &&
evaluator.eval(entry.file().partition()) &&
metricsEvaluator.eval(entry.file()));

boolean dropStats = dropColumnStats(columns);
return CloseableIterable.transform(entries, e -> dropStats ? e.copyWithoutStats() : e.copy());

} else {
return reader.entries(projection(fileSchema, columns, caseSensitive));
return CloseableIterable.transform(
reader.entries(projection(fileSchema, columns, caseSensitive)), ManifestEntry::copy);
}
}

CloseableIterable<ManifestEntry> liveEntries() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
(partFilter != null && partFilter != Expressions.alwaysTrue())) {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
entry -> entry != null &&
entry.status() != Status.DELETED &&
evaluator.eval(entry.file().partition()) &&
metricsEvaluator.eval(entry.file()));

} else {
return CloseableIterable.filter(reader.entries(projection(fileSchema, columns, caseSensitive)),
entry -> entry != null && entry.status() != Status.DELETED);
}
return CloseableIterable.filter(allEntries(), entry -> entry != null && entry.status() != Status.DELETED);
}

@Override
public Iterator<DataFile> iterator() {
if ((rowFilter != null && rowFilter != Expressions.alwaysTrue()) ||
(partFilter != null && partFilter != Expressions.alwaysTrue())) {
Evaluator evaluator = evaluator();
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

// ensure stats columns are present for metrics evaluation
List<String> projectColumns = Lists.newArrayList(columns);
projectColumns.addAll(STATS_COLUMNS); // order doesn't matter

// if no stats columns were projected, drop them
boolean dropStats = Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();

return Iterators.transform(
Iterators.filter(reader.iterator(partFilter, projection(fileSchema, projectColumns, caseSensitive)),
input -> input != null &&
evaluator.eval(input.partition()) &&
metricsEvaluator.eval(input)),
dropStats ? DataFile::copyWithoutStats : DataFile::copy);

} else {
return Iterators.transform(
reader.iterator(partFilter, projection(fileSchema, columns, caseSensitive)),
DataFile::copy);
}
return Iterators.transform(liveEntries().iterator(), ManifestEntry::file);
}

@Override
Expand Down Expand Up @@ -193,4 +164,19 @@ private InclusiveMetricsEvaluator metricsEvaluator() {
}
return lazyMetricsEvaluator;
}

private static boolean dropColumnStats(Collection<String> columns) {
return !columns.contains("*") &&
Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
}

private static Collection<String> withStatsColumns(Collection<String> columns) {
if (columns.contains("*")) {
return columns;
} else {
List<String> projectColumns = Lists.newArrayList(columns);
projectColumns.addAll(STATS_COLUMNS); // order doesn't matter
return projectColumns;
}
}
}
5 changes: 3 additions & 2 deletions core/src/main/java/org/apache/iceberg/FindFiles.java
Expand Up @@ -197,13 +197,14 @@ public CloseableIterable<DataFile> collect() {
}

// when snapshot is not null
CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), snapshot.manifests(),
ops.current().specsById())
CloseableIterable<ManifestEntry> entries = ManifestGroup.builder(ops.io(), snapshot.manifests())
.specsById(ops.current().specsById())
.filterData(rowFilter)
.filterFiles(fileFilter)
.filterPartitions(partitionFilter)
.ignoreDeleted()
.caseSensitive(caseSensitive)
.build()
.entries();

return CloseableIterable.transform(entries, entry -> entry.file().copyWithoutStats());
Expand Down
139 changes: 75 additions & 64 deletions core/src/main/java/org/apache/iceberg/ManifestGroup.java
Expand Up @@ -25,7 +25,6 @@
import com.google.common.collect.Iterables;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Set;
Expand Down Expand Up @@ -54,15 +53,6 @@ class ManifestGroup {

private final LoadingCache<Integer, ManifestEvaluator> evalCache;

ManifestGroup(FileIO io, Iterable<ManifestFile> manifests) {
this(io, manifests, null);
}

ManifestGroup(FileIO io, Iterable<ManifestFile> manifests, Map<Integer, PartitionSpec> specsById) {
this(io, Sets.newHashSet(manifests), specsById, Expressions.alwaysTrue(), Expressions.alwaysTrue(),
Expressions.alwaysTrue(), false, false, ImmutableList.of("*"), true);
}

private ManifestGroup(FileIO io, Set<ManifestFile> manifests, Map<Integer, PartitionSpec> specsById,
Expression dataFilter, Expression fileFilter, Expression partitionFilter,
boolean ignoreDeleted, boolean ignoreExisting, List<String> columns,
Expand All @@ -83,65 +73,12 @@ private ManifestGroup(FileIO io, Set<ManifestFile> manifests, Map<Integer, Parti
this.evalCache = Caffeine.newBuilder().build(specId -> {
PartitionSpec spec = specsById.get(specId);
return ManifestEvaluator.forPartitionFilter(
Expressions.and(partitionFilter, Projections.inclusive(spec).project(dataFilter)),
Expressions.and(partitionFilter, Projections.inclusive(spec, caseSensitive).project(dataFilter)),
spec, caseSensitive);
});
}
}

public ManifestGroup caseSensitive(boolean filterCaseSensitive) {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
ignoreDeleted, ignoreExisting, columns, filterCaseSensitive);
}

public ManifestGroup filterData(Expression expr) {
return new ManifestGroup(
io, manifests, specsById, Expressions.and(dataFilter, expr), fileFilter, partitionFilter,
ignoreDeleted, ignoreExisting, columns, caseSensitive);
}

public ManifestGroup filterFiles(Expression expr) {
return new ManifestGroup(
io, manifests, specsById, dataFilter, Expressions.and(fileFilter, expr), partitionFilter,
ignoreDeleted, ignoreExisting, columns, caseSensitive);
}

public ManifestGroup filterPartitions(Expression expr) {
return new ManifestGroup(
io, manifests, specsById, dataFilter, fileFilter, Expressions.and(partitionFilter, expr),
ignoreDeleted, ignoreExisting, columns, caseSensitive);
}

public ManifestGroup ignoreDeleted() {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter, true,
ignoreExisting, columns, caseSensitive);
}

public ManifestGroup ignoreDeleted(boolean shouldIgnoreDeleted) {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
shouldIgnoreDeleted, ignoreExisting, columns, caseSensitive);
}

public ManifestGroup ignoreExisting() {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
ignoreDeleted, true, columns, caseSensitive);
}

public ManifestGroup ignoreExisting(boolean shouldIgnoreExisting) {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter,
ignoreDeleted, shouldIgnoreExisting, columns, caseSensitive);
}

public ManifestGroup select(List<String> columnNames) {
return new ManifestGroup(
io, manifests, specsById, dataFilter, fileFilter, partitionFilter, ignoreDeleted, ignoreExisting,
Lists.newArrayList(columnNames), caseSensitive);
}

public ManifestGroup select(String... columnNames) {
return select(Arrays.asList(columnNames));
}

/**
* Returns an iterable for manifest entries in the set of manifests.
* <p>
Expand Down Expand Up @@ -204,4 +141,78 @@ public CloseableIterable<ManifestEntry> entries() {

return CloseableIterable.concat(readers);
}

static Builder builder(FileIO io, Iterable<ManifestFile> manifests) {
return new Builder(io, manifests);
}

static class Builder {
private final FileIO io;
private final Set<ManifestFile> manifests;
private Map<Integer, PartitionSpec> specsById;
private Expression dataFilter;
private Expression fileFilter;
private Expression partitionFilter;
private boolean ignoreDeleted;
private boolean ignoreExisting;
private List<String> columns;
private boolean caseSensitive;

Builder(FileIO io, Iterable<ManifestFile> manifests) {
this.io = io;
this.manifests = Sets.newHashSet(manifests);
this.dataFilter = Expressions.alwaysTrue();
this.fileFilter = Expressions.alwaysTrue();
this.partitionFilter = Expressions.alwaysTrue();
this.ignoreDeleted = false;
this.ignoreExisting = false;
this.columns = ImmutableList.of("*");
this.caseSensitive = true;
}

Builder specsById(Map<Integer, PartitionSpec> newSpecsById) {
this.specsById = newSpecsById;
return this;
}

Builder filterData(Expression newDataFilter) {
this.dataFilter = Expressions.and(dataFilter, newDataFilter);
return this;
}

Builder filterFiles(Expression newFileFilter) {
this.fileFilter = Expressions.and(fileFilter, newFileFilter);
return this;
}

Builder filterPartitions(Expression newPartitionFilter) {
this.partitionFilter = Expressions.and(partitionFilter, newPartitionFilter);
return this;
}

Builder ignoreDeleted() {
this.ignoreDeleted = true;
return this;
}

Builder ignoreExisting() {
this.ignoreExisting = true;
return this;
}

Builder select(List<String> newColumns) {
this.columns = Lists.newArrayList(newColumns);
return this;
}

Builder caseSensitive(boolean newCaseSensitive) {
this.caseSensitive = newCaseSensitive;
return this;
}

ManifestGroup build() {
return new ManifestGroup(io, manifests, specsById, dataFilter, fileFilter, partitionFilter, ignoreDeleted,
ignoreExisting, columns, caseSensitive);
}
}
}
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/ManifestReader.java
Expand Up @@ -226,11 +226,11 @@ CloseableIterable<ManifestEntry> entries(Schema fileProjection) {

@Override
public Iterator<DataFile> iterator() {
return iterator(alwaysTrue(), fileSchema);
return iterator(fileSchema);
}

// visible for use by PartialManifest
Iterator<DataFile> iterator(Expression partFilter, Schema fileProjection) {
Iterator<DataFile> iterator(Schema fileProjection) {
return Iterables.transform(Iterables.filter(
entries(fileProjection),
entry -> entry.status() != ManifestEntry.Status.DELETED),
Expand Down
4 changes: 3 additions & 1 deletion core/src/main/java/org/apache/iceberg/ScanSummary.java
Expand Up @@ -218,10 +218,12 @@ private Map<String, PartitionMetrics> computeTopPartitionMetrics(
TopN<String, PartitionMetrics> topN = new TopN<>(
limit, throwIfLimited, Comparators.charSequences());

try (CloseableIterable<ManifestEntry> entries = new ManifestGroup(ops.io(), manifests, ops.current().specsById())
try (CloseableIterable<ManifestEntry> entries = ManifestGroup.builder(ops.io(), manifests)
.specsById(ops.current().specsById())
.filterData(rowFilter)
.ignoreDeleted()
.select(SCAN_SUMMARY_COLUMNS)
.build()
.entries()) {

PartitionSpec spec = table.spec();
Expand Down

0 comments on commit d8627c9

Please sign in to comment.