Skip to content

Commit

Permalink
Remove parallel scheduling of enforcement futures to ensure that cach…
Browse files Browse the repository at this point in the history
…e invavlidation by authorization-changing commands takes effect immediately.

Signed-off-by: Yufei Cai <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Feb 26, 2020
1 parent 3ca679a commit 3d83e7a
Show file tree
Hide file tree
Showing 4 changed files with 5 additions and 128 deletions.
Expand Up @@ -62,12 +62,10 @@ default boolean isApplicable(final T command) {
/**
* Convert this enforcement provider into a stream of contextual messages.
*
* @param bufferSize size of the buffer of concurrently scheduled enforcement futures.
* @return the stream.
*/
@SuppressWarnings("unchecked") // due to GraphDSL usage
default Graph<FlowShape<Contextual<WithDittoHeaders>, Contextual<WithDittoHeaders>>, NotUsed> toContextualFlow(
final int bufferSize) {
default Graph<FlowShape<Contextual<WithDittoHeaders>, Contextual<WithDittoHeaders>>, NotUsed> toContextualFlow() {

final Graph<FanOutShape2<Contextual<WithDittoHeaders>, Contextual<T>, Contextual<WithDittoHeaders>>, NotUsed>
multiplexer =
Expand All @@ -80,9 +78,10 @@ default Graph<FlowShape<Contextual<WithDittoHeaders>, Contextual<WithDittoHeader
final FanOutShape2<Contextual<WithDittoHeaders>, Contextual<T>, Contextual<WithDittoHeaders>> fanout =
builder.add(multiplexer);

// using parallelism=1 to ensure that authorization-changing commands affect the next command immediately
final Flow<Contextual<T>, Contextual<WithDittoHeaders>, NotUsed> enforcementFlow =
Flow.<Contextual<T>>create()
.mapAsync(bufferSize, contextual -> createEnforcement(contextual).enforceSafely());
.mapAsync(1, contextual -> createEnforcement(contextual).enforceSafely());

// by default, ignore unhandled messages:
final SinkShape<Contextual<WithDittoHeaders>> unhandledSink = builder.add(Sink.ignore());
Expand Down
Expand Up @@ -159,7 +159,7 @@ private Flow<Contextual<WithDittoHeaders>, Contextual<WithDittoHeaders>, NotUsed
final ArrayList<EnforcementProvider<?>> providers = new ArrayList<>(enforcementProviders);
for (int i = 0; i < providers.size(); i++) {
builder.from(bcast.out(i))
.via(builder.add(providers.get(i).toContextualFlow(partitionBufferSize)))
.via(builder.add(providers.get(i).toContextualFlow()))
.toInlet(merge.in(i));
}

Expand Down

This file was deleted.

Expand Up @@ -30,7 +30,7 @@ ditto {
parallelism = 256
parallelism = ${?ENFORCEMENT_PARALLELISM}

# buffer size for input message buffer per lane and also the number of futures to schedule per lane
# buffer size for input message buffer per lane
partition-buffer-size = 100
partition-buffer-size = ${?ENFORCEMENT_PARTITION_BUFFER_SIZE}

Expand Down

0 comments on commit 3d83e7a

Please sign in to comment.