Skip to content

Commit

Permalink
#1748 Add honoTenantId configuration for HonoConnection.
Browse files Browse the repository at this point in the history
  • Loading branch information
calohmn committed Oct 30, 2023
1 parent 38af233 commit 19f9c06
Show file tree
Hide file tree
Showing 10 changed files with 646 additions and 78 deletions.
Expand Up @@ -16,8 +16,6 @@
import java.text.MessageFormat;
import java.util.Set;

import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.HonoAddressAlias;
import org.eclipse.ditto.connectivity.model.UserPasswordCredentials;
import org.eclipse.ditto.connectivity.service.config.DefaultHonoConfig;
Expand All @@ -35,8 +33,6 @@ public final class DefaultHonoConnectionFactory extends HonoConnectionFactory {

private final HonoConfig honoConfig;

private ConnectionId connectionId;

/**
* Constructs a {@code DefaultHonoConnectionFactory} for the specified arguments.
*
Expand All @@ -48,11 +44,6 @@ public DefaultHonoConnectionFactory(final ActorSystem actorSystem, final Config
honoConfig = new DefaultHonoConfig(actorSystem);
}

@Override
protected void preConversion(final Connection honoConnection) {
connectionId = honoConnection.getId();
}

@Override
public URI getBaseUri() {
return honoConfig.getBaseUri();
Expand Down Expand Up @@ -86,13 +77,13 @@ protected UserPasswordCredentials getCredentials() {
@Override
protected String resolveSourceAddress(final HonoAddressAlias honoAddressAlias) {
return MessageFormat.format("hono.{0}.{1}",
honoAddressAlias.getAliasValue(), connectionId);
honoAddressAlias.getAliasValue(), getHonoTenantId());
}

@Override
protected String resolveTargetAddress(final HonoAddressAlias honoAddressAlias) {
return MessageFormat.format("hono.{0}.{1}/'{{thing:id}}'",
honoAddressAlias.getAliasValue(), connectionId);
honoAddressAlias.getAliasValue(), getHonoTenantId());
}

}
Expand Up @@ -63,6 +63,13 @@
*/
public abstract class HonoConnectionFactory implements DittoExtensionPoint {

/**
* The name of the property in the {@code specificConfig} object containing the Hono tenant identifier.
*/
protected static final String SPEC_CONFIG_HONO_TENANT_ID = "honoTenantId";

private String honoTenantId;

/**
* Constructs a {@code HonoConnectionFactory}.
*/
Expand Down Expand Up @@ -112,6 +119,9 @@ public Connection getHonoConnection(final Connection connection) {
connection.getConnectionType())
);

honoTenantId = connection.getSpecificConfig()
.getOrDefault(SPEC_CONFIG_HONO_TENANT_ID, connection.getId().toString());

preConversion(connection);

return ConnectivityModelFactory.newConnectionBuilder(connection)
Expand All @@ -134,6 +144,18 @@ protected void preConversion(final Connection honoConnection) {
// Do nothing by default.
}

/**
* Get the Hono tenant identifier associated with the connection.
*
* @return The Hono tenant identifier.
*/
protected String getHonoTenantId() {
if (honoTenantId == null) {
throw new IllegalStateException("getHonoTenantId invoked before tenant id got determined");
}
return honoTenantId;
}

protected abstract URI getBaseUri();

protected abstract boolean isValidateCertificates();
Expand Down
Expand Up @@ -30,7 +30,6 @@

import org.assertj.core.api.Assertions;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.HonoAddressAlias;
import org.eclipse.ditto.connectivity.model.ReplyTarget;
Expand Down Expand Up @@ -58,7 +57,7 @@ public final class DefaultHonoConnectionFactoryTest {

private HonoConfig honoConfig;

private static Connection generateConnectionObjectFromJsonFile( String fileName) throws IOException {
private static Connection generateConnectionObjectFromJsonFile(final String fileName) throws IOException {
final var testClassLoader = DefaultHonoConnectionFactoryTest.class.getClassLoader();
try (final var connectionJsonFileStreamReader = new InputStreamReader(
testClassLoader.getResourceAsStream(fileName)
Expand All @@ -85,13 +84,26 @@ public void newInstanceWithNullActorSystemThrowsException() {
public void getHonoConnectionWithCustomMappingsReturnsExpected() throws IOException {
final var userProvidedHonoConnection =
generateConnectionObjectFromJsonFile("hono-connection-custom-test.json");
final var expectedHonoConnection =
generateConnectionObjectFromJsonFile("hono-connection-custom-expected.json");
final var expectedAdaptedConnection =
generateConnectionObjectFromJsonFile("hono-connection-custom-expected-after-adaptation.json");

final var underTest =
new DefaultHonoConnectionFactory(actorSystemResource.getActorSystem(), ConfigFactory.empty());

assertThat(underTest.getHonoConnection(userProvidedHonoConnection)).isEqualTo(expectedHonoConnection);
assertThat(underTest.getHonoConnection(userProvidedHonoConnection)).isEqualTo(expectedAdaptedConnection);
}

@Test
public void getHonoConnectionWithImplicitTenantIdAndCustomMappingsReturnsExpected() throws IOException {
final var userProvidedHonoConnection =
generateConnectionObjectFromJsonFile("hono-connection-implicit-tenant-custom-test.json");
final var expectedAdaptedConnection =
generateConnectionObjectFromJsonFile("hono-connection-implicit-tenant-custom-expected-after-adaptation.json");

final var underTest =
new DefaultHonoConnectionFactory(actorSystemResource.getActorSystem(), ConfigFactory.empty());

assertThat(underTest.getHonoConnection(userProvidedHonoConnection)).isEqualTo(expectedAdaptedConnection);
}

@Test
Expand All @@ -103,11 +115,11 @@ public void getHonoConnectionWithDefaultMappingReturnsExpected() throws IOExcept
new DefaultHonoConnectionFactory(actorSystemResource.getActorSystem(), ConfigFactory.empty());

assertThat(underTest.getHonoConnection(userProvidedHonoConnection))
.isEqualTo(getExpectedHonoConnection(userProvidedHonoConnection));
.isEqualTo(getExpectedAdaptedHonoConnection(userProvidedHonoConnection));
}

@SuppressWarnings("unchecked")
private Connection getExpectedHonoConnection(final Connection originalConnection) {
private Connection getExpectedAdaptedHonoConnection(final Connection originalConnection) {
final var sourcesByAddress = getSourcesByAddress(originalConnection.getSources());
final var commandReplyTargetHeaderMapping = ConnectivityModelFactory.newHeaderMapping(Map.of(
"correlation-id", "{{ header:correlation-id }}",
Expand All @@ -122,6 +134,9 @@ private Connection getExpectedHonoConnection(final Connection originalConnection
"subject", "{{ header:subject | fn:default(topic:action-subject) }}"
);
final var connectionId = originalConnection.getId();
final String honoTenantId = originalConnection.getSpecificConfig()
.getOrDefault(DefaultHonoConnectionFactory.SPEC_CONFIG_HONO_TENANT_ID, connectionId.toString());
final String expectedResolvedCommandTargetAddress = getExpectedResolvedCommandTargetAddress(honoTenantId);
return ConnectivityModelFactory.newConnectionBuilder(originalConnection)
.uri(honoConfig.getBaseUri().toString().replaceFirst("(\\S+://)(\\S+)",
"$1" + URLEncoder.encode(honoConfig.getUserPasswordCredentials().getUsername(), StandardCharsets.UTF_8)
Expand All @@ -135,22 +150,22 @@ private Connection getExpectedHonoConnection(final Connection originalConnection
)
.setSources(List.of(
ConnectivityModelFactory.newSourceBuilder(sourcesByAddress.get(TELEMETRY.getAliasValue()))
.addresses(Set.of(getExpectedResolvedSourceAddress(TELEMETRY, connectionId)))
.addresses(Set.of(getExpectedResolvedSourceAddress(TELEMETRY, honoTenantId)))
.replyTarget(ReplyTarget.newBuilder()
.address(getExpectedResolvedCommandTargetAddress(connectionId))
.address(expectedResolvedCommandTargetAddress)
.headerMapping(commandReplyTargetHeaderMapping)
.build())
.build(),
ConnectivityModelFactory.newSourceBuilder(sourcesByAddress.get(EVENT.getAliasValue()))
.addresses(Set.of(getExpectedResolvedSourceAddress(EVENT, connectionId)))
.addresses(Set.of(getExpectedResolvedSourceAddress(EVENT, honoTenantId)))
.replyTarget(ReplyTarget.newBuilder()
.address(getExpectedResolvedCommandTargetAddress(connectionId))
.address(expectedResolvedCommandTargetAddress)
.headerMapping(commandReplyTargetHeaderMapping)
.build())
.build(),
ConnectivityModelFactory.newSourceBuilder(
sourcesByAddress.get(COMMAND_RESPONSE.getAliasValue()))
.addresses(Set.of(getExpectedResolvedSourceAddress(COMMAND_RESPONSE, connectionId)))
.addresses(Set.of(getExpectedResolvedSourceAddress(COMMAND_RESPONSE, honoTenantId)))
.headerMapping(ConnectivityModelFactory.newHeaderMapping(Map.of(
"correlation-id", "{{ header:correlation-id }}",
"status", "{{ header:status }}"
Expand All @@ -159,8 +174,8 @@ private Connection getExpectedHonoConnection(final Connection originalConnection
))
.setTargets(List.of(
ConnectivityModelFactory.newTargetBuilder(targets.get(0))
.address(getExpectedResolvedCommandTargetAddress(connectionId))
.originalAddress(getExpectedResolvedCommandTargetAddress(connectionId))
.address(expectedResolvedCommandTargetAddress)
.originalAddress(expectedResolvedCommandTargetAddress)
.headerMapping(ConnectivityModelFactory.newHeaderMapping(
Stream.concat(
basicAdditionalTargetHeaderMappingEntries.entrySet().stream(),
Expand All @@ -170,8 +185,8 @@ private Connection getExpectedHonoConnection(final Connection originalConnection
))
.build(),
ConnectivityModelFactory.newTargetBuilder(targets.get(1))
.address(getExpectedResolvedCommandTargetAddress(connectionId))
.originalAddress(getExpectedResolvedCommandTargetAddress(connectionId))
.address(expectedResolvedCommandTargetAddress)
.originalAddress(expectedResolvedCommandTargetAddress)
.headerMapping(ConnectivityModelFactory.newHeaderMapping(
basicAdditionalTargetHeaderMappingEntries
))
Expand All @@ -186,12 +201,12 @@ private static Map<String, Source> getSourcesByAddress(final Iterable<Source> so
return result;
}

private static String getExpectedResolvedSourceAddress(final HonoAddressAlias honoAddressAlias, final ConnectionId connectionId) {
return "hono." + honoAddressAlias.getAliasValue() + "." + connectionId;
private static String getExpectedResolvedSourceAddress(final HonoAddressAlias honoAddressAlias, final String honoTenantId) {
return "hono." + honoAddressAlias.getAliasValue() + "." + honoTenantId;
}

private static String getExpectedResolvedCommandTargetAddress(final ConnectionId connectionId) {
return "hono." + HonoAddressAlias.COMMAND.getAliasValue() + "." + connectionId + "/{{thing:id}}";
private static String getExpectedResolvedCommandTargetAddress(final String honoTenantId) {
return "hono." + HonoAddressAlias.COMMAND.getAliasValue() + "." + honoTenantId + "/{{thing:id}}";
}

}
Expand Up @@ -221,7 +221,7 @@ public void testConnectionTypeHono() throws IOException {
.toBuilder()
.id(connectionId)
.build();
final var expectedHonoConnection = generateConnectionObjectFromJsonFile("hono-connection-custom-expected.json", connectionId)
final var expectedHonoConnection = generateConnectionObjectFromJsonFile("hono-connection-custom-expected-after-adaptation.json", connectionId)
.toBuilder()
.id(connectionId)
.build();
Expand Down
Expand Up @@ -7,7 +7,7 @@
"sources": [
{
"addresses": [
"hono.telemetry.test-connection-id"
"hono.telemetry.hono-tenant-id"
],
"consumerCount": 1,
"qos": 0,
Expand All @@ -32,7 +32,7 @@
"implicitStandaloneThingCreation"
],
"replyTarget": {
"address": "hono.command.test-connection-id/{{thing:id}}",
"address": "hono.command.hono-tenant-id/{{thing:id}}",
"headerMapping": {
"device_id": "custom_value1",
"user_key1": "user_value1",
Expand All @@ -48,7 +48,7 @@
},
{
"addresses": [
"hono.event.test-connection-id"
"hono.event.hono-tenant-id"
],
"consumerCount": 1,
"qos": 1,
Expand All @@ -72,7 +72,7 @@
"implicitStandaloneThingCreation"
],
"replyTarget": {
"address": "hono.command.test-connection-id/{{thing:id}}",
"address": "hono.command.hono-tenant-id/{{thing:id}}",
"headerMapping": {
"device_id": "{{ thing:id }}",
"subject": "custom_value2",
Expand All @@ -88,7 +88,7 @@
},
{
"addresses": [
"hono.command_response.test-connection-id"
"hono.command_response.hono-tenant-id"
],
"consumerCount": 1,
"qos": 0,
Expand Down Expand Up @@ -120,7 +120,7 @@
],
"targets": [
{
"address": "hono.command.test-connection-id/{{thing:id}}",
"address": "hono.command.hono-tenant-id/{{thing:id}}",
"topics": [
"_/_/things/live/messages",
"_/_/things/live/commands"
Expand All @@ -137,7 +137,7 @@
}
},
{
"address": "hono.command.test-connection-id/{{thing:id}}",
"address": "hono.command.hono-tenant-id/{{thing:id}}",
"topics": [
"_/_/things/twin/events",
"_/_/things/live/events"
Expand All @@ -158,6 +158,7 @@
"validateCertificates": false,
"processorPoolSize": 5,
"specificConfig": {
"honoTenantId": "hono-tenant-id",
"saslMechanism": "plain",
"bootstrapServers": "tcp://server1:port1,tcp://server2:port2,tcp://server3:port3",
"groupId": "custom_groupId"
Expand Down
Expand Up @@ -148,6 +148,7 @@
"validateCertificates": true,
"processorPoolSize": 5,
"specificConfig": {
"honoTenantId": "hono-tenant-id",
"groupId": "custom_groupId"
},
"mappingDefinitions": {
Expand Down
Expand Up @@ -135,6 +135,9 @@
"failoverEnabled": true,
"validateCertificates": true,
"processorPoolSize": 5,
"specificConfig": {
"honoTenantId": "hono-tenant-id"
},
"mappingDefinitions": {
"implicitEdgeThingCreation": {
"mappingEngine": "ImplicitThingCreation",
Expand Down

0 comments on commit 19f9c06

Please sign in to comment.