Skip to content

Commit

Permalink
Lock migrations to support multiple concurrent migrators
Browse files Browse the repository at this point in the history
* Locks migrations table in a transaction
* Wraps each migration in a Task, which may have its own transaction
* Propogates errors up, while ensure transactions are committed

Signed-off-by: José Valim <jose.valim@plataformatec.com.br>
  • Loading branch information
blatyo authored and José Valim committed Sep 14, 2017
1 parent 01c47d9 commit d28aeba
Show file tree
Hide file tree
Showing 8 changed files with 202 additions and 125 deletions.
2 changes: 1 addition & 1 deletion integration_test/cases/migrator.exs
Original file line number Diff line number Diff line change
Expand Up @@ -111,7 +111,7 @@ defmodule Ecto.Integration.MigratorTest do
end

defp count_entries() do
length Process.get(:migrations)
PoolRepo.aggregate(SchemaMigration, :count, :version)
end

defp create_migration(num) do
Expand Down
30 changes: 2 additions & 28 deletions integration_test/sql/alter.exs
Original file line number Diff line number Diff line change
Expand Up @@ -58,26 +58,8 @@ defmodule Ecto.Integration.AlterTest do
result ->
assert [%Decimal{}] = result
end

PoolRepo.transaction(fn() ->
assert [%Decimal{}] = PoolRepo.all(values)

assert :ok == down(PoolRepo, 20161112130000, AlterMigrationTwo, log: false)

# optionally fail once with database error when already prepared on
# connection (and clear cache)
try do
PoolRepo.all(values, [mode: :savepoint])
catch
:error, _ ->
assert PoolRepo.all(values) == [1]
else
result ->
assert result == [1]
end
end)

after
assert :ok == down(PoolRepo, 20161112130000, AlterMigrationTwo, log: false)
assert :ok == down(PoolRepo, 20161112120000, AlterMigrationOne, log: false)
end

Expand All @@ -101,16 +83,8 @@ defmodule Ecto.Integration.AlterTest do
result ->
assert result == {1, nil}
end

PoolRepo.transaction(fn() ->
assert PoolRepo.update_all(values, [set: [value: Decimal.new(5)]]) == {1, nil}

assert :ok == down(PoolRepo, 20161112130000, AlterMigrationTwo, log: false)

assert PoolRepo.update_all(values, [set: [value: 6]]) == {1, nil}
end)

after
assert :ok == down(PoolRepo, 20161112130000, AlterMigrationTwo, log: false)
assert :ok == down(PoolRepo, 20161112120000, AlterMigrationOne, log: false)
end
end
10 changes: 10 additions & 0 deletions lib/ecto/adapter/migration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -50,4 +50,14 @@ defmodule Ecto.Adapter.Migration do
* `:log` - When false, does not log begin/commit/rollback queries
"""
@callback execute_ddl(repo :: Ecto.Repo.t, command, options :: Keyword.t) :: :ok | no_return

@doc """
The callback responsible for locking the migrations table
and emitting the locked versions for callback execution.
It returns the result of calling the given function with a
list of versions.
"""
@callback lock_for_migrations(repo :: Ecto.Repo.t, query :: Ecto.Query.t,
options :: Keyword.t, ([version :: integer] -> result)) :: result when result: var
end
23 changes: 22 additions & 1 deletion lib/ecto/adapters/sql.ex
Original file line number Diff line number Diff line change
Expand Up @@ -136,8 +136,14 @@ defmodule Ecto.Adapters.SQL do
:ok
end

@doc false
def lock_for_migrations(repo, query, opts, fun) do
Ecto.Adapters.SQL.lock_for_migrations(repo, query, opts, fun)
end

defoverridable [prepare: 2, execute: 6, insert: 6, update: 6, delete: 4, insert_all: 7,
execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2]
execute_ddl: 3, loaders: 2, dumpers: 2, autogenerate: 1, ensure_all_started: 2,
lock_for_migrations: 4]
end
end

Expand Down Expand Up @@ -596,6 +602,21 @@ defmodule Ecto.Adapters.SQL do
end
end

## Migrations

@doc false
def lock_for_migrations(repo, query, opts, fun) do
{:ok, result} =
transaction(repo, opts ++ [log: false, timeout: :infinity], fn ->
query
|> Map.put(:lock, "FOR UPDATE")
|> repo.all()
|> fun.()
end)

result
end

## Log

defp with_log(repo, params, opts) do
Expand Down
7 changes: 3 additions & 4 deletions lib/ecto/migration/schema_migration.ex
Original file line number Diff line number Diff line change
Expand Up @@ -18,10 +18,9 @@ defmodule Ecto.Migration.SchemaMigration do
create_migrations_table(adapter, repo, prefix)
end

def migrated_versions(repo, prefix) do
from(p in {get_source(repo), __MODULE__}, select: p.version)
def versions(repo, prefix) do
from(p in get_source(repo), select: type(p.version, :integer))
|> Map.put(:prefix, prefix)
|> repo.all(@opts)
end

def up(repo, version, prefix) do
Expand All @@ -31,7 +30,7 @@ defmodule Ecto.Migration.SchemaMigration do
end

def down(repo, version, prefix) do
from(p in {get_source(repo), __MODULE__}, where: p.version == ^version)
from(p in get_source(repo), where: p.version == type(^version, :integer))
|> Map.put(:prefix, prefix)
|> repo.delete_all(@opts)
end
Expand Down

0 comments on commit d28aeba

Please sign in to comment.