diff --git a/changelog/README.md b/changelog/README.md index 4d207c3a0a7..6366fa3a0f4 100644 --- a/changelog/README.md +++ b/changelog/README.md @@ -4,6 +4,7 @@ ### 4.0.0-alpha4 (in progress) +- [new feature] JAVA-1829: Add metrics for bytes-sent and bytes-received - [improvement] JAVA-1755: Normalize usage of DEBUG/TRACE log levels - [improvement] JAVA-1803: Log driver version on first use - [improvement] JAVA-1792: Add AuthProvider callback to handle missing challenge from server @@ -24,6 +25,7 @@ - [improvement] JAVA-1772: Revisit multi-response callbacks - [new feature] JAVA-1537: Add remaining socket options - [bug] JAVA-1756: Propagate custom payload when preparing a statement +- [improvement] JAVA-1829: Add metrics for bytes-sent and bytes-received ### 4.0.0-alpha3 diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultNodeMetric.java b/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultNodeMetric.java index 6af84750589..78bc048f8e8 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultNodeMetric.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultNodeMetric.java @@ -24,6 +24,8 @@ public enum DefaultNodeMetric implements NodeMetric { AVAILABLE_STREAMS("pool.available-streams"), IN_FLIGHT("pool.in-flight"), ORPHANED_STREAMS("pool.orphaned-streams"), + BYTES_SENT("bytes-sent"), + BYTES_RECEIVED("bytes-received"), CQL_MESSAGES("cql-messages"), UNSENT_REQUESTS("errors.request.unsent"), ABORTED_REQUESTS("errors.request.aborted"), diff --git a/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultSessionMetric.java b/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultSessionMetric.java index 15effc60252..5651347faa4 100644 --- a/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultSessionMetric.java +++ b/core/src/main/java/com/datastax/oss/driver/api/core/metrics/DefaultSessionMetric.java @@ -20,6 +20,8 @@ /** See {@code reference.conf} for a description of each metric. */ public enum DefaultSessionMetric implements SessionMetric { + BYTES_SENT("bytes-sent"), + BYTES_RECEIVED("bytes-received"), CONNECTED_NODES("connected-nodes"), CQL_REQUESTS("cql-requests"), CQL_CLIENT_TIMEOUTS("cql-client-timeouts"), diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java index 556df9673c9..8a573ef3dd9 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/ChannelFactory.java @@ -19,8 +19,15 @@ import com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.api.core.config.DriverConfigProfile; +import com.datastax.oss.driver.api.core.metadata.Node; +import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric; +import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.context.NettyOptions; +import com.datastax.oss.driver.internal.core.metadata.DefaultNode; +import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater; +import com.datastax.oss.driver.internal.core.metrics.NoopNodeMetricUpdater; +import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater; import com.datastax.oss.driver.internal.core.protocol.FrameDecoder; import com.datastax.oss.driver.internal.core.protocol.FrameEncoder; import com.datastax.oss.driver.shaded.guava.common.annotations.VisibleForTesting; @@ -84,8 +91,19 @@ public void setProtocolVersion(ProtocolVersion newVersion) { this.protocolVersion = newVersion; } - public CompletionStage connect( - final SocketAddress address, DriverChannelOptions options) { + public CompletionStage connect(Node node, DriverChannelOptions options) { + NodeMetricUpdater nodeMetricUpdater; + if (node instanceof DefaultNode) { + nodeMetricUpdater = ((DefaultNode) node).getMetricUpdater(); + } else { + nodeMetricUpdater = NoopNodeMetricUpdater.INSTANCE; + } + return connect(node.getConnectAddress(), options, nodeMetricUpdater); + } + + @VisibleForTesting + CompletionStage connect( + SocketAddress address, DriverChannelOptions options, NodeMetricUpdater nodeMetricUpdater) { CompletableFuture resultFuture = new CompletableFuture<>(); ProtocolVersion currentVersion; @@ -99,14 +117,22 @@ public CompletionStage connect( isNegotiating = true; } - connect(address, options, currentVersion, isNegotiating, attemptedVersions, resultFuture); + connect( + address, + options, + nodeMetricUpdater, + currentVersion, + isNegotiating, + attemptedVersions, + resultFuture); return resultFuture; } private void connect( SocketAddress address, DriverChannelOptions options, - final ProtocolVersion currentVersion, + NodeMetricUpdater nodeMetricUpdater, + ProtocolVersion currentVersion, boolean isNegotiating, List attemptedVersions, CompletableFuture resultFuture) { @@ -118,7 +144,8 @@ private void connect( .group(nettyOptions.ioEventLoopGroup()) .channel(nettyOptions.channelClass()) .option(ChannelOption.ALLOCATOR, nettyOptions.allocator()) - .handler(initializer(address, currentVersion, options, resultFuture)); + .handler( + initializer(address, currentVersion, options, nodeMetricUpdater, resultFuture)); DriverConfigProfile config = context.config().getDefaultProfile(); @@ -180,7 +207,14 @@ private void connect( "Failed to connect with protocol {}, retrying with {}", currentVersion, downgraded.get()); - connect(address, options, downgraded.get(), true, attemptedVersions, resultFuture); + connect( + address, + options, + nodeMetricUpdater, + downgraded.get(), + true, + attemptedVersions, + resultFuture); } else { resultFuture.completeExceptionally( UnsupportedProtocolVersionException.forNegotiation(address, attemptedVersions)); @@ -197,8 +231,9 @@ private void connect( @VisibleForTesting ChannelInitializer initializer( SocketAddress address, - final ProtocolVersion protocolVersion, - final DriverChannelOptions options, + ProtocolVersion protocolVersion, + DriverChannelOptions options, + NodeMetricUpdater nodeMetricUpdater, CompletableFuture resultFuture) { return new ChannelInitializer() { @Override @@ -236,6 +271,23 @@ protected void initChannel(Channel channel) { .sslHandlerFactory() .map(f -> f.newSslHandler(channel, address)) .map(h -> pipeline.addLast("ssl", h)); + + // Only add meter handlers on the pipeline if metrics are enabled. + SessionMetricUpdater sessionMetricUpdater = context.metricsFactory().getSessionUpdater(); + if (nodeMetricUpdater.isEnabled(DefaultNodeMetric.BYTES_RECEIVED) + || sessionMetricUpdater.isEnabled(DefaultSessionMetric.BYTES_RECEIVED)) { + pipeline.addLast( + "inboundTrafficMeter", + new InboundTrafficMeter(nodeMetricUpdater, sessionMetricUpdater)); + } + + if (nodeMetricUpdater.isEnabled(DefaultNodeMetric.BYTES_SENT) + || sessionMetricUpdater.isEnabled(DefaultSessionMetric.BYTES_SENT)) { + pipeline.addLast( + "outboundTrafficMeter", + new OutboundTrafficMeter(nodeMetricUpdater, sessionMetricUpdater)); + } + pipeline .addLast("encoder", new FrameEncoder(context.frameCodec(), maxFrameLength)) .addLast("decoder", new FrameDecoder(context.frameCodec(), maxFrameLength)) diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InboundTrafficMeter.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InboundTrafficMeter.java new file mode 100644 index 00000000000..bba1320b527 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/InboundTrafficMeter.java @@ -0,0 +1,46 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.channel; + +import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric; +import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric; +import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater; +import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelInboundHandlerAdapter; + +public class InboundTrafficMeter extends ChannelInboundHandlerAdapter { + + private final NodeMetricUpdater nodeMetricUpdater; + private final SessionMetricUpdater sessionMetricUpdater; + + InboundTrafficMeter( + NodeMetricUpdater nodeMetricUpdater, SessionMetricUpdater sessionMetricUpdater) { + this.nodeMetricUpdater = nodeMetricUpdater; + this.sessionMetricUpdater = sessionMetricUpdater; + } + + @Override + public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { + if (msg instanceof ByteBuf) { + int bytes = ((ByteBuf) msg).readableBytes(); + nodeMetricUpdater.markMeter(DefaultNodeMetric.BYTES_RECEIVED, bytes); + sessionMetricUpdater.markMeter(DefaultSessionMetric.BYTES_RECEIVED, bytes); + } + super.channelRead(ctx, msg); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/channel/OutboundTrafficMeter.java b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/OutboundTrafficMeter.java new file mode 100644 index 00000000000..1ed82839244 --- /dev/null +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/channel/OutboundTrafficMeter.java @@ -0,0 +1,48 @@ +/* + * Copyright DataStax, Inc. + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package com.datastax.oss.driver.internal.core.channel; + +import com.datastax.oss.driver.api.core.metrics.DefaultNodeMetric; +import com.datastax.oss.driver.api.core.metrics.DefaultSessionMetric; +import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater; +import com.datastax.oss.driver.internal.core.metrics.SessionMetricUpdater; +import io.netty.buffer.ByteBuf; +import io.netty.channel.ChannelHandlerContext; +import io.netty.channel.ChannelOutboundHandlerAdapter; +import io.netty.channel.ChannelPromise; + +public class OutboundTrafficMeter extends ChannelOutboundHandlerAdapter { + + private final NodeMetricUpdater nodeMetricUpdater; + private final SessionMetricUpdater sessionMetricUpdater; + + OutboundTrafficMeter( + NodeMetricUpdater nodeMetricUpdater, SessionMetricUpdater sessionMetricUpdater) { + this.nodeMetricUpdater = nodeMetricUpdater; + this.sessionMetricUpdater = sessionMetricUpdater; + } + + @Override + public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) + throws Exception { + if (msg instanceof ByteBuf) { + int bytes = ((ByteBuf) msg).readableBytes(); + nodeMetricUpdater.markMeter(DefaultNodeMetric.BYTES_SENT, bytes); + sessionMetricUpdater.markMeter(DefaultSessionMetric.BYTES_SENT, bytes); + } + super.write(ctx, msg, promise); + } +} diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java index 77cb42f98bd..31d163cb6a3 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/control/ControlConnection.java @@ -277,7 +277,7 @@ private void connect( LOG.debug("[{}] Trying to establish a connection to {}", logPrefix, node); context .channelFactory() - .connect(node.getConnectAddress(), channelOptions) + .connect(node, channelOptions) .whenCompleteAsync( (channel, error) -> { try { diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DropwizardMetricUpdater.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DropwizardMetricUpdater.java index 2315b98f939..20654e197ae 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DropwizardMetricUpdater.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DropwizardMetricUpdater.java @@ -44,28 +44,28 @@ protected DropwizardMetricUpdater(Set enabledMetrics, MetricRegistry re @Override public void incrementCounter(MetricT metric, long amount) { - if (enabledMetrics.contains(metric)) { + if (isEnabled(metric)) { registry.counter(buildFullName(metric)).inc(amount); } } @Override public void updateHistogram(MetricT metric, long value) { - if (enabledMetrics.contains(metric)) { + if (isEnabled(metric)) { registry.histogram(buildFullName(metric)).update(value); } } @Override public void markMeter(MetricT metric, long amount) { - if (enabledMetrics.contains(metric)) { + if (isEnabled(metric)) { registry.meter(buildFullName(metric)).mark(amount); } } @Override public void updateTimer(MetricT metric, long duration, TimeUnit unit) { - if (enabledMetrics.contains(metric)) { + if (isEnabled(metric)) { registry.timer(buildFullName(metric)).update(duration, unit); } } @@ -75,8 +75,13 @@ public T getMetric(MetricT metric) { return (T) registry.getMetrics().get(buildFullName(metric)); } + @Override + public boolean isEnabled(MetricT metric) { + return enabledMetrics.contains(metric); + } + protected void initializeDefaultCounter(MetricT metric) { - if (enabledMetrics.contains(metric)) { + if (isEnabled(metric)) { // Just initialize eagerly so that the metric appears even when it has no data yet registry.counter(buildFullName(metric)); } @@ -88,7 +93,7 @@ protected void initializeHdrTimer( DefaultDriverOption highestLatencyOption, DefaultDriverOption significantDigitsOption, DefaultDriverOption intervalOption) { - if (enabledMetrics.contains(metric)) { + if (isEnabled(metric)) { String fullName = buildFullName(metric); Duration highestLatency = config.getDuration(highestLatencyOption); diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DropwizardMetricsFactory.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DropwizardMetricsFactory.java index 95512a19681..2cb5886fe20 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DropwizardMetricsFactory.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/DropwizardMetricsFactory.java @@ -59,7 +59,7 @@ public DropwizardMetricsFactory(InternalDriverContext context) { if (enabledSessionMetrics.isEmpty() && enabledNodeMetrics.isEmpty()) { LOG.debug("[{}] All metrics are disabled, Session.getMetrics will be empty", logPrefix); this.registry = null; - this.sessionUpdater = new NoopSessionMetricUpdater(); + this.sessionUpdater = NoopSessionMetricUpdater.INSTANCE; this.metrics = Optional.empty(); } else { this.registry = new MetricRegistry(); @@ -83,7 +83,7 @@ public SessionMetricUpdater getSessionUpdater() { @Override public NodeMetricUpdater newNodeUpdater(Node node) { return (registry == null) - ? new NoopNodeMetricUpdater() + ? NoopNodeMetricUpdater.INSTANCE : new DropwizardNodeMetricUpdater(node, enabledNodeMetrics, registry, context); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/MetricUpdater.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/MetricUpdater.java index c4e2777db5f..966366b0a20 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/MetricUpdater.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/MetricUpdater.java @@ -34,4 +34,6 @@ default void markMeter(MetricT metric) { } void updateTimer(MetricT metric, long duration, TimeUnit unit); + + boolean isEnabled(MetricT metric); } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopNodeMetricUpdater.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopNodeMetricUpdater.java index d1d81ca056f..404d5953611 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopNodeMetricUpdater.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopNodeMetricUpdater.java @@ -22,6 +22,10 @@ @ThreadSafe public class NoopNodeMetricUpdater implements NodeMetricUpdater { + public static NoopNodeMetricUpdater INSTANCE = new NoopNodeMetricUpdater(); + + private NoopNodeMetricUpdater() {} + @Override public void incrementCounter(NodeMetric metric, long amount) { // nothing to do @@ -41,4 +45,10 @@ public void markMeter(NodeMetric metric, long amount) { public void updateTimer(NodeMetric metric, long duration, TimeUnit unit) { // nothing to do } + + @Override + public boolean isEnabled(NodeMetric metric) { + // since methods don't do anything, return false + return false; + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopSessionMetricUpdater.java b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopSessionMetricUpdater.java index 12a961b8c4c..17b7fd23e3b 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopSessionMetricUpdater.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/metrics/NoopSessionMetricUpdater.java @@ -22,6 +22,10 @@ @ThreadSafe public class NoopSessionMetricUpdater implements SessionMetricUpdater { + public static NoopSessionMetricUpdater INSTANCE = new NoopSessionMetricUpdater(); + + private NoopSessionMetricUpdater() {} + @Override public void incrementCounter(SessionMetric metric, long amount) { // nothing to do @@ -41,4 +45,10 @@ public void markMeter(SessionMetric metric, long amount) { public void updateTimer(SessionMetric metric, long duration, TimeUnit unit) { // nothing to do } + + @Override + public boolean isEnabled(SessionMetric metric) { + // since methods don't do anything, return false + return false; + } } diff --git a/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java b/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java index 10c80d99322..451de38fe03 100644 --- a/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java +++ b/core/src/main/java/com/datastax/oss/driver/internal/core/pool/ChannelPool.java @@ -274,8 +274,7 @@ private CompletionStage addMissingChannels() { .withOwnerLogPrefix(sessionLogPrefix) .build(); for (int i = 0; i < missing; i++) { - CompletionStage channelFuture = - channelFactory.connect(node.getConnectAddress(), options); + CompletionStage channelFuture = channelFactory.connect(node, options); pendingChannels.add(channelFuture); } return CompletableFutures.allDone(pendingChannels) diff --git a/core/src/main/resources/reference.conf b/core/src/main/resources/reference.conf index 22884c045c6..2a4e465e9ef 100644 --- a/core/src/main/resources/reference.conf +++ b/core/src/main/resources/reference.conf @@ -812,6 +812,12 @@ datastax-java-driver { # The session-level metrics (all disabled by default). session { enabled = [ + # The number and rate of bytes sent for the entire session (exposed as a Meter). + // bytes-sent, + + # The number and rate of bytes received for the entire session (exposed as a Meter). + // bytes-received + # The number of nodes to which the driver has at least one active connection (exposed as a # Gauge). // connected-nodes, @@ -916,6 +922,12 @@ datastax-java-driver { # See the description of the connection.max-orphan-requests option for more details. // pool.orphaned-streams, + # The number and rate of bytes sent to this node (exposed as a Meter). + // bytes-sent, + + # The number and rate of bytes received from this node (exposed as a Meter). + // bytes-received, + # The throughput and latency percentiles of individual CQL messages sent to this node as # part of an overall request (exposed as a Timer). # diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryAvailableIdsTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryAvailableIdsTest.java index ec0e15285cf..2645c70e8e0 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryAvailableIdsTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryAvailableIdsTest.java @@ -21,6 +21,7 @@ import com.datastax.oss.driver.api.core.DefaultProtocolVersion; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; +import com.datastax.oss.driver.internal.core.metrics.NoopNodeMetricUpdater; import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.request.Query; import com.datastax.oss.protocol.internal.response.result.Void; @@ -58,7 +59,8 @@ public void should_report_available_ids() { // When CompletionStage channelFuture = - factory.connect(SERVER_ADDRESS, DriverChannelOptions.builder().build()); + factory.connect( + SERVER_ADDRESS, DriverChannelOptions.builder().build(), NoopNodeMetricUpdater.INSTANCE); completeSimpleChannelInit(); // Then diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryClusterNameTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryClusterNameTest.java index 509510720f3..e0b6494836d 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryClusterNameTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryClusterNameTest.java @@ -20,6 +20,7 @@ import com.datastax.oss.driver.api.core.DefaultProtocolVersion; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.internal.core.TestResponses; +import com.datastax.oss.driver.internal.core.metrics.NoopNodeMetricUpdater; import com.datastax.oss.protocol.internal.response.Ready; import java.util.concurrent.CompletionStage; import org.junit.Test; @@ -37,7 +38,8 @@ public void should_set_cluster_name_from_first_connection() { // When CompletionStage channelFuture = - factory.connect(SERVER_ADDRESS, DriverChannelOptions.DEFAULT); + factory.connect( + SERVER_ADDRESS, DriverChannelOptions.DEFAULT, NoopNodeMetricUpdater.INSTANCE); writeInboundFrame(readOutboundFrame(), new Ready()); writeInboundFrame(readOutboundFrame(), TestResponses.clusterNameResponse("mockClusterName")); @@ -57,13 +59,16 @@ public void should_check_cluster_name_for_next_connections() throws Throwable { // When CompletionStage channelFuture = - factory.connect(SERVER_ADDRESS, DriverChannelOptions.DEFAULT); + factory.connect( + SERVER_ADDRESS, DriverChannelOptions.DEFAULT, NoopNodeMetricUpdater.INSTANCE); // open a first connection that will define the cluster name writeInboundFrame(readOutboundFrame(), new Ready()); writeInboundFrame(readOutboundFrame(), TestResponses.clusterNameResponse("mockClusterName")); assertThat(channelFuture).isSuccess(); // open a second connection that returns the same cluster name - channelFuture = factory.connect(SERVER_ADDRESS, DriverChannelOptions.DEFAULT); + channelFuture = + factory.connect( + SERVER_ADDRESS, DriverChannelOptions.DEFAULT, NoopNodeMetricUpdater.INSTANCE); writeInboundFrame(readOutboundFrame(), new Ready()); writeInboundFrame(readOutboundFrame(), TestResponses.clusterNameResponse("mockClusterName")); @@ -72,7 +77,9 @@ public void should_check_cluster_name_for_next_connections() throws Throwable { // When // open a third connection that returns a different cluster name - channelFuture = factory.connect(SERVER_ADDRESS, DriverChannelOptions.DEFAULT); + channelFuture = + factory.connect( + SERVER_ADDRESS, DriverChannelOptions.DEFAULT, NoopNodeMetricUpdater.INSTANCE); writeInboundFrame(readOutboundFrame(), new Ready()); writeInboundFrame(readOutboundFrame(), TestResponses.clusterNameResponse("wrongClusterName")); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryProtocolNegotiationTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryProtocolNegotiationTest.java index a52221f4cec..f7a37f5edab 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryProtocolNegotiationTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryProtocolNegotiationTest.java @@ -21,6 +21,7 @@ import com.datastax.oss.driver.api.core.UnsupportedProtocolVersionException; import com.datastax.oss.driver.api.core.config.DefaultDriverOption; import com.datastax.oss.driver.internal.core.TestResponses; +import com.datastax.oss.driver.internal.core.metrics.NoopNodeMetricUpdater; import com.datastax.oss.protocol.internal.Frame; import com.datastax.oss.protocol.internal.ProtocolConstants; import com.datastax.oss.protocol.internal.response.Error; @@ -46,7 +47,8 @@ public void should_succeed_if_version_specified_and_supported_by_server() { // When CompletionStage channelFuture = - factory.connect(SERVER_ADDRESS, DriverChannelOptions.DEFAULT); + factory.connect( + SERVER_ADDRESS, DriverChannelOptions.DEFAULT, NoopNodeMetricUpdater.INSTANCE); completeSimpleChannelInit(); @@ -69,7 +71,8 @@ public void should_fail_if_version_specified_and_not_supported_by_server(int err // When CompletionStage channelFuture = - factory.connect(SERVER_ADDRESS, DriverChannelOptions.DEFAULT); + factory.connect( + SERVER_ADDRESS, DriverChannelOptions.DEFAULT, NoopNodeMetricUpdater.INSTANCE); Frame requestFrame = readOutboundFrame(); assertThat(requestFrame.protocolVersion).isEqualTo(DefaultProtocolVersion.V4.getCode()); @@ -99,7 +102,8 @@ public void should_succeed_if_version_not_specified_and_server_supports_latest_s // When CompletionStage channelFuture = - factory.connect(SERVER_ADDRESS, DriverChannelOptions.DEFAULT); + factory.connect( + SERVER_ADDRESS, DriverChannelOptions.DEFAULT, NoopNodeMetricUpdater.INSTANCE); Frame requestFrame = readOutboundFrame(); assertThat(requestFrame.protocolVersion).isEqualTo(DefaultProtocolVersion.V4.getCode()); @@ -127,7 +131,8 @@ public void should_negotiate_if_version_not_specified_and_server_supports_legacy // When CompletionStage channelFuture = - factory.connect(SERVER_ADDRESS, DriverChannelOptions.DEFAULT); + factory.connect( + SERVER_ADDRESS, DriverChannelOptions.DEFAULT, NoopNodeMetricUpdater.INSTANCE); Frame requestFrame = readOutboundFrame(); assertThat(requestFrame.protocolVersion).isEqualTo(DefaultProtocolVersion.V4.getCode()); @@ -163,7 +168,8 @@ public void should_fail_if_negotiation_finds_no_matching_version(int errorCode) // When CompletionStage channelFuture = - factory.connect(SERVER_ADDRESS, DriverChannelOptions.DEFAULT); + factory.connect( + SERVER_ADDRESS, DriverChannelOptions.DEFAULT, NoopNodeMetricUpdater.INSTANCE); Frame requestFrame = readOutboundFrame(); assertThat(requestFrame.protocolVersion).isEqualTo(DefaultProtocolVersion.V4.getCode()); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryTestBase.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryTestBase.java index 6369aa22b39..23ff1ab8b2d 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryTestBase.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/ChannelFactoryTestBase.java @@ -27,6 +27,7 @@ import com.datastax.oss.driver.internal.core.context.EventBus; import com.datastax.oss.driver.internal.core.context.InternalDriverContext; import com.datastax.oss.driver.internal.core.context.NettyOptions; +import com.datastax.oss.driver.internal.core.metrics.NodeMetricUpdater; import com.datastax.oss.driver.internal.core.protocol.ByteBufPrimitiveCodec; import com.datastax.oss.protocol.internal.Compressor; import com.datastax.oss.protocol.internal.Frame; @@ -229,6 +230,7 @@ ChannelInitializer initializer( SocketAddress address, ProtocolVersion protocolVersion, DriverChannelOptions options, + NodeMetricUpdater nodeMetricUpdater, CompletableFuture resultFuture) { return new ChannelInitializer() { @Override diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/MockChannelFactoryHelper.java b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/MockChannelFactoryHelper.java index 41c0c199004..ec205fb825b 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/channel/MockChannelFactoryHelper.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/channel/MockChannelFactoryHelper.java @@ -21,11 +21,11 @@ import static org.mockito.ArgumentMatchers.eq; import static org.mockito.Mockito.timeout; +import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.internal.core.util.concurrent.CompletableFutures; import com.datastax.oss.driver.shaded.guava.common.collect.ListMultimap; import com.datastax.oss.driver.shaded.guava.common.collect.MultimapBuilder; import com.datastax.oss.driver.shaded.guava.common.collect.Sets; -import java.net.SocketAddress; import java.util.ArrayDeque; import java.util.Deque; import java.util.HashMap; @@ -46,7 +46,7 @@ * methods throughout the test to check that each call has been performed. * *

This class handles asynchronous calls to the thread factory, but it must be used from a single - * thread (see {@link #waitForCalls(SocketAddress, int)}). + * thread (see {@link #waitForCalls(Node, int)}). */ public class MockChannelFactoryHelper { @@ -59,15 +59,15 @@ public static Builder builder(ChannelFactory channelFactory) { private final ChannelFactory channelFactory; private final InOrder inOrder; // If waitForCalls sees more invocations than expected, the difference is stored here - private final Map previous = new HashMap<>(); + private final Map previous = new HashMap<>(); public MockChannelFactoryHelper(ChannelFactory channelFactory) { this.channelFactory = channelFactory; this.inOrder = Mockito.inOrder(channelFactory); } - public void waitForCall(SocketAddress address) { - waitForCalls(address, 1); + public void waitForCall(Node node) { + waitForCalls(node, 1); } /** @@ -77,10 +77,10 @@ public void waitForCall(SocketAddress address) { * expected when this method is called. If so, the extra calls are stored and stored and will be * taken into account next time. */ - public void waitForCalls(SocketAddress address, int expected) { - int fromLastTime = previous.getOrDefault(address, 0); + public void waitForCalls(Node node, int expected) { + int fromLastTime = previous.getOrDefault(node, 0); if (fromLastTime >= expected) { - previous.put(address, fromLastTime - expected); + previous.put(node, fromLastTime - expected); return; } expected -= fromLastTime; @@ -91,19 +91,19 @@ public void waitForCalls(SocketAddress address, int expected) { ArgumentCaptor.forClass(DriverChannelOptions.class); inOrder .verify(channelFactory, timeout(CONNECT_TIMEOUT_MILLIS).atLeast(expected)) - .connect(eq(address), optionsCaptor.capture()); + .connect(eq(node), optionsCaptor.capture()); int actual = optionsCaptor.getAllValues().size(); int extras = actual - expected; if (extras > 0) { - previous.compute(address, (k, v) -> (v == null) ? extras : v + extras); + previous.compute(node, (k, v) -> (v == null) ? extras : v + extras); } } public void verifyNoMoreCalls() { inOrder .verify(channelFactory, timeout(CONNECT_TIMEOUT_MILLIS).times(0)) - .connect(any(SocketAddress.class), any(DriverChannelOptions.class)); + .connect(any(Node.class), any(DriverChannelOptions.class)); Set counts = Sets.newHashSet(previous.values()); if (!counts.isEmpty()) { @@ -113,7 +113,7 @@ public void verifyNoMoreCalls() { public static class Builder { private final ChannelFactory channelFactory; - private final ListMultimap invocations = + private final ListMultimap invocations = MultimapBuilder.hashKeys().arrayListValues().build(); public Builder(ChannelFactory channelFactory) { @@ -122,23 +122,23 @@ public Builder(ChannelFactory channelFactory) { this.channelFactory = channelFactory; } - public Builder success(SocketAddress address, DriverChannel channel) { - invocations.put(address, channel); + public Builder success(Node node, DriverChannel channel) { + invocations.put(node, channel); return this; } - public Builder failure(SocketAddress address, String error) { - invocations.put(address, new Exception(error)); + public Builder failure(Node node, String error) { + invocations.put(node, new Exception(error)); return this; } - public Builder failure(SocketAddress address, Throwable error) { - invocations.put(address, error); + public Builder failure(Node node, Throwable error) { + invocations.put(node, error); return this; } - public Builder pending(SocketAddress address, CompletableFuture future) { - invocations.put(address, future); + public Builder pending(Node node, CompletableFuture future) { + invocations.put(node, future); return this; } @@ -148,9 +148,9 @@ public MockChannelFactoryHelper build() { } private void stub() { - for (SocketAddress address : invocations.keySet()) { + for (Node node : invocations.keySet()) { Deque> results = new ArrayDeque<>(); - for (Object object : invocations.get(address)) { + for (Object object : invocations.get(node)) { if (object instanceof DriverChannel) { results.add(CompletableFuture.completedFuture(((DriverChannel) object))); } else if (object instanceof Throwable) { @@ -166,7 +166,7 @@ private void stub() { if (results.size() > 0) { CompletionStage first = results.poll(); OngoingStubbing> ongoingStubbing = - Mockito.when(channelFactory.connect(eq(address), any(DriverChannelOptions.class))) + Mockito.when(channelFactory.connect(eq(node), any(DriverChannelOptions.class))) .thenReturn(first); for (CompletionStage result : results) { ongoingStubbing.thenReturn(result); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java index ace2d8166d8..e77427c6e3f 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionEventsTest.java @@ -40,7 +40,7 @@ public void should_register_for_all_events_if_topology_requested() { DriverChannel channel1 = newMockDriverChannel(1); ArgumentCaptor optionsCaptor = ArgumentCaptor.forClass(DriverChannelOptions.class); - Mockito.when(channelFactory.connect(eq(ADDRESS1), optionsCaptor.capture())) + Mockito.when(channelFactory.connect(eq(node1), optionsCaptor.capture())) .thenReturn(CompletableFuture.completedFuture(channel1)); // When @@ -63,7 +63,7 @@ public void should_register_for_schema_events_only_if_topology_not_requested() { DriverChannel channel1 = newMockDriverChannel(1); ArgumentCaptor optionsCaptor = ArgumentCaptor.forClass(DriverChannelOptions.class); - Mockito.when(channelFactory.connect(eq(ADDRESS1), optionsCaptor.capture())) + Mockito.when(channelFactory.connect(eq(node1), optionsCaptor.capture())) .thenReturn(CompletableFuture.completedFuture(channel1)); // When @@ -83,7 +83,7 @@ public void should_process_status_change_events() { DriverChannel channel1 = newMockDriverChannel(1); ArgumentCaptor optionsCaptor = ArgumentCaptor.forClass(DriverChannelOptions.class); - Mockito.when(channelFactory.connect(eq(ADDRESS1), optionsCaptor.capture())) + Mockito.when(channelFactory.connect(eq(node1), optionsCaptor.capture())) .thenReturn(CompletableFuture.completedFuture(channel1)); controlConnection.init(true, false); waitForPendingAdminTasks(); @@ -105,7 +105,7 @@ public void should_process_topology_change_events() { DriverChannel channel1 = newMockDriverChannel(1); ArgumentCaptor optionsCaptor = ArgumentCaptor.forClass(DriverChannelOptions.class); - Mockito.when(channelFactory.connect(eq(ADDRESS1), optionsCaptor.capture())) + Mockito.when(channelFactory.connect(eq(node1), optionsCaptor.capture())) .thenReturn(CompletableFuture.completedFuture(channel1)); controlConnection.init(true, false); waitForPendingAdminTasks(); @@ -127,7 +127,7 @@ public void should_process_schema_change_events() { DriverChannel channel1 = newMockDriverChannel(1); ArgumentCaptor optionsCaptor = ArgumentCaptor.forClass(DriverChannelOptions.class); - Mockito.when(channelFactory.connect(eq(ADDRESS1), optionsCaptor.capture())) + Mockito.when(channelFactory.connect(eq(node1), optionsCaptor.capture())) .thenReturn(CompletableFuture.completedFuture(channel1)); controlConnection.init(false, false); waitForPendingAdminTasks(); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java index 70dfd52a5fb..35aa42a3ee8 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTest.java @@ -50,11 +50,11 @@ public void should_init_with_first_contact_point_if_reachable() { // Given DriverChannel channel1 = newMockDriverChannel(1); MockChannelFactoryHelper factoryHelper = - MockChannelFactoryHelper.builder(channelFactory).success(ADDRESS1, channel1).build(); + MockChannelFactoryHelper.builder(channelFactory).success(node1, channel1).build(); // When CompletionStage initFuture = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); waitForPendingAdminTasks(); // Then @@ -70,11 +70,11 @@ public void should_always_return_same_init_future() { // Given DriverChannel channel1 = newMockDriverChannel(1); MockChannelFactoryHelper factoryHelper = - MockChannelFactoryHelper.builder(channelFactory).success(ADDRESS1, channel1).build(); + MockChannelFactoryHelper.builder(channelFactory).success(node1, channel1).build(); // When CompletionStage initFuture1 = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); CompletionStage initFuture2 = controlConnection.init(false, false); // Then @@ -89,14 +89,14 @@ public void should_init_with_second_contact_point_if_first_one_fails() { DriverChannel channel2 = newMockDriverChannel(2); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .failure(ADDRESS1, "mock failure") - .success(ADDRESS2, channel2) + .failure(node1, "mock failure") + .success(node2, channel2) .build(); // When CompletionStage initFuture = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); - factoryHelper.waitForCall(ADDRESS2); + factoryHelper.waitForCall(node1); + factoryHelper.waitForCall(node2); waitForPendingAdminTasks(); // Then @@ -115,14 +115,14 @@ public void should_fail_to_init_if_all_contact_points_fail() { // Given MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .failure(ADDRESS1, "mock failure") - .failure(ADDRESS2, "mock failure") + .failure(node1, "mock failure") + .failure(node2, "mock failure") .build(); // When CompletionStage initFuture = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); - factoryHelper.waitForCall(ADDRESS2); + factoryHelper.waitForCall(node1); + factoryHelper.waitForCall(node2); waitForPendingAdminTasks(); // Then @@ -143,13 +143,13 @@ public void should_reconnect_if_channel_goes_down() throws Exception { DriverChannel channel2 = newMockDriverChannel(2); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .success(ADDRESS1, channel1) - .failure(ADDRESS1, "mock failure") - .success(ADDRESS2, channel2) + .success(node1, channel1) + .failure(node1, "mock failure") + .success(node2, channel2) .build(); CompletionStage initFuture = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); waitForPendingAdminTasks(); assertThat(initFuture).isSuccess(); @@ -163,8 +163,8 @@ public void should_reconnect_if_channel_goes_down() throws Exception { // Then // a reconnection was started Mockito.verify(reconnectionSchedule).nextDelay(); - factoryHelper.waitForCall(ADDRESS1); - factoryHelper.waitForCall(ADDRESS2); + factoryHelper.waitForCall(node1); + factoryHelper.waitForCall(node2); waitForPendingAdminTasks(); assertThat(controlConnection.channel()).isEqualTo(channel2); Mockito.verify(eventBus).fire(ChannelEvent.channelClosed(node1)); @@ -183,12 +183,12 @@ public void should_reconnect_if_node_becomes_ignored() { DriverChannel channel2 = newMockDriverChannel(2); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .success(ADDRESS1, channel1) - .success(ADDRESS2, channel2) + .success(node1, channel1) + .success(node2, channel2) .build(); CompletionStage initFuture = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); waitForPendingAdminTasks(); assertThat(initFuture).isSuccess(); @@ -203,7 +203,7 @@ public void should_reconnect_if_node_becomes_ignored() { // Then // an immediate reconnection was started Mockito.verify(reconnectionSchedule, never()).nextDelay(); - factoryHelper.waitForCall(ADDRESS2); + factoryHelper.waitForCall(node2); waitForPendingAdminTasks(); assertThat(controlConnection.channel()).isEqualTo(channel2); Mockito.verify(eventBus).fire(ChannelEvent.channelClosed(node1)); @@ -231,12 +231,12 @@ private void should_reconnect_if_event(NodeStateEvent event) { DriverChannel channel2 = newMockDriverChannel(2); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .success(ADDRESS1, channel1) - .success(ADDRESS2, channel2) + .success(node1, channel1) + .success(node2, channel2) .build(); CompletionStage initFuture = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); waitForPendingAdminTasks(); assertThat(initFuture).isSuccess(); @@ -251,7 +251,7 @@ private void should_reconnect_if_event(NodeStateEvent event) { // Then // an immediate reconnection was started Mockito.verify(reconnectionSchedule, never()).nextDelay(); - factoryHelper.waitForCall(ADDRESS2); + factoryHelper.waitForCall(node2); waitForPendingAdminTasks(); assertThat(controlConnection.channel()).isEqualTo(channel2); Mockito.verify(eventBus).fire(ChannelEvent.channelClosed(node1)); @@ -273,14 +273,14 @@ public void should_reconnect_if_node_became_ignored_during_reconnection_attempt( MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) // init - .success(ADDRESS1, channel1) + .success(node1, channel1) // reconnection - .pending(ADDRESS2, channel2Future) - .success(ADDRESS1, channel3) + .pending(node2, channel2Future) + .success(node1, channel3) .build(); CompletionStage initFuture = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); waitForPendingAdminTasks(); assertThat(initFuture).isSuccess(); @@ -294,7 +294,7 @@ public void should_reconnect_if_node_became_ignored_during_reconnection_attempt( Mockito.verify(eventBus).fire(ChannelEvent.channelClosed(node1)); Mockito.verify(reconnectionSchedule).nextDelay(); // the reconnection to node2 is in progress - factoryHelper.waitForCall(ADDRESS2); + factoryHelper.waitForCall(node2); // When // node2 becomes ignored @@ -306,7 +306,7 @@ public void should_reconnect_if_node_became_ignored_during_reconnection_attempt( // Then // The channel should get closed and we should try the next node Mockito.verify(channel2).forceClose(); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); } @Test @@ -330,14 +330,14 @@ private void should_reconnect_if_event_during_reconnection_attempt(NodeStateEven MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) // init - .success(ADDRESS1, channel1) + .success(node1, channel1) // reconnection - .pending(ADDRESS2, channel2Future) - .success(ADDRESS1, channel3) + .pending(node2, channel2Future) + .success(node1, channel3) .build(); CompletionStage initFuture = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); waitForPendingAdminTasks(); assertThat(initFuture).isSuccess(); @@ -351,7 +351,7 @@ private void should_reconnect_if_event_during_reconnection_attempt(NodeStateEven Mockito.verify(eventBus).fire(ChannelEvent.channelClosed(node1)); Mockito.verify(reconnectionSchedule).nextDelay(); // the reconnection to node2 is in progress - factoryHelper.waitForCall(ADDRESS2); + factoryHelper.waitForCall(node2); // When // node2 goes into the new state @@ -363,7 +363,7 @@ private void should_reconnect_if_event_during_reconnection_attempt(NodeStateEven // Then // The channel should get closed and we should try the next node Mockito.verify(channel2).forceClose(); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); } @Test @@ -375,13 +375,13 @@ public void should_force_reconnection_if_pending() { DriverChannel channel2 = newMockDriverChannel(2); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .success(ADDRESS1, channel1) - .failure(ADDRESS1, "mock failure") - .success(ADDRESS2, channel2) + .success(node1, channel1) + .failure(node1, "mock failure") + .success(node2, channel2) .build(); CompletionStage initFuture = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); waitForPendingAdminTasks(); assertThat(initFuture).isSuccess(); assertThat(controlConnection.channel()).isEqualTo(channel1); @@ -395,8 +395,8 @@ public void should_force_reconnection_if_pending() { // When controlConnection.reconnectNow(); - factoryHelper.waitForCall(ADDRESS1); - factoryHelper.waitForCall(ADDRESS2); + factoryHelper.waitForCall(node1); + factoryHelper.waitForCall(node2); waitForPendingAdminTasks(); // Then @@ -413,13 +413,13 @@ public void should_force_reconnection_even_if_connected() { DriverChannel channel2 = newMockDriverChannel(2); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .success(ADDRESS1, channel1) - .failure(ADDRESS1, "mock failure") - .success(ADDRESS2, channel2) + .success(node1, channel1) + .failure(node1, "mock failure") + .success(node2, channel2) .build(); CompletionStage initFuture = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); waitForPendingAdminTasks(); assertThat(initFuture).isSuccess(); assertThat(controlConnection.channel()).isEqualTo(channel1); @@ -429,8 +429,8 @@ public void should_force_reconnection_even_if_connected() { controlConnection.reconnectNow(); // Then - factoryHelper.waitForCall(ADDRESS1); - factoryHelper.waitForCall(ADDRESS2); + factoryHelper.waitForCall(node1); + factoryHelper.waitForCall(node2); waitForPendingAdminTasks(); assertThat(controlConnection.channel()).isEqualTo(channel2); Mockito.verify(channel1).forceClose(); @@ -455,9 +455,9 @@ public void should_not_force_reconnection_if_closed() { // Given DriverChannel channel1 = newMockDriverChannel(1); MockChannelFactoryHelper factoryHelper = - MockChannelFactoryHelper.builder(channelFactory).success(ADDRESS1, channel1).build(); + MockChannelFactoryHelper.builder(channelFactory).success(node1, channel1).build(); CompletionStage initFuture = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); waitForPendingAdminTasks(); assertThat(initFuture).isSuccess(); CompletionStage closeFuture = controlConnection.forceCloseAsync(); @@ -478,10 +478,10 @@ public void should_close_channel_when_closing() { // Given DriverChannel channel1 = newMockDriverChannel(1); MockChannelFactoryHelper factoryHelper = - MockChannelFactoryHelper.builder(channelFactory).success(ADDRESS1, channel1).build(); + MockChannelFactoryHelper.builder(channelFactory).success(node1, channel1).build(); CompletionStage initFuture = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); waitForPendingAdminTasks(); assertThat(initFuture).isSuccess(); @@ -506,13 +506,13 @@ public void should_close_channel_if_closed_during_reconnection() { CompletableFuture channel2Future = new CompletableFuture<>(); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .success(ADDRESS1, channel1) - .failure(ADDRESS1, "mock failure") - .pending(ADDRESS2, channel2Future) + .success(node1, channel1) + .failure(node1, "mock failure") + .pending(node2, channel2Future) .build(); CompletionStage initFuture = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); waitForPendingAdminTasks(); assertThat(initFuture).isSuccess(); assertThat(controlConnection.channel()).isEqualTo(channel1); @@ -523,9 +523,9 @@ public void should_close_channel_if_closed_during_reconnection() { waitForPendingAdminTasks(); Mockito.verify(eventBus).fire(ChannelEvent.channelClosed(node1)); Mockito.verify(reconnectionSchedule).nextDelay(); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); // channel2 starts initializing (but the future is not completed yet) - factoryHelper.waitForCall(ADDRESS2); + factoryHelper.waitForCall(node2); // When // the control connection gets closed before channel2 initialization is complete @@ -553,13 +553,13 @@ public void should_handle_channel_failure_if_closed_during_reconnection() { CompletableFuture channel1Future = new CompletableFuture<>(); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .success(ADDRESS1, channel1) - .pending(ADDRESS1, channel1Future) - .success(ADDRESS2, channel2) + .success(node1, channel1) + .pending(node1, channel1Future) + .success(node2, channel2) .build(); CompletionStage initFuture = controlConnection.init(false, false); - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); waitForPendingAdminTasks(); assertThat(initFuture).isSuccess(); assertThat(controlConnection.channel()).isEqualTo(channel1); @@ -571,7 +571,7 @@ public void should_handle_channel_failure_if_closed_during_reconnection() { Mockito.verify(eventBus).fire(ChannelEvent.channelClosed(node1)); Mockito.verify(reconnectionSchedule).nextDelay(); // channel1 starts initializing (but the future is not completed yet) - factoryHelper.waitForCall(ADDRESS1); + factoryHelper.waitForCall(node1); // When // the control connection gets closed before channel1 initialization fails diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java index 0a201218790..c21443f9d28 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/control/ControlConnectionTestBase.java @@ -38,7 +38,6 @@ import io.netty.channel.EventLoop; import io.netty.util.concurrent.Future; import java.net.InetSocketAddress; -import java.net.SocketAddress; import java.time.Duration; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentLinkedQueue; @@ -87,7 +86,7 @@ public void setup() { Mockito.when(context.channelFactory()).thenReturn(channelFactory); channelFactoryFuture = new Exchanger<>(); - Mockito.when(channelFactory.connect(any(SocketAddress.class), any(DriverChannelOptions.class))) + Mockito.when(channelFactory.connect(any(Node.class), any(DriverChannelOptions.class))) .thenAnswer( invocation -> { CompletableFuture channelFuture = new CompletableFuture<>(); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolInitTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolInitTest.java index 9d3f7a2f7a8..0dada4d4ce9 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolInitTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolInitTest.java @@ -47,15 +47,15 @@ public void should_initialize_when_all_channels_succeed() throws Exception { DriverChannel channel3 = newMockDriverChannel(3); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .success(ADDRESS, channel1) - .success(ADDRESS, channel2) - .success(ADDRESS, channel3) + .success(node, channel1) + .success(node, channel2) + .success(node, channel3) .build(); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 3); + factoryHelper.waitForCalls(node, 3); waitForPendingAdminTasks(); assertThat(poolFuture) @@ -72,15 +72,15 @@ public void should_initialize_when_all_channels_fail() throws Exception { MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .failure(ADDRESS, "mock channel init failure") - .failure(ADDRESS, "mock channel init failure") - .failure(ADDRESS, "mock channel init failure") + .failure(node, "mock channel init failure") + .failure(node, "mock channel init failure") + .failure(node, "mock channel init failure") .build(); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 3); + factoryHelper.waitForCalls(node, 3); waitForPendingAdminTasks(); assertThat(poolFuture).isSuccess(pool -> assertThat(pool.channels).isEmpty()); @@ -98,15 +98,15 @@ public void should_indicate_when_keyspace_failed_on_all_channels() { MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .failure(ADDRESS, new InvalidKeyspaceException("invalid keyspace")) - .failure(ADDRESS, new InvalidKeyspaceException("invalid keyspace")) - .failure(ADDRESS, new InvalidKeyspaceException("invalid keyspace")) + .failure(node, new InvalidKeyspaceException("invalid keyspace")) + .failure(node, new InvalidKeyspaceException("invalid keyspace")) + .failure(node, new InvalidKeyspaceException("invalid keyspace")) .build(); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 3); + factoryHelper.waitForCalls(node, 3); waitForPendingAdminTasks(); assertThat(poolFuture) .isSuccess( @@ -126,14 +126,14 @@ public void should_fire_force_down_event_when_cluster_name_does_not_match() thro new ClusterNameMismatchException(ADDRESS, "actual", "expected"); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .failure(ADDRESS, error) - .failure(ADDRESS, error) - .failure(ADDRESS, error) + .failure(node, error) + .failure(node, error) + .failure(node, error) .build(); ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 3); + factoryHelper.waitForCalls(node, 3); waitForPendingAdminTasks(); Mockito.verify(eventBus).fire(TopologyEvent.forceDown(ADDRESS)); @@ -158,17 +158,17 @@ public void should_reconnect_when_init_incomplete() throws Exception { MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) // Init: 1 channel fails, the other succeeds - .failure(ADDRESS, "mock channel init failure") - .success(ADDRESS, channel1) + .failure(node, "mock channel init failure") + .success(node, channel1) // 1st reconnection - .pending(ADDRESS, channel2Future) + .pending(node, channel2Future) .build(); InOrder inOrder = Mockito.inOrder(eventBus); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); waitForPendingAdminTasks(); assertThat(poolFuture).isSuccess(); @@ -181,7 +181,7 @@ public void should_reconnect_when_init_incomplete() throws Exception { inOrder.verify(eventBus).fire(ChannelEvent.reconnectionStarted(node)); channel2Future.complete(channel2); - factoryHelper.waitForCalls(ADDRESS, 1); + factoryHelper.waitForCalls(node, 1); waitForPendingAdminTasks(); inOrder.verify(eventBus).fire(ChannelEvent.channelOpened(node)); inOrder.verify(eventBus).fire(ChannelEvent.reconnectionStopped(node)); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolKeyspaceTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolKeyspaceTest.java index b65ed11f752..3f087f3b5c1 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolKeyspaceTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolKeyspaceTest.java @@ -40,14 +40,14 @@ public void should_switch_keyspace_on_existing_channels() throws Exception { DriverChannel channel2 = newMockDriverChannel(2); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .success(ADDRESS, channel1) - .success(ADDRESS, channel2) + .success(node, channel1) + .success(node, channel2) .build(); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); waitForPendingAdminTasks(); assertThat(poolFuture).isSuccess(); @@ -80,17 +80,17 @@ public void should_switch_keyspace_on_pending_channels() throws Exception { MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) // init - .failure(ADDRESS, "mock channel init failure") - .failure(ADDRESS, "mock channel init failure") + .failure(node, "mock channel init failure") + .failure(node, "mock channel init failure") // reconnection - .pending(ADDRESS, channel1Future) - .pending(ADDRESS, channel2Future) + .pending(node, channel1Future) + .pending(node, channel2Future) .build(); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); waitForPendingAdminTasks(); assertThat(poolFuture).isSuccess(); @@ -99,7 +99,7 @@ public void should_switch_keyspace_on_pending_channels() throws Exception { // Check that reconnection has kicked in, but do not complete it yet Mockito.verify(reconnectionSchedule).nextDelay(); Mockito.verify(eventBus).fire(ChannelEvent.reconnectionStarted(node)); - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); // Switch keyspace, it succeeds immediately since there is no active channel CqlIdentifier newKeyspace = CqlIdentifier.fromCql("new_keyspace"); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolReconnectTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolReconnectTest.java index dd92c90474f..8194157c839 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolReconnectTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolReconnectTest.java @@ -50,17 +50,17 @@ public void should_reconnect_when_channel_closes() throws Exception { MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) // init - .success(ADDRESS, channel1) - .success(ADDRESS, channel2) + .success(node, channel1) + .success(node, channel2) // reconnection - .pending(ADDRESS, channel3Future) + .pending(node, channel3Future) .build(); InOrder inOrder = Mockito.inOrder(eventBus); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); waitForPendingAdminTasks(); assertThat(poolFuture).isSuccess(); @@ -76,7 +76,7 @@ public void should_reconnect_when_channel_closes() throws Exception { Mockito.verify(reconnectionSchedule).nextDelay(); inOrder.verify(eventBus).fire(ChannelEvent.reconnectionStarted(node)); - factoryHelper.waitForCall(ADDRESS); + factoryHelper.waitForCall(node); channel3Future.complete(channel3); waitForPendingAdminTasks(); @@ -102,17 +102,17 @@ public void should_reconnect_when_channel_starts_graceful_shutdown() throws Exce MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) // init - .success(ADDRESS, channel1) - .success(ADDRESS, channel2) + .success(node, channel1) + .success(node, channel2) // reconnection - .pending(ADDRESS, channel3Future) + .pending(node, channel3Future) .build(); InOrder inOrder = Mockito.inOrder(eventBus); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); waitForPendingAdminTasks(); assertThat(poolFuture).isSuccess(); @@ -127,7 +127,7 @@ public void should_reconnect_when_channel_starts_graceful_shutdown() throws Exce Mockito.verify(reconnectionSchedule).nextDelay(); inOrder.verify(eventBus).fire(ChannelEvent.reconnectionStarted(node)); - factoryHelper.waitForCall(ADDRESS); + factoryHelper.waitForCall(node); channel3Future.complete(channel3); waitForPendingAdminTasks(); @@ -153,9 +153,9 @@ public void should_let_current_attempt_complete_when_reconnecting_now() MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) // init - .success(ADDRESS, channel1) + .success(node, channel1) // reconnection - .pending(ADDRESS, channel2Future) + .pending(node, channel2Future) .build(); InOrder inOrder = Mockito.inOrder(eventBus); @@ -163,7 +163,7 @@ public void should_let_current_attempt_complete_when_reconnecting_now() // Initial connection CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 1); + factoryHelper.waitForCalls(node, 1); waitForPendingAdminTasks(); assertThat(poolFuture).isSuccess(); ChannelPool pool = poolFuture.toCompletableFuture().get(); @@ -176,7 +176,7 @@ public void should_let_current_attempt_complete_when_reconnecting_now() inOrder.verify(eventBus).fire(ChannelEvent.channelClosed(node)); inOrder.verify(eventBus).fire(ChannelEvent.reconnectionStarted(node)); Mockito.verify(reconnectionSchedule).nextDelay(); - factoryHelper.waitForCalls(ADDRESS, 1); + factoryHelper.waitForCalls(node, 1); // Force a reconnection, should not try to create a new channel since we have a pending one pool.reconnectNow(); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolResizeTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolResizeTest.java index 04aa316f56f..a0144ebe4c0 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolResizeTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolResizeTest.java @@ -47,17 +47,17 @@ public void should_shrink_outside_of_reconnection() throws Exception { DriverChannel channel4 = newMockDriverChannel(4); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .success(ADDRESS, channel1) - .success(ADDRESS, channel2) - .success(ADDRESS, channel3) - .success(ADDRESS, channel4) + .success(node, channel1) + .success(node, channel2) + .success(node, channel3) + .success(node, channel4) .build(); InOrder inOrder = Mockito.inOrder(eventBus); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.REMOTE, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 4); + factoryHelper.waitForCalls(node, 4); waitForPendingAdminTasks(); assertThat(poolFuture).isSuccess(); @@ -93,20 +93,20 @@ public void should_shrink_during_reconnection() throws Exception { MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) // init - .success(ADDRESS, channel1) - .success(ADDRESS, channel2) - .failure(ADDRESS, "mock channel init failure") - .failure(ADDRESS, "mock channel init failure") + .success(node, channel1) + .success(node, channel2) + .failure(node, "mock channel init failure") + .failure(node, "mock channel init failure") // reconnection - .pending(ADDRESS, channel3Future) - .pending(ADDRESS, channel4Future) + .pending(node, channel3Future) + .pending(node, channel4Future) .build(); InOrder inOrder = Mockito.inOrder(eventBus); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.REMOTE, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 4); + factoryHelper.waitForCalls(node, 4); waitForPendingAdminTasks(); inOrder.verify(eventBus, times(2)).fire(ChannelEvent.channelOpened(node)); @@ -126,7 +126,7 @@ public void should_shrink_during_reconnection() throws Exception { channel3Future.complete(channel3); channel4Future.complete(channel4); - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); waitForPendingAdminTasks(); // Pool should have shrinked back to 2. We keep the most recent channels so 1 and 2 get closed. @@ -154,18 +154,18 @@ public void should_grow_outside_of_reconnection() throws Exception { MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) // init - .success(ADDRESS, channel1) - .success(ADDRESS, channel2) + .success(node, channel1) + .success(node, channel2) // growth attempt - .success(ADDRESS, channel3) - .success(ADDRESS, channel4) + .success(node, channel3) + .success(node, channel4) .build(); InOrder inOrder = Mockito.inOrder(eventBus); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); waitForPendingAdminTasks(); inOrder.verify(eventBus, times(2)).fire(ChannelEvent.channelOpened(node)); @@ -180,7 +180,7 @@ public void should_grow_outside_of_reconnection() throws Exception { Mockito.verify(reconnectionSchedule).nextDelay(); inOrder.verify(eventBus).fire(ChannelEvent.reconnectionStarted(node)); - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); waitForPendingAdminTasks(); inOrder.verify(eventBus, times(2)).fire(ChannelEvent.channelOpened(node)); inOrder.verify(eventBus).fire(ChannelEvent.reconnectionStopped(node)); @@ -209,20 +209,20 @@ public void should_grow_during_reconnection() throws Exception { MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) // init - .success(ADDRESS, channel1) - .failure(ADDRESS, "mock channel init failure") + .success(node, channel1) + .failure(node, "mock channel init failure") // first reconnection attempt - .pending(ADDRESS, channel2Future) + .pending(node, channel2Future) // extra reconnection attempt after we realize the pool must grow - .pending(ADDRESS, channel3Future) - .pending(ADDRESS, channel4Future) + .pending(node, channel3Future) + .pending(node, channel4Future) .build(); InOrder inOrder = Mockito.inOrder(eventBus); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); waitForPendingAdminTasks(); inOrder.verify(eventBus).fire(ChannelEvent.channelOpened(node)); @@ -240,7 +240,7 @@ public void should_grow_during_reconnection() throws Exception { // Complete the channel for the first reconnection, bringing the count to 2 channel2Future.complete(channel2); - factoryHelper.waitForCall(ADDRESS); + factoryHelper.waitForCall(node); waitForPendingAdminTasks(); inOrder.verify(eventBus).fire(ChannelEvent.channelOpened(node)); @@ -253,7 +253,7 @@ public void should_grow_during_reconnection() throws Exception { inOrder.verify(eventBus, never()).fire(ChannelEvent.reconnectionStarted(node)); // Two more channels get opened, bringing us to the target count - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); channel3Future.complete(channel3); channel4Future.complete(channel4); waitForPendingAdminTasks(); @@ -279,18 +279,18 @@ public void should_resize_outside_of_reconnection_if_config_changes() throws Exc MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) // init - .success(ADDRESS, channel1) - .success(ADDRESS, channel2) + .success(node, channel1) + .success(node, channel2) // growth attempt - .success(ADDRESS, channel3) - .success(ADDRESS, channel4) + .success(node, channel3) + .success(node, channel4) .build(); InOrder inOrder = Mockito.inOrder(eventBus); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); waitForPendingAdminTasks(); inOrder.verify(eventBus, times(2)).fire(ChannelEvent.channelOpened(node)); @@ -308,7 +308,7 @@ public void should_resize_outside_of_reconnection_if_config_changes() throws Exc Mockito.verify(reconnectionSchedule).nextDelay(); inOrder.verify(eventBus).fire(ChannelEvent.reconnectionStarted(node)); - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); waitForPendingAdminTasks(); inOrder.verify(eventBus, times(2)).fire(ChannelEvent.channelOpened(node)); inOrder.verify(eventBus).fire(ChannelEvent.reconnectionStopped(node)); @@ -335,20 +335,20 @@ public void should_resize_during_reconnection_if_config_changes() throws Excepti MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) // init - .success(ADDRESS, channel1) - .failure(ADDRESS, "mock channel init failure") + .success(node, channel1) + .failure(node, "mock channel init failure") // first reconnection attempt - .pending(ADDRESS, channel2Future) + .pending(node, channel2Future) // extra reconnection attempt after we realize the pool must grow - .pending(ADDRESS, channel3Future) - .pending(ADDRESS, channel4Future) + .pending(node, channel3Future) + .pending(node, channel4Future) .build(); InOrder inOrder = Mockito.inOrder(eventBus); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); waitForPendingAdminTasks(); inOrder.verify(eventBus).fire(ChannelEvent.channelOpened(node)); @@ -368,7 +368,7 @@ public void should_resize_during_reconnection_if_config_changes() throws Excepti // Complete the channel for the first reconnection, bringing the count to 2 channel2Future.complete(channel2); - factoryHelper.waitForCall(ADDRESS); + factoryHelper.waitForCall(node); waitForPendingAdminTasks(); inOrder.verify(eventBus).fire(ChannelEvent.channelOpened(node)); @@ -381,7 +381,7 @@ public void should_resize_during_reconnection_if_config_changes() throws Excepti inOrder.verify(eventBus, never()).fire(ChannelEvent.reconnectionStarted(node)); // Two more channels get opened, bringing us to the target count - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); channel3Future.complete(channel3); channel4Future.complete(channel4); waitForPendingAdminTasks(); @@ -404,15 +404,15 @@ public void should_ignore_config_change_if_not_relevant() throws Exception { DriverChannel channel2 = newMockDriverChannel(2); MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) - .success(ADDRESS, channel1) - .success(ADDRESS, channel2) + .success(node, channel1) + .success(node, channel2) .build(); InOrder inOrder = Mockito.inOrder(eventBus); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 2); + factoryHelper.waitForCalls(node, 2); waitForPendingAdminTasks(); inOrder.verify(eventBus, times(2)).fire(ChannelEvent.channelOpened(node)); diff --git a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolShutdownTest.java b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolShutdownTest.java index 2d0448655f8..01b22fe6763 100644 --- a/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolShutdownTest.java +++ b/core/src/test/java/com/datastax/oss/driver/internal/core/pool/ChannelPoolShutdownTest.java @@ -49,18 +49,18 @@ public void should_close_all_channels_when_closed() throws Exception { MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) // init - .success(ADDRESS, channel1) - .success(ADDRESS, channel2) - .success(ADDRESS, channel3) + .success(node, channel1) + .success(node, channel2) + .success(node, channel3) // reconnection - .pending(ADDRESS, channel4Future) + .pending(node, channel4Future) .build(); InOrder inOrder = Mockito.inOrder(eventBus); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 3); + factoryHelper.waitForCalls(node, 3); waitForPendingAdminTasks(); inOrder.verify(eventBus, times(3)).fire(ChannelEvent.channelOpened(node)); @@ -74,7 +74,7 @@ public void should_close_all_channels_when_closed() throws Exception { // Reconnection should have kicked in and started to open channel4, do not complete it yet Mockito.verify(reconnectionSchedule).nextDelay(); - factoryHelper.waitForCalls(ADDRESS, 1); + factoryHelper.waitForCalls(node, 1); CompletionStage closeFuture = pool.closeAsync(); waitForPendingAdminTasks(); @@ -121,18 +121,18 @@ public void should_force_close_all_channels_when_force_closed() throws Exception MockChannelFactoryHelper factoryHelper = MockChannelFactoryHelper.builder(channelFactory) // init - .success(ADDRESS, channel1) - .success(ADDRESS, channel2) - .success(ADDRESS, channel3) + .success(node, channel1) + .success(node, channel2) + .success(node, channel3) // reconnection - .pending(ADDRESS, channel4Future) + .pending(node, channel4Future) .build(); InOrder inOrder = Mockito.inOrder(eventBus); CompletionStage poolFuture = ChannelPool.init(node, null, NodeDistance.LOCAL, context, "test"); - factoryHelper.waitForCalls(ADDRESS, 3); + factoryHelper.waitForCalls(node, 3); waitForPendingAdminTasks(); assertThat(poolFuture).isSuccess(); @@ -146,7 +146,7 @@ public void should_force_close_all_channels_when_force_closed() throws Exception // Reconnection should have kicked in and started to open a channel, do not complete it yet Mockito.verify(reconnectionSchedule).nextDelay(); - factoryHelper.waitForCalls(ADDRESS, 1); + factoryHelper.waitForCalls(node, 1); CompletionStage closeFuture = pool.forceCloseAsync(); waitForPendingAdminTasks(); diff --git a/integration-tests/src/test/java/com/datastax/oss/driver/api/core/metrics/MetricsIT.java b/integration-tests/src/test/java/com/datastax/oss/driver/api/core/metrics/MetricsIT.java index 4eb4debf07c..c6ff94dc2ef 100644 --- a/integration-tests/src/test/java/com/datastax/oss/driver/api/core/metrics/MetricsIT.java +++ b/integration-tests/src/test/java/com/datastax/oss/driver/api/core/metrics/MetricsIT.java @@ -17,8 +17,10 @@ import static org.assertj.core.api.Assertions.assertThat; +import com.codahale.metrics.Meter; import com.codahale.metrics.Timer; import com.datastax.oss.driver.api.core.CqlSession; +import com.datastax.oss.driver.api.core.metadata.Node; import com.datastax.oss.driver.api.testinfra.ccm.CcmRule; import com.datastax.oss.driver.api.testinfra.session.SessionUtils; import com.datastax.oss.driver.categories.ParallelizableTests; @@ -52,6 +54,42 @@ public void should_expose_metrics() { } } + @Test + public void should_expose_bytes_sent_and_received() { + try (CqlSession session = + SessionUtils.newSession( + ccmRule, + "metrics.session.enabled = [ bytes-sent, bytes-received ]", + "metrics.node.enabled = [ bytes-sent, bytes-received ]")) { + for (int i = 0; i < 10; i++) { + session.execute("SELECT release_version FROM system.local"); + } + + assertThat(session.getMetrics()) + .hasValueSatisfying( + metrics -> { + Meter bytesSent = metrics.getSessionMetric(DefaultSessionMetric.BYTES_SENT); + assertThat(bytesSent).isNotNull(); + // Can't be precise here as payload can be dependent on protocol version. + assertThat(bytesSent.getCount()).isGreaterThan(0); + + Meter bytesReceived = metrics.getSessionMetric(DefaultSessionMetric.BYTES_RECEIVED); + assertThat(bytesReceived).isNotNull(); + assertThat(bytesReceived.getCount()).isGreaterThan(0); + + // get only node in cluster and evaluate its metrics. + Node node = session.getMetadata().getNodes().values().iterator().next(); + bytesSent = metrics.getNodeMetric(node, DefaultNodeMetric.BYTES_SENT); + assertThat(bytesSent).isNotNull(); + assertThat(bytesSent.getCount()).isGreaterThan(0); + + bytesReceived = metrics.getNodeMetric(node, DefaultNodeMetric.BYTES_RECEIVED); + assertThat(bytesReceived).isNotNull(); + assertThat(bytesReceived.getCount()).isGreaterThan(0); + }); + } + } + @Test public void should_not_expose_metrics_if_disabled() { try (CqlSession session =