// ConnectionPersistenceOperationsActorIT.java
/*
* Copyright (c) 2021 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.services.connectivity.messaging.persistence;
import java.util.Collections;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import org.eclipse.ditto.model.base.auth.AuthorizationContext;
import org.eclipse.ditto.model.base.auth.AuthorizationSubject;
import org.eclipse.ditto.model.base.auth.DittoAuthorizationContextType;
import org.eclipse.ditto.model.base.entity.id.EntityId;
import org.eclipse.ditto.model.base.headers.DittoHeaders;
import org.eclipse.ditto.model.connectivity.Connection;
import org.eclipse.ditto.model.connectivity.ConnectionId;
import org.eclipse.ditto.model.connectivity.ConnectionType;
import org.eclipse.ditto.model.connectivity.ConnectivityModelFactory;
import org.eclipse.ditto.model.connectivity.ConnectivityStatus;
import org.eclipse.ditto.model.connectivity.Source;
import org.eclipse.ditto.services.connectivity.messaging.ClientActorPropsFactory;
import org.eclipse.ditto.services.connectivity.messaging.DefaultClientActorPropsFactory;
import org.eclipse.ditto.services.utils.persistence.mongo.ops.eventsource.MongoEventSourceITAssertions;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommand;
import org.eclipse.ditto.signals.commands.connectivity.ConnectivityCommandInterceptor;
import org.eclipse.ditto.signals.commands.connectivity.exceptions.ConnectionNotAccessibleException;
import org.eclipse.ditto.signals.commands.connectivity.modify.CreateConnection;
import org.eclipse.ditto.signals.commands.connectivity.modify.CreateConnectionResponse;
import org.eclipse.ditto.signals.commands.connectivity.query.RetrieveConnection;
import org.eclipse.ditto.signals.commands.connectivity.query.RetrieveConnectionResponse;
import org.eclipse.ditto.utils.jsr305.annotations.AllValuesAreNonnullByDefault;
import org.junit.Test;
import com.typesafe.config.Config;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.testkit.TestProbe;
/**
 * Integration test for {@link ConnectionPersistenceOperationsActor}.
 * <p>
 * Overrides the hooks of {@link MongoEventSourceITAssertions} with connectivity-specific commands, response
 * classes and actors, and runs the inherited "purge entities without namespace" assertion.
 * </p>
 */
@AllValuesAreNonnullByDefault
public final class ConnectionPersistenceOperationsActorIT extends MongoEventSourceITAssertions<ConnectionId> {

    /**
     * Runs the inherited assertion for purging entities which have no namespace.
     */
    @Test
    public void purgeEntitiesWithoutNamespace() {
        assertPurgeEntitiesWithoutNamespace();
    }

    /**
     * @return the service name {@code "connectivity"}.
     */
    @Override
    protected String getServiceName() {
        // this loads the connectivity.conf from module "ditto-services-connectivity-config" as ActorSystem conf
        return "connectivity";
    }

    /**
     * @return the resource type of connectivity commands.
     */
    @Override
    protected String getResourceType() {
        return ConnectivityCommand.RESOURCE_TYPE;
    }

    /**
     * Converts the given generic entity ID into a {@link ConnectionId}.
     *
     * @param entityId the entity ID to convert.
     * @return the connection ID created from {@code entityId}.
     */
    @Override
    protected ConnectionId toEntityId(final EntityId entityId) {
        return ConnectionId.of(entityId);
    }

    /**
     * Builds a {@link CreateConnection} command with empty headers for a closed AMQP 0.9.1 connection which has
     * exactly one source.
     *
     * @param id the ID of the connection to create.
     * @return the command creating the connection.
     */
    @Override
    protected Object getCreateEntityCommand(final ConnectionId id) {
        final AuthorizationSubject subject = AuthorizationSubject.newInstance("subject");
        final AuthorizationContext authContext =
                AuthorizationContext.newInstance(DittoAuthorizationContextType.UNSPECIFIED, subject);
        final Source singleSource = ConnectivityModelFactory.newSource(authContext, "address");

        // The connection is created with status CLOSED.
        final Connection closedConnection = ConnectivityModelFactory
                .newConnectionBuilder(id,
                        ConnectionType.AMQP_091,
                        ConnectivityStatus.CLOSED,
                        "amqp://user:pass@8.8.8.8:5671")
                .sources(Collections.singletonList(singleSource))
                .build();

        return CreateConnection.of(closedConnection, DittoHeaders.empty());
    }

    /**
     * @return the class of the response expected for the create command.
     */
    @Override
    protected Class<?> getCreateEntityResponseClass() {
        return CreateConnectionResponse.class;
    }

    /**
     * Builds a {@link RetrieveConnection} command with empty headers.
     *
     * @param id the ID of the connection to retrieve.
     * @return the command retrieving the connection.
     */
    @Override
    protected Object getRetrieveEntityCommand(final ConnectionId id) {
        return RetrieveConnection.of(id, DittoHeaders.empty());
    }

    /**
     * @return the class of the response expected for the retrieve command.
     */
    @Override
    protected Class<?> getRetrieveEntityResponseClass() {
        return RetrieveConnectionResponse.class;
    }

    /**
     * @return the class of the exception signalling that a connection is not accessible.
     */
    @Override
    protected Class<?> getEntityNotAccessibleClass() {
        return ConnectionNotAccessibleException.class;
    }

    /**
     * Starts the {@link ConnectionPersistenceOperationsActor} under its well-known actor name.
     *
     * @param actorSystem the actor system in which the actor is started.
     * @param pubSubMediator the pub/sub mediator handed to the actor.
     * @param config the config handed to the actor.
     * @return the reference of the started actor.
     */
    @Override
    protected ActorRef startActorUnderTest(final ActorSystem actorSystem, final ActorRef pubSubMediator,
            final Config config) {

        // NOTE(review): mongoDbConfig and persistenceOperationsConfig are not declared in this class;
        // presumably they are inherited from the base class - confirm.
        final Props underTestProps = ConnectionPersistenceOperationsActor.props(pubSubMediator,
                mongoDbConfig,
                config,
                persistenceOperationsConfig);

        return actorSystem.actorOf(underTestProps, ConnectionPersistenceOperationsActor.ACTOR_NAME);
    }

    /**
     * Starts a {@link ConnectionSupervisorActor} whose actor name is the string representation of the given ID.
     * The supervisor is wired with the reference of a test probe, the default client actor props factory, a
     * command interceptor which does nothing and a priority provider which always completes with {@code 4711}.
     *
     * @param system the actor system in which the actor is started.
     * @param pubSubMediator the pub/sub mediator handed to the supervisor.
     * @param id the ID of the connection; also used as actor name.
     * @return the reference of the started supervisor actor.
     */
    @Override
    protected ActorRef startEntityActor(final ActorSystem system, final ActorRef pubSubMediator,
            final ConnectionId id) {

        final TestProbe proxyProbe = new TestProbe(system, "proxyActor");

        // Interceptor with an empty body: commands are never rejected by it.
        final ConnectivityCommandInterceptor noOpInterceptor = (cmd, connSupplier) -> {};

        // Every created priority provider yields the same already completed priority.
        final ConnectionPriorityProviderFactory fixedPriorityFactory = (persistenceActor, logger) ->
                (connId, corrId) -> CompletableFuture.completedFuture(Optional.of(4711));

        final ClientActorPropsFactory clientPropsFactory = DefaultClientActorPropsFactory.getInstance();

        final Props supervisorProps = ConnectionSupervisorActor.props(proxyProbe.ref(),
                clientPropsFactory,
                noOpInterceptor,
                fixedPriorityFactory,
                pubSubMediator);

        return system.actorOf(supervisorProps, String.valueOf(id));
    }

}