diff --git a/lib/nebulex/adapter/transaction.ex b/lib/nebulex/adapter/transaction.ex index bce800a4..b25ea2aa 100644 --- a/lib/nebulex/adapter/transaction.ex +++ b/lib/nebulex/adapter/transaction.ex @@ -86,7 +86,7 @@ defmodule Nebulex.Adapter.Transaction do import Nebulex.Helpers @impl true - def transaction(%{pid: pid} = adapter_meta, opts, fun) do + def transaction(%{cache: cache, pid: pid} = adapter_meta, opts, fun) do adapter_meta |> do_in_transaction?() |> do_transaction( @@ -95,7 +95,6 @@ defmodule Nebulex.Adapter.Transaction do Keyword.get(opts, :keys, []), Keyword.get(opts, :nodes, [node()]), Keyword.get(opts, :retries, :infinity), - adapter_meta[:name] || adapter_meta[:cache], fun ) end @@ -110,15 +109,15 @@ defmodule Nebulex.Adapter.Transaction do ## Helpers defp do_in_transaction?(%{pid: pid}) do - not is_nil(Process.get({pid, self()})) + !!Process.get({pid, self()}) end - defp do_transaction(true, _pid, _keys, _nodes, _retries, _cache, fun) do + defp do_transaction(true, _pid, _name, _keys, _nodes, _retries, fun) do {:ok, fun.()} end - defp do_transaction(false, pid, keys, nodes, retries, cache, fun) do - ids = lock_ids(pid, keys) + defp do_transaction(false, pid, name, keys, nodes, retries, fun) do + ids = lock_ids(name, keys) case set_locks(ids, nodes, retries) do true -> @@ -133,7 +132,7 @@ defmodule Nebulex.Adapter.Transaction do end false -> - wrap_error Nebulex.Error, reason: {:transaction_aborted, cache, nodes} + wrap_error Nebulex.Error, reason: {:transaction_aborted, name, nodes} end end diff --git a/lib/nebulex/adapters/replicated.ex b/lib/nebulex/adapters/replicated.ex index 26d40f5c..3bef10d1 100644 --- a/lib/nebulex/adapters/replicated.ex +++ b/lib/nebulex/adapters/replicated.ex @@ -525,20 +525,31 @@ defmodule Nebulex.Adapters.Replicated do other end - defp with_transaction( - %{pid: pid, name: name} = adapter_meta, - action, - keys, - args, - opts \\ [] - ) do - nodes = Cluster.get_nodes(name) - - # Ensure it waits until ongoing delete_all or sync operations finish, - # if there's any. - :global.trans( - {name, pid}, - fn -> + defp with_transaction(adapter_meta, action, keys, args, opts \\ []) do + do_with_transaction(adapter_meta, action, keys, args, opts, 1) + end + + defp do_with_transaction(%{name: name} = adapter_meta, action, keys, args, opts, times) do + # This is a bit hacky because the `:global_locks` table managed by + # `:global` is being accessed directly breaking the encapsulation. + # So far, this has been the simplest and fastest way to validate if + # the global sync lock `:"$sync_lock"` is set, so we block write-like + # operations until it finishes. The other option would be trying to + # lock the same key `:"$sync_lock"`, and then when the lock is acquired, + # delete it before processing the write operation. But this means another + # global lock across the cluster everytime there is a write. So for the + # time being, we just read the global table to validate it which is much + # faster; since it is a local read with the global ETS, there is no global + # locks across the cluster. + case :ets.lookup(:global_locks, :"$sync_lock") do + [_] -> + :ok = random_sleep(times) + + do_with_transaction(adapter_meta, action, keys, args, opts, times + 1) + + [] -> + nodes = Cluster.get_nodes(name) + # Write-like operation must be wrapped within a transaction # to ensure proper replication with {:ok, res} <- @@ -547,9 +558,7 @@ defmodule Nebulex.Adapters.Replicated do end) do res end - end, - nodes - ) + end end defp multicall(%{name: name} = meta, action, args, opts) do @@ -721,9 +730,12 @@ defmodule Nebulex.Adapters.Replicated.Bootstrap do end @impl true - def terminate(_reason, state) do + def terminate(_reason, %{name: name}) do + # Delete global lock set when the server started + :ok = unlock(name) + # Ensure leaving the cluster when the cache stops - :ok = Cluster.leave(state.name) + :ok = Cluster.leave(name) end ## Helpers diff --git a/test/nebulex/adapters/partitioned_test.exs b/test/nebulex/adapters/partitioned_test.exs index ab91f6b9..577b9ca1 100644 --- a/test/nebulex/adapters/partitioned_test.exs +++ b/test/nebulex/adapters/partitioned_test.exs @@ -33,7 +33,9 @@ defmodule Nebulex.Adapters.PartitionedTest do on_exit(fn -> _ = Partitioned.put_dynamic_cache(default_dynamic_cache) + :ok = Process.sleep(100) + stop_caches(node_pid_list) end) diff --git a/test/nebulex/adapters/replicated_test.exs b/test/nebulex/adapters/replicated_test.exs index 0d9cf2ac..bf1eb5af 100644 --- a/test/nebulex/adapters/replicated_test.exs +++ b/test/nebulex/adapters/replicated_test.exs @@ -18,7 +18,9 @@ defmodule Nebulex.Adapters.ReplicatedTest do on_exit(fn -> _ = Replicated.put_dynamic_cache(default_dynamic_cache) + :ok = Process.sleep(100) + stop_caches(node_pid_list) end)