Skip to content

Commit

Permalink
Fix global and/or cluster transactions issue
Browse files Browse the repository at this point in the history
  • Loading branch information
cabol committed Mar 28, 2022
1 parent 78e8832 commit 284c1b3
Show file tree
Hide file tree
Showing 4 changed files with 41 additions and 26 deletions.
13 changes: 6 additions & 7 deletions lib/nebulex/adapter/transaction.ex
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ defmodule Nebulex.Adapter.Transaction do
import Nebulex.Helpers

@impl true
def transaction(%{pid: pid} = adapter_meta, opts, fun) do
def transaction(%{cache: cache, pid: pid} = adapter_meta, opts, fun) do
adapter_meta
|> do_in_transaction?()
|> do_transaction(
Expand All @@ -95,7 +95,6 @@ defmodule Nebulex.Adapter.Transaction do
Keyword.get(opts, :keys, []),
Keyword.get(opts, :nodes, [node()]),
Keyword.get(opts, :retries, :infinity),
adapter_meta[:name] || adapter_meta[:cache],
fun
)
end
Expand All @@ -110,15 +109,15 @@ defmodule Nebulex.Adapter.Transaction do
## Helpers

defp do_in_transaction?(%{pid: pid}) do
not is_nil(Process.get({pid, self()}))
!!Process.get({pid, self()})
end

defp do_transaction(true, _pid, _keys, _nodes, _retries, _cache, fun) do
defp do_transaction(true, _pid, _name, _keys, _nodes, _retries, fun) do
{:ok, fun.()}
end

defp do_transaction(false, pid, keys, nodes, retries, cache, fun) do
ids = lock_ids(pid, keys)
defp do_transaction(false, pid, name, keys, nodes, retries, fun) do
ids = lock_ids(name, keys)

case set_locks(ids, nodes, retries) do
true ->
Expand All @@ -133,7 +132,7 @@ defmodule Nebulex.Adapter.Transaction do
end

false ->
wrap_error Nebulex.Error, reason: {:transaction_aborted, cache, nodes}
wrap_error Nebulex.Error, reason: {:transaction_aborted, name, nodes}
end
end

Expand Down
50 changes: 31 additions & 19 deletions lib/nebulex/adapters/replicated.ex
Original file line number Diff line number Diff line change
Expand Up @@ -525,20 +525,31 @@ defmodule Nebulex.Adapters.Replicated do
other
end

defp with_transaction(
%{pid: pid, name: name} = adapter_meta,
action,
keys,
args,
opts \\ []
) do
nodes = Cluster.get_nodes(name)

# Ensure it waits until ongoing delete_all or sync operations finish,
# if there's any.
:global.trans(
{name, pid},
fn ->
defp with_transaction(adapter_meta, action, keys, args, opts \\ []) do
do_with_transaction(adapter_meta, action, keys, args, opts, 1)
end

defp do_with_transaction(%{name: name} = adapter_meta, action, keys, args, opts, times) do
# This is a bit hacky because the `:global_locks` table managed by
# `:global` is being accessed directly breaking the encapsulation.
# So far, this has been the simplest and fastest way to validate if
# the global sync lock `:"$sync_lock"` is set, so we block write-like
# operations until it finishes. The other option would be trying to
# lock the same key `:"$sync_lock"`, and then when the lock is acquired,
# delete it before processing the write operation. But this means another
# global lock across the cluster everytime there is a write. So for the
# time being, we just read the global table to validate it which is much
# faster; since it is a local read with the global ETS, there is no global
# locks across the cluster.
case :ets.lookup(:global_locks, :"$sync_lock") do
[_] ->
:ok = random_sleep(times)

do_with_transaction(adapter_meta, action, keys, args, opts, times + 1)

[] ->
nodes = Cluster.get_nodes(name)

# Write-like operation must be wrapped within a transaction
# to ensure proper replication
with {:ok, res} <-
Expand All @@ -547,9 +558,7 @@ defmodule Nebulex.Adapters.Replicated do
end) do
res
end
end,
nodes
)
end
end

defp multicall(%{name: name} = meta, action, args, opts) do
Expand Down Expand Up @@ -721,9 +730,12 @@ defmodule Nebulex.Adapters.Replicated.Bootstrap do
end

@impl true
def terminate(_reason, state) do
def terminate(_reason, %{name: name}) do
# Delete global lock set when the server started
:ok = unlock(name)

# Ensure leaving the cluster when the cache stops
:ok = Cluster.leave(state.name)
:ok = Cluster.leave(name)
end

## Helpers
Expand Down
2 changes: 2 additions & 0 deletions test/nebulex/adapters/partitioned_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,9 @@ defmodule Nebulex.Adapters.PartitionedTest do

on_exit(fn ->
_ = Partitioned.put_dynamic_cache(default_dynamic_cache)

:ok = Process.sleep(100)

stop_caches(node_pid_list)
end)

Expand Down
2 changes: 2 additions & 0 deletions test/nebulex/adapters/replicated_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,9 @@ defmodule Nebulex.Adapters.ReplicatedTest do

on_exit(fn ->
_ = Replicated.put_dynamic_cache(default_dynamic_cache)

:ok = Process.sleep(100)

stop_caches(node_pid_list)
end)

Expand Down

0 comments on commit 284c1b3

Please sign in to comment.