Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
The task has a new strategy and much improved tests to verify that it actually works. The previous implementation may never have worked: #61.
- Loading branch information
1 parent
80a3713
commit 7d92a4f
Showing
19 changed files
with
474 additions
and
194 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
defmodule Cloak.Migrator do | ||
@moduledoc false | ||
|
||
import Ecto.Query | ||
|
||
alias Ecto.Changeset | ||
|
||
def migrate(repo, schema) when is_atom(repo) and is_atom(schema) do | ||
validate(repo, schema) | ||
|
||
min_id = repo.aggregate(schema, :min, :id) | ||
max_id = repo.aggregate(schema, :max, :id) | ||
fields = cloak_fields(schema) | ||
|
||
min_id..max_id | ||
|> Flow.from_enumerable(stages: System.schedulers_online()) | ||
|> Flow.map(&migrate_row(&1, repo, schema, fields)) | ||
|> Flow.run() | ||
end | ||
|
||
defp migrate_row(id, repo, schema, fields) do | ||
repo.transaction(fn -> | ||
query = | ||
schema | ||
|> where(id: ^id) | ||
|> lock("FOR UPDATE") | ||
|
||
case repo.one(query) do | ||
nil -> | ||
:noop | ||
|
||
row -> | ||
row | ||
|> force_changes(fields) | ||
|> repo.update() | ||
end | ||
end) | ||
end | ||
|
||
defp force_changes(row, fields) do | ||
Enum.reduce(fields, Changeset.change(row), fn field, changeset -> | ||
Changeset.force_change(changeset, field, Map.get(row, field)) | ||
end) | ||
end | ||
|
||
defp cloak_fields(schema) do | ||
:fields | ||
|> schema.__schema__() | ||
|> Enum.map(fn field -> | ||
{field, schema.__schema__(:type, field)} | ||
end) | ||
|> Enum.filter(fn {_field, type} -> | ||
Code.ensure_loaded?(type) && function_exported?(type, :__cloak__, 0) | ||
end) | ||
|> Enum.map(fn {field, _type} -> | ||
field | ||
end) | ||
end | ||
|
||
defp validate(repo, schema) do | ||
unless ecto_repo?(repo) do | ||
raise ArgumentError, "#{inspect(repo)} is not an Ecto.Repo" | ||
end | ||
|
||
unless ecto_schema?(schema) do | ||
raise ArgumentError, "#{inspect(schema)} is not an Ecto.Schema" | ||
end | ||
end | ||
|
||
defp ecto_repo?(repo) do | ||
Code.ensure_loaded?(repo) && function_exported?(repo, :__adapter__, 0) | ||
end | ||
|
||
defp ecto_schema?(schema) do | ||
Code.ensure_loaded?(schema) && function_exported?(schema, :__schema__, 1) | ||
end | ||
end |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,77 @@ | ||
defmodule Mix.Cloak do | ||
@moduledoc false | ||
# Helpers for building Mix tasks for Cloak | ||
|
||
# %{ app => %{repo: repo, schemas: schemas}} | ||
def parse_config(args) do | ||
{opts, _, _} = OptionParser.parse(args, aliases: [s: :schema, r: :repo]) | ||
|
||
opts | ||
|> Enum.into(%{}) | ||
|> do_parse_config() | ||
end | ||
|
||
defp do_parse_config(%{repo: repo, schema: schema}) do | ||
%{current_app() => %{repo: to_module(repo), schemas: [to_module(schema)]}} | ||
end | ||
|
||
defp do_parse_config(_argv) do | ||
get_apps() | ||
|> Enum.map(&get_app_config/1) | ||
|> Enum.into(%{}) | ||
|> validate_config!() | ||
end | ||
|
||
defp get_apps do | ||
apps = Mix.Project.apps_paths() | ||
|
||
if apps do | ||
Map.keys(apps) | ||
else | ||
[current_app()] | ||
end | ||
end | ||
|
||
defp get_app_config(app) do | ||
{app, | ||
%{ | ||
repo: Application.get_env(app, :cloak_repo), | ||
schemas: Application.get_env(app, :cloak_schemas) | ||
}} | ||
end | ||
|
||
defp current_app do | ||
Mix.Project.config()[:app] | ||
end | ||
|
||
defp validate_config!(config) do | ||
invalid_configs = Enum.filter(config, &(!valid?(&1))) | ||
|
||
unless length(invalid_configs) == 0 do | ||
apps = Keyword.keys(invalid_configs) | ||
|
||
raise Mix.Error, """ | ||
warning: no configured Ecto repos or schemas found in any of the apps: #{inspect(apps)} | ||
You can avoid this by passing the -r and -s flags or by setting the repo and schemas | ||
in your config/config.exs: | ||
config #{inspect(hd(apps))}, | ||
cloak_repo: ..., | ||
cloak_schemas: [...] | ||
""" | ||
end | ||
|
||
config | ||
end | ||
|
||
defp valid?({_app, %{repo: repo, schemas: [schema | _]}}) | ||
when is_atom(repo) and is_atom(schema), | ||
do: true | ||
|
||
defp valid?(_config), do: false | ||
|
||
defp to_module(name) do | ||
String.to_existing_atom("Elixir." <> name) | ||
end | ||
end |
Oops, something went wrong.