Skip to content

Commit

Permalink
Java style accessors for AbstractFSM #22592
Browse files Browse the repository at this point in the history
  • Loading branch information
johanandren committed Mar 17, 2017
1 parent 363ca39 commit 07e8830
Show file tree
Hide file tree
Showing 24 changed files with 85 additions and 58 deletions.
26 changes: 26 additions & 0 deletions akka-actor/src/main/scala/akka/actor/AbstractFSM.scala
Expand Up @@ -36,6 +36,32 @@ abstract class AbstractFSM[S, D] extends FSM[S, D] {
import java.util.{ List JList }
import FSM._

/**
* Returns this AbstractActor's ActorContext
* The ActorContext is not thread safe so do not expose it outside of the
* AbstractActor.
*/
def getContext(): AbstractActor.ActorContext = context.asInstanceOf[AbstractActor.ActorContext]

/**
* Returns the ActorRef for this actor.
*
* Same as `self()`.
*/
def getSelf(): ActorRef = self

/**
* The reference sender Actor of the currently processed message. This is
* always a legal destination to send to, even if there is no logical recipient
* for the reply, in which case it will be sent to the dead letter mailbox.
*
* Same as `sender()`.
*
* WARNING: Only valid within the Actor itself, so do not close over it and
* publish it to other threads!
*/
def getSender(): ActorRef = sender()

/**
* Insert a new StateFunction at the end of the processing chain for the
* given state.
Expand Down
6 changes: 3 additions & 3 deletions akka-docs/rst/java/code/jdocs/actor/ActorDocTest.java
Expand Up @@ -398,7 +398,7 @@ private AbstractActor.Receive shuttingDown() {
getSender().tell("service unavailable, shutting down", getSelf())
)
.match(Terminated.class, t -> t.actor().equals(worker), t ->
getContext().stop(self())
getContext().stop(getSelf())
)
.build();
}
Expand Down Expand Up @@ -716,15 +716,15 @@ public Receive createReceive() {
getContext().become(active(ref));
})
.match(ActorIdentity.class, id -> !id.getActorRef().isPresent(), id -> {
getContext().stop(self());
getContext().stop(getSelf());
})
.build();
}

final AbstractActor.Receive active(final ActorRef another) {
return receiveBuilder()
.match(Terminated.class, t -> t.actor().equals(another), t ->
getContext().stop(self())
getContext().stop(getSelf())
)
.build();
}
Expand Down
@@ -1,7 +1,7 @@
/**
* Copyright (C) 2009-2017 Lightbend Inc. <http://www.lightbend.com>
*/
package jdocs.actor.japi;
package jdocs.actor;

//#all
//#imports
Expand All @@ -14,6 +14,7 @@
import akka.dispatch.Mapper;
import akka.event.LoggingReceive;
import akka.japi.pf.DeciderBuilder;
import akka.pattern.Patterns;
import akka.util.Timeout;
import com.typesafe.config.Config;
import com.typesafe.config.ConfigFactory;
Expand All @@ -27,10 +28,10 @@
import static akka.pattern.Patterns.ask;
import static akka.pattern.Patterns.pipe;

import static jdocs.actor.japi.FaultHandlingDocSample.WorkerApi.*;
import static jdocs.actor.japi.FaultHandlingDocSample.CounterServiceApi.*;
import static jdocs.actor.japi.FaultHandlingDocSample.CounterApi.*;
import static jdocs.actor.japi.FaultHandlingDocSample.StorageApi.*;
import static jdocs.actor.FaultHandlingDocSample.WorkerApi.*;
import static jdocs.actor.FaultHandlingDocSample.CounterServiceApi.*;
import static jdocs.actor.FaultHandlingDocSample.CounterApi.*;
import static jdocs.actor.FaultHandlingDocSample.StorageApi.*;

//#imports

Expand Down Expand Up @@ -148,7 +149,7 @@ public Receive createReceive() {
counterService.tell(new Increment(1), getSelf());
counterService.tell(new Increment(1), getSelf());
// Send current progress to the initial sender
pipe(ask(counterService, GetCurrentCount, askTimeout)
pipe(Patterns.ask(counterService, GetCurrentCount, askTimeout)
.mapTo(classTag(CurrentCount.class))
.map(new Mapper<CurrentCount, Progress>() {
public Progress apply(CurrentCount c) {
Expand Down
2 changes: 1 addition & 1 deletion akka-docs/rst/java/code/jdocs/actor/fsm/Buncher.java
Expand Up @@ -39,7 +39,7 @@ public class Buncher extends AbstractFSM<State, Data> {
// reuse this matcher
final UnitMatch<Data> m = UnitMatch.create(
matchData(Todo.class,
todo -> todo.getTarget().tell(new Batch(todo.getQueue()), self())));
todo -> todo.getTarget().tell(new Batch(todo.getQueue()), getSelf())));
m.match(stateData());
}).
state(Idle, Active, () -> {/* Do something here */}));
Expand Down
12 changes: 6 additions & 6 deletions akka-docs/rst/java/code/jdocs/actor/fsm/FSMDocTest.java
Expand Up @@ -131,10 +131,10 @@ public class MyFSM extends AbstractLoggingFSM<StateType, Data> {
log().warning("Failure in state " + state + " with data " + data + "\n" +
"Events leading up to this point:\n\t" + lastEvents);
//#logging-fsm
target.tell(reason.cause(), self());
target.tell(state, self());
target.tell(data, self());
target.tell(lastEvents, self());
target.tell(reason.cause(), getSelf());
target.tell(state, getSelf());
target.tell(data, getSelf());
target.tell(lastEvents, getSelf());
//#logging-fsm
})
);
Expand All @@ -143,11 +143,11 @@ public class MyFSM extends AbstractLoggingFSM<StateType, Data> {
startWith(SomeState, Data.Foo);
when(SomeState, matchEvent(ActorRef.class, Data.class, (ref, data) -> {
target = ref;
target.tell("going active", self());
target.tell("going active", getSelf());
return goTo(Active);
}));
when(Active, matchEventEquals("stop", (event, data) -> {
target.tell("stopping", self());
target.tell("stopping", getSelf());
return stop(new Failure("This is not the error you're looking for"));
}));
initialize();
Expand Down
Expand Up @@ -18,7 +18,7 @@ public Receive createReceive() {
CompletableFuture.supplyAsync(() -> factorial(n))
.thenApply((factorial) -> new FactorialResult(n, factorial));

pipe(result, getContext().dispatcher()).to(sender());
pipe(result, getContext().dispatcher()).to(getSender());

})
.build();
Expand Down
Expand Up @@ -51,7 +51,7 @@ public Receive createReceive() {
if (repeat)
sendJobs();
else
getContext().stop(self());
getContext().stop(getSelf());
}
})
.match(ReceiveTimeout.class, x -> {
Expand Down
Expand Up @@ -53,7 +53,7 @@ public void preStart() {
//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(self());
cluster.unsubscribe(getSelf());
tickTask.cancel();
}

Expand Down
Expand Up @@ -18,13 +18,13 @@ public class TransformationBackend extends AbstractActor {
//subscribe to cluster changes, MemberUp
@Override
public void preStart() {
cluster.subscribe(self(), MemberUp.class);
cluster.subscribe(getSelf(), MemberUp.class);
}

//re-subscribe when restart
@Override
public void postStop() {
cluster.unsubscribe(self());
cluster.unsubscribe(getSelf());
}

@Override
Expand Down
Expand Up @@ -31,8 +31,8 @@ public Receive createReceive() {
.forward(job, getContext());
})
.matchEquals(BACKEND_REGISTRATION, x -> {
getContext().watch(sender());
backends.add(sender());
getContext().watch(getSender());
backends.add(getSender());
})
.match(Terminated.class, terminated -> {
backends.remove(terminated.getActor());
Expand Down
Expand Up @@ -118,7 +118,7 @@ public Receive createReceive() {
return receiveBuilder()
.match(String.class, a -> a.equals("increment"), a -> {
// incoming command to increase the counter
Optional<Object> reqContext = Optional.of(sender());
Optional<Object> reqContext = Optional.of(getSender());
Replicator.Update<PNCounter> upd = new Replicator.Update<PNCounter>(counter1Key,
PNCounter.create(), writeTwo, reqContext, curr -> curr.increment(node, 1));
replicator.tell(upd, getSelf());
Expand Down Expand Up @@ -217,7 +217,7 @@ public Receive createReceive() {
return receiveBuilder()
.match(String.class, a -> a.equals("get-count"), a -> {
// incoming request to retrieve current value of the counter
Optional<Object> reqContext = Optional.of(sender());
Optional<Object> reqContext = Optional.of(getSender());
replicator.tell(new Replicator.Get<PNCounter>(counter1Key,
readTwo), getSelf());
})
Expand Down
2 changes: 1 addition & 1 deletion akka-docs/rst/java/code/jdocs/io/JavaReadBackPressure.java
Expand Up @@ -46,7 +46,7 @@ public void preStart() throws Exception {
tcp = Tcp.get(getContext().getSystem()).manager();
final List<Inet.SocketOption> options = new ArrayList<Inet.SocketOption>();
tcp.tell(
TcpMessage.bind(self(), new InetSocketAddress("localhost", 0), 100, options, true),
TcpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0), 100, options, true),
getSelf()
);
//#pull-mode-bind
Expand Down
2 changes: 1 addition & 1 deletion akka-docs/rst/java/code/jdocs/io/JavaUdpMulticast.java
Expand Up @@ -73,7 +73,7 @@ public Listener(String iface, String group, Integer port, ActorRef sink) {
final ActorRef mgr = Udp.get(getContext().getSystem()).getManager();
// listen for datagrams on this address
InetSocketAddress endpoint = new InetSocketAddress(port);
mgr.tell(UdpMessage.bind(self(), endpoint, options), getSelf());
mgr.tell(UdpMessage.bind(getSelf(), endpoint, options), getSelf());
//#bind
}

Expand Down
10 changes: 5 additions & 5 deletions akka-docs/rst/java/code/jdocs/io/UdpDocTest.java
Expand Up @@ -35,7 +35,7 @@ public SimpleSender(InetSocketAddress remote) {
public Receive createReceive() {
return receiveBuilder()
.match(Udp.SimpleSenderReady.class, message -> {
getContext().become(ready(sender()));
getContext().become(ready(getSender()));
//#sender
getSender().tell(UdpMessage.send(ByteString.fromString("hello"), remote), getSelf());
//#sender
Expand Down Expand Up @@ -68,7 +68,7 @@ public Listener(ActorRef nextActor) {
// request creation of a bound listen socket
final ActorRef mgr = Udp.get(getContext().getSystem()).getManager();
mgr.tell(
UdpMessage.bind(self(), new InetSocketAddress("localhost", 0)),
UdpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0)),
getSelf());
}

Expand Down Expand Up @@ -116,14 +116,14 @@ public Connected(InetSocketAddress remote) {

// create a restricted a.k.a. “connected” socket
final ActorRef mgr = UdpConnected.get(getContext().getSystem()).getManager();
mgr.tell(UdpConnectedMessage.connect(self(), remote), getSelf());
mgr.tell(UdpConnectedMessage.connect(getSelf(), remote), getSelf());
}

@Override
public Receive createReceive() {
return receiveBuilder()
.match(UdpConnected.Connected.class, message -> {
getContext().become(ready(sender()));
getContext().become(ready(getSender()));
//#connected
getSender()
.tell(UdpConnectedMessage.send(ByteString.fromString("hello")),
Expand Down Expand Up @@ -154,7 +154,7 @@ private Receive ready(final ActorRef connection) {
connection.tell(message, getSelf());
})
.match(UdpConnected.Disconnected.class, x -> {
getContext().stop(self());
getContext().stop(getSelf());
})
.build();
}
Expand Down
10 changes: 5 additions & 5 deletions akka-docs/rst/java/code/jdocs/io/japi/EchoHandler.java
Expand Up @@ -87,7 +87,7 @@ private Receive writing() {
.match(ConnectionClosed.class, msg -> {
if (msg.isPeerClosed()) {
if (storage.isEmpty()) {
getContext().stop(self());
getContext().stop(getSelf());
} else {
getContext().become(closing());
}
Expand Down Expand Up @@ -119,7 +119,7 @@ protected Receive buffering(final Ack nack) {
if (msg.isPeerClosed())
state.peerClosed = true;
else
getContext().stop(self());
getContext().stop(getSelf());

})
.match(Integer.class, ack -> {
Expand All @@ -130,7 +130,7 @@ protected Receive buffering(final Ack nack) {

if (storage.isEmpty()) {
if (state.peerClosed)
getContext().stop(self());
getContext().stop(getSelf());
else
getContext().become(writing);

Expand Down Expand Up @@ -165,7 +165,7 @@ protected Receive closing() {
.match(Integer.class, msg -> {
acknowledge(msg);
if (storage.isEmpty())
getContext().stop(self());
getContext().stop(getSelf());
})
.build();
}
Expand Down Expand Up @@ -197,7 +197,7 @@ protected void buffer(ByteString data) {

if (stored > MAX_STORED) {
log.warning("drop connection to [{}] (buffer overrun)", remote);
getContext().stop(self());
getContext().stop(getSelf());

} else if (stored > HIGH_WATERMARK) {
log.debug("suspending reading at {}", currentOffset());
Expand Down
6 changes: 3 additions & 3 deletions akka-docs/rst/java/code/jdocs/io/japi/EchoManager.java
Expand Up @@ -40,14 +40,14 @@ public void preStart() throws Exception {
final ActorRef tcpManager = Tcp.get(getContext().getSystem()).manager();
//#manager
tcpManager.tell(
TcpMessage.bind(self(), new InetSocketAddress("localhost", 0), 100),
TcpMessage.bind(getSelf(), new InetSocketAddress("localhost", 0), 100),
getSelf());
}

@Override
public void postRestart(Throwable arg0) throws Exception {
// do not restart
getContext().stop(self());
getContext().stop(getSelf());
}

@Override
Expand All @@ -59,7 +59,7 @@ public Receive createReceive() {
.match(Tcp.CommandFailed.class, failed -> {
if (failed.cmd() instanceof Bind) {
log.warning("cannot bind to [{}]", ((Bind) failed.cmd()).localAddress());
getContext().stop(self());
getContext().stop(getSelf());
} else {
log.warning("unknown command failed [{}]", failed.cmd());
}
Expand Down
14 changes: 7 additions & 7 deletions akka-docs/rst/java/code/jdocs/io/japi/IODocTest.java
Expand Up @@ -48,7 +48,7 @@ public static Props props(ActorRef manager) {
@Override
public void preStart() throws Exception {
final ActorRef tcp = Tcp.get(getContext().getSystem()).manager();
tcp.tell(TcpMessage.bind(self(),
tcp.tell(TcpMessage.bind(getSelf(),
new InetSocketAddress("localhost", 0), 100), getSelf());
}

Expand All @@ -60,7 +60,7 @@ public Receive createReceive() {

})
.match(CommandFailed.class, msg -> {
getContext().stop(self());
getContext().stop(getSelf());

})
.match(Connected.class, conn -> {
Expand All @@ -87,7 +87,7 @@ public Receive createReceive() {
getSender().tell(TcpMessage.write(data), getSelf());
})
.match(ConnectionClosed.class, msg -> {
getContext().stop(self());
getContext().stop(getSelf());
})
.build();
}
Expand Down Expand Up @@ -118,13 +118,13 @@ public Receive createReceive() {
return receiveBuilder()
.match(CommandFailed.class, msg -> {
listener.tell("failed", getSelf());
getContext().stop(self());
getContext().stop(getSelf());

})
.match(Connected.class, msg -> {
listener.tell(msg, getSelf());
getSender().tell(TcpMessage.register(self()), getSelf());
getContext().become(connected(sender()));
getSender().tell(TcpMessage.register(getSelf()), getSelf());
getContext().become(connected(getSender()));
})
.build();
}
Expand All @@ -144,7 +144,7 @@ private Receive connected(final ActorRef connection) {
connection.tell(TcpMessage.close(), getSelf());
})
.match(ConnectionClosed.class, msg -> {
getContext().stop(self());
getContext().stop(getSelf());
})
.build();
}
Expand Down

0 comments on commit 07e8830

Please sign in to comment.