Skip to content

Commit

Permalink
Use legacy intake pipeline by default (#10120)
Browse files Browse the repository at this point in the history
Signed-off-by: Austin Littley <austin@swirldslabs.com>
  • Loading branch information
litt3 authored and agadzhalov committed Dec 1, 2023
1 parent e0f87b2 commit f5f435d
Show file tree
Hide file tree
Showing 3 changed files with 15 additions and 10 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -72,4 +72,4 @@ public record EventConfig(
@ConfigProperty(defaultValue = "/opt/hgcapp/eventsStreams") String eventsLogDir,
@ConfigProperty(defaultValue = "true") boolean enableEventStreaming,
@ConfigProperty(defaultValue = "8") int prehandlePoolSize,
@ConfigProperty(defaultValue = "false") boolean useLegacyIntake) {}
@ConfigProperty(defaultValue = "true") boolean useLegacyIntake) {}
Original file line number Diff line number Diff line change
Expand Up @@ -699,9 +699,9 @@ public class SwirldsPlatform implements Platform {
final EventValidator eventValidator = new EventValidator(
eventValidators, eventIntake::addUnlinkedEvent, eventIntakePhaseTimer, intakeEventCounter);

platformWiring = new PlatformWiring(platformContext, time);
if (eventConfig.useLegacyIntake()) {
intakeHandler = eventValidator::validateEvent;
platformWiring = null;
} else {
final InternalEventValidator internalEventValidator = new InternalEventValidator(
platformContext, time, currentAddressBook.getSize() == 1, intakeEventCounter);
Expand All @@ -727,7 +727,6 @@ public class SwirldsPlatform implements Platform {
preConsensusEventHandler::preconsensusEvent,
intakeEventCounter);

platformWiring = new PlatformWiring(platformContext, time);
platformWiring.bind(
internalEventValidator,
eventDeduplicator,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
import com.swirlds.base.state.Startable;
import com.swirlds.base.state.Stoppable;
import com.swirlds.base.time.Time;
import com.swirlds.common.config.EventConfig;
import com.swirlds.common.context.PlatformContext;
import com.swirlds.common.threading.interrupt.InterruptableConsumer;
import com.swirlds.common.utility.Clearable;
Expand All @@ -35,12 +36,14 @@
import com.swirlds.platform.event.validation.EventSignatureValidator;
import com.swirlds.platform.event.validation.InternalEventValidator;
import edu.umd.cs.findbugs.annotations.NonNull;
import java.util.Objects;
import java.util.function.Consumer;

/**
* Encapsulates wiring for {@link com.swirlds.platform.SwirldsPlatform}.
*/
public class PlatformWiring implements Startable, Stoppable, Clearable {
private final PlatformContext platformContext;
private final WiringModel model;

private final InternalEventValidatorWiring internalEventValidatorWiring;
Expand All @@ -57,6 +60,7 @@ public class PlatformWiring implements Startable, Stoppable, Clearable {
* @param time provides wall clock time
*/
public PlatformWiring(@NonNull final PlatformContext platformContext, @NonNull final Time time) {
this.platformContext = Objects.requireNonNull(platformContext);
model = WiringModel.create(platformContext, time);

final PlatformSchedulers schedulers = PlatformSchedulers.create(platformContext, model);
Expand Down Expand Up @@ -214,14 +218,16 @@ private void flushAll() {
*/
@Override
public void clear() {
// pause the orphan buffer to break the cycle, and flush the pause through
orphanBufferWiring.pauseInput().inject(true);
orphanBufferWiring.flushRunnable().run();
if (!platformContext.getConfiguration().getConfigData(EventConfig.class).useLegacyIntake()) {
// pause the orphan buffer to break the cycle, and flush the pause through
orphanBufferWiring.pauseInput().inject(true);
orphanBufferWiring.flushRunnable().run();

// now that no cycles exist, flush all the wiring objects
flushAll();
// now that no cycles exist, flush all the wiring objects
flushAll();

// once everything has been flushed through the system, it's safe to unpause the orphan buffer
orphanBufferWiring.pauseInput().inject(false);
// once everything has been flushed through the system, it's safe to unpause the orphan buffer
orphanBufferWiring.pauseInput().inject(false);
}
}
}

0 comments on commit f5f435d

Please sign in to comment.