-
Notifications
You must be signed in to change notification settings - Fork 216
/
DefaultClientActorPropsFactory.java
171 lines (156 loc) · 8.05 KB
/
DefaultClientActorPropsFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
/*
* Copyright (c) 2017 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging;
import java.util.Map;
import java.util.Optional;
import javax.annotation.Nullable;
import javax.annotation.concurrent.Immutable;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.connectivity.api.HonoConfig;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionId;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.eclipse.ditto.connectivity.model.HonoAddressAlias;
import org.eclipse.ditto.connectivity.model.Source;
import org.eclipse.ditto.connectivity.model.Target;
import org.eclipse.ditto.connectivity.service.messaging.amqp.AmqpClientActor;
import org.eclipse.ditto.connectivity.service.messaging.httppush.HttpPushClientActor;
import org.eclipse.ditto.connectivity.service.messaging.kafka.KafkaClientActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.HiveMqtt3ClientActor;
import org.eclipse.ditto.connectivity.service.messaging.mqtt.hivemq.HiveMqtt5ClientActor;
import org.eclipse.ditto.connectivity.service.messaging.rabbitmq.RabbitMQClientActor;
import org.eclipse.ditto.json.JsonArray;
import org.eclipse.ditto.json.JsonFactory;
import org.eclipse.ditto.json.JsonObject;
import org.eclipse.ditto.json.JsonObjectBuilder;
import org.eclipse.ditto.json.JsonValue;
import com.typesafe.config.Config;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
/**
* The default implementation of {@link ClientActorPropsFactory}. Singleton which is created just once
* and otherwise returns the already created instance.
*/
@Immutable
public final class DefaultClientActorPropsFactory implements ClientActorPropsFactory {
@Nullable private static DefaultClientActorPropsFactory instance;
private DefaultClientActorPropsFactory() {}
/**
* Returns an instance of {@code DefaultClientActorPropsFactory}. Creates a new one if not already done.
*
* @return the factory instance.
*/
public static DefaultClientActorPropsFactory getInstance() {
if (null == instance) {
instance = new DefaultClientActorPropsFactory();
}
return instance;
}
@Override
public Props getActorPropsForType(final Connection connection, final ActorRef proxyActor,
final ActorRef connectionActor,
final ActorSystem actorSystem,
final DittoHeaders dittoHeaders,
final Config connectivityConfigOverwrites) {
final ConnectionType connectionType = connection.getConnectionType();
final Props result;
switch (connectionType) {
case AMQP_091:
result = RabbitMQClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case AMQP_10:
result = AmqpClientActor.props(connection, proxyActor, connectionActor, connectivityConfigOverwrites,
actorSystem, dittoHeaders);
break;
case MQTT:
result = HiveMqtt3ClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case MQTT_5:
result = HiveMqtt5ClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case KAFKA:
result = KafkaClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
case HONO:
result = KafkaClientActor.props(getEnrichedConnection(actorSystem, connection),
proxyActor, connectionActor, dittoHeaders, connectivityConfigOverwrites);
break;
case HTTP_PUSH:
result = HttpPushClientActor.props(connection, proxyActor, connectionActor, dittoHeaders,
connectivityConfigOverwrites);
break;
default:
throw new IllegalArgumentException("ConnectionType <" + connectionType + "> is not supported.");
}
return result;
}
private Connection getEnrichedConnection(final ActorSystem actorSystem, final Connection connection) {
var honoConfig = HonoConfig.get(actorSystem);
final ConnectionId connectionId = connection.getId();
return ConnectivityModelFactory.newConnectionBuilder(
connection.getId(),
connection.getConnectionType(),
connection.getConnectionStatus(),
honoConfig.getBaseUri())
.validateCertificate(honoConfig.getValidateCertificates())
.specificConfig(Map.of(
"saslMechanism", honoConfig.getSaslMechanism().getValue(),
"bootstrapServers", honoConfig.getBootstrapServers(),
"groupId", honoConfig.getTenantId(connectionId) + "_" + connectionId))
.credentials(honoConfig.getCredentials(connectionId))
.sources(connection.getSources()
.stream()
.map(source -> ConnectivityModelFactory.sourceFromJson(
resolveSourceAliases(source, honoConfig.getTenantId(connectionId)), 1))
.toList())
.targets(connection.getTargets()
.stream()
.map(target -> ConnectivityModelFactory.targetFromJson(
resolveTargetAlias(target, honoConfig.getTenantId(connectionId))))
.toList())
.build();
}
private JsonObject resolveSourceAliases(final Source source, String tenantId) {
JsonObjectBuilder sourceBuilder = JsonFactory.newObjectBuilder(source.toJson())
.set(Source.JsonFields.ADDRESSES, JsonArray.of(source.getAddresses().stream()
.map(address -> HonoAddressAlias.resolve(address, tenantId))
.map(JsonValue::of)
.toList()));
source.getReplyTarget().ifPresent(replyTarget -> {
sourceBuilder.set("replyTarget/address", HonoAddressAlias.resolve(replyTarget.getAddress(), tenantId, true));
switch (HonoAddressAlias.fromName(replyTarget.getAddress()).orElse(HonoAddressAlias.UNKNOWN)) {
case COMMAND -> {
sourceBuilder.set("replyTarget/headerMapping/device_id", "{{ thing:id }}");
sourceBuilder.set("replyTarget/headerMapping/subject",
"{{ header:subject | fn:default(topic:action-subject) | fn:default(topic:criterion) }}-response");
}
case COMMAND_RESPONSE -> sourceBuilder.set("replyTarget/headerMapping/status", "{{ header:status }}");
}
sourceBuilder.set("replyTarget/headerMapping/correlation-id", "{{ header:correlation-id }}");
});
return sourceBuilder.build();
}
private JsonObject resolveTargetAlias(final Target target, String tenantId) {
JsonObjectBuilder targetBuilder = JsonFactory.newObjectBuilder(target.toJson())
.set(Target.JsonFields.ADDRESS, Optional.of(target.getAddress())
.map(address -> HonoAddressAlias.resolve(address, tenantId, true))
.orElse(null), jsonField -> !target.getAddress().isEmpty());
return targetBuilder.build();
}
}