diff --git a/core/src/main/java/org/apache/iceberg/BaseScan.java b/core/src/main/java/org/apache/iceberg/BaseScan.java index 9db72227ac3f..953ad754aa1b 100644 --- a/core/src/main/java/org/apache/iceberg/BaseScan.java +++ b/core/src/main/java/org/apache/iceberg/BaseScan.java @@ -254,6 +254,6 @@ private static Schema lazyColumnProjection(TableScanContext context, Schema sche @Override public ThisT metricsReporter(MetricsReporter reporter) { - return newRefinedScan(table(), schema(), context().reportWith(reporter)); + return newRefinedScan(table, schema, context.reportWith(reporter)); } } diff --git a/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java b/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java new file mode 100644 index 000000000000..79b446c0ddbf --- /dev/null +++ b/core/src/main/java/org/apache/iceberg/metrics/InMemoryMetricsReporter.java @@ -0,0 +1,38 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.metrics; + +import org.apache.iceberg.relocated.com.google.common.base.Preconditions; + +public class InMemoryMetricsReporter implements MetricsReporter { + + private MetricsReport metricsReport; + + @Override + public void report(MetricsReport report) { + this.metricsReport = report; + } + + public ScanReport scanReport() { + Preconditions.checkArgument( + metricsReport == null || metricsReport instanceof ScanReport, + "Metrics report is not a scan report"); + return (ScanReport) metricsReport; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java index dd493fbc5097..036c395f7115 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkBatchQueryScan.java @@ -22,6 +22,7 @@ import java.util.Map; import java.util.Objects; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.iceberg.PartitionField; import org.apache.iceberg.PartitionScanTask; @@ -39,6 +40,7 @@ import org.apache.iceberg.expressions.ExpressionUtil; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.expressions.Projections; +import org.apache.iceberg.metrics.ScanReport; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.relocated.com.google.common.collect.Sets; @@ -73,9 +75,9 @@ class SparkBatchQueryScan extends SparkPartitioningAwareScan Scan> scan, SparkReadConf readConf, Schema expectedSchema, - List filters) { - - super(spark, table, scan, readConf, expectedSchema, filters); + List filters, + Supplier scanReportSupplier) { + super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier); this.snapshotId = readConf.snapshotId(); this.startSnapshotId = readConf.startSnapshotId(); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java index d978b81e67bd..16eb9c51df5c 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkCopyOnWriteScan.java @@ -22,6 +22,7 @@ import java.util.List; import java.util.Objects; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.iceberg.BatchScan; import org.apache.iceberg.FileScanTask; @@ -30,6 +31,7 @@ import org.apache.iceberg.Snapshot; import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.metrics.ScanReport; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.Sets; import org.apache.iceberg.spark.SparkReadConf; @@ -57,8 +59,9 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan Table table, SparkReadConf readConf, Schema expectedSchema, - List filters) { - this(spark, table, null, null, readConf, expectedSchema, filters); + List filters, + Supplier scanReportSupplier) { + this(spark, table, null, null, readConf, expectedSchema, filters, scanReportSupplier); } SparkCopyOnWriteScan( @@ -68,9 +71,9 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan Snapshot snapshot, SparkReadConf readConf, Schema expectedSchema, - List filters) { - - super(spark, table, scan, readConf, expectedSchema, filters); + List filters, + Supplier scanReportSupplier) { + super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier); this.snapshot = snapshot; diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java index cf274f794e52..6538268697e2 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkPartitioningAwareScan.java @@ -24,6 +24,7 @@ import java.util.List; import java.util.Map; import java.util.Set; +import java.util.function.Supplier; import java.util.stream.Collectors; import java.util.stream.IntStream; import org.apache.iceberg.PartitionField; @@ -37,6 +38,7 @@ import org.apache.iceberg.exceptions.ValidationException; import org.apache.iceberg.expressions.Expression; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.metrics.ScanReport; import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.relocated.com.google.common.collect.Maps; import org.apache.iceberg.spark.Spark3Util; @@ -74,9 +76,9 @@ abstract class SparkPartitioningAwareScan extends S Scan> scan, SparkReadConf readConf, Schema expectedSchema, - List filters) { - - super(spark, table, readConf, expectedSchema, filters); + List filters, + Supplier scanReportSupplier) { + super(spark, table, readConf, expectedSchema, filters, scanReportSupplier); this.scan = scan; this.preserveDataGrouping = readConf.preserveDataGrouping(); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java index b47818ec550d..65c5e04f31a1 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScan.java @@ -21,6 +21,7 @@ import java.util.Collections; import java.util.List; import java.util.Map; +import java.util.function.Supplier; import java.util.stream.Collectors; import org.apache.iceberg.ScanTaskGroup; import org.apache.iceberg.Schema; @@ -28,17 +29,32 @@ import org.apache.iceberg.SnapshotSummary; import org.apache.iceberg.Table; import org.apache.iceberg.expressions.Expression; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.relocated.com.google.common.collect.Lists; import org.apache.iceberg.spark.Spark3Util; import org.apache.iceberg.spark.SparkReadConf; import org.apache.iceberg.spark.SparkSchemaUtil; import org.apache.iceberg.spark.source.metrics.NumDeletes; import org.apache.iceberg.spark.source.metrics.NumSplits; +import org.apache.iceberg.spark.source.metrics.ScannedDataFiles; +import org.apache.iceberg.spark.source.metrics.ScannedDataManifests; +import org.apache.iceberg.spark.source.metrics.SkippedDataFiles; +import org.apache.iceberg.spark.source.metrics.SkippedDataManifests; +import org.apache.iceberg.spark.source.metrics.TaskScannedDataFiles; +import org.apache.iceberg.spark.source.metrics.TaskScannedDataManifests; +import org.apache.iceberg.spark.source.metrics.TaskSkippedDataFiles; +import org.apache.iceberg.spark.source.metrics.TaskSkippedDataManifests; +import org.apache.iceberg.spark.source.metrics.TaskTotalFileSize; +import org.apache.iceberg.spark.source.metrics.TaskTotalPlanningDuration; +import org.apache.iceberg.spark.source.metrics.TotalFileSize; +import org.apache.iceberg.spark.source.metrics.TotalPlanningDuration; import org.apache.iceberg.types.Types; import org.apache.iceberg.util.PropertyUtil; import org.apache.iceberg.util.SnapshotUtil; import org.apache.spark.api.java.JavaSparkContext; import org.apache.spark.sql.SparkSession; import org.apache.spark.sql.connector.metric.CustomMetric; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; import org.apache.spark.sql.connector.read.Batch; import org.apache.spark.sql.connector.read.Scan; import org.apache.spark.sql.connector.read.Statistics; @@ -58,6 +74,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { private final Schema expectedSchema; private final List filterExpressions; private final String branch; + private final Supplier scanReportSupplier; // lazy variables private StructType readSchema; @@ -67,7 +84,8 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { Table table, SparkReadConf readConf, Schema expectedSchema, - List filters) { + List filters, + Supplier scanReportSupplier) { Schema snapshotSchema = SnapshotUtil.schemaFor(table, readConf.branch()); SparkSchemaUtil.validateMetadataColumnReferences(snapshotSchema, expectedSchema); @@ -78,6 +96,7 @@ abstract class SparkScan implements Scan, SupportsReportStatistics { this.expectedSchema = expectedSchema; this.filterExpressions = filters != null ? filters : Collections.emptyList(); this.branch = readConf.branch(); + this.scanReportSupplier = scanReportSupplier; } protected Table table() { @@ -170,8 +189,36 @@ public String description() { table(), branch(), Spark3Util.describe(filterExpressions), groupingKeyFieldNamesAsString); } + @Override + public CustomTaskMetric[] reportDriverMetrics() { + ScanReport scanReport = scanReportSupplier != null ? scanReportSupplier.get() : null; + + if (scanReport == null) { + return new CustomTaskMetric[0]; + } + + List driverMetrics = Lists.newArrayList(); + driverMetrics.add(TaskTotalFileSize.from(scanReport)); + driverMetrics.add(TaskTotalPlanningDuration.from(scanReport)); + driverMetrics.add(TaskSkippedDataFiles.from(scanReport)); + driverMetrics.add(TaskScannedDataFiles.from(scanReport)); + driverMetrics.add(TaskSkippedDataManifests.from(scanReport)); + driverMetrics.add(TaskScannedDataManifests.from(scanReport)); + + return driverMetrics.toArray(new CustomTaskMetric[0]); + } + @Override public CustomMetric[] supportedCustomMetrics() { - return new CustomMetric[] {new NumSplits(), new NumDeletes()}; + return new CustomMetric[] { + new NumSplits(), + new NumDeletes(), + new TotalFileSize(), + new TotalPlanningDuration(), + new ScannedDataManifests(), + new SkippedDataManifests(), + new ScannedDataFiles(), + new SkippedDataFiles() + }; } } diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java index ddeec9c4943b..d19368cec819 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkScanBuilder.java @@ -43,6 +43,7 @@ import org.apache.iceberg.expressions.ExpressionUtil; import org.apache.iceberg.expressions.Expressions; import org.apache.iceberg.io.CloseableIterable; +import org.apache.iceberg.metrics.InMemoryMetricsReporter; import org.apache.iceberg.relocated.com.google.common.base.Preconditions; import org.apache.iceberg.relocated.com.google.common.collect.ImmutableList; import org.apache.iceberg.relocated.com.google.common.collect.Lists; @@ -91,6 +92,7 @@ public class SparkScanBuilder private final CaseInsensitiveStringMap options; private final SparkReadConf readConf; private final List metaColumns = Lists.newArrayList(); + private final InMemoryMetricsReporter metricsReporter; private Schema schema = null; private boolean caseSensitive; @@ -109,6 +111,7 @@ public class SparkScanBuilder this.options = options; this.readConf = new SparkReadConf(spark, table, branch, options); this.caseSensitive = readConf.caseSensitive(); + this.metricsReporter = new InMemoryMetricsReporter(); } SparkScanBuilder(SparkSession spark, Table table, CaseInsensitiveStringMap options) { @@ -430,7 +433,8 @@ private Scan buildBatchScan(Long snapshotId, Long asOfTimestamp, String branch, .newBatchScan() .caseSensitive(caseSensitive) .filter(filterExpression()) - .project(expectedSchema); + .project(expectedSchema) + .metricsReporter(metricsReporter); if (snapshotId != null) { scan = scan.useSnapshot(snapshotId); @@ -450,7 +454,14 @@ private Scan buildBatchScan(Long snapshotId, Long asOfTimestamp, String branch, scan = configureSplitPlanning(scan); - return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions); + return new SparkBatchQueryScan( + spark, + table, + scan, + readConf, + expectedSchema, + filterExpressions, + metricsReporter::scanReport); } private Scan buildIncrementalAppendScan(long startSnapshotId, Long endSnapshotId) { @@ -470,7 +481,14 @@ private Scan buildIncrementalAppendScan(long startSnapshotId, Long endSnapshotId scan = configureSplitPlanning(scan); - return new SparkBatchQueryScan(spark, table, scan, readConf, expectedSchema, filterExpressions); + return new SparkBatchQueryScan( + spark, + table, + scan, + readConf, + expectedSchema, + filterExpressions, + metricsReporter::scanReport); } public Scan buildChangelogScan() { @@ -573,7 +591,13 @@ public Scan buildMergeOnReadScan() { if (snapshot == null) { return new SparkBatchQueryScan( - spark, table, null, readConf, schemaWithMetadataColumns(), filterExpressions); + spark, + table, + null, + readConf, + schemaWithMetadataColumns(), + filterExpressions, + metricsReporter::scanReport); } // remember the current snapshot ID for commit validation @@ -597,7 +621,13 @@ public Scan buildMergeOnReadScan() { scan = configureSplitPlanning(scan); return new SparkBatchQueryScan( - spark, table, scan, adjustedReadConf, expectedSchema, filterExpressions); + spark, + table, + scan, + adjustedReadConf, + expectedSchema, + filterExpressions, + metricsReporter::scanReport); } public Scan buildCopyOnWriteScan() { @@ -605,7 +635,12 @@ public Scan buildCopyOnWriteScan() { if (snapshot == null) { return new SparkCopyOnWriteScan( - spark, table, readConf, schemaWithMetadataColumns(), filterExpressions); + spark, + table, + readConf, + schemaWithMetadataColumns(), + filterExpressions, + metricsReporter::scanReport); } Schema expectedSchema = schemaWithMetadataColumns(); @@ -622,7 +657,14 @@ public Scan buildCopyOnWriteScan() { scan = configureSplitPlanning(scan); return new SparkCopyOnWriteScan( - spark, table, scan, snapshot, readConf, expectedSchema, filterExpressions); + spark, + table, + scan, + snapshot, + readConf, + expectedSchema, + filterExpressions, + metricsReporter::scanReport); } private > T configureSplitPlanning(T scan) { diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java index 89b184c91c51..0290bf7e84ce 100644 --- a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/SparkStagedScan.java @@ -40,7 +40,7 @@ class SparkStagedScan extends SparkScan { private List> taskGroups = null; // lazy cache of tasks SparkStagedScan(SparkSession spark, Table table, SparkReadConf readConf) { - super(spark, table, readConf, table.schema(), ImmutableList.of()); + super(spark, table, readConf, table.schema(), ImmutableList.of(), null); this.taskSetId = readConf.scanTaskSetId(); this.splitSize = readConf.splitSize(); diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataFiles.java new file mode 100644 index 000000000000..f453872fdc29 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class ScannedDataFiles extends CustomSumMetric { + + static final String NAME = "scannedDataFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of scanned data files"; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java new file mode 100644 index 000000000000..a167904280e6 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/ScannedDataManifests.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class ScannedDataManifests extends CustomSumMetric { + + static final String NAME = "scannedDataManifests"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of scanned data manifests"; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java new file mode 100644 index 000000000000..7fd17425313d --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataFiles.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class SkippedDataFiles extends CustomSumMetric { + + static final String NAME = "skippedDataFiles"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of skipped data files"; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java new file mode 100644 index 000000000000..b0eaeb5d87f2 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/SkippedDataManifests.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class SkippedDataManifests extends CustomSumMetric { + + static final String NAME = "skippedDataManifests"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "number of skipped data manifests"; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataFiles.java new file mode 100644 index 000000000000..d9a527da08f6 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataFiles.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskScannedDataFiles implements CustomTaskMetric { + private final long value; + + private TaskScannedDataFiles(long value) { + this.value = value; + } + + @Override + public String name() { + return ScannedDataFiles.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskScannedDataFiles from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().resultDataFiles(); + long value = counter != null ? counter.value() : 0L; + return new TaskScannedDataFiles(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataManifests.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataManifests.java new file mode 100644 index 000000000000..09dd0339910c --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskScannedDataManifests.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskScannedDataManifests implements CustomTaskMetric { + private final long value; + + private TaskScannedDataManifests(long value) { + this.value = value; + } + + @Override + public String name() { + return ScannedDataManifests.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskScannedDataManifests from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().scannedDataManifests(); + long value = counter != null ? counter.value() : 0L; + return new TaskScannedDataManifests(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataFiles.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataFiles.java new file mode 100644 index 000000000000..5165f9a3116c --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataFiles.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskSkippedDataFiles implements CustomTaskMetric { + private final long value; + + private TaskSkippedDataFiles(long value) { + this.value = value; + } + + @Override + public String name() { + return SkippedDataFiles.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskSkippedDataFiles from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().skippedDataFiles(); + long value = counter != null ? counter.value() : 0L; + return new TaskSkippedDataFiles(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataManifests.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataManifests.java new file mode 100644 index 000000000000..86fef8c4118b --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskSkippedDataManifests.java @@ -0,0 +1,47 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskSkippedDataManifests implements CustomTaskMetric { + private final long value; + + private TaskSkippedDataManifests(long value) { + this.value = value; + } + + @Override + public String name() { + return SkippedDataManifests.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskSkippedDataManifests from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().skippedDataManifests(); + long value = counter != null ? counter.value() : 0L; + return new TaskSkippedDataManifests(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalFileSize.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalFileSize.java new file mode 100644 index 000000000000..c300d835e777 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalFileSize.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.CounterResult; +import org.apache.iceberg.metrics.ScanReport; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskTotalFileSize implements CustomTaskMetric { + + private final long value; + + private TaskTotalFileSize(long value) { + this.value = value; + } + + @Override + public String name() { + return TotalFileSize.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskTotalFileSize from(ScanReport scanReport) { + CounterResult counter = scanReport.scanMetrics().totalFileSizeInBytes(); + long value = counter != null ? counter.value() : 0L; + return new TaskTotalFileSize(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalPlanningDuration.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalPlanningDuration.java new file mode 100644 index 000000000000..32ac6fde8bf3 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TaskTotalPlanningDuration.java @@ -0,0 +1,48 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.iceberg.metrics.ScanReport; +import org.apache.iceberg.metrics.TimerResult; +import org.apache.spark.sql.connector.metric.CustomTaskMetric; + +public class TaskTotalPlanningDuration implements CustomTaskMetric { + + private final long value; + + private TaskTotalPlanningDuration(long value) { + this.value = value; + } + + @Override + public String name() { + return TotalPlanningDuration.NAME; + } + + @Override + public long value() { + return value; + } + + public static TaskTotalPlanningDuration from(ScanReport scanReport) { + TimerResult timerResult = scanReport.scanMetrics().totalPlanningDuration(); + long value = timerResult != null ? timerResult.totalDuration().toMillis() : -1; + return new TaskTotalPlanningDuration(value); + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSize.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSize.java new file mode 100644 index 000000000000..994626e54f10 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalFileSize.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalFileSize extends CustomSumMetric { + + static final String NAME = "totalFileSize"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total file size (bytes)"; + } +} diff --git a/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java new file mode 100644 index 000000000000..8b66eeac4046 --- /dev/null +++ b/spark/v3.4/spark/src/main/java/org/apache/iceberg/spark/source/metrics/TotalPlanningDuration.java @@ -0,0 +1,36 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source.metrics; + +import org.apache.spark.sql.connector.metric.CustomSumMetric; + +public class TotalPlanningDuration extends CustomSumMetric { + + static final String NAME = "totalPlanningDuration"; + + @Override + public String name() { + return NAME; + } + + @Override + public String description() { + return "total planning duration (ms)"; + } +} diff --git a/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadMetrics.java b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadMetrics.java new file mode 100644 index 000000000000..7b943372d167 --- /dev/null +++ b/spark/v3.4/spark/src/test/java/org/apache/iceberg/spark/source/TestSparkReadMetrics.java @@ -0,0 +1,90 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, + * software distributed under the License is distributed on an + * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY + * KIND, either express or implied. See the License for the + * specific language governing permissions and limitations + * under the License. + */ +package org.apache.iceberg.spark.source; + +import static scala.collection.JavaConverters.seqAsJavaListConverter; + +import java.util.List; +import java.util.Map; +import org.apache.iceberg.spark.SparkTestBaseWithCatalog; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.catalyst.analysis.NoSuchTableException; +import org.apache.spark.sql.execution.SparkPlan; +import org.apache.spark.sql.execution.metric.SQLMetric; +import org.assertj.core.api.Assertions; +import org.junit.After; +import org.junit.Test; +import scala.collection.JavaConverters; + +public class TestSparkReadMetrics extends SparkTestBaseWithCatalog { + + @After + public void removeTables() { + sql("DROP TABLE IF EXISTS %s", tableName); + } + + @Test + public void testReadMetricsForV1Table() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id BIGINT) USING iceberg TBLPROPERTIES ('format-version'='2')", + tableName); + + spark.range(10000).coalesce(1).writeTo(tableName).append(); + spark.range(10001, 20000).coalesce(1).writeTo(tableName).append(); + + Dataset df = spark.sql(String.format("select * from %s where id < 10000", tableName)); + df.collect(); + + List sparkPlans = + seqAsJavaListConverter(df.queryExecution().executedPlan().collectLeaves()).asJava(); + Map metricsMap = + JavaConverters.mapAsJavaMapConverter(sparkPlans.get(0).metrics()).asJava(); + Assertions.assertThat(metricsMap.get("skippedDataFiles").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("scannedDataManifests").value()).isEqualTo(2); + Assertions.assertThat(metricsMap.get("scannedDataFiles").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("skippedDataManifests").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("totalFileSize").value()).isNotEqualTo(0); + Assertions.assertThat(metricsMap.get("totalPlanningDuration").value()).isNotEqualTo(0); + } + + @Test + public void testReadMetricsForV2Table() throws NoSuchTableException { + sql( + "CREATE TABLE %s (id BIGINT) USING iceberg TBLPROPERTIES ('format-version'='2')", + tableName); + + spark.range(10000).coalesce(1).writeTo(tableName).append(); + spark.range(10001, 20000).coalesce(1).writeTo(tableName).append(); + + Dataset df = spark.sql(String.format("select * from %s where id < 10000", tableName)); + df.collect(); + + List sparkPlans = + seqAsJavaListConverter(df.queryExecution().executedPlan().collectLeaves()).asJava(); + Map metricsMap = + JavaConverters.mapAsJavaMapConverter(sparkPlans.get(0).metrics()).asJava(); + Assertions.assertThat(metricsMap.get("skippedDataFiles").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("scannedDataManifests").value()).isEqualTo(2); + Assertions.assertThat(metricsMap.get("scannedDataFiles").value()).isEqualTo(1); + Assertions.assertThat(metricsMap.get("skippedDataManifests").value()).isEqualTo(0); + Assertions.assertThat(metricsMap.get("totalFileSize").value()).isNotEqualTo(0); + Assertions.assertThat(metricsMap.get("totalPlanningDuration").value()).isNotEqualTo(0); + } +}