Commit 296f1b9

Format metrics; add (min, med, max) for timing metrics (#1054)

vishalkarve15 committed Sep 6, 2023
1 parent 566d2b4 commit 296f1b9
Showing 12 changed files with 89 additions and 25 deletions.
MetricUtils.java
@@ -0,0 +1,37 @@
+package com.google.cloud.spark.bigquery.v2.customMetrics;
+
+import static org.apache.spark.util.Utils.bytesToString;
+import static org.apache.spark.util.Utils.msDurationToString;
+
+import java.text.NumberFormat;
+import java.util.Arrays;
+import java.util.Locale;
+
+class MetricUtils {
+
+  private static final String METRIC_FORMAT = "total (min, med, max)\n%s (%s, %s, %s)";
+  private static final NumberFormat NUMBER_FORMAT_US = NumberFormat.getIntegerInstance(Locale.US);
+
+  static String formatTimeMetrics(long[] taskMetrics) {
+    if (taskMetrics.length == 0) {
+      return msDurationToString(0);
+    }
+    if (taskMetrics.length == 1) {
+      return msDurationToString(taskMetrics[0]);
+    }
+    Arrays.sort(taskMetrics);
+    String sum = msDurationToString(Arrays.stream(taskMetrics).sum());
+    String min = msDurationToString(taskMetrics[0]);
+    String med = msDurationToString(taskMetrics[taskMetrics.length / 2]);
+    String max = msDurationToString(taskMetrics[taskMetrics.length - 1]);
+    return String.format(METRIC_FORMAT, sum, min, med, max);
+  }
+
+  static String formatSizeMetrics(long[] taskMetrics) {
+    return bytesToString(Arrays.stream(taskMetrics).sum());
+  }
+
+  static String formatSumMetrics(long[] taskMetrics) {
+    return NUMBER_FORMAT_US.format(Arrays.stream(taskMetrics).sum());
+  }
+}
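
For context, here is what the three formatters produce. This is a hypothetical demo class, not part of the commit; it must sit in the same package because MetricUtils is package-private, and the expected outputs match the updated tests further down.

package com.google.cloud.spark.bigquery.v2.customMetrics;

public class MetricUtilsDemo {
  public static void main(String[] args) {
    // Time metrics: inputs are per-task milliseconds; output is the total
    // plus (min, med, max), each rendered by Spark's msDurationToString.
    // Note that formatTimeMetrics sorts the input array in place.
    System.out.println(MetricUtils.formatTimeMetrics(new long[] {1020L, 2030L, 3040L}));
    // total (min, med, max)
    // 6.1 s (1.0 s, 2.0 s, 3.0 s)

    // Size metrics: per-task byte counts, summed and rendered human-readable.
    System.out.println(MetricUtils.formatSizeMetrics(new long[] {1024L, 2048L}));
    // 3.0 KiB

    // Sum metrics: plain counts, summed with US-locale digit grouping.
    System.out.println(MetricUtils.formatSumMetrics(new long[] {1000L, 2000L}));
    // 3,000
  }
}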

SparkBigQueryBytesReadMetric.java
@@ -3,9 +3,9 @@
 import static com.google.cloud.spark.bigquery.v2.customMetrics.SparkBigQueryCustomMetricConstants.BIG_QUERY_BYTES_READ_METRIC_DESCRIPTION;
 import static com.google.cloud.spark.bigquery.v2.customMetrics.SparkBigQueryCustomMetricConstants.BIG_QUERY_BYTES_READ_METRIC_NAME;
 
-import org.apache.spark.sql.connector.metric.CustomSumMetric;
+import org.apache.spark.sql.connector.metric.CustomMetric;
 
-public class SparkBigQueryBytesReadMetric extends CustomSumMetric {
+public class SparkBigQueryBytesReadMetric implements CustomMetric {
 
   @Override
   public String name() {
@@ -16,4 +16,9 @@ public String name() {
   public String description() {
     return BIG_QUERY_BYTES_READ_METRIC_DESCRIPTION;
   }
+
+  @Override
+  public String aggregateTaskMetrics(long[] taskMetrics) {
+    return MetricUtils.formatSizeMetrics(taskMetrics);
+  }
 }
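
The same refactoring repeats in each metric class below: the class stops extending CustomSumMetric, whose aggregateTaskMetrics renders the aggregate as a bare sum string, and implements CustomMetric directly so the driver-side aggregation can go through the MetricUtils formatters. For reference, the contract of the Spark 3.2+ interface being implemented (org.apache.spark.sql.connector.metric.CustomMetric) is essentially the following; comments are added here for explanation.

public interface CustomMetric {
  // Unique name of the metric; matched against the task-side metric values.
  String name();

  // Human-readable description shown in the Spark SQL UI.
  String description();

  // Called on the driver to turn per-task values into the display string.
  String aggregateTaskMetrics(long[] taskMetrics);
}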

SparkBigQueryCustomMetricConstants.java
@@ -6,10 +6,9 @@ public class SparkBigQueryCustomMetricConstants {
   public static final String BIG_QUERY_ROWS_READ_METRIC_NAME = "bqRowsRead";
   static final String BIG_QUERY_ROWS_READ_METRIC_DESCRIPTION = "number of BQ rows read";
   public static final String BIG_QUERY_SCAN_TIME_METRIC_NAME = "bqScanTime";
-  static final String BIG_QUERY_SCAN_TIME_METRIC_DESCRIPTION = "scan time for BQ in milli sec";
+  static final String BIG_QUERY_SCAN_TIME_METRIC_DESCRIPTION = "scan time for BQ";
   public static final String BIG_QUERY_PARSE_TIME_METRIC_NAME = "bqParseTime";
-  static final String BIG_QUERY_PARSE_TIME_METRIC_DESCRIPTION = "parsing time for BQ in milli sec";
+  static final String BIG_QUERY_PARSE_TIME_METRIC_DESCRIPTION = "parsing time for BQ";
   public static final String BIG_QUERY_TIME_IN_SPARK_METRIC_NAME = "bqTimeInSpark";
-  static final String BIG_QUERY_TIME_IN_SPARK_METRIC_DESCRIPTION =
-      "time spent in spark in milli sec";
+  static final String BIG_QUERY_TIME_IN_SPARK_METRIC_DESCRIPTION = "time spent in spark";
 }

SparkBigQueryParseTimeMetric.java
@@ -1,10 +1,11 @@
 package com.google.cloud.spark.bigquery.v2.customMetrics;
 
-import static com.google.cloud.spark.bigquery.v2.customMetrics.SparkBigQueryCustomMetricConstants.*;
+import static com.google.cloud.spark.bigquery.v2.customMetrics.SparkBigQueryCustomMetricConstants.BIG_QUERY_PARSE_TIME_METRIC_DESCRIPTION;
+import static com.google.cloud.spark.bigquery.v2.customMetrics.SparkBigQueryCustomMetricConstants.BIG_QUERY_PARSE_TIME_METRIC_NAME;
 
-import org.apache.spark.sql.connector.metric.CustomSumMetric;
+import org.apache.spark.sql.connector.metric.CustomMetric;
 
-public class SparkBigQueryParseTimeMetric extends CustomSumMetric {
+public class SparkBigQueryParseTimeMetric implements CustomMetric {
 
   @Override
   public String name() {
@@ -15,4 +16,9 @@ public String name() {
   public String description() {
     return BIG_QUERY_PARSE_TIME_METRIC_DESCRIPTION;
   }
+
+  @Override
+  public String aggregateTaskMetrics(long[] taskMetrics) {
+    return MetricUtils.formatTimeMetrics(taskMetrics);
+  }
 }

SparkBigQueryRowsReadMetric.java
@@ -1,10 +1,11 @@
 package com.google.cloud.spark.bigquery.v2.customMetrics;
 
-import static com.google.cloud.spark.bigquery.v2.customMetrics.SparkBigQueryCustomMetricConstants.*;
+import static com.google.cloud.spark.bigquery.v2.customMetrics.SparkBigQueryCustomMetricConstants.BIG_QUERY_ROWS_READ_METRIC_DESCRIPTION;
+import static com.google.cloud.spark.bigquery.v2.customMetrics.SparkBigQueryCustomMetricConstants.BIG_QUERY_ROWS_READ_METRIC_NAME;
 
-import org.apache.spark.sql.connector.metric.CustomSumMetric;
+import org.apache.spark.sql.connector.metric.CustomMetric;
 
-public class SparkBigQueryRowsReadMetric extends CustomSumMetric {
+public class SparkBigQueryRowsReadMetric implements CustomMetric {
   @Override
   public String name() {
     return BIG_QUERY_ROWS_READ_METRIC_NAME;
@@ -14,4 +15,9 @@ public String name() {
   public String description() {
     return BIG_QUERY_ROWS_READ_METRIC_DESCRIPTION;
   }
+
+  @Override
+  public String aggregateTaskMetrics(long[] taskMetrics) {
+    return MetricUtils.formatSumMetrics(taskMetrics);
+  }
 }

SparkBigQueryScanTimeMetric.java
@@ -3,9 +3,9 @@
 import static com.google.cloud.spark.bigquery.v2.customMetrics.SparkBigQueryCustomMetricConstants.BIG_QUERY_SCAN_TIME_METRIC_DESCRIPTION;
 import static com.google.cloud.spark.bigquery.v2.customMetrics.SparkBigQueryCustomMetricConstants.BIG_QUERY_SCAN_TIME_METRIC_NAME;
 
-import org.apache.spark.sql.connector.metric.CustomSumMetric;
+import org.apache.spark.sql.connector.metric.CustomMetric;
 
-public class SparkBigQueryScanTimeMetric extends CustomSumMetric {
+public class SparkBigQueryScanTimeMetric implements CustomMetric {
 
   @Override
   public String name() {
@@ -16,4 +16,9 @@ public String name() {
   public String description() {
     return BIG_QUERY_SCAN_TIME_METRIC_DESCRIPTION;
   }
+
+  @Override
+  public String aggregateTaskMetrics(long[] taskMetrics) {
+    return MetricUtils.formatTimeMetrics(taskMetrics);
+  }
 }

SparkBigQueryTimeInSparkMetric.java
@@ -2,9 +2,9 @@
 
 import static com.google.cloud.spark.bigquery.v2.customMetrics.SparkBigQueryCustomMetricConstants.*;
 
-import org.apache.spark.sql.connector.metric.CustomSumMetric;
+import org.apache.spark.sql.connector.metric.CustomMetric;
 
-public class SparkBigQueryTimeInSparkMetric extends CustomSumMetric {
+public class SparkBigQueryTimeInSparkMetric implements CustomMetric {
 
   @Override
   public String name() {
@@ -15,4 +15,9 @@ public String name() {
   public String description() {
     return BIG_QUERY_TIME_IN_SPARK_METRIC_DESCRIPTION;
   }
+
+  @Override
+  public String aggregateTaskMetrics(long[] taskMetrics) {
+    return MetricUtils.formatTimeMetrics(taskMetrics);
+  }
 }

SparkBigQueryBytesReadMetricTest.java
@@ -23,7 +23,7 @@ public void testDescription() {
 
   @Test
   public void testAggregateMetrics() {
-    assertThat(sparkBigQueryBytesReadMetric.aggregateTaskMetrics(new long[] {1000L, 2000L}))
-        .isEqualTo("3000");
+    assertThat(sparkBigQueryBytesReadMetric.aggregateTaskMetrics(new long[] {1024L, 2048L}))
+        .isEqualTo("3.0 KiB");
   }
 }

SparkBigQueryParseTimeMetricTest.java
@@ -22,7 +22,7 @@ public void testDescription() {
 
   @Test
   public void testAggregateMetrics() {
-    assertThat(sparkBigQueryParseTimeMetric.aggregateTaskMetrics(new long[] {1000L, 2000L}))
-        .isEqualTo("3000");
+    assertThat(sparkBigQueryParseTimeMetric.aggregateTaskMetrics(new long[] {1020L, 2030L, 3040L}))
+        .isEqualTo("total (min, med, max)\n" + "6.1 s (1.0 s, 2.0 s, 3.0 s)");
   }
 }

SparkBigQueryRowsReadMetricTest.java
@@ -23,6 +23,6 @@ public void testDescription() {
   @Test
   public void testAggregateMetrics() {
     assertThat(sparkBigQueryRowsReadMetric.aggregateTaskMetrics(new long[] {1000L, 2000L}))
-        .isEqualTo("3000");
+        .isEqualTo("3,000");
   }
 }

SparkBigQueryScanTimeMetricTest.java
@@ -22,7 +22,7 @@ public void testDescription() {
 
   @Test
   public void testAggregateMetrics() {
-    assertThat(sparkBigQueryScanTimeMetric.aggregateTaskMetrics(new long[] {1000L, 2000L}))
-        .isEqualTo("3000");
+    assertThat(sparkBigQueryScanTimeMetric.aggregateTaskMetrics(new long[] {1010L, 2020L, 3030L}))
+        .isEqualTo("total (min, med, max)\n" + "6.1 s (1.0 s, 2.0 s, 3.0 s)");
   }
 }

SparkBigQueryTimeInSparkMetricTest.java
@@ -23,7 +23,8 @@ public void testDescription() {
 
   @Test
   public void testAggregateMetrics() {
-    assertThat(sparkBigQueryTimeInSparkMetric.aggregateTaskMetrics(new long[] {1000L, 2000L}))
-        .isEqualTo("3000");
+    assertThat(
+            sparkBigQueryTimeInSparkMetric.aggregateTaskMetrics(new long[] {1020L, 2010L, 3030L}))
+        .isEqualTo("total (min, med, max)\n" + "6.1 s (1.0 s, 2.0 s, 3.0 s)");
   }
 }
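
These metrics only surface once a DataSource V2 Scan advertises them. A minimal sketch of that wiring, assuming Spark 3.2+; ExampleBigQueryScan is hypothetical and not part of this commit, shown only to illustrate where the classes above plug in.

import com.google.cloud.spark.bigquery.v2.customMetrics.*;
import org.apache.spark.sql.connector.metric.CustomMetric;
import org.apache.spark.sql.connector.read.Scan;

public abstract class ExampleBigQueryScan implements Scan {
  // Spark collects per-task values for each named metric, then calls
  // aggregateTaskMetrics on the driver and shows the formatted result
  // in the SQL UI.
  @Override
  public CustomMetric[] supportedCustomMetrics() {
    return new CustomMetric[] {
      new SparkBigQueryBytesReadMetric(),
      new SparkBigQueryRowsReadMetric(),
      new SparkBigQueryScanTimeMetric(),
      new SparkBigQueryParseTimeMetric(),
      new SparkBigQueryTimeInSparkMetric()
    };
  }
}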
