diff --git a/lib/archethic/application.ex b/lib/archethic/application.ex index 68c70d9e2..3cbd19a9a 100644 --- a/lib/archethic/application.ex +++ b/lib/archethic/application.ex @@ -108,7 +108,7 @@ defmodule Archethic.Application do end def start_phase(:migrate, :normal, _options) do - Application.spec(:archethic, :vsn) |> Migrate.run() + Application.spec(:archethic, :vsn) |> Migrate.run(false) end defp try_open_port(nil), do: :ok diff --git a/lib/archethic/beacon_chain/slot.ex b/lib/archethic/beacon_chain/slot.ex index c05191234..4b33280a9 100644 --- a/lib/archethic/beacon_chain/slot.ex +++ b/lib/archethic/beacon_chain/slot.ex @@ -647,7 +647,8 @@ defmodule Archethic.BeaconChain.Slot do fn attestation = %ReplicationAttestation{transaction_summary: summary} -> new_summary = TransactionSummary.transform("1.0.8", summary) %ReplicationAttestation{attestation | transaction_summary: new_summary} - end + end, + max_concurrency: System.schedulers_online() * 10 ) |> Stream.filter(&match?({:ok, _}, &1)) |> Enum.map(fn {:ok, attestation} -> attestation end) diff --git a/lib/mix/tasks/migrate.ex b/lib/mix/tasks/migrate.ex index 841ff427a..d2b4a6da4 100644 --- a/lib/mix/tasks/migrate.ex +++ b/lib/mix/tasks/migrate.ex @@ -4,15 +4,33 @@ defmodule Mix.Tasks.Archethic.Migrate do alias Archethic.DB.EmbeddedImpl alias Archethic.DB.EmbeddedImpl.ChainWriter + alias Archethic.TaskSupervisor + require Logger - @env Mix.env() + @env if Mix.env() == :test, do: "test", else: "prod" @doc """ Run migration available migration scripts since last updated version """ - # Called by migrate.sh scripts - def run(new_version) do + @spec run(binary(), boolean()) :: :ok + def run(new_version, async? \\ false) + + def run(new_version, true) do + Task.Supervisor.start_child( + TaskSupervisor, + fn -> do_run(new_version) end, + shutdown: :brutal_kill + ) + + :ok + end + + def run(new_version, false) do + do_run(new_version) + end + + defp do_run(new_version) do Logger.info("Start of migration task for version #{new_version}") migration_file_path = EmbeddedImpl.db_path() |> ChainWriter.migration_file_path() diff --git a/lib/release/call_migrate_script.ex b/lib/release/call_migrate_script.ex index 7bc029493..228e51403 100644 --- a/lib/release/call_migrate_script.ex +++ b/lib/release/call_migrate_script.ex @@ -13,7 +13,7 @@ defmodule Archethic.Release.CallMigrateScript do def down(_, _, _, instructions, _), do: instructions defp add_migrate_script_call(new_version, instructions) do - call_instruction = {:apply, {Migrate, :run, [new_version]}} + call_instruction = {:apply, {Migrate, :run, [new_version, true]}} instructions ++ [call_instruction] end diff --git a/priv/migration_tasks/prod/1.0.8-rc1@migrate_old_tx_summaries.exs b/priv/migration_tasks/prod/1.0.8-rc1@migrate_old_tx_summaries.exs index f2288ec5a..e5e1f3ecb 100644 --- a/priv/migration_tasks/prod/1.0.8-rc1@migrate_old_tx_summaries.exs +++ b/priv/migration_tasks/prod/1.0.8-rc1@migrate_old_tx_summaries.exs @@ -22,23 +22,37 @@ defmodule Migration_1_0_8 do defp upgrade_summary_aggregates() do db_path = EmbeddedImpl.db_path() aggregate_dir = ChainWriter.base_beacon_aggregate_path(db_path) + paths = Path.wildcard("#{aggregate_dir}/*") + # For each aggregate of the dir - # (Should we run them async ?) - Path.wildcard("#{aggregate_dir}/*") - |> Enum.each(fn aggregate_path -> - File.read!(aggregate_path) - |> deserialize_aggregate() - |> elem(0) - |> migrate_aggregate - |> EmbeddedImpl.write_beacon_summaries_aggregate() - end) + Task.Supervisor.async_stream( + TaskSupervisor, + paths, + fn aggregate_path -> + File.read!(aggregate_path) + |> deserialize_aggregate() + |> elem(0) + |> migrate_aggregate + |> EmbeddedImpl.write_beacon_summaries_aggregate() + end, + shutdown: :brutal_kill, + timeout: :infinity, + ordered: false + ) + |> Stream.run() end defp migrate_aggregate(aggregate = %SummaryAggregate{replication_attestations: tx_summaries}) do replication_attestations = - Task.Supervisor.async_stream(TaskSupervisor, tx_summaries, fn tx_summary -> - create_attestation(tx_summary) - end) + Task.Supervisor.async_stream( + TaskSupervisor, + tx_summaries, + fn tx_summary -> + create_attestation(tx_summary) + end, + shutdown: :brutal_kill, + max_concurency: System.schedulers_online() * 10 + ) |> Enum.map(fn {:ok, replication_attestation} -> replication_attestation end) %SummaryAggregate{aggregate | replication_attestations: replication_attestations} @@ -47,19 +61,25 @@ defmodule Migration_1_0_8 do defp upgrade_beacon_summaries() do db_path = EmbeddedImpl.db_path() summaries_dir = ChainWriter.base_beacon_path(db_path) + paths = Path.wildcard("#{summaries_dir}/*") # For each summary of the dir - # (Should we run them async ?) - Path.wildcard("#{summaries_dir}/*") - |> Enum.each(fn summary_path -> - new_summary = File.read!(summary_path) - |> deserialize_summary() - |> elem(0) - |> migrate_summary() - - File.rm(summary_path) - EmbeddedImpl.write_beacon_summary(new_summary) - end) + Task.Supervisor.async_stream( + TaskSupervisor, + paths, + fn summary_path -> + new_summary = File.read!(summary_path) + |> deserialize_summary() + |> elem(0) + |> migrate_summary() + + File.rm(summary_path) + EmbeddedImpl.write_beacon_summary(new_summary) + end, + shutdown: :brutal_kill, + timeout: :infinity, + ordered: false + ) end defp migrate_summary(summary = %Summary{transaction_attestations: attestations}) do @@ -71,7 +91,9 @@ defmodule Migration_1_0_8 do transaction_summary: tx_summary } -> create_attestation(tx_summary) - end + end, + shutdown: :brutal_kill, + max_concurency: System.schedulers_online() * 10 ) |> Enum.map(fn {:ok, replication_attestation} -> replication_attestation end)