diff --git a/services/concierge/enforcement/src/main/java/org/eclipse/ditto/services/concierge/enforcement/EnforcementProvider.java b/services/concierge/enforcement/src/main/java/org/eclipse/ditto/services/concierge/enforcement/EnforcementProvider.java index 38fba86472..8bf974f020 100644 --- a/services/concierge/enforcement/src/main/java/org/eclipse/ditto/services/concierge/enforcement/EnforcementProvider.java +++ b/services/concierge/enforcement/src/main/java/org/eclipse/ditto/services/concierge/enforcement/EnforcementProvider.java @@ -62,12 +62,10 @@ default boolean isApplicable(final T command) { /** * Convert this enforcement provider into a stream of contextual messages. * - * @param bufferSize size of the buffer of concurrently scheduled enforcement futures. * @return the stream. */ @SuppressWarnings("unchecked") // due to GraphDSL usage - default Graph, Contextual>, NotUsed> toContextualFlow( - final int bufferSize) { + default Graph, Contextual>, NotUsed> toContextualFlow() { final Graph, Contextual, Contextual>, NotUsed> multiplexer = @@ -80,9 +78,10 @@ default Graph, Contextual, Contextual, Contextual> fanout = builder.add(multiplexer); + // using parallelism=1 to ensure that authorization-changing commands affect the next command immediately final Flow, Contextual, NotUsed> enforcementFlow = Flow.>create() - .mapAsync(bufferSize, contextual -> createEnforcement(contextual).enforceSafely()); + .mapAsync(1, contextual -> createEnforcement(contextual).enforceSafely()); // by default, ignore unhandled messages: final SinkShape> unhandledSink = builder.add(Sink.ignore()); diff --git a/services/concierge/enforcement/src/main/java/org/eclipse/ditto/services/concierge/enforcement/EnforcerActor.java b/services/concierge/enforcement/src/main/java/org/eclipse/ditto/services/concierge/enforcement/EnforcerActor.java index 6a853e5ac6..29685c11f3 100644 --- a/services/concierge/enforcement/src/main/java/org/eclipse/ditto/services/concierge/enforcement/EnforcerActor.java +++ b/services/concierge/enforcement/src/main/java/org/eclipse/ditto/services/concierge/enforcement/EnforcerActor.java @@ -159,7 +159,7 @@ private Flow, Contextual, NotUsed final ArrayList> providers = new ArrayList<>(enforcementProviders); for (int i = 0; i < providers.size(); i++) { builder.from(bcast.out(i)) - .via(builder.add(providers.get(i).toContextualFlow(partitionBufferSize))) + .via(builder.add(providers.get(i).toContextualFlow())) .toInlet(merge.in(i)); } diff --git a/services/concierge/enforcement/src/test/java/org/eclipse/ditto/services/concierge/enforcement/EnforcementProviderTest.java b/services/concierge/enforcement/src/test/java/org/eclipse/ditto/services/concierge/enforcement/EnforcementProviderTest.java deleted file mode 100644 index 2d8e78692c..0000000000 --- a/services/concierge/enforcement/src/test/java/org/eclipse/ditto/services/concierge/enforcement/EnforcementProviderTest.java +++ /dev/null @@ -1,122 +0,0 @@ -/* - * Copyright (c) 2020 Contributors to the Eclipse Foundation - * - * See the NOTICE file(s) distributed with this work for additional - * information regarding copyright ownership. - * - * This program and the accompanying materials are made available under the - * terms of the Eclipse Public License 2.0 which is available at - * http://www.eclipse.org/legal/epl-2.0 - * - * SPDX-License-Identifier: EPL-2.0 - */ -package org.eclipse.ditto.services.concierge.enforcement; - -import java.time.Duration; -import java.util.concurrent.CompletionStage; - -import org.eclipse.ditto.model.base.headers.DittoHeaders; -import org.eclipse.ditto.model.base.headers.WithDittoHeaders; -import org.eclipse.ditto.model.things.ThingId; -import org.eclipse.ditto.services.utils.akka.logging.DittoDiagnosticLoggingAdapter; -import org.eclipse.ditto.services.utils.cache.Cache; -import org.eclipse.ditto.signals.events.things.ThingDeleted; -import org.junit.After; -import org.junit.Before; -import org.junit.Test; -import org.mockito.Mockito; - -import akka.NotUsed; -import akka.actor.ActorRef; -import akka.actor.ActorSystem; -import akka.japi.Pair; -import akka.pattern.Patterns; -import akka.stream.ActorMaterializer; -import akka.stream.javadsl.Sink; -import akka.stream.javadsl.Source; -import akka.stream.testkit.TestPublisher; -import akka.stream.testkit.TestSubscriber; -import akka.stream.testkit.javadsl.TestSink; -import akka.stream.testkit.javadsl.TestSource; -import akka.testkit.javadsl.TestKit; - -/** - * Tests {@link org.eclipse.ditto.services.concierge.enforcement.EnforcementProvider}. - */ -public final class EnforcementProviderTest { - - private ActorSystem actorSystem; - private ActorMaterializer materializer; - private TestPublisher.Probe> sourceProbe; - private TestSubscriber.Probe> sinkProbe; - private Source, NotUsed> testSource; - private Sink, NotUsed> testSink; - - @Before - public void init() { - actorSystem = ActorSystem.create(); - materializer = ActorMaterializer.create(actorSystem); - final Pair>, Source, NotUsed>> - sourcePair = TestSource.>probe(actorSystem).preMaterialize(materializer); - final Pair>, Sink, NotUsed>> - sinkPair = TestSink.>probe(actorSystem).preMaterialize(materializer); - sourceProbe = sourcePair.first(); - sinkProbe = sinkPair.first(); - testSource = sourcePair.second(); - testSink = sinkPair.second(); - } - - @After - public void cleanup() { - if (actorSystem != null) { - TestKit.shutdownActorSystem(actorSystem); - } - } - - @Test - @SuppressWarnings("unchecked") - public void parallelAsksArePossible() { - new TestKit(actorSystem) {{ - // GIVEN: an EnforcementProvider that asks the test kit for contextual results - final DittoDiagnosticLoggingAdapter log = Mockito.mock(DittoDiagnosticLoggingAdapter.class); - final Cache cache = Mockito.mock(Cache.class); - final ThingDeleted signal = ThingDeleted.of(ThingId.dummy(), 1L, DittoHeaders.empty()); - final Contextual contextual = new Contextual<>(signal, getRef(), getRef(), - getRef(), getRef(), Duration.ofMinutes(9L), log, null, null, - null, null, cache); - final EnforcementProvider underTest = new EnforcementProvider<>() { - @Override - public Class getCommandClass() { - return ThingDeleted.class; - } - - @Override - public AbstractEnforcement createEnforcement(final Contextual context) { - return new AbstractEnforcement<>(contextual.withMessage(signal)) { - @Override - public CompletionStage> enforce() { - return Patterns.ask(context.getSender(), context.getMessage(), context.getAskTimeout()) - .thenApply(reply -> (Contextual) reply); - } - }; - } - }; - - testSource.via(underTest.toContextualFlow(100)).runWith(testSink, materializer); - - // WHEN: multiple elements are requested downstream - sinkProbe.request(99L); - sourceProbe.expectRequest(); - sourceProbe.sendNext(contextual).sendNext(contextual).sendComplete(); - - // THEN: the ask-futures run in parallel - expectMsg(signal); - final ActorRef sender1 = getLastSender(); - expectMsg(signal); - final ActorRef sender2 = getLastSender(); - sender1.tell(contextual, getRef()); - sender2.tell(contextual, getRef()); - sinkProbe.expectNext(contextual, contextual).expectComplete(); - }}; - } -} diff --git a/services/concierge/starter/src/main/resources/concierge.conf b/services/concierge/starter/src/main/resources/concierge.conf index 04d59b417a..c69509d872 100644 --- a/services/concierge/starter/src/main/resources/concierge.conf +++ b/services/concierge/starter/src/main/resources/concierge.conf @@ -30,7 +30,7 @@ ditto { parallelism = 256 parallelism = ${?ENFORCEMENT_PARALLELISM} - # buffer size for input message buffer per lane and also the number of futures to schedule per lane + # buffer size for input message buffer per lane partition-buffer-size = 100 partition-buffer-size = ${?ENFORCEMENT_PARTITION_BUFFER_SIZE}