Skip to content
Permalink
Browse files
Merge branch 'cassandra-4.1' into trunk
  • Loading branch information
David Capwell committed May 10, 2022
2 parents 53a67ff + 6396562 commit 95848870403413e216135976a780596b8085b8d3
Showing 7 changed files with 187 additions and 15 deletions.
@@ -2,6 +2,7 @@
* Add guardrail for ALTER TABLE ADD / DROP / REMOVE column operations (CASSANDRA-17495)
* Rename DisableFlag class to EnableFlag on guardrails (CASSANDRA-17544)
Merged from 4.1:
* StorageService read threshold get methods throw NullPointerException due to not handling null configs (CASSANDRA-17593)
Merged from 4.0:
Merged from 3.11:
Merged from 3.0:
@@ -38,6 +38,8 @@
import java.util.function.Function;
import java.util.function.Supplier;

import javax.annotation.Nullable;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Preconditions;
import com.google.common.collect.ImmutableSet;
@@ -4037,67 +4039,73 @@ public static void setReadThresholdsEnabled(boolean value)
}
}

@Nullable
public static DataStorageSpec getCoordinatorReadSizeWarnThreshold()
{
return conf.coordinator_read_size_warn_threshold;
}

public static void setCoordinatorReadSizeWarnThreshold(DataStorageSpec value)
public static void setCoordinatorReadSizeWarnThreshold(@Nullable DataStorageSpec value)
{
logger.info("updating coordinator_read_size_warn_threshold to {}", value);
conf.coordinator_read_size_warn_threshold = value;
}

@Nullable
public static DataStorageSpec getCoordinatorReadSizeFailThreshold()
{
return conf.coordinator_read_size_fail_threshold;
}

public static void setCoordinatorReadSizeFailThreshold(DataStorageSpec value)
public static void setCoordinatorReadSizeFailThreshold(@Nullable DataStorageSpec value)
{
logger.info("updating coordinator_read_size_fail_threshold to {}", value);
conf.coordinator_read_size_fail_threshold = value;
}

@Nullable
public static DataStorageSpec getLocalReadSizeWarnThreshold()
{
return conf.local_read_size_warn_threshold;
}

public static void setLocalReadSizeWarnThreshold(DataStorageSpec value)
public static void setLocalReadSizeWarnThreshold(@Nullable DataStorageSpec value)
{
logger.info("updating local_read_size_warn_threshold to {}", value);
conf.local_read_size_warn_threshold = value;
}

@Nullable
public static DataStorageSpec getLocalReadSizeFailThreshold()
{
return conf.local_read_size_fail_threshold;
}

public static void setLocalReadSizeFailThreshold(DataStorageSpec value)
public static void setLocalReadSizeFailThreshold(@Nullable DataStorageSpec value)
{
logger.info("updating local_read_size_fail_threshold to {}", value);
conf.local_read_size_fail_threshold = value;
}

@Nullable
public static DataStorageSpec getRowIndexReadSizeWarnThreshold()
{
return conf.row_index_read_size_warn_threshold;
}

public static void setRowIndexReadSizeWarnThreshold(DataStorageSpec value)
public static void setRowIndexReadSizeWarnThreshold(@Nullable DataStorageSpec value)
{
logger.info("updating row_index_size_warn_threshold to {}", value);
conf.row_index_read_size_warn_threshold = value;
}

@Nullable
public static DataStorageSpec getRowIndexReadSizeFailThreshold()
{
return conf.row_index_read_size_fail_threshold;
}

public static void setRowIndexReadSizeFailThreshold(DataStorageSpec value)
public static void setRowIndexReadSizeFailThreshold(@Nullable DataStorageSpec value)
{
logger.info("updating row_index_read_size_fail_threshold to {}", value);
conf.row_index_read_size_fail_threshold = value;
@@ -601,7 +601,7 @@ public Collection<Token> getTokens(InetAddressAndPort endpoint)
lock.readLock().lock();
try
{
assert isMember(endpoint); // don't want to return nulls
assert isMember(endpoint): String.format("Unable to get tokens for %s; it is not a member", endpoint); // don't want to return nulls
return new ArrayList<>(tokenToEndpointMap.inverse().get(endpoint));
}
finally
@@ -6440,7 +6440,7 @@ public void setReadThresholdsEnabled(boolean value)
@Override
public String getCoordinatorLargeReadWarnThreshold()
{
return DatabaseDescriptor.getCoordinatorReadSizeWarnThreshold().toString();
return toString(DatabaseDescriptor.getCoordinatorReadSizeWarnThreshold());
}

@Override
@@ -6452,7 +6452,7 @@ public void setCoordinatorLargeReadWarnThreshold(String threshold)
@Override
public String getCoordinatorLargeReadAbortThreshold()
{
return DatabaseDescriptor.getCoordinatorReadSizeFailThreshold().toString();
return toString(DatabaseDescriptor.getCoordinatorReadSizeFailThreshold());
}

@Override
@@ -6464,7 +6464,7 @@ public void setCoordinatorLargeReadAbortThreshold(String threshold)
@Override
public String getLocalReadTooLargeWarnThreshold()
{
return DatabaseDescriptor.getLocalReadSizeWarnThreshold().toString();
return toString(DatabaseDescriptor.getLocalReadSizeWarnThreshold());
}

@Override
@@ -6476,7 +6476,7 @@ public void setLocalReadTooLargeWarnThreshold(String threshold)
@Override
public String getLocalReadTooLargeAbortThreshold()
{
return DatabaseDescriptor.getLocalReadSizeFailThreshold().toString();
return toString(DatabaseDescriptor.getLocalReadSizeFailThreshold());
}

@Override
@@ -6488,7 +6488,7 @@ public void setLocalReadTooLargeAbortThreshold(String threshold)
@Override
public String getRowIndexReadSizeWarnThreshold()
{
return DatabaseDescriptor.getRowIndexReadSizeWarnThreshold().toString();
return toString(DatabaseDescriptor.getRowIndexReadSizeWarnThreshold());
}

@Override
@@ -6500,7 +6500,7 @@ public void setRowIndexReadSizeWarnThreshold(String threshold)
@Override
public String getRowIndexReadSizeAbortThreshold()
{
return DatabaseDescriptor.getRowIndexReadSizeFailThreshold().toString();
return toString(DatabaseDescriptor.getRowIndexReadSizeFailThreshold());
}

@Override
@@ -6509,6 +6509,11 @@ public void setRowIndexReadSizeAbortThreshold(String threshold)
DatabaseDescriptor.setRowIndexReadSizeFailThreshold(parseDataStorageSpec(threshold));
}

private static String toString(DataStorageSpec value)
{
return value == null ? null : value.toString();
}

public void setDefaultKeyspaceReplicationFactor(int value)
{
DatabaseDescriptor.setDefaultKeyspaceRF(value);
@@ -33,6 +33,7 @@
import java.util.Objects;
import java.util.UUID;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -750,11 +751,25 @@ public Future<Void> shutdown(boolean graceful)

error = parallelRun(error, executor, StorageService.instance::disableAutoCompaction);

// trigger init early or else it could try to init and touch a thread pool that got shutdown
HintsService hints = HintsService.instance;
ThrowingRunnable shutdownHints = () -> {
// this is to allow shutdown in the case hints were halted already
try
{
HintsService.instance.shutdownBlocking();
}
catch (IllegalStateException e)
{
if (!"HintsService has already been shut down".equals(e.getMessage()))
throw e;
}
};
error = parallelRun(error, executor,
() -> Gossiper.instance.stopShutdownAndWait(1L, MINUTES),
CompactionManager.instance::forceShutdown,
() -> BatchlogManager.instance.shutdownAndWait(1L, MINUTES),
HintsService.instance::shutdownBlocking,
shutdownHints,
() -> CompactionLogger.shutdownNowAndWait(1L, MINUTES),
() -> AuthCache.shutdownAllAndWait(1L, MINUTES),
() -> Sampler.shutdownNowAndWait(1L, MINUTES),
@@ -0,0 +1,143 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.cassandra.distributed.test.jmx;

import java.net.InetAddress;
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.TreeSet;
import javax.management.JMRuntimeException;
import javax.management.MBeanAttributeInfo;
import javax.management.MBeanInfo;
import javax.management.MBeanOperationInfo;
import javax.management.MBeanServerConnection;
import javax.management.ObjectName;
import javax.management.remote.JMXConnector;
import javax.management.remote.JMXConnectorFactory;
import javax.management.remote.JMXConnectorServer;
import javax.management.remote.JMXServiceURL;

import com.google.common.collect.ImmutableSet;
import org.junit.Test;

import org.apache.cassandra.distributed.Cluster;
import org.apache.cassandra.distributed.api.Feature;
import org.apache.cassandra.distributed.test.TestBaseImpl;
import org.apache.cassandra.utils.JMXServerUtils;

import static org.apache.cassandra.config.CassandraRelevantProperties.IS_DISABLED_MBEAN_REGISTRATION;
import static org.apache.cassandra.cql3.CQLTester.getAutomaticallyAllocatedPort;

public class JMXGetterCheckTest extends TestBaseImpl
{
private static final Set<String> IGNORE_ATTRIBUTES = ImmutableSet.of(
"org.apache.cassandra.net:type=MessagingService:BackPressurePerHost" // throws unsupported saying the feature was removed... dropped in CASSANDRA-15375
);
private static final Set<String> IGNORE_OPERATIONS = ImmutableSet.of(
"org.apache.cassandra.db:type=StorageService:stopDaemon", // halts the instance, which then causes the JVM to exit
"org.apache.cassandra.db:type=StorageService:drain", // don't drain, it stops things which can cause other APIs to be unstable as we are in a stopped state
"org.apache.cassandra.db:type=StorageService:stopGossiping" // if we stop gossip this can causes other issues, so avoid
);

@Test
public void test() throws Exception
{
// start JMX server, which the instance will register with
InetAddress loopback = InetAddress.getLoopbackAddress();
String jmxHost = loopback.getHostAddress();
int jmxPort = getAutomaticallyAllocatedPort(loopback);
JMXConnectorServer jmxServer = JMXServerUtils.createJMXServer(jmxPort, true);
jmxServer.start();
String url = "service:jmx:rmi:///jndi/rmi://" + jmxHost + ":" + jmxPort + "/jmxrmi";

IS_DISABLED_MBEAN_REGISTRATION.setBoolean(false);
try (Cluster cluster = Cluster.build(1).withConfig(c -> c.with(Feature.values())).start())
{
List<Named> errors = new ArrayList<>();
try (JMXConnector jmxc = JMXConnectorFactory.connect(new JMXServiceURL(url), null))
{
MBeanServerConnection mbsc = jmxc.getMBeanServerConnection();
Set<ObjectName> metricNames = new TreeSet<>(mbsc.queryNames(null, null));
for (ObjectName name : metricNames)
{
if (!name.getDomain().startsWith("org.apache.cassandra"))
continue;
MBeanInfo info = mbsc.getMBeanInfo(name);
for (MBeanAttributeInfo a : info.getAttributes())
{
String fqn = String.format("%s:%s", name, a.getName());
if (!a.isReadable() || IGNORE_ATTRIBUTES.contains(fqn))
continue;
try
{
mbsc.getAttribute(name, a.getName());
}
catch (JMRuntimeException e)
{
errors.add(new Named(String.format("Attribute %s", fqn), e.getCause()));
}
}

for (MBeanOperationInfo o : info.getOperations())
{
String fqn = String.format("%s:%s", name, o.getName());
if (o.getSignature().length != 0 || IGNORE_OPERATIONS.contains(fqn))
continue;
try
{
mbsc.invoke(name, o.getName(), new Object[0], new String[0]);
}
catch (JMRuntimeException e)
{
errors.add(new Named(String.format("Operation %s", fqn), e.getCause()));
}
}
}
}
if (!errors.isEmpty())
{
AssertionError root = new AssertionError();
errors.forEach(root::addSuppressed);
throw root;
}
}
}

/**
* This class is meant to make new errors easier to read, by adding the JMX endpoint, and cleaning up the unneded JMX/Reflection logic cluttering the stacktrace
*/
private static class Named extends RuntimeException
{
public Named(String msg, Throwable cause)
{
super(msg + "\nCaused by: " + cause.getClass().getCanonicalName() + ": " + cause.getMessage(), cause.getCause());
StackTraceElement[] stack = cause.getStackTrace();
List<StackTraceElement> copy = new ArrayList<>();
for (StackTraceElement s : stack)
{
if (!s.getClassName().startsWith("org.apache.cassandra"))
break;
copy.add(s);
}
Collections.reverse(copy);
setStackTrace(copy.toArray(new StackTraceElement[0]));
}
}
}
@@ -226,7 +226,7 @@ public void useSuperUser()
*
* @return a port number
*/
private static int getAutomaticallyAllocatedPort(InetAddress address)
public static int getAutomaticallyAllocatedPort(InetAddress address)
{
try
{

0 comments on commit 9584887

Please sign in to comment.