Skip to content

Commit

Permalink
Address review comments
Browse files — browse the repository at this point in the history
  • Loading branch information
rdsr committed Jan 19, 2020
1 parent 03a96dc commit e36373a
Show file tree
Hide file tree
Showing 4 changed files with 22 additions and 44 deletions.
4 changes: 2 additions & 2 deletions core/src/main/java/org/apache/iceberg/BaseSnapshot.java
Expand Up @@ -169,10 +169,10 @@ private void cacheChanges() {
for (ManifestEntry entry : entries) {
switch (entry.status()) {
case ADDED:
adds.add(entry.file());
adds.add(entry.file().copy());
break;
case DELETED:
deletes.add(entry.file());
deletes.add(entry.file().copyWithoutStats());
break;
default:
throw new IllegalStateException(
Expand Down
2 changes: 1 addition & 1 deletion core/src/main/java/org/apache/iceberg/FileHistory.java
Expand Up @@ -108,7 +108,7 @@ public Iterable<ManifestEntry> build() {
CharSequenceWrapper locationWrapper = CharSequenceWrapper.wrap(null);
for (ManifestEntry entry : entries) {
if (entry != null && locations.contains(locationWrapper.set(entry.file().path()))) {
results.add(entry);
results.add(entry.copy());
}
}
} catch (IOException e) {
Expand Down
29 changes: 19 additions & 10 deletions core/src/main/java/org/apache/iceberg/FilteredManifest.java
Expand Up @@ -20,7 +20,6 @@
package org.apache.iceberg;

import com.google.common.base.Preconditions;
import com.google.common.collect.Iterators;
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import java.io.IOException;
Expand Down Expand Up @@ -96,30 +95,38 @@ CloseableIterable<ManifestEntry> allEntries() {
InclusiveMetricsEvaluator metricsEvaluator = metricsEvaluator();

// ensure stats columns are present for metrics evaluation
Collection<String> projectColumns = withStatsColumns(columns);
boolean requireStatsProjection = requireStatsProjection();
Collection<String> projectColumns = requireStatsProjection ? withStatsColumns(columns) : columns;

CloseableIterable<ManifestEntry> entries = CloseableIterable.filter(
reader.entries(projection(fileSchema, projectColumns, caseSensitive)),
entry -> entry != null &&
evaluator.eval(entry.file().partition()) &&
metricsEvaluator.eval(entry.file()));

boolean dropStats = dropColumnStats(columns);
return CloseableIterable.transform(entries, e -> dropStats ? e.copyWithoutStats() : e.copy());
// note that columns itself could have stats projected, that's ok.
// We only drop stats if we have forced stats projection.
return CloseableIterable.transform(entries, e -> requireStatsProjection ? e.copyWithoutStats() : e.copy());

} else {
return CloseableIterable.transform(
reader.entries(projection(fileSchema, columns, caseSensitive)), ManifestEntry::copy);
return reader.entries(projection(fileSchema, columns, caseSensitive));
}
}

CloseableIterable<ManifestEntry> liveEntries() {
return CloseableIterable.filter(allEntries(), entry -> entry != null && entry.status() != Status.DELETED);
}

/**
* @return an Iterator of DataFile. Makes defensive copies before returning
*/
@Override
public Iterator<DataFile> iterator() {
return Iterators.transform(liveEntries().iterator(), ManifestEntry::file);
// If requireStatsProjection is true, we don't create a defensive
// copy of manifest entry as it was already done by allEntries()
boolean requireStatsProjection = requireStatsProjection();
return CloseableIterable.transform(liveEntries(),
e -> requireStatsProjection ? e.file() : e.copy().file()).iterator();
}

@Override
Expand Down Expand Up @@ -165,13 +172,15 @@ private InclusiveMetricsEvaluator metricsEvaluator() {
return lazyMetricsEvaluator;
}

private static boolean dropColumnStats(Collection<String> columns) {
return !columns.contains("*") &&
private boolean requireStatsProjection() {
return rowFilter != Expressions.alwaysTrue() &&
// projected columns do not contain stats column
!columns.containsAll(ManifestReader.ALL_COLUMNS) &&
Sets.intersection(Sets.newHashSet(columns), STATS_COLUMNS).isEmpty();
}

private static Collection<String> withStatsColumns(Collection<String> columns) {
if (columns.contains("*")) {
if (columns.containsAll(ManifestReader.ALL_COLUMNS)) {
return columns;
} else {
List<String> projectColumns = Lists.newArrayList(columns);
Expand Down
31 changes: 0 additions & 31 deletions core/src/test/java/org/apache/iceberg/TestFindFiles.java
Expand Up @@ -22,17 +22,9 @@
import com.google.common.collect.ImmutableMap;
import com.google.common.collect.Iterables;
import com.google.common.collect.Sets;
import java.io.File;
import java.io.IOException;
import java.util.Arrays;
import java.util.List;
import java.util.Set;
import java.util.UUID;
import org.apache.avro.generic.GenericData;
import org.apache.iceberg.avro.Avro;
import org.apache.iceberg.avro.RandomAvroData;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.FileAppender;
import org.apache.iceberg.types.Conversions;
import org.apache.iceberg.types.Types;
import org.junit.Assert;
Expand Down Expand Up @@ -204,27 +196,4 @@ private Set<String> pathSet(DataFile... files) {
private Set<String> pathSet(Iterable<DataFile> files) {
return Sets.newHashSet(Iterables.transform(files, file -> file.path().toString()));
}

private DataFile createDataFile(File dataPath, String partValue) throws IOException {
List<GenericData.Record> expected = RandomAvroData.generate(SCHEMA, 100, 0L);

File dataFile = new File(dataPath, FileFormat.AVRO.addExtension(UUID.randomUUID().toString()));
try (FileAppender<GenericData.Record> writer = Avro.write(Files.localOutput(dataFile))
.schema(SCHEMA)
.named("test")
.build()) {
for (GenericData.Record rec : expected) {
rec.put("part", partValue); // create just one partition
writer.add(rec);
}
}

PartitionData partition = new PartitionData(SPEC.partitionType());
partition.set(0, partValue);
return DataFiles.builder(SPEC)
.withInputFile(Files.localInput(dataFile))
.withPartition(partition)
.withRecordCount(100)
.build();
}
}

0 comments on commit e36373a

Please sign in to comment.