Skip to content

Commit

Permalink
Better cleaning of the previous summary slots
Browse files Browse the repository at this point in the history
  • Loading branch information
samuelmanzanera committed Mar 22, 2023
1 parent da77d0e commit 9c8fb4a
Show file tree
Hide file tree
Showing 2 changed files with 77 additions and 58 deletions.
86 changes: 35 additions & 51 deletions lib/archethic/beacon_chain/subset/summary_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -31,46 +31,44 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do
read_concurrency: true
])

:ok = recover_slots()
:ok = recover_slots(SummaryTimer.next_summary(DateTime.utc_now()))

PubSub.register_to_current_epoch_of_slot_time()

{:ok, %{}}
end

def handle_info({:current_epoch_of_slot_timer, slot_time}, state) do
Enum.each(BeaconChain.list_subsets(), &ensure_clean_cache(&1, slot_time))
{:noreply, state}
end

defp ensure_clean_cache(subset, slot_time) do
# Check if the slot in the first one of the summary interval
previous_summary_time = SummaryTimer.previous_summary(slot_time)
previous_slot_time = SlotTimer.previous_slot(slot_time)
first_slot_time? = previous_slot_time == previous_summary_time

if first_slot_time? do
# Check if there are slots for the next summary
contains_next_summary_slots? =
subset
|> stream_current_slots()
|> Enum.any?(fn
{%Slot{slot_time: slot_time}, _} ->
DateTime.diff(slot_time, previous_summary_time) >= 0

%Slot{slot_time: slot_time} ->
DateTime.diff(slot_time, previous_summary_time) >= 0
end)

# Clean the slots if there is previous cache information
unless contains_next_summary_slots?, do: clean_slots(subset)
first_slot_time = SlotTimer.next_slot(previous_summary_time)

if slot_time == first_slot_time do
Enum.each(
BeaconChain.list_subsets(),
&clean_previous_summary_cache(&1, previous_summary_time)
)

File.rm(recover_path(previous_summary_time))

This comment has been minimized.

Copy link
@Neylix

Neylix Mar 23, 2023

Member

After deleting, you may recreate the file with the loaded slot if there is already one for the current slot

end

{:noreply, state}
end

defp clean_slots(subset) do
:ets.delete(@table_name, subset)
File.rm(recover_path())
:ok
defp clean_previous_summary_cache(subset, previous_summary_time) do
subset
|> stream_current_slots()
|> Stream.filter(fn
{%Slot{slot_time: slot_time}, _} ->
DateTime.compare(slot_time, previous_summary_time) == :lt

%Slot{slot_time: slot_time} ->
DateTime.compare(slot_time, previous_summary_time) == :lt
end)
|> Stream.map(fn item ->
:ets.delete_object(@table_name, {subset, item})
end)
|> Stream.run()
end

@doc """
Expand Down Expand Up @@ -98,23 +96,6 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do
{slot, :ets.select(continuation)}
end

@doc """
Extract all the entries in the cache
"""
@spec pop_slots(subset :: binary()) :: list({Slot.t(), Crypto.key()})
def pop_slots(subset) do
recover_path() |> File.rm()

:ets.take(@table_name, subset)
|> Enum.map(fn
{_, {slot = %Slot{}, _}} ->
slot

{_, slot = %Slot{}} ->
slot
end)
end

@doc """
Add new beacon slots to the summary's cache
"""
Expand All @@ -124,20 +105,23 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do
backup_slot(slot, node_public_key)
end

defp recover_path(), do: Utils.mut_dir("slot_backup")
defp recover_path(summary_time = %DateTime{}),
do: Utils.mut_dir("slot_backup-#{DateTime.to_unix(summary_time)}")

defp backup_slot(slot, node_public_key) do
defp backup_slot(slot = %Slot{slot_time: slot_time}, node_public_key) do
content = serialize(slot, node_public_key)
next_summary_time = SummaryTimer.next_summary(slot_time)

recover_path()
next_summary_time
|> recover_path()
|> File.write!(content, [:append, :binary])
end

defp recover_slots() do
if File.exists?(recover_path()) do
defp recover_slots(summary_time) do
if File.exists?(recover_path(summary_time)) do
next_summary_time = DateTime.utc_now() |> SummaryTimer.next_summary() |> DateTime.to_unix()

content = File.read!(recover_path())
content = File.read!(recover_path(summary_time))

deserialize(content, [])
|> Enum.each(fn
Expand Down
49 changes: 42 additions & 7 deletions test/archethic/beacon_chain/subset/summary_cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,8 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do
alias Archethic.BeaconChain.Subset.SummaryCache

alias Archethic.BeaconChain.Slot
alias Archethic.BeaconChain.SlotTimer
alias Archethic.BeaconChain.Slot.EndOfNodeSync

alias Archethic.BeaconChain.SummaryTimer

alias Archethic.Crypto
Expand All @@ -15,11 +15,48 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do

alias Archethic.TransactionChain.TransactionSummary

test "summary cache should backup a slot, recover it on restart and delete backup on pop_slots" do
test "should clean the previous summary slots after the new epoch event" do
{:ok, _pid} = SummaryTimer.start_link([interval: "0 0 * * * *"], [])
{:ok, _pid} = SlotTimer.start_link([interval: "10 * * * *"], [])
{:ok, pid} = SummaryCache.start_link()
File.mkdir_p!(Utils.mut_dir())

subset = <<0>>

slot_pre_summary = %Slot{
slot_time: ~U[2023-01-01 07:59:50Z],
subset: subset
}

slot_post_summary = %Slot{
slot_time: ~U[2023-01-01 08:00:20Z],
subset: subset
}

previous_summary_time = SummaryTimer.previous_summary(~U[2023-01-01 08:01:00Z])

first_slot_time = SlotTimer.next_slot(previous_summary_time)
SummaryCache.add_slot(subset, slot_pre_summary, "node_key")
SummaryCache.add_slot(subset, slot_post_summary, "node_key")

send(pid, {:current_epoch_of_slot_timer, first_slot_time})
Process.sleep(100)

assert [{^slot_post_summary, "node_key"}] =
subset
|> SummaryCache.stream_current_slots()
|> Enum.to_list()

recover_path = Utils.mut_dir("slot_backup-#{DateTime.to_unix(previous_summary_time)}")
assert !File.exists?(recover_path)
end

test "summary cache should backup a slot, recover it on restart" do
{:ok, _pid} = SummaryTimer.start_link([interval: "0 0 * * * * *"], [])
File.mkdir_p!(Utils.mut_dir())

path = Utils.mut_dir("slot_backup")
next_summary_time = SummaryTimer.next_summary(DateTime.utc_now())
path = Utils.mut_dir("slot_backup-#{DateTime.to_unix(next_summary_time)}")

{:ok, pid} = SummaryCache.start_link()

Expand Down Expand Up @@ -74,9 +111,7 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do

{:ok, _} = SummaryCache.start_link()

slots = SummaryCache.pop_slots(<<0>>)
assert !File.exists?(path)

assert [^slot] = slots
slots = SummaryCache.stream_current_slots(<<0>>) |> Enum.to_list()
assert [{^slot, ^node_key}] = slots
end
end

0 comments on commit 9c8fb4a

Please sign in to comment.