Skip to content
Permalink
Browse files

refactor away unnecessary stream components

Signed-off-by: Cai Yufei (INST/ECS1) <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Apr 5, 2019
1 parent 1e43c6c commit 6f8ae20f6b95fa9738ce32f5722fad2d7efc7273
@@ -14,7 +14,7 @@

import java.util.Optional;

import org.eclipse.ditto.services.utils.akka.controlflow.Pipe;
import org.eclipse.ditto.services.utils.akka.controlflow.Filter;
import org.eclipse.ditto.signals.base.Signal;

import akka.NotUsed;
@@ -67,7 +67,7 @@ default boolean isApplicable(final T command) {
final Sink<Contextual<T>, ?> sink = Sink.foreach(contextual -> createEnforcement(contextual).enforceSafely());

final Graph<FanOutShape2<Contextual<Object>, Contextual<T>, Contextual<Object>>, NotUsed> multiplexer =
Pipe.multiplexBy(contextual ->
Filter.multiplexBy(contextual ->
contextual.tryToMapMessage(message -> getCommandClass().isInstance(message)
? Optional.of(getCommandClass().cast(message)).filter(this::isApplicable)
: Optional.empty()));
@@ -24,7 +24,7 @@

import org.eclipse.ditto.model.base.headers.WithDittoHeaders;
import org.eclipse.ditto.services.utils.akka.controlflow.Pipe;
import org.eclipse.ditto.services.utils.akka.controlflow.components.ActivityChecker;
import org.eclipse.ditto.services.utils.akka.controlflow.ActivityChecker;

import akka.NotUsed;
import akka.actor.ActorRef;
@@ -80,7 +80,7 @@ private PreEnforcer() {}
@SuppressWarnings("unchecked")
final Graph<FanOutShape2<Contextual<Object>, Contextual<WithDittoHeaders>, Contextual<Object>>, NotUsed>
multiplexer =
Pipe.multiplexBy(c -> c.getMessage() instanceof WithDittoHeaders
Filter.multiplexBy(c -> c.getMessage() instanceof WithDittoHeaders
? Optional.of((Contextual<WithDittoHeaders>) (Object) c)
: Optional.empty());

@@ -211,7 +211,7 @@ private static Throwable extractRootCause(final Throwable t) {
return t;
}

private static GraphStage<SinkShape<WithSender>> unhandled() {
private static Graph<SinkShape<WithSender>, NotUsed> unhandled() {
return Consume.untyped(wrapped ->
FALLBACK_LOGGER.warn("Unexpected message <{}> from <{}>", wrapped.getMessage(), wrapped.getSender()));
}
@@ -24,7 +24,7 @@
import org.eclipse.ditto.services.models.thingsearch.commands.sudo.ThingSearchSudoCommand;
import org.eclipse.ditto.services.utils.akka.LogUtil;
import org.eclipse.ditto.services.utils.akka.controlflow.AbstractGraphActor;
import org.eclipse.ditto.services.utils.akka.controlflow.Pipe;
import org.eclipse.ditto.services.utils.akka.controlflow.Filter;
import org.eclipse.ditto.services.utils.akka.controlflow.WithSender;
import org.eclipse.ditto.signals.commands.things.query.RetrieveThings;
import org.eclipse.ditto.signals.commands.thingsearch.ThingSearchCommand;
@@ -150,7 +150,7 @@ public static Props props(final AbstractConciergeConfigReader configReader,
}

private static Graph<FanOutShape2<Dispatch, Dispatch, Dispatch>, NotUsed> multiplexBy(final Class<?>... classes) {
return Pipe.multiplexBy(dispatch ->
return Filter.multiplexBy(dispatch ->
Arrays.stream(classes).anyMatch(clazz -> clazz.isInstance(dispatch.getMessage()))
? Optional.of(dispatch)
: Optional.empty());

This file was deleted.

@@ -10,7 +10,7 @@
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.utils.akka.controlflow.components;
package org.eclipse.ditto.services.utils.akka.controlflow;

import java.time.Duration;
import java.util.Collections;
@@ -22,7 +22,6 @@
import akka.actor.ActorRef;
import akka.actor.PoisonPill;
import akka.event.LoggingAdapter;
import akka.stream.FanInShape2;
import akka.stream.FlowShape;
import akka.stream.Graph;
import akka.stream.SourceShape;
@@ -37,33 +36,14 @@

/**
* Terminate a graph actor after a period of inactivity.
* Start the activity checker by sending it any message.
*/
public final class ActivityChecker {

private ActivityChecker() {
throw new AssertionError();
}

/**
* Create an Akka stream graph to terminate an actor after a period of inactivity.
*
* @param interval how often to check for activity.
* @param self reference to the actor.
* @param <A> type of messages that prevents actor termination.
* @return an activity checker.
*/
public static <A> Graph<FlowShape<A, A>, NotUsed> of(final Duration interval, final ActorRef self) {
return GraphDSL.create(builder -> {
final SourceShape<Tick> ticker = builder.add(Source.tick(interval, interval, new Tick()));
final FanInShape2<A, Tick, A> killer = builder.add(PipeWithIdleRoutine.of((tick, log) -> {
log.debug("Terminating actor after <{}> of inactivity: <{}>", interval, self);
self.tell(PoisonPill.getInstance(), ActorRef.noSender());
}));
builder.from(ticker).toInlet(killer.in1());
return FlowShape.of(killer.in0(), killer.out());
});
}

/**
* Create an activity checker. The actor to terminate is obtained from the first message.
*
@@ -93,20 +73,6 @@ private ActivityChecker() {
});
}

/**
* Create an activity checker if duration is not null and a pipe otherwise.
*
* @param interval how often to check for activity.
* @param self reference to the actor.
* @param <A> type of messages that prevents actor termination.
* @return an activity checker.
*/
public static <A> Graph<FlowShape<A, A>, NotUsed> ofNullable(@Nullable final Duration interval,
final ActorRef self) {

return interval == null ? Flow.create() : of(interval, self);
}

/**
* Create an activity checker if duration is not null and a pipe otherwise.
*
@@ -15,45 +15,43 @@
import java.util.function.BiConsumer;
import java.util.function.Consumer;

import akka.NotUsed;
import akka.actor.ActorRef;
import akka.stream.Attributes;
import akka.stream.Inlet;
import akka.stream.Graph;
import akka.stream.SinkShape;
import akka.stream.stage.GraphStage;
import akka.stream.stage.GraphStageLogic;
import akka.stream.javadsl.Sink;

/**
* An Akka stream sink from a consumer of messages with sender.
*/
public class Consume<T> extends GraphStage<SinkShape<WithSender<T>>> {
public class Consume {

private final SinkShape<WithSender<T>> shape = SinkShape.of(Inlet.create("input"));
private final Consumer<WithSender<T>> consumer;

private Consume(final Consumer<WithSender<T>> consumer) {
this.consumer = consumer;
}
private Consume() {}

/**
* Create sink from consumer of {@code WithSender}.
*
* @param consumer the consumer.
* @param <T> type of messages.
* @param <S> some supertype of messages.
* @return sink.
*/
public static <T> Consume<T> of(final Consumer<WithSender<? super T>> consumer) {
return new Consume<>(consumer::accept);
@SuppressWarnings("unchecked")
public static <S, T extends S> Graph<SinkShape<WithSender<T>>, NotUsed> of(final Consumer<WithSender<S>> consumer) {
// need to cast WithSender<T> to WithSender<S> because java does not understand covariance
return Sink.<WithSender<T>>foreach(w -> consumer.accept((WithSender<S>) w))
.mapMaterializedValue(x -> NotUsed.getInstance());
}

/**
* Create sink from biconsumer of message and sender.
* Create sink from bi-consumer of message and sender.
*
* @param consumer the consumer.
* @param biConsumer the bi-consumer.
* @param <T> type of messages.
* @return sink.
*/
public static <T> Consume<T> of(final BiConsumer<? super T, ActorRef> consumer) {
return new Consume<>(withSender -> consumer.accept(withSender.getMessage(), withSender.getSender()));
public static <T> Graph<SinkShape<WithSender<T>>, NotUsed> of(final BiConsumer<? super T, ActorRef> biConsumer) {
return of(withSender -> biConsumer.accept(withSender.getMessage(), withSender.getSender()));
}

/**
@@ -63,26 +61,9 @@ private Consume(final Consumer<WithSender<T>> consumer) {
* @return sink.
*/
@SuppressWarnings("unchecked")
public static GraphStage<SinkShape<WithSender>> untyped(
final Consumer<WithSender> consumer) {

// Ignore complaints from Java type checker. The biConsumer can clearly handle all inputs.
return (GraphStage<SinkShape<WithSender>>) (Object) new Consume<>(consumer::accept);
}

@Override
public SinkShape<WithSender<T>> shape() {
return shape;
}
public static Graph<SinkShape<WithSender>, NotUsed> untyped(final Consumer<WithSender> consumer) {

@Override
@SuppressWarnings({"squid:S3599","squid:S1171"})
public GraphStageLogic createLogic(final Attributes inheritedAttributes) {
return new AbstractControlFlowLogic(shape) {
{
initOutlets(shape);
when(shape.in(), consumer::accept);
}
};
// Ignore complaints from Java type checker. The consumer can clearly handle all inputs.
return (Graph<SinkShape<WithSender>, NotUsed>) (Object) of(consumer::accept);
}
}

This file was deleted.

0 comments on commit 6f8ae20

Please sign in to comment.
You can’t perform that action at this time.