diff --git a/lib/archethic/beacon_chain/subset/summary_cache.ex b/lib/archethic/beacon_chain/subset/summary_cache.ex index 9e1e49a83..98c580550 100644 --- a/lib/archethic/beacon_chain/subset/summary_cache.ex +++ b/lib/archethic/beacon_chain/subset/summary_cache.ex @@ -31,7 +31,7 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do read_concurrency: true ]) - :ok = recover_slots() + :ok = recover_slots(SummaryTimer.next_summary(DateTime.utc_now())) PubSub.register_to_current_epoch_of_slot_time() @@ -39,38 +39,36 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do end def handle_info({:current_epoch_of_slot_timer, slot_time}, state) do - Enum.each(BeaconChain.list_subsets(), &ensure_clean_cache(&1, slot_time)) - {:noreply, state} - end - - defp ensure_clean_cache(subset, slot_time) do # Check if the slot in the first one of the summary interval previous_summary_time = SummaryTimer.previous_summary(slot_time) - previous_slot_time = SlotTimer.previous_slot(slot_time) - first_slot_time? = previous_slot_time == previous_summary_time - - if first_slot_time? do - # Check if there are slots for the next summary - contains_next_summary_slots? = - subset - |> stream_current_slots() - |> Enum.any?(fn - {%Slot{slot_time: slot_time}, _} -> - DateTime.diff(slot_time, previous_summary_time) >= 0 - - %Slot{slot_time: slot_time} -> - DateTime.diff(slot_time, previous_summary_time) >= 0 - end) - - # Clean the slots if there is previous cache information - unless contains_next_summary_slots?, do: clean_slots(subset) + first_slot_time = SlotTimer.next_slot(previous_summary_time) + + if slot_time == first_slot_time do + Enum.each( + BeaconChain.list_subsets(), + &clean_previous_summary_cache(&1, previous_summary_time) + ) + + File.rm(recover_path(previous_summary_time)) end + + {:noreply, state} end - defp clean_slots(subset) do - :ets.delete(@table_name, subset) - File.rm(recover_path()) - :ok + defp clean_previous_summary_cache(subset, previous_summary_time) do + subset + |> stream_current_slots() + |> Stream.filter(fn + {%Slot{slot_time: slot_time}, _} -> + DateTime.compare(slot_time, previous_summary_time) == :lt + + %Slot{slot_time: slot_time} -> + DateTime.compare(slot_time, previous_summary_time) == :lt + end) + |> Stream.map(fn item -> + :ets.delete_object(@table_name, {subset, item}) + end) + |> Stream.run() end @doc """ @@ -98,23 +96,6 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do {slot, :ets.select(continuation)} end - @doc """ - Extract all the entries in the cache - """ - @spec pop_slots(subset :: binary()) :: list({Slot.t(), Crypto.key()}) - def pop_slots(subset) do - recover_path() |> File.rm() - - :ets.take(@table_name, subset) - |> Enum.map(fn - {_, {slot = %Slot{}, _}} -> - slot - - {_, slot = %Slot{}} -> - slot - end) - end - @doc """ Add new beacon slots to the summary's cache """ @@ -124,20 +105,23 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do backup_slot(slot, node_public_key) end - defp recover_path(), do: Utils.mut_dir("slot_backup") + defp recover_path(summary_time = %DateTime{}), + do: Utils.mut_dir("slot_backup-#{DateTime.to_unix(summary_time)}") - defp backup_slot(slot, node_public_key) do + defp backup_slot(slot = %Slot{slot_time: slot_time}, node_public_key) do content = serialize(slot, node_public_key) + next_summary_time = SummaryTimer.next_summary(slot_time) - recover_path() + next_summary_time + |> recover_path() |> File.write!(content, [:append, :binary]) end - defp recover_slots() do - if File.exists?(recover_path()) do + defp recover_slots(summary_time) do + if File.exists?(recover_path(summary_time)) do next_summary_time = DateTime.utc_now() |> SummaryTimer.next_summary() |> DateTime.to_unix() - content = File.read!(recover_path()) + content = File.read!(recover_path(summary_time)) deserialize(content, []) |> Enum.each(fn diff --git a/test/archethic/beacon_chain/subset/summary_cache_test.exs b/test/archethic/beacon_chain/subset/summary_cache_test.exs index 032448241..e0f84264a 100644 --- a/test/archethic/beacon_chain/subset/summary_cache_test.exs +++ b/test/archethic/beacon_chain/subset/summary_cache_test.exs @@ -5,8 +5,8 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do alias Archethic.BeaconChain.Subset.SummaryCache alias Archethic.BeaconChain.Slot + alias Archethic.BeaconChain.SlotTimer alias Archethic.BeaconChain.Slot.EndOfNodeSync - alias Archethic.BeaconChain.SummaryTimer alias Archethic.Crypto @@ -15,11 +15,48 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do alias Archethic.TransactionChain.TransactionSummary - test "summary cache should backup a slot, recover it on restart and delete backup on pop_slots" do + test "should clean the previous summary slots after the new epoch event" do + {:ok, _pid} = SummaryTimer.start_link([interval: "0 0 * * * *"], []) + {:ok, _pid} = SlotTimer.start_link([interval: "10 * * * *"], []) + {:ok, pid} = SummaryCache.start_link() + File.mkdir_p!(Utils.mut_dir()) + + subset = <<0>> + + slot_pre_summary = %Slot{ + slot_time: ~U[2023-01-01 07:59:50Z], + subset: subset + } + + slot_post_summary = %Slot{ + slot_time: ~U[2023-01-01 08:00:20Z], + subset: subset + } + + previous_summary_time = SummaryTimer.previous_summary(~U[2023-01-01 08:01:00Z]) + + first_slot_time = SlotTimer.next_slot(previous_summary_time) + SummaryCache.add_slot(subset, slot_pre_summary, "node_key") + SummaryCache.add_slot(subset, slot_post_summary, "node_key") + + send(pid, {:current_epoch_of_slot_timer, first_slot_time}) + Process.sleep(100) + + assert [{^slot_post_summary, "node_key"}] = + subset + |> SummaryCache.stream_current_slots() + |> Enum.to_list() + + recover_path = Utils.mut_dir("slot_backup-#{DateTime.to_unix(previous_summary_time)}") + assert !File.exists?(recover_path) + end + + test "summary cache should backup a slot, recover it on restart" do {:ok, _pid} = SummaryTimer.start_link([interval: "0 0 * * * * *"], []) File.mkdir_p!(Utils.mut_dir()) - path = Utils.mut_dir("slot_backup") + next_summary_time = SummaryTimer.next_summary(DateTime.utc_now()) + path = Utils.mut_dir("slot_backup-#{DateTime.to_unix(next_summary_time)}") {:ok, pid} = SummaryCache.start_link() @@ -74,9 +111,7 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do {:ok, _} = SummaryCache.start_link() - slots = SummaryCache.pop_slots(<<0>>) - assert !File.exists?(path) - - assert [^slot] = slots + slots = SummaryCache.stream_current_slots(<<0>>) |> Enum.to_list() + assert [{^slot, ^node_key}] = slots end end