Skip to content

Commit

Permalink
Emit a metric for number of local read and write calls
Browse files Browse the repository at this point in the history
patch by Damien Stevenson; reviewed by Stefan Miklosovic and Brandon Williams for CASSANDRA-10023
  • Loading branch information
smiklosovic committed Dec 13, 2021
1 parent e99a8da commit c64ff69
Show file tree
Hide file tree
Showing 5 changed files with 280 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
@@ -1,4 +1,5 @@
4.1
* Emit a metric for number of local read and write calls
* Add non-blocking mode for CDC writes (CASSANDRA-17001)
* Add guardrails framework (CASSANDRA-17147)
* Harden resource management on SSTable components to prevent future leaks (CASSANDRA-17174)
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,8 @@ public class ClientRequestMetrics extends LatencyMetrics
public final Meter aborts;
public final Meter tombstoneAborts;
public final Meter readSizeAborts;
public final Meter localRequests;
public final Meter remoteRequests;

public ClientRequestMetrics(String scope)
{
Expand All @@ -48,6 +50,8 @@ public ClientRequestMetrics(String scope)
aborts = Metrics.meter(factory.createMetricName("Aborts"));
tombstoneAborts = Metrics.meter(factory.createMetricName("TombstoneAborts"));
readSizeAborts = Metrics.meter(factory.createMetricName("ReadSizeAborts"));
localRequests = Metrics.meter(factory.createMetricName("LocalRequests"));
remoteRequests = Metrics.meter(factory.createMetricName("RemoteRequests"));
}

public void markAbort(Throwable cause)
Expand All @@ -74,5 +78,7 @@ public void release()
Metrics.remove(factory.createMetricName("Aborts"));
Metrics.remove(factory.createMetricName("TombstoneAborts"));
Metrics.remove(factory.createMetricName("ReadSizeAborts"));
Metrics.remove(factory.createMetricName("LocalRequests"));
Metrics.remove(factory.createMetricName("RemoteRequests"));
}
}
42 changes: 42 additions & 0 deletions src/java/org/apache/cassandra/service/StorageProxy.java
Original file line number Diff line number Diff line change
Expand Up @@ -643,10 +643,14 @@ private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaPlan.ForPax
{
PrepareCallback callback = new PrepareCallback(toPrepare.update.partitionKey(), toPrepare.update.metadata(), replicaPlan.requiredParticipants(), replicaPlan.consistencyLevel(), queryStartNanoTime);
Message<Commit> message = Message.out(PAXOS_PREPARE_REQ, toPrepare);

boolean hasLocalRequest = false;

for (Replica replica: replicaPlan.contacts())
{
if (replica.isSelf())
{
hasLocalRequest = true;
PAXOS_PREPARE_REQ.stage.execute(() -> {
try
{
Expand All @@ -663,6 +667,12 @@ private static PrepareCallback preparePaxos(Commit toPrepare, ReplicaPlan.ForPax
MessagingService.instance().sendWithCallback(message, replica.endpoint(), callback);
}
}

if (hasLocalRequest)
writeMetrics.localRequests.mark();
else
writeMetrics.remoteRequests.mark();

callback.await();
return callback;
}
Expand Down Expand Up @@ -795,6 +805,17 @@ protected Verb verb()
});
}

private static boolean hasLocalMutation(IMutation mutation)
{
return canDoLocalRequest(StorageService.instance.getNaturalEndpointsWithPort(mutation.getKeyspaceName(),
mutation.key().getKey()));
}

private static boolean canDoLocalRequest(List<String> endpoints)
{
return endpoints.contains(FBUtilities.getBroadcastAddressAndPort().getHostAddressAndPort());
}

/**
* Use this method to have these Mutations applied
* across all replicas. This method will take care
Expand All @@ -820,6 +841,11 @@ public static void mutate(List<? extends IMutation> mutations, ConsistencyLevel
{
for (IMutation mutation : mutations)
{
if (hasLocalMutation(mutation))
writeMetrics.localRequests.mark();
else
writeMetrics.remoteRequests.mark();

if (mutation instanceof CounterMutation)
responseHandlers.add(mutateCounter((CounterMutation)mutation, localDataCenter, queryStartNanoTime));
else
Expand Down Expand Up @@ -973,6 +999,11 @@ public static void mutateMV(ByteBuffer dataKey, Collection<Mutation> mutations,
// add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
for (Mutation mutation : mutations)
{
if (hasLocalMutation(mutation))
writeMetrics.localRequests.mark();
else
writeMetrics.remoteRequests.mark();

String keyspaceName = mutation.getKeyspaceName();
Token tk = mutation.key().getToken();
AbstractReplicationStrategy replicationStrategy = Keyspace.open(keyspaceName).getReplicationStrategy();
Expand Down Expand Up @@ -1139,6 +1170,11 @@ public static void mutateAtomically(Collection<Mutation> mutations,
// add a handler for each mutation - includes checking availability, but doesn't initiate any writes, yet
for (Mutation mutation : mutations)
{
if (hasLocalMutation(mutation))
writeMetrics.localRequests.mark();
else
writeMetrics.remoteRequests.mark();

WriteResponseHandlerWrapper wrapper = wrapBatchResponseHandler(mutation,
consistency_level,
batchConsistencyLevel,
Expand Down Expand Up @@ -1976,6 +2012,12 @@ private static PartitionIterator fetchRows(List<SinglePartitionReadCommand> comm
for (int i=0; i<cmdCount; i++)
{
reads[i] = AbstractReadExecutor.getReadExecutor(commands.get(i), consistencyLevel, queryStartNanoTime);

if (canDoLocalRequest(reads[i].getContactedReplicas())) {
readMetrics.localRequests.mark();
} else {
readMetrics.remoteRequests.mark();
}
}

// sends a data request to the closest replica, and a digest request to the others. If we have a speculating
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@
*/
package org.apache.cassandra.service.reads;

import java.util.List;
import java.util.stream.Collectors;

import com.google.common.base.Preconditions;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -207,6 +210,13 @@ public static AbstractReadExecutor getReadExecutor(SinglePartitionReadCommand co
return new SpeculatingReadExecutor(cfs, command, replicaPlan, queryStartNanoTime);
}

public List<String> getContactedReplicas() {
return replicaPlan().contacts()
.stream()
.map(r -> r.endpoint().getHostAddress(true))
.collect(Collectors.toList());
}

/**
* Returns true if speculation should occur and if it should then block until it is time to
* send the speculative reads
Expand Down
221 changes: 221 additions & 0 deletions test/unit/org/apache/cassandra/metrics/ClientRequestMetricsTest.java
Original file line number Diff line number Diff line change
@@ -0,0 +1,221 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.metrics;

import java.io.IOException;

import org.junit.BeforeClass;
import org.junit.Test;

import com.datastax.driver.core.BatchStatement;
import com.datastax.driver.core.BoundStatement;
import com.datastax.driver.core.Cluster;
import com.datastax.driver.core.PreparedStatement;
import com.datastax.driver.core.Session;
import org.apache.cassandra.SchemaLoader;
import org.apache.cassandra.config.DatabaseDescriptor;
import org.apache.cassandra.exceptions.ConfigurationException;
import org.apache.cassandra.schema.Schema;
import org.apache.cassandra.service.EmbeddedCassandraService;

import static com.datastax.driver.core.Cluster.*;
import static org.junit.Assert.assertEquals;

public class ClientRequestMetricsTest extends SchemaLoader
{
private static EmbeddedCassandraService cassandra;

private static Cluster cluster;
private static Session session;

private static String KEYSPACE = "junit";
private static final String TABLE = "clientrequestsmetricstest";

private static PreparedStatement writePS;
private static PreparedStatement paxosPS;
private static PreparedStatement readPS;
private static PreparedStatement readRangePS;

private static final ClientRequestMetrics readMetrics = ClientRequestsMetricsHolder.readMetrics;
private static final ClientWriteRequestMetrics writeMetrics = ClientRequestsMetricsHolder.writeMetrics;

@BeforeClass
public static void setup() throws ConfigurationException, IOException
{
Schema.instance.clear();

cassandra = new EmbeddedCassandraService();
cassandra.start();

cluster = builder().addContactPoint("127.0.0.1").withPort(DatabaseDescriptor.getNativeTransportPort()).build();
session = cluster.connect();

session.execute("CREATE KEYSPACE IF NOT EXISTS " + KEYSPACE + " WITH replication = { 'class' : 'SimpleStrategy', 'replication_factor' : 1 };");
session.execute("USE " + KEYSPACE);
session.execute("CREATE TABLE IF NOT EXISTS " + TABLE + " (id int, ord int, val text, PRIMARY KEY (id, ord));");

writePS = session.prepare("INSERT INTO " + KEYSPACE + '.' + TABLE + " (id, ord, val) VALUES (?, ?, ?);");
paxosPS = session.prepare("INSERT INTO " + KEYSPACE + '.' + TABLE + " (id, ord, val) VALUES (?, ?, ?) IF NOT EXISTS;");
readPS = session.prepare("SELECT * FROM " + KEYSPACE + '.' + TABLE + " WHERE id=?;");
readRangePS = session.prepare("SELECT * FROM " + KEYSPACE + '.' + TABLE + " WHERE id=? AND ord>=? AND ord <= ?;");
}

@Test
public void testWriteStatement()
{
ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);

executeWrite(1, 1, "aaaa");

assertEquals(1, writeMetricsContainer.compareLocalRequest());
assertEquals(0, writeMetricsContainer.compareRemoteRequest());

assertEquals(0, readMetricsContainer.compareLocalRequest());
assertEquals(0, readMetricsContainer.compareRemoteRequest());
}

@Test
public void testPaxosStatement()
{
ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);

executePAXOS(2, 2, "aaaa");

assertEquals(1, readMetricsContainer.compareLocalRequest());
assertEquals(0, readMetricsContainer.compareRemoteRequest());

assertEquals(1, writeMetricsContainer.compareLocalRequest());
assertEquals(0, writeMetricsContainer.compareRemoteRequest());
}

@Test
public void testBatchStatement()
{
ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);

executeBatch(10, 10);

assertEquals(0, readMetricsContainer.compareLocalRequest());
assertEquals(0, readMetricsContainer.compareRemoteRequest());

assertEquals(10, writeMetricsContainer.compareLocalRequest());
assertEquals(0, writeMetricsContainer.compareRemoteRequest());
}

@Test
public void testReadStatement()
{
executeWrite(1, 1, "aaaa");

ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);

executeRead(1);

assertEquals(1, readMetricsContainer.compareLocalRequest());
assertEquals(0, readMetricsContainer.compareRemoteRequest());

assertEquals(0, writeMetricsContainer.compareLocalRequest());
assertEquals(0, writeMetricsContainer.compareRemoteRequest());
}

@Test
public void testRangeStatement()
{
executeBatch(1, 100);

ClientRequestMetricsContainer writeMetricsContainer = new ClientRequestMetricsContainer(writeMetrics);
ClientRequestMetricsContainer readMetricsContainer = new ClientRequestMetricsContainer(readMetrics);

executeSlice(1, 0, 99);

assertEquals(1, readMetricsContainer.compareLocalRequest());
assertEquals(0, readMetricsContainer.compareRemoteRequest());

assertEquals(0, writeMetricsContainer.compareLocalRequest());
assertEquals(0, writeMetricsContainer.compareRemoteRequest());
}


private static class ClientRequestMetricsContainer
{
private ClientRequestMetrics metrics;

private long localRequests;
private long remoteRequests;

public ClientRequestMetricsContainer(ClientRequestMetrics clientRequestMetrics)
{
metrics = clientRequestMetrics;
localRequests = metrics.localRequests.getCount();
remoteRequests = metrics.remoteRequests.getCount();
}

public long compareLocalRequest()
{
return metrics.localRequests.getCount() - localRequests;
}

public long compareRemoteRequest()
{
return metrics.remoteRequests.getCount() - remoteRequests;
}
}

private void executeWrite(int id, int ord, String val)
{
BoundStatement bs = writePS.bind(id, ord, val);
session.execute(bs);
}

private void executePAXOS(int id, int ord, String val)
{
BoundStatement bs = paxosPS.bind(id, ord, val);
session.execute(bs);
}

private void executeBatch(int distinctPartitions, int numClusteringKeys)
{
BatchStatement batch = new BatchStatement();

for (int i = 0; i < distinctPartitions; i++)
{
for (int y = 0; y < numClusteringKeys; y++)
{
batch.add(writePS.bind(i, y, "aaaaaaaa"));
}
}
session.execute(batch);
}

private void executeRead(int id)
{
BoundStatement bs = readPS.bind(id);
session.execute(bs);
}

private void executeSlice(int id, int start_range, int end_range)
{
BoundStatement bs = readRangePS.bind(id, start_range, end_range);
session.execute(bs);
}
}

0 comments on commit c64ff69

Please sign in to comment.