Skip to content
This repository has been archived by the owner on Apr 4, 2024. It is now read-only.

Commit

Permalink
Run a separate Batcher for each project
Browse files Browse the repository at this point in the history
  • Loading branch information
notriddle committed Jan 20, 2017
1 parent 31bf3ee commit db2a3c1
Show file tree
Hide file tree
Showing 7 changed files with 158 additions and 100 deletions.
5 changes: 4 additions & 1 deletion lib/aelita2.ex
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,10 @@ defmodule Aelita2 do

run_batcher = Application.get_env(:aelita2, Aelita2.Batcher)[:run]
children = if run_batcher do
children ++ [worker(Aelita2.Batcher, [])]
children ++ [
supervisor(Aelita2.Batcher.Supervisor, []),
worker(Aelita2.Batcher.Registry, []),
]
else
children
end
Expand Down
78 changes: 47 additions & 31 deletions lib/aelita2/batcher.ex
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
defmodule Aelita2.Batcher do
@moduledoc """
The "Batcher" manages the backlog of batches that each project has.
A "Batcher" manages the backlog of batches a project has.
It implements this set of rules:
* When a patch is reviewed ("r+'ed"),
Expand Down Expand Up @@ -30,57 +30,58 @@ defmodule Aelita2.Batcher do
alias Aelita2.Status
alias Aelita2.LinkPatchBatch

@poll_period 1000
@poll_period 2000
@github_api Application.get_env(:aelita2, Aelita2.GitHub)[:api]

# Public API

def start_link do
GenServer.start_link(__MODULE__, :ok, name: Aelita2.Batcher)
def start_link(project_id) do
GenServer.start_link(__MODULE__, project_id)
end

def reviewed(patch_id) when is_integer(patch_id) do
GenServer.cast(Aelita2.Batcher, {:reviewed, patch_id})
def reviewed(pid, patch_id) when is_integer(patch_id) do
GenServer.cast(pid, {:reviewed, patch_id})
end

def status(commit, identifier, state, url) do
GenServer.cast(Aelita2.Batcher, {:status, commit, identifier, state, url})
def status(pid, stat) do
GenServer.cast(pid, {:status, stat})
end

def cancel(patch_id) when is_integer(patch_id) do
GenServer.cast(Aelita2.Batcher, {:cancel, patch_id})
def cancel(pid, patch_id) when is_integer(patch_id) do
GenServer.cast(pid, {:cancel, patch_id})
end

def cancel_all(project_id) when is_integer(project_id) do
GenServer.cast(Aelita2.Batcher, {:cancel_all, project_id})
def cancel_all(pid) do
GenServer.cast(pid, {:cancel_all})
end

# Server callbacks

def init(:ok) do
def init(project_id) do
Process.send_after(self(), :poll, @poll_period)
{:ok, :ok}
{:ok, project_id}
end

def handle_cast(args, state) do
Repo.transaction(fn -> do_handle_cast(args) end)
{:noreply, state}
def handle_cast(args, project_id) do
Repo.transaction(fn -> do_handle_cast(args, project_id) end)
{:noreply, project_id}
end

def do_handle_cast({:reviewed, patch_id}) do
def do_handle_cast({:reviewed, patch_id}, project_id) do
patch = Repo.get!(Patch.all(:awaiting_review), patch_id)
batch = get_new_batch(patch.project_id)
project_id = batch.project_id
^project_id = patch.project_id
batch = get_new_batch(project_id)
^project_id = batch.project_id
params = %{batch_id: batch.id, patch_id: patch.id}
Repo.insert!(LinkPatchBatch.changeset(%LinkPatchBatch{}, params))
end

def do_handle_cast({:status, commit, identifier, state, url}) do
def do_handle_cast({:status, {commit, identifier, state, url}}, project_id) do
batch = Repo.all(Batch.get_assoc_by_commit(commit))
state = Status.numberize_state(state)
case batch do
[batch] ->
^project_id = batch.project_id
batch.id
|> Status.get_for_batch(identifier)
|> Repo.update_all([set: [state: state, url: url]])
Expand All @@ -91,10 +92,11 @@ defmodule Aelita2.Batcher do
end
end

def do_handle_cast({:cancel, patch_id}) do
def do_handle_cast({:cancel, patch_id}, project_id) do
batch = patch_id
|> Batch.all_for_patch(:incomplete)
|> Repo.one!()
^project_id = batch.project_id
if batch.state == Batch.numberize_state(:running) do
cancel_batch(batch, patch_id)
else
Expand All @@ -107,7 +109,7 @@ defmodule Aelita2.Batcher do
end
end

def do_handle_cast({:cancel_all, project_id}) do
def do_handle_cast({:cancel_all}, project_id) do
canceled = Batch.numberize_state(:canceled)
project_id
|> Batch.all_for_project(:waiting)
Expand All @@ -120,20 +122,34 @@ defmodule Aelita2.Batcher do
|> Enum.each(&Repo.update!/1)
end

def handle_info(:poll, :ok) do
Repo.transaction(&poll_all/0)
def handle_info(:poll, project_id) do
Repo.transaction(fn -> poll(project_id) end)
Process.send_after(self(), :poll, @poll_period)
{:noreply, :ok}
{:noreply, project_id}
end

# Private implementation details

defp poll_all do
:incomplete
|> Batch.all_assoc()
defp poll(project_id) do
project = Repo.get(Project, project_id)
project_id
|> Batch.all_for_project(:incomplete)
|> Repo.all()
|> Aelita2.Batcher.Queue.organize_batches_into_project_queues()
|> Enum.each(&poll_batches/1)
|> Enum.map(&%Batch{&1 | project: project})
|> sort_batches()
|> poll_batches()
end

def sort_batches(batches) do
sorted_batches = Enum.sort_by(batches, &{-&1.state, &1.last_polled})
new_batches = Enum.dedup_by(sorted_batches, &(&1.id))
state = if new_batches != [] and hd(new_batches).state == 1 do
:running
else
Enum.each(new_batches, fn batch -> 0 = batch.state end)
:waiting
end
{state, new_batches}
end

defp poll_batches({:waiting, batches}) do
Expand Down
72 changes: 72 additions & 0 deletions lib/aelita2/batcher/registry.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,72 @@
defmodule Aelita2.Batcher.Registry do
@moduledoc """
The "Batcher" manages the backlog of batches that each project has.
This is the registry of each individual batcher.
It starts the batcher if it doesn't exist,
restarts it if it crashes,
and logs the crashes because that's needed sometimes.
Note that the batcher and registry are always on the same node.
Sharding between them will be done by directing which registry to go to.
"""

use GenServer

alias Aelita2.Batcher
alias Aelita2.Project
alias Aelita2.Repo

@name Aelita2.Batcher.Registry

# Public API

def start_link do
GenServer.start_link(__MODULE__, :ok, name: @name)
end

def get(project_id) when is_integer(project_id) do
GenServer.call(@name, {:get, project_id})
end

# Server callbacks

def init(:ok) do
names = Project
|> Repo.all()
|> Enum.map(&{&1.id, do_start(&1.id)})
|> Map.new()
refs = names
|> Enum.map(&{Process.monitor(elem(&1, 1)), elem(&1, 0)})
|> Map.new()
{:ok, {names, refs}}
end

def do_start(project_id) do
{:ok, pid} = Batcher.Supervisor.start(project_id)
pid
end

def handle_call({:get, project_id}, _from, {names, refs}) do
{pid, names, refs} = case names[project_id] do
nil ->
pid = do_start(project_id)
names = Map.put(names, project_id, pid)
ref = Process.monitor(pid)
refs = Map.put(names, ref, project_id)
{pid, names, refs}
pid ->
{pid, names, refs}
end
{:reply, pid, {names, refs}}
end

def handle_info({:DOWN, ref, :process, _pid, _reason}, {names, refs}) do
{project_id, refs} = Map.pop(refs, ref)
names = Map.delete(names, project_id)
{:noreply, {names, refs}}
end

def handle_info(_msg, state) do
{:noreply, state}
end
end
24 changes: 24 additions & 0 deletions lib/aelita2/batcher/supervisor.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,24 @@
defmodule Aelita2.Batcher.Supervisor do
@moduledoc """
The supervisor of all of the batchers.
"""
use Supervisor

@name Aelita2.Batcher.Supervisor

def start_link do
Supervisor.start_link(__MODULE__, :ok, name: @name)
end

def start(project_id) do
Supervisor.start_child(@name, [project_id])
end

def init(:ok) do
children = [
worker(Aelita2.Batcher, [], restart: :temporary)
]

supervise(children, strategy: :simple_one_for_one)
end
end
63 changes: 0 additions & 63 deletions test/batcher/queue_test.exs

This file was deleted.

4 changes: 3 additions & 1 deletion web/controllers/project_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,9 @@ defmodule Aelita2.ProjectController do
end

def cancel_all(conn, project, _params) do
Batcher.cancel_all(project.id)
project.id
|> Batcher.Registry.get()
|> Batcher.cancel_all()
conn
|> put_flash(:ok, "Canceled all running batches")
|> redirect(to: project_path(conn, :show, project))
Expand Down
12 changes: 8 additions & 4 deletions web/controllers/webhook_controller.ex
Original file line number Diff line number Diff line change
Expand Up @@ -137,11 +137,14 @@ defmodule Aelita2.WebhookController do
identifier = conn.body_params["context"]
commit = conn.body_params["sha"]
url = conn.body_params["target_url"]
repo_xref = conn.body_params["repository"]["id"]
state = @github_api.map_state_to_status(conn.body_params["state"])
Aelita2.Batcher.status(commit, identifier, state, url)
project = Repo.get_by(Project, repo_xref: repo_xref)
batcher = Batcher.Registry.get(project.id)
Batcher.status(batcher, {commit, identifier, state, url})

commit_msg = conn.body_params["commit"]["commit"]["message"]
err_msg = Aelita2.Batcher.Message.generate_staging_tmp_message(identifier)
err_msg = Batcher.Message.generate_staging_tmp_message(identifier)
case commit_msg do
"-bors-staging-tmp-" <> pr_xref when not is_nil err_msg ->
conn.body_params["repository"]["id"]
Expand Down Expand Up @@ -215,6 +218,7 @@ defmodule Aelita2.WebhookController do
link = Repo.get_by(LinkUserProject,
project_id: project.id,
user_id: commenter.id)
batcher = Batcher.Registry.get(project.id)
case {activated, deactivated, link} do
{_, _, nil} ->
project.repo_xref
Expand All @@ -225,9 +229,9 @@ defmodule Aelita2.WebhookController do
p.pr_xref,
":lock: Permission denied")
{_activated, :nomatch, _} ->
Batcher.reviewed(p.id)
Batcher.reviewed(batcher, p.id)
{:nomatch, _deactivated, _} ->
Batcher.cancel(p.id)
Batcher.cancel(batcher, p.id)
end
end
end
Expand Down

0 comments on commit db2a3c1

Please sign in to comment.