From 105214dafbb4e1b1ba80bf27a792d9b17fa49e13 Mon Sep 17 00:00:00 2001 From: Chris Lohfink Date: Wed, 2 May 2018 00:52:16 -0700 Subject: [PATCH 1/2] Allow noarg toppartitions for CASSANDRA-14360 --- .../cassandra/service/StorageService.java | 36 +++++++ .../service/StorageServiceMBean.java | 6 ++ .../org/apache/cassandra/tools/NodeProbe.java | 21 ++-- .../tools/nodetool/TopPartitions.java | 99 ++++++++++++------- .../service/StorageServiceServerTest.java | 1 + .../cassandra/tools/TopPartitionsTest.java | 66 +++++++++++++ 6 files changed, 186 insertions(+), 43 deletions(-) create mode 100644 test/unit/org/apache/cassandra/tools/TopPartitionsTest.java diff --git a/src/java/org/apache/cassandra/service/StorageService.java b/src/java/org/apache/cassandra/service/StorageService.java index ce1f9ad71a14..f8d6831433ff 100644 --- a/src/java/org/apache/cassandra/service/StorageService.java +++ b/src/java/org/apache/cassandra/service/StorageService.java @@ -33,9 +33,13 @@ import javax.annotation.Nullable; import javax.management.*; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.CompositeDataSupport; +import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; import javax.management.openmbean.TabularDataSupport; +import com.clearspring.analytics.stream.Counter; import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Predicate; import com.google.common.collect.*; @@ -73,6 +77,7 @@ import org.apache.cassandra.io.util.FileUtils; import org.apache.cassandra.locator.*; import org.apache.cassandra.metrics.StorageMetrics; +import org.apache.cassandra.metrics.TableMetrics.Sampler; import org.apache.cassandra.net.*; import org.apache.cassandra.repair.*; import org.apache.cassandra.repair.messages.RepairOption; @@ -95,6 +100,7 @@ import org.apache.cassandra.tracing.TraceKeyspace; import org.apache.cassandra.transport.ProtocolVersion; import org.apache.cassandra.utils.*; +import org.apache.cassandra.utils.TopKSampler.SamplerResult; import org.apache.cassandra.utils.logging.LoggingSupportFactory; import org.apache.cassandra.utils.progress.ProgressEvent; import org.apache.cassandra.utils.progress.ProgressEventType; @@ -5217,6 +5223,36 @@ public List sampleKeyRange() // do not rename to getter - see CASSANDRA- return sampledKeys; } + /* + * little hard to parse for JMX MBean requirements, but the output looks something like: + * + * {"keyspace.table": + * {"SAMPLER": [{cardinality:i partitions: [{raw:"", string:"", count:i, error:i}, ...]}, ...]} + * } + */ + @Override + public Map> samplePartitions(long duration, int capacity, int count, + List samplers) throws OpenDataException + { + for (String sampler : samplers) + { + for (ColumnFamilyStore table : ColumnFamilyStore.all()) + { + table.beginLocalSampling(sampler, capacity); + } + } + Uninterruptibles.sleepUninterruptibly(duration, TimeUnit.MILLISECONDS); + ConcurrentHashMap> result = new ConcurrentHashMap<>(); + for (String sampler : samplers) + for (ColumnFamilyStore table : ColumnFamilyStore.all()) + { + String name = table.keyspace.getName() + "." + table.name; + Map topk = result.computeIfAbsent(name, x -> new HashMap<>()); + topk.put(sampler, table.finishLocalSampling(sampler, count)); + } + return result; + } + public void rebuildSecondaryIndex(String ksName, String cfName, String... idxNames) { String[] indices = asList(idxNames).stream() diff --git a/src/java/org/apache/cassandra/service/StorageServiceMBean.java b/src/java/org/apache/cassandra/service/StorageServiceMBean.java index 0676419f3bb5..9e917dd7f9f0 100644 --- a/src/java/org/apache/cassandra/service/StorageServiceMBean.java +++ b/src/java/org/apache/cassandra/service/StorageServiceMBean.java @@ -27,8 +27,12 @@ import java.util.concurrent.TimeoutException; import javax.annotation.Nullable; import javax.management.NotificationEmitter; +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.OpenDataException; import javax.management.openmbean.TabularData; +import org.apache.cassandra.metrics.TableMetrics.Sampler; + public interface StorageServiceMBean extends NotificationEmitter { /** @@ -614,6 +618,8 @@ public interface StorageServiceMBean extends NotificationEmitter */ public void setTraceProbability(double probability); + public Map> samplePartitions(long duration, int capacity, int count, List samplers) throws OpenDataException; + /** * Returns the configured tracing probability. */ diff --git a/src/java/org/apache/cassandra/tools/NodeProbe.java b/src/java/org/apache/cassandra/tools/NodeProbe.java index 1bace2e43733..f75185da70d4 100644 --- a/src/java/org/apache/cassandra/tools/NodeProbe.java +++ b/src/java/org/apache/cassandra/tools/NodeProbe.java @@ -89,6 +89,7 @@ import com.google.common.base.Function; import com.google.common.base.Strings; +import com.google.common.collect.ImmutableMap; import com.google.common.collect.Iterables; import com.google.common.collect.Maps; import com.google.common.collect.Multimap; @@ -413,21 +414,27 @@ public void repairAsync(final PrintStream out, final String keyspace, Map> getPartitionSample(int capacity, int duration, int count, List samplers) throws OpenDataException + { + return ssProxy.samplePartitions(duration, capacity, count, samplers); + } - public Map getPartitionSample(String ks, String cf, int capacity, int duration, int count, List samplers) throws OpenDataException + public Map> getPartitionSample(String ks, String cf, int capacity, int duration, int count, List samplers) throws OpenDataException { ColumnFamilyStoreMBean cfsProxy = getCfsProxy(ks, cf); - for(Sampler sampler : samplers) + for(String sampler : samplers) { - cfsProxy.beginLocalSampling(sampler.name(), capacity); + cfsProxy.beginLocalSampling(sampler, capacity); } Uninterruptibles.sleepUninterruptibly(duration, TimeUnit.MILLISECONDS); - Map result = Maps.newHashMap(); - for(Sampler sampler : samplers) + Map result = Maps.newHashMap(); + for(String sampler : samplers) { - result.put(sampler, cfsProxy.finishLocalSampling(sampler.name(), count)); + result.put(sampler, cfsProxy.finishLocalSampling(sampler, count)); } - return result; + return new ImmutableMap.Builder>() + .put(ks + "." + cf, result) + .build(); } public void invalidateCounterCache() diff --git a/src/java/org/apache/cassandra/tools/nodetool/TopPartitions.java b/src/java/org/apache/cassandra/tools/nodetool/TopPartitions.java index 73bf2fbce135..c977f1c04c29 100644 --- a/src/java/org/apache/cassandra/tools/nodetool/TopPartitions.java +++ b/src/java/org/apache/cassandra/tools/nodetool/TopPartitions.java @@ -19,9 +19,6 @@ import static com.google.common.base.Preconditions.checkArgument; import static org.apache.commons.lang3.StringUtils.join; -import io.airlift.airline.Arguments; -import io.airlift.airline.Command; -import io.airlift.airline.Option; import java.util.ArrayList; import java.util.Collections; @@ -37,14 +34,20 @@ import org.apache.cassandra.metrics.TableMetrics.Sampler; import org.apache.cassandra.tools.NodeProbe; import org.apache.cassandra.tools.NodeTool.NodeToolCmd; +import org.apache.cassandra.tools.nodetool.formatter.TableBuilder; +import org.apache.cassandra.utils.Pair; import com.google.common.collect.Lists; import com.google.common.collect.Ordering; -@Command(name = "toppartitions", description = "Sample and print the most active partitions for a given column family") +import io.airlift.airline.Arguments; +import io.airlift.airline.Command; +import io.airlift.airline.Option; + +@Command(name = "toppartitions", description = "Sample and print the most active partitions") public class TopPartitions extends NodeToolCmd { - @Arguments(usage = " ", description = "The keyspace, column family name, and duration in milliseconds") + @Arguments(usage = "[keyspace table] [duration]", description = "The keyspace, table name, and duration in milliseconds") private List args = new ArrayList<>(); @Option(name = "-s", description = "Capacity of stream summary, closer to the actual cardinality of partitions will yield more accurate results (Default: 256)") private int size = 256; @@ -55,63 +58,87 @@ public class TopPartitions extends NodeToolCmd @Override public void execute(NodeProbe probe) { - checkArgument(args.size() == 3, "toppartitions requires keyspace, column family name, and duration"); + checkArgument(args.size() == 3 || args.size() == 1 || args.size() == 0, "Invalid arguments, either [keyspace table duration] or [duration] or no args"); checkArgument(topCount < size, "TopK count (-k) option must be smaller then the summary capacity (-s)"); - String keyspace = args.get(0); - String cfname = args.get(1); - Integer duration = Integer.valueOf(args.get(2)); + String keyspace = null; + String table = null; + Integer duration = 10000; + if(args.size() == 3) + { + keyspace = args.get(0); + table = args.get(1); + duration = Integer.valueOf(args.get(2)); + } + else if (args.size() == 1) + { + duration = Integer.valueOf(args.get(0)); + } // generate the list of samplers - List targets = Lists.newArrayList(); + List targets = Lists.newArrayList(); for (String s : samplers.split(",")) { try { - targets.add(Sampler.valueOf(s.toUpperCase())); + targets.add(Sampler.valueOf(s.toUpperCase()).toString()); } catch (Exception e) { throw new IllegalArgumentException(s + " is not a valid sampler, choose one of: " + join(Sampler.values(), ", ")); } } - Map results; + Map> results; try { - results = probe.getPartitionSample(keyspace, cfname, size, duration, topCount, targets); + if (keyspace == null) + { + results = probe.getPartitionSample(size, duration, topCount, targets); + } + else + { + results = probe.getPartitionSample(keyspace, table, size, duration, topCount, targets); + } } catch (OpenDataException e) { throw new RuntimeException(e); } boolean first = true; - for(Entry result : results.entrySet()) + for(String sampler : targets) { - CompositeData sampling = result.getValue(); - // weird casting for http://bugs.sun.com/view_bug.do?bug_id=6548436 - List topk = (List) (Object) Lists.newArrayList(((TabularDataSupport) sampling.get("partitions")).values()); - Collections.sort(topk, new Ordering() + if(!first) + System.out.println(); + first = false; + System.out.printf(sampler + " Sampler Top %d partitions:%n", topCount); + TableBuilder out = new TableBuilder(); + out.add("\t", "Table", "Partition", "Count", "+/-"); + List> topk = new ArrayList<>(topCount); + for (Entry> tableResult : results.entrySet()) + { + String tableName = tableResult.getKey(); + CompositeData sampling = tableResult.getValue().get(sampler); + // weird casting for http://bugs.sun.com/view_bug.do?bug_id=6548436 + for(CompositeData cd : (List) (Object) Lists.newArrayList(((TabularDataSupport) sampling.get("partitions")).values())) + { + topk.add(Pair.create(tableName, cd)); + } + } + Collections.sort(topk, new Ordering>() { - public int compare(CompositeData left, CompositeData right) + public int compare(Pair left, Pair right) { - return Long.compare((long) right.get("count"), (long) left.get("count")); + return Long.compare((long) right.right.get("count"), (long) left.right.get("count")); } }); - if(!first) - System.out.println(); - System.out.println(result.getKey().toString()+ " Sampler:"); - System.out.printf(" Cardinality: ~%d (%d capacity)%n", sampling.get("cardinality"), size); - System.out.printf(" Top %d partitions:%n", topCount); - if (topk.size() == 0) + for (Pair entry : topk.subList(0, Math.min(topk.size(), 10))) { - System.out.println("\tNothing recorded during sampling period..."); - } else + CompositeData cd = entry.right; + out.add("\t", entry.left, cd.get("string").toString(), cd.get("count").toString(), cd.get("error").toString()); + } + out.printTo(System.out); + if (topk.size() == 0) { - int offset = 0; - for (CompositeData entry : topk) - offset = Math.max(offset, entry.get("string").toString().length()); - System.out.printf("\t%-" + offset + "s%10s%10s%n", "Partition", "Count", "+/-"); - for (CompositeData entry : topk) - System.out.printf("\t%-" + offset + "s%10d%10d%n", entry.get("string").toString(), entry.get("count"), entry.get("error")); + System.out.println("\t Nothing recorded during sampling period..."); } - first = false; } } -} \ No newline at end of file +} + diff --git a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java index 3884f5adb70a..01bfae80f387 100644 --- a/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java +++ b/test/unit/org/apache/cassandra/service/StorageServiceServerTest.java @@ -592,4 +592,5 @@ public void testCreateRepairRangeFrom() throws Exception repairRangeFrom = StorageService.instance.createRepairRangeFrom("2000", "2000"); assert repairRangeFrom.size() == 0; } + } diff --git a/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java b/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java new file mode 100644 index 000000000000..bf9d79baa1e2 --- /dev/null +++ b/test/unit/org/apache/cassandra/tools/TopPartitionsTest.java @@ -0,0 +1,66 @@ +package org.apache.cassandra.tools; + +import static java.lang.String.format; +import static org.apache.cassandra.cql3.QueryProcessor.executeInternal; +import static org.junit.Assert.assertEquals; + +import java.util.List; +import java.util.Map; +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.BlockingQueue; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; + +import javax.management.openmbean.CompositeData; +import javax.management.openmbean.TabularDataSupport; + +import org.apache.cassandra.SchemaLoader; +import org.apache.cassandra.db.ColumnFamilyStore; +import org.apache.cassandra.db.SystemKeyspace; +import org.apache.cassandra.exceptions.ConfigurationException; +import org.apache.cassandra.service.StorageService; +import org.junit.BeforeClass; +import org.junit.Test; + +import com.google.common.collect.Lists; + +public class TopPartitionsTest +{ + @BeforeClass + public static void loadSchema() throws ConfigurationException + { + SchemaLoader.prepareServer(); + } + @Test + public void testServiceTopPartitionsNoArg() throws Exception + { + BlockingQueue>> q = new ArrayBlockingQueue<>(1); + ColumnFamilyStore.all(); + Executors.newCachedThreadPool().execute(() -> + { + try + { + q.put(StorageService.instance.samplePartitions(1000, 100, 10, Lists.newArrayList("READS", "WRITES"))); + } + catch (Exception e) + { + e.printStackTrace(); + } + }); + SystemKeyspace.persistLocalMetadata(); + Map> result = q.poll(11, TimeUnit.SECONDS); + List cd = (List) (Object) Lists.newArrayList(((TabularDataSupport) result.get("system.local").get("WRITES").get("partitions")).values()); + assertEquals(1, cd.size()); + } + + @Test + public void testServiceTopPartitionsSingleTable() throws Exception + { + ColumnFamilyStore.getIfExists("system", "local").beginLocalSampling("READS", 5); + String req = "SELECT * FROM system.%s WHERE key='%s'"; + executeInternal(format(req, SystemKeyspace.LOCAL, SystemKeyspace.LOCAL)); + CompositeData result = ColumnFamilyStore.getIfExists("system", "local").finishLocalSampling("READS", 5); + List cd = (List) (Object) Lists.newArrayList(((TabularDataSupport) result.get("partitions")).values()); + assertEquals(1, cd.size()); + } +} From 24d35fc61e70e084d0a0d715ef65b7c4e299f738 Mon Sep 17 00:00:00 2001 From: Chris Lohfink Date: Thu, 10 May 2018 13:53:58 -0500 Subject: [PATCH 2/2] add dtests --- .circleci/config.yml | 10 ++++------ 1 file changed, 4 insertions(+), 6 deletions(-) diff --git a/.circleci/config.yml b/.circleci/config.yml index f881b709bdfe..855a58a22478 100644 --- a/.circleci/config.yml +++ b/.circleci/config.yml @@ -58,16 +58,14 @@ with_dtest_jobs_only: &with_dtest_jobs_only - build # Set env_settings, env_vars, and workflows/build_and_run_tests based on environment env_settings: &env_settings - <<: *default_env_settings - #<<: *high_capacity_env_settings + <<: *high_capacity_env_settings env_vars: &env_vars - <<: *resource_constrained_env_vars - #<<: *high_capacity_env_vars + <<: *high_capacity_env_vars workflows: version: 2 - build_and_run_tests: *default_jobs + #build_and_run_tests: *default_jobs #build_and_run_tests: *with_dtest_jobs_only - #build_and_run_tests: *with_dtest_jobs + build_and_run_tests: *with_dtest_jobs docker_image: &docker_image kjellman/cassandra-test:0.4.3 version: 2 jobs: