Skip to content

Commit

Permalink
Registry improvements and add the cache adapter to the adapter metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
cabol committed Aug 28, 2022
1 parent b7ea897 commit f72d4d7
Show file tree
Hide file tree
Showing 23 changed files with 160 additions and 84 deletions.
29 changes: 23 additions & 6 deletions lib/nebulex/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -14,11 +14,12 @@ defmodule Nebulex.Adapter do
@typedoc """
The metadata returned by the adapter `c:init/1`.
It must be a map and Nebulex itself will always inject two keys into
the meta:
It must be a map and Nebulex itself will always inject
the following keys into the meta:
* `:cache` - The cache module.
* `:pid` - The PID returned by the child spec returned in `c:init/1`
* `:pid` - The PID returned by the child spec returned in `c:init/1`.
* `:adapter` - The defined cache adapter.
"""
@type adapter_meta :: metadata
Expand All @@ -40,16 +41,32 @@ defmodule Nebulex.Adapter do

## API

# Inline common instructions
@compile {:inline, lookup_meta: 1}

@doc """
Returns the adapter metadata from its `c:init/1` callback.
It expects a process name of the cache. The name is either
an atom or a PID. For a given cache, you often want to call
this function based on the dynamic cache:
Nebulex.Adapter.lookup_meta(cache.get_dynamic_cache())
"""
@spec lookup_meta(atom | pid) :: {:ok, adapter_meta} | {:error, Nebulex.Error.t()}
defdelegate lookup_meta(name_or_pid), to: Nebulex.Cache.Registry, as: :lookup

@doc """
Executes the function `fun` passing as parameters the adapter and metadata
(from the `c:init/1` callback) associated with the given cache `name_or_pid`.
It expects a name or a PID representing the cache.
"""
@spec with_meta(atom | pid, (module, adapter_meta -> term)) :: term | {:error, Nebulex.Error.t()}
@spec with_meta(atom | pid, (adapter_meta -> term)) :: term | {:error, Nebulex.Error.t()}
def with_meta(name_or_pid, fun) do
with {:ok, {adapter, adapter_meta}} <- Nebulex.Cache.Registry.lookup(name_or_pid) do
fun.(adapter, adapter_meta)
with {:ok, adapter_meta} <- lookup_meta(name_or_pid) do
fun.(adapter_meta)
end
end

Expand Down
2 changes: 1 addition & 1 deletion lib/nebulex/adapters/local/generation.ex
Original file line number Diff line number Diff line change
Expand Up @@ -233,7 +233,7 @@ defmodule Nebulex.Adapters.Local.Generation do
end

defp get_meta_tab(server_ref) when is_atom(server_ref) or is_pid(server_ref) do
unwrap_or_raise Adapter.with_meta(server_ref, fn _, %{meta_tab: meta_tab} -> meta_tab end)
unwrap_or_raise Adapter.with_meta(server_ref, & &1.meta_tab)
end

defp get_meta_tab(server_ref), do: server_ref
Expand Down
4 changes: 1 addition & 3 deletions lib/nebulex/adapters/multilevel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -230,9 +230,7 @@ defmodule Nebulex.Adapters.Multilevel do
A convenience function to get the cache model.
"""
def model(name \\ __MODULE__) do
with_meta(name, fn _adapter, %{model: model} ->
model
end)
with_meta(name, & &1.model)
end
end
end
Expand Down
2 changes: 1 addition & 1 deletion lib/nebulex/adapters/partitioned.ex
Original file line number Diff line number Diff line change
Expand Up @@ -349,7 +349,7 @@ defmodule Nebulex.Adapters.Partitioned do
A convenience function to get the node of the given `key`.
"""
def get_node(key) do
with_meta(get_dynamic_cache(), fn _adapter, %{name: name, keyslot: keyslot} ->
with_meta(get_dynamic_cache(), fn %{name: name, keyslot: keyslot} ->
Cluster.get_node(name, key, keyslot)
end)
end
Expand Down
5 changes: 3 additions & 2 deletions lib/nebulex/adapters/replicated.ex
Original file line number Diff line number Diff line change
Expand Up @@ -613,7 +613,8 @@ defmodule Nebulex.Adapters.Replicated do
# The key was not found on remote node, ignore the error
{acc1, [error | acc2]}

{_node, {:error, %Nebulex.Error{reason: {:registry_error, _}}}} = error, {acc1, acc2} ->
{_node, {:error, %Nebulex.Error{reason: {:registry_lookup_error, _}}}} = error,
{acc1, acc2} ->
# The cache was not found in the remote node, maybe it was stopped and
# :pg ("Process Groups") is not updated yet, then ignore the error
{acc1, [error | acc2]}
Expand Down Expand Up @@ -722,7 +723,7 @@ defmodule Nebulex.Adapters.Replicated.Bootstrap do
def handle_info(:timeout, %{name: name, retries: retries} = state)
when retries < @max_retries do
with {:error, _} <-
Adapter.with_meta(name, fn _adapter, adapter_meta ->
Adapter.with_meta(name, fn adapter_meta ->
handle_info(:timeout, adapter_meta)
end) do
{:noreply, %{state | retries: retries + 1}, 1}
Expand Down
4 changes: 3 additions & 1 deletion lib/nebulex/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -64,7 +64,8 @@ defmodule Nebulex.Cache do
* `[:nebulex, :cache, :init]` - it is dispatched whenever a cache starts.
The measurement is a single `system_time` entry in native unit. The
metadata is the `:cache` and all initialization options under `:opts`.
metadata is the `:cache`, the `:name`, and all initialization options
under `:opts`.
### Adapter-specific events
Expand Down Expand Up @@ -390,6 +391,7 @@ defmodule Nebulex.Cache do

try do
_ = put_dynamic_cache(name)

fun.()
after
_ = put_dynamic_cache(default_dynamic_cache)
Expand Down
32 changes: 18 additions & 14 deletions lib/nebulex/cache/entry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,7 @@ defmodule Nebulex.Cache.Entry do
Implementation for `c:Nebulex.Cache.fetch/2`.
"""
def fetch(name, key, opts) do
Adapter.with_meta(name, & &1.fetch(&2, key, opts))
Adapter.with_meta(name, & &1.adapter.fetch(&1, key, opts))
end

@doc """
Expand All @@ -26,7 +26,7 @@ defmodule Nebulex.Cache.Entry do
Implementation for `c:Nebulex.Cache.get/3`.
"""
def get(name, key, default, opts) do
Adapter.with_meta(name, &do_get(&1, &2, key, default, opts))
Adapter.with_meta(name, &do_get(&1.adapter, &1, key, default, opts))
end

defp do_get(adapter, adapter_meta, key, default, opts) do
Expand All @@ -52,7 +52,7 @@ defmodule Nebulex.Cache.Entry do
end

def get_all(name, keys, opts) do
Adapter.with_meta(name, & &1.get_all(&2, keys, opts))
Adapter.with_meta(name, & &1.adapter.get_all(&1, keys, opts))
end

@doc """
Expand Down Expand Up @@ -109,7 +109,7 @@ defmodule Nebulex.Cache.Entry do
end

defp do_put(name, key, value, on_write, opts) do
Adapter.with_meta(name, & &1.put(&2, key, value, get_ttl(opts), on_write, opts))
Adapter.with_meta(name, & &1.adapter.put(&1, key, value, get_ttl(opts), on_write, opts))
end

@doc """
Expand All @@ -127,6 +127,7 @@ defmodule Nebulex.Cache.Entry do
"""
def put_all!(name, entries, opts) do
_ = unwrap_or_raise do_put_all(name, entries, :put, opts)

:ok
end

Expand All @@ -153,14 +154,14 @@ defmodule Nebulex.Cache.Entry do
end

def do_put_all(name, entries, on_write, opts) do
Adapter.with_meta(name, & &1.put_all(&2, entries, get_ttl(opts), on_write, opts))
Adapter.with_meta(name, & &1.adapter.put_all(&1, entries, get_ttl(opts), on_write, opts))
end

@doc """
Implementation for `c:Nebulex.Cache.delete/2`.
"""
def delete(name, key, opts) do
Adapter.with_meta(name, & &1.delete(&2, key, opts))
Adapter.with_meta(name, & &1.adapter.delete(&1, key, opts))
end

@doc """
Expand All @@ -174,7 +175,7 @@ defmodule Nebulex.Cache.Entry do
Implementation for `c:Nebulex.Cache.take/2`.
"""
def take(name, key, opts) do
Adapter.with_meta(name, & &1.take(&2, key, opts))
Adapter.with_meta(name, & &1.adapter.take(&1, key, opts))
end

@doc """
Expand All @@ -191,14 +192,14 @@ defmodule Nebulex.Cache.Entry do
Implementation for `c:Nebulex.Cache.exists?/1`.
"""
def exists?(name, key) do
Adapter.with_meta(name, & &1.exists?(&2, key))
Adapter.with_meta(name, & &1.adapter.exists?(&1, key))
end

@doc """
Implementation for `c:Nebulex.Cache.get_and_update/3`.
"""
def get_and_update(name, key, fun, opts) when is_function(fun, 1) do
Adapter.with_meta(name, fn adapter, adapter_meta ->
Adapter.with_meta(name, fn %{adapter: adapter} = adapter_meta ->
with {:ok, current} <- do_get(adapter, adapter_meta, key, nil, opts) do
{:ok, eval_get_and_update_function(current, adapter, adapter_meta, key, opts, fun)}
end
Expand Down Expand Up @@ -239,7 +240,7 @@ defmodule Nebulex.Cache.Entry do
Implementation for `c:Nebulex.Cache.update/4`.
"""
def update(name, key, initial, fun, opts) do
Adapter.with_meta(name, fn adapter, adapter_meta ->
Adapter.with_meta(name, fn %{adapter: adapter} = adapter_meta ->
value =
case adapter.fetch(adapter_meta, key, opts) do
{:ok, value} -> fun.(value)
Expand Down Expand Up @@ -280,7 +281,10 @@ defmodule Nebulex.Cache.Entry do
0
end

Adapter.with_meta(name, & &1.update_counter(&2, key, amount, get_ttl(opts), default, opts))
Adapter.with_meta(
name,
& &1.adapter.update_counter(&1, key, amount, get_ttl(opts), default, opts)
)
end

def incr(_cache, _key, amount, _opts) do
Expand Down Expand Up @@ -318,7 +322,7 @@ defmodule Nebulex.Cache.Entry do
Implementation for `c:Nebulex.Cache.ttl/1`.
"""
def ttl(name, key) do
Adapter.with_meta(name, & &1.ttl(&2, key))
Adapter.with_meta(name, & &1.adapter.ttl(&1, key))
end

@doc """
Expand All @@ -339,7 +343,7 @@ defmodule Nebulex.Cache.Entry do
(Time.timeout?(ttl) && ttl) ||
raise ArgumentError, "expected ttl to be a valid timeout, got: #{inspect(ttl)}"

Adapter.with_meta(name, & &1.expire(&2, key, ttl))
Adapter.with_meta(name, & &1.adapter.expire(&1, key, ttl))
end

@doc """
Expand All @@ -353,7 +357,7 @@ defmodule Nebulex.Cache.Entry do
Implementation for `c:Nebulex.Cache.touch/1`.
"""
def touch(name, key) do
Adapter.with_meta(name, & &1.touch(&2, key))
Adapter.with_meta(name, & &1.adapter.touch(&1, key))
end

@doc """
Expand Down
4 changes: 2 additions & 2 deletions lib/nebulex/cache/persistence.ex
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ defmodule Nebulex.Cache.Persistence do
Implementation for `c:Nebulex.Cache.dump/2`.
"""
def dump(name, path, opts) do
Adapter.with_meta(name, & &1.dump(&2, path, opts))
Adapter.with_meta(name, & &1.adapter.dump(&1, path, opts))
end

@doc """
Expand All @@ -23,7 +23,7 @@ defmodule Nebulex.Cache.Persistence do
Implementation for `c:Nebulex.Cache.load/2`.
"""
def load(name, path, opts) do
Adapter.with_meta(name, & &1.load(&2, path, opts))
Adapter.with_meta(name, & &1.adapter.load(&1, path, opts))
end

@doc """
Expand Down
8 changes: 4 additions & 4 deletions lib/nebulex/cache/queryable.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Nebulex.Cache.Queryable do
Implementation for `c:Nebulex.Cache.all/2`.
"""
def all(name, query, opts) do
Adapter.with_meta(name, & &1.execute(&2, :all, query, opts))
Adapter.with_meta(name, & &1.adapter.execute(&1, :all, query, opts))
end

@doc """
Expand All @@ -25,7 +25,7 @@ defmodule Nebulex.Cache.Queryable do
Implementation for `c:Nebulex.Cache.count_all/2`.
"""
def count_all(name, query, opts) do
Adapter.with_meta(name, & &1.execute(&2, :count_all, query, opts))
Adapter.with_meta(name, & &1.adapter.execute(&1, :count_all, query, opts))
end

@doc """
Expand All @@ -39,7 +39,7 @@ defmodule Nebulex.Cache.Queryable do
Implementation for `c:Nebulex.Cache.delete_all/2`.
"""
def delete_all(name, query, opts) do
Adapter.with_meta(name, & &1.execute(&2, :delete_all, query, opts))
Adapter.with_meta(name, & &1.adapter.execute(&1, :delete_all, query, opts))
end

@doc """
Expand All @@ -54,7 +54,7 @@ defmodule Nebulex.Cache.Queryable do
"""
def stream(name, query, opts) do
opts = Keyword.put_new(opts, :page_size, @default_page_size)
Adapter.with_meta(name, & &1.stream(&2, query, opts))
Adapter.with_meta(name, & &1.adapter.stream(&1, query, opts))
end

@doc """
Expand Down
33 changes: 23 additions & 10 deletions lib/nebulex/cache/registry.ex
Original file line number Diff line number Diff line change
Expand Up @@ -12,9 +12,9 @@ defmodule Nebulex.Cache.Registry do
GenServer.start_link(__MODULE__, :ok, name: __MODULE__)
end

@spec register(pid, term) :: :ok
def register(pid, value) when is_pid(pid) do
GenServer.call(__MODULE__, {:register, pid, value})
@spec register(pid, atom, term) :: :ok
def register(pid, name, value) when is_pid(pid) and is_atom(name) do
GenServer.call(__MODULE__, {:register, pid, name, value})
end

@spec lookup(atom | pid) :: {:ok, term} | {:error, Nebulex.Error.t()}
Expand All @@ -24,39 +24,52 @@ defmodule Nebulex.Cache.Registry do
if pid = GenServer.whereis(name) do
lookup(pid)
else
wrap_error Nebulex.Error, reason: {:registry_error, name}
wrap_error Nebulex.Error, reason: {:registry_lookup_error, name}
end
end

def lookup(pid) when is_pid(pid) do
case :persistent_term.get({__MODULE__, pid}, nil) do
{_ref, value} -> {:ok, value}
nil -> wrap_error Nebulex.Error, reason: {:registry_error, pid}
{_ref, _name, value} -> {:ok, value}
nil -> wrap_error Nebulex.Error, reason: {:registry_lookup_error, pid}
end
end

@spec all_running() :: [atom | pid]
def all_running do
for {{__MODULE__, pid}, {_ref, name, _value}} <- :persistent_term.get() do
name || pid
end
end

## GenServer Callbacks

@impl true
def init(:ok) do
{:ok, :ok}
{:ok, nil}
end

@impl true
def handle_call({:register, pid, value}, _from, state) do
def handle_call({:register, pid, name, value}, _from, state) do
# Monitor the process so that when it is down it can be removed
ref = Process.monitor(pid)

:ok = :persistent_term.put({__MODULE__, pid}, {ref, value})
# Store the process data
:ok = :persistent_term.put({__MODULE__, pid}, {ref, name, value})

# Reply with success
{:reply, :ok, state}
end

@impl true
def handle_info({:DOWN, ref, _type, pid, _reason}, state) do
{^ref, _} = :persistent_term.get({__MODULE__, pid})
# Check the process reference
{^ref, _, _} = :persistent_term.get({__MODULE__, pid})

# Remove the process data
_ = :persistent_term.erase({__MODULE__, pid})

# Continue
{:noreply, state}
end
end
4 changes: 2 additions & 2 deletions lib/nebulex/cache/stats.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ defmodule Nebulex.Cache.Stats do
Implementation for `c:Nebulex.Cache.stats/0`.
"""
def stats(name) do
Adapter.with_meta(name, & &1.stats(&2))
Adapter.with_meta(name, & &1.adapter.stats(&1))
end

@doc """
Expand All @@ -26,7 +26,7 @@ defmodule Nebulex.Cache.Stats do
Implementation for `c:Nebulex.Cache.dispatch_stats/1`.
"""
def dispatch_stats(name, opts \\ []) do
Adapter.with_meta(name, fn adapter, meta ->
Adapter.with_meta(name, fn %{adapter: adapter} = meta ->
with true <- is_list(meta.telemetry_prefix),
{:ok, %Nebulex.Stats{} = info} <- adapter.stats(meta) do
:telemetry.execute(
Expand Down
Loading

0 comments on commit f72d4d7

Please sign in to comment.