Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

09837 9578 advanced wire transformers #9894

Merged
merged 38 commits into from
Nov 15, 2023
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
38 commits
Select commit Hold shift + click to select a range
ee13cf3
Added heartbeats to wiring framework.
cody-littley Nov 3, 2023
c19c01b
New tests, fixed bugs.
cody-littley Nov 3, 2023
d3b216e
Added offer soldering.
cody-littley Nov 3, 2023
bc15c50
Added direct task scheduler.
cody-littley Nov 3, 2023
45e549d
Incremental progress fitting direct schedulers into the wiring model.
cody-littley Nov 3, 2023
db3735b
Incremental changes.
cody-littley Nov 6, 2023
c282ebc
Merge branch 'develop' into 09618-heartbeats
cody-littley Nov 6, 2023
561483e
Merge branch '09618-heartbeats' into 09670-9618-offer-solder
cody-littley Nov 6, 2023
07f3760
Made suggested changes.
cody-littley Nov 6, 2023
be02775
Merge branch '09670-9618-offer-solder' into 09665-9670-direct-scheduler
cody-littley Nov 6, 2023
498f4b5
Incremental progress.
cody-littley Nov 6, 2023
d7a5e64
Merge branch 'develop' into 09665-9670-direct-scheduler
cody-littley Nov 7, 2023
83cccf3
Minor changes.
cody-littley Nov 7, 2023
cbafb57
Incremental progress.
cody-littley Nov 7, 2023
bb6d0a0
Merge branch 'develop' into 09665-9670-direct-scheduler
cody-littley Nov 10, 2023
dd07b4a
Added unit tests.
cody-littley Nov 10, 2023
9829ff7
Made suggested changes.
cody-littley Nov 10, 2023
0844963
Create SEQENTIAL_THREAD scheduler.
cody-littley Nov 10, 2023
9dec554
Simplify wiring model inheritence.
cody-littley Nov 10, 2023
301ec6d
Cleanup.
cody-littley Nov 10, 2023
54bd257
Cleanup.
cody-littley Nov 10, 2023
7565855
Made suggested change.
cody-littley Nov 13, 2023
9dc66a6
Merge branch 'develop' into 09665-9670-direct-scheduler
cody-littley Nov 13, 2023
e3a94ef
Merge branch '09665-9670-direct-scheduler' into 09578-9665-sequential…
cody-littley Nov 13, 2023
f60820e
Bugfixes, made suggested changes
cody-littley Nov 13, 2023
06d221d
Made suggested changes.
cody-littley Nov 13, 2023
6cecc7f
Merge branch '09665-9670-direct-scheduler' into 09578-9665-sequential…
cody-littley Nov 13, 2023
2be48e4
Merge branch 'develop' into 09578-9665-sequential-thread-scheduler
cody-littley Nov 14, 2023
aba9331
Made suggested change.
cody-littley Nov 14, 2023
3689197
Fixed test flake.
cody-littley Nov 14, 2023
23a0ed0
Added framework to support state reservation lifecycle.
cody-littley Nov 14, 2023
09b96fb
Cleanup.
cody-littley Nov 14, 2023
e1e4494
Cleanup.
cody-littley Nov 14, 2023
7feac7c
Rename mutating wire to transforming wire.
cody-littley Nov 14, 2023
7e2978c
Unit tests.
cody-littley Nov 14, 2023
0f1f6fd
Merge branch 'develop' into 09837-9578-wire-mutator
cody-littley Nov 15, 2023
dbd02d9
Made suggested changes.
cody-littley Nov 15, 2023
89aa4a5
Made suggested changes.
cody-littley Nov 15, 2023
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,10 @@
import com.swirlds.common.wiring.builders.TaskSchedulerMetricsBuilder;
import com.swirlds.common.wiring.builders.TaskSchedulerType;
import com.swirlds.common.wiring.counters.ObjectCounter;
import com.swirlds.common.wiring.wires.input.InputWire;
import com.swirlds.common.wiring.wires.input.TaskSchedulerInput;
import com.swirlds.common.wiring.wires.output.OutputWire;
import com.swirlds.common.wiring.wires.output.StandardOutputWire;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;
import java.util.function.Consumer;
Expand All @@ -38,13 +42,13 @@
*
* @param <OUT> the output type of the primary output wire (use {@link Void} if no output is needed)
*/
public abstract class TaskScheduler<OUT> {
public abstract class TaskScheduler<OUT> extends TaskSchedulerInput<OUT> {

private final boolean flushEnabled;
private final WiringModel model;
private final String name;
private final TaskSchedulerType type;
private final OutputWire<OUT> primaryOutputWire;
private final StandardOutputWire<OUT> primaryOutputWire;
private final boolean insertionIsBlocking;

/**
Expand All @@ -68,7 +72,7 @@ protected TaskScheduler(
this.name = Objects.requireNonNull(name);
this.type = Objects.requireNonNull(type);
this.flushEnabled = flushEnabled;
primaryOutputWire = new OutputWire<>(model, name);
primaryOutputWire = new StandardOutputWire<>(model, name);
this.insertionIsBlocking = insertionIsBlocking;
}

Expand Down Expand Up @@ -110,10 +114,10 @@ public OutputWire<OUT> getOutputWire() {
* @return the secondary output wire
*/
@NonNull
public <T> OutputWire<T> buildSecondaryOutputWire() {
public <T> StandardOutputWire<T> buildSecondaryOutputWire() {
// Intentionally do not register this with the model. Connections using this output wire will be represented
// in the model in the same way as connections to the primary output wire.
return new OutputWire<>(model, name);
return new StandardOutputWire<>(model, name);
}

/**
Expand Down Expand Up @@ -194,34 +198,6 @@ public final <X> TaskScheduler<X> cast() {
*/
public abstract void flush();

/**
* Add a task to the scheduler. May block if back pressure is enabled.
*
* @param handler handles the provided data
* @param data the data to be processed by the task scheduler
*/
protected abstract void put(@NonNull Consumer<Object> handler, @NonNull Object data);

/**
* Add a task to the scheduler. If backpressure is enabled and there is not immediately capacity available, this
* method will not accept the data.
*
* @param handler handles the provided data
* @param data the data to be processed by the scheduler
* @return true if the data was accepted, false otherwise
*/
protected abstract boolean offer(@NonNull Consumer<Object> handler, @NonNull Object data);

/**
* Inject data into the scheduler, doing so even if it causes the number of unprocessed tasks to exceed the capacity
* specified by configured back pressure. If backpressure is disabled, this operation is logically equivalent to
* {@link #put(Consumer, Object)}.
*
* @param handler handles the provided data
* @param data the data to be processed by the scheduler
*/
protected abstract void inject(@NonNull Consumer<Object> handler, @NonNull Object data);

/**
* Throw an {@link UnsupportedOperationException} if flushing is not enabled.
*/
Expand All @@ -232,12 +208,10 @@ protected final void throwIfFlushDisabled() {
}

/**
* Pass data to this scheduler's primary output wire.
* <p>
* This method is implemented here to allow classes in this package to call forward(), which otherwise would not be
* visible.
* {@inheritDoc}
*/
protected final void forward(@NonNull final OUT data) {
@Override
protected void forward(@NonNull final OUT data) {
primaryOutputWire.forward(data);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,8 @@
import com.swirlds.common.wiring.schedulers.HeartbeatScheduler;
import com.swirlds.common.wiring.schedulers.SequentialThreadTaskScheduler;
import com.swirlds.common.wiring.utility.ModelGroup;
import com.swirlds.common.wiring.wires.SolderType;
import com.swirlds.common.wiring.wires.output.OutputWire;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Duration;
import java.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -48,13 +48,14 @@ public class ConcurrentTaskScheduler<OUT> extends TaskScheduler<OUT> {
* @param onRamp an object counter that is incremented when data is added to the scheduler
* @param offRamp an object counter that is decremented when data is removed from the scheduler
* @param flushEnabled if true, then {@link #flush()} will be enabled, otherwise it will throw.
* @param insertionIsBlocking when data is inserted into this scheduler, will it block until capacity is available?
* @param insertionIsBlocking when data is inserted into this scheduler, will it block until capacity is
* available?
*/
public ConcurrentTaskScheduler(
@NonNull final WiringModel model,
@NonNull final String name,
@NonNull ForkJoinPool pool,
@NonNull UncaughtExceptionHandler uncaughtExceptionHandler,
@NonNull final ForkJoinPool pool,
@NonNull final UncaughtExceptionHandler uncaughtExceptionHandler,
@NonNull final ObjectCounter onRamp,
@NonNull final ObjectCounter offRamp,
final boolean flushEnabled,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,9 @@
import com.swirlds.base.state.Startable;
import com.swirlds.base.state.Stoppable;
import com.swirlds.base.time.Time;
import com.swirlds.common.wiring.OutputWire;
import com.swirlds.common.wiring.WiringModel;
import com.swirlds.common.wiring.builders.TaskSchedulerType;
import com.swirlds.common.wiring.wires.output.OutputWire;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Duration;
import java.time.Instant;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,8 +17,9 @@
package com.swirlds.common.wiring.schedulers;

import com.swirlds.base.time.Time;
import com.swirlds.common.wiring.OutputWire;
import com.swirlds.common.wiring.WiringModel;
import com.swirlds.common.wiring.wires.output.OutputWire;
import com.swirlds.common.wiring.wires.output.StandardOutputWire;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.time.Duration;
import java.time.Instant;
Expand All @@ -32,7 +33,7 @@ class HeartbeatTask extends TimerTask {

private final Time time;
private final Duration period;
private final OutputWire<Instant> outputWire;
private final StandardOutputWire<Instant> outputWire;

/**
* Constructor.
Expand All @@ -49,7 +50,7 @@ public HeartbeatTask(
this.period = Objects.requireNonNull(period);
Objects.requireNonNull(name);

this.outputWire = new OutputWire<>(model, name);
this.outputWire = new StandardOutputWire<>(model, name);
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@ public class SequentialThreadTaskScheduler<OUT> extends TaskScheduler<OUT> imple
private static final int BUFFER_SIZE = 1024;

private final AtomicBoolean alive = new AtomicBoolean(true);

private final Thread thread;

/**
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,48 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.common.wiring.transformers;

import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;

/**
* Executes a transformation for an advanced transformer as created by
* {@link com.swirlds.common.wiring.wires.output.OutputWire#buildAdvancedTransformer(String, AdvancedTransformation)}.
*
* @param <A> the original wire output type
* @param <B> the output type of the transformer
*/
public interface AdvancedTransformation<A, B> {

/**
* Given data that comes off of the original output wire, this method transforms it before it is passed to each
* input wire that is connected to this transformer. Called once per data element per listener.
*
* @param a a data element from the original output wire
* @return the transformed data element, or null if the data should not be forwarded
*/
@Nullable
B transform(@NonNull A a);

/**
* Called on the original data element after it has been forwarded to all listeners. This method can do cleanup if
* necessary. Doing nothing is perfectly ok if the use case does not require cleanup.
*
* @param a the original data element
*/
void cleanup(@NonNull A a);
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,78 @@
/*
* Copyright (C) 2023 Hedera Hashgraph, LLC
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/

package com.swirlds.common.wiring.transformers.internal;

import com.swirlds.common.wiring.WiringModel;
import com.swirlds.common.wiring.builders.TaskSchedulerType;
import com.swirlds.common.wiring.wires.output.OutputWire;
import com.swirlds.common.wiring.wires.output.internal.ForwardingOutputWire;
import com.swirlds.common.wiring.wires.output.internal.TransformingOutputWire;
import edu.umd.cs.findbugs.annotations.NonNull;
import edu.umd.cs.findbugs.annotations.Nullable;
import java.util.function.Consumer;
import java.util.function.Function;

/**
* Similar to a {@link WireTransformer} but for more advanced use cases. Unlike a {@link WireTransformer}, the
* transforming function is called once per output per data item, and a special method can be called after the data is
* forwarded to all destinations.
*
* @param <A> the input type
* @param <B> the output type
*/
public class AdvancedWireTransformer<A, B> implements Consumer<A> {

private final ForwardingOutputWire<A, B> outputWire;

/**
* Constructor.
*
* @param model the wiring model containing this output wire
* @param name the name of the output wire
* @param transformer the function to transform the data from the input type to the output type. Is called once per
* output per data item. If this method returns null then the data is not forwarded.
* @param cleanup an optional method that is called after the data is forwarded to all destinations. The
* original data is passed to this method. Ignored if null.
*/
public AdvancedWireTransformer(
@NonNull final WiringModel model,
@NonNull final String name,
@NonNull final Function<A, B> transformer,
@Nullable final Consumer<A> cleanup) {

model.registerVertex(name, TaskSchedulerType.DIRECT_STATELESS, true);
outputWire = new TransformingOutputWire<>(model, name, transformer, cleanup);
}

/**
* {@inheritDoc}
*/
@Override
public void accept(@NonNull final A a) {
outputWire.forward(a);
}

/**
* Get the output wire for this transformer.
*
* @return the output wire
*/
@NonNull
public OutputWire<B> getOutputWire() {
return outputWire;
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
* limitations under the License.
*/

package com.swirlds.common.wiring.transformers;
package com.swirlds.common.wiring.transformers.internal;

import com.swirlds.common.wiring.OutputWire;
import com.swirlds.common.wiring.WiringModel;
import com.swirlds.common.wiring.builders.TaskSchedulerType;
import com.swirlds.common.wiring.wires.output.OutputWire;
import com.swirlds.common.wiring.wires.output.StandardOutputWire;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;
import java.util.function.Consumer;
Expand All @@ -30,7 +31,7 @@
public class WireFilter<T> implements Consumer<T> {

private final Predicate<T> predicate;
private final OutputWire<T> outputWire;
private final StandardOutputWire<T> outputWire;

/**
* Constructor.
Expand All @@ -44,7 +45,7 @@ public class WireFilter<T> implements Consumer<T> {
public WireFilter(
@NonNull final WiringModel model, @NonNull final String name, @NonNull final Predicate<T> predicate) {
this.predicate = Objects.requireNonNull(predicate);
this.outputWire = new OutputWire<>(model, name);
this.outputWire = new StandardOutputWire<>(model, name);
model.registerVertex(name, TaskSchedulerType.DIRECT_STATELESS, true);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@
* limitations under the License.
*/

package com.swirlds.common.wiring.transformers;
package com.swirlds.common.wiring.transformers.internal;

import com.swirlds.common.wiring.OutputWire;
import com.swirlds.common.wiring.WiringModel;
import com.swirlds.common.wiring.builders.TaskSchedulerType;
import com.swirlds.common.wiring.wires.output.OutputWire;
import com.swirlds.common.wiring.wires.output.StandardOutputWire;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.List;
import java.util.function.Consumer;
Expand All @@ -29,7 +30,7 @@
*/
public class WireListSplitter<T> implements Consumer<List<T>> {

private final OutputWire<T> outputWire;
private final StandardOutputWire<T> outputWire;

/**
* Constructor.
Expand All @@ -39,7 +40,7 @@ public class WireListSplitter<T> implements Consumer<List<T>> {
*/
public WireListSplitter(@NonNull final WiringModel model, @NonNull final String name) {
model.registerVertex(name, TaskSchedulerType.DIRECT_STATELESS, true);
outputWire = new OutputWire<>(model, name);
outputWire = new StandardOutputWire<>(model, name);
}

/**
Expand Down