Skip to content

Commit

Permalink
STAR-1151 Port ReadCoordinationMetrics (apache#364)
Browse files Browse the repository at this point in the history
Port ReadCoordinationMetrics to add metrics that count read requests for which the coordinator is either not a replica at all, or is a replica but not one of the replicas preferred (contacted) for the read.
  • Loading branch information
djatnieks committed Mar 14, 2022
1 parent 1284926 commit eb52d46
Show file tree
Hide file tree
Showing 3 changed files with 163 additions and 0 deletions.
70 changes: 70 additions & 0 deletions src/java/org/apache/cassandra/metrics/ReadCoordinationMetrics.java
@@ -0,0 +1,70 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.metrics;

import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.concurrent.TimeUnit;

import com.google.common.annotations.VisibleForTesting;

import com.codahale.metrics.Counter;
import com.codahale.metrics.Histogram;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.locator.InetAddressAndPort;

import static org.apache.cassandra.metrics.CassandraMetricsRegistry.Metrics;

/**
 * Holder for metrics about read coordination: two global counters and one
 * latency histogram per replica address, all registered through the shared
 * {@code Metrics} registry.
 */
public final class ReadCoordinationMetrics
{
    private static final MetricNameFactory factory = new DefaultNameFactory("ReadCoordination");

    /** Counter registered under the name {@code LocalNodeNonreplicaRequests}. */
    public static final Counter nonreplicaRequests = Metrics.counter(factory.createMetricName("LocalNodeNonreplicaRequests"));

    /** Counter registered under the name {@code PreferredOtherReplicas}. */
    public static final Counter preferredOtherReplicas = Metrics.counter(factory.createMetricName("PreferredOtherReplicas"));

    /** Histograms created lazily, one per replica address that has reported a latency. */
    private static final ConcurrentMap<InetAddressAndPort, Histogram> replicaLatencies = new ConcurrentHashMap<>();

    /**
     * Records a latency sample for the given replica.
     * <p>
     * Samples at or above the read RPC timeout (expressed in {@code timeUnit}) are
     * dropped. The value is stored exactly as passed in; no unit conversion is applied.
     *
     * @param address  replica the latency was observed for
     * @param latency  observed latency, in {@code timeUnit}
     * @param timeUnit unit of {@code latency}, also used to read the timeout threshold
     */
    public static void updateReplicaLatency(InetAddressAndPort address, long latency, TimeUnit timeUnit)
    {
        long readTimeout = DatabaseDescriptor.getReadRpcTimeout(timeUnit);

        // timeouts are deliberately not tracked
        if (latency < readTimeout)
        {
            // plain get() first so the common path never goes through computeIfAbsent()
            Histogram perReplica = replicaLatencies.get(address);
            if (perReplica == null)
            {
                perReplica = replicaLatencies.computeIfAbsent(address, ReadCoordinationMetrics::createHistogram);
            }

            perReplica.update(latency);
        }
    }

    /**
     * Builds the histogram for one replica. The metric scope is the replica's
     * host-and-port string with every ':' replaced by '.'.
     *
     * @param h replica address the histogram belongs to
     * @return the histogram obtained from the metrics registry
     */
    private static Histogram createHistogram(InetAddressAndPort h)
    {
        String scope = h.getHostAddressAndPort().replace(':', '.');
        CassandraMetricsRegistry.MetricName name = DefaultNameFactory.createMetricName("ReadCoordination", "ReplicaLatency", scope);
        return Metrics.histogram(name, false);
    }

    /**
     * Test hook exposing the histogram currently held for a replica.
     *
     * @param address replica address to look up
     * @return the histogram for {@code address}, or {@code null} if none has been created yet
     */
    @VisibleForTesting
    static Histogram getReplicaLatencyHistogram(InetAddressAndPort address)
    {
        return replicaLatencies.get(address);
    }
}
Expand Up @@ -39,13 +39,15 @@
import org.apache.cassandra.locator.ReplicaCollection;
import org.apache.cassandra.locator.ReplicaPlan;
import org.apache.cassandra.locator.ReplicaPlans;
import org.apache.cassandra.metrics.ReadCoordinationMetrics;
import org.apache.cassandra.net.Message;
import org.apache.cassandra.net.MessagingService;
import org.apache.cassandra.service.QueryInfoTracker;
import org.apache.cassandra.service.StorageProxy.LocalReadRunnable;
import org.apache.cassandra.service.reads.repair.ReadRepair;
import org.apache.cassandra.tracing.TraceState;
import org.apache.cassandra.tracing.Tracing;
import org.apache.cassandra.utils.FBUtilities;

import static com.google.common.collect.Iterables.all;
import static java.util.concurrent.TimeUnit.NANOSECONDS;
Expand Down Expand Up @@ -74,6 +76,11 @@ public abstract class AbstractReadExecutor
protected volatile PartitionIterator result = null;
protected final QueryInfoTracker.ReadTracker readTracker;

static
{
    // Register once, at class initialization, so that latencies published to the
    // messaging service's subscribers are forwarded to ReadCoordinationMetrics.
    MessagingService messaging = MessagingService.instance();
    messaging.latencySubscribers.subscribe(ReadCoordinationMetrics::updateReplicaLatency);
}

AbstractReadExecutor(ColumnFamilyStore cfs,
ReadCommand command,
ReplicaPlan.ForTokenRead replicaPlan,
Expand Down Expand Up @@ -201,6 +208,15 @@ public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand co

ReplicaPlan.ForTokenRead replicaPlan = ReplicaPlans.forRead(keyspace, command.partitionKey().getToken(), command.indexQueryPlan(), consistencyLevel, retry);

if (replicaPlan.candidates().stream().noneMatch(replica -> replica.endpoint().equals(FBUtilities.getBroadcastAddressAndPort())))
{
ReadCoordinationMetrics.nonreplicaRequests.inc();
}
else if (replicaPlan.contacts().stream().noneMatch(replica -> replica.endpoint().equals(FBUtilities.getBroadcastAddressAndPort())))
{
ReadCoordinationMetrics.preferredOtherReplicas.inc();
}

// Speculative retry is disabled *OR*
// 11980: Disable speculative retry if using EACH_QUORUM in order to prevent miscounting DC responses
if (retry.equals(NeverSpeculativeRetryPolicy.INSTANCE) || consistencyLevel == ConsistencyLevel.EACH_QUORUM)
Expand Down
@@ -0,0 +1,77 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.metrics;

import java.net.UnknownHostException;
import java.util.concurrent.TimeUnit;

import org.junit.BeforeClass;
import org.junit.Test;

import com.codahale.metrics.Histogram;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.locator.InetAddressAndPort;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;

public class ReadCoordinationMetricsTest
{
@BeforeClass()
public static void setup() throws ConfigurationException
{
DatabaseDescriptor.daemonInitialization();
}

@Test
public void testReadCoordinatorMetrics()
{
assertEquals(0, ReadCoordinationMetrics.nonreplicaRequests.getCount());
ReadCoordinationMetrics.nonreplicaRequests.inc();
assertEquals(1, ReadCoordinationMetrics.nonreplicaRequests.getCount());
ReadCoordinationMetrics.nonreplicaRequests.dec();
assertEquals(0, ReadCoordinationMetrics.nonreplicaRequests.getCount());

assertEquals(0, ReadCoordinationMetrics.preferredOtherReplicas.getCount());
ReadCoordinationMetrics.preferredOtherReplicas.inc();
assertEquals(1, ReadCoordinationMetrics.preferredOtherReplicas.getCount());
ReadCoordinationMetrics.preferredOtherReplicas.dec();
assertEquals(0, ReadCoordinationMetrics.preferredOtherReplicas.getCount());
}

@Test
public void testReplicaLatencyHistogram() throws UnknownHostException
{
InetAddressAndPort host = InetAddressAndPort.getByName("127.0.0.1");
assertNull(ReadCoordinationMetrics.getReplicaLatencyHistogram(host));

// Record a replica latency
ReadCoordinationMetrics.updateReplicaLatency(host, 100, TimeUnit.MILLISECONDS);

Histogram histogram = ReadCoordinationMetrics.getReplicaLatencyHistogram(host);
assertNotNull(histogram);
assertEquals(1, histogram.getCount());

// ReadRpcTimeout latency should not be tracked
ReadCoordinationMetrics.updateReplicaLatency(host, DatabaseDescriptor.getReadRpcTimeout(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS);
assertEquals(1, histogram.getCount());
}
}

0 comments on commit eb52d46

Please sign in to comment.