From dd2a31217a30e362de7791ea97554264ca2f60c0 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Fri, 5 Oct 2018 16:50:52 +0200 Subject: [PATCH 1/4] Avoid running query to self through messaging service --- .../cassandra/net/MessagingService.java | 2 +- .../reads/repair/AbstractReadRepair.java | 24 +++++++++++++++---- 2 files changed, 20 insertions(+), 6 deletions(-) diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index c6e8496d5b41..50b62f2420a7 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -1078,7 +1078,7 @@ public void sendOneWay(MessageOut message, int id, InetAddressAndPort to) logger.trace("{} sending {} to {}@{}", FBUtilities.getBroadcastAddressAndPort(), message.verb, id, to); if (to.equals(FBUtilities.getBroadcastAddressAndPort())) - logger.trace("Message-to-self {} going over MessagingService", message); + logger.warn("Message-to-self {} going over MessagingService", message); // message sinks are a testing hook for (IMessageSink ms : messageSinks) diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java index b74f8d3b1e1a..2dd27eefa08a 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java @@ -30,8 +30,10 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.ReadCommand; +import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.SinglePartitionReadCommand; import org.apache.cassandra.db.partitions.PartitionIterator; +import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.Replica; @@ -102,12 +104,24 @@ void sendReadCommand(Replica to, ReadCallback readCallback, boolean speculative) else type = to.isFull() ? "full" : "transient"; Tracing.trace("Enqueuing {} data read to {}", type, to); } - MessageOut message = command.createMessage(); - // if enabled, request additional info about repaired data from any full replicas - if (command.isTrackingRepairedStatus() && to.isFull()) - message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE); - MessagingService.instance().sendRRWithFailure(message, to.endpoint(), readCallback); + if (to.isSelf()) + { + try (ReadExecutionController executionController = command.executionController(); + UnfilteredPartitionIterator iterator = command.executeLocally(executionController)) + { + readCallback.response(command.createResponse(iterator)); + } + } + else + { + MessageOut message = command.createMessage(); + // if enabled, request additional info about repaired data from any full replicas + if (command.isTrackingRepairedStatus() && to.isFull()) + message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE); + + MessagingService.instance().sendRRWithFailure(message, to.endpoint(), readCallback); + } } abstract Meter getRepairMeter(); From 7259ba5b7d934e394b8d5ed6cc7327494b496833 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Fri, 5 Oct 2018 17:39:31 +0200 Subject: [PATCH 2/4] Enable circleci --- .circleci/config.yml | 14 +++++++------- 1 file changed, 7 insertions(+), 7 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index 5a84f724fcf8..94b574ccaca1 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -58,16 +58,16 @@ with_dtest_jobs_only: &with_dtest_jobs_only - build # Set env_settings, env_vars, and workflows/build_and_run_tests based on environment env_settings: &env_settings - <<: *default_env_settings - #<<: *high_capacity_env_settings + #<<: *default_env_settings + <<: *high_capacity_env_settings env_vars: &env_vars - <<: *resource_constrained_env_vars - #<<: *high_capacity_env_vars + #<<: *resource_constrained_env_vars + <<: *high_capacity_env_vars workflows: version: 2 - build_and_run_tests: *default_jobs + #build_and_run_tests: *default_jobs #build_and_run_tests: *with_dtest_jobs_only - #build_and_run_tests: *with_dtest_jobs + build_and_run_tests: *with_dtest_jobs docker_image: &docker_image kjellman/cassandra-test:0.4.3 version: 2 jobs: @@ -206,7 +206,7 @@ jobs: name: Clone Cassandra dtest Repository (via git) command: | export LANG=en_US.UTF-8 - git clone --single-branch --branch master --depth 1 git://github.com/apache/cassandra-dtest.git ~/cassandra-dtest + git clone --single-branch --branch avoid-querying-self-through-ms --depth 1 git://github.com/ifesdjeen/cassandra-dtest.git ~/cassandra-dtest - run: name: Configure virtualenv and python Dependencies command: | From c51a95c846feabd9cbe28ad0d91c78b76d9aafa8 Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Tue, 9 Oct 2018 18:29:54 +0100 Subject: [PATCH 3/4] =?UTF-8?q?Address=20Ariel=E2=80=99s=20comments?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../cassandra/net/MessagingService.java | 2 +- .../reads/repair/AbstractReadRepair.java | 32 +++++++++---------- 2 files changed, 17 insertions(+), 17 deletions(-) diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 50b62f2420a7..766562855eb7 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -1078,7 +1078,7 @@ public void sendOneWay(MessageOut message, int id, InetAddressAndPort to) logger.trace("{} sending {} to {}@{}", FBUtilities.getBroadcastAddressAndPort(), message.verb, id, to); if (to.equals(FBUtilities.getBroadcastAddressAndPort())) - logger.warn("Message-to-self {} going over MessagingService", message); + logger.debug("Message-to-self {} going over MessagingService", message); // message sinks are a testing hook for (IMessageSink ms : messageSinks) diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java index 2dd27eefa08a..2b2d7008a61b 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java @@ -25,6 +25,9 @@ import com.codahale.metrics.Meter; import com.google.common.base.Predicates; + +import org.apache.cassandra.concurrent.Stage; +import org.apache.cassandra.concurrent.StageManager; import org.apache.cassandra.config.DatabaseDescriptor; import org.apache.cassandra.db.ColumnFamilyStore; import org.apache.cassandra.db.ConsistencyLevel; @@ -42,6 +45,7 @@ import org.apache.cassandra.net.MessageOut; import org.apache.cassandra.net.MessagingService; import org.apache.cassandra.net.ParameterType; +import org.apache.cassandra.service.StorageProxy; import org.apache.cassandra.service.reads.DataResolver; import org.apache.cassandra.service.reads.DigestResolver; import org.apache.cassandra.service.reads.ReadCallback; @@ -89,6 +93,13 @@ protected P replicaPlan() void sendReadCommand(Replica to, ReadCallback readCallback, boolean speculative) { ReadCommand command = this.command; + + if (to.isSelf()) + { + StageManager.getStage(Stage.READ).maybeExecuteImmediately(new StorageProxy.LocalReadRunnable(command, readCallback)); + return; + } + if (to.isTransient()) { // It's OK to send queries to transient nodes during RR, as we may have contacted them for their data request initially @@ -105,23 +116,12 @@ void sendReadCommand(Replica to, ReadCallback readCallback, boolean speculative) Tracing.trace("Enqueuing {} data read to {}", type, to); } - if (to.isSelf()) - { - try (ReadExecutionController executionController = command.executionController(); - UnfilteredPartitionIterator iterator = command.executeLocally(executionController)) - { - readCallback.response(command.createResponse(iterator)); - } - } - else - { - MessageOut message = command.createMessage(); - // if enabled, request additional info about repaired data from any full replicas - if (command.isTrackingRepairedStatus() && to.isFull()) - message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE); + MessageOut message = command.createMessage(); + // if enabled, request additional info about repaired data from any full replicas + if (command.isTrackingRepairedStatus() && to.isFull()) + message = message.withParameter(ParameterType.TRACK_REPAIRED_DATA, MessagingService.ONE_BYTE); - MessagingService.instance().sendRRWithFailure(message, to.endpoint(), readCallback); - } + MessagingService.instance().sendRRWithFailure(message, to.endpoint(), readCallback); } abstract Meter getRepairMeter(); From 91caf766ad3707341f206cc8e1adb671e253bd8b Mon Sep 17 00:00:00 2001 From: Alex Petrov Date: Wed, 10 Oct 2018 12:12:45 +0100 Subject: [PATCH 4/4] Avoid using logs as a reliable source of information for testing purposes --- src/java/org/apache/cassandra/net/MessagingService.java | 2 +- .../cassandra/service/reads/repair/AbstractReadRepair.java | 3 --- 2 files changed, 1 insertion(+), 4 deletions(-) diff --git a/src/java/org/apache/cassandra/net/MessagingService.java b/src/java/org/apache/cassandra/net/MessagingService.java index 766562855eb7..c6e8496d5b41 100644 --- a/src/java/org/apache/cassandra/net/MessagingService.java +++ b/src/java/org/apache/cassandra/net/MessagingService.java @@ -1078,7 +1078,7 @@ public void sendOneWay(MessageOut message, int id, InetAddressAndPort to) logger.trace("{} sending {} to {}@{}", FBUtilities.getBroadcastAddressAndPort(), message.verb, id, to); if (to.equals(FBUtilities.getBroadcastAddressAndPort())) - logger.debug("Message-to-self {} going over MessagingService", message); + logger.trace("Message-to-self {} going over MessagingService", message); // message sinks are a testing hook for (IMessageSink ms : messageSinks) diff --git a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java index 2b2d7008a61b..761ffb0233a6 100644 --- a/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java +++ b/src/java/org/apache/cassandra/service/reads/repair/AbstractReadRepair.java @@ -33,10 +33,8 @@ import org.apache.cassandra.db.ConsistencyLevel; import org.apache.cassandra.db.Keyspace; import org.apache.cassandra.db.ReadCommand; -import org.apache.cassandra.db.ReadExecutionController; import org.apache.cassandra.db.SinglePartitionReadCommand; import org.apache.cassandra.db.partitions.PartitionIterator; -import org.apache.cassandra.db.partitions.UnfilteredPartitionIterator; import org.apache.cassandra.exceptions.ReadTimeoutException; import org.apache.cassandra.locator.Endpoints; import org.apache.cassandra.locator.Replica; @@ -115,7 +113,6 @@ void sendReadCommand(Replica to, ReadCallback readCallback, boolean speculative) else type = to.isFull() ? "full" : "transient"; Tracing.trace("Enqueuing {} data read to {}", type, to); } - MessageOut message = command.createMessage(); // if enabled, request additional info about repaired data from any full replicas if (command.isTrackingRepairedStatus() && to.isFull())