Skip to content
Permalink
Browse files
CASSANDRA-17013: CEP-10 Simulator Improvements
  • Loading branch information
belliottsmith committed Oct 5, 2021
1 parent 159afb9 commit 3d295744890ccec7ee6fea827fa700280045ca67
Showing 11 changed files with 436 additions and 24 deletions.
@@ -0,0 +1,30 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.distributed.api;

public interface IClassTransformer {

/**
* Modify the bytecode of the provided class. Provides the original bytecode and the fully qualified name of the class.
* Note, bytecode may be null indicating the class definition could not be found. In this case a synthetic definition
* may be returned, or null.
*/
byte[] transform(String name, byte[] bytecode);

}
@@ -56,6 +56,15 @@

IMessageFilters filters();

default void setMessageSink(IMessageSink messageSink) { throw new UnsupportedOperationException(); }

default void deliverMessage(InetSocketAddress to, IMessage msg)
{
IInstance toInstance = get(to);
if (toInstance != null)
toInstance.receiveMessage(msg);
}

/**
* dynamically sets the current uncaught exceptions filter
*
@@ -21,6 +21,7 @@
import java.net.InetSocketAddress;
import java.util.List;
import java.util.UUID;
import java.util.concurrent.Executor;
import java.util.concurrent.Future;

import org.apache.cassandra.distributed.shared.Metrics;
@@ -83,9 +84,12 @@ default int nodetool(String... commandAndArgs)

// these methods are not for external use, but for simplicity we leave them public and on the normal IInstance interface
void startup(ICluster cluster);
default void postStartup() {}

void receiveMessage(IMessage message);

void receiveMessageWithInvokingThread(IMessage message);

int getMessagingVersion();

void setMessagingVersion(InetSocketAddress addressAndPort, int version);
@@ -96,6 +100,8 @@ default int nodetool(String... commandAndArgs)

void forceCompact(String keyspace, String table);

default Executor executorFor(int verb) { throw new UnsupportedOperationException(); }

default boolean getLogsEnabled()
{
try
@@ -61,6 +61,8 @@

IInstanceConfig set(String fieldName, Object value);

default IInstanceConfig forceSet(String fieldName, Object value) { throw new UnsupportedOperationException(); }

Object get(String fieldName);

String getString(String fieldName);
@@ -69,7 +71,9 @@

boolean has(Feature featureFlag);

public IInstanceConfig forVersion(Semver series);
IInstanceConfig forVersion(Semver series);

Map<String, Object> getParams();

public static class ParameterizedClass
{
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package org.apache.cassandra.distributed.api;

public interface IInstanceInitializer
{
default void initialise(ClassLoader classLoader, int num) { throw new UnsupportedOperationException(); }
void initialise(ClassLoader classLoader, ThreadGroup threadGroup, int num, int generation);
default void beforeStartup(IInstance instance) {}
default void afterStartup(IInstance instance) {}
}
@@ -47,19 +47,115 @@ public interface IInvokableInstance extends IInstance

default <I> Function<I, Future<?>> asyncAcceptsOnInstance(SerializableConsumer<I> consumer) { return async(transfer(consumer)); }
default <I> Consumer<I> acceptsOnInstance(SerializableConsumer<I> consumer) { return sync(transfer(consumer)); }
default <I1> void acceptOnInstance(SerializableConsumer<I1> consumer, I1 i1) { acceptsOnInstance(consumer).accept(i1); }

default <I1, I2> BiFunction<I1, I2, Future<?>> asyncAcceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return async(transfer(consumer)); }
default <I1, I2> BiConsumer<I1, I2> acceptsOnInstance(SerializableBiConsumer<I1, I2> consumer) { return sync(transfer(consumer)); }
default <I1, I2> void acceptOnInstance(SerializableBiConsumer<I1, I2> consumer, I1 i1, I2 i2) { acceptsOnInstance(consumer).accept(i1, i2); }

default <I1, I2, I3> TriFunction<I1, I2, I3, Future<?>> asyncAcceptsOnInstance(SerializableTriConsumer<I1, I2, I3> consumer) { return async(transfer(consumer)); }
default <I1, I2, I3> TriConsumer<I1, I2, I3> acceptsOnInstance(SerializableTriConsumer<I1, I2, I3> consumer) { return sync(transfer(consumer)); }
default <I1, I2, I3> void acceptOnInstance(SerializableTriConsumer<I1, I2, I3> consumer, I1 i1, I2 i2, I3 i3) { acceptsOnInstance(consumer).accept(i1, i2, i3); }

default <I, O> Function<I, Future<O>> asyncAppliesOnInstance(SerializableFunction<I, O> f) { return async(transfer(f)); }
default <I, O> Function<I, O> appliesOnInstance(SerializableFunction<I, O> f) { return sync(transfer(f)); }
default <I1, O> O applyOnInstance(SerializableFunction<I1, O> f, I1 i1) { return sync(transfer(f)).apply(i1); }

default <I1, I2, O> BiFunction<I1, I2, Future<O>> asyncAppliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return async(transfer(f)); }
default <I1, I2, O> BiFunction<I1, I2, O> appliesOnInstance(SerializableBiFunction<I1, I2, O> f) { return sync(transfer(f)); }
default <I1, I2, O> O applyOnInstance(SerializableBiFunction<I1, I2, O> f, I1 i1, I2 i2) { return sync(transfer(f)).apply(i1, i2); }

default <I1, I2, I3, O> TriFunction<I1, I2, I3, Future<O>> asyncAppliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return async(transfer(f)); }
default <I1, I2, I3, O> TriFunction<I1, I2, I3, O> appliesOnInstance(SerializableTriFunction<I1, I2, I3, O> f) { return sync(transfer(f)); }
default <I1, I2, I3, O> O applyOnInstance(SerializableTriFunction<I1, I2, I3, O> f, I1 i1, I2 i2, I3 i3) { return sync(transfer(f)).apply(i1, i2, i3); }

<E extends Serializable> E transfer(E object);
default <I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, Future<O>> asyncAppliesOnInstance(SerializableQuadFunction<I1, I2, I3, I4, O> f) { return async(transfer(f)); }
default <I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, O> appliesOnInstance(SerializableQuadFunction<I1, I2, I3, I4, O> f) { return sync(transfer(f)); }
default <I1, I2, I3, I4, O> O applyOnInstance(SerializableQuadFunction<I1, I2, I3, I4, O> f, I1 i1, I2 i2, I3 i3, I4 i4) { return sync(transfer(f)).apply(i1, i2, i3, i4); }

default <I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, Future<O>> asyncAppliesOnInstance(SerializableQuintFunction<I1, I2, I3, I4, I5, O> f) { return async(transfer(f)); }
default <I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, O> appliesOnInstance(SerializableQuintFunction<I1, I2, I3, I4, I5, O> f) { return sync(transfer(f)); }
default <I1, I2, I3, I4, I5, O> O applyOnInstance(SerializableQuintFunction<I1, I2, I3, I4, I5, O> f, I1 i1, I2 i2, I3 i3, I4 i4, I5 i5) { return sync(transfer(f)).apply(i1, i2, i3, i4, i5); }

/**
* {@link #runOnInstance(SerializableRunnable)} on the invoking thread
*/
default void unsafeRunOnThisThread(IIsolatedExecutor.SerializableRunnable invoke)
{
transfer(invoke).run();
}

/**
* {@link #callOnInstance(SerializableCallable)} on the invoking thread
*/
default <O> O unsafeCallOnThisThread(IIsolatedExecutor.SerializableCallable<O> invoke)
{
return transfer(invoke).call();
}

/**
* {@link #acceptOnInstance(SerializableConsumer, Object)} on the invoking thread
*/
default <I> void unsafeAcceptOnThisThread(IIsolatedExecutor.SerializableConsumer<I> apply, I i1)
{
transfer(apply).accept(i1);
}

/**
* {@link #acceptOnInstance(SerializableBiConsumer, Object, Object)} on the invoking thread
*/
default <I1, I2> void unsafeAcceptOnThisThread(IIsolatedExecutor.SerializableBiConsumer<I1, I2> apply, I1 i1, I2 i2)
{
transfer(apply).accept(i1, i2);
}

/**
* {@link #acceptOnInstance(SerializableBiConsumer, Object, Object)} on the invoking thread
*/
default <I1, I2, I3> void unsafeAcceptOnThisThread(IIsolatedExecutor.SerializableTriConsumer<I1, I2, I3> apply, I1 i1, I2 i2, I3 i3)
{
transfer(apply).accept(i1, i2, i3);
}

/**
* {@link #applyOnInstance(SerializableFunction, Object)} on the invoking thread
*/
default <I, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableFunction<I, O> apply, I i1)
{
return transfer(apply).apply(i1);
}

/**
* {@link #applyOnInstance(SerializableBiFunction, Object, Object)} on the invoking thread
*/
default <I1, I2, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableBiFunction<I1, I2, O> apply, I1 i1, I2 i2)
{
return transfer(apply).apply(i1, i2);
}

/**
* {@link #applyOnInstance(SerializableTriFunction, Object, Object, Object)} on the invoking thread
*/
default <I1, I2, I3, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableTriFunction<I1, I2, I3, O> apply, I1 i1, I2 i2, I3 i3)
{
return transfer(apply).apply(i1, i2, i3);
}

/**
* {@link #applyOnInstance(SerializableTriFunction, Object, Object, Object)} on the invoking thread
*/
default <I1, I2, I3, I4, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableQuadFunction<I1, I2, I3, I4, O> apply, I1 i1, I2 i2, I3 i3, I4 i4)
{
return transfer(apply).apply(i1, i2, i3, i4);
}

/**
* {@link #applyOnInstance(SerializableTriFunction, Object, Object, Object)} on the invoking thread
*/
default <I1, I2, I3, I4, I5, O> O unsafeApplyOnThisThread(IIsolatedExecutor.SerializableQuintFunction<I1, I2, I3, I4, I5, O> apply, I1 i1, I2 i2, I3 i3, I4 i4, I5 i5)
{
return transfer(apply).apply(i1, i2, i3, i4, i5);
}

<E extends Serializable> E transfer(E object);
}
@@ -20,6 +20,8 @@

import java.io.Serializable;
import java.util.concurrent.Callable;
import java.util.concurrent.Executor;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.function.BiConsumer;
import java.util.function.BiFunction;
@@ -41,17 +43,33 @@
{
interface CallableNoExcept<O> extends Callable<O> { O call(); }

interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable {}
interface SerializableRunnable extends Runnable, Serializable {}
interface SerializableConsumer<O> extends Consumer<O>, Serializable {}

interface SerializableSupplier<O> extends Supplier<O>, Serializable {}
interface SerializableCallable<O> extends CallableNoExcept<O>, Serializable {}

interface SerializableConsumer<O> extends Consumer<O>, Serializable {}
interface SerializableBiConsumer<I1, I2> extends BiConsumer<I1, I2>, Serializable {}
interface TriConsumer<I1, I2, I3> { void accept(I1 i1, I2 i2, I3 i3); }
interface SerializableTriConsumer<I1, I2, I3> extends TriConsumer<I1, I2, I3>, Serializable { }

interface DynamicFunction<IO>
{
<IO2 extends IO> IO2 apply(IO2 i);
}
interface SerializableDynamicFunction<IO> extends DynamicFunction<IO>, Serializable {}

interface SerializableFunction<I, O> extends Function<I, O>, Serializable {}
interface SerializableBiFunction<I1, I2, O> extends BiFunction<I1, I2, O>, Serializable {}

interface TriFunction<I1, I2, I3, O> { O apply(I1 i1, I2 i2, I3 i3); }

interface SerializableTriFunction<I1, I2, I3, O> extends Serializable, TriFunction<I1, I2, I3, O> {}
interface QuadFunction<I1, I2, I3, I4, O> { O apply(I1 i1, I2 i2, I3 i3, I4 i4); }
interface SerializableQuadFunction<I1, I2, I3, I4, O> extends Serializable, QuadFunction<I1, I2, I3, I4, O> {}
interface QuintFunction<I1, I2, I3, I4, I5, O> { O apply(I1 i1, I2 i2, I3 i3, I4 i4, I5 i5); }
interface SerializableQuintFunction<I1, I2, I3, I4, I5, O> extends Serializable, QuintFunction<I1, I2, I3, I4, I5, O> {}

default IIsolatedExecutor with(ExecutorService executor) { throw new UnsupportedOperationException(); }
default Executor executor() { throw new UnsupportedOperationException(); }

Future<Void> shutdown();

@@ -95,6 +113,16 @@ interface SerializableRunnable extends Runnable, Serializable {}
*/
<I1, I2> BiConsumer<I1, I2> sync(BiConsumer<I1, I2> consumer);

/**
* Convert the execution to one performed synchronously on the IsolatedExecutor
*/
<I1, I2, I3> TriFunction<I1, I2, I3, Future<?>> async(TriConsumer<I1, I2, I3> consumer);

/**
* Convert the execution to one performed synchronously on the IsolatedExecutor
*/
<I1, I2, I3> TriConsumer<I1, I2, I3> sync(TriConsumer<I1, I2, I3> consumer);

/**
* Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
*/
@@ -124,4 +152,24 @@ interface SerializableRunnable extends Runnable, Serializable {}
* Convert the execution to one performed synchronously on the IsolatedExecutor
*/
<I1, I2, I3, O> TriFunction<I1, I2, I3, O> sync(TriFunction<I1, I2, I3, O> f);

/**
* Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
*/
<I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, Future<O>> async(QuadFunction<I1, I2, I3, I4, O> f);

/**
* Convert the execution to one performed synchronously on the IsolatedExecutor
*/
<I1, I2, I3, I4, O> QuadFunction<I1, I2, I3, I4, O> sync(QuadFunction<I1, I2, I3, I4, O> f);

/**
* Convert the execution to one performed asynchronously on the IsolatedExecutor, returning a Future of the execution result
*/
<I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, Future<O>> async(QuintFunction<I1, I2, I3, I4, I5, O> f);

/**
* Convert the execution to one performed synchronously on the IsolatedExecutor
*/
<I1, I2, I3, I4, I5, O> QuintFunction<I1, I2, I3, I4, I5, O> sync(QuintFunction<I1, I2, I3, I4, I5, O> f);
}
@@ -25,7 +25,6 @@
interface Filter
{
Filter off();

Filter on();
}

@@ -94,6 +93,9 @@ default Builder allVerbs()

void reset();

default boolean hasInbound() { return true; }
default boolean hasOutbound() { return true; }

/**
* Checks if the message should be delivered. This is expected to run on "inbound", or on the reciever of
* the message (instance.config.num == to).
@@ -0,0 +1,27 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
* KIND, either express or implied. See the License for the
* specific language governing permissions and limitations
* under the License.
*/
package org.apache.cassandra.distributed.api;

import java.net.InetSocketAddress;

public interface IMessageSink {

void accept(InetSocketAddress to, IMessage message);

}

0 comments on commit 3d29574

Please sign in to comment.