Permalink
Browse files

Add exq

  • Loading branch information...
rrrene committed Jun 21, 2016
1 parent 61afc26 commit 0bd4fd8a88c21c53331958f506297f2e82d92651
@@ -64,4 +64,10 @@ config :rollbax,
environment: "",
enabled: :log
config :exq,
host: "127.0.0.1",
port: 6379,
namespace: "exq",
queues: [{"priority", 10}, {"default", 10}]
import_config "dev.secret.exs"
@@ -11,7 +11,6 @@ defmodule HexFaktor do
supervisor(HexFaktor.Endpoint, []),
# Start the Ecto repository
supervisor(HexFaktor.Repo, []),
supervisor(Refaktor.Worker.Supervisor, []),
# Here you could define other workers and supervisors as children
# worker(HexFaktor.Worker, [arg1, arg2, arg3]),
]
@@ -34,7 +34,7 @@ defmodule Refaktor.Builder do
{:ok, build, pid}
end
def run_clone(build, git_repo, git_branch, jobs_to_schedule, meta, parent_pid) do
def run_clone(build, git_repo, git_branch, jobs_to_schedule, meta) do
progress_callback = ProgressCallback.cast(meta["progress_callback_data"])
progress_callback.("cloning")
@@ -56,34 +56,32 @@ defmodule Refaktor.Builder do
# now schedule the jobs for execution
pids =
jobs_to_schedule
|> Enum.map(&run_job(&1, meta, parent_pid))
|> Enum.map(&run_job(&1, meta))
{:ok, build, pids}
{:error, _job_id, _job_dir, _output, _exit_code} = error_tuple ->
# we couldn't even clone this repo
progress_callback.("error")
send(parent_pid, error_tuple)
jobs_to_schedule
|> Enum.each(fn({_, job_id}) ->
|> Enum.each(fn(%{"job_id" => job_id}) ->
BuildJob.update_status(job_id, "failure", error_tuple)
BuildJob.update_timestamp(job_id, :finished_at)
end)
error_tuple
end
end
defp run_job(%{"job" => job, "job_id" => job_id}, meta, parent) do
result = Refaktor.Worker.JobRunner.run_job(job_id, Job.dir(job_id), job, meta)
send(parent, {:job_done, job_id, result})
defp run_job(%{"job" => job, "job_id" => job_id}, meta) do
Refaktor.Worker.JobRunner.run_job(job_id, Job.dir(job_id), job, meta)
end
defp duplicate_job_dir!(first_job_dir, jobs_to_schedule) do
jobs_to_schedule
|> Enum.slice((1..-1))
|> Enum.each(fn %{"job_id" => job_id} ->
File.cp_r first_job_dir, Job.dir(job_id)
end)
File.cp_r first_job_dir, Job.dir(job_id)
end)
end
defp update_git_revision(%{"job" => _job, "job_id" => job_id}, git_revision_id) do
@@ -35,6 +35,10 @@ defmodule Refaktor.Persistence.Build do
preload: [build_jobs: :git_revision])
end
def find_by_id(id) do
Repo.one(from r in Build, where: r.id == ^id)
end
def last(count \\ 5) do
Repo.all(from r in Build,
order_by: [desc: :id],
@@ -17,4 +17,8 @@ defmodule Refaktor.Persistence.GitRepo do
def by_uid(uid) do
Repo.one(from r in GitRepo, where: r.uid == ^uid)
end
def find_by_id(id) do
Repo.one(from r in GitRepo, where: r.id == ^id)
end
end
@@ -1,25 +1,8 @@
defmodule Refaktor.Worker.JobRunner do
use GenServer
alias HexFaktor.Persistence.Project
alias Refaktor.Job
alias Refaktor.Persistence.BuildJob
alias HexFaktor.Persistence.Project
def start_link([]) do
GenServer.start_link(__MODULE__, [], [])
end
def init(state) do
{:ok, state}
end
def handle_call({:run_clone, build, git_repo, branch_name, jobs_to_schedule, meta, parent}, _from, state) do
Refaktor.Builder.run_clone(build, git_repo, branch_name, jobs_to_schedule, meta, parent)
{:reply, [], state}
end
def run_job(job_id, job_dir, job, meta) do
update_job(job_id, "running", [], :started_at)
@@ -1,44 +1,28 @@
defmodule Refaktor.Worker.Supervisor do
use Supervisor
alias Refaktor.Persistence.Build
alias Refaktor.Persistence.GitRepo
alias Refaktor.Persistence.GitBranch
@pool_name :refaktor_pool
@pool_size Application.get_env(:hex_faktor, :worker_pool_size)
@pool_overflow Application.get_env(:hex_faktor, :worker_pool_overflow)
@pool_timeout :infinity
def start_link do
Supervisor.start_link(__MODULE__, [])
def enqueue_clone(build, git_repo, git_branch, jobs_to_schedule, meta) do
queue = trigger_to_queue(build.trigger)
params = [build.id, git_repo.id, git_branch.id, jobs_to_schedule, meta]
Exq.enqueue(Exq, queue, __MODULE__, params) # calls perform below
end
def init([]) do
poolboy_config = [
{:name, {:local, @pool_name}},
{:worker_module, Refaktor.Worker.JobRunner},
{:size, @pool_size},
{:max_overflow, @pool_overflow}
]
children = [:poolboy.child_spec(@pool_name, poolboy_config, [])]
options = [strategy: :one_for_one]
def perform(build_id, git_repo_id, git_branch_id, jobs_to_schedule, meta) do
build = Build.find_by_id(build_id)
git_repo = GitRepo.find_by_id(git_repo_id)
git_branch = GitBranch.find_by_id(git_branch_id)
jobs_to_schedule = jobs_to_schedule |> Enum.map(&stringify_job/1)
supervise(children, options)
Refaktor.Builder.run_clone(build, git_repo, git_branch, jobs_to_schedule, meta)
end
def enqueue_clone(build, git_repo, branch_name, jobs_to_schedule, meta) do
parent = self()
spawn(fn() ->
%{"job_id" => first_job_id} = Enum.at(jobs_to_schedule, 0)
run_clone(build, git_repo, branch_name, jobs_to_schedule, meta, parent)
end)
# job was serialized into a String/binary and needs to be an Atom again
defp stringify_job(%{"job" => job, "job_id" => job_id}) do
%{"job" => Module.safe_concat([job]), "job_id" => job_id}
end
def run_clone(build, git_repo, branch_name, jobs_to_schedule, meta, parent) do
:poolboy.transaction(
@pool_name,
fn(pid) ->
%{"job_id" => first_job_id} = Enum.at(jobs_to_schedule, 0)
:gen_server.call(pid, {:run_clone, build, git_repo, branch_name, jobs_to_schedule, meta, parent})
end,
@pool_timeout
)
end
defp trigger_to_queue("manual"), do: "priority"
defp trigger_to_queue(_), do: "default"
end
@@ -19,7 +19,7 @@ defmodule HexFaktor.Mixfile do
def application do
[mod: {HexFaktor, []},
applications: [:phoenix, :phoenix_html, :cowboy, :logger, :gettext,
:phoenix_ecto, :postgrex, :oauth2, :poolboy, :rollbax]]
:phoenix_ecto, :postgrex, :oauth2, :poolboy, :rollbax, :exq]]
end
# Specifies which paths to compile per environment.
@@ -42,9 +42,10 @@ defmodule HexFaktor.Mixfile do
{:poolboy, "~> 1.5"},
{:oauth2, "~> 0.5.0"},
{:mailgun, "~> 0.1.1"},
{:timex, "~> 1.0.0"},
{:timex, "~> 1.0"},
{:timex_ecto, "~> 0.7.0"},
{:rollbax, "~> 0.6"}
{:rollbax, "~> 0.6"},
{:exq, "~> 0.6.0"}
]
end
@@ -1,15 +1,16 @@
%{"combine": {:hex, :combine, "0.7.0", "2ac6ae852a9835fe8189af18121cddd5bed2677f5df706dc0d208af668ab845d", [:mix], []},
"connection": {:hex, :connection, "1.0.2", "f4a06dd3ecae4141aa66f94ce92ea4c4b8753069472814932f1cadbc3078ab80", [:mix], []},
%{"combine": {:hex, :combine, "0.8.0", "390fe5f632a8c890d378abc697ca7c4c52cd3e38bb232a5cad1677cd4af013df", [:mix], []},
"connection": {:hex, :connection, "1.0.3", "3145f7416be3df248a4935f24e3221dc467c1e3a158d62015b35bd54da365786", [:mix], []},
"cowboy": {:hex, :cowboy, "1.0.4", "a324a8df9f2316c833a470d918aaf73ae894278b8aa6226ce7a9bf699388f878", [:rebar, :make], [{:cowlib, "~> 1.0.0", [hex: :cowlib, optional: false]}, {:ranch, "~> 1.0", [hex: :ranch, optional: false]}]},
"cowlib": {:hex, :cowlib, "1.0.2", "9d769a1d062c9c3ac753096f868ca121e2730b9a377de23dec0f7e08b1df84ee", [:make], []},
"decimal": {:hex, :decimal, "1.1.0", "3333732f17a90ff3057d7ab8c65f0930ca2d67e15cca812a91ead5633ed472fe", [:mix], []},
"ecto": {:hex, :ecto, "1.1.1", "9432c1f7da8d91ffb7ee1ac7f8a7b55c893a227553e5ed7f4d59e0423d48e551", [:mix], [{:sbroker, "~> 0.7", [hex: :sbroker, optional: true]}, {:postgrex, "~> 0.10", [hex: :postgrex, optional: true]}, {:poolboy, "~> 1.4", [hex: :poolboy, optional: false]}, {:poison, "~> 1.0", [hex: :poison, optional: true]}, {:mariaex, "~> 0.5", [hex: :mariaex, optional: true]}, {:decimal, "~> 1.0", [hex: :decimal, optional: false]}]},
"exjsx": {:hex, :exjsx, "3.2.0"},
"exq": {:hex, :exq, "0.6.4", "890c06e2263d8231bba2829ce6e648ac9e12f661b72bf9905f9aaab4629c0b71", [:mix], [{:uuid, ">= 1.0.0", [hex: :uuid, optional: false]}, {:timex, ">= 1.0.0", [hex: :timex, optional: false]}, {:redix, ">= 0.3.4", [hex: :redix, optional: false]}, {:poison, ">= 1.2.0 and < 2.0.0", [hex: :poison, optional: false]}]},
"fs": {:hex, :fs, "0.9.2", "ed17036c26c3f70ac49781ed9220a50c36775c6ca2cf8182d123b6566e49ec59", [:rebar], []},
"gettext": {:hex, :gettext, "0.9.0", "b732430dce0cbb3e370d89cdf6a7dbe48647a708f92754262601b62a0eb47e6f", [:mix], []},
"hackney": {:hex, :hackney, "1.3.2", "43bd07ab88753f5e136e38fddd2a09124bee25733b03361eeb459d0173fc17ab", [:rebar, :make], [{:ssl_verify_hostname, "~> 1.0.5", [hex: :ssl_verify_hostname, optional: false]}, {:idna, "~> 1.0.2", [hex: :idna, optional: false]}]},
"httpoison": {:hex, :httpoison, "0.7.5", "7f4a1dc1245f6a4a7d944786b75a44c94056bd54830299229e4b57fd75cb9daa", [:mix], [{:hackney, "~> 1.3.1", [hex: :hackney, optional: false]}]},
"idna": {:hex, :idna, "1.0.2", "397e3d001c002319da75759b0a81156bf11849c71d565162436d50020cb7265e", [:make], []},
"idna": {:hex, :idna, "1.0.3", "d456a8761cad91c97e9788c27002eb3b773adaf5c893275fc35ba4e3434bbd9b", [:rebar3], []},
"jsx": {:hex, :jsx, "2.6.2"},
"mailgun": {:hex, :mailgun, "0.1.2", "37c1306675cf27a66a13dea3c9d479da2a990f0aed296b5addbd0b07529b667d", [:mix], [{:poison, "~> 1.4", [hex: :poison, optional: false]}]},
"mimetype_parser": {:hex, :mimetype_parser, "0.1.0", "069fcf6aa98b1eb90d879b3035f3392acecef1ee212ddc59fa930a6cc15d85e3", [:mix], []},
@@ -23,9 +24,11 @@
"poolboy": {:hex, :poolboy, "1.5.1", "6b46163901cfd0a1b43d692657ed9d7e599853b3b21b95ae5ae0a777cf9b6ca8", [:rebar], []},
"postgrex": {:hex, :postgrex, "0.10.0", "98cf581bbb921d696db261d1a9e1740252714dbea1f9224d3291d58f2d88ea82", [:mix], [{:decimal, "~> 1.0", [hex: :decimal, optional: false]}, {:connection, "~> 1.0", [hex: :connection, optional: false]}]},
"ranch": {:hex, :ranch, "1.2.0", "b286a948a0706a700a9f577e5cecbb2dc66097ea79f3ddb20ba5536069bdb7aa", [:make], []},
"redix": {:hex, :redix, "0.3.6", "882b3b9a81ac1245277a0ee60a0f33fef39dc5f988f513b605c8c93f40f7206f", [:mix], [{:connection, "~> 1.0.0", [hex: :connection, optional: false]}]},
"rollbax": {:hex, :rollbax, "0.6.0", "ed5e4c29ca3b46d3a773f632bddc4cc1be19f28951a643bb2fe904894104f4d7", [:mix], [{:poison, "~> 1.4 or ~> 2.0", [hex: :poison, optional: false]}, {:hackney, "~> 1.1", [hex: :hackney, optional: false]}]},
"ssl_verify_hostname": {:hex, :ssl_verify_hostname, "1.0.6", "45866d958d9ae51cfe8fef0050ab8054d25cba23ace43b88046092aa2c714645", [:make], []},
"tentacat": {:hex, :tentacat, "0.2.1"},
"timex": {:hex, :timex, "1.0.0", "c2f4c86c6ec1af5b6614c85b0afa6bd2422f6d72fa3e8ab42baacb801e3235b5", [:mix], [{:tzdata, "== 0.1.8 or ~> 0.5", [hex: :tzdata, optional: false]}, {:combine, "~> 0.7", [hex: :combine, optional: false]}]},
"timex": {:hex, :timex, "1.0.2", "a07f2234298749c0eeede7e6da2534acf35c2c4529c4240d390828953aafd038", [:mix], [{:tzdata, "~> 0.1.8 or ~> 0.5", [hex: :tzdata, optional: false]}, {:combine, "~> 0.7", [hex: :combine, optional: false]}]},
"timex_ecto": {:hex, :timex_ecto, "0.7.0", "cd1d4cd7ec17fc7a2210e787adcc0fe179b490847b304592de907348bc98ba10", [:mix], [{:timex, ">= 0.19.0", [hex: :timex, optional: false]}, {:ecto, "~> 1.1", [hex: :ecto, optional: false]}]},
"tzdata": {:hex, :tzdata, "0.5.6", "d44c59175b8e9a170b3b17d6933820de98a3f0932f3a0615d884b8c20e93a87a", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, optional: false]}]}}
"tzdata": {:hex, :tzdata, "0.5.8", "a4ffe564783c6519e4df230a5d0e1cf44b7db7f576bcae76d05540b5da5b6143", [:mix], [{:hackney, "~> 1.0", [hex: :hackney, optional: false]}]},
"uuid": {:hex, :uuid, "1.1.4", "36c7734e4c8e357f2f67ba57fb61799d60c20a7f817b104896cca64b857e3686", [:mix], []}}

0 comments on commit 0bd4fd8

Please sign in to comment.