Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions .baseline/checkstyle/checkstyle.xml
Original file line number Diff line number Diff line change
Expand Up @@ -120,6 +120,7 @@
org.apache.iceberg.NullOrder.*,
org.apache.iceberg.MetadataTableType.*,
org.apache.iceberg.MetadataColumns.*,
org.apache.iceberg.PlanningMode.*,
org.apache.iceberg.SortDirection.*,
org.apache.iceberg.TableProperties.*,
org.apache.iceberg.types.Type.*,
Expand Down
392 changes: 392 additions & 0 deletions core/src/main/java/org/apache/iceberg/BaseDistributedDataScan.java

Large diffs are not rendered by default.

20 changes: 17 additions & 3 deletions core/src/main/java/org/apache/iceberg/BaseScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@
import org.apache.iceberg.expressions.Binder;
import org.apache.iceberg.expressions.Expression;
import org.apache.iceberg.expressions.Expressions;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.MetricsReporter;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -58,7 +59,7 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
"upper_bounds",
"column_sizes");

private static final List<String> SCAN_WITH_STATS_COLUMNS =
protected static final List<String> SCAN_WITH_STATS_COLUMNS =
ImmutableList.<String>builder().addAll(SCAN_COLUMNS).addAll(STATS_COLUMNS).build();

protected static final List<String> DELETE_SCAN_COLUMNS =
Expand All @@ -73,12 +74,13 @@ abstract class BaseScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
"record_count",
"partition",
"key_metadata",
"split_offsets");
"split_offsets",
"equality_ids");

protected static final List<String> DELETE_SCAN_WITH_STATS_COLUMNS =
ImmutableList.<String>builder().addAll(DELETE_SCAN_COLUMNS).addAll(STATS_COLUMNS).build();

private static final boolean PLAN_SCANS_WITH_WORKER_POOL =
protected static final boolean PLAN_SCANS_WITH_WORKER_POOL =
SystemConfigs.SCAN_THREAD_POOL_ENABLED.value();

private final Table table;
Expand All @@ -95,6 +97,10 @@ public Table table() {
return table;
}

protected FileIO io() {
return table.io();
}

protected Schema tableSchema() {
return schema;
}
Expand All @@ -111,10 +117,18 @@ protected List<String> scanColumns() {
return context.returnColumnStats() ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS;
}

protected boolean shouldReturnColumnStats() {
return context().returnColumnStats();
}

protected boolean shouldIgnoreResiduals() {
return context().ignoreResiduals();
}

protected Expression residualFilter() {
return shouldIgnoreResiduals() ? Expressions.alwaysTrue() : filter();
}

protected boolean shouldPlanWithExecutor() {
return PLAN_SCANS_WITH_WORKER_POOL || context().planWithCustomizedExecutor();
}
Expand Down
70 changes: 70 additions & 0 deletions core/src/main/java/org/apache/iceberg/DataScan.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import java.util.List;
import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList;

abstract class DataScan<ThisT, T extends ScanTask, G extends ScanTaskGroup<T>>
extends SnapshotScan<ThisT, T, G> {

protected DataScan(Table table, Schema schema, TableScanContext context) {
super(table, schema, context);
}

@Override
protected boolean useSnapshotSchema() {
return true;
}

protected ManifestGroup newManifestGroup(
List<ManifestFile> dataManifests, List<ManifestFile> deleteManifests) {
return newManifestGroup(dataManifests, deleteManifests, context().returnColumnStats());
}

protected ManifestGroup newManifestGroup(
List<ManifestFile> dataManifests, boolean withColumnStats) {
return newManifestGroup(dataManifests, ImmutableList.of(), withColumnStats);
}

protected ManifestGroup newManifestGroup(
List<ManifestFile> dataManifests,
List<ManifestFile> deleteManifests,
boolean withColumnStats) {

ManifestGroup manifestGroup =
new ManifestGroup(io(), dataManifests, deleteManifests)
.caseSensitive(isCaseSensitive())
.select(withColumnStats ? SCAN_WITH_STATS_COLUMNS : SCAN_COLUMNS)
.filterData(filter())
.specsById(table().specs())
.scanMetrics(scanMetrics())
.ignoreDeleted();

if (shouldIgnoreResiduals()) {
manifestGroup = manifestGroup.ignoreResiduals();
}

if (shouldPlanWithExecutor() && (dataManifests.size() > 1 || deleteManifests.size() > 1)) {
manifestGroup = manifestGroup.planWith(planExecutor());
}

return manifestGroup;
}
}
9 changes: 2 additions & 7 deletions core/src/main/java/org/apache/iceberg/DataTableScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,6 @@
import org.apache.iceberg.io.CloseableIterable;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.relocated.com.google.common.base.Preconditions;
import org.apache.iceberg.util.SnapshotUtil;

public class DataTableScan extends BaseTableScan {
protected DataTableScan(Table table, Schema schema, TableScanContext context) {
Expand Down Expand Up @@ -52,12 +51,8 @@ public TableScan appendsAfter(long fromSnapshotId) {
}

@Override
public TableScan useSnapshot(long scanSnapshotId) {
// call method in superclass just for the side effect of argument validation;
// we do not use its return value
super.useSnapshot(scanSnapshotId);
Schema snapshotSchema = SnapshotUtil.schemaFor(table(), scanSnapshotId);
return newRefinedScan(table(), snapshotSchema, context().useSnapshotId(scanSnapshotId));
protected boolean useSnapshotSchema() {
return true;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

big difference from the previous implementation here? I assume i'll see later that you moved the validation out?

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The validation is in the parent so that I can reuse it in both scans now.

}

@Override
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/DeleteFileIndex.java
Original file line number Diff line number Diff line change
Expand Up @@ -142,6 +142,10 @@ DeleteFile[] forEntry(ManifestEntry<DataFile> entry) {
return forDataFile(entry.dataSequenceNumber(), entry.file());
}

DeleteFile[] forDataFile(DataFile file) {
return forDataFile(file.dataSequenceNumber(), file);
}

DeleteFile[] forDataFile(long sequenceNumber, DataFile file) {
if (isEmpty) {
return NO_DELETES;
Expand Down
21 changes: 15 additions & 6 deletions core/src/main/java/org/apache/iceberg/ManifestGroup.java
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.iceberg.io.CloseableIterator;
import org.apache.iceberg.io.FileIO;
import org.apache.iceberg.metrics.ScanMetrics;
import org.apache.iceberg.metrics.ScanMetricsUtil;
import org.apache.iceberg.relocated.com.google.common.collect.Iterables;
import org.apache.iceberg.relocated.com.google.common.collect.Lists;
import org.apache.iceberg.relocated.com.google.common.collect.Sets;
Expand Down Expand Up @@ -222,6 +223,19 @@ public CloseableIterable<ManifestEntry<DataFile>> entries() {
return CloseableIterable.concat(entries((manifest, entries) -> entries));
}

/**
* Returns an iterable for groups of data files in the set of manifests.
*
* <p>Files are not copied, it is the caller's responsibility to make defensive copies if adding
* these files to a collection.
*
* @return an iterable of file groups
*/
public Iterable<CloseableIterable<DataFile>> fileGroups() {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why is this called fileGroups? It looks like it produces the file from every entry, not a group.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I called it fileGroups because instead of combining entries from all manifests into a single iterable, it returns an iterable of iterables where each element represents content of one manifest. Let me know if that makes sense.

return entries(
(manifest, entries) -> CloseableIterable.transform(entries, ManifestEntry::file));
}

private <T> Iterable<CloseableIterable<T>> entries(
BiFunction<ManifestFile, CloseableIterable<ManifestEntry<DataFile>>, CloseableIterable<T>>
entryFn) {
Expand Down Expand Up @@ -349,12 +363,7 @@ private static CloseableIterable<FileScanTask> createFileScanTasks(
entry -> {
DataFile dataFile = entry.file().copy(ctx.shouldKeepStats());
DeleteFile[] deleteFiles = ctx.deletes().forEntry(entry);
for (DeleteFile deleteFile : deleteFiles) {
ctx.scanMetrics().totalDeleteFileSizeInBytes().increment(deleteFile.fileSizeInBytes());
}
ctx.scanMetrics().totalFileSizeInBytes().increment(dataFile.fileSizeInBytes());
ctx.scanMetrics().resultDataFiles().increment();
ctx.scanMetrics().resultDeleteFiles().increment((long) deleteFiles.length);
ScanMetricsUtil.fileTask(ctx.scanMetrics(), dataFile, deleteFiles);
return new BaseFileScanTask(
dataFile, deleteFiles, ctx.schemaAsString(), ctx.specAsString(), ctx.residuals());
});
Expand Down
54 changes: 54 additions & 0 deletions core/src/main/java/org/apache/iceberg/PlanningMode.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,54 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.iceberg;

import org.apache.iceberg.relocated.com.google.common.base.Preconditions;

public enum PlanningMode {
AUTO("auto"),
LOCAL("local"),
DISTRIBUTED("distributed");

private final String modeName;

PlanningMode(String modeName) {
this.modeName = modeName;
}

public static PlanningMode fromName(String modeName) {
Preconditions.checkArgument(modeName != null, "Mode name is null");

if (AUTO.modeName().equalsIgnoreCase(modeName)) {
return AUTO;

} else if (LOCAL.modeName().equalsIgnoreCase(modeName)) {
return LOCAL;

} else if (DISTRIBUTED.modeName().equalsIgnoreCase(modeName)) {
return DISTRIBUTED;

} else {
throw new IllegalArgumentException("Unknown planning mode: " + modeName);
}
}

public String modeName() {
return modeName;
}
}
10 changes: 9 additions & 1 deletion core/src/main/java/org/apache/iceberg/SnapshotScan.java
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,11 @@ protected Long snapshotId() {

protected abstract CloseableIterable<T> doPlanFiles();

// controls whether to use the snapshot schema while time travelling
protected boolean useSnapshotSchema() {
return false;
}

protected ScanMetrics scanMetrics() {
if (scanMetrics == null) {
this.scanMetrics = ScanMetrics.of(new DefaultMetricsContext());
Expand All @@ -81,7 +86,10 @@ public ThisT useSnapshot(long scanSnapshotId) {
table().snapshot(scanSnapshotId) != null,
"Cannot find snapshot with ID %s",
scanSnapshotId);
return newRefinedScan(table(), tableSchema(), context().useSnapshotId(scanSnapshotId));
Schema newSchema =
useSnapshotSchema() ? SnapshotUtil.schemaFor(table(), scanSnapshotId) : tableSchema();
TableScanContext newContext = context().useSnapshotId(scanSnapshotId);
return newRefinedScan(table(), newSchema, newContext);
}

public ThisT useRef(String name) {
Expand Down
4 changes: 4 additions & 0 deletions core/src/main/java/org/apache/iceberg/TableProperties.java
Original file line number Diff line number Diff line change
Expand Up @@ -231,6 +231,10 @@ private TableProperties() {}
public static final String ORC_BATCH_SIZE = "read.orc.vectorization.batch-size";
public static final int ORC_BATCH_SIZE_DEFAULT = 5000;

public static final String DATA_PLANNING_MODE = "read.data-planning-mode";
public static final String DELETE_PLANNING_MODE = "read.delete-planning-mode";
public static final String PLANNING_MODE_DEFAULT = PlanningMode.AUTO.modeName();

public static final String OBJECT_STORE_ENABLED = "write.object-storage.enabled";
public static final boolean OBJECT_STORE_ENABLED_DEFAULT = false;

Expand Down
14 changes: 14 additions & 0 deletions core/src/main/java/org/apache/iceberg/metrics/ScanMetricsUtil.java
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
*/
package org.apache.iceberg.metrics;

import org.apache.iceberg.DataFile;
import org.apache.iceberg.DeleteFile;
import org.apache.iceberg.FileContent;

Expand All @@ -34,4 +35,17 @@ public static void indexedDeleteFile(ScanMetrics metrics, DeleteFile deleteFile)
metrics.equalityDeleteFiles().increment();
}
}

public static void fileTask(ScanMetrics metrics, DataFile dataFile, DeleteFile[] deleteFiles) {
metrics.totalFileSizeInBytes().increment(dataFile.fileSizeInBytes());
metrics.resultDataFiles().increment();
metrics.resultDeleteFiles().increment(deleteFiles.length);

long deletesSizeInBytes = 0L;
for (DeleteFile deleteFile : deleteFiles) {
deletesSizeInBytes += deleteFile.fileSizeInBytes();
}

metrics.totalDeleteFileSizeInBytes().increment(deletesSizeInBytes);
}
}
Loading