Skip to content

Commit

Permalink
JAVA-1829: Add metrics for bytes-sent and bytes-received
Browse files Browse the repository at this point in the history
Motivation:

JAVA-708 added metrics to java driver 3.x for tracking the number of
bytes added and received for a Cluster instance.  For feature parity,
these metrics should also be tracked at a Session and Node level in java
driver 4.x.

Modifications:

- Add 'bytes-sent' and 'bytes-received' to node-level metrics and
  'bytes-sent' and 'bytes-received' to session-level metrics.

- Add InboundTrafficMeter and OutboundTrafficMeter to pipeline in
  ChannelFactory.initializer.

- Update ChannelFactory.connect methods to accept Node instead of
  SocketAddress.  Previous connect method retained for unit testing.

- Updated unit tests to reason with Node instead of SocketAddress where
  connect is involved and a Node instance is available.

- Added static singleton instance of NoopNodeMetricUpdater.

Result:

Additional metrics added for tracking bytes sent and received at both
Session and Node level.
  • Loading branch information
tolbertam authored and olim7t committed May 24, 2018
1 parent c87c3cf commit 29cefec
Show file tree
Hide file tree
Showing 28 changed files with 468 additions and 224 deletions.
2 changes: 2 additions & 0 deletions changelog/README.md
Expand Up @@ -4,6 +4,7 @@

### 4.0.0-alpha4 (in progress)

- [new feature] JAVA-1829: Add metrics for bytes-sent and bytes-received
- [improvement] JAVA-1755: Normalize usage of DEBUG/TRACE log levels
- [improvement] JAVA-1803: Log driver version on first use
- [improvement] JAVA-1792: Add AuthProvider callback to handle missing challenge from server
Expand All @@ -24,6 +25,7 @@
- [improvement] JAVA-1772: Revisit multi-response callbacks
- [new feature] JAVA-1537: Add remaining socket options
- [bug] JAVA-1756: Propagate custom payload when preparing a statement
- [improvement] JAVA-1829: Add metrics for bytes-sent and bytes-received

### 4.0.0-alpha3

Expand Down
Expand Up @@ -24,6 +24,8 @@ public enum DefaultNodeMetric implements NodeMetric {
AVAILABLE_STREAMS("pool.available-streams"),
IN_FLIGHT("pool.in-flight"),
ORPHANED_STREAMS("pool.orphaned-streams"),
BYTES_SENT("bytes-sent"),
BYTES_RECEIVED("bytes-received"),
CQL_MESSAGES("cql-messages"),
UNSENT_REQUESTS("errors.request.unsent"),
ABORTED_REQUESTS("errors.request.aborted"),
Expand Down
Expand Up @@ -20,6 +20,8 @@

/** See {@code reference.conf} for a description of each metric. */
public enum DefaultSessionMetric implements SessionMetric {
BYTES_SENT("bytes-sent"),
BYTES_RECEIVED("bytes-received"),
CONNECTED_NODES("connected-nodes"),
CQL_REQUESTS("cql-requests"),
CQL_CLIENT_TIMEOUTS("cql-client-timeouts"),
Expand Down
Expand Up @@ -19,8 +19,15 @@
import com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException;
import com.datastax.oss.driver.api.core.config.DefaultDriverOption;
import com.datastax.oss.driver.api.core.config.DriverConfigProfile;
import com.datastax.oss.driver.api.core.metadata.Node;
import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
import com.datastax.oss.driver.internal.core.context.InternalDriverContext;
import com.datastax.oss.driver.internal.core.context.NettyOptions;
import com.datastax.oss.driver.internal.core.metadata.DefaultNode;
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
import com.datastax.oss.driver.internal.core.metrics.NoopNodeMetricUpdater;
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
import com.datastax.oss.driver.internal.core.protocol.FrameDecoder;
import com.datastax.oss.driver.internal.core.protocol.FrameEncoder;
import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting;
Expand Down Expand Up @@ -84,8 +91,19 @@ public void setProtocolVersion(ProtocolVersion newVersion) {
this.protocolVersion = newVersion;
}

public CompletionStage<DriverChannel> connect(
final SocketAddress address, DriverChannelOptions options) {
public CompletionStage<DriverChannel> connect(Node node, DriverChannelOptions options) {
NodeMetricUpdater nodeMetricUpdater;
if (node instanceof DefaultNode) {
nodeMetricUpdater = ((DefaultNode) node).getMetricUpdater();
} else {
nodeMetricUpdater = NoopNodeMetricUpdater.INSTANCE;
}
return connect(node.getConnectAddress(), options, nodeMetricUpdater);
}

@VisibleForTesting
CompletionStage<DriverChannel> connect(
SocketAddress address, DriverChannelOptions options, NodeMetricUpdater nodeMetricUpdater) {
CompletableFuture<DriverChannel> resultFuture = new CompletableFuture<>();

ProtocolVersion currentVersion;
Expand All @@ -99,14 +117,22 @@ public CompletionStage<DriverChannel> connect(
isNegotiating = true;
}

connect(address, options, currentVersion, isNegotiating, attemptedVersions, resultFuture);
connect(
address,
options,
nodeMetricUpdater,
currentVersion,
isNegotiating,
attemptedVersions,
resultFuture);
return resultFuture;
}

private void connect(
SocketAddress address,
DriverChannelOptions options,
final ProtocolVersion currentVersion,
NodeMetricUpdater nodeMetricUpdater,
ProtocolVersion currentVersion,
boolean isNegotiating,
List<ProtocolVersion> attemptedVersions,
CompletableFuture<DriverChannel> resultFuture) {
Expand All @@ -118,7 +144,8 @@ private void connect(
.group(nettyOptions.ioEventLoopGroup())
.channel(nettyOptions.channelClass())
.option(ChannelOption.ALLOCATOR, nettyOptions.allocator())
.handler(initializer(address, currentVersion, options, resultFuture));
.handler(
initializer(address, currentVersion, options, nodeMetricUpdater, resultFuture));

DriverConfigProfile config = context.config().getDefaultProfile();

Expand Down Expand Up @@ -180,7 +207,14 @@ private void connect(
"Failed to connect with protocol {}, retrying with {}",
currentVersion,
downgraded.get());
connect(address, options, downgraded.get(), true, attemptedVersions, resultFuture);
connect(
address,
options,
nodeMetricUpdater,
downgraded.get(),
true,
attemptedVersions,
resultFuture);
} else {
resultFuture.completeExceptionally(
UnsupportedProtocolVersionException.forNegotiation(address, attemptedVersions));
Expand All @@ -197,8 +231,9 @@ private void connect(
@VisibleForTesting
ChannelInitializer<Channel> initializer(
SocketAddress address,
final ProtocolVersion protocolVersion,
final DriverChannelOptions options,
ProtocolVersion protocolVersion,
DriverChannelOptions options,
NodeMetricUpdater nodeMetricUpdater,
CompletableFuture<DriverChannel> resultFuture) {
return new ChannelInitializer<Channel>() {
@Override
Expand Down Expand Up @@ -236,6 +271,23 @@ protected void initChannel(Channel channel) {
.sslHandlerFactory()
.map(f -> f.newSslHandler(channel, address))
.map(h -> pipeline.addLast("ssl", h));

// Only add meter handlers on the pipeline if metrics are enabled.
SessionMetricUpdater sessionMetricUpdater = context.metricsFactory().getSessionUpdater();
if (nodeMetricUpdater.isEnabled(DefaultNodeMetric.BYTES_RECEIVED)
|| sessionMetricUpdater.isEnabled(DefaultSessionMetric.BYTES_RECEIVED)) {
pipeline.addLast(
"inboundTrafficMeter",
new InboundTrafficMeter(nodeMetricUpdater, sessionMetricUpdater));
}

if (nodeMetricUpdater.isEnabled(DefaultNodeMetric.BYTES_SENT)
|| sessionMetricUpdater.isEnabled(DefaultSessionMetric.BYTES_SENT)) {
pipeline.addLast(
"outboundTrafficMeter",
new OutboundTrafficMeter(nodeMetricUpdater, sessionMetricUpdater));
}

pipeline
.addLast("encoder", new FrameEncoder(context.frameCodec(), maxFrameLength))
.addLast("decoder", new FrameDecoder(context.frameCodec(), maxFrameLength))
Expand Down
@@ -0,0 +1,46 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.internal.core.channel;

import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelInboundHandlerAdapter;

public class InboundTrafficMeter extends ChannelInboundHandlerAdapter {

private final NodeMetricUpdater nodeMetricUpdater;
private final SessionMetricUpdater sessionMetricUpdater;

InboundTrafficMeter(
NodeMetricUpdater nodeMetricUpdater, SessionMetricUpdater sessionMetricUpdater) {
this.nodeMetricUpdater = nodeMetricUpdater;
this.sessionMetricUpdater = sessionMetricUpdater;
}

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof ByteBuf) {
int bytes = ((ByteBuf) msg).readableBytes();
nodeMetricUpdater.markMeter(DefaultNodeMetric.BYTES_RECEIVED, bytes);
sessionMetricUpdater.markMeter(DefaultSessionMetric.BYTES_RECEIVED, bytes);
}
super.channelRead(ctx, msg);
}
}
@@ -0,0 +1,48 @@
/*
* Copyright DataStax, Inc.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package com.datastax.oss.driver.internal.core.channel;

import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric;
import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric;
import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater;
import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater;
import io.netty.buffer.ByteBuf;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.ChannelOutboundHandlerAdapter;
import io.netty.channel.ChannelPromise;

public class OutboundTrafficMeter extends ChannelOutboundHandlerAdapter {

private final NodeMetricUpdater nodeMetricUpdater;
private final SessionMetricUpdater sessionMetricUpdater;

OutboundTrafficMeter(
NodeMetricUpdater nodeMetricUpdater, SessionMetricUpdater sessionMetricUpdater) {
this.nodeMetricUpdater = nodeMetricUpdater;
this.sessionMetricUpdater = sessionMetricUpdater;
}

@Override
public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise)
throws Exception {
if (msg instanceof ByteBuf) {
int bytes = ((ByteBuf) msg).readableBytes();
nodeMetricUpdater.markMeter(DefaultNodeMetric.BYTES_SENT, bytes);
sessionMetricUpdater.markMeter(DefaultSessionMetric.BYTES_SENT, bytes);
}
super.write(ctx, msg, promise);
}
}
Expand Up @@ -277,7 +277,7 @@ private void connect(
LOG.debug("[{}] Trying to establish a connection to {}", logPrefix, node);
context
.channelFactory()
.connect(node.getConnectAddress(), channelOptions)
.connect(node, channelOptions)
.whenCompleteAsync(
(channel, error) -> {
try {
Expand Down
Expand Up @@ -44,28 +44,28 @@ protected DropwizardMetricUpdater(Set<MetricT> enabledMetrics, MetricRegistry re

@Override
public void incrementCounter(MetricT metric, long amount) {
if (enabledMetrics.contains(metric)) {
if (isEnabled(metric)) {
registry.counter(buildFullName(metric)).inc(amount);
}
}

@Override
public void updateHistogram(MetricT metric, long value) {
if (enabledMetrics.contains(metric)) {
if (isEnabled(metric)) {
registry.histogram(buildFullName(metric)).update(value);
}
}

@Override
public void markMeter(MetricT metric, long amount) {
if (enabledMetrics.contains(metric)) {
if (isEnabled(metric)) {
registry.meter(buildFullName(metric)).mark(amount);
}
}

@Override
public void updateTimer(MetricT metric, long duration, TimeUnit unit) {
if (enabledMetrics.contains(metric)) {
if (isEnabled(metric)) {
registry.timer(buildFullName(metric)).update(duration, unit);
}
}
Expand All @@ -75,8 +75,13 @@ public <T extends Metric> T getMetric(MetricT metric) {
return (T) registry.getMetrics().get(buildFullName(metric));
}

@Override
public boolean isEnabled(MetricT metric) {
return enabledMetrics.contains(metric);
}

protected void initializeDefaultCounter(MetricT metric) {
if (enabledMetrics.contains(metric)) {
if (isEnabled(metric)) {
// Just initialize eagerly so that the metric appears even when it has no data yet
registry.counter(buildFullName(metric));
}
Expand All @@ -88,7 +93,7 @@ protected void initializeHdrTimer(
DefaultDriverOption highestLatencyOption,
DefaultDriverOption significantDigitsOption,
DefaultDriverOption intervalOption) {
if (enabledMetrics.contains(metric)) {
if (isEnabled(metric)) {
String fullName = buildFullName(metric);

Duration highestLatency = config.getDuration(highestLatencyOption);
Expand Down
Expand Up @@ -59,7 +59,7 @@ public DropwizardMetricsFactory(InternalDriverContext context) {
if (enabledSessionMetrics.isEmpty() && enabledNodeMetrics.isEmpty()) {
LOG.debug("[{}] All metrics are disabled, Session.getMetrics will be empty", logPrefix);
this.registry = null;
this.sessionUpdater = new NoopSessionMetricUpdater();
this.sessionUpdater = NoopSessionMetricUpdater.INSTANCE;
this.metrics = Optional.empty();
} else {
this.registry = new MetricRegistry();
Expand All @@ -83,7 +83,7 @@ public SessionMetricUpdater getSessionUpdater() {
@Override
public NodeMetricUpdater newNodeUpdater(Node node) {
return (registry == null)
? new NoopNodeMetricUpdater()
? NoopNodeMetricUpdater.INSTANCE
: new DropwizardNodeMetricUpdater(node, enabledNodeMetrics, registry, context);
}

Expand Down
Expand Up @@ -34,4 +34,6 @@ default void markMeter(MetricT metric) {
}

void updateTimer(MetricT metric, long duration, TimeUnit unit);

boolean isEnabled(MetricT metric);
}
Expand Up @@ -22,6 +22,10 @@
@ThreadSafe
public class NoopNodeMetricUpdater implements NodeMetricUpdater {

public static NoopNodeMetricUpdater INSTANCE = new NoopNodeMetricUpdater();

private NoopNodeMetricUpdater() {}

@Override
public void incrementCounter(NodeMetric metric, long amount) {
// nothing to do
Expand All @@ -41,4 +45,10 @@ public void markMeter(NodeMetric metric, long amount) {
public void updateTimer(NodeMetric metric, long duration, TimeUnit unit) {
// nothing to do
}

@Override
public boolean isEnabled(NodeMetric metric) {
// since methods don't do anything, return false
return false;
}
}
Expand Up @@ -22,6 +22,10 @@
@ThreadSafe
public class NoopSessionMetricUpdater implements SessionMetricUpdater {

public static NoopSessionMetricUpdater INSTANCE = new NoopSessionMetricUpdater();

private NoopSessionMetricUpdater() {}

@Override
public void incrementCounter(SessionMetric metric, long amount) {
// nothing to do
Expand All @@ -41,4 +45,10 @@ public void markMeter(SessionMetric metric, long amount) {
public void updateTimer(SessionMetric metric, long duration, TimeUnit unit) {
// nothing to do
}

@Override
public boolean isEnabled(SessionMetric metric) {
// since methods don't do anything, return false
return false;
}
}
Expand Up @@ -274,8 +274,7 @@ private CompletionStage<Boolean> addMissingChannels() {
.withOwnerLogPrefix(sessionLogPrefix)
.build();
for (int i = 0; i < missing; i++) {
CompletionStage<DriverChannel> channelFuture =
channelFactory.connect(node.getConnectAddress(), options);
CompletionStage<DriverChannel> channelFuture = channelFactory.connect(node, options);
pendingChannels.add(channelFuture);
}
return CompletableFutures.allDone(pendingChannels)
Expand Down

0 comments on commit 29cefec

Please sign in to comment.