-
Notifications
You must be signed in to change notification settings - Fork 215
/
SearchForwardingClientActorPropsFactory.java
130 lines (111 loc) · 5.28 KB
/
SearchForwardingClientActorPropsFactory.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
/*
* Copyright (c) 2022 Contributors to the Eclipse Foundation
*
* See the NOTICE file(s) distributed with this work for additional
* information regarding copyright ownership.
*
* This program and the accompanying materials are made available under the
* terms of the Eclipse Public License 2.0 which is available at
* http://www.eclipse.org/legal/epl-2.0
*
* SPDX-License-Identifier: EPL-2.0
*/
package org.eclipse.ditto.connectivity.service.messaging;
import javax.annotation.Nullable;
import org.eclipse.ditto.base.model.headers.DittoHeaders;
import org.eclipse.ditto.base.model.headers.WithDittoHeaders;
import org.eclipse.ditto.connectivity.model.Connection;
import org.eclipse.ditto.internal.utils.akka.controlflow.WithSender;
import org.eclipse.ditto.internal.utils.akka.logging.DittoLoggerFactory;
import org.eclipse.ditto.internal.utils.cluster.DistPubSubAccess;
import com.typesafe.config.Config;
import akka.actor.AbstractActorWithStash;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.cluster.pubsub.DistributedPubSub;
import akka.cluster.pubsub.DistributedPubSubMediator;
import akka.event.DiagnosticLoggingAdapter;
import akka.japi.pf.ReceiveBuilder;
public final class SearchForwardingClientActorPropsFactory extends ClientActorPropsFactory {
/**
* @param actorSystem the actor system in which to load the extension.
*/
public SearchForwardingClientActorPropsFactory(final ActorSystem actorSystem) {
super(actorSystem);
}
@Override
public Props getActorPropsForType(final Connection connection, final ActorRef proxyActor,
final ActorRef connectionActor,
final ActorSystem actorSystem, final DittoHeaders dittoHeaders,
final Config connectivityConfigOverwrites) {
return SearchForwardingClientActor.props(connectionActor);
}
/**
* Mocks a {@link org.eclipse.ditto.connectivity.service.messaging.persistence.ConnectionPersistenceActor} and provides abstraction for a real connection.
*/
public static class SearchForwardingClientActor extends AbstractActorWithStash {
private final DiagnosticLoggingAdapter log = DittoLoggerFactory.getDiagnosticLoggingAdapter(this);
private final ActorRef mediator = DistributedPubSub.get(getContext().getSystem()).mediator();
@Nullable private ActorRef delegate;
@Nullable private ActorRef gossip;
private ActorRef connectionActor;
private SearchForwardingClientActor(final ActorRef connectionActor) {
this.connectionActor = connectionActor;
}
public static Props props(final ActorRef connectionActor) {
return Props.create(SearchForwardingClientActor.class, connectionActor);
}
@Override
public void preStart() {
log.info("Mock client actor started.");
connectionActor.tell(getSelf(), getSelf());
if (gossip != null) {
gossip.tell(getSelf(), getSelf());
}
subscribeForSnapshotPubSubTopic(mediator);
}
private void subscribeForSnapshotPubSubTopic(final ActorRef pubSubMediator) {
final var self = getSelf();
final var subscriptionMessage =
DistPubSubAccess.subscribe("mockClientActor:change", self);
pubSubMediator.tell(subscriptionMessage, self);
}
@Override
public void postStop() {
log.info("Mock client actor was stopped.");
}
@Override
public Receive createReceive() {
return receiveBuilder()
.match(DistributedPubSubMediator.SubscribeAck.class, subscribeAck ->
mediator.tell(DistPubSubAccess.publish("mockClientActor:subscribed",
new MockClientActorPropsFactory.MockClientActor.Subscribed()),
getSelf()))
.match(MockClientActorPropsFactory.MockClientActor.ChangeActorRef.class, s -> {
delegate =
s.delegate != null ? getContext().getSystem().provider().resolveActorRef(s.delegate) :
null;
gossip =
s.gossip != null ? getContext().getSystem().provider().resolveActorRef(s.gossip) : null;
if (gossip != null) {
gossip.tell(getSelf(), getSelf());
}
getSender().tell(new MockClientActorPropsFactory.MockClientActor.ActorRefChanged(), getSelf());
log.info("Switching state.");
getContext().become(initializedBehavior(), false);
unstashAll();
})
.matchAny(any -> stash())
.build();
}
private Receive initializedBehavior() {
return ReceiveBuilder.create()
.match(WithDittoHeaders.class, message -> delegate
.tell(WithSender.of(message, getSelf()), getSender()))
.match(ActorRef.class, actorRef ->
gossip.forward(actorRef, getContext()))
.build();
}
}
}