Skip to content

Commit

Permalink
Initial version
Browse files Browse the repository at this point in the history
  • Loading branch information
msaraiva committed Nov 9, 2018
0 parents commit 8762cce
Show file tree
Hide file tree
Showing 15 changed files with 756 additions and 0 deletions.
4 changes: 4 additions & 0 deletions .formatter.exs
@@ -0,0 +1,4 @@
# Used by "mix format"
[
inputs: ["{mix,.formatter}.exs", "{config,lib,test}/**/*.{ex,exs}"]
]
25 changes: 25 additions & 0 deletions .gitignore
@@ -0,0 +1,25 @@
# The directory Mix will write compiled artifacts to.
/_build/

# If you run "mix test --cover", coverage assets end up here.
/cover/

# The directory Mix downloads your dependencies sources to.
/deps/

# Where 3rd-party dependencies like ExDoc output generated docs.
/doc/

# Ignore .fetch files in case you like to edit your project deps locally.
/.fetch

# If the VM crashes, it generates a dump, let's ignore it too.
erl_crash.dump

# Also ignore archive artifacts (built via "mix archive.build").
*.ez

# Ignore package tarball (built via "mix hex.build").
broadway-*.tar

.elixir_ls
21 changes: 21 additions & 0 deletions README.md
@@ -0,0 +1,21 @@
# Broadway

**TODO: Add description**

## Installation

If [available in Hex](https://hex.pm/docs/publish), the package can be installed
by adding `broadway` to your list of dependencies in `mix.exs`:

```elixir
def deps do
[
{:broadway, "~> 0.1.0"}
]
end
```

Documentation can be generated with [ExDoc](https://github.com/elixir-lang/ex_doc)
and published on [HexDocs](https://hexdocs.pm). Once published, the docs can
be found at [https://hexdocs.pm/broadway](https://hexdocs.pm/broadway).

1 change: 1 addition & 0 deletions config/config.exs
@@ -0,0 +1 @@
use Mix.Config
214 changes: 214 additions & 0 deletions lib/broadway.ex
@@ -0,0 +1,214 @@
defmodule Broadway do
use GenServer

@callback handle_message(event :: any, context :: any) :: {:partition, key :: any, event :: any}
@callback handle_batch(partition :: any, batch :: any, context :: any) :: {:ok, batch :: any}

alias Broadway.{Processor, Batcher, Consumer}

defmodule State do
defstruct name: nil,
module: nil,
processors_config: nil,
producers_config: [],
publishers_config: [],
context: nil,
supervisor: nil,
producers: [],
processors: [],
batchers: [],
consumers: []
end

def start_link(module, context, opts) do
opts = Keyword.put(opts, :name, opts[:name] || broadway_name())
GenServer.start_link(__MODULE__, {module, context, opts}, opts)
end

def init({module, context, opts}) do
state =
opts
|> init_state(module, context)
|> init_supervisor()
|> init_producers()
|> init_processors()
|> init_batchers_and_consumers()

{:ok, state}
end

defp init_state(opts, module, context) do
broadway_name = Keyword.fetch!(opts, :name)
producers_config = Keyword.fetch!(opts, :producers)
publishers_config = Keyword.get(opts, :publishers) |> normalize_publishers_config()
processors_config = Keyword.get(opts, :processors) |> normalize_processors_config()

%State{
name: broadway_name,
module: module,
processors_config: processors_config,
producers_config: producers_config,
publishers_config: publishers_config,
context: context
}
end

def init_supervisor(%State{name: name} = state) do
supervisor_name = Module.concat(name, "Supervisor")
{:ok, supervisor} = Supervisor.start_link([], name: supervisor_name, strategy: :rest_for_one)

%State{state | supervisor: supervisor}
end

defp init_producers(state) do
%State{
name: broadway_name,
producers_config: producers_config,
supervisor: supervisor
} = state

producers =
for {[module: mod, arg: args], index} <- Enum.with_index(producers_config, 1) do
mod_name = mod |> Module.split() |> Enum.join(".")
name = process_name(broadway_name, mod_name, index, 1)
opts = [name: name]

spec =
Supervisor.child_spec(
%{start: {mod, :start_link, [args, opts]}},
id: make_ref()
)

{:ok, _} = Supervisor.start_child(supervisor, spec)
name
end

%State{state | producers: producers}
end

defp init_processors(state) do
%State{
name: broadway_name,
module: module,
processors_config: processors_config,
context: context,
supervisor: supervisor,
publishers_config: publishers_config,
producers: producers,
supervisor: supervisor
} = state

n_processors = Keyword.fetch!(processors_config, :stages)

processors =
for index <- 1..n_processors do
args = [
publishers_config: publishers_config,
module: module,
context: context,
producers: producers
]

name = process_name(broadway_name, "Processor", index, n_processors)
opts = [name: name]
spec = Supervisor.child_spec({Processor, [args, opts]}, id: make_ref())
{:ok, _} = Supervisor.start_child(supervisor, spec)
name
end

%State{state | processors: processors}
end

defp init_batchers_and_consumers(state) do
%State{
name: broadway_name,
module: module,
context: context,
supervisor: supervisor,
publishers_config: publishers_config,
supervisor: supervisor,
processors: processors
} = state

stages =
Enum.reduce(publishers_config, %{batchers: [], consumers: []}, fn publisher, acc ->
{batcher, batcher_spec} = init_batcher(broadway_name, publisher, processors)
{:ok, _} = Supervisor.start_child(supervisor, batcher_spec)

{consumer, consumer_spec} =
init_consumer(broadway_name, module, context, publisher, batcher)

{:ok, _} = Supervisor.start_child(supervisor, consumer_spec)

%{acc | batchers: [batcher | acc.batchers], consumers: [consumer | acc.consumers]}
end)

%State{
state
| batchers: Enum.reverse(stages.batchers),
consumers: Enum.reverse(stages.consumers)
}
end

defp init_batcher(broadway_name, publisher, processors) do
{key, options} = publisher
batcher = process_name(broadway_name, "Batcher", key)
opts = [name: batcher]

spec =
Supervisor.child_spec(
{Batcher, [options ++ [partition: key, processors: processors], opts]},
id: make_ref()
)

{batcher, spec}
end

defp init_consumer(broadway_name, module, context, publisher, batcher) do
{key, _options} = publisher
consumer = process_name(broadway_name, "Consumer", key)
opts = [name: consumer]

spec =
Supervisor.child_spec(
{Consumer, [[module: module, context: context, batcher: batcher], opts]},
id: make_ref()
)

{consumer, spec}
end

defp normalize_processors_config(nil) do
[stages: :erlang.system_info(:schedulers_online) * 2]
end

defp normalize_processors_config(config) do
config
end

defp normalize_publishers_config(nil) do
[{:default, []}]
end

defp normalize_publishers_config(publishers) do
Enum.map(publishers, fn
publisher when is_atom(publisher) -> {publisher, []}
publisher -> publisher
end)
end

defp broadway_name() do
:"Broadway#{System.unique_integer([:positive, :monotonic])}"
end

defp process_name(prefix, type, key) do
:"#{prefix}.#{type}_#{key}"
end

defp process_name(prefix, type, index, max) do
index
|> to_string()
|> String.pad_leading(String.length("#{max}"), "0")
|> (&:"#{prefix}.#{type}_#{&1}").()
end
end
5 changes: 5 additions & 0 deletions lib/broadway/acknowledger.ex
@@ -0,0 +1,5 @@
defmodule Broadway.Acknowledger do
alias Broadway.Message

@callback ack(successful :: [Message.t()], failed :: [Message.t()], context :: any) :: no_return
end
7 changes: 7 additions & 0 deletions lib/broadway/batch.ex
@@ -0,0 +1,7 @@
defmodule Broadway.Batch do
defstruct [
:events,
:partition,
:batcher
]
end
77 changes: 77 additions & 0 deletions lib/broadway/batcher.ex
@@ -0,0 +1,77 @@
defmodule Broadway.Batcher do
use GenStage

defmodule State do
defstruct [
:batch_size,
:batch_timeout,
:partition,
:pending_events
]
end

def start_link(args, opts) do
GenStage.start_link(__MODULE__, args, opts)
end

def child_spec(args) do
%{start: {__MODULE__, :start_link, args}}
end

def init(args) do
batch_timeout = Keyword.get(args, :batch_timeout, 1000)
batch_size = Keyword.get(args, :batch_size, 100)
partition = Keyword.fetch!(args, :partition)

subscribe_to =
args
|> Keyword.fetch!(:processors)
|> Enum.map(&{&1, partition: partition, max_demand: 4, min_demand: 2})

schedule_flush_pending(batch_timeout)

{
:producer_consumer,
%State{
partition: partition,
batch_size: batch_size,
batch_timeout: batch_timeout,
pending_events: []
},
subscribe_to: subscribe_to
}
end

def handle_events(events, _from, state) do
%State{pending_events: pending_events, batch_size: batch_size} = state
do_handle_events(pending_events ++ events, state, batch_size)
end

def handle_info(:flush_pending, state) do
%State{pending_events: pending_events, batch_timeout: batch_timeout} = state
schedule_flush_pending(batch_timeout)
do_handle_events(pending_events, state, 1)
end

defp do_handle_events(events, state, min_size) do
%State{batch_size: batch_size, partition: partition} = state
{batch_events, new_pending_events} = split_events(events, partition, batch_size, min_size)

{:noreply, batch_events, %State{state | pending_events: new_pending_events}}
end

defp split_events(events, partition, batch_size, min_size) do
{batch_events, pending_events} = Enum.split(events, batch_size)

if length(batch_events) >= min_size do
{[%Broadway.Batch{events: batch_events, partition: partition, batcher: self()}],
pending_events}
else
{[], events}
end
end

defp schedule_flush_pending(delay) do
Process.send_after(self(), :flush_pending, delay)
end
end

0 comments on commit 8762cce

Please sign in to comment.