Skip to content

Commit

Permalink
Clean summary cache on node up and summary
Browse files Browse the repository at this point in the history
  • Loading branch information
Neylix authored and samuelmanzanera committed Jul 10, 2023
1 parent 767a4c3 commit 4163152
Show file tree
Hide file tree
Showing 4 changed files with 101 additions and 35 deletions.
9 changes: 3 additions & 6 deletions lib/archethic/beacon_chain/subset.ex
Original file line number Diff line number Diff line change
Expand Up @@ -375,10 +375,7 @@ defmodule Archethic.BeaconChain.Subset do
Task.Supervisor.async_nolink(TaskSupervisor, fn -> get_network_patches(subset, time) end)

summary =
%Summary{
subset: subset,
summary_time: Utils.truncate_datetime(time, second?: true, microsecond?: true)
}
%Summary{subset: subset, summary_time: time}
|> Summary.aggregate_slots(
beacon_slots,
P2PSampling.list_nodes_to_sample(subset)
Expand All @@ -405,9 +402,9 @@ defmodule Archethic.BeaconChain.Subset do
[]
end

BeaconChain.write_beacon_summary(%{summary | network_patches: network_patches})
:ok = BeaconChain.write_beacon_summary(%{summary | network_patches: network_patches})

:ok
SummaryCache.clean_previous_summary_slots(subset, time)
end
end

Expand Down
58 changes: 39 additions & 19 deletions lib/archethic/beacon_chain/subset/summary_cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do

alias Archethic.BeaconChain
alias Archethic.BeaconChain.Slot
alias Archethic.BeaconChain.SlotTimer
alias Archethic.BeaconChain.SummaryTimer
alias Archethic.Crypto

Expand Down Expand Up @@ -34,33 +33,44 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do
:ok = recover_slots(SummaryTimer.next_summary(DateTime.utc_now()))

PubSub.register_to_current_epoch_of_slot_time()
PubSub.register_to_node_status()

{:ok, %{}}
end

def code_change("1.2.3", state, _extra) do
PubSub.register_to_node_status()
{:ok, state}
end

def code_change(_version, state, _extra), do: {:ok, state}

def handle_info({:current_epoch_of_slot_timer, slot_time}, state) do
# Check if the slot in the first one of the summary interval
previous_summary_time = SummaryTimer.previous_summary(slot_time)
first_slot_time = SlotTimer.next_slot(previous_summary_time)

if slot_time == first_slot_time do
Enum.each(
BeaconChain.list_subsets(),
&clean_previous_summary_cache(&1, previous_summary_time)
)

next_summary_backup_path = SummaryTimer.next_summary(slot_time) |> recover_path()

Utils.mut_dir("slot_backup*")
|> Path.wildcard()
|> Enum.reject(&(&1 == next_summary_backup_path))
|> Enum.each(&File.rm/1)
end
if SummaryTimer.match_interval?(slot_time), do: delete_old_backup_file(slot_time)

{:noreply, state}
end

defp clean_previous_summary_cache(subset, previous_summary_time) do
def handle_info(:node_up, state) do
previous_summary_time = SummaryTimer.previous_summary(DateTime.utc_now())
delete_old_backup_file(previous_summary_time)

BeaconChain.list_subsets()
|> Enum.each(&clean_previous_summary_slots(&1, previous_summary_time))

{:noreply, state}
end

def handle_info(:node_down, state), do: {:noreply, state}

@doc """
Remove slots of previous summary time from ets table
"""
@spec clean_previous_summary_slots(
subset :: binary(),
previous_summary_time :: DateTime.t()
) :: :ok
def clean_previous_summary_slots(subset, previous_summary_time) do
subset
|> stream_current_slots()
|> Stream.filter(fn {%Slot{slot_time: slot_time}, _} ->
Expand Down Expand Up @@ -106,6 +116,16 @@ defmodule Archethic.BeaconChain.Subset.SummaryCache do
backup_slot(slot, node_public_key)
end

defp delete_old_backup_file(previous_summary_time) do
# We keep 2 backup, the current one and the last one
previous_backup_path = recover_path(previous_summary_time)

Utils.mut_dir("slot_backup*")
|> Path.wildcard()
|> Enum.filter(&(&1 < previous_backup_path))
|> Enum.each(&File.rm/1)
end

defp recover_path(summary_time = %DateTime{}),
do: Utils.mut_dir("slot_backup-#{DateTime.to_unix(summary_time)}")

Expand Down
61 changes: 53 additions & 8 deletions test/archethic/beacon_chain/subset/summary_cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,6 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do
alias Archethic.BeaconChain.Subset.SummaryCache

alias Archethic.BeaconChain.Slot
alias Archethic.BeaconChain.SlotTimer
alias Archethic.BeaconChain.Slot.EndOfNodeSync
alias Archethic.BeaconChain.SummaryTimer

Expand All @@ -15,14 +14,59 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do

alias Archethic.TransactionChain.TransactionSummary

test "should clean the previous summary slots after the new epoch event" do
{:ok, _pid} = SummaryTimer.start_link([interval: "0 0 * * * *"], [])
{:ok, _pid} = SlotTimer.start_link([interval: "10 * * * *"], [])
import Mock

test "should clean the previous backup on summary time" do
{:ok, _pid} = SummaryTimer.start_link([interval: "0 * * * * *"], [])
{:ok, pid} = SummaryCache.start_link()
File.mkdir_p!(Utils.mut_dir())

subset = <<0>>

slot_pre_summary = %Slot{
slot_time: ~U[2023-01-01 07:59:50Z],
subset: subset
}

slot_pre_summary2 = %Slot{
slot_time: ~U[2023-01-01 08:00:00Z],
subset: subset
}

slot_post_summary = %Slot{
slot_time: ~U[2023-01-01 08:00:20Z],
subset: subset
}

summary_time = ~U[2023-01-01 08:01:00Z]

SummaryCache.add_slot(subset, slot_pre_summary, "node_key")
SummaryCache.add_slot(subset, slot_pre_summary2, "node_key")
SummaryCache.add_slot(subset, slot_post_summary, "node_key")

send(pid, {:current_epoch_of_slot_timer, summary_time})
Process.sleep(100)

previous_summary_time = SummaryTimer.previous_summary(summary_time)
recover_path = Utils.mut_dir("slot_backup-#{DateTime.to_unix(previous_summary_time)}")
refute File.exists?(recover_path)
end

test_with_mock "should clean the previous backup and ets table on node up",
DateTime,
[:passthrough],
utc_now: fn -> ~U[2023-01-01 08:00:50Z] end do
{:ok, _pid} = SummaryTimer.start_link([interval: "0 * * * * *"], [])
{:ok, pid} = SummaryCache.start_link()
File.mkdir_p!(Utils.mut_dir())

subset = <<0>>

slot_in_old_backup = %Slot{
slot_time: ~U[2023-01-01 07:58:50Z],
subset: subset
}

slot_pre_summary = %Slot{
slot_time: ~U[2023-01-01 07:59:50Z],
subset: subset
Expand All @@ -38,23 +82,24 @@ defmodule Archethic.BeaconChain.Subset.SummaryCacheTest do
subset: subset
}

previous_summary_time = SummaryTimer.previous_summary(~U[2023-01-01 08:01:00Z])
summary_time = ~U[2023-01-01 08:00:00Z]

first_slot_time = SlotTimer.next_slot(previous_summary_time)
SummaryCache.add_slot(subset, slot_in_old_backup, "node_key")
SummaryCache.add_slot(subset, slot_pre_summary, "node_key")
SummaryCache.add_slot(subset, slot_pre_summary2, "node_key")
SummaryCache.add_slot(subset, slot_post_summary, "node_key")

send(pid, {:current_epoch_of_slot_timer, first_slot_time})
send(pid, :node_up)
Process.sleep(100)

assert [{^slot_post_summary, "node_key"}] =
subset
|> SummaryCache.stream_current_slots()
|> Enum.to_list()

previous_summary_time = SummaryTimer.previous_summary(summary_time)
recover_path = Utils.mut_dir("slot_backup-#{DateTime.to_unix(previous_summary_time)}")
assert !File.exists?(recover_path)
refute File.exists?(recover_path)
end

test "summary cache should backup a slot, recover it on restart" do
Expand Down
8 changes: 6 additions & 2 deletions test/archethic/beacon_chain/subset_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ defmodule Archethic.BeaconChain.SubsetTest do
alias Archethic.TransactionChain.TransactionSummary

import Mox
import Mock

setup do
P2P.add_and_connect_node(%Node{
Expand Down Expand Up @@ -450,8 +451,11 @@ defmodule Archethic.BeaconChain.SubsetTest do
DateTime.utc_now()
|> DateTime.truncate(:millisecond)

send(pid, {:create_slot, now})
assert_receive :beacon_transaction_summary_stored, 2_000
with_mock SummaryCache, [:passthrough], clean_previous_summary_slots: fn _, _ -> :ok end do
send(pid, {:create_slot, now})
assert_receive :beacon_transaction_summary_stored, 2_000
assert_called(SummaryCache.clean_previous_summary_slots(:_, :_))
end
end
end

Expand Down

0 comments on commit 4163152

Please sign in to comment.