/
DefaultClientActorPropsFactoryTest.java
155 lines (134 loc) · 6.21 KB
/
DefaultClientActorPropsFactoryTest.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
/*
* Copyright (c) 2017 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging;
import static org.assertj.core.api.Assertions.assertThat;
import static org.eclipse.ditto.connectivity.model.ConnectionType.AMQP_091;
import static org.eclipse.ditto.connectivity.model.ConnectionType.AMQP_10;
import static org.eclipse.ditto.connectivity.model.ConnectionType.KAFKA;
import static org.eclipse.ditto.connectivity.model.ConnectionType.MQTT;
import static org.eclipse.ditto.connectivity.model.ConnectionType.MQTT_5;
import java.util.concurrent.TimeUnit;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.connectivity.model.ConnectionType;
import org.eclipse.ditto.connectivity.model.ConnectivityModelFactory;
import org.junit.After;
import org.junit.Before;
import org.junit.Test;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.remote.DaemonMsgCreate;
import akka.serialization.Serialization;
import akka.serialization.SerializationExtension;
import akka.testkit.javadsl.TestKit;
/**
 * Unit tests for {@link DefaultClientActorPropsFactory}.
 * <p>
 * Every test builds the client actor {@link Props} for one {@link ConnectionType} and pushes them through the
 * serialization configured for the test actor system. The props need to be serializable because client actors may be
 * created on a different connectivity service instance using a local connection object.
 */
public final class DefaultClientActorPropsFactoryTest extends WithMockServers {

    private ActorSystem actorSystem;
    private Serialization serialization;
    private ClientActorPropsFactory underTest;

    /**
     * Starts a fresh actor system, looks up its serialization extension and obtains the factory under test.
     */
    @Before
    public void setUp() {
        actorSystem = ActorSystem.create("AkkaTestSystem", TestConstants.CONFIG);
        serialization = SerializationExtension.get(actorSystem);
        underTest = DefaultClientActorPropsFactory.getInstance();
    }

    /**
     * Shuts the actor system down again, waiting up to five seconds. Does nothing if no system was created.
     */
    @After
    public void tearDown() {
        if (actorSystem == null) {
            return;
        }
        final scala.concurrent.duration.Duration shutdownTimeout =
                scala.concurrent.duration.Duration.apply(5, TimeUnit.SECONDS);
        // 'false': do not verify that the system terminated within the timeout.
        TestKit.shutdownActorSystem(actorSystem, shutdownTimeout, false);
    }

    /**
     * Props of the AMQP 0.9.1 client actor must survive a serialization round trip and equal the original.
     */
    @Test
    public void amqp091ActorPropsIsSerializable() {
        actorPropsIsSerializableAndEqualDeserializedObject(AMQP_091);
    }

    /**
     * Props of the AMQP 1.0 client actor must survive a serialization round trip and equal the original.
     */
    @Test
    public void amqp10ActorPropsIsSerializable() {
        actorPropsIsSerializableAndEqualDeserializedObject(AMQP_10);
    }

    /**
     * Props of the MQTT client actor must survive a serialization round trip and equal the original.
     */
    @Test
    public void mqttActorPropsIsSerializable() {
        actorPropsIsSerializableAndEqualDeserializedObject(MQTT);
    }

    /**
     * Props of the MQTT 5 client actor must survive a serialization round trip and equal the original.
     */
    @Test
    public void mqtt5ActorPropsIsSerializable() {
        actorPropsIsSerializableAndEqualDeserializedObject(MQTT_5);
    }

    /**
     * Props of the Kafka client actor must survive a serialization round trip.
     * <p>
     * Unlike the other connection types, this test only checks that the round trip completes; the deserialized
     * object is not compared with the original (hence the suppressed "test without assertion" warning).
     * NOTE(review): the reason for skipping the equality check is not visible here - confirm before tightening.
     */
    @Test
    @SuppressWarnings("squid:S2699")
    public void kafkaActorPropsIsSerializable() {
        actorPropsIsSerializable(KAFKA);
    }

    /**
     * Runs the serialization round trip for the given connection type without inspecting the result.
     *
     * @param connectionType the connection type whose client actor props are checked.
     */
    private void actorPropsIsSerializable(final ConnectionType connectionType) {
        serializeAndDeserialize(wrappedPropsFor(connectionType));
    }

    /**
     * Runs the serialization round trip for the given connection type and asserts that the deserialized object
     * equals the one that was serialized.
     *
     * @param connectionType the connection type whose client actor props are checked.
     */
    private void actorPropsIsSerializableAndEqualDeserializedObject(final ConnectionType connectionType) {
        final Object original = wrappedPropsFor(connectionType);
        final Object roundTripped = serializeAndDeserialize(original);

        assertThat(roundTripped).isEqualTo(original);
    }

    /**
     * Asks the factory under test for the props of a random connection of the given type - using the dead letters
     * actor for both actor reference arguments - and wraps the result for serialization.
     *
     * @param connectionType the connection type to create props for.
     * @return the wrapped props, ready to be serialized.
     */
    private Object wrappedPropsFor(final ConnectionType connectionType) {
        final Connection connection = randomConnection(connectionType);
        final Props clientActorProps = underTest.getActorPropsForType(connection, actorSystem.deadLetters(),
                actorSystem.deadLetters(), actorSystem);
        return wrapForSerialization(clientActorProps);
    }

    /**
     * Serializes the object with the serializer the actor system selects for it and deserializes the resulting
     * bytes back into an instance of the same class.
     *
     * @param objectToSerialize the object to send through the round trip.
     * @return the deserialized object.
     */
    private Object serializeAndDeserialize(final Object objectToSerialize) {
        final byte[] serialized = serialization.findSerializerFor(objectToSerialize).toBinary(objectToSerialize);
        final Class<?> targetClass = objectToSerialize.getClass();
        return serialization.deserialize(serialized, targetClass).get();
    }

    /**
     * Wrap Props in an object with a reasonable Akka serializer, namely one that applies our configured
     * serializer on each argument of Props. For Akka 2.5.13, that object belongs to the Akka-internal class
     * DaemonMsgCreate. The class may change in future versions of Akka.
     *
     * @param props the props to wrap.
     * @return a {@code DaemonMsgCreate} carrying the props, their deploy, the simple name of the actor class as path
     * and the dead letters actor.
     */
    private Object wrapForSerialization(final Props props) {
        final String path = props.actorClass().getSimpleName();
        return DaemonMsgCreate.apply(props, props.deploy(), path, actorSystem.deadLetters());
    }

    /**
     * Creates a connection of the requested type by copying ID, status, URI, sources and targets from a template
     * connection that {@link TestConstants} creates with a random connection ID.
     *
     * @param connectionType the type the returned connection shall have.
     * @return the new connection.
     */
    private Connection randomConnection(final ConnectionType connectionType) {
        final Connection base = TestConstants.createConnection(TestConstants.createRandomConnectionId());

        return ConnectivityModelFactory
                .newConnectionBuilder(base.getId(), connectionType, base.getConnectionStatus(), base.getUri())
                .sources(base.getSources())
                .targets(base.getTargets())
                .build();
    }

}