Add review comments
karuppayya committed Jul 14, 2023
1 parent 6a1e3e1 commit 6d55476
Showing 10 changed files with 10 additions and 20 deletions.
@@ -73,7 +73,6 @@ class SparkCopyOnWriteScan extends SparkPartitioningAwareScan<FileScanTask>
       Schema expectedSchema,
       List<Expression> filters,
       Supplier<ScanReport> scanReportSupplier) {
-
     super(spark, table, scan, readConf, expectedSchema, filters, scanReportSupplier);

     this.snapshot = snapshot;
@@ -78,7 +78,6 @@ abstract class SparkPartitioningAwareScan<T extends PartitionScanTask> extends S
       Schema expectedSchema,
       List<Expression> filters,
       Supplier<ScanReport> scanReportSupplier) {
-
     super(spark, table, readConf, expectedSchema, filters, scanReportSupplier);

     this.scan = scan;
@@ -36,6 +36,7 @@
 import org.apache.iceberg.spark.SparkSchemaUtil;
 import org.apache.iceberg.spark.source.metrics.NumDeletes;
 import org.apache.iceberg.spark.source.metrics.NumSplits;
+import org.apache.iceberg.spark.source.metrics.ScannedDataFiles;
 import org.apache.iceberg.spark.source.metrics.ScannedDataManifests;
 import org.apache.iceberg.spark.source.metrics.SkippedDataFiles;
 import org.apache.iceberg.spark.source.metrics.SkippedDataManifests;

@@ -47,7 +48,6 @@
 import org.apache.iceberg.spark.source.metrics.TaskTotalPlanningDuration;
 import org.apache.iceberg.spark.source.metrics.TotalFileSize;
 import org.apache.iceberg.spark.source.metrics.TotalPlanningDuration;
-import org.apache.iceberg.spark.source.metrics.scannedDataFiles;
 import org.apache.iceberg.types.Types;
 import org.apache.iceberg.util.PropertyUtil;
 import org.apache.iceberg.util.SnapshotUtil;

@@ -192,6 +192,7 @@ public String description() {
   @Override
   public CustomTaskMetric[] reportDriverMetrics() {
     ScanReport scanReport = scanReportSupplier != null ? scanReportSupplier.get() : null;
+
     if (scanReport == null) {
       return new CustomTaskMetric[0];
     }
@@ -216,7 +217,7 @@ public CustomMetric[] supportedCustomMetrics() {
       new TotalPlanningDuration(),
       new ScannedDataManifests(),
       new SkippedDataManifests(),
-      new scannedDataFiles(),
+      new ScannedDataFiles(),
       new SkippedDataFiles()
     };
   }
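Background on the hunks above: in Spark 3.4's DataSource V2 metrics API, each CustomTaskMetric a scan reports (including driver-side values from reportDriverMetrics()) is matched by name() to a CustomMetric declared in supportedCustomMetrics(), whose aggregateTaskMetrics() produces the displayed value. That pairing is why the rename has to touch both the import and the supportedCustomMetrics() entry. A minimal sketch of such a pair, using a hypothetical "exampleFiles" metric rather than anything from this commit:

  import org.apache.spark.sql.connector.metric.CustomSumMetric;
  import org.apache.spark.sql.connector.metric.CustomTaskMetric;

  // Declared in Scan#supportedCustomMetrics(); Spark matches task metrics
  // to it by name and renders the sum of their values.
  class ExampleFiles extends CustomSumMetric {
    static final String NAME = "exampleFiles";

    @Override
    public String name() {
      return NAME;
    }

    @Override
    public String description() {
      return "number of example files";
    }
  }

  // Reported per task (or from Scan#reportDriverMetrics()); the name ties
  // it back to the ExampleFiles metric above.
  class TaskExampleFiles implements CustomTaskMetric {
    private final long value;

    TaskExampleFiles(long value) {
      this.value = value;
    }

    @Override
    public String name() {
      return ExampleFiles.NAME;
    }

    @Override
    public long value() {
      return value;
    }
  }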
@@ -20,7 +20,7 @@

 import org.apache.spark.sql.connector.metric.CustomSumMetric;

-public class scannedDataFiles extends CustomSumMetric {
+public class ScannedDataFiles extends CustomSumMetric {

   static final String NAME = "scannedDataFiles";

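For reference, the renamed class presumably ends up as below; name() and description() fall outside this hunk, so their bodies here are assumptions modeled on the sibling metrics such as SkippedDataFiles:

  package org.apache.iceberg.spark.source.metrics;

  import org.apache.spark.sql.connector.metric.CustomSumMetric;

  public class ScannedDataFiles extends CustomSumMetric {

    static final String NAME = "scannedDataFiles";

    @Override
    public String name() {
      return NAME;
    }

    @Override
    public String description() {
      // Assumed wording, mirroring the other data-file metrics.
      return "number of scanned data files";
    }
  }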
@@ -33,13 +33,4 @@ public String name() {
   public String description() {
     return "number of scanned data manifests";
   }
-
-  @Override
-  public String aggregateTaskMetrics(long[] taskMetrics) {
-    long sum = 0L;
-    for (long taskMetric : taskMetrics) {
-      sum += taskMetric;
-    }
-    return String.valueOf(sum);
-  }
 }
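The override deleted above duplicated behavior the class already inherits: Spark's CustomSumMetric ships essentially this aggregation, so ScannedDataManifests gets the sum for free:

  // Roughly what org.apache.spark.sql.connector.metric.CustomSumMetric
  // already provides to its subclasses.
  @Override
  public String aggregateTaskMetrics(long[] taskMetrics) {
    long sum = 0L;
    for (long taskMetric : taskMetrics) {
      sum += taskMetric;
    }
    return String.valueOf(sum);
  }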
@@ -31,7 +31,7 @@ private TaskScannedDataFiles(long value) {

   @Override
   public String name() {
-    return scannedDataFiles.NAME;
+    return ScannedDataFiles.NAME;
   }

   @Override
@@ -42,7 +42,7 @@ public long value() {

   public static TaskTotalPlanningDuration from(ScanReport scanReport) {
     TimerResult timerResult = scanReport.scanMetrics().totalPlanningDuration();
-    long value = timerResult != null ? timerResult.count() : -1;
+    long value = timerResult != null ? timerResult.totalDuration().toMillis() : -1;
     return new TaskTotalPlanningDuration(value);
   }
 }
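This one-line change fixes a unit bug: TimerResult.count() is how many timings the timer recorded, while totalDuration() is the accumulated elapsed time, which is what a duration metric should report. A self-contained illustration with invented values:

  import java.time.Duration;

  public class PlanningDurationExample {
    public static void main(String[] args) {
      // Suppose one planning pass took 1.5 seconds. The corresponding
      // TimerResult would then answer (values invented for illustration):
      long count = 1;                           // timerResult.count()
      Duration total = Duration.ofMillis(1500); // timerResult.totalDuration()

      // Old code reported the count, so the UI showed "1" as the duration.
      System.out.println("old: " + count);            // 1
      // New code reports elapsed milliseconds, matching the "(ms)" label.
      System.out.println("new: " + total.toMillis()); // 1500
    }
  }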
@@ -31,6 +31,6 @@ public String name() {

   @Override
   public String description() {
-    return "total file size";
+    return "total file size (bytes)";
   }
 }
@@ -31,6 +31,6 @@ public String name() {

   @Override
   public String description() {
-    return "total planning duration";
+    return "total planning duration (ms)";
   }
 }
@@ -58,7 +58,7 @@ public void testReadMetricsForV1Table() throws NoSuchTableException {
         JavaConverters.mapAsJavaMapConverter(sparkPlans.get(0).metrics()).asJava();
     Assertions.assertThat(metricsMap.get("skippedDataFiles").value()).isEqualTo(1);
     Assertions.assertThat(metricsMap.get("scannedDataManifests").value()).isEqualTo(2);
-    Assertions.assertThat(metricsMap.get("resultDataFiles").value()).isEqualTo(1);
+    Assertions.assertThat(metricsMap.get("scannedDataFiles").value()).isEqualTo(1);
   }

   @Test
@@ -79,6 +79,6 @@ public void testReadMetricsForV2Table() throws NoSuchTableException {
         JavaConverters.mapAsJavaMapConverter(sparkPlans.get(0).metrics()).asJava();
     Assertions.assertThat(metricsMap.get("skippedDataFiles").value()).isEqualTo(1);
     Assertions.assertThat(metricsMap.get("scannedDataManifests").value()).isEqualTo(2);
-    Assertions.assertThat(metricsMap.get("resultDataFiles").value()).isEqualTo(1);
+    Assertions.assertThat(metricsMap.get("scannedDataFiles").value()).isEqualTo(1);
   }
 }
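Both tests now look the metric up under the key defined by ScannedDataFiles.NAME ("scannedDataFiles"); the old "resultDataFiles" key no longer exists in the plan's metrics map. If the constant were visible to the tests, the string literal could be avoided entirely (a sketch, assuming the necessary access):

  // Hypothetical variant: a future rename of the metric would then fail at
  // compile time instead of as a missing map key at run time.
  Assertions.assertThat(metricsMap.get(ScannedDataFiles.NAME).value()).isEqualTo(1);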
