From 05a77ad0784cbbd15085a56eee1f4edcd08919ea Mon Sep 17 00:00:00 2001 From: Neylix Date: Mon, 3 Jul 2023 18:16:33 +0200 Subject: [PATCH] Clean summary cache on node up and summary --- lib/archethic/beacon_chain/subset.ex | 9 +-- .../beacon_chain/subset/summary_cache.ex | 58 ++++++++++++------ .../subset/summary_cache_test.exs | 61 ++++++++++++++++--- test/archethic/beacon_chain/subset_test.exs | 8 ++- 4 files changed, 101 insertions(+), 35 deletions(-) diff --git a/lib/archethic/beacon_chain/subset.ex b/lib/archethic/beacon_chain/subset.ex index 75a95ac40..00de5d948 100644 --- a/lib/archethic/beacon_chain/subset.ex +++ b/lib/archethic/beacon_chain/subset.ex @@ -381,10 +381,7 @@ defmodule Archethic.BeaconChain.Subset do Task.Supervisor.async_nolink(TaskSupervisor, fn -> get_network_patches(subset, time) end) summary = - %Summary{ - subset: subset, - summary_time: Utils.truncate_datetime(time, second?: true, microsecond?: true) - } + %Summary{subset: subset, summary_time: time} |> Summary.aggregate_slots( beacon_slots, P2PSampling.list_nodes_to_sample(subset) @@ -411,9 +408,9 @@ defmodule Archethic.BeaconChain.Subset do [] end - BeaconChain.write_beacon_summary(%{summary | network_patches: network_patches}) + :ok = BeaconChain.write_beacon_summary(%{summary | network_patches: network_patches}) - :ok + SummaryCache.clean_previous_summary_slots(subset, time) end end diff --git a/lib/archethic/beacon_chain/subset/summary_cache.ex b/lib/archethic/beacon_chain/subset/summary_cache.ex index e1a1fbdc3..3edaafeeb 100644 --- a/lib/archethic/beacon_chain/subset/summary_cache.ex +++ b/lib/archethic/beacon_chain/subset/summary_cache.ex @@ -5,7 +5,6 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do alias Archethic.BeaconChain alias Archethic.BeaconChain.Slot - alias Archethic.BeaconChain.SlotTimer alias Archethic.BeaconChain.SummaryTimer alias Archethic.Crypto @@ -34,33 +33,44 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do :ok = recover_slots(SummaryTimer.next_summary(DateTime.utc_now())) PubSub.register_to_current_epoch_of_slot_time() + PubSub.register_to_node_status() {:ok, %{}} end + def code_change("1.2.3", state, _extra) do + PubSub.register_to_node_status() + {:ok, state} + end + + def code_change(_version, state, _extra), do: {:ok, state} + def handle_info({:current_epoch_of_slot_timer, slot_time}, state) do - # Check if the slot in the first one of the summary interval - previous_summary_time = SummaryTimer.previous_summary(slot_time) - first_slot_time = SlotTimer.next_slot(previous_summary_time) - - if slot_time == first_slot_time do - Enum.each( - BeaconChain.list_subsets(), - &clean_previous_summary_cache(&1, previous_summary_time) - ) - - next_summary_backup_path = SummaryTimer.next_summary(slot_time) |> recover_path() - - Utils.mut_dir("slot_backup*") - |> Path.wildcard() - |> Enum.reject(&(&1 == next_summary_backup_path)) - |> Enum.each(&File.rm/1) - end + if SummaryTimer.match_interval?(slot_time), do: delete_old_backup_file(slot_time) {:noreply, state} end - defp clean_previous_summary_cache(subset, previous_summary_time) do + def handle_info(:node_up, state) do + previous_summary_time = SummaryTimer.previous_summary(DateTime.utc_now()) + delete_old_backup_file(previous_summary_time) + + BeaconChain.list_subsets() + |> Enum.each(&clean_previous_summary_slots(&1, previous_summary_time)) + + {:noreply, state} + end + + def handle_info(:node_down, state), do: {:noreply, state} + + @doc """ + Remove slots of previous summary time from ets table + """ + @spec clean_previous_summary_slots( + subset :: binary(), + previous_summary_time :: DateTime.t() + ) :: :ok + def clean_previous_summary_slots(subset, previous_summary_time) do subset |> stream_current_slots() |> Stream.filter(fn @@ -110,6 +120,16 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do backup_slot(slot, node_public_key) end + defp delete_old_backup_file(previous_summary_time) do + # We keep 2 backup, the current one and the last one + previous_backup_path = recover_path(previous_summary_time) + + Utils.mut_dir("slot_backup*") + |> Path.wildcard() + |> Enum.filter(&(&1 < previous_backup_path)) + |> Enum.each(&File.rm/1) + end + defp recover_path(summary_time = %DateTime{}), do: Utils.mut_dir("slot_backup-#{DateTime.to_unix(summary_time)}") diff --git a/test/archethic/beacon_chain/subset/summary_cache_test.exs b/test/archethic/beacon_chain/subset/summary_cache_test.exs index d575cdb51..0de0e35bb 100644 --- a/test/archethic/beacon_chain/subset/summary_cache_test.exs +++ b/test/archethic/beacon_chain/subset/summary_cache_test.exs @@ -5,7 +5,6 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do alias Archethic.BeaconChain.Subset.SummaryCache alias Archethic.BeaconChain.Slot - alias Archethic.BeaconChain.SlotTimer alias Archethic.BeaconChain.Slot.EndOfNodeSync alias Archethic.BeaconChain.SummaryTimer @@ -15,14 +14,59 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do alias Archethic.TransactionChain.TransactionSummary - test "should clean the previous summary slots after the new epoch event" do - {:ok, _pid} = SummaryTimer.start_link([interval: "0 0 * * * *"], []) - {:ok, _pid} = SlotTimer.start_link([interval: "10 * * * *"], []) + import Mock + + test "should clean the previous backup on summary time" do + {:ok, _pid} = SummaryTimer.start_link([interval: "0 * * * * *"], []) + {:ok, pid} = SummaryCache.start_link() + File.mkdir_p!(Utils.mut_dir()) + + subset = <<0>> + + slot_pre_summary = %Slot{ + slot_time: ~U[2023-01-01 07:59:50Z], + subset: subset + } + + slot_pre_summary2 = %Slot{ + slot_time: ~U[2023-01-01 08:00:00Z], + subset: subset + } + + slot_post_summary = %Slot{ + slot_time: ~U[2023-01-01 08:00:20Z], + subset: subset + } + + summary_time = ~U[2023-01-01 08:01:00Z] + + SummaryCache.add_slot(subset, slot_pre_summary, "node_key") + SummaryCache.add_slot(subset, slot_pre_summary2, "node_key") + SummaryCache.add_slot(subset, slot_post_summary, "node_key") + + send(pid, {:current_epoch_of_slot_timer, summary_time}) + Process.sleep(100) + + previous_summary_time = SummaryTimer.previous_summary(summary_time) + recover_path = Utils.mut_dir("slot_backup-#{DateTime.to_unix(previous_summary_time)}") + refute File.exists?(recover_path) + end + + test_with_mock "should clean the previous backup and ets table on node up", + DateTime, + [:passthrough], + utc_now: fn -> ~U[2023-01-01 08:00:50Z] end do + {:ok, _pid} = SummaryTimer.start_link([interval: "0 * * * * *"], []) {:ok, pid} = SummaryCache.start_link() File.mkdir_p!(Utils.mut_dir()) subset = <<0>> + slot_in_old_backup = %Slot{ + slot_time: ~U[2023-01-01 07:58:50Z], + subset: subset + } + slot_pre_summary = %Slot{ slot_time: ~U[2023-01-01 07:59:50Z], subset: subset @@ -38,14 +82,14 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do subset: subset } - previous_summary_time = SummaryTimer.previous_summary(~U[2023-01-01 08:01:00Z]) + summary_time = ~U[2023-01-01 08:00:00Z] - first_slot_time = SlotTimer.next_slot(previous_summary_time) + SummaryCache.add_slot(subset, slot_in_old_backup, "node_key") SummaryCache.add_slot(subset, slot_pre_summary, "node_key") SummaryCache.add_slot(subset, slot_pre_summary2, "node_key") SummaryCache.add_slot(subset, slot_post_summary, "node_key") - send(pid, {:current_epoch_of_slot_timer, first_slot_time}) + send(pid, :node_up) Process.sleep(100) assert [{^slot_post_summary, "node_key"}] = @@ -53,8 +97,9 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do |> SummaryCache.stream_current_slots() |> Enum.to_list() + previous_summary_time = SummaryTimer.previous_summary(summary_time) recover_path = Utils.mut_dir("slot_backup-#{DateTime.to_unix(previous_summary_time)}") - assert !File.exists?(recover_path) + refute File.exists?(recover_path) end test "summary cache should backup a slot, recover it on restart" do diff --git a/test/archethic/beacon_chain/subset_test.exs b/test/archethic/beacon_chain/subset_test.exs index ec4309bf6..99d76b114 100644 --- a/test/archethic/beacon_chain/subset_test.exs +++ b/test/archethic/beacon_chain/subset_test.exs @@ -31,6 +31,7 @@ defmodule Archethic.BeaconChain.SubsetTest do alias Archethic.TransactionChain.TransactionSummary import Mox + import Mock setup do P2P.add_and_connect_node(%Node{ @@ -450,8 +451,11 @@ defmodule Archethic.BeaconChain.SubsetTest do DateTime.utc_now() |> DateTime.truncate(:millisecond) - send(pid, {:create_slot, now}) - assert_receive :beacon_transaction_summary_stored, 2_000 + with_mock SummaryCache, [:passthrough], clean_previous_summary_slots: fn _, _ -> :ok end do + send(pid, {:create_slot, now}) + assert_receive :beacon_transaction_summary_stored, 2_000 + assert_called(SummaryCache.clean_previous_summary_slots(:_, :_)) + end end end