Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve migration script #1003

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion lib/archethic/application.ex
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,7 @@ defmodule Archethic.Application do
end

def start_phase(:migrate, :normal, _options) do
Application.spec(:archethic, :vsn) |> Migrate.run()
Application.spec(:archethic, :vsn) |> Migrate.run(false)
end

defp try_open_port(nil), do: :ok
Expand Down
3 changes: 2 additions & 1 deletion lib/archethic/beacon_chain/slot.ex
Original file line number Diff line number Diff line change
Expand Up @@ -647,7 +647,8 @@ defmodule Archethic.BeaconChain.Slot do
fn attestation = %ReplicationAttestation{transaction_summary: summary} ->
new_summary = TransactionSummary.transform("1.0.8", summary)
%ReplicationAttestation{attestation | transaction_summary: new_summary}
end
end,
max_concurrency: System.schedulers_online() * 10
)
|> Stream.filter(&match?({:ok, _}, &1))
|> Enum.map(fn {:ok, attestation} -> attestation end)
Expand Down
24 changes: 21 additions & 3 deletions lib/mix/tasks/migrate.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,15 +4,33 @@ defmodule Mix.Tasks.Archethic.Migrate do
alias Archethic.DB.EmbeddedImpl
alias Archethic.DB.EmbeddedImpl.ChainWriter

alias Archethic.TaskSupervisor

require Logger

@env Mix.env()
@env if Mix.env() == :test, do: "test", else: "prod"

@doc """
Run migration available migration scripts since last updated version
"""
# Called by migrate.sh scripts
def run(new_version) do
@spec run(binary(), boolean()) :: :ok
def run(new_version, async? \\ false)

def run(new_version, true) do
Task.Supervisor.start_child(
TaskSupervisor,
fn -> do_run(new_version) end,
shutdown: :brutal_kill
)

:ok
end

def run(new_version, false) do
do_run(new_version)
end

defp do_run(new_version) do
Logger.info("Start of migration task for version #{new_version}")
migration_file_path = EmbeddedImpl.db_path() |> ChainWriter.migration_file_path()

Expand Down
2 changes: 1 addition & 1 deletion lib/release/call_migrate_script.ex
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ defmodule Archethic.Release.CallMigrateScript do
def down(_, _, _, instructions, _), do: instructions

defp add_migrate_script_call(new_version, instructions) do
call_instruction = {:apply, {Migrate, :run, [new_version]}}
call_instruction = {:apply, {Migrate, :run, [new_version, true]}}

instructions ++ [call_instruction]
end
Expand Down
70 changes: 46 additions & 24 deletions priv/migration_tasks/prod/1.0.8-rc1@migrate_old_tx_summaries.exs
Original file line number Diff line number Diff line change
Expand Up @@ -22,23 +22,37 @@ defmodule Migration_1_0_8 do
defp upgrade_summary_aggregates() do
db_path = EmbeddedImpl.db_path()
aggregate_dir = ChainWriter.base_beacon_aggregate_path(db_path)
paths = Path.wildcard("#{aggregate_dir}/*")

# For each aggregate of the dir
# (Should we run them async ?)
Path.wildcard("#{aggregate_dir}/*")
|> Enum.each(fn aggregate_path ->
File.read!(aggregate_path)
|> deserialize_aggregate()
|> elem(0)
|> migrate_aggregate
|> EmbeddedImpl.write_beacon_summaries_aggregate()
end)
Task.Supervisor.async_stream(
TaskSupervisor,
paths,
fn aggregate_path ->
File.read!(aggregate_path)
|> deserialize_aggregate()
|> elem(0)
|> migrate_aggregate
|> EmbeddedImpl.write_beacon_summaries_aggregate()
end,
shutdown: :brutal_kill,
timeout: :infinity,
ordered: false
)
|> Stream.run()
end

defp migrate_aggregate(aggregate = %SummaryAggregate{replication_attestations: tx_summaries}) do
replication_attestations =
Task.Supervisor.async_stream(TaskSupervisor, tx_summaries, fn tx_summary ->
create_attestation(tx_summary)
end)
Task.Supervisor.async_stream(
TaskSupervisor,
tx_summaries,
fn tx_summary ->
create_attestation(tx_summary)
end,
shutdown: :brutal_kill,
max_concurency: System.schedulers_online() * 10
)
|> Enum.map(fn {:ok, replication_attestation} -> replication_attestation end)

%SummaryAggregate{aggregate | replication_attestations: replication_attestations}
Expand All @@ -47,19 +61,25 @@ defmodule Migration_1_0_8 do
defp upgrade_beacon_summaries() do
db_path = EmbeddedImpl.db_path()
summaries_dir = ChainWriter.base_beacon_path(db_path)
paths = Path.wildcard("#{summaries_dir}/*")

# For each summary of the dir
# (Should we run them async ?)
Path.wildcard("#{summaries_dir}/*")
|> Enum.each(fn summary_path ->
new_summary = File.read!(summary_path)
|> deserialize_summary()
|> elem(0)
|> migrate_summary()

File.rm(summary_path)
EmbeddedImpl.write_beacon_summary(new_summary)
end)
Task.Supervisor.async_stream(
TaskSupervisor,
paths,
fn summary_path ->
new_summary = File.read!(summary_path)
|> deserialize_summary()
|> elem(0)
|> migrate_summary()

File.rm(summary_path)
EmbeddedImpl.write_beacon_summary(new_summary)
end,
shutdown: :brutal_kill,
timeout: :infinity,
ordered: false
)
end

defp migrate_summary(summary = %Summary{transaction_attestations: attestations}) do
Expand All @@ -71,7 +91,9 @@ defmodule Migration_1_0_8 do
transaction_summary: tx_summary
} ->
create_attestation(tx_summary)
end
end,
shutdown: :brutal_kill,
max_concurency: System.schedulers_online() * 10
)
|> Enum.map(fn {:ok, replication_attestation} -> replication_attestation end)

Expand Down
Loading