Skip to content

Commit

Permalink
remove channel from topic path for policy commands
Browse files Browse the repository at this point in the history
Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch-si.com>
  • Loading branch information
dguggemos committed Feb 10, 2020
1 parent dafba10 commit 4acad50
Show file tree
Hide file tree
Showing 49 changed files with 1,388 additions and 1,620 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,7 @@
public interface Adapter<T extends Jsonifiable<?>> {

/**
* Maps the given {@code adaptable} to it's corresponding {@code T}.
* Maps the given {@code adaptable} to its corresponding {@code T}.
*
* @param adaptable the adaptable to map.
* @return the mapped object.
Expand All @@ -43,10 +43,10 @@ default Adaptable toAdaptable(final T t) {
}

/**
* Maps the given {@code t} to it's corresponding {@code Adaptable}.
* Maps the given {@code t} to its corresponding {@code Adaptable}.
*
* @param t the object to map.
* @param channel the Channel (Twin/Live) to use.
* @param channel the channel that was used to send the signal
* @return the mapped adaptable.
* @throws NullPointerException if {@code t} is {@code null}.
* @throws IllegalArgumentException if {@code channel} is unknown.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -59,7 +59,9 @@ public Adapter<? extends Signal<?>> getAdapter(final Adaptable adaptable, final
if (TopicPath.Channel.LIVE.equals(channel)) { // /<group>/live
adapter = Optional.ofNullable(fromLiveAdaptable(adaptable, adapterProvider));
} else if (TopicPath.Channel.TWIN.equals(channel)) { // /<group>/twin
adapter = Optional.ofNullable(fromTwinAdaptable(adaptable, adapterProvider));
adapter = Optional.ofNullable(signalFromAdaptable(adaptable, adapterProvider));
} else if (TopicPath.Channel.NONE.equals(channel)) { // no channel (policies group)
adapter = Optional.ofNullable(signalFromAdaptable(adaptable, adapterProvider));
} else {
adapter = Optional.empty();
}
Expand All @@ -78,18 +80,12 @@ private Adapter<? extends Signal<?>> fromLiveAdaptable(final Adaptable adaptable
return adapterProvider.getMessageCommandAdapter();
}
} else {
return signalFromThingAdaptable(adaptable, adapterProvider); // /<group>/live/(commands|events)
return signalFromAdaptable(adaptable, adapterProvider); // /<group>/live/(commands|events)
}
}

private Adapter<? extends Signal<?>> fromTwinAdaptable(final Adaptable adaptable,
final AdapterProvider adapterProvider) {
final TopicPath topicPath = adaptable.getTopicPath();
return signalFromThingAdaptable(adaptable, adapterProvider); // /<group>/twin/(commands|events)
}

@Nullable
private Adapter<? extends Signal<?>> signalFromThingAdaptable(final Adaptable adaptable,
private Adapter<? extends Signal<?>> signalFromAdaptable(final Adaptable adaptable,
final AdapterProvider adapterProvider) {
final TopicPath topicPath = adaptable.getTopicPath();
if (TopicPath.Criterion.COMMANDS.equals(topicPath.getCriterion())) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,11 @@
package org.eclipse.ditto.protocoladapter;

import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;
import static org.eclipse.ditto.protocoladapter.TopicPath.Channel.LIVE;
import static org.eclipse.ditto.protocoladapter.TopicPath.Channel.NONE;
import static org.eclipse.ditto.protocoladapter.TopicPath.Channel.TWIN;

import java.util.Arrays;

import org.eclipse.ditto.model.base.exceptions.DittoRuntimeException;
import org.eclipse.ditto.model.base.headers.DittoHeaderDefinition;
Expand All @@ -26,6 +31,7 @@
import org.eclipse.ditto.signals.commands.base.CommandResponse;
import org.eclipse.ditto.signals.commands.messages.MessageCommand;
import org.eclipse.ditto.signals.commands.messages.MessageCommandResponse;
import org.eclipse.ditto.signals.commands.policies.PolicyCommand;
import org.eclipse.ditto.signals.commands.policies.PolicyCommandResponse;
import org.eclipse.ditto.signals.commands.policies.PolicyErrorResponse;
import org.eclipse.ditto.signals.commands.policies.modify.PolicyModifyCommand;
Expand Down Expand Up @@ -102,8 +108,10 @@ public Adaptable toAdaptable(Command<?> command) {
public Adaptable toAdaptable(final Signal<?> signal) {
final TopicPath.Channel channel = determineChannel(signal);
if (signal instanceof MessageCommand) {
checkChannel(channel, signal, LIVE);
return toAdaptable((MessageCommand<?, ?>) signal);
} else if (signal instanceof MessageCommandResponse) {
checkChannel(channel, signal, LIVE);
return toAdaptable((MessageCommandResponse<?, ?>) signal);
} else if (signal instanceof Command) {
return toAdaptable((Command<?>) signal, channel);
Expand All @@ -115,27 +123,25 @@ public Adaptable toAdaptable(final Signal<?> signal) {
throw UnknownSignalException.newBuilder(signal.getName()).dittoHeaders(signal.getDittoHeaders()).build();
}

private TopicPath.Channel determineChannel(final Signal<?> signal) {
final boolean isLiveSignal =
signal.getDittoHeaders().getChannel().filter(TopicPath.Channel.LIVE.getName()::equals).isPresent();
return isLiveSignal ? TopicPath.Channel.LIVE : TopicPath.Channel.TWIN;
}

@Override
public Adaptable toAdaptable(final CommandResponse<?> commandResponse, final TopicPath.Channel channel) {
if (commandResponse instanceof MessageCommandResponse && channel == TopicPath.Channel.LIVE) {
if (commandResponse instanceof MessageCommandResponse) {
checkChannel(channel, commandResponse, LIVE);
return toAdaptable((MessageCommandResponse<?, ?>) commandResponse);
} else if (commandResponse instanceof ThingCommandResponse) {
checkChannel(channel, commandResponse, LIVE, TWIN);
return toAdaptable((ThingCommandResponse<?>) commandResponse, channel);
} else if (commandResponse instanceof PolicyCommandResponse) {
return toAdaptable((PolicyCommandResponse<?>) commandResponse, channel);
checkChannel(channel, commandResponse, NONE);
return toAdaptable((PolicyCommandResponse<?>) commandResponse);
} else {
throw UnknownCommandResponseException.newBuilder(commandResponse.getName()).build();
}
}

@Override
public Adaptable toAdaptable(final ThingCommandResponse<?> thingCommandResponse, final TopicPath.Channel channel) {
checkChannel(channel, thingCommandResponse, LIVE, TWIN);
if (thingCommandResponse instanceof ThingQueryCommandResponse) {
return toAdaptable((ThingQueryCommandResponse<?>) thingCommandResponse, channel);
} else if (thingCommandResponse instanceof ThingModifyCommandResponse) {
Expand All @@ -148,118 +154,152 @@ public Adaptable toAdaptable(final ThingCommandResponse<?> thingCommandResponse,
}

@Override
public Adaptable toAdaptable(final PolicyCommandResponse<?> policyCommandResponse,
final TopicPath.Channel channel) {
public Adaptable toAdaptable(final PolicyCommandResponse<?> policyCommandResponse) {
if (policyCommandResponse instanceof PolicyQueryCommandResponse) {
return toAdaptable((PolicyQueryCommandResponse<?>) policyCommandResponse, channel);
return toAdaptable((PolicyQueryCommandResponse<?>) policyCommandResponse);
} else if (policyCommandResponse instanceof PolicyModifyCommandResponse) {
return toAdaptable((PolicyModifyCommandResponse<?>) policyCommandResponse, channel);
return toAdaptable((PolicyModifyCommandResponse<?>) policyCommandResponse);
} else if (policyCommandResponse instanceof PolicyErrorResponse) {
return toAdaptable((PolicyErrorResponse) policyCommandResponse, channel);
return toAdaptable((PolicyErrorResponse) policyCommandResponse);
} else {
throw UnknownCommandResponseException.newBuilder(policyCommandResponse.getName()).build();
}
}

@Override
public Adaptable toAdaptable(final Command<?> command, final TopicPath.Channel channel) {
if (command instanceof MessageCommand && channel == TopicPath.Channel.LIVE) {
if (command instanceof MessageCommand) {
checkChannel(channel, command, LIVE);
return toAdaptable((MessageCommand<?, ?>) command);
} else if (command instanceof ThingModifyCommand) {
checkChannel(channel, command, LIVE, TWIN);
return toAdaptable((ThingModifyCommand<?>) command, channel);
} else if (command instanceof ThingQueryCommand) {
checkChannel(channel, command, LIVE, TWIN);
return toAdaptable((ThingQueryCommand<?>) command, channel);
} else if (command instanceof PolicyModifyCommand) {
return toAdaptable((PolicyModifyCommand<?>) command, channel);
checkChannel(channel, command, NONE);
return toAdaptable((PolicyModifyCommand<?>) command);
} else if (command instanceof PolicyQueryCommand) {
return toAdaptable((PolicyQueryCommand<?>) command, channel);
checkChannel(channel, command, NONE);
return toAdaptable((PolicyQueryCommand<?>) command);
} else {
throw UnknownCommandException.newBuilder(command.getName()).build();
}
}

@Override
public Adaptable toAdaptable(final ThingModifyCommand<?> thingModifyCommand, final TopicPath.Channel channel) {
return thingsAdapters.getModifyCommandAdapter().toAdaptable(thingModifyCommand, channel);
public Adaptable toAdaptable(final ThingQueryCommand<?> thingQueryCommand, final TopicPath.Channel channel) {
checkChannel(channel, thingQueryCommand, TWIN, LIVE);
return thingsAdapters.getQueryCommandAdapter().toAdaptable(thingQueryCommand, channel);
}

@Override
public Adaptable toAdaptable(final PolicyModifyCommand<?> policyModifyCommand, final TopicPath.Channel channel) {
return policiesAdapters.getModifyCommandAdapter().toAdaptable(policyModifyCommand, channel);
public Adaptable toAdaptable(final ThingQueryCommandResponse<?> thingQueryCommandResponse,
final TopicPath.Channel channel) {
checkChannel(channel, thingQueryCommandResponse, TWIN, LIVE);
return thingsAdapters.getQueryCommandResponseAdapter().toAdaptable(thingQueryCommandResponse, channel);
}

@Override
public Adaptable toAdaptable(final ThingModifyCommandResponse<?> thingModifyCommandResponse,
final TopicPath.Channel channel) {
return thingsAdapters.getModifyCommandResponseAdapter().toAdaptable(thingModifyCommandResponse, channel);
public Adaptable toAdaptable(final ThingModifyCommand<?> thingModifyCommand, final TopicPath.Channel channel) {
checkChannel(channel, thingModifyCommand, TWIN, LIVE);
return thingsAdapters.getModifyCommandAdapter().toAdaptable(thingModifyCommand, channel);
}

@Override
public Adaptable toAdaptable(final PolicyModifyCommandResponse<?> policyModifyCommandResponse,
public Adaptable toAdaptable(final ThingModifyCommandResponse<?> thingModifyCommandResponse,
final TopicPath.Channel channel) {
return policiesAdapters.getModifyCommandResponseAdapter().toAdaptable(policyModifyCommandResponse, channel);
checkChannel(channel, thingModifyCommandResponse, TWIN, LIVE);
return thingsAdapters.getModifyCommandResponseAdapter().toAdaptable(thingModifyCommandResponse, channel);
}

@Override
public Adaptable toAdaptable(final ThingQueryCommand<?> thingQueryCommand, final TopicPath.Channel channel) {
return thingsAdapters.getQueryCommandAdapter().toAdaptable(thingQueryCommand, channel);
public Adaptable toAdaptable(final ThingErrorResponse thingErrorResponse, final TopicPath.Channel channel) {
checkChannel(channel, thingErrorResponse, TWIN, LIVE);
return thingsAdapters.getErrorResponseAdapter().toAdaptable(thingErrorResponse, channel);
}

@Override
public Adaptable toAdaptable(final PolicyQueryCommand<?> policyQueryCommand, final TopicPath.Channel channel) {
return policiesAdapters.getQueryCommandAdapter().toAdaptable(policyQueryCommand, channel);
public Adaptable toAdaptable(final Event<?> event, final TopicPath.Channel channel) {
if (event instanceof ThingEvent) {
checkChannel(channel, event, TWIN, LIVE);
return toAdaptable((ThingEvent<?>) event, channel);
} else {
throw UnknownEventException.newBuilder(event.getName()).build();
}
}

@Override
public Adaptable toAdaptable(final ThingQueryCommandResponse<?> thingQueryCommandResponse,
final TopicPath.Channel channel) {
return thingsAdapters.getQueryCommandResponseAdapter().toAdaptable(thingQueryCommandResponse, channel);
public Adaptable toAdaptable(final ThingEvent<?> thingEvent, final TopicPath.Channel channel) {
checkChannel(channel, thingEvent, TWIN, LIVE);
return thingsAdapters.getEventAdapter().toAdaptable(thingEvent, channel);
}

@Override
public Adaptable toAdaptable(final PolicyQueryCommandResponse<?> policyQueryCommandResponse,
final TopicPath.Channel channel) {
return policiesAdapters.getQueryCommandResponseAdapter().toAdaptable(policyQueryCommandResponse, channel);
public Adaptable toAdaptable(final PolicyQueryCommand<?> policyQueryCommand) {
return policiesAdapters.getQueryCommandAdapter().toAdaptable(policyQueryCommand, NONE);
}

@Override
public Adaptable toAdaptable(final ThingErrorResponse thingErrorResponse, final TopicPath.Channel channel) {
return thingsAdapters.getErrorResponseAdapter().toAdaptable(thingErrorResponse, channel);
public Adaptable toAdaptable(final PolicyQueryCommandResponse<?> policyQueryCommandResponse) {
return policiesAdapters.getQueryCommandResponseAdapter().toAdaptable(policyQueryCommandResponse, NONE);
}

@Override
public Adaptable toAdaptable(final PolicyErrorResponse policyErrorResponse,
final TopicPath.Channel channel) {
return policiesAdapters.getErrorResponseAdapter().toAdaptable(policyErrorResponse, channel);
public Adaptable toAdaptable(final PolicyModifyCommand<?> policyModifyCommand) {
return policiesAdapters.getModifyCommandAdapter().toAdaptable(policyModifyCommand, NONE);
}

@Override
public Adaptable toAdaptable(final Event<?> event, final TopicPath.Channel channel) {
if (event instanceof ThingEvent) {
return toAdaptable((ThingEvent<?>) event, channel);
} else {
throw UnknownEventException.newBuilder(event.getName()).build();
}
public Adaptable toAdaptable(final PolicyModifyCommandResponse<?> policyModifyCommandResponse) {
return policiesAdapters.getModifyCommandResponseAdapter().toAdaptable(policyModifyCommandResponse, NONE);
}

@Override
public Adaptable toAdaptable(final ThingEvent<?> thingEvent, final TopicPath.Channel channel) {
return thingsAdapters.getEventAdapter().toAdaptable(thingEvent, channel);
public Adaptable toAdaptable(final PolicyErrorResponse policyErrorResponse) {
return policiesAdapters.getErrorResponseAdapter().toAdaptable(policyErrorResponse, NONE);
}

@Override
public Adaptable toAdaptable(final MessageCommand<?, ?> messageCommand) {
return thingsAdapters.getMessageCommandAdapter().toAdaptable(messageCommand);
return thingsAdapters.getMessageCommandAdapter().toAdaptable(messageCommand, LIVE);
}

@Override
public Adaptable toAdaptable(final MessageCommandResponse<?, ?> messageCommandResponse) {
return thingsAdapters.getMessageCommandResponseAdapter().toAdaptable(messageCommandResponse);
return thingsAdapters.getMessageCommandResponseAdapter()
.toAdaptable(messageCommandResponse, LIVE);
}

@Override
public HeaderTranslator headerTranslator() {
return headerTranslator;
}

private TopicPath.Channel determineChannel(final Signal<?> signal) {
// internally a twin command/event and live command/event are distinguished only by the channel header i.e.
// a twin and live command "look the same" except for the channel header
final boolean isLiveSignal =
signal.getDittoHeaders().getChannel().filter(LIVE.getName()::equals).isPresent();

final boolean isPolicyCommand = signal instanceof PolicyCommand || signal instanceof PolicyCommandResponse;

return isPolicyCommand ? NONE // policy commands have no channel
: isLiveSignal ? LIVE // live signals (live commands/events) use the live channel
: TopicPath.Channel.TWIN; // all other commands use the twin channel
}

private void checkChannel(final TopicPath.Channel channel,
final Signal<?> signal, final TopicPath.Channel... supportedChannels) {
if (!Arrays.asList(supportedChannels).contains(channel)) {
throw unknownChannelException(signal, channel);
}
}

private UnknownChannelException unknownChannelException(final Signal<?> signal, final TopicPath.Channel channel) {
return UnknownChannelException.newBuilder(channel, signal.getType())
.dittoHeaders(signal.getDittoHeaders())
.build();
}
}
Loading

0 comments on commit 4acad50

Please sign in to comment.