Skip to content

Commit

Permalink
change to actor based hierarchy because of issues with JMS client (bl…
Browse files Browse the repository at this point in the history
…ocking)

Signed-off-by: Dominik Guggemos <dominik.guggemos@bosch-si.com>
  • Loading branch information
dguggemos committed Feb 12, 2018
1 parent 1279eaa commit 7feaf23
Show file tree
Hide file tree
Showing 29 changed files with 1,145 additions and 474 deletions.
3 changes: 2 additions & 1 deletion bom/pom.xml
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,8 @@
<jackson-databind.version>2.8.10</jackson-databind.version>
<asm.version>5.2</asm.version>
<javax-jms-api.version>2.0.1</javax-jms-api.version>
<qpid-jms-client.version>0.29.0</qpid-jms-client.version>
<!-- qpid-jms-client version 0.29 has a strange bug - beware of updating -->
<qpid-jms-client.version>0.28.0</qpid-jms-client.version>
<reactive-streams.version>1.0.1</reactive-streams.version>

<slf4j.version>1.7.25</slf4j.version>
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -158,13 +158,6 @@ final class JsonFields {
JsonFactory.newStringFieldDefinition("id", FieldType.REGULAR, JsonSchemaVersion.V_1,
JsonSchemaVersion.V_2);

/**
* JSON field containing the {@code AmqpConnection} type.
*/
public static final JsonFieldDefinition<String> TYPE =
JsonFactory.newStringFieldDefinition("type", FieldType.REGULAR, JsonSchemaVersion.V_1,
JsonSchemaVersion.V_2);

/**
* JSON field containing the {@code AmqpConnection} uri.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,8 @@
*/
package org.eclipse.ditto.model.amqpbridge;

import static org.eclipse.ditto.model.base.common.ConditionChecker.checkArgument;
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotEmpty;
import static org.eclipse.ditto.model.base.common.ConditionChecker.checkNotNull;

import java.util.Arrays;
Expand Down Expand Up @@ -60,6 +62,22 @@ public static Optional<ConnectionType> forName(final CharSequence name) {
.findFirst();
}

/**
* Extract the connectionType from the connectionId whichis prefixed with the connectionType (i.e. type:id).
*
* @param id the connection id.
* @return the ConnectionType or an empty optional.
*/
public static Optional<ConnectionType> fromConnectionId(final CharSequence id) {
checkNotNull(id, "id");
checkNotEmpty(id, "id");
final int colonPosition = checkArgument(id.toString().indexOf(':'), idx -> idx > 0);
final CharSequence type = id.subSequence(0, colonPosition);
return Arrays.stream(values())
.filter(c -> c.name.contentEquals(type))
.findFirst();
}

@Override
public int length() {
return name.length();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@ public static ImmutableAmqpConnection of(final String id,
final AuthorizationSubject authorizationSubject, final Set<String> sources, final boolean failoverEnabled,
final boolean validateCertificates, final int throttle) {
checkNotNull(id, "ID");
checkNotNull(connectionType, "Connection Type");
checkNotNull(uri, "URI");
checkNotNull(authorizationSubject, "Authorization Subject");
checkNotNull(sources, "Sources");
Expand All @@ -124,7 +125,7 @@ public static ImmutableAmqpConnection of(final String id,
*/
public static ImmutableAmqpConnection fromJson(final JsonObject jsonObject) {
final String readId = jsonObject.getValueOrThrow(JsonFields.ID);
final ConnectionType readConnectionType = ConnectionType.forName(jsonObject.getValueOrThrow(JsonFields.TYPE))
final ConnectionType readConnectionType = ConnectionType.forName(readId.substring(0, readId.indexOf(':')))
.orElseThrow(() -> JsonParseException.newBuilder().message("Invalid connection type.").build());
final String readUri = jsonObject.getValueOrThrow(JsonFields.URI);
final AuthorizationSubject readAuthorizationSubject =
Expand Down Expand Up @@ -212,7 +213,6 @@ public JsonObject toJson(final JsonSchemaVersion schemaVersion, final Predicate<

jsonObjectBuilder.set(JsonFields.SCHEMA_VERSION, schemaVersion.toInt(), predicate);
jsonObjectBuilder.set(JsonFields.ID, id, predicate);
jsonObjectBuilder.set(JsonFields.TYPE, connectionType.getName(), predicate);
jsonObjectBuilder.set(JsonFields.URI, uri, predicate);
jsonObjectBuilder.set(JsonFields.AUTHORIZATION_SUBJECT, authorizationSubject.getId(), predicate);
jsonObjectBuilder.set(JsonFields.SOURCES, sources.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,9 +11,12 @@
*/
package org.eclipse.ditto.model.amqpbridge;

import static org.assertj.core.api.Assertions.assertThat;
import static org.mutabilitydetector.unittesting.MutabilityAssert.assertInstancesOf;
import static org.mutabilitydetector.unittesting.MutabilityMatchers.areImmutable;

import java.util.Optional;

import org.junit.Test;

import nl.jqno.equalsverifier.EqualsVerifier;
Expand All @@ -32,4 +35,15 @@ public void assertImmutability() {
assertInstancesOf(ConnectionType.class, areImmutable());
}

}
@Test
public void testFromConnectionId() {
final Optional<ConnectionType> actual = ConnectionType.fromConnectionId(ConnectionType.AMQP_10 + ":123");
assertThat(actual).containsSame(ConnectionType.AMQP_10);
}

@Test
public void testFromInvalidConnectionId() {
final Optional<ConnectionType> actual = ConnectionType.fromConnectionId("amqp-010:123");
assertThat(actual).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,6 @@ public final class ImmutableConnectionTest {

private static final JsonObject KNOWN_JSON = JsonObject.newBuilder()
.set(AmqpConnection.JsonFields.ID, ID)
.set(AmqpConnection.JsonFields.TYPE, TYPE.getName())
.set(AmqpConnection.JsonFields.URI, URI)
.set(AmqpConnection.JsonFields.AUTHORIZATION_SUBJECT, AUTHORIZATION_SUBJECT.getId())
.set(AmqpConnection.JsonFields.SOURCES, SOURCES.stream()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -14,14 +14,14 @@

import java.lang.reflect.InvocationTargetException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.util.List;
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.TimeUnit;
import java.util.stream.Collectors;

import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.TextMessage;

import org.eclipse.ditto.json.JsonFactory;
Expand Down Expand Up @@ -138,21 +138,17 @@ public Receive createReceive() {
}

private void handle(final InternalMessage m) {
try {

final TraceContext traceContext = Kamon.tracer().newContext("commandProcessor",
Option.apply(DittoHeaders.of(m.getHeaders()).getCorrelationId().orElse("no-correlation-id")));
final Command<?> command = buildCommandFromPublicProtocol(m, traceContext);
traceContext.finish();
final TraceContext traceContext = Kamon.tracer().newContext("commandProcessor",
Option.apply(DittoHeaders.of(m.getHeaders()).getCorrelationId().orElse("no-correlation-id")));
final Command<?> command = buildCommandFromPublicProtocol(m, traceContext);
traceContext.finish();

if (command != null) {
traceCommand(command);
log.info("Publishing '{}' to '{}'", command.getType(), pubSubTargetActorPath);
pubSubMediator.tell(new DistributedPubSubMediator.Send(pubSubTargetActorPath, command, true),
getSelf());
}
} finally {
getSender().tell(m.getAckMessage(), self());
if (command != null) {
traceCommand(command);
log.info("Publishing '{}' to '{}'", command.getType(), pubSubTargetActorPath);
pubSubMediator.tell(new DistributedPubSubMediator.Send(pubSubTargetActorPath, command, true),
getSelf());
}
}

Expand Down Expand Up @@ -202,8 +198,7 @@ private void traceCommand(final Command<?> command) {
}

private Command<?> buildCommandFromPublicProtocol(final InternalMessage message,
final TraceContext traceContext)
{
final TraceContext traceContext) {
try {
final DittoHeaders dittoHeaders = DittoHeaders.of(message.getHeaders());
final String contentType = dittoHeaders.get("content-type");
Expand Down Expand Up @@ -264,9 +259,18 @@ private Command<?> buildCommandFromPublicProtocol(final InternalMessage message,
}

if (jsonifiableAdaptable == null) {
// fall back trying to interpret as DittoProtocol
final JsonObject publicCommandJsonObject = JsonFactory.newObject(message.getPayload().toString());

// best-effort approach, try to read (utf8) string from payload TODO check if this makes sense
// may be null, which means there was neither a text nor a byte payload
final String stringPayload = message.getTextPayload()
.orElseGet(() -> message.getBytePayload()
.map(ByteBuffer::array)
.map(ba -> new String(ba, StandardCharsets.UTF_8))
.orElseThrow(() -> new IllegalArgumentException("The received message payload " +
"was null, which is not a valid json command.")));

// fall back trying to interpret as DittoProtocol
final JsonObject publicCommandJsonObject = JsonFactory.newObject(stringPayload);

// use correlationId from json payload if present
// TODO DG rly required??
Expand Down Expand Up @@ -295,7 +299,6 @@ private Command<?> buildCommandFromPublicProtocol(final InternalMessage message,
}



private Map<String, PayloadMapper> loadPayloadMappers(final PayloadMapperFactory factory,
final List<MappingContext> mappingContexts) {
return mappingContexts.stream().collect(Collectors.toMap(MappingContext::getContentType, mappingContext -> {
Expand Down
Loading

0 comments on commit 7feaf23

Please sign in to comment.