Skip to content

Commit

Permalink
Issue #561: Parallelize message mapping in a stream with the processo…
Browse files Browse the repository at this point in the history
…r actor's dispatcher.

Reason: Order guarantee.

Still TODO: remove the consistent hashing router on top of
            message mapping processors, which does not guarantee order.

Signed-off-by: Yufei Cai <yufei.cai@bosch-si.com>
  • Loading branch information
yufei-cai committed Jan 19, 2020
1 parent 268d633 commit 6953de7
Show file tree
Hide file tree
Showing 5 changed files with 175 additions and 292 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -14,58 +14,50 @@

import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import java.util.Collection;
import java.util.Collections;
import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.services.connectivity.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.services.models.connectivity.MappedInboundExternalMessage;
import org.eclipse.ditto.signals.base.Signal;

import akka.stream.javadsl.Source;

/**
* {@link MappingResultHandler} for inbound messages. This handler forwards to the given handlers. Additionally it
* calls the {@link MappingResultHandler#onException(Exception)} method for exceptions thrown in handlers and
* {@link org.eclipse.ditto.services.connectivity.messaging.MappingResultHandler} for messages. This handler forwards to the given handlers. Additionally it
* calls the {@link org.eclipse.ditto.services.connectivity.messaging.MappingResultHandler#onException(Exception)} method for exceptions thrown in handlers and
* increases the according counters for mapped, dropped failed messages.
*
* @param <M> type of mapped messages.
* @param <R> type of results.
*/
final class InboundMappingResultHandler<R> implements MappingResultHandler<MappedInboundExternalMessage, R> {
abstract class AbstractMappingResultHandler<M, R> implements MappingResultHandler<M, R> {

private final Function<MappedInboundExternalMessage, R> onMessageMapped;
private final Function<M, R> onMessageMapped;
private final Runnable onMessageDropped;
private final Consumer<Exception> onException;
private final R emptyResult;
private final BinaryOperator<R> combineResults;
private final ConnectionMonitor inboundMapped;
private final ConnectionMonitor inboundDropped;
private final Collection<ConnectionMonitor> mappedMonitors;
private final Collection<ConnectionMonitor> droppedMonitors;
private final ConnectionMonitor.InfoProvider infoProvider;

private InboundMappingResultHandler(final Builder<R> builder) {
protected AbstractMappingResultHandler(final AbstractBuilder<M, R, ?> builder) {
onMessageMapped = checkNotNull(builder.onMessageMapped, "onMessageMapped");
onMessageDropped = checkNotNull(builder.onMessageDropped, "onMessageDropped");
onException = checkNotNull(builder.onException, "onException");
inboundMapped = checkNotNull(builder.inboundMapped, "inboundMapped");
inboundDropped = checkNotNull(builder.inboundDropped, "inboundDropped");
mappedMonitors = checkNotNull(builder.mappedMonitors, "mappedMonitors");
droppedMonitors = checkNotNull(builder.droppedMonitors, "droppedMonitors");
infoProvider = checkNotNull(builder.infoProvider, "infoProvider");
emptyResult = checkNotNull(builder.emptyResult, "emptyResult");
combineResults = checkNotNull(builder.combineResults, "combineResults");
}

static <S> Builder<Source<S, ?>> newSourceBuilder() {
return new Builder<Source<S, ?>>()
.emptyResult(Source.empty())
.combineResults(Source::concat);
}

@Override
public R onMessageMapped(final MappedInboundExternalMessage inboundExternalMessage) {
public R onMessageMapped(final M message) {
try {
inboundMapped.success(infoProvider);
return onMessageMapped.apply(inboundExternalMessage);
mappedMonitors.forEach(monitor -> monitor.success(infoProvider));
return onMessageMapped.apply(message);
} catch (final Exception e) {
return onException(e);
}
Expand All @@ -74,7 +66,7 @@ public R onMessageMapped(final MappedInboundExternalMessage inboundExternalMessa
@Override
public R onMessageDropped() {
try {
inboundDropped.success(infoProvider);
droppedMonitors.forEach(monitor -> monitor.success(infoProvider));
onMessageDropped.run();
return emptyResult;
} catch (Exception e) {
Expand All @@ -85,9 +77,9 @@ public R onMessageDropped() {
@Override
public R onException(final Exception exception) {
if (exception instanceof DittoRuntimeException) {
inboundMapped.failure(((DittoRuntimeException) exception));
mappedMonitors.forEach(monitor -> monitor.failure(((DittoRuntimeException) exception)));
} else {
inboundMapped.exception(exception);
mappedMonitors.forEach(monitor -> monitor.exception(exception));
}
onException.accept(exception);
return emptyResult;
Expand All @@ -103,62 +95,49 @@ public R emptyResult() {
return emptyResult;
}

static final class Builder<R> {
static abstract class AbstractBuilder<M, R, T> {

private ConnectionMonitor inboundMapped;
private ConnectionMonitor inboundDropped;
protected Collection<ConnectionMonitor> mappedMonitors;
protected Collection<ConnectionMonitor> droppedMonitors;
private ConnectionMonitor.InfoProvider infoProvider;
private Function<MappedInboundExternalMessage, R> onMessageMapped;
private Function<M, R> onMessageMapped;
private Runnable onMessageDropped;
private Consumer<Exception> onException;
private R emptyResult;
private BinaryOperator<R> combineResults;

private Builder() {}

InboundMappingResultHandler<R> build() {
return new InboundMappingResultHandler<>(this);
}

Builder<R> inboundMapped(final ConnectionMonitor inboundMapped) {
this.inboundMapped = inboundMapped;
return this;
}
protected AbstractBuilder() {}

Builder<R> inboundDropped(final ConnectionMonitor inboundDropped) {
this.inboundDropped = inboundDropped;
return this;
}
protected abstract T getSelf();

Builder<R> infoProvider(final ConnectionMonitor.InfoProvider infoProvider) {
T infoProvider(final ConnectionMonitor.InfoProvider infoProvider) {
this.infoProvider = infoProvider;
return this;
return getSelf();
}

Builder<R> onMessageMapped(
final Function<MappedInboundExternalMessage, R> onMessageMapped) {
T onMessageMapped(final Function<M, R> onMessageMapped) {
this.onMessageMapped = onMessageMapped;
return this;
return getSelf();
}

Builder<R> onMessageDropped(final Runnable onMessageDropped) {
T onMessageDropped(final Runnable onMessageDropped) {
this.onMessageDropped = onMessageDropped;
return this;
return getSelf();
}

Builder<R> onException(final Consumer<Exception> onException) {
T onException(final Consumer<Exception> onException) {
this.onException = onException;
return this;
return getSelf();
}

Builder<R> emptyResult(final R emptyResult) {
T emptyResult(final R emptyResult) {
this.emptyResult = emptyResult;
return this;
return getSelf();
}

Builder<R> combineResults(final BinaryOperator<R> combineResults) {
T combineResults(final BinaryOperator<R> combineResults) {
this.combineResults = combineResults;
return this;
return getSelf();
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,14 +12,8 @@
*/
package org.eclipse.ditto.services.connectivity.messaging;

import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;
import java.util.Collections;

import java.util.function.BinaryOperator;
import java.util.function.Consumer;
import java.util.function.Function;
import java.util.function.Supplier;

import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.services.connectivity.messaging.monitoring.ConnectionMonitor;
import org.eclipse.ditto.services.models.connectivity.MappedInboundExternalMessage;
import org.eclipse.ditto.signals.base.Signal;
Expand All @@ -30,136 +24,41 @@
* {@link MappingResultHandler} for inbound messages. This handler forwards to the given handlers. Additionally it
* calls the {@link MappingResultHandler#onException(Exception)} method for exceptions thrown in handlers and
* increases the according counters for mapped, dropped failed messages.
*
* @param <R> type of results.
*/
final class InboundMappingResultHandler<R> implements MappingResultHandler<MappedInboundExternalMessage, R> {

private final Function<MappedInboundExternalMessage, R> onMessageMapped;
private final Runnable onMessageDropped;
private final Consumer<Exception> onException;
private final R emptyResult;
private final BinaryOperator<R> combineResults;
private final ConnectionMonitor inboundMapped;
private final ConnectionMonitor inboundDropped;
private final ConnectionMonitor.InfoProvider infoProvider;

private InboundMappingResultHandler(final Builder<R> builder) {
onMessageMapped = checkNotNull(builder.onMessageMapped, "onMessageMapped");
onMessageDropped = checkNotNull(builder.onMessageDropped, "onMessageDropped");
onException = checkNotNull(builder.onException, "onException");
inboundMapped = checkNotNull(builder.inboundMapped, "inboundMapped");
inboundDropped = checkNotNull(builder.inboundDropped, "inboundDropped");
infoProvider = checkNotNull(builder.infoProvider, "infoProvider");
emptyResult = checkNotNull(builder.emptyResult, "emptyResult");
combineResults = checkNotNull(builder.combineResults, "combineResults");
}

static <S> Builder<Source<S, ?>> newSourceBuilder() {
return new Builder<Source<S, ?>>()
.emptyResult(Source.empty())
.combineResults(Source::concat);
}

@Override
public R onMessageMapped(final MappedInboundExternalMessage inboundExternalMessage) {
try {
inboundMapped.success(infoProvider);
return onMessageMapped.apply(inboundExternalMessage);
} catch (final Exception e) {
return onException(e);
}
}

@Override
public R onMessageDropped() {
try {
inboundDropped.success(infoProvider);
onMessageDropped.run();
return emptyResult;
} catch (Exception e) {
return onException(e);
}
}

@Override
public R onException(final Exception exception) {
if (exception instanceof DittoRuntimeException) {
inboundMapped.failure(((DittoRuntimeException) exception));
} else {
inboundMapped.exception(exception);
}
onException.accept(exception);
return emptyResult;
}
final class InboundMappingResultHandler
extends AbstractMappingResultHandler<MappedInboundExternalMessage, Source<Signal<?>, ?>> {

@Override
public R combineResults(final R left, final R right) {
return combineResults.apply(left, right);
private InboundMappingResultHandler(final Builder builder) {
super(builder);
}

@Override
public R emptyResult() {
return emptyResult;
static Builder newBuilder() {
return new Builder().emptyResult(Source.empty()).combineResults(Source::concat);
}

static final class Builder<R> {

private ConnectionMonitor inboundMapped;
private ConnectionMonitor inboundDropped;
private ConnectionMonitor.InfoProvider infoProvider;
private Function<MappedInboundExternalMessage, R> onMessageMapped;
private Runnable onMessageDropped;
private Consumer<Exception> onException;
private R emptyResult;
private BinaryOperator<R> combineResults;
static final class Builder extends AbstractBuilder<MappedInboundExternalMessage, Source<Signal<?>, ?>, Builder> {

private Builder() {}

InboundMappingResultHandler<R> build() {
return new InboundMappingResultHandler<>(this);
}

Builder<R> inboundMapped(final ConnectionMonitor inboundMapped) {
this.inboundMapped = inboundMapped;
return this;
}

Builder<R> inboundDropped(final ConnectionMonitor inboundDropped) {
this.inboundDropped = inboundDropped;
@Override
protected Builder getSelf() {
return this;
}

Builder<R> infoProvider(final ConnectionMonitor.InfoProvider infoProvider) {
this.infoProvider = infoProvider;
return this;
InboundMappingResultHandler build() {
return new InboundMappingResultHandler(this);
}

Builder<R> onMessageMapped(
final Function<MappedInboundExternalMessage, R> onMessageMapped) {
this.onMessageMapped = onMessageMapped;
Builder inboundMapped(final ConnectionMonitor inboundMapped) {
mappedMonitors = Collections.singletonList(inboundMapped);
return this;
}

Builder<R> onMessageDropped(final Runnable onMessageDropped) {
this.onMessageDropped = onMessageDropped;
Builder inboundDropped(final ConnectionMonitor inboundDropped) {
droppedMonitors = Collections.singletonList(inboundDropped);
return this;
}

Builder<R> onException(final Consumer<Exception> onException) {
this.onException = onException;
return this;
}

Builder<R> emptyResult(final R emptyResult) {
this.emptyResult = emptyResult;
return this;
}

Builder<R> combineResults(final BinaryOperator<R> combineResults) {
this.combineResults = combineResults;
return this;
}
}

}
Loading

0 comments on commit 6953de7

Please sign in to comment.