Provide public infrastructure for using a single actor as Flow implementation #16985

Closed
sirthias opened this issue Mar 3, 2015 · 13 comments

Comments

@sirthias
Contributor

sirthias commented Mar 3, 2015

Currently akka-stream provides me with ActorPublisher and ActorSubscriber traits that I can use to easily implement a Source or Sink (resp.) manually by using these constructors:

Source.apply[T](Props): Source[T, ActorRef]
Sink.apply[T](Props): Sink[T, ActorRef]

I suggest we add

trait ActorProcessor[T] extends ActorSubscriber with ActorPublisher[T]

as well as

Flow.apply[A, B](Props): Flow[A, B, ActorRef]

to let me do the same for flows that are backed by a single actor.

Is there a workaround that I can use with the current infrastructure to achieve the same effect?
(Other than splitting my processor logic into two actors, one for the Sink and one for the Source side.)

@sirthias
Contributor Author

sirthias commented Mar 4, 2015

Could anyone comment on the question of whether there is a work-around currently?

My use case:
I need to implement a Flow[A, B, _] that wraps/manages an inner (nested) Flow[A, B, _]. This inner flow needs to be materialized on demand, i.e. only when the first A has actually arrived. Also, the inner stream is allowed to terminate (even with an error) without it affecting the outside. In this case the next A should simply rematerialize the inner stream.
I guess this is easiest done by completely decoupling the inner stream from the outside with an actor-based stage, but I might be missing something.

@sirthias
Contributor Author

sirthias commented Mar 4, 2015

/cc @jrudolph

@rkuhn
Contributor

rkuhn commented Mar 30, 2015

Implementing the stream handling in an Actor is a complex endeavor; I’d rather get rid of ActorPublisher/ActorSubscriber than put more weight on them (yes, I do appreciate that sometimes we need them, but AFAICS only for Sinks and Sources). I think more complex transformation steps should be modeled using mapAsync: asking an Actor keeps that Actor oblivious to backpressure and cleanly separates the two concerns.
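
A rough model of the mapAsync-plus-ask idea, using only scala.concurrent rather than Akka itself. The names askWorker and mapAsyncModel are hypothetical: askWorker stands in for asking an actor, and mapAsyncModel only approximates mapAsync by working in batches of at most `parallelism` requests while preserving input order. The point it tries to show is that the worker never sees demand signalling.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

object MapAsyncSketch {
  // Hypothetical stand-in for `actor ? msg`: an asynchronous request/response call.
  def askWorker(msg: String): Future[Int] = Future(msg.length)

  // Simplified model: at most `parallelism` requests are in flight at once and
  // results are emitted in input order. The worker knows nothing about demand.
  def mapAsyncModel[A, B](in: List[A], parallelism: Int)(f: A => Future[B]): List[B] = {
    require(parallelism > 0, "parallelism must be positive")
    in.grouped(parallelism).flatMap { batch =>
      Await.result(Future.sequence(batch.map(f)), 5.seconds)
    }.toList
  }
}
```

Calling MapAsyncSketch.mapAsyncModel(List("a", "bb", "ccc"), 2)(MapAsyncSketch.askWorker) runs two requests concurrently, then the third. A real stage would keep a sliding window of in-flight requests instead of fixed batches.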

@rkuhn rkuhn added this to the streams-1.0 milestone Mar 30, 2015
@sirthias
Contributor Author

That's not going to be good enough.
My use case:
I have a stream of this setup:

A ==> [B ==> C] ==> D

To the outside this behaves exactly like a Flow[A, D].
Internally a part of the stream (the inner Flow[B, C]) needs to be attached with very flexible "joints" that

  1. do not materialize the inner Flow[B, C] unless the first B actually arrives at the joint
  2. do not allow completion or errors from the inner flow to carry over to the outside
  3. rematerialize the inner flow whenever necessary

This is very easily solved with a single actor "wrapping" the inner flow.
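
The three requirements above can be sketched without any Akka types. This is only a model under stated assumptions: the inner flow is represented as a plain function B => C produced by a factory, and RestartingJoint, push and materializations are hypothetical names, not Akka API.

```scala
import scala.util.{Failure, Success, Try}

// Hypothetical model of a "joint": the inner flow is a function B => C
// that is created on demand by `materialize`.
final class RestartingJoint[B, C](materialize: () => B => C) {
  private var inner: Option[B => C] = None
  var materializations: Int = 0 // exposed only so the behaviour can be observed

  def push(elem: B): Option[C] = {
    // 1. materialize only when an element actually arrives
    val flow = inner.getOrElse {
      materializations += 1
      val created = materialize()
      inner = Some(created)
      created
    }
    Try(flow(elem)) match {
      case Success(c) => Some(c)
      case Failure(_) =>
        // 2. the failure stays inside the joint
        // 3. the next element triggers a fresh materialization
        inner = None
        None
    }
  }
}
```

With a factory such as () => (b: Int) => if (b < 0) throw new IllegalArgumentException("negative") else b * 2, pushing 1, -1 and 3 yields Some(2), None and Some(6), and the inner function is created twice.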

Another (similar) use case, where I also need the "single-actor processor", is dynamic subscription.
I have a running stream whose "start" port is an ActorPublisher and its "end" port an ActorSubscriber.
This running stream constitutes a service that clients can attach to. A client is a Flow which, when materialized, routes its data elements through the service stream. When the client flow completes, the inner service flow is not completed. Rather, it is kept running and the client simply detaches.
This is currently easiest to build with single-actor processors.

While we might want to support all this with special constructs in the future, we need these stream stages right now rather than in half a year. So we need a solution now!
I don't see an easier way to give me the flexibility that I need than a single-actor processor. For my needs it'd be fine if it was private[akka] and not "true user API".

@oseval

oseval commented Mar 30, 2015

@sirthias +1 I have a similar case with dynamic subscriptions in #16873

@oseval

oseval commented Mar 30, 2015

I should also note that ActorPublisher[T] and ActorSubscriber sometimes need FanIn/FanOut behaviour.

@sirthias
Contributor Author

@SmLin In most cases you should get away with an ActorPublisher followed by a FlexiRoute or a FlexiMerge before an ActorSubscriber.

@rkuhn
Contributor

rkuhn commented Apr 20, 2015

Closing this one as invalid: AsyncStage should be able to cover all cases, especially once we port InputBunch/OutputBunch to it.

@jroper
Contributor

jroper commented Aug 7, 2015

I'd like to reopen this discussion. I think my use case may be a bit different from @sirthias's. We currently have an integration that most people use in Play for handling WebSockets with actors. Here are the docs:

https://www.playframework.com/documentation/2.4.x/ScalaWebSockets#Handling-WebSockets-with-actors

My hope was that I could deprecate everything in Play that is specific to handling WebSockets with actors, and point users to something in Akka streams that replaces it. Not quite.

So, firstly, mapAsync is not appropriate for use here, as there is typically no "mapping" between input WebSocket messages and output WebSocket messages. Reactive maps is a good example: an input message updates the user's location or viewport area, and an output message periodically updates the user's viewed area. No map is done.

AsyncStage could be used, but it is awkward to integrate into an Akka application. Again consider reactive maps: you can't directly subscribe an AsyncStage to a distributed pubsub actor, which reactive maps needs to do. Instead you have to create an actor that subscribes itself to the distributed pubsub actor and somehow feeds everything it receives into the AsyncStage. Since you're creating an actor anyway, why not have that actor just be the actor that handles the flow?

So, the interface that I'd like to see on Flow is:

def actorRef[In, Out](props: ActorRef => Props, onCompleteMessage: Any, bufferSize: Int, overflowStrategy: OverflowStrategy): Flow[In, Out, Unit]

The props function would be passed the actor that the flow actor can send messages to; that actor would have the same contract as the one from Source.actorRef. The props it returns would create the actor that receives messages, which would have the same contract as the actor passed to Sink.actorRef. Note that, given the contracts of these actors, back pressure is never propagated, but that's a natural consequence of using actors, and is not necessarily a bad thing for applications that have embraced the actor model, such as reactive maps.
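
To make the "no back pressure, bounded buffer plus overflow strategy" contract concrete, here is a minimal sketch in plain Scala. It is not Akka's OverflowStrategy; Overflow, BoundedMailbox, offer and drain are hypothetical names that only loosely mirror the bufferSize and overflowStrategy parameters of the proposed signature.

```scala
import scala.collection.mutable

// Hypothetical overflow policies; they loosely mirror the idea of an
// overflow strategy but are not Akka's types.
sealed trait Overflow
object Overflow {
  case object DropHead extends Overflow // discard the oldest buffered element
  case object DropNew extends Overflow  // discard the incoming element
  case object Fail extends Overflow     // report an error to the caller
}

final class BoundedMailbox[T](bufferSize: Int, strategy: Overflow) {
  require(bufferSize > 0, "bufferSize must be positive")
  private val queue = mutable.Queue.empty[T]

  // The sender is never slowed down; a full buffer is handled by the strategy.
  def offer(elem: T): Either[String, Unit] =
    if (queue.size < bufferSize) {
      queue += elem
      Right(())
    } else strategy match {
      case Overflow.DropHead =>
        queue.dequeue()
        queue += elem
        Right(())
      case Overflow.DropNew => Right(())
      case Overflow.Fail    => Left("buffer overflow")
    }

  // Hands everything buffered so far to the consumer and empties the buffer.
  def drain(): List[T] = {
    val out = queue.toList
    queue.clear()
    out
  }
}
```

For a chat-style application, Fail corresponds to dropping a connection whose client cannot keep up, while DropHead keeps only the most recent messages.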

Note that the above can almost be implemented today on top of the Akka streams API, except that it requires an intermediate materialization, making it only possible to use it once:

  def actorRef[In, Out](props: ActorRef => Props, onCompleteMessage: Any,
      bufferSize: Int, overflowStrategy: OverflowStrategy)
      (implicit factory: ActorRefFactory, mat: Materializer): Flow[In, Out, Unit] = {

    val (outActor, publisher) = Source.actorRef[Out](bufferSize, overflowStrategy)
        .toMat(Sink.publisher)(Keep.both).run()

    Flow.wrap(
      Sink.actorRef(factory.actorOf(props(outActor)), onCompleteMessage),
      Source(publisher)
    )(Keep.none)
  }

Perhaps the only additional thing here is that, since the actor is created by the method, it should also be shut down automatically when the stream completes.

@rkuhn
Contributor

rkuhn commented Oct 1, 2015

Sorry for the long delay, the github notification somehow floated to the bottom of the pile. I think that we do indeed need to discuss this further, in particular which exact semantics we want to model—that will then inform our decision on which API to offer.

@rkuhn rkuhn reopened this Oct 1, 2015
@rkuhn rkuhn modified the milestones: http-1.1, will not fix Oct 1, 2015
@ktoso ktoso modified the milestones: http-2.0-M1, http-2.0-M2 Nov 6, 2015
@ktoso ktoso modified the milestones: http-2.0-M2, http-2.0-M3 Nov 30, 2015
@ktoso ktoso removed this from the http-2.0-M2 milestone Nov 30, 2015
@rkuhn
Contributor

rkuhn commented Dec 16, 2015

We have StageActorRef now for GraphStageLogic, so using Actors anywhere near a Stream is both unneeded and possible :-) (i.e. when interfacing with existing APIs a GraphStage can now do so natively).

@rkuhn rkuhn closed this as completed Dec 16, 2015
@rkuhn rkuhn modified the milestones: http-2.0-M3, http-2.0 Dec 16, 2015
@jroper
Contributor

jroper commented Dec 17, 2015

One question though - when interacting with StageActorRef, do you have to manage back pressure? Or more to the point, if I don't want backpressure, but rather want a configurable strategy for what to do when buffers are full, is there a simple solution for me to use?

The example use case is the one everyone always uses in WebSockets 101 - a simple chat room. In a chatroom, it's fine to drop the connection if the client doesn't consume messages fast enough, and if it's a problem, you can place a limit on the rate at which any one client can send messages, dropping the connection when they exceed that, so back pressure is not needed. In node.js, implementing a simple chat room is only a handful of lines of code:

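// Assumes the nodejs-websocket package, whose API matches the calls below.
var ws = require("nodejs-websocket");

var connections = {};
var count = 0;
ws.createServer(function(connection) {
    var id = ++count;
    connections[id] = connection;
    connection.on("text", function(msg) {
        // connections is a plain object, so iterate over its keys
        Object.keys(connections).forEach(function(key) {
            connections[key].sendText(msg);
        });
    });
    connection.on("close", function() {
        delete connections[id];
    });
}).listen(8000);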

This makes for a great code example: it's very easy to understand, and when people evaluate which server to use for their WebSockets, they're going to look at things like this to see how simple it is. So what's our answer to that with Play using Akka streams (or even Akka HTTP using Akka streams)?

The simplest I can come up with is this:

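import akka.actor.{Actor, ActorRef, Props, Terminated}

class ChatRoom extends Actor {
  // Actors that deliver messages to the connected users
  var users: Set[ActorRef] = Set.empty
  def receive = {
    case msg: String =>
      users.foreach(user => user ! msg)
    case user: ActorRef =>
      users += user
      context.watch(user)
    case Terminated(user) =>
      users -= user
    case () =>
      // Completion message from Sink.actorRef.
      // Ignore, we're already watching the stream that terminated
  }
}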

val chatRoom = system.actorOf(Props[ChatRoom])

def webSocket = WebSocket { _ =>
  Flow.wrap(
    Sink.actorRef[String](chatRoom, ()),
    Source.actorRef[String](10, OverflowStrategy.fail)
  ) { (_, user) => chatRoom ! user }
}

Aside from being more lines of code, this has the following problems:

  • You need to understand actor watching.
  • You need to understand the semantically very complex Sink.actorRef and Source.actorRef interfaces.
  • You need to understand materialization, and why you need to pass a function to handle it in Flow.wrap.
  • There's a race condition where a user could send a message before they are in the list of users to receive a message, i.e. they won't receive their own messages. This doesn't exist in the node.js solution. Solving it, at my best effort, requires about another 30 or so lines of code, where you have another actor per user and do some state management to buffer messages until you've received the actor to send with. This is probably the biggest problem with the above solution. It's also a problem if you want to do any request/response processing on the WebSocket, since there's no association between the in and the out, and getting this association requires a similar solution.
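
The state management mentioned in the last point (buffer messages until the outgoing side is known) can be modeled in a few lines of plain Scala. This is only an illustration with hypothetical names (PendingOutbound, send, attach); it is single-threaded and leaves out the per-user actor that would own this state in practice.

```scala
import scala.collection.mutable

// Hypothetical per-user state: messages sent before the outgoing side is
// attached are queued and flushed, in order, once it becomes available.
final class PendingOutbound[T] {
  private val pending = mutable.Queue.empty[T]
  private var out: Option[T => Unit] = None

  def send(msg: T): Unit = out match {
    case Some(deliver) => deliver(msg)
    case None =>
      pending += msg
      ()
  }

  def attach(deliver: T => Unit): Unit = {
    out = Some(deliver)
    while (pending.nonEmpty) deliver(pending.dequeue())
  }
}
```

A message sent before attach is delivered first once attach is called, so the user no longer misses their own early messages. Inside an actor this state would be confined to one thread, which is why the sketch has no synchronization.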

So, does the new API make this very common and very simple use case for WebSockets simple to implement with Akka streams? Or do we need to keep this supplemental Akka streams API in Play to provide it?

@rkuhn
Contributor

rkuhn commented Dec 17, 2015

StageActorRef locally has all the semantics of an ActorRef, nothing more and nothing less (making it remote-capable is scheduled for later and not relevant for most use-cases).

The answer to your questions is “yes”: GraphStage gives you—the library developer—all the tools to create a nice single-line abstraction that users can use to plug their Actor into a websocket flow. We can discuss details separately.


5 participants