Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Improve lru read concurrency #1038

Merged
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
44 changes: 1 addition & 43 deletions lib/archethic/db/embedded_impl/chain_index.ex
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
@archethic_db_chain_stats :archethic_db_chain_stats
@archethic_db_last_index :archethic_db_last_index
@archethic_db_type_stats :archethic_db_type_stats
@archetic_db_tx_index_cache Archethic.Db.ChainIndex.LRU
@archetic_db_tx_index_cache :chain_index_cache

require Logger

Expand All @@ -26,11 +26,6 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
def init(opts) do
db_path = Keyword.fetch!(opts, :path)

## Start the LRU cache for chain indexes. Default max size is 300 Mb
cache_max_size = Application.get_env(:archethic, Archethic.DB.ChainIndex.MaxCacheSize)

LRU.start_link(@archetic_db_tx_index_cache, cache_max_size)

:ets.new(@archethic_db_chain_stats, [:set, :named_table, :public, read_concurrency: true])
:ets.new(@archethic_db_last_index, [:set, :named_table, :public, read_concurrency: true])
:ets.new(@archethic_db_type_stats, [:set, :named_table, :public, read_concurrency: true])
Expand All @@ -40,43 +35,6 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndex do
{:ok, %{db_path: db_path}}
end

# code_change clause for version "1.0.7": starts the LRU cache, copies every
# entry of the legacy :archethic_db_tx_index ETS table into it, then deletes
# that table.
def code_change("1.0.7", state, _extra) do
  # The LRU cache has not been started yet at this point, so start it first
  max_size = Application.get_env(:archethic, Archethic.DB.ChainIndex.MaxCacheSize)
  LRU.start_link(@archetic_db_tx_index_cache, max_size)

  # Walk the legacy ETS table from its first key and push it into the cache
  :ok =
    :archethic_db_tx_index
    |> :ets.first()
    |> migrate_lru_cache()

  :ets.delete(:archethic_db_tx_index)
  {:ok, state}
end

# Fallback clause so that any other version does not crash on an unmatched
# clause; the state is returned untouched.
def code_change(_old_vsn, state, _extra), do: {:ok, state}

# Terminal clause: the traversal stops when the ETS iteration yields
# :"$end_of_table".
defp migrate_lru_cache(:"$end_of_table"), do: :ok

# Copies the entry stored under `index` in :archethic_db_tx_index into the
# LRU cache, then recurses on the next key of the table.
defp migrate_lru_cache(index) do
  # Lookups that do not yield exactly one entry with the expected fields are
  # skipped; the result of this expression is intentionally ignored.
  with [{key, %{size: size, offset: offset, genesis_address: genesis_address}}] <-
         :ets.lookup(:archethic_db_tx_index, index) do
    entry = %{size: size, offset: offset, genesis_address: genesis_address}
    LRU.put(@archetic_db_tx_index_cache, key, entry)
  end

  :archethic_db_tx_index
  |> :ets.next(index)
  |> migrate_lru_cache()
end

defp fill_tables(db_path) do
Enum.each(0..255, fn subset ->
subset_summary_filename = index_summary_path(db_path, subset)
Expand Down
20 changes: 18 additions & 2 deletions lib/archethic/db/embedded_impl/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,8 @@ defmodule Archethic.DB.EmbeddedImpl.Supervisor do
alias Archethic.DB.EmbeddedImpl.P2PView
alias Archethic.DB.EmbeddedImpl.StatsInfo

alias ArchethicCache.LRU

require Logger

def start_link(arg \\ []) do
Expand All @@ -29,14 +31,28 @@ defmodule Archethic.DB.EmbeddedImpl.Supervisor do
initialize_chain_writers(path)

children = [
chain_index_cache(),
{ChainIndex, path: path},
# {ChainWriter, path: path},
{BootstrapInfo, path: path},
{P2PView, path: path},
{StatsInfo, path: path}
]

Supervisor.init(children, strategy: :rest_for_one)
Supervisor.init(children, strategy: :one_for_one)
end

# Child spec that starts LRU with the name :chain_index_cache and the maximum
# size read from the :archethic application environment.
defp chain_index_cache do
  max_size = Application.get_env(:archethic, Archethic.DB.ChainIndex.MaxCacheSize)
  start_args = [:chain_index_cache, max_size]

  %{id: :chain_index_cache, start: {LRU, :start_link, start_args}}
end

defp initialize_chain_writers(path) do
Expand Down
41 changes: 20 additions & 21 deletions lib/archethic_cache/lru.ex
Original file line number Diff line number Diff line change
Expand Up @@ -25,9 +25,19 @@ defmodule ArchethicCache.LRU do
GenServer.cast(server, {:put, key, value})
end

@spec get(GenServer.server(), term()) :: nil | term()
def get(server, key) do
GenServer.call(server, {:get, key})
# Looks `key` up directly in the named ETS table `server_name`, from the
# calling process. On a hit, the cache process is asked (asynchronously) to
# move the key to the front, and the get function stored in :persistent_term
# under `server_name` is applied to the cached value. Returns nil on a miss.
#
# NOTE(review): the blanket rescue turns any exception raised in the body
# into nil, including exceptions raised by the stored get function; confirm
# this is the intended contract.
@spec get(GenServer.name(), term()) :: nil | term()
def get(server_name, key) do
  case :ets.lookup(server_name, key) do
    [] ->
      nil

    [{^key, {_size, cached}}] ->
      GenServer.cast(server_name, {:move_front, key})
      reader = :persistent_term.get(server_name)
      reader.(key, cached)
  end
rescue
  _ -> nil
end

@spec purge(GenServer.server()) :: :ok
Expand All @@ -36,7 +46,9 @@ defmodule ArchethicCache.LRU do
end

def init([name, max_bytes, opts]) do
table = :ets.new(:"aecache_#{name}", [:set, {:read_concurrency, true}])
table = :ets.new(name, [:set, :named_table, {:read_concurrency, true}])

:persistent_term.put(name, Keyword.get(opts, :get_fn, fn _key, value -> value end))

{:ok,
%{
Expand All @@ -45,27 +57,10 @@ defmodule ArchethicCache.LRU do
bytes_used: 0,
keys: [],
put_fn: Keyword.get(opts, :put_fn, fn _key, value -> value end),
get_fn: Keyword.get(opts, :get_fn, fn _key, value -> value end),
evict_fn: Keyword.get(opts, :evict_fn, fn _key, _value -> :ok end)
}}
end

# Synchronous lookup: on a hit, replies with get_fn applied to the cached
# value and moves the key to the front of `keys`; on a miss, replies nil and
# leaves the state unchanged.
def handle_call({:get, key}, _from, %{table: table, keys: keys, get_fn: get_fn} = state) do
  case :ets.lookup(table, key) do
    [{^key, {_size, value}}] ->
      reply = get_fn.(key, value)
      {:reply, reply, %{state | keys: move_front(keys, key)}}

    [] ->
      {:reply, nil, state}
  end
end

def handle_call(:purge, _from, state = %{table: table, evict_fn: evict_fn}) do
# we call the evict_fn to be able to clean effects (ex: file written to disk)
:ets.foldr(
Expand All @@ -81,6 +76,10 @@ defmodule ArchethicCache.LRU do
{:reply, :ok, %{state | keys: [], bytes_used: 0}}
end

# Asynchronous recency update: moves `key` to the front of the `keys` list
# kept in the server state.
def handle_cast({:move_front, key}, %{keys: keys} = state) do
  reordered = move_front(keys, key)
  {:noreply, %{state | keys: reordered}}
end

def handle_cast(
{:put, key, value},
state = %{table: table, bytes_max: bytes_max, put_fn: put_fn, evict_fn: evict_fn}
Expand Down
3 changes: 2 additions & 1 deletion test/archethic/db/embedded_impl/chain_index_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndexTest do

test "should load transactions tables", %{db_path: db_path} do
{:ok, _pid} = ChainIndex.start_link(path: db_path)
LRU.start_link(:chain_index_cache, 30_000_000)
tx_address = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>
genesis_address = <<0::8, 0::8, :crypto.strong_rand_bytes(32)::binary>>

Expand All @@ -58,7 +59,7 @@ defmodule Archethic.DB.EmbeddedImpl.ChainIndexTest do
assert {^tx_address, _} = ChainIndex.get_last_chain_address(genesis_address, db_path)

# Remove the transaction from the cache and try to fetch from the file instead
LRU.purge(Archethic.Db.ChainIndex.LRU)
LRU.purge(:chain_index_cache)
assert true == ChainIndex.transaction_exists?(tx_address, db_path)
assert false == ChainIndex.transaction_exists?(:crypto.strong_rand_bytes(32), db_path)
end
Expand Down
104 changes: 63 additions & 41 deletions test/archethic_cache/lru_disk_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -11,30 +11,35 @@ defmodule ArchethicCache.LRUDiskTest do

describe "single disk cache" do
test "should return nil when key is not in cache" do
{:ok, pid} = LRUDisk.start_link(:my_cache, 10 * 1024, @cache_dir)
{:ok, _pid} = LRUDisk.start_link(:my_cache, 10 * 1024, @cache_dir)

assert nil == LRUDisk.get(pid, :key1)
assert nil == LRUDisk.get(:my_cache, :key1)
end

test "should cache binaries" do
{:ok, pid} = LRUDisk.start_link(:my_cache, 10 * 1024, @cache_dir)

LRUDisk.put(pid, :key1, "my binary")
LRUDisk.put(pid, :key2, "my binary2")
LRUDisk.put(pid, :key3, "my binary3")
LRUDisk.put(:my_cache, :key1, "my binary")
LRUDisk.put(:my_cache, :key2, "my binary2")
LRUDisk.put(:my_cache, :key3, "my binary3")

# This get_state is used to wait for all messages in the GenServer to be processed
:sys.get_state(pid)

assert "my binary" == LRUDisk.get(pid, :key1)
assert "my binary2" == LRUDisk.get(pid, :key2)
assert "my binary3" == LRUDisk.get(pid, :key3)
assert "my binary" == LRUDisk.get(:my_cache, :key1)
assert "my binary2" == LRUDisk.get(:my_cache, :key2)
assert "my binary3" == LRUDisk.get(:my_cache, :key3)
end

test "should be able to replace binaries" do
{:ok, pid} = LRUDisk.start_link(:my_cache, 10 * 1024, @cache_dir)

LRUDisk.put(pid, :key1, "my binary")
LRUDisk.put(pid, :key1, "my binary2")
LRUDisk.put(:my_cache, :key1, "my binary")
LRUDisk.put(:my_cache, :key1, "my binary2")

assert "my binary2" == LRUDisk.get(pid, :key1)
:sys.get_state(pid)

assert "my binary2" == LRUDisk.get(:my_cache, :key1)
assert 1 == length(File.ls!(cache_dir_for_ls(:my_cache)))
end

Expand All @@ -43,12 +48,14 @@ defmodule ArchethicCache.LRUDiskTest do

{:ok, pid} = LRUDisk.start_link(:my_cache, 500, @cache_dir)

LRUDisk.put(pid, :key1, binary)
LRUDisk.put(pid, :key2, binary)
LRUDisk.put(pid, :key3, get_a_binary_of_bytes(400))
LRUDisk.put(:my_cache, :key1, binary)
LRUDisk.put(:my_cache, :key2, binary)
LRUDisk.put(:my_cache, :key3, get_a_binary_of_bytes(400))

:sys.get_state(pid)

assert nil == LRUDisk.get(pid, :key1)
assert nil == LRUDisk.get(pid, :key2)
assert nil == LRUDisk.get(:my_cache, :key1)
assert nil == LRUDisk.get(:my_cache, :key2)
assert 1 == length(File.ls!(cache_dir_for_ls(:my_cache)))
end

Expand All @@ -57,14 +64,19 @@ defmodule ArchethicCache.LRUDiskTest do

{:ok, pid} = LRUDisk.start_link(:my_cache, 500, @cache_dir)

LRUDisk.put(pid, :key1, binary)
LRUDisk.put(pid, :key2, binary)
LRUDisk.get(pid, :key1)
LRUDisk.put(pid, :key3, binary)
LRUDisk.put(:my_cache, :key1, binary)
LRUDisk.put(:my_cache, :key2, binary)

assert ^binary = LRUDisk.get(pid, :key1)
assert ^binary = LRUDisk.get(pid, :key3)
assert nil == LRUDisk.get(pid, :key2)
:sys.get_state(pid)

LRUDisk.get(:my_cache, :key1)
LRUDisk.put(:my_cache, :key3, binary)

:sys.get_state(pid)

assert ^binary = LRUDisk.get(:my_cache, :key1)
assert ^binary = LRUDisk.get(:my_cache, :key3)
assert nil == LRUDisk.get(:my_cache, :key2)
assert 2 == length(File.ls!(cache_dir_for_ls(:my_cache)))
end

Expand All @@ -73,8 +85,11 @@ defmodule ArchethicCache.LRUDiskTest do

{:ok, pid} = LRUDisk.start_link(:my_cache, 200, @cache_dir)

assert :ok == LRUDisk.put(pid, :key1, binary)
assert nil == LRUDisk.get(pid, :key1)
assert :ok == LRUDisk.put(:my_cache, :key1, binary)

:sys.get_state(pid)

assert nil == LRUDisk.get(:my_cache, :key1)
assert Enum.empty?(File.ls!(cache_dir_for_ls(:my_cache)))
end

Expand All @@ -83,13 +98,16 @@ defmodule ArchethicCache.LRUDiskTest do

server = :my_cache

start_supervised!(%{
id: ArchethicCache.LRUDisk,
start: {ArchethicCache.LRUDisk, :start_link, [server, 500, @cache_dir]}
})
pid_before_crash =
start_supervised!(%{
id: ArchethicCache.LRUDisk,
start: {ArchethicCache.LRUDisk, :start_link, [server, 500, @cache_dir]}
})

LRUDisk.put(server, :key1, binary)

:sys.get_state(pid_before_crash)

assert ^binary = LRUDisk.get(server, :key1)

# example of external intervention
Expand All @@ -98,8 +116,6 @@ defmodule ArchethicCache.LRUDiskTest do
# we lose the cached value
assert nil == LRUDisk.get(server, :key1)

pid_before_crash = Process.whereis(server)

# capture_log is used to hide the LRU process terminating
# because we don't want red in our logs when it's expected
# ps: only use it with async: false
Expand All @@ -119,19 +135,22 @@ defmodule ArchethicCache.LRUDiskTest do

# cache should automatically restart later
LRUDisk.put(server, :key1, binary)

:sys.get_state(pid_after_crash)

assert ^binary = LRUDisk.get(server, :key1)
end

test "should remove when purged" do
binary = get_a_binary_of_bytes(400)

{:ok, pid} = LRUDisk.start_link(:my_cache, 500, @cache_dir)
{:ok, _pid} = LRUDisk.start_link(:my_cache, 500, @cache_dir)

LRUDisk.put(pid, :key1, binary)
LRUDisk.put(pid, :key2, binary)
LRUDisk.purge(pid)
assert nil == LRUDisk.get(pid, :key1)
assert nil == LRUDisk.get(pid, :key2)
LRUDisk.put(:my_cache, :key1, binary)
LRUDisk.put(:my_cache, :key2, binary)
LRUDisk.purge(:my_cache)
assert nil == LRUDisk.get(:my_cache, :key1)
assert nil == LRUDisk.get(:my_cache, :key2)

assert Enum.empty?(File.ls!(cache_dir_for_ls(:my_cache)))
end
Expand All @@ -143,11 +162,14 @@ defmodule ArchethicCache.LRUDiskTest do
assert {:error, _} = LRUDisk.start_link(:my_cache, 10 * 1024, @cache_dir)

{:ok, pid2} = LRUDisk.start_link(:my_cache2, 10 * 1024, @cache_dir)
LRUDisk.put(pid, :key1, "value1a")
LRUDisk.put(pid2, :key1, "value1b")
LRUDisk.put(:my_cache, :key1, "value1a")
LRUDisk.put(:my_cache2, :key1, "value1b")

:sys.get_state(pid)
:sys.get_state(pid2)

assert "value1a" == LRUDisk.get(pid, :key1)
assert "value1b" == LRUDisk.get(pid2, :key1)
assert "value1a" == LRUDisk.get(:my_cache, :key1)
assert "value1b" == LRUDisk.get(:my_cache2, :key1)
assert 1 == length(File.ls!(cache_dir_for_ls(:my_cache)))
assert 1 == length(File.ls!(cache_dir_for_ls(:my_cache2)))
end
Expand Down
Loading