Skip to content

Commit

Permalink
Cache Histograms in BigQuerySinkMetrics
Browse files Browse the repository at this point in the history
  • Loading branch information
JayajP committed Mar 28, 2024
1 parent e894d8c commit ebf509e
Show file tree
Hide file tree
Showing 2 changed files with 29 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@
import org.apache.beam.sdk.metrics.DelegatingHistogram;
import org.apache.beam.sdk.metrics.Histogram;
import org.apache.beam.sdk.metrics.MetricName;
import org.apache.beam.sdk.metrics.NoOpHistogram;
import org.apache.beam.sdk.util.HistogramData;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.annotations.VisibleForTesting;
import org.apache.beam.vendor.guava.v32_1_2_jre.com.google.common.base.Splitter;
Expand Down Expand Up @@ -89,6 +90,31 @@ enum RowStatus {
private static final char METRIC_KV_DELIMITER = ':';
private static final char METRIC_NAME_DELIMITER = '*';

private static class HistogramsHolder {
private static final Histogram STREAMING_INSERTS_HISTOGRAM =
createRPCLatencyHistogram(RpcMethod.STREAMING_INSERTS);
private static final Histogram APPEND_ROWS_HISTOGRAM =
createRPCLatencyHistogram(RpcMethod.APPEND_ROWS);
private static final Histogram FLUSH_ROWS_HISTOGRAM =
createRPCLatencyHistogram(RpcMethod.FLUSH_ROWS);
private static final Histogram FINALIZE_STREAM_HISTOGRAM =
createRPCLatencyHistogram(RpcMethod.FINALIZE_STREAM);
}

static Histogram getRpcLatencyHistogram(RpcMethod method) {
switch (method) {
case STREAMING_INSERTS:
return HistogramsHolder.STREAMING_INSERTS_HISTOGRAM;
case APPEND_ROWS:
return HistogramsHolder.APPEND_ROWS_HISTOGRAM;
case FLUSH_ROWS:
return HistogramsHolder.FLUSH_ROWS_HISTOGRAM;
case FINALIZE_STREAM:
return HistogramsHolder.FINALIZE_STREAM_HISTOGRAM;
}
return NoOpHistogram.getInstance();
}

@AutoValue
public abstract static class ParsedMetricName {
public abstract String getBaseName();
Expand Down Expand Up @@ -193,7 +219,7 @@ static Counter createRPCRequestCounter(RpcMethod method, String rpcStatus, Strin
* @param method StorageWriteAPI method associated with this metric.
* @return Histogram with exponential buckets with a sqrt(2) growth factor.
*/
static Histogram createRPCLatencyHistogram(RpcMethod method) {
private static Histogram createRPCLatencyHistogram(RpcMethod method) {
NavigableMap<String, String> metricLabels = new TreeMap<String, String>();
metricLabels.put(RPC_METHOD, method.toString());
String fullMetricName = createLabeledMetricName(RPC_LATENCY, metricLabels);
Expand All @@ -218,7 +244,7 @@ private static void updateRpcLatencyMetric(@Nonnull Context<?> c, RpcMethod meth
}
long timeElapsed = java.time.Duration.between(operationStartTime, operationEndTime).toMillis();
if (timeElapsed > 0) {
BigQuerySinkMetrics.createRPCLatencyHistogram(method).update(timeElapsed);
BigQuerySinkMetrics.getRpcLatencyHistogram(method).update(timeElapsed);
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -159,7 +159,7 @@ public void updateSuccessfulAndFailedRows(int totalRows, int failedRows) {
/** Record rpc latency histogram metrics. */
private void recordRpcLatencyMetrics() {
Histogram latencyHistogram =
BigQuerySinkMetrics.createRPCLatencyHistogram(
BigQuerySinkMetrics.getRpcLatencyHistogram(
BigQuerySinkMetrics.RpcMethod.STREAMING_INSERTS);
double[] rpcLatencies =
rpcLatencies().stream().mapToDouble(duration -> duration.toMillis()).toArray();
Expand Down

0 comments on commit ebf509e

Please sign in to comment.