Skip to content

Commit

Permalink
chore: encode event logs within a single migration (#1457)
Browse files Browse the repository at this point in the history
  • Loading branch information
jyeshe committed Jul 11, 2023
1 parent a257442 commit 3f53f9a
Showing 1 changed file with 49 additions and 12 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -47,13 +47,36 @@ defmodule AeMdw.Migrations.EventLogsHash do
{:ok, db_last_txi} ->
num_ranges = div(db_last_txi, @range_size) + 1

only_encoding? =
match?(
{:ok, {_call_txi, _log_idx, _create_txi}},
State.next(state, Model.ContractLog, nil)
)

mutations_tasks =
if only_encoding?, do: &reencoding_logs_mutations/4, else: &hash_logs_mutations/4

count =
0..(num_ranges - 1)
|> Enum.map(fn i ->
first_txi = i * @range_size
last_txi = min((i + 1) * @range_size, db_last_txi)

range_mutations = logs_mutations(state, first_txi, last_txi)
num_tasks = System.schedulers_online()
amount_per_task = trunc(:math.ceil((last_txi - first_txi) / num_tasks))

log("num_tasks: #{num_tasks}, amount_per_task: #{amount_per_task}")
log("first_txi: #{first_txi}, last_txi: #{last_txi}")

:erlang.garbage_collect()

range_mutations =
state
|> mutations_tasks.(first_txi, num_tasks, amount_per_task)
|> Task.await_many(60_000 * 20)
|> List.flatten()
|> Enum.uniq()

count = length(range_mutations)
log("commiting #{count} mutations...")

Expand All @@ -75,14 +98,7 @@ defmodule AeMdw.Migrations.EventLogsHash do
end
end

defp logs_mutations(state, first_txi, last_txi) do
:erlang.garbage_collect()
num_tasks = System.schedulers_online()
amount_per_task = trunc(:math.ceil((last_txi - first_txi) / num_tasks))

log("num_tasks: #{num_tasks}, amount_per_task: #{amount_per_task}")
log("first_txi: #{first_txi}, last_txi: #{last_txi}")

defp hash_logs_mutations(state, first_txi, num_tasks, amount_per_task) do
Enum.map(0..(num_tasks - 1), fn i ->
task_first_txi = first_txi + i * amount_per_task
task_last_txi = first_txi + (i + 1) * amount_per_task
Expand Down Expand Up @@ -120,9 +136,30 @@ defmodule AeMdw.Migrations.EventLogsHash do
end)
end)
end)
|> Task.await_many(60_000 * 20)
|> List.flatten()
|> Enum.uniq()
end

defp reencoding_logs_mutations(state, first_txi, num_tasks, amount_per_task) do
Enum.map(0..(num_tasks - 1), fn i ->
task_first_txi = first_txi + i * amount_per_task
task_last_txi = first_txi + (i + 1) * amount_per_task
cursor = {task_first_txi, 0, 0}
boundary = {cursor, {task_last_txi, nil, nil}}

Task.async(fn ->
state
|> Collection.stream(Model.IdxContractLog, :forward, boundary, cursor)
|> Enum.flat_map(fn {call_txi, log_idx, create_txi} ->
log_key = {create_txi, call_txi, log_idx}
# rewrite with new encoding
try do
[WriteMutation.new(Model.ContractLog, fetch_old!(log_key))]
rescue
_value ->
[]
end
end)
end)
end)
end

@spec log(binary()) :: :ok
Expand Down

0 comments on commit 3f53f9a

Please sign in to comment.