Skip to content
Permalink
Browse files
Reformat code according to Apache Cassandra styleguide
  • Loading branch information
smiklosovic authored and ifesdjeen committed Apr 2, 2020
1 parent 9da7b7a commit e620eb34b73cb3fd3e3a5eeebcd2f5553922a91c
Showing 22 changed files with 285 additions and 166 deletions.
@@ -18,7 +18,8 @@

package org.apache.cassandra.distributed.api;

public enum ConsistencyLevel {
public enum ConsistencyLevel
{
ANY,
ONE,
TWO,
@@ -26,7 +26,8 @@
import java.nio.file.Paths;
import java.util.stream.Stream;

public interface ICluster<I extends IInstance> extends AutoCloseable {
public interface ICluster<I extends IInstance> extends AutoCloseable
{
void startup();

I bootstrap(IInstanceConfig config);
@@ -51,47 +52,56 @@

IMessageFilters filters();

static void setup() throws Throwable {
static void setup() throws Throwable
{
setupLogging();
setSystemProperties();
nativeLibraryWorkaround();
processReaperWorkaround();
}

static void nativeLibraryWorkaround() {
static void nativeLibraryWorkaround()
{
// Disable the Netty tcnative library otherwise the io.netty.internal.tcnative.CertificateCallbackTask,
// CertificateVerifierTask, SSLPrivateKeyMethodDecryptTask, SSLPrivateKeyMethodSignTask,
// SSLPrivateKeyMethodTask, and SSLTask hold a gcroot against the InstanceClassLoader.
System.setProperty("cassandra.disable_tcactive_openssl", "true");
System.setProperty("io.netty.transport.noNative", "true");
}

static void processReaperWorkaround() throws Throwable {
static void processReaperWorkaround() throws Throwable
{
// Make sure the 'process reaper' thread is initially created under the main classloader,
// otherwise it gets created with the contextClassLoader pointing to an InstanceClassLoader
// which prevents it from being garbage collected.
new ProcessBuilder().command("true").start().waitFor();
}

static void setSystemProperties() {
static void setSystemProperties()
{
System.setProperty("cassandra.ring_delay_ms", Integer.toString(30 * 1000));
System.setProperty("org.apache.cassandra.disable_mbean_registration", "true");
}

static void setupLogging() {
try {
static void setupLogging()
{
try
{
File root = Files.createTempDirectory("in-jvm-dtest").toFile();
root.deleteOnExit();
String testConfPath = "test/conf/logback-dtest.xml";
Path logConfPath = Paths.get(root.getPath(), "/logback-dtest.xml");

if (!logConfPath.toFile().exists()) {
if (!logConfPath.toFile().exists())
{
Files.copy(new File(testConfPath).toPath(),
logConfPath);
}

System.setProperty("logback.configurationFile", "file://" + logConfPath);
} catch (IOException e) {
}
catch (IOException e)
{
throw new RuntimeException(e);
}
}
@@ -29,10 +29,14 @@ default Object[][] execute(String query, ConsistencyLevel consistencyLevel, Obje
{
return executeWithResult(query, consistencyLevel, boundValues).toObjectArrays();
}

QueryResult executeWithResult(String query, ConsistencyLevel consistencyLevel, Object... boundValues);

Iterator<Object[]> executeWithPaging(String query, ConsistencyLevel consistencyLevel, int pageSize, Object... boundValues);

Future<Object[][]> asyncExecuteWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);

Object[][] executeWithTracing(UUID sessionId, String query, ConsistencyLevel consistencyLevel, Object... boundValues);

IInstance instance();
}
@@ -26,47 +26,61 @@
public interface IInstance extends IIsolatedExecutor
{
ICoordinator coordinator();

IListen listen();

void schemaChangeInternal(String query);

public Object[][] executeInternal(String query, Object... args);

IInstanceConfig config();

InetSocketAddress broadcastAddress();

UUID schemaVersion();

void startup();

boolean isShutdown();

Future<Void> shutdown();

Future<Void> shutdown(boolean graceful);

int liveMemberCount();

NodeToolResult nodetoolResult(boolean withNotifications, String... commandAndArgs);

default NodeToolResult nodetoolResult(String... commandAndArgs)
{
return nodetoolResult(true, commandAndArgs);
}
default int nodetool(String... commandAndArgs) {

default int nodetool(String... commandAndArgs)
{
return nodetoolResult(commandAndArgs).getRc();
}

void uncaughtException(Thread t, Throwable e);

/**
* Return the number of times the instance tried to call {@link System#exit(int)}.
*
* <p>
* When the instance is shutdown, this state should be saved, but in case not possible should return {@code -1}
* to indicate "unknown".
*/
long killAttempts();

// these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
void startup(ICluster cluster);

void receiveMessage(IMessage message);

int getMessagingVersion();

void setMessagingVersion(InetSocketAddress addressAndPort, int version);

void flush(String keyspace);

void forceCompact(String keyspace, String table);
}
@@ -31,31 +31,41 @@
public interface IInstanceConfig
{
IInstanceConfig with(Feature featureFlag);

IInstanceConfig with(Feature... flags);

int num();

UUID hostId();

InetSocketAddress broadcastAddress();

NetworkTopology networkTopology();

String localRack();

String localDatacenter();

/**
* write the specified parameters to the Config object; we do not specify Config as the type to support a Config
* from any ClassLoader; the implementation must not directly access any fields of the Object, or cast it, but
* must use the reflection API to modify the state
*/
void propagate(Object writeToConfig, Map<Class<?>, Function<Object, Object>>executor);
void propagate(Object writeToConfig, Map<Class<?>, Function<Object, Object>> executor);

/**
* Validates whether the config properties are within range of accepted values.
*/
void validate();

IInstanceConfig set(String fieldName, Object value);

Object get(String fieldName);

String getString(String fieldName);

int getInt(String fieldName);

boolean has(Feature featureFlag);

public IInstanceConfig forVersion(Versions.Major major);
@@ -77,8 +87,8 @@ public ParameterizedClass(String class_name, Map<String, String> parameters)
@SuppressWarnings("unchecked")
public ParameterizedClass(Map<String, ?> p)
{
this((String)p.get(CLASS_NAME),
p.containsKey(PARAMETERS) ? (Map<String, String>)((List<?>)p.get(PARAMETERS)).get(0) : null);
this((String) p.get(CLASS_NAME),
p.containsKey(PARAMETERS) ? (Map<String, String>) ((List<?>) p.get(PARAMETERS)).get(0) : null);
}

@Override
@@ -98,5 +108,4 @@ public String toString()
return class_name + (parameters == null ? "" : parameters.toString());
}
}

}
@@ -25,43 +25,41 @@
import java.util.function.Consumer;
import java.util.function.Function;

import org.apache.cassandra.distributed.api.IInstance;

/**
* This version is only supported for a Cluster running the same code as the test environment, and permits
* ergonomic cross-node behaviours, without editing the cross-version API.
*
* <p>
* A lambda can be written tto be invoked on any or all of the nodes.
*
* <p>
* The reason this cannot (easily) be made cross-version is that the lambda is tied to the declaring class, which will
* not be the same in the alternate version. Even were it not, there would likely be a runtime linkage error given
* any code divergence.
*/
public interface IInvokableInstance extends IInstance
{
public default <O> CallableNoExcept<Future<O>> asyncCallsOnInstance(SerializableCallable<O> call) { return async(transfer(call)); }
public default <O> CallableNoExcept<O> callsOnInstance(SerializableCallable<O> call) { return sync(transfer(call)); }
public default <O> O callOnInstance(SerializableCallable<O> call) { return callsOnInstance(call).call(); }
default <O> CallableNoExcept<Future<O>> asyncCallsOnInstance(SerializableCallable<O> call) { return async(transfer(call)); }
default <O> CallableNoExcept<O> callsOnInstance(SerializableCallable<O> call) { return sync(transfer(call)); }
default <O> O callOnInstance(SerializableCallable<O> call) { return callsOnInstance(call).call(); }

public default CallableNoExcept<Future<?>> asyncRunsOnInstance(SerializableRunnable run) { return async(transfer(run)); }
public default Runnable runsOnInstance(SerializableRunnable run) { return sync(transfer(run)); }
public default void runOnInstance(SerializableRunnable run) { runsOnInstance(run).run(); }
default CallableNoExcept<Future<?>> asyncRunsOnInstance(SerializableRunnable run) { return async(transfer(run)); }
default Runnable runsOnInstance(SerializableRunnable run) { return sync(transfer(run)); }
default void runOnInstance(SerializableRunnable run) { runsOnInstance(run).run(); }

public default <I> Function<I, Future<?>> asyncAcceptsOnInstance(SerializableConsumer<I> consumer) { return async(transfer(consumer)); }
public default <I> Consumer<I> acceptsOnInstance(SerializableConsumer<I> consumer) { return sync(transfer(consumer)); }
default <I> Function<I, Future<?>> asyncAcceptsOnInstance(SerializableConsumer<I> consumer) { return async(transfer(consumer)); }
default <I> Consumer<I> acceptsOnInstance(SerializableConsumer<I> consumer) { return sync(transfer(consumer)); }

public default <I1, I2> BiFunction<I1, I2, Future<?>> asyncAcceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return async(transfer(consumer)); }
public default <I1, I2> BiConsumer<I1, I2> acceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return sync(transfer(consumer)); }
default <I1, I2> BiFunction<I1, I2, Future<?>> asyncAcceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return async(transfer(consumer)); }
default <I1, I2> BiConsumer<I1, I2> acceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return sync(transfer(consumer)); }

public default <I, O> Function<I, Future<O>> asyncAppliesOnInstance(SerializableFunction<I, O> f) { return async(transfer(f)); }
public default <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return sync(transfer(f)); }
default <I, O> Function<I, Future<O>> asyncAppliesOnInstance(SerializableFunction<I, O> f) { return async(transfer(f)); }
default <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return sync(transfer(f)); }

public default <I1, I2, O> BiFunction<I1, I2, Future<O>> asyncAppliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return async(transfer(f)); }
public default <I1, I2, O> BiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return sync(transfer(f)); }
default <I1, I2, O> BiFunction<I1, I2, Future<O>> asyncAppliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return async(transfer(f)); }
default <I1, I2, O> BiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return sync(transfer(f)); }

public default <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> asyncAppliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return async(transfer(f)); }
public default <I1, I2, I3, O> TriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return sync(transfer(f)); }
default <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> asyncAppliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return async(transfer(f)); }
default <I1, I2, I3, O> TriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return sync(transfer(f)); }

public <E extends Serializable> E transfer(E object);
<E extends Serializable> E transfer(E object);

}
}
@@ -30,28 +30,28 @@
/**
* Represents a clean way to handoff evaluation of some work to an executor associated
* with a node's lifetime.
*
* <p>
* There is no transfer of execution to the parallel class hierarchy.
*
* <p>
* Classes, such as Instance, that are themselves instantiated on the correct ClassLoader, utilise this class
* to ensure the lifetime of any thread evaluating one of its method invocations matches the lifetime of the class itself.
* Since they are instantiated on the correct ClassLoader, sharing only the interface, there is no serialization necessary.
*/
public interface IIsolatedExecutor
{
public interface CallableNoExcept<O> extends Callable<O> { public O call(); }
public interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable { }
public interface SerializableRunnable extends Runnable, Serializable {}
public interface SerializableConsumer<O> extends Consumer<O>, Serializable {}
public interface SerializableSupplier<O> extends Supplier<O>, Serializable {}
public interface SerializableBiConsumer<I1, I2> extends BiConsumer<I1, I2>, Serializable {}
public interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
public interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {}
public interface TriFunction<I1, I2, I3, O>
{
O apply(I1 i1, I2 i2, I3 i3);
}
public interface SerializableTriFunction<I1, I2, I3, O> extends Serializable, TriFunction<I1, I2, I3, O> { }
interface CallableNoExcept<O> extends Callable<O> { O call(); }

interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable {}
interface SerializableRunnable extends Runnable, Serializable {}
interface SerializableConsumer<O> extends Consumer<O>, Serializable {}
interface SerializableSupplier<O> extends Supplier<O>, Serializable {}
interface SerializableBiConsumer<I1, I2> extends BiConsumer<I1, I2>, Serializable {}
interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {}

interface TriFunction<I1, I2, I3, O> { O apply(I1 i1, I2 i2, I3 i3); }

interface SerializableTriFunction<I1, I2, I3, O> extends Serializable, TriFunction<I1, I2, I3, O> {}

Future<Void> shutdown();

@@ -124,5 +124,4 @@ public interface SerializableRunnable extends Runnable, Serializable {}
* Convert the execution to one performed synchronously on the IsolatedExecutor
*/
<I1, I2, I3, O> TriFunction<I1, I2, I3, O> sync(TriFunction<I1, I2, I3, O> f);

}
@@ -20,7 +20,10 @@

public interface IListen
{
interface Cancel { void cancel(); }
interface Cancel
{
void cancel();
}

Cancel schema(Runnable onChange);

@@ -23,15 +23,19 @@

/**
* A cross-version interface for delivering internode messages via message sinks.
*
* <p>
* Message implementations should be serializable so we could load into instances.
*/
public interface IMessage extends Serializable
{
int verb();

byte[] bytes();

// TODO: need to make this a long
int id();

int version();

InetSocketAddress from();
}

0 comments on commit e620eb3

Please sign in to comment.