From 649ea484929f12c47184469b38dd00bdabd88846 Mon Sep 17 00:00:00 2001 From: Cody Littley <56973212+cody-littley@users.noreply.github.com> Date: Wed, 15 Nov 2023 11:46:46 -0600 Subject: [PATCH] 09837 9578 advanced wire transformers (#9894) Signed-off-by: Cody Littley --- .../swirlds/common/wiring/TaskScheduler.java | 50 +-- .../swirlds/common/wiring/WiringModel.java | 2 + .../schedulers/ConcurrentTaskScheduler.java | 7 +- .../wiring/schedulers/HeartbeatScheduler.java | 2 +- .../wiring/schedulers/HeartbeatTask.java | 7 +- .../SequentialThreadTaskScheduler.java | 1 + .../transformers/AdvancedTransformation.java | 48 +++ .../internal/AdvancedWireTransformer.java | 78 ++++ .../{ => internal}/WireFilter.java | 9 +- .../{ => internal}/WireListSplitter.java | 9 +- .../{ => internal}/WireTransformer.java | 9 +- .../common/wiring/{ => wires}/SolderType.java | 10 +- .../wiring/{ => wires/input}/InputWire.java | 21 +- .../wires/input/TaskSchedulerInput.java | 65 ++++ .../wiring/{ => wires/output}/OutputWire.java | 145 +++---- .../wires/output/StandardOutputWire.java | 79 ++++ .../output/internal/ForwardingOutputWire.java | 50 +++ .../internal/TransformingOutputWire.java | 102 +++++ .../src/main/java/module-info.java | 4 + .../wiring/benchmark/WiringBenchmark.java | 2 +- .../common/wiring/model/ModelTests.java | 6 +- .../ConcurrentTaskSchedulerTests.java | 2 +- .../schedulers/DirectTaskSchedulerTests.java | 6 +- .../schedulers/HeartbeatSchedulerTests.java | 2 +- .../SequentialTaskSchedulerTests.java | 10 +- .../TaskSchedulerTransformersTests.java | 353 +++++++++++++++++- .../wiring/EventDeduplicatorScheduler.java | 4 +- .../EventSignatureValidatorScheduler.java | 4 +- .../wiring/InOrderLinkerScheduler.java | 4 +- .../InternalEventValidatorScheduler.java | 4 +- .../wiring/LinkedEventIntakeScheduler.java | 2 +- .../wiring/OrphanBufferScheduler.java | 4 +- 32 files changed, 940 insertions(+), 161 deletions(-) create mode 100644 platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/AdvancedTransformation.java create mode 100644 platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/internal/AdvancedWireTransformer.java rename platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/{ => internal}/WireFilter.java (88%) rename platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/{ => internal}/WireListSplitter.java (85%) rename platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/{ => internal}/WireTransformer.java (89%) rename platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/{ => wires}/SolderType.java (77%) rename platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/{ => wires/input}/InputWire.java (89%) create mode 100644 platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/input/TaskSchedulerInput.java rename platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/{ => wires/output}/OutputWire.java (58%) create mode 100644 platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/StandardOutputWire.java create mode 100644 platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/internal/ForwardingOutputWire.java create mode 100644 platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/internal/TransformingOutputWire.java diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/TaskScheduler.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/TaskScheduler.java index 13f252d2c13a..347017e0bf29 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/TaskScheduler.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/TaskScheduler.java @@ -20,6 +20,10 @@ import com.swirlds.common.wiring.builders.TaskSchedulerMetricsBuilder; import com.swirlds.common.wiring.builders.TaskSchedulerType; import com.swirlds.common.wiring.counters.ObjectCounter; +import com.swirlds.common.wiring.wires.input.InputWire; +import com.swirlds.common.wiring.wires.input.TaskSchedulerInput; +import com.swirlds.common.wiring.wires.output.OutputWire; +import com.swirlds.common.wiring.wires.output.StandardOutputWire; import edu.umd.cs.findbugs.annotations.NonNull; import java.util.Objects; import java.util.function.Consumer; @@ -38,13 +42,13 @@ * * @param the output type of the primary output wire (use {@link Void} if no output is needed) */ -public abstract class TaskScheduler { +public abstract class TaskScheduler extends TaskSchedulerInput { private final boolean flushEnabled; private final WiringModel model; private final String name; private final TaskSchedulerType type; - private final OutputWire primaryOutputWire; + private final StandardOutputWire primaryOutputWire; private final boolean insertionIsBlocking; /** @@ -68,7 +72,7 @@ protected TaskScheduler( this.name = Objects.requireNonNull(name); this.type = Objects.requireNonNull(type); this.flushEnabled = flushEnabled; - primaryOutputWire = new OutputWire<>(model, name); + primaryOutputWire = new StandardOutputWire<>(model, name); this.insertionIsBlocking = insertionIsBlocking; } @@ -110,10 +114,10 @@ public OutputWire getOutputWire() { * @return the secondary output wire */ @NonNull - public OutputWire buildSecondaryOutputWire() { + public StandardOutputWire buildSecondaryOutputWire() { // Intentionally do not register this with the model. Connections using this output wire will be represented // in the model in the same way as connections to the primary output wire. - return new OutputWire<>(model, name); + return new StandardOutputWire<>(model, name); } /** @@ -194,34 +198,6 @@ public final TaskScheduler cast() { */ public abstract void flush(); - /** - * Add a task to the scheduler. May block if back pressure is enabled. - * - * @param handler handles the provided data - * @param data the data to be processed by the task scheduler - */ - protected abstract void put(@NonNull Consumer handler, @NonNull Object data); - - /** - * Add a task to the scheduler. If backpressure is enabled and there is not immediately capacity available, this - * method will not accept the data. - * - * @param handler handles the provided data - * @param data the data to be processed by the scheduler - * @return true if the data was accepted, false otherwise - */ - protected abstract boolean offer(@NonNull Consumer handler, @NonNull Object data); - - /** - * Inject data into the scheduler, doing so even if it causes the number of unprocessed tasks to exceed the capacity - * specified by configured back pressure. If backpressure is disabled, this operation is logically equivalent to - * {@link #put(Consumer, Object)}. - * - * @param handler handles the provided data - * @param data the data to be processed by the scheduler - */ - protected abstract void inject(@NonNull Consumer handler, @NonNull Object data); - /** * Throw an {@link UnsupportedOperationException} if flushing is not enabled. */ @@ -232,12 +208,10 @@ protected final void throwIfFlushDisabled() { } /** - * Pass data to this scheduler's primary output wire. - *

- * This method is implemented here to allow classes in this package to call forward(), which otherwise would not be - * visible. + * {@inheritDoc} */ - protected final void forward(@NonNull final OUT data) { + @Override + protected void forward(@NonNull final OUT data) { primaryOutputWire.forward(data); } } diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/WiringModel.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/WiringModel.java index 7e2a625e300e..3b3644ed2bbc 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/WiringModel.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/WiringModel.java @@ -31,6 +31,8 @@ import com.swirlds.common.wiring.schedulers.HeartbeatScheduler; import com.swirlds.common.wiring.schedulers.SequentialThreadTaskScheduler; import com.swirlds.common.wiring.utility.ModelGroup; +import com.swirlds.common.wiring.wires.SolderType; +import com.swirlds.common.wiring.wires.output.OutputWire; import edu.umd.cs.findbugs.annotations.NonNull; import java.time.Duration; import java.time.Instant; diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/ConcurrentTaskScheduler.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/ConcurrentTaskScheduler.java index b776988f1169..d19aa116c3d9 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/ConcurrentTaskScheduler.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/ConcurrentTaskScheduler.java @@ -48,13 +48,14 @@ public class ConcurrentTaskScheduler extends TaskScheduler { * @param onRamp an object counter that is incremented when data is added to the scheduler * @param offRamp an object counter that is decremented when data is removed from the scheduler * @param flushEnabled if true, then {@link #flush()} will be enabled, otherwise it will throw. - * @param insertionIsBlocking when data is inserted into this scheduler, will it block until capacity is available? + * @param insertionIsBlocking when data is inserted into this scheduler, will it block until capacity is + * available? */ public ConcurrentTaskScheduler( @NonNull final WiringModel model, @NonNull final String name, - @NonNull ForkJoinPool pool, - @NonNull UncaughtExceptionHandler uncaughtExceptionHandler, + @NonNull final ForkJoinPool pool, + @NonNull final UncaughtExceptionHandler uncaughtExceptionHandler, @NonNull final ObjectCounter onRamp, @NonNull final ObjectCounter offRamp, final boolean flushEnabled, diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/HeartbeatScheduler.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/HeartbeatScheduler.java index ea30f00d933c..624cdd34098f 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/HeartbeatScheduler.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/HeartbeatScheduler.java @@ -19,9 +19,9 @@ import com.swirlds.base.state.Startable; import com.swirlds.base.state.Stoppable; import com.swirlds.base.time.Time; -import com.swirlds.common.wiring.OutputWire; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.builders.TaskSchedulerType; +import com.swirlds.common.wiring.wires.output.OutputWire; import edu.umd.cs.findbugs.annotations.NonNull; import java.time.Duration; import java.time.Instant; diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/HeartbeatTask.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/HeartbeatTask.java index b7f4290700b2..1e98d8110417 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/HeartbeatTask.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/HeartbeatTask.java @@ -17,8 +17,9 @@ package com.swirlds.common.wiring.schedulers; import com.swirlds.base.time.Time; -import com.swirlds.common.wiring.OutputWire; import com.swirlds.common.wiring.WiringModel; +import com.swirlds.common.wiring.wires.output.OutputWire; +import com.swirlds.common.wiring.wires.output.StandardOutputWire; import edu.umd.cs.findbugs.annotations.NonNull; import java.time.Duration; import java.time.Instant; @@ -32,7 +33,7 @@ class HeartbeatTask extends TimerTask { private final Time time; private final Duration period; - private final OutputWire outputWire; + private final StandardOutputWire outputWire; /** * Constructor. @@ -49,7 +50,7 @@ public HeartbeatTask( this.period = Objects.requireNonNull(period); Objects.requireNonNull(name); - this.outputWire = new OutputWire<>(model, name); + this.outputWire = new StandardOutputWire<>(model, name); } /** diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/SequentialThreadTaskScheduler.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/SequentialThreadTaskScheduler.java index 0cb977c06aa5..6aaba252039d 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/SequentialThreadTaskScheduler.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/schedulers/SequentialThreadTaskScheduler.java @@ -56,6 +56,7 @@ public class SequentialThreadTaskScheduler extends TaskScheduler imple private static final int BUFFER_SIZE = 1024; private final AtomicBoolean alive = new AtomicBoolean(true); + private final Thread thread; /** diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/AdvancedTransformation.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/AdvancedTransformation.java new file mode 100644 index 000000000000..27c3d733956c --- /dev/null +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/AdvancedTransformation.java @@ -0,0 +1,48 @@ +/* + * Copyright (C) 2023 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.swirlds.common.wiring.transformers; + +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; + +/** + * Executes a transformation for an advanced transformer as created by + * {@link com.swirlds.common.wiring.wires.output.OutputWire#buildAdvancedTransformer(String, AdvancedTransformation)}. + * + * @param the original wire output type + * @param the output type of the transformer + */ +public interface AdvancedTransformation { + + /** + * Given data that comes off of the original output wire, this method transforms it before it is passed to each + * input wire that is connected to this transformer. Called once per data element per listener. + * + * @param a a data element from the original output wire + * @return the transformed data element, or null if the data should not be forwarded + */ + @Nullable + B transform(@NonNull A a); + + /** + * Called on the original data element after it has been forwarded to all listeners. This method can do cleanup if + * necessary. Doing nothing is perfectly ok if the use case does not require cleanup. + * + * @param a the original data element + */ + void cleanup(@NonNull A a); +} diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/internal/AdvancedWireTransformer.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/internal/AdvancedWireTransformer.java new file mode 100644 index 000000000000..3086fdb0187f --- /dev/null +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/internal/AdvancedWireTransformer.java @@ -0,0 +1,78 @@ +/* + * Copyright (C) 2023 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.swirlds.common.wiring.transformers.internal; + +import com.swirlds.common.wiring.WiringModel; +import com.swirlds.common.wiring.builders.TaskSchedulerType; +import com.swirlds.common.wiring.wires.output.OutputWire; +import com.swirlds.common.wiring.wires.output.internal.ForwardingOutputWire; +import com.swirlds.common.wiring.wires.output.internal.TransformingOutputWire; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import java.util.function.Consumer; +import java.util.function.Function; + +/** + * Similar to a {@link WireTransformer} but for more advanced use cases. Unlike a {@link WireTransformer}, the + * transforming function is called once per output per data item, and a special method can be called after the data is + * forwarded to all destinations. + * + * @param the input type + * @param the output type + */ +public class AdvancedWireTransformer implements Consumer { + + private final ForwardingOutputWire outputWire; + + /** + * Constructor. + * + * @param model the wiring model containing this output wire + * @param name the name of the output wire + * @param transformer the function to transform the data from the input type to the output type. Is called once per + * output per data item. If this method returns null then the data is not forwarded. + * @param cleanup an optional method that is called after the data is forwarded to all destinations. The + * original data is passed to this method. Ignored if null. + */ + public AdvancedWireTransformer( + @NonNull final WiringModel model, + @NonNull final String name, + @NonNull final Function transformer, + @Nullable final Consumer cleanup) { + + model.registerVertex(name, TaskSchedulerType.DIRECT_STATELESS, true); + outputWire = new TransformingOutputWire<>(model, name, transformer, cleanup); + } + + /** + * {@inheritDoc} + */ + @Override + public void accept(@NonNull final A a) { + outputWire.forward(a); + } + + /** + * Get the output wire for this transformer. + * + * @return the output wire + */ + @NonNull + public OutputWire getOutputWire() { + return outputWire; + } +} diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/WireFilter.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/internal/WireFilter.java similarity index 88% rename from platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/WireFilter.java rename to platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/internal/WireFilter.java index 68ff4b6384ea..d2de689b9084 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/WireFilter.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/internal/WireFilter.java @@ -14,11 +14,12 @@ * limitations under the License. */ -package com.swirlds.common.wiring.transformers; +package com.swirlds.common.wiring.transformers.internal; -import com.swirlds.common.wiring.OutputWire; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.builders.TaskSchedulerType; +import com.swirlds.common.wiring.wires.output.OutputWire; +import com.swirlds.common.wiring.wires.output.StandardOutputWire; import edu.umd.cs.findbugs.annotations.NonNull; import java.util.Objects; import java.util.function.Consumer; @@ -30,7 +31,7 @@ public class WireFilter implements Consumer { private final Predicate predicate; - private final OutputWire outputWire; + private final StandardOutputWire outputWire; /** * Constructor. @@ -44,7 +45,7 @@ public class WireFilter implements Consumer { public WireFilter( @NonNull final WiringModel model, @NonNull final String name, @NonNull final Predicate predicate) { this.predicate = Objects.requireNonNull(predicate); - this.outputWire = new OutputWire<>(model, name); + this.outputWire = new StandardOutputWire<>(model, name); model.registerVertex(name, TaskSchedulerType.DIRECT_STATELESS, true); } diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/WireListSplitter.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/internal/WireListSplitter.java similarity index 85% rename from platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/WireListSplitter.java rename to platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/internal/WireListSplitter.java index bb476a32f512..2672a0112353 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/WireListSplitter.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/internal/WireListSplitter.java @@ -14,11 +14,12 @@ * limitations under the License. */ -package com.swirlds.common.wiring.transformers; +package com.swirlds.common.wiring.transformers.internal; -import com.swirlds.common.wiring.OutputWire; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.builders.TaskSchedulerType; +import com.swirlds.common.wiring.wires.output.OutputWire; +import com.swirlds.common.wiring.wires.output.StandardOutputWire; import edu.umd.cs.findbugs.annotations.NonNull; import java.util.List; import java.util.function.Consumer; @@ -29,7 +30,7 @@ */ public class WireListSplitter implements Consumer> { - private final OutputWire outputWire; + private final StandardOutputWire outputWire; /** * Constructor. @@ -39,7 +40,7 @@ public class WireListSplitter implements Consumer> { */ public WireListSplitter(@NonNull final WiringModel model, @NonNull final String name) { model.registerVertex(name, TaskSchedulerType.DIRECT_STATELESS, true); - outputWire = new OutputWire<>(model, name); + outputWire = new StandardOutputWire<>(model, name); } /** diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/WireTransformer.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/internal/WireTransformer.java similarity index 89% rename from platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/WireTransformer.java rename to platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/internal/WireTransformer.java index 14a608ab45e6..30f572c83a1d 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/WireTransformer.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/transformers/internal/WireTransformer.java @@ -14,11 +14,12 @@ * limitations under the License. */ -package com.swirlds.common.wiring.transformers; +package com.swirlds.common.wiring.transformers.internal; -import com.swirlds.common.wiring.OutputWire; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.builders.TaskSchedulerType; +import com.swirlds.common.wiring.wires.output.OutputWire; +import com.swirlds.common.wiring.wires.output.StandardOutputWire; import edu.umd.cs.findbugs.annotations.NonNull; import java.util.Objects; import java.util.function.Consumer; @@ -33,7 +34,7 @@ public class WireTransformer implements Consumer { private final Function transformer; - private final OutputWire outputWire; + private final StandardOutputWire outputWire; /** * Constructor. @@ -49,7 +50,7 @@ public WireTransformer( @NonNull final WiringModel model, @NonNull final String name, @NonNull final Function transformer) { model.registerVertex(name, TaskSchedulerType.DIRECT_STATELESS, true); this.transformer = Objects.requireNonNull(transformer); - outputWire = new OutputWire<>(model, name); + outputWire = new StandardOutputWire<>(model, name); } /** diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/SolderType.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/SolderType.java similarity index 77% rename from platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/SolderType.java rename to platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/SolderType.java index 46c908945185..ece92634bf18 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/SolderType.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/SolderType.java @@ -14,10 +14,12 @@ * limitations under the License. */ -package com.swirlds.common.wiring; +package com.swirlds.common.wiring.wires; + +import com.swirlds.common.wiring.wires.input.InputWire; /** - * The type of solder connection. + * The type of solder connection between an output wire and an input wire. */ public enum SolderType { /** @@ -30,8 +32,8 @@ public enum SolderType { */ INJECT, /** - * When data is passed to the input wire, call {@link InputWire#offer(Object)}. If the input wire has - * backpressure enabled and the input wire is full, then the data will be dropped. + * When data is passed to the input wire, call {@link InputWire#offer(Object)}. If the input wire has backpressure + * enabled and the input wire is full, then the data will be dropped. */ OFFER } diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/InputWire.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/input/InputWire.java similarity index 89% rename from platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/InputWire.java rename to platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/input/InputWire.java index 528cbc8d3237..737595446ac7 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/InputWire.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/input/InputWire.java @@ -14,8 +14,9 @@ * limitations under the License. */ -package com.swirlds.common.wiring; +package com.swirlds.common.wiring.wires.input; +import com.swirlds.common.wiring.TaskScheduler; import edu.umd.cs.findbugs.annotations.NonNull; import java.util.Objects; import java.util.function.Consumer; @@ -29,9 +30,10 @@ */ public class InputWire { - private final TaskScheduler taskScheduler; + private final TaskSchedulerInput taskSchedulerInput; private Consumer handler; private final String name; + private final String taskSchedulerName; /** * Constructor. @@ -39,9 +41,10 @@ public class InputWire { * @param taskScheduler the scheduler to insert data into * @param name the name of the input wire */ - InputWire(@NonNull final TaskScheduler taskScheduler, @NonNull final String name) { - this.taskScheduler = Objects.requireNonNull(taskScheduler); + public InputWire(@NonNull final TaskScheduler taskScheduler, @NonNull final String name) { + this.taskSchedulerInput = Objects.requireNonNull(taskScheduler); this.name = Objects.requireNonNull(name); + this.taskSchedulerName = taskScheduler.getName(); } /** @@ -61,7 +64,7 @@ public String getName() { */ @NonNull public String getTaskSchedulerName() { - return taskScheduler.getName(); + return taskSchedulerName; } /** @@ -131,7 +134,7 @@ public InputWire bind(@NonNull final Function handler) { this.handler = i -> { final OUT output = handler.apply((IN) i); if (output != null) { - taskScheduler.forward(output); + taskSchedulerInput.forward(output); } }; @@ -144,7 +147,7 @@ public InputWire bind(@NonNull final Function handler) { * @param data the data to be processed by the task scheduler */ public void put(@NonNull final IN data) { - taskScheduler.put(handler, data); + taskSchedulerInput.put(handler, data); } /** @@ -155,7 +158,7 @@ public void put(@NonNull final IN data) { * @return true if the data was accepted, false otherwise */ public boolean offer(@NonNull final IN data) { - return taskScheduler.offer(handler, data); + return taskSchedulerInput.offer(handler, data); } /** @@ -166,6 +169,6 @@ public boolean offer(@NonNull final IN data) { * @param data the data to be processed by the task scheduler */ public void inject(@NonNull final IN data) { - taskScheduler.inject(handler, data); + taskSchedulerInput.inject(handler, data); } } diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/input/TaskSchedulerInput.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/input/TaskSchedulerInput.java new file mode 100644 index 000000000000..6e967df899d4 --- /dev/null +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/input/TaskSchedulerInput.java @@ -0,0 +1,65 @@ +/* + * Copyright (C) 2023 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.swirlds.common.wiring.wires.input; + +import com.swirlds.common.wiring.TaskScheduler; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.util.function.Consumer; + +/** + * An object that knows how to add data to a {@link TaskScheduler} for processing, and how to forward data to a task + * scheduler's output. This class is defined inside the input wire package to prevent anything that isn't an input wire + * from accessing its methods. + */ +public abstract class TaskSchedulerInput { + + /** + * Add a task to the scheduler. May block if back pressure is enabled. + * + * @param handler handles the provided data + * @param data the data to be processed by the task scheduler + */ + protected abstract void put(@NonNull Consumer handler, @NonNull Object data); + + /** + * Add a task to the scheduler. If backpressure is enabled and there is not immediately capacity available, this + * method will not accept the data. + * + * @param handler handles the provided data + * @param data the data to be processed by the scheduler + * @return true if the data was accepted, false otherwise + */ + protected abstract boolean offer(@NonNull Consumer handler, @NonNull Object data); + + /** + * Inject data into the scheduler, doing so even if it causes the number of unprocessed tasks to exceed the capacity + * specified by configured back pressure. If backpressure is disabled, this operation is logically equivalent to + * {@link #put(Consumer, Object)}. + * + * @param handler handles the provided data + * @param data the data to be processed by the scheduler + */ + protected abstract void inject(@NonNull Consumer handler, @NonNull Object data); + + /** + * Pass data to this scheduler's primary output wire. + *

+ * This method is implemented here to allow classes in this package to call forward(), which otherwise would not be + * visible. + */ + protected abstract void forward(@NonNull final OUT data); +} diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/OutputWire.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/OutputWire.java similarity index 58% rename from platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/OutputWire.java rename to platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/OutputWire.java index 69cb8458524b..6af6240d7a8a 100644 --- a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/OutputWire.java +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/OutputWire.java @@ -14,35 +14,32 @@ * limitations under the License. */ -package com.swirlds.common.wiring; - -import static com.swirlds.logging.legacy.LogMarker.EXCEPTION; - -import com.swirlds.common.wiring.transformers.WireFilter; -import com.swirlds.common.wiring.transformers.WireListSplitter; -import com.swirlds.common.wiring.transformers.WireTransformer; +package com.swirlds.common.wiring.wires.output; + +import com.swirlds.common.wiring.WiringModel; +import com.swirlds.common.wiring.transformers.AdvancedTransformation; +import com.swirlds.common.wiring.transformers.internal.AdvancedWireTransformer; +import com.swirlds.common.wiring.transformers.internal.WireFilter; +import com.swirlds.common.wiring.transformers.internal.WireListSplitter; +import com.swirlds.common.wiring.transformers.internal.WireTransformer; +import com.swirlds.common.wiring.wires.SolderType; +import com.swirlds.common.wiring.wires.input.InputWire; import edu.umd.cs.findbugs.annotations.NonNull; -import java.util.ArrayList; -import java.util.List; +import edu.umd.cs.findbugs.annotations.Nullable; import java.util.Objects; import java.util.function.Consumer; import java.util.function.Function; import java.util.function.Predicate; -import org.apache.logging.log4j.LogManager; -import org.apache.logging.log4j.Logger; /** * Describes the output of a task scheduler. Can be soldered to wire inputs or lambdas. * * @param the output type of the object */ -public final class OutputWire { - - private static final Logger logger = LogManager.getLogger(OutputWire.class); +public abstract class OutputWire { private final WiringModel model; private final String name; - private final List> forwardingDestinations = new ArrayList<>(); /** * Constructor. @@ -51,7 +48,6 @@ public final class OutputWire { * @param name the name of the output wire */ public OutputWire(@NonNull final WiringModel model, @NonNull final String name) { - this.model = Objects.requireNonNull(model); this.name = Objects.requireNonNull(name); } @@ -67,29 +63,6 @@ public String getName() { return name; } - /** - * Forward output data to any wires/consumers that are listening for it. - *

- * Although it will technically work, it is a violation of convention to directly put data into this output wire - * except from within code being executed by the task scheduler that owns this output wire. Don't do it. - * - * @param data the output data to forward - */ - public void forward(@NonNull final OUT data) { - for (final Consumer destination : forwardingDestinations) { - try { - destination.accept(data); - } catch (final Exception e) { - logger.error( - EXCEPTION.getMarker(), - "Exception thrown on output wire {} while forwarding data {}", - name, - data, - e); - } - } - } - /** * Specify an input wire where output data should be passed. This forwarding operation respects back pressure. * Equivalent to calling {@link #solderTo(InputWire, SolderType)} with {@link SolderType#PUT}. @@ -126,9 +99,9 @@ public void solderTo(@NonNull final InputWire inputWire, @NonNull final model.registerEdge(name, inputWire.getTaskSchedulerName(), inputWire.getName(), solderType); switch (solderType) { - case PUT -> forwardingDestinations.add(inputWire::put); - case INJECT -> forwardingDestinations.add(inputWire::inject); - case OFFER -> forwardingDestinations.add(inputWire::offer); + case PUT -> addForwardingDestination(inputWire::put); + case INJECT -> addForwardingDestination(inputWire::inject); + case OFFER -> addForwardingDestination(inputWire::offer); default -> throw new IllegalArgumentException("Unknown solder type: " + solderType); } } @@ -149,7 +122,7 @@ public void solderTo(@NonNull final InputWire inputWire, @NonNull final */ public void solderTo(@NonNull final String handlerName, @NonNull final Consumer handler) { model.registerEdge(name, handlerName, "", SolderType.PUT); - forwardingDestinations.add(Objects.requireNonNull(handler)); + addForwardingDestination(Objects.requireNonNull(handler)); } /** @@ -175,47 +148,91 @@ public OutputWire buildFilter(@NonNull final String name, @NonNull final Pr * comes out of the wire will be inserted into the splitter). The output wire of the splitter is returned by this * method. * - * @param the type of the list elements + * @param the type of the list elements * @return output wire of the splitter */ @SuppressWarnings("unchecked") @NonNull - public OutputWire buildSplitter() { + public OutputWire buildSplitter() { final String splitterName = name + "_splitter"; - final WireListSplitter splitter = new WireListSplitter<>(model, splitterName); + final WireListSplitter splitter = new WireListSplitter<>(model, splitterName); solderTo(splitterName, (Consumer) splitter); return splitter.getOutputWire(); } /** - * Build a {@link WireListSplitter} that is soldered to the output of this wire. Creating a splitter for wires - * without a list output type will cause runtime exceptions. The input wire to the splitter is automatically - * soldered to this output wire (i.e. all data that comes out of the wire will be inserted into the splitter). The - * output wire of the splitter is returned by this method. + * Build a {@link WireTransformer}. The input wire to the transformer is automatically soldered to this output wire + * (i.e. all data that comes out of the wire will be inserted into the transformer). The output wire of the + * transformer is returned by this method. * - * @param clazz the class of the list elements, convince parameter for hinting generic type to the compiler - * @param the type of the list elements + * @param name the name of the transformer + * @param transformer the function that transforms the output of this wire into the output of the transformer. + * Called once per data item. Null data returned by this method his not forwarded. + * @param the output type of the transformer + * @return the output wire of the transformer */ @NonNull - public OutputWire buildSplitter(@NonNull final Class clazz) { - return buildSplitter(); + public OutputWire buildTransformer( + @NonNull final String name, @NonNull final Function transformer) { + final WireTransformer wireTransformer = + new WireTransformer<>(model, Objects.requireNonNull(name), Objects.requireNonNull(transformer)); + solderTo(name, wireTransformer); + return wireTransformer.getOutputWire(); } /** - * Build a {@link WireTransformer}. The input wire to the transformer is automatically soldered to this output wire - * (i.e. all data that comes out of the wire will be inserted into the transformer). The output wire of the - * transformer is returned by this method. + * Build a {@link AdvancedWireTransformer}. The input wire to the transformer is automatically soldered to this + * output wire (i.e. all data that comes out of the wire will be inserted into the transformer). The output wire of + * the transformer is returned by this method. Similar to {@link #buildTransformer(String, Function)}, but instead + * of the transformer method being called once per data item, it is called once per output per data item. * * @param name the name of the transformer - * @param transform the function that transforms the output of this wire into the output of the transformer - * @param the output type of the transformer + * @param transform the function that transforms the output of this wire into the output of the transformer, called + * once per output per data item. Null data returned by this method his not forwarded. + * @param cleanup an optional method that is called after the data is forwarded to all destinations. The original + * data is passed to this method. Ignored if null. + * @param the output type of the transformer * @return the output wire of the transformer */ @NonNull - public OutputWire buildTransformer(@NonNull final String name, @NonNull final Function transform) { - final WireTransformer transformer = - new WireTransformer<>(model, Objects.requireNonNull(name), Objects.requireNonNull(transform)); - solderTo(name, transformer); - return transformer.getOutputWire(); + public OutputWire buildAdvancedTransformer( + @NonNull final String name, + @NonNull final Function transform, + @Nullable final Consumer cleanup) { + + final AdvancedWireTransformer wireTransformer = + new AdvancedWireTransformer<>(model, Objects.requireNonNull(name), transform, cleanup); + + solderTo(name, wireTransformer); + + return wireTransformer.getOutputWire(); } + + /** + * Build a {@link AdvancedWireTransformer}. The input wire to the transformer is automatically soldered to this + * output wire (i.e. all data that comes out of the wire will be inserted into the transformer). The output wire of + * the transformer is returned by this method. Similar to {@link #buildTransformer(String, Function)}, but instead + * of the transformer method being called once per data item, it is called once per output per data item. + * + *

+ * This method is very similar to {@link #buildAdvancedTransformer(String, Function, Consumer)}, but with a + * different way of describing the transformation. + * + * @param name the name of the transformer + * @param transformer an object that manages the transformation + * @param the output type of the transformer + * @return the output wire of the transformer + */ + @NonNull + public OutputWire buildAdvancedTransformer( + @NonNull final String name, @NonNull final AdvancedTransformation transformer) { + return buildAdvancedTransformer(name, transformer::transform, transformer::cleanup); + } + + /** + * Creates a new forwarding destination. + * + * @param destination the destination to forward data to + */ + protected abstract void addForwardingDestination(@NonNull final Consumer destination); } diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/StandardOutputWire.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/StandardOutputWire.java new file mode 100644 index 000000000000..7973e98e8d62 --- /dev/null +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/StandardOutputWire.java @@ -0,0 +1,79 @@ +/* + * Copyright (C) 2023 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.swirlds.common.wiring.wires.output; + +import static com.swirlds.logging.legacy.LogMarker.EXCEPTION; + +import com.swirlds.common.wiring.WiringModel; +import com.swirlds.common.wiring.wires.output.internal.ForwardingOutputWire; +import edu.umd.cs.findbugs.annotations.NonNull; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * An output wire that will take data and forward it to its outputs. Output type is the same as the input type. + * + * @param the type of data passed to the forwarding method + */ +public class StandardOutputWire extends ForwardingOutputWire { + + private static final Logger logger = LogManager.getLogger(StandardOutputWire.class); + + private final List> forwardingDestinations = new ArrayList<>(); + + /** + * Constructor. + * + * @param model the wiring model containing this output wire + * @param name the name of the output wire + */ + public StandardOutputWire(@NonNull final WiringModel model, @NonNull final String name) { + super(model, name); + } + + /** + * {@inheritDoc} + */ + @Override + protected void addForwardingDestination(@NonNull final Consumer destination) { + Objects.requireNonNull(destination); + forwardingDestinations.add(destination); + } + + /** + * {@inheritDoc} + */ + @Override + public void forward(@NonNull final OUT data) { + for (final Consumer destination : forwardingDestinations) { + try { + destination.accept(data); + } catch (final Exception e) { + logger.error( + EXCEPTION.getMarker(), + "Exception thrown on output wire {} while forwarding data {}", + getName(), + data, + e); + } + } + } +} diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/internal/ForwardingOutputWire.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/internal/ForwardingOutputWire.java new file mode 100644 index 000000000000..9c158ec187ed --- /dev/null +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/internal/ForwardingOutputWire.java @@ -0,0 +1,50 @@ +/* + * Copyright (C) 2023 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.swirlds.common.wiring.wires.output.internal; + +import com.swirlds.common.wiring.WiringModel; +import com.swirlds.common.wiring.wires.output.OutputWire; +import edu.umd.cs.findbugs.annotations.NonNull; + +/** + * An output wire that will take data and forward it to its outputs. + * + * @param the type of data passed to the forwarding method + * @param the type of data forwarded to things soldered to this wire + */ +public abstract class ForwardingOutputWire extends OutputWire { + + /** + * Constructor. + * + * @param model the wiring model containing this output wire + * @param name the name of the output wire + */ + protected ForwardingOutputWire(@NonNull final WiringModel model, final @NonNull String name) { + super(model, name); + } + + /** + * Forward output data to any wires/consumers that are listening for it. + *

+ * Although it will technically work, it is a violation of convention to directly put data into this output wire + * except from within code being executed by the task scheduler that owns this output wire. Don't do it. + * + * @param data the output data to forward + */ + public abstract void forward(@NonNull final IN data); +} diff --git a/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/internal/TransformingOutputWire.java b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/internal/TransformingOutputWire.java new file mode 100644 index 000000000000..15c27f19a55a --- /dev/null +++ b/platform-sdk/swirlds-common/src/main/java/com/swirlds/common/wiring/wires/output/internal/TransformingOutputWire.java @@ -0,0 +1,102 @@ +/* + * Copyright (C) 2023 Hedera Hashgraph, LLC + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package com.swirlds.common.wiring.wires.output.internal; + +import static com.swirlds.logging.legacy.LogMarker.EXCEPTION; + +import com.swirlds.common.wiring.WiringModel; +import com.swirlds.common.wiring.wires.output.OutputWire; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import java.util.ArrayList; +import java.util.List; +import java.util.Objects; +import java.util.function.Consumer; +import java.util.function.Function; +import org.apache.logging.log4j.LogManager; +import org.apache.logging.log4j.Logger; + +/** + * An output wire that transforms data that flows across it. For advanced use cases where + * {@link OutputWire#buildTransformer(String, Function)} semantics are insufficient. + * + * @param the type of data passed to the forwarding method + * @param the type of data forwarded to things soldered to this wire + */ +public class TransformingOutputWire extends ForwardingOutputWire { + + private static final Logger logger = LogManager.getLogger(TransformingOutputWire.class); + private final List> forwardingDestinations = new ArrayList<>(); + + private final Function transform; + private final Consumer cleanup; + + /** + * Constructor. + * + * @param model the wiring model containing this output wire + * @param name the name of the output wire + * @param transformer the function to transform the data from the input type to the output type. Is called once per + * output per data item. If this method returns null then the data is not forwarded. + * @param cleanup an optional method that is called after the data is forwarded to all destinations. The + * original data is passed to this method. Ignored if null. + */ + public TransformingOutputWire( + @NonNull final WiringModel model, + @NonNull final String name, + @NonNull final Function transformer, + @Nullable final Consumer cleanup) { + super(model, name); + + this.transform = Objects.requireNonNull(transformer); + this.cleanup = cleanup == null ? (data) -> {} : cleanup; + } + + /** + * {@inheritDoc} + */ + @Override + protected void addForwardingDestination(@NonNull final Consumer destination) { + Objects.requireNonNull(destination); + forwardingDestinations.add(destination); + } + + /** + * {@inheritDoc} + */ + @Override + public void forward(@NonNull final IN data) { + for (final Consumer destination : forwardingDestinations) { + try { + final OUT transformed = transform.apply(data); + if (transformed == null) { + // Do not forward null values. + return; + } + destination.accept(transformed); + } catch (final Exception e) { + logger.error( + EXCEPTION.getMarker(), + "Exception thrown on output wire {} while forwarding data {}", + getName(), + data, + e); + } + } + cleanup.accept(data); + } +} diff --git a/platform-sdk/swirlds-common/src/main/java/module-info.java b/platform-sdk/swirlds-common/src/main/java/module-info.java index cf80db810b0d..1478c5fd344f 100644 --- a/platform-sdk/swirlds-common/src/main/java/module-info.java +++ b/platform-sdk/swirlds-common/src/main/java/module-info.java @@ -80,7 +80,11 @@ exports com.swirlds.common.wiring; exports com.swirlds.common.wiring.builders; exports com.swirlds.common.wiring.counters; + exports com.swirlds.common.wiring.transformers; exports com.swirlds.common.wiring.utility; + exports com.swirlds.common.wiring.wires; + exports com.swirlds.common.wiring.wires.input; + exports com.swirlds.common.wiring.wires.output; /* Targeted exports */ exports com.swirlds.common.internal to diff --git a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/benchmark/WiringBenchmark.java b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/benchmark/WiringBenchmark.java index 279ea8792bbd..514a4684bb13 100644 --- a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/benchmark/WiringBenchmark.java +++ b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/benchmark/WiringBenchmark.java @@ -22,12 +22,12 @@ import com.swirlds.base.time.Time; import com.swirlds.common.context.PlatformContext; -import com.swirlds.common.wiring.InputWire; import com.swirlds.common.wiring.TaskScheduler; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.builders.TaskSchedulerType; import com.swirlds.common.wiring.counters.BackpressureObjectCounter; import com.swirlds.common.wiring.counters.ObjectCounter; +import com.swirlds.common.wiring.wires.input.InputWire; import com.swirlds.test.framework.context.TestPlatformContextBuilder; import java.time.Duration; import java.util.concurrent.ForkJoinPool; diff --git a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/model/ModelTests.java b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/model/ModelTests.java index 2aa718c201a3..cf42179cecbd 100644 --- a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/model/ModelTests.java +++ b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/model/ModelTests.java @@ -19,12 +19,12 @@ import static org.junit.jupiter.api.Assertions.assertEquals; import com.swirlds.base.time.Time; -import com.swirlds.common.wiring.InputWire; -import com.swirlds.common.wiring.OutputWire; -import com.swirlds.common.wiring.SolderType; import com.swirlds.common.wiring.TaskScheduler; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.utility.ModelGroup; +import com.swirlds.common.wiring.wires.SolderType; +import com.swirlds.common.wiring.wires.input.InputWire; +import com.swirlds.common.wiring.wires.output.OutputWire; import com.swirlds.test.framework.context.TestPlatformContextBuilder; import edu.umd.cs.findbugs.annotations.NonNull; import java.time.Instant; diff --git a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/ConcurrentTaskSchedulerTests.java b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/ConcurrentTaskSchedulerTests.java index 31063b985c97..6022e202f865 100644 --- a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/ConcurrentTaskSchedulerTests.java +++ b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/ConcurrentTaskSchedulerTests.java @@ -22,10 +22,10 @@ import static java.util.concurrent.TimeUnit.MICROSECONDS; import static org.junit.jupiter.api.Assertions.assertEquals; -import com.swirlds.common.wiring.InputWire; import com.swirlds.common.wiring.TaskScheduler; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.builders.TaskSchedulerType; +import com.swirlds.common.wiring.wires.input.InputWire; import com.swirlds.test.framework.TestWiringModelBuilder; import edu.umd.cs.findbugs.annotations.Nullable; import java.time.Duration; diff --git a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/DirectTaskSchedulerTests.java b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/DirectTaskSchedulerTests.java index 52fa76ffd2c3..3a8fd1d492f0 100644 --- a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/DirectTaskSchedulerTests.java +++ b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/DirectTaskSchedulerTests.java @@ -20,13 +20,13 @@ import static com.swirlds.common.utility.NonCryptographicHashing.hash32; import static org.junit.jupiter.api.Assertions.assertEquals; -import com.swirlds.common.wiring.InputWire; -import com.swirlds.common.wiring.OutputWire; -import com.swirlds.common.wiring.SolderType; import com.swirlds.common.wiring.TaskScheduler; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.builders.TaskSchedulerType; import com.swirlds.common.wiring.counters.StandardObjectCounter; +import com.swirlds.common.wiring.wires.SolderType; +import com.swirlds.common.wiring.wires.input.InputWire; +import com.swirlds.common.wiring.wires.output.OutputWire; import com.swirlds.test.framework.TestWiringModelBuilder; import java.lang.Thread.UncaughtExceptionHandler; import java.time.Duration; diff --git a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/HeartbeatSchedulerTests.java b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/HeartbeatSchedulerTests.java index 181046ee3d2b..7a0154ea9090 100644 --- a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/HeartbeatSchedulerTests.java +++ b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/HeartbeatSchedulerTests.java @@ -22,8 +22,8 @@ import com.swirlds.base.test.fixtures.time.FakeTime; import com.swirlds.common.context.PlatformContext; -import com.swirlds.common.wiring.OutputWire; import com.swirlds.common.wiring.WiringModel; +import com.swirlds.common.wiring.wires.output.OutputWire; import com.swirlds.test.framework.context.TestPlatformContextBuilder; import java.time.Duration; import java.time.Instant; diff --git a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/SequentialTaskSchedulerTests.java b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/SequentialTaskSchedulerTests.java index ba6f3f707174..986fc335f8e2 100644 --- a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/SequentialTaskSchedulerTests.java +++ b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/schedulers/SequentialTaskSchedulerTests.java @@ -30,14 +30,14 @@ import static org.junit.jupiter.api.Assertions.assertTrue; import com.swirlds.common.threading.framework.config.ThreadConfiguration; -import com.swirlds.common.wiring.InputWire; -import com.swirlds.common.wiring.OutputWire; -import com.swirlds.common.wiring.SolderType; import com.swirlds.common.wiring.TaskScheduler; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.builders.TaskSchedulerType; import com.swirlds.common.wiring.counters.BackpressureObjectCounter; import com.swirlds.common.wiring.counters.ObjectCounter; +import com.swirlds.common.wiring.wires.SolderType; +import com.swirlds.common.wiring.wires.input.InputWire; +import com.swirlds.common.wiring.wires.output.StandardOutputWire; import com.swirlds.test.framework.TestWiringModelBuilder; import java.time.Duration; import java.util.HashSet; @@ -1959,8 +1959,8 @@ void multipleOutputChannelsTest(final String typeString) { final TaskScheduler taskSchedulerA = model.schedulerBuilder("A").withType(type).build().cast(); final InputWire aIn = taskSchedulerA.buildInputWire("aIn"); - final OutputWire aOutBoolean = taskSchedulerA.buildSecondaryOutputWire(); - final OutputWire aOutString = taskSchedulerA.buildSecondaryOutputWire(); + final StandardOutputWire aOutBoolean = taskSchedulerA.buildSecondaryOutputWire(); + final StandardOutputWire aOutString = taskSchedulerA.buildSecondaryOutputWire(); final TaskScheduler taskSchedulerB = model.schedulerBuilder("B").withType(type).build().cast(); diff --git a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/transformers/TaskSchedulerTransformersTests.java b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/transformers/TaskSchedulerTransformersTests.java index 729236615f41..9e4178b4e999 100644 --- a/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/transformers/TaskSchedulerTransformersTests.java +++ b/platform-sdk/swirlds-common/src/test/java/com/swirlds/common/wiring/transformers/TaskSchedulerTransformersTests.java @@ -17,15 +17,23 @@ package com.swirlds.common.wiring.transformers; import static com.swirlds.common.test.fixtures.AssertionUtils.assertEventuallyEquals; +import static com.swirlds.common.test.fixtures.AssertionUtils.assertEventuallyTrue; import static com.swirlds.common.utility.NonCryptographicHashing.hash32; +import static org.junit.jupiter.api.Assertions.assertFalse; +import static org.junit.jupiter.api.Assertions.assertTrue; -import com.swirlds.common.wiring.InputWire; -import com.swirlds.common.wiring.OutputWire; import com.swirlds.common.wiring.TaskScheduler; import com.swirlds.common.wiring.WiringModel; +import com.swirlds.common.wiring.wires.input.InputWire; +import com.swirlds.common.wiring.wires.output.OutputWire; import com.swirlds.test.framework.TestWiringModelBuilder; +import edu.umd.cs.findbugs.annotations.NonNull; +import edu.umd.cs.findbugs.annotations.Nullable; +import java.lang.Thread.UncaughtExceptionHandler; import java.time.Duration; +import java.util.ArrayList; import java.util.List; +import java.util.concurrent.atomic.AtomicBoolean; import java.util.concurrent.atomic.AtomicInteger; import org.junit.jupiter.api.Test; @@ -238,4 +246,345 @@ void wireTransformerTest() { assertEventuallyEquals(expectedCountC, countC::get, Duration.ofSeconds(1), "C did not receive all data"); assertEventuallyEquals(expectedCountD, countD::get, Duration.ofSeconds(1), "D did not receive all data"); } + + /** + * This test performs the same actions as the {@link #wireTransformerTest()} test, but it uses the advanced + * transformer implementation. It should be possible to perform these tasks with both implementations. + */ + @Test + void advancedWireTransformerSimpleTaskTest() { + final WiringModel model = TestWiringModelBuilder.create(); + + // A produces data of type TestData. + // B wants all of A's data, C wants the integer values, and D wants the boolean values. + + final TaskScheduler taskSchedulerA = + model.schedulerBuilder("A").build().cast(); + final InputWire inA = taskSchedulerA.buildInputWire("A in"); + + final TaskScheduler taskSchedulerB = + model.schedulerBuilder("B").build().cast(); + final InputWire inB = taskSchedulerB.buildInputWire("B in"); + + final TaskScheduler taskSchedulerC = + model.schedulerBuilder("C").build().cast(); + final InputWire inC = taskSchedulerC.buildInputWire("C in"); + + final TaskScheduler taskSchedulerD = + model.schedulerBuilder("D").build().cast(); + final InputWire inD = taskSchedulerD.buildInputWire("D in"); + + taskSchedulerA.getOutputWire().solderTo(inB); + taskSchedulerA + .getOutputWire() + .buildAdvancedTransformer("getValue", TestData::value, null) + .solderTo(inC); + taskSchedulerA + .getOutputWire() + .buildAdvancedTransformer("getInvert", TestData::invert, null) + .solderTo(inD); + + final AtomicInteger countA = new AtomicInteger(0); + inA.bind(x -> { + final int invert = x.invert() ? -1 : 1; + countA.set(hash32(countA.get(), x.value() * invert)); + return x; + }); + + final AtomicInteger countB = new AtomicInteger(0); + inB.bind(x -> { + final int invert = x.invert() ? -1 : 1; + countB.set(hash32(countB.get(), x.value() * invert)); + }); + + final AtomicInteger countC = new AtomicInteger(0); + inC.bind(x -> { + countC.set(hash32(countC.get(), x)); + }); + + final AtomicInteger countD = new AtomicInteger(0); + inD.bind(x -> { + countD.set(hash32(countD.get(), x ? 1 : 0)); + }); + + int expectedCountAB = 0; + int expectedCountC = 0; + int expectedCountD = 0; + + for (int i = 0; i < 100; i++) { + final boolean invert = i % 3 == 0; + inA.put(new TestData(i, invert)); + + expectedCountAB = hash32(expectedCountAB, i * (invert ? -1 : 1)); + expectedCountC = hash32(expectedCountC, i); + expectedCountD = hash32(expectedCountD, invert ? 1 : 0); + } + + assertEventuallyEquals(expectedCountAB, countA::get, Duration.ofSeconds(1), "A did not receive all data"); + assertEventuallyEquals(expectedCountAB, countB::get, Duration.ofSeconds(1), "B did not receive all data"); + assertEventuallyEquals(expectedCountC, countC::get, Duration.ofSeconds(1), "C did not receive all data"); + assertEventuallyEquals(expectedCountD, countD::get, Duration.ofSeconds(1), "D did not receive all data"); + } + + /** + * A test object with vaguely similar semantics to a reserved signed state. + */ + private static class FooBar { + private final AtomicInteger referenceCount; + + public FooBar() { + referenceCount = new AtomicInteger(1); + } + + private FooBar(@NonNull final FooBar that) { + this.referenceCount = that.referenceCount; + } + + /** + * Make a copy and increase the reference count. + * + * @return a copy of this object + */ + @NonNull + public FooBar copyAndReserve() { + final int previousCount = referenceCount.getAndIncrement(); + if (previousCount == 0) { + throw new IllegalStateException("Cannot reserve a copy once it has been fully released"); + } + + return new FooBar(this); + } + + /** + * Release this copy. + */ + public void release() { + final int count = referenceCount.decrementAndGet(); + if (count < 0) { + throw new IllegalStateException("Cannot release a copy more times than it was reserved"); + } + } + + /** + * Get the reference count. + * + * @return the reference count + */ + public int getReferenceCount() { + return referenceCount.get(); + } + } + + /** + * Test a wiring setup that vaguely resembles the way states are reserved and passed around. How to pass around + * state reservations was the original use case for advanced wire transformers. + */ + @Test + void advancedWireTransformerTest() { + // Component A passes data to components B, C, and D. + final WiringModel model = TestWiringModelBuilder.create(); + + final AtomicBoolean error = new AtomicBoolean(false); + final UncaughtExceptionHandler exceptionHandler = (t, e) -> error.set(true); + + final TaskScheduler taskSchedulerA = model.schedulerBuilder("A") + .withUncaughtExceptionHandler(exceptionHandler) + .build() + .cast(); + final InputWire inA = taskSchedulerA.buildInputWire("A in"); + final OutputWire outA = taskSchedulerA.getOutputWire(); + final OutputWire outAReserved = + outA.buildAdvancedTransformer("reserve FooBar", FooBar::copyAndReserve, FooBar::release); + + final TaskScheduler taskSchedulerB = model.schedulerBuilder("B") + .withUncaughtExceptionHandler(exceptionHandler) + .build() + .cast(); + final InputWire inB = taskSchedulerB.buildInputWire("B in"); + + final TaskScheduler taskSchedulerC = model.schedulerBuilder("C") + .withUncaughtExceptionHandler(exceptionHandler) + .build() + .cast(); + final InputWire inC = taskSchedulerC.buildInputWire("C in"); + + final TaskScheduler taskSchedulerD = model.schedulerBuilder("D") + .withUncaughtExceptionHandler(exceptionHandler) + .build() + .cast(); + final InputWire inD = taskSchedulerD.buildInputWire("D in"); + + outAReserved.solderTo(inB); + outAReserved.solderTo(inC); + outAReserved.solderTo(inD); + + final AtomicInteger countA = new AtomicInteger(); + inA.bind(x -> { + assertTrue(x.getReferenceCount() > 0); + countA.getAndIncrement(); + return x; + }); + + final AtomicInteger countB = new AtomicInteger(); + inB.bind(x -> { + assertTrue(x.getReferenceCount() > 0); + countB.getAndIncrement(); + x.release(); + }); + + final AtomicInteger countC = new AtomicInteger(); + inC.bind(x -> { + assertTrue(x.getReferenceCount() > 0); + countC.getAndIncrement(); + x.release(); + }); + + final AtomicInteger countD = new AtomicInteger(); + inD.bind(x -> { + assertTrue(x.getReferenceCount() > 0); + countD.getAndIncrement(); + x.release(); + }); + + final List fooBars = new ArrayList<>(100); + for (int i = 0; i < 100; i++) { + final FooBar fooBar = new FooBar(); + fooBars.add(fooBar); + inA.put(fooBar); + } + + assertEventuallyEquals(100, countA::get, Duration.ofSeconds(1), "A did not receive all data"); + assertEventuallyEquals(100, countB::get, Duration.ofSeconds(1), "B did not receive all data"); + assertEventuallyEquals(100, countC::get, Duration.ofSeconds(1), "C did not receive all data"); + assertEventuallyEquals(100, countD::get, Duration.ofSeconds(1), "D did not receive all data"); + + assertEventuallyTrue( + () -> { + for (final FooBar fooBar : fooBars) { + if (fooBar.getReferenceCount() != 0) { + return false; + } + } + return true; + }, + Duration.ofSeconds(1), + "Not all FooBars were released"); + + assertFalse(error.get()); + + model.stop(); + } + + private static class FooBarTransformer implements AdvancedTransformation { + @Nullable + @Override + public FooBar transform(@NonNull final FooBar fooBar) { + return fooBar.copyAndReserve(); + } + + @Override + public void cleanup(@NonNull final FooBar fooBar) { + fooBar.release(); + } + } + + /** + * Tests the version of the buildAdvancedTransformer() method that takes a single object implementing + * {@link AdvancedTransformation}. + */ + @Test + void advancedWireTransformerInterfaceVariationTest() { + // Component A passes data to components B, C, and D. + final WiringModel model = TestWiringModelBuilder.create(); + + final AtomicBoolean error = new AtomicBoolean(false); + final UncaughtExceptionHandler exceptionHandler = (t, e) -> error.set(true); + + final TaskScheduler taskSchedulerA = model.schedulerBuilder("A") + .withUncaughtExceptionHandler(exceptionHandler) + .build() + .cast(); + final InputWire inA = taskSchedulerA.buildInputWire("A in"); + final OutputWire outA = taskSchedulerA.getOutputWire(); + final OutputWire outAReserved = + outA.buildAdvancedTransformer("reserve FooBar", new FooBarTransformer()); + + final TaskScheduler taskSchedulerB = model.schedulerBuilder("B") + .withUncaughtExceptionHandler(exceptionHandler) + .build() + .cast(); + final InputWire inB = taskSchedulerB.buildInputWire("B in"); + + final TaskScheduler taskSchedulerC = model.schedulerBuilder("C") + .withUncaughtExceptionHandler(exceptionHandler) + .build() + .cast(); + final InputWire inC = taskSchedulerC.buildInputWire("C in"); + + final TaskScheduler taskSchedulerD = model.schedulerBuilder("D") + .withUncaughtExceptionHandler(exceptionHandler) + .build() + .cast(); + final InputWire inD = taskSchedulerD.buildInputWire("D in"); + + outAReserved.solderTo(inB); + outAReserved.solderTo(inC); + outAReserved.solderTo(inD); + + final AtomicInteger countA = new AtomicInteger(); + inA.bind(x -> { + assertTrue(x.getReferenceCount() > 0); + countA.getAndIncrement(); + return x; + }); + + final AtomicInteger countB = new AtomicInteger(); + inB.bind(x -> { + assertTrue(x.getReferenceCount() > 0); + countB.getAndIncrement(); + x.release(); + }); + + final AtomicInteger countC = new AtomicInteger(); + inC.bind(x -> { + assertTrue(x.getReferenceCount() > 0); + countC.getAndIncrement(); + x.release(); + }); + + final AtomicInteger countD = new AtomicInteger(); + inD.bind(x -> { + assertTrue(x.getReferenceCount() > 0); + countD.getAndIncrement(); + x.release(); + }); + + final List fooBars = new ArrayList<>(100); + for (int i = 0; i < 100; i++) { + final FooBar fooBar = new FooBar(); + fooBars.add(fooBar); + inA.put(fooBar); + } + + assertEventuallyEquals(100, countA::get, Duration.ofSeconds(1), "A did not receive all data"); + assertEventuallyEquals(100, countB::get, Duration.ofSeconds(1), "B did not receive all data"); + assertEventuallyEquals(100, countC::get, Duration.ofSeconds(1), "C did not receive all data"); + assertEventuallyEquals(100, countD::get, Duration.ofSeconds(1), "D did not receive all data"); + + assertEventuallyTrue( + () -> { + for (final FooBar fooBar : fooBars) { + if (fooBar.getReferenceCount() != 0) { + return false; + } + } + return true; + }, + Duration.ofSeconds(1), + "Not all FooBars were released"); + + assertFalse(error.get()); + + model.stop(); + } } diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/EventDeduplicatorScheduler.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/EventDeduplicatorScheduler.java index 4d5a05f94d00..5db7465d76fd 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/EventDeduplicatorScheduler.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/EventDeduplicatorScheduler.java @@ -16,11 +16,11 @@ package com.swirlds.platform.wiring; -import com.swirlds.common.wiring.InputWire; -import com.swirlds.common.wiring.OutputWire; import com.swirlds.common.wiring.TaskScheduler; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.builders.TaskSchedulerType; +import com.swirlds.common.wiring.wires.input.InputWire; +import com.swirlds.common.wiring.wires.output.OutputWire; import com.swirlds.platform.event.GossipEvent; import com.swirlds.platform.event.deduplication.EventDeduplicator; import edu.umd.cs.findbugs.annotations.NonNull; diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/EventSignatureValidatorScheduler.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/EventSignatureValidatorScheduler.java index a491d8ebc260..23c1f3703f70 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/EventSignatureValidatorScheduler.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/EventSignatureValidatorScheduler.java @@ -16,11 +16,11 @@ package com.swirlds.platform.wiring; -import com.swirlds.common.wiring.InputWire; -import com.swirlds.common.wiring.OutputWire; import com.swirlds.common.wiring.TaskScheduler; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.builders.TaskSchedulerType; +import com.swirlds.common.wiring.wires.input.InputWire; +import com.swirlds.common.wiring.wires.output.OutputWire; import com.swirlds.platform.event.GossipEvent; import com.swirlds.platform.event.validation.EventSignatureValidator; import edu.umd.cs.findbugs.annotations.NonNull; diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/InOrderLinkerScheduler.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/InOrderLinkerScheduler.java index e563989eebab..80d198ab3f11 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/InOrderLinkerScheduler.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/InOrderLinkerScheduler.java @@ -16,11 +16,11 @@ package com.swirlds.platform.wiring; -import com.swirlds.common.wiring.InputWire; -import com.swirlds.common.wiring.OutputWire; import com.swirlds.common.wiring.TaskScheduler; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.builders.TaskSchedulerType; +import com.swirlds.common.wiring.wires.input.InputWire; +import com.swirlds.common.wiring.wires.output.OutputWire; import com.swirlds.platform.event.GossipEvent; import com.swirlds.platform.event.linking.InOrderLinker; import com.swirlds.platform.internal.EventImpl; diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/InternalEventValidatorScheduler.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/InternalEventValidatorScheduler.java index 6d34cf41ef77..07ab2da7fe24 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/InternalEventValidatorScheduler.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/InternalEventValidatorScheduler.java @@ -16,11 +16,11 @@ package com.swirlds.platform.wiring; -import com.swirlds.common.wiring.InputWire; -import com.swirlds.common.wiring.OutputWire; import com.swirlds.common.wiring.TaskScheduler; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.builders.TaskSchedulerType; +import com.swirlds.common.wiring.wires.input.InputWire; +import com.swirlds.common.wiring.wires.output.OutputWire; import com.swirlds.platform.event.GossipEvent; import com.swirlds.platform.event.validation.InternalEventValidator; import edu.umd.cs.findbugs.annotations.NonNull; diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/LinkedEventIntakeScheduler.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/LinkedEventIntakeScheduler.java index aa44725678bf..050ef890b910 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/LinkedEventIntakeScheduler.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/LinkedEventIntakeScheduler.java @@ -16,10 +16,10 @@ package com.swirlds.platform.wiring; -import com.swirlds.common.wiring.InputWire; import com.swirlds.common.wiring.TaskScheduler; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.builders.TaskSchedulerType; +import com.swirlds.common.wiring.wires.input.InputWire; import com.swirlds.platform.components.LinkedEventIntake; import com.swirlds.platform.internal.EventImpl; import edu.umd.cs.findbugs.annotations.NonNull; diff --git a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/OrphanBufferScheduler.java b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/OrphanBufferScheduler.java index 50abb3fdba3d..3b83769741d0 100644 --- a/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/OrphanBufferScheduler.java +++ b/platform-sdk/swirlds-platform-core/src/main/java/com/swirlds/platform/wiring/OrphanBufferScheduler.java @@ -16,11 +16,11 @@ package com.swirlds.platform.wiring; -import com.swirlds.common.wiring.InputWire; -import com.swirlds.common.wiring.OutputWire; import com.swirlds.common.wiring.TaskScheduler; import com.swirlds.common.wiring.WiringModel; import com.swirlds.common.wiring.builders.TaskSchedulerType; +import com.swirlds.common.wiring.wires.input.InputWire; +import com.swirlds.common.wiring.wires.output.OutputWire; import com.swirlds.platform.event.GossipEvent; import com.swirlds.platform.event.orphan.OrphanBuffer; import edu.umd.cs.findbugs.annotations.NonNull;