Skip to content

Commit

Permalink
09837 9578 advanced wire transformers (#9894)
Browse files Browse the repository at this point in the history
Signed-off-by: Cody Littley <cody@swirldslabs.com>
  • Loading branch information
cody-littley committed Nov 15, 2023
1 parent f00aa3c commit 649ea48
Show file tree
Hide file tree
Showing 32 changed files with 940 additions and 161 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import com.swirlds.common.wiring.builders.TaskSchedulerMetricsBuilder;
import com.swirlds.common.wiring.builders.TaskSchedulerType;
import com.swirlds.common.wiring.counters.ObjectCounter;
import com.swirlds.common.wiring.wires.input.InputWire;
import com.swirlds.common.wiring.wires.input.TaskSchedulerInput;
import com.swirlds.common.wiring.wires.output.OutputWire;
import com.swirlds.common.wiring.wires.output.StandardOutputWire;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;
import java.util.function.Consumer;
Expand All @@ -38,13 +42,13 @@
*
* @param <OUT> the output type of the primary output wire (use {@link Void} if no output is needed)
*/
public abstract class TaskScheduler<OUT> {
public abstract class TaskScheduler<OUT> extends TaskSchedulerInput<OUT> {

private final boolean flushEnabled;
private final WiringModel model;
private final String name;
private final TaskSchedulerType type;
private final OutputWire<OUT> primaryOutputWire;
private final StandardOutputWire<OUT> primaryOutputWire;
private final boolean insertionIsBlocking;

/**
Expand All @@ -68,7 +72,7 @@ protected TaskScheduler(
this.name = Objects.requireNonNull(name);
this.type = Objects.requireNonNull(type);
this.flushEnabled = flushEnabled;
primaryOutputWire = new OutputWire<>(model, name);
primaryOutputWire = new StandardOutputWire<>(model, name);
this.insertionIsBlocking = insertionIsBlocking;
}

Expand Down Expand Up @@ -110,10 +114,10 @@ public OutputWire<OUT> getOutputWire() {
* @return the secondary output wire
*/
@NonNull
public <T> OutputWire<T> buildSecondaryOutputWire() {
public <T> StandardOutputWire<T> buildSecondaryOutputWire() {
// Intentionally do not register this with the model. Connections using this output wire will be represented
// in the model in the same way as connections to the primary output wire.
return new OutputWire<>(model, name);
return new StandardOutputWire<>(model, name);
}

/**
Expand Down Expand Up @@ -194,34 +198,6 @@ public final <X> TaskScheduler<X> cast() {
*/
public abstract void flush();

/**
* Add a task to the scheduler. May block if back pressure is enabled.
*
* @param handler handles the provided data
* @param data the data to be processed by the task scheduler
*/
protected abstract void put(@NonNull Consumer<Object> handler, @NonNull Object data);

/**
* Add a task to the scheduler. If backpressure is enabled and there is not immediately capacity available, this
* method will not accept the data.
*
* @param handler handles the provided data
* @param data the data to be processed by the scheduler
* @return true if the data was accepted, false otherwise
*/
protected abstract boolean offer(@NonNull Consumer<Object> handler, @NonNull Object data);

/**
* Inject data into the scheduler, doing so even if it causes the number of unprocessed tasks to exceed the capacity
* specified by configured back pressure. If backpressure is disabled, this operation is logically equivalent to
* {@link #put(Consumer, Object)}.
*
* @param handler handles the provided data
* @param data the data to be processed by the scheduler
*/
protected abstract void inject(@NonNull Consumer<Object> handler, @NonNull Object data);

/**
* Throw an {@link UnsupportedOperationException} if flushing is not enabled.
*/
Expand All @@ -232,12 +208,10 @@ protected final void throwIfFlushDisabled() {
}

/**
* Pass data to this scheduler's primary output wire.
* <p>
* This method is implemented here to allow classes in this package to call forward(), which otherwise would not be
* visible.
* {@inheritDoc}
*/
protected final void forward(@NonNull final OUT data) {
@Override
protected void forward(@NonNull final OUT data) {
primaryOutputWire.forward(data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.swirlds.common.wiring.schedulers.HeartbeatScheduler;
import com.swirlds.common.wiring.schedulers.SequentialThreadTaskScheduler;
import com.swirlds.common.wiring.utility.ModelGroup;
import com.swirlds.common.wiring.wires.SolderType;
import com.swirlds.common.wiring.wires.output.OutputWire;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Duration;
import java.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ public class ConcurrentTaskScheduler<OUT> extends TaskScheduler<OUT> {
* @param onRamp an object counter that is incremented when data is added to the scheduler
* @param offRamp an object counter that is decremented when data is removed from the scheduler
* @param flushEnabled if true, then {@link #flush()} will be enabled, otherwise it will throw.
* @param insertionIsBlocking when data is inserted into this scheduler, will it block until capacity is available?
* @param insertionIsBlocking when data is inserted into this scheduler, will it block until capacity is
* available?
*/
public ConcurrentTaskScheduler(
@NonNull final WiringModel model,
@NonNull final String name,
@NonNull ForkJoinPool pool,
@NonNull UncaughtExceptionHandler uncaughtExceptionHandler,
@NonNull final ForkJoinPool pool,
@NonNull final UncaughtExceptionHandler uncaughtExceptionHandler,
@NonNull final ObjectCounter onRamp,
@NonNull final ObjectCounter offRamp,
final boolean flushEnabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import com.swirlds.base.state.Startable;
import com.swirlds.base.state.Stoppable;
import com.swirlds.base.time.Time;
import com.swirlds.common.wiring.OutputWire;
import com.swirlds.common.wiring.WiringModel;
import com.swirlds.common.wiring.builders.TaskSchedulerType;
import com.swirlds.common.wiring.wires.output.OutputWire;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Duration;
import java.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
package com.swirlds.common.wiring.schedulers;

import com.swirlds.base.time.Time;
import com.swirlds.common.wiring.OutputWire;
import com.swirlds.common.wiring.WiringModel;
import com.swirlds.common.wiring.wires.output.OutputWire;
import com.swirlds.common.wiring.wires.output.StandardOutputWire;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -32,7 +33,7 @@ class HeartbeatTask extends TimerTask {

private final Time time;
private final Duration period;
private final OutputWire<Instant> outputWire;
private final StandardOutputWire<Instant> outputWire;

/**
* Constructor.
Expand All @@ -49,7 +50,7 @@ public HeartbeatTask(
this.period = Objects.requireNonNull(period);
Objects.requireNonNull(name);

this.outputWire = new OutputWire<>(model, name);
this.outputWire = new StandardOutputWire<>(model, name);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class SequentialThreadTaskScheduler<OUT> extends TaskScheduler<OUT> imple
private static final int BUFFER_SIZE = 1024;

private final AtomicBoolean alive = new AtomicBoolean(true);

private final Thread thread;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.common.wiring.transformers;

import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;

/**
* Executes a transformation for an advanced transformer as created by
* {@link com.swirlds.common.wiring.wires.output.OutputWire#buildAdvancedTransformer(String, AdvancedTransformation)}.
*
* @param <A> the original wire output type
* @param <B> the output type of the transformer
*/
public interface AdvancedTransformation<A, B> {

/**
* Given data that comes off of the original output wire, this method transforms it before it is passed to each
* input wire that is connected to this transformer. Called once per data element per listener.
*
* @param a a data element from the original output wire
* @return the transformed data element, or null if the data should not be forwarded
*/
@Nullable
B transform(@NonNull A a);

/**
* Called on the original data element after it has been forwarded to all listeners. This method can do cleanup if
* necessary. Doing nothing is perfectly ok if the use case does not require cleanup.
*
* @param a the original data element
*/
void cleanup(@NonNull A a);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.common.wiring.transformers.internal;

import com.swirlds.common.wiring.WiringModel;
import com.swirlds.common.wiring.builders.TaskSchedulerType;
import com.swirlds.common.wiring.wires.output.OutputWire;
import com.swirlds.common.wiring.wires.output.internal.ForwardingOutputWire;
import com.swirlds.common.wiring.wires.output.internal.TransformingOutputWire;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Similar to a {@link WireTransformer} but for more advanced use cases. Unlike a {@link WireTransformer}, the
* transforming function is called once per output per data item, and a special method can be called after the data is
* forwarded to all destinations.
*
* @param <A> the input type
* @param <B> the output type
*/
public class AdvancedWireTransformer<A, B> implements Consumer<A> {

private final ForwardingOutputWire<A, B> outputWire;

/**
* Constructor.
*
* @param model the wiring model containing this output wire
* @param name the name of the output wire
* @param transformer the function to transform the data from the input type to the output type. Is called once per
* output per data item. If this method returns null then the data is not forwarded.
* @param cleanup an optional method that is called after the data is forwarded to all destinations. The
* original data is passed to this method. Ignored if null.
*/
public AdvancedWireTransformer(
@NonNull final WiringModel model,
@NonNull final String name,
@NonNull final Function<A, B> transformer,
@Nullable final Consumer<A> cleanup) {

model.registerVertex(name, TaskSchedulerType.DIRECT_STATELESS, true);
outputWire = new TransformingOutputWire<>(model, name, transformer, cleanup);
}

/**
* {@inheritDoc}
*/
@Override
public void accept(@NonNull final A a) {
outputWire.forward(a);
}

/**
* Get the output wire for this transformer.
*
* @return the output wire
*/
@NonNull
public OutputWire<B> getOutputWire() {
return outputWire;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
* limitations under the License.
*/

package com.swirlds.common.wiring.transformers;
package com.swirlds.common.wiring.transformers.internal;

import com.swirlds.common.wiring.OutputWire;
import com.swirlds.common.wiring.WiringModel;
import com.swirlds.common.wiring.builders.TaskSchedulerType;
import com.swirlds.common.wiring.wires.output.OutputWire;
import com.swirlds.common.wiring.wires.output.StandardOutputWire;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;
import java.util.function.Consumer;
Expand All @@ -30,7 +31,7 @@
public class WireFilter<T> implements Consumer<T> {

private final Predicate<T> predicate;
private final OutputWire<T> outputWire;
private final StandardOutputWire<T> outputWire;

/**
* Constructor.
Expand All @@ -44,7 +45,7 @@ public class WireFilter<T> implements Consumer<T> {
public WireFilter(
@NonNull final WiringModel model, @NonNull final String name, @NonNull final Predicate<T> predicate) {
this.predicate = Objects.requireNonNull(predicate);
this.outputWire = new OutputWire<>(model, name);
this.outputWire = new StandardOutputWire<>(model, name);
model.registerVertex(name, TaskSchedulerType.DIRECT_STATELESS, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
* limitations under the License.
*/

package com.swirlds.common.wiring.transformers;
package com.swirlds.common.wiring.transformers.internal;

import com.swirlds.common.wiring.OutputWire;
import com.swirlds.common.wiring.WiringModel;
import com.swirlds.common.wiring.builders.TaskSchedulerType;
import com.swirlds.common.wiring.wires.output.OutputWire;
import com.swirlds.common.wiring.wires.output.StandardOutputWire;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.List;
import java.util.function.Consumer;
Expand All @@ -29,7 +30,7 @@
*/
public class WireListSplitter<T> implements Consumer<List<T>> {

private final OutputWire<T> outputWire;
private final StandardOutputWire<T> outputWire;

/**
* Constructor.
Expand All @@ -39,7 +40,7 @@ public class WireListSplitter<T> implements Consumer<List<T>> {
*/
public WireListSplitter(@NonNull final WiringModel model, @NonNull final String name) {
model.registerVertex(name, TaskSchedulerType.DIRECT_STATELESS, true);
outputWire = new OutputWire<>(model, name);
outputWire = new StandardOutputWire<>(model, name);
}

/**
Expand Down

0 comments on commit 649ea48

Please sign in to comment.