GenRouter -> GenBroker #9

Closed
josevalim opened this issue Mar 22, 2016 · 12 comments

@josevalim
Member

This proposal introduces two new components into Elixir,
GenStage and Broker.

Stages are computation stages that send and/or receive data
from other stages. When a stage sends data, it acts as
a producer. When it receives data, it acts as a consumer.
Stages may take both producer and consumer roles at once.
From now on, when we mention "producer" and "consumer", we
imply a stage taking its producer or consumer roles.

When data is sent between stages, it is done by a message
protocol that provides back-pressure. It starts with the
consumer stage subscribing to the producer stage and
asking for events. A consumer stage will never receive
more data than it has asked for from its producer stage.

By default, a stage may only connect to a single producer
and/or a single consumer. A broker lifts this limitation
by allowing M producers to connect to N consumers according
to a given strategy.

This document describes the messages received by both the
producer and consumer roles. It also specifies both the stage
and broker behaviours.

Message protocol

This section specifies the message protocol for both producers
and consumers. Most developers won't implement these messages
directly but will rely on the GenStage and Broker behaviours defined in later
sections.

Producer

The producer is responsible for sending events to consumers
based on demand.

A producer MUST manage at least one subscription by receiving a
subscription message from a consumer stage or from a consumer
broker. Once a subscription is established, new connections
MAY be established and demand MAY be received.

Except for the initial subscription message, the producer makes
no distinction between its consumers. All messages it must
handle are defined below:

  • {:"$gen_producer", from :: {consumer_pid, subscription_ref}, {:stage, options}} -
    sent by the consumer to the producer to start a new subscription.

    Once sent, the consumer MAY immediately send demand to the producer.
    The subscription_ref uniquely identifies the subscription. The
    consumer MUST monitor the producer for clean-up purposes in case of
    crashes. The consumer MUST NOT establish new connections over this
    subscription.

    Once received, the producer MUST monitor the consumer. If the producer
    already has a subscription, it MAY ignore future subscriptions by
    sending a disconnect reply (defined in the Consumer section), except
    when the new subscription reuses an existing subscription_ref.
    In such cases, the producer MUST crash.

  • {:"$gen_producer", from :: {consumer_pid, subscription_ref}, {:broker, strategy, options}} -
    sent by the consumer to the producer to start a new subscription.

    The consumer MAY establish new connections by sending :connect
    messages defined below. The subscription_ref uniquely identifies
    the subscription. The consumer MUST monitor the producer for clean-up
    purposes in case of crashes.

    Once received, the producer MUST monitor the consumer. The producer
    MUST initialize the strategy by calling strategy.init(from, options).
    If the producer already has a subscription, it MAY ignore future
    subscriptions by sending a disconnect reply (defined in the Consumer
    section), except when the new subscription reuses an existing
    subscription_ref. In such cases, the producer MUST crash.

  • {:"$gen_producer", from :: {pid, subscription_ref}, {:connect, consumers :: [pid]}} -
    sent by the consumer to producers to start new connections.

    Once sent, the consumer MAY immediately send demand to the producer.
    The subscription_ref uniquely identifies the subscription.

    Once received, the producer MUST call strategy.connect(consumers, from, state)
    if a strategy is available. If the subscription_ref is unknown, the
    producer MUST send an appropriate disconnect reply to each consumer.

  • {:"$gen_producer", from :: {consumer_pid, subscription_ref}, {:disconnect, reason}} -
    sent by the consumer to disconnect a given consumer-subscription pair.

    Once received, the producer MAY call strategy.disconnect(reason, from, state)
    if a strategy is available. The strategy MUST send a disconnect message to the
    consumer pid. If the consumer_pid refers to the process that started
    the subscription, all connections MUST be disconnected. If the
    consumer-subscription pair is unknown, a disconnect MUST still be sent with
    an appropriate reason. In all cases, however, there is no guarantee the message
    will be delivered (for example, the producer may crash just before sending
    the confirmation).

  • {:"$gen_producer", from :: {consumer_pid, subscription_ref}, {:ask, count}} -
    sent by consumers to ask for data from a producer for a given consumer-subscription pair.

    Once received, the producer MUST call strategy.ask(count, from, state)
    if a strategy is available. The producer MUST send data up to the demand. If the
    pair is unknown, the producer MUST send an appropriate disconnect reply.
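As a rough illustration of the producer rules above, the sketch below models only `:stage` subscriptions as a pure function over a map of `subscription_ref => {consumer_pid, pending_demand}`. It is not GenStage code: the module `ProducerProtocolSketch`, its `{destination_pid, message}` output format and the `dispatch/3` helper are hypothetical, monitoring and broker strategies are left out, the optional rule about ignoring further subscriptions is not implemented, and echoing the consumer's reason in the disconnect confirmation is an assumption.

```elixir
defmodule ProducerProtocolSketch do
  # Hypothetical model of the producer side of the "$gen_producer" protocol.
  # Only {:stage, opts} subscriptions are handled; monitoring is omitted.
  # subs maps subscription_ref => {consumer_pid, pending_demand}.
  # Every function returns the outgoing messages as {dest_pid, message}.

  def handle({:"$gen_producer", {pid, ref}, {:stage, _opts}}, subs) do
    if Map.has_key?(subs, ref) do
      # Reusing an existing subscription_ref: the producer MUST crash.
      raise "duplicate subscription_ref #{inspect(ref)}"
    else
      {[], Map.put(subs, ref, {pid, 0})}
    end
  end

  def handle({:"$gen_producer", {pid, ref}, {:ask, count}}, subs) do
    case subs do
      # Known pair: accumulate the demand.
      %{^ref => {^pid, pending}} -> {[], Map.put(subs, ref, {pid, pending + count})}
      # Unknown pair: reply with a disconnect.
      _ -> {[disconnect(pid, ref, :unknown_subscription)], subs}
    end
  end

  def handle({:"$gen_producer", {pid, ref}, {:disconnect, reason}}, subs) do
    case subs do
      # Known pair: confirm (echoing the consumer's reason) and forget it.
      %{^ref => {^pid, _}} -> {[disconnect(pid, ref, reason)], Map.delete(subs, ref)}
      # Unknown pair: a disconnect MUST still be sent.
      _ -> {[disconnect(pid, ref, :unknown_subscription)], subs}
    end
  end

  # Sends at most the pending demand; leftover events go back to the caller.
  def dispatch(subs, ref, events) do
    {pid, pending} = Map.fetch!(subs, ref)
    {now, later} = Enum.split(events, pending)
    # Event messages must carry a non-empty list, so send nothing if empty.
    outgoing = if now == [], do: [], else: [{pid, {:"$gen_consumer", {self(), ref}, now}}]
    {outgoing, later, Map.put(subs, ref, {pid, pending - length(now)})}
  end

  defp disconnect(pid, ref, reason),
    do: {pid, {:"$gen_consumer", {self(), ref}, {:disconnect, reason}}}
end
```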

Consumer

The consumer is responsible for starting the subscription
and sending demand to producers.

A consumer MUST manage at least one subscription by sending a
subscription message to a producer. Once a subscription is
established, new connections MAY be established and demand MAY
be sent. Once demand is sent, messages may be received as
defined below:

  • {:"$gen_consumer", from :: {producer_pid, subscription_ref}, {:connect, producers :: [pid]}} -
    sent by producers to consumers to start new connections.

    Once received, the consumer MAY immediately send demand to
    the producer. The subscription_ref uniquely identifies
    the subscription. If the subscription is not known, a
    disconnect message must be sent back to each producer.

  • {:"$gen_consumer", from :: {producer_pid, subscription_ref}, {:disconnect, reason}} -
    sent by producers to disconnect a given producer-subscription pair.

    It is used as a confirmation for client disconnects OR whenever
    the producer wants to cancel some upstream demand. Reason may be
    :done, :halted or :unknown_subscription.

  • {:"$gen_consumer", from :: {producer_pid, subscription_ref}, [event]} -
    events sent by producers to consumers.

    subscription_ref identifies the subscription. The third element
    of the tuple is a non-empty list of events. If the subscription is
    unknown, the events must be ignored.
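A matching sketch for the consumer side, again a hypothetical pure function (`ConsumerProtocolSketch` is not part of GenStage): events for a known subscription are accepted and reduce the pending demand, events for an unknown subscription are ignored, and a disconnect removes the pair. The `:connect` message and monitoring are left out.

```elixir
defmodule ConsumerProtocolSketch do
  # Hypothetical model of the consumer side of the "$gen_consumer" protocol.
  # subs maps subscription_ref => {producer_pid, pending_demand}.
  # Returns {accepted_events, subs}.

  def handle({:"$gen_consumer", {pid, ref}, events}, subs)
      when is_list(events) and events != [] do
    case subs do
      # Known subscription: accept the events and lower the pending demand.
      %{^ref => {^pid, pending}} -> {events, Map.put(subs, ref, {pid, pending - length(events)})}
      # Unknown subscription: the events must be ignored.
      _ -> {[], subs}
    end
  end

  # Disconnect (a confirmation or a producer-side cancel) ends the pair.
  def handle({:"$gen_consumer", {_pid, ref}, {:disconnect, _reason}}, subs) do
    {[], Map.delete(subs, ref)}
  end
end
```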

GenStage

GenStage is a generic stage that may act as a producer,
consumer or both. It is built on top of a GenServer with
the following changes:

  • init(args) may return {:ok, state, opts} where opts
    MAY contain keys such as:
    • :subscribe_to - the producer to subscribe to (enables consumer)
    • :max_demand - the maximum demand it may ask from producer
    • :min_demand - the minimum demand which, once reached, triggers a request for more demand upstream
  • handle_event(event, from, state) invoked on consumers.
    Must return the same as GenServer.handle_info/2.
  • handle_call/3, handle_cast/2 and handle_info/2 will
    be changed to allow emitting events (for producers).
  • handle_demand(demand, from, state) invoked on producers.
    Must return the same as GenStage.handle_info/2.
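To make the `:max_demand` / `:min_demand` pair concrete, here is a small model of one plausible interpretation, which is an assumption since the proposal only names the options: the consumer first asks for `max_demand`, and once its outstanding demand falls to `min_demand` it asks for enough to return to `max_demand`. `DemandWindowSketch` is a hypothetical name, not GenStage API.

```elixir
defmodule DemandWindowSketch do
  # Hypothetical model of :max_demand / :min_demand on the consumer side.
  # pending is the demand asked upstream but not yet fulfilled.
  defstruct max: 50, min: 25, pending: 0

  # Initial ask: request the full max_demand. Returns {ask_count, window}.
  def start(%__MODULE__{max: max} = window), do: {max, %{window | pending: max}}

  # Called after n events arrive. Returns {ask_count, window}, where
  # ask_count is 0 when no new request is needed yet.
  def received(%__MODULE__{} = window, n) do
    pending = window.pending - n

    if pending <= window.min do
      # min_demand reached: top the outstanding demand back up to max_demand.
      {window.max - pending, %{window | pending: window.max}}
    else
      {0, %{window | pending: pending}}
    end
  end
end
```

For example, with `max: 50, min: 25`, receiving 20 events asks for nothing, and receiving 5 more (outstanding demand now 25) asks for another 25.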

TODO: Should we copy all of the GenServer API (call, cast, multicall) into GenStage? Part of it?
Or should we ask them to use GenServer?

Consumer example

A simple consumer that inspects events:

defmodule InspectConsumer do
  use GenStage

  def init(_) do
    # TODO: How to specify options for the subscription itself?
    # I.e. the options in {:"$gen_producer", from, {:stage, options}}?
    {:ok, %{}, subscribe_to: ..., max_demand: 50, min_demand: 25}
  end

  def handle_event(event, _from, state) do
    IO.inspect event
    {:noreply, state}
  end
end

Producer example

A simple producer that returns data according to a counter:

defmodule CounterProducer do
  use GenStage

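  def init(_) do
    # The state is the next integer to emit
    {:ok, 0}
  end

  def handle_demand(demand, _from, counter) do
    # demand is always positive, so the range below is ascending;
    # emit that many consecutive integers and advance the counter
    {:dispatch, Enum.to_list(counter..(counter + demand - 1)), counter + demand}
  end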
end

Broker

The broker is responsible for connecting M producers to
N consumers. The connections between producers and consumers
are established directly and not intermediated by the broker.
This means consumers will send demand to M producers and
producers will send events to N consumers. The producer handles
that demand according to a broker strategy.

Subscribing a consumer to a broker is the same as subscribing it
to any other producer. A broker may also subscribe itself to a
producer; from the producer's perspective, the only difference is
that the subscription message is tagged as :broker with a strategy
instead of :stage (as specified in the "$gen_producer" messages
defined in earlier sections).

A broker will never send demand to its producers. That's because
the producer is never expected to send events directly to the
broker. Demand is always received directly from consumers and
events are sent directly to consumers to avoid overhead.

A broker, however, will receive demand from consumers. This
demand is used to dynamically dispatch events through the broker.

Finally, a broker is responsible for monitoring all producers
and consumers and for relaying the proper connect and disconnect
messages to producers and consumers.

Connection management

When a new producer is added to the broker, the broker will send
N connect messages to the producer, each referencing all the
existing N consumers. Afterwards, it will send a connect message to all
existing N consumers referencing the producer. The Broker will
remain subscribed to the producer but never send demand upstream.

When a new consumer is added to the broker, a demand will be
established between broker and consumer, where the broker is
effectively a producer. This will be used for dynamic broker
dispatch. After the broker-consumer relationship is established,
the broker will send M connect messages to all existing M
producers referencing the consumer as well as M messages to
the new consumer referencing all M producers.
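The connection bookkeeping above can be modelled roughly as follows. This is a hypothetical sketch: `BrokerConnectionsSketch` is an illustrative name, producers and consumers are plain atoms, each connect is a bare `{destination, {:connect, [peer]}}` tuple without the protocol envelopes or subscription refs, one message is emitted per producer-consumer pair (the text does not settle whether peers are batched into one message), and the broker-consumer demand relationship is not modelled.

```elixir
defmodule BrokerConnectionsSketch do
  # Hypothetical model of broker connection management.
  # Each connect is {destination, {:connect, [peer]}}; envelopes and refs omitted.
  defstruct producers: [], consumers: []

  # New producer: tell it about every existing consumer, then tell every
  # existing consumer about it. The broker never sends demand upstream.
  def add_producer(%__MODULE__{} = broker, producer) do
    to_producer = for c <- broker.consumers, do: {producer, {:connect, [c]}}
    to_consumers = for c <- broker.consumers, do: {c, {:connect, [producer]}}
    {to_producer ++ to_consumers, %{broker | producers: broker.producers ++ [producer]}}
  end

  # New consumer: tell every existing producer about it, and tell it about
  # every existing producer.
  def add_consumer(%__MODULE__{} = broker, consumer) do
    to_producers = for p <- broker.producers, do: {p, {:connect, [consumer]}}
    to_consumer = for p <- broker.producers, do: {consumer, {:connect, [p]}}
    {to_producers ++ to_consumer, %{broker | consumers: broker.consumers ++ [consumer]}}
  end
end
```

Once M producers and N consumers have been added in any order, every producer has been told about every consumer and vice versa, which yields the M × N direct connections described in the Broker section.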

Broker strategy

TODO: specify all callbacks in the broker strategy

Dynamic broker dispatch

TODO: specify how dynamic dispatch through the broker works

@josevalim
Member Author

@fishcakez I am stuck on the following problem. Imagine the following pipeline:

[A] -> [B] -> [C]

C is going to send demand to B, which is going to send demand to A. For A, events will likely be emitted by handle_demand/2. B needs to send demand upstream, but typically we want to do so only after we receive demand from C. C is the one generating the demand.

I thought about the following configurations:

  • A will implement handle_demand
  • B will subscribe to A and have min_demand == 0. min_demand works as a buffer. Because we have demand from downstream, we don't want to buffer; we want to keep everything flowing
  • C will subscribe to B and have min_demand != 0. We need to set its min_demand to something other than 0 because it will never receive demand from downstream, so it needs to "buffer" for itself

The question is, given those dynamics, what is the best configuration API?

Because ideally we would prefer the connection of stages to be decoupled from their definition, I was thinking about having both call_subscribe/2 and cast_subscribe/2. This way, if I want to subscribe on init, I can do:

GenStage.cast_subscribe(self(), to: Producer)

We could also use this function to call subscribe from "outside". So I could define both producers and consumers and then call GenStage.cast_subscribe(Consumer, to: Producer). The question is: if I call cast_subscribe and handle_demand is defined, should we raise? I think we should. That means we should likely make handle_demand and handle_event optional callbacks.

What do you think?

@fishcakez
Member

I think in init/1 we should have a return similar to :gen_statem that declares whether the stage is going to use handle_demand or a cast_subscribe. For example return:

def init(state), do: {:handle_demand_function, state}

Then handle_demand is still optional but init/1 declares what is used. We could run into confusing situations with code reloading if we use function_exported?.

@josevalim
Member Author

So what if we do this: from init you must return the roles you want to perform and a set of options that will be used if someone eventually subscribes. Examples:

* `{:ok, state, [:producer]}` - it is going to act as a producer (subscriptions will fail)
* `{:ok, state, [:producer, :consumer]}` - it is going to act as both producer and consumer
* `{:ok, state, [:consumer, min_demand: 15]}` - it is going to act as consumer with initial demand of 15

When you call cast_subscribe, you can call it as: GenStage.cast_subscribe(consumer, to: producer, min_demand: 30). We would still invoke both handle_demand/2 and handle_event/2 callbacks which will be optional.
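One way to read this proposal, sketched with a hypothetical `InitRolesSketch` module that is not part of GenStage: the role atoms are split from the keyword options returned by init/1, and a later cast_subscribe-style call is only accepted for stages that declared `:consumer`, with its options overriding the ones from init/1 (the override order is an assumption).

```elixir
defmodule InitRolesSketch do
  # Hypothetical interpretation of the proposed init/1 return values, e.g.
  # {:ok, state, [:producer]} or {:ok, state, [:consumer, min_demand: 15]}.
  @roles [:producer, :consumer]

  # Splits role atoms from keyword options: {:ok, state, roles, opts}.
  def parse({:ok, state, list}) when is_list(list) do
    {roles, opts} = Enum.split_with(list, &(&1 in @roles))
    {:ok, state, roles, opts}
  end

  # Options for a cast_subscribe-style call. Only stages that declared the
  # :consumer role may subscribe; call-site options override init/1 ones.
  def subscription_opts(roles, init_opts, sub_opts) do
    if :consumer not in roles do
      raise ArgumentError, "stage did not declare the :consumer role"
    end

    Keyword.merge(init_opts, sub_opts)
  end
end
```

With this reading, `{:ok, state, [:consumer, min_demand: 15]}` followed by a subscription with `to: producer, min_demand: 30` ends up using `min_demand: 30`.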

@fishcakez
Member

@josevalim sounds good!

@josevalim
Member Author

For those following at home, I have renamed the project to GenBroker and pushed initial documentation and a behaviour description for GenStage: https://github.com/elixir-lang/gen_broker/blob/master/lib/gen_stage.ex - it contains many updates to this proposal, including the new init/1 return values and a description of both dynamic and overflown event buffers.

Next step is to specify the GenStage message protocol in the docs (which is slightly different from the one in this PR) and describe/document GenBroker itself.

@mrkaspa

mrkaspa commented May 21, 2016

@josevalim what should happen when the producer can't satisfy all of the consumer's demand? For example, the producer reads data from a queue every second, the consumer asks for 100 but the producer can only send 10. Let's say that after 5 seconds there is more data in the queue: should that trigger the handle_demand function?

@josevalim
Member Author

The handle_demand function is invoked as soon as the producer gets the demand. This way the producer can store how much data the consumer needs, and at first we don't care how long it takes to fulfil the demand. Whenever it has data, it sends it downstream, in such a way that the demand is never exceeded.
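The behaviour described above can be sketched with a plain GenServer standing in for a producer. This is not GenStage: `QueueProducerSketch`, its `ask/2` and `push/2` functions and the `{:events, list}` message are hypothetical. Demand is recorded as soon as it arrives, and queued items are sent whenever data becomes available, never exceeding the outstanding demand.

```elixir
defmodule QueueProducerSketch do
  # Hypothetical producer on plain GenServer (not GenStage). Demand is stored
  # when it arrives; queued items are sent whenever both data and demand exist.
  use GenServer

  def start_link(consumer), do: GenServer.start_link(__MODULE__, consumer)
  def ask(pid, n), do: GenServer.cast(pid, {:ask, n})
  def push(pid, items), do: GenServer.cast(pid, {:push, items})

  @impl true
  def init(consumer), do: {:ok, %{consumer: consumer, demand: 0, queue: :queue.new()}}

  @impl true
  def handle_cast({:ask, n}, state), do: {:noreply, flush(%{state | demand: state.demand + n})}

  def handle_cast({:push, items}, state) do
    queue = Enum.reduce(items, state.queue, &:queue.in/2)
    {:noreply, flush(%{state | queue: queue})}
  end

  # Send as many queued items as the outstanding demand allows.
  defp flush(%{demand: demand, queue: queue} = state) do
    n = min(demand, :queue.len(queue))
    {out, rest} = :queue.split(n, queue)
    events = :queue.to_list(out)
    if events != [], do: send(state.consumer, {:events, events})
    %{state | demand: demand - n, queue: rest}
  end
end
```

In the scenario from the question, the consumer asks for 100, the first batch of 10 items goes out immediately, and the remaining demand of 90 stays recorded until the next batch is pushed; no further demand callback is needed for that.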

@josevalim
Member Author

GenStage is in master. What is left to be implemented is marked as TODO in the code. GenBroker didn't make the cut. Instead we have something called a dispatcher that runs as part of producer stages. If we are confident this is the way to go, we will eventually rename the project to gen_stage.

@dweinstein

What happened to handle_demand/3, which included the from? Somehow we only ended up with handle_demand/2, which does not include from.

@josevalim
Member Author

@dweinstein GenStage has the idea of a dispatcher, which is responsible for tracking and abstracting consumers. Therefore you can't target specific consumers, as this would violate the dispatcher contract. You can, however, for example, send the information to another producer, which will distribute it to a subset of your consumers or similar.

@dweinstein

dweinstein commented Aug 1, 2018

@josevalim thanks for your reply, got it. The from seemed like something useful to track, at least for metrics in my current use case: I'd like to understand which consumers are asking for demand and how much. Locally I hacked lib/gen_stage.ex to pass from by pulling it out of args in dispatcher_callback (e.g., around defp dispatcher_callback(callback, args, %{dispatcher_mod: dispatcher_mod} = stage) do):

-   defp dispatcher_callback(callback, args, %{dispatcher_mod: dispatcher_mod} = stage) do
+  defp dispatcher_callback(callback, [_, {consumer_pid, _}, _], %{dispatcher_mod: dispatcher_mod} = stage) do
...
-                noreply_callback(:handle_demand, [counter, state], stage)
+                noreply_callback(:handle_demand, [counter, consumer_pid, state], stage)
...

@josevalim
Member Author

@dweinstein if the reasoning is metrics, then a dispatcher is definitely the way to go. It was provided exactly so developers can change how demand is tracked/served without changing their code.
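For the metrics use case, the per-consumer bookkeeping itself is small. The sketch below is a hypothetical helper, not a GenStage API; a custom dispatcher could call something like it from its ask callback, which receives the `{consumer_pid, subscription_ref}` pair (check the `GenStage.Dispatcher` documentation for the exact callback signatures).

```elixir
defmodule DemandMetricsSketch do
  # Hypothetical helper: accumulate the total demand requested per consumer.
  # `from` is the {consumer_pid, subscription_ref} pair used by the protocol.
  def record(metrics, {consumer_pid, _ref}, count) when is_integer(count) and count > 0 do
    Map.update(metrics, consumer_pid, count, &(&1 + count))
  end
end
```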

4 participants