[#60] Implement adapter for replicated topology
cabol committed Nov 19, 2019
1 parent b1202f0 commit f0a359d
Showing 15 changed files with 615 additions and 101 deletions.
6 changes: 5 additions & 1 deletion .credo.exs
@@ -16,7 +16,11 @@

## Refactoring Opportunities
{Credo.Check.Refactor.LongQuoteBlocks, false},
{Credo.Check.Refactor.CyclomaticComplexity, max_complexity: 15}
{Credo.Check.Refactor.CyclomaticComplexity, max_complexity: 15},

## TODO and FIXME do not cause the build to fail
{Credo.Check.Design.TagTODO, exit_status: 0},
{Credo.Check.Design.TagFIXME, exit_status: 0}
]
}
]
16 changes: 12 additions & 4 deletions lib/nebulex/adapter/transaction.ex
@@ -55,9 +55,9 @@ defmodule Nebulex.Adapter.Transaction do

@doc false
def transaction(cache, fun, opts) do
keys = opts[:keys]
nodes = opts[:nodes] || [node() | Node.list()]
retries = opts[:retries] || :infinity
keys = Keyword.get(opts, :keys, [])
nodes = Keyword.get(opts, :nodes, get_nodes(cache))
retries = Keyword.get(opts, :retries, :infinity)

do_transaction(cache, keys, nodes, retries, fun)
end
@@ -125,13 +125,21 @@ defmodule Nebulex.Adapter.Transaction do
end)
end

defp lock_ids(cache, nil) do
defp lock_ids(cache, []) do
[{cache, self()}]
end

defp lock_ids(cache, keys) do
for key <- keys, do: {{cache, key}, self()}
end

defp get_nodes(cache) do
if function_exported?(cache, :__nodes__, 0) do
cache.__nodes__
else
[node() | Node.list()]
end
end
end
end
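
The options handled above surface on the cache-level `transaction/2` wrapper. A minimal usage sketch, assuming a cache named MyApp.PartitionedCache built with this adapter and v2-style `get/2`/`put/3` callbacks (the callback names and wrapper arity are assumptions, not taken from this commit); per the defaults above, `:keys` falls back to `[]` (lock the whole cache), `:nodes` to the cache's `__nodes__/0` when exported, and `:retries` to `:infinity`:

    # Lock only the keys the transaction touches, on every cluster node,
    # retrying the lock acquisition up to 10 times before giving up.
    MyApp.PartitionedCache.transaction(
      fn ->
        alice = MyApp.PartitionedCache.get(:alice) || 0
        bob = MyApp.PartitionedCache.get(:bob) || 0
        MyApp.PartitionedCache.put(:alice, alice + 100)
        MyApp.PartitionedCache.put(:bob, bob - 100)
      end,
      keys: [:alice, :bob],
      retries: 10
    )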

78 changes: 42 additions & 36 deletions lib/nebulex/adapters/partitioned.ex
@@ -1,32 +1,34 @@
defmodule Nebulex.Adapters.Partitioned do
@moduledoc """
Adapter module for partitioned or partitioned cache.
A partitioned, or partitioned, cache is a clustered, fault-tolerant cache
that has linear scalability. Data is partitioned among all the machines
of the cluster. For fault-tolerance, partitioned caches can be configured
to keep each piece of data on one or more unique machines within a cluster.
This adapter in particular hasn't fault-tolerance built-in, each piece of
data is kept in a single node/machine (sharding), therefore, if a node fails,
the data kept by this node won't be available for the rest of the cluster.
This adapter depends on a local cache adapter, it adds a thin layer
on top of it in order to distribute requests across a group of nodes,
where is supposed the local cache is running already.
PG2 is used by the adapter to manage the cluster nodes. When the partitioned
cache is started in a node, it creates a PG2 group and joins it (the cache
supervisor PID is joined to the group). Then, when a function is invoked,
the adapter picks a node from the node list (using the PG2 group members),
and then the function is executed on that node. In the same way, when the
supervisor process of the partitioned cache dies, the PID of that process
is automatically removed from the PG2 group; this is why it's recommended
to use a partitioned hashing algorithm for the node picker.
Built-in adapter for partitioned cache topology.
A partitioned cache is a clustered, fault-tolerant cache that has linear
scalability. Data is partitioned among all the machines of the cluster.
For fault-tolerance, partitioned caches can be configured to keep each piece
of data on one or more unique machines within a cluster. This adapter
in particular doesn't have fault-tolerance built-in: each piece of data is
kept in a single node/machine (sharding), so if a node fails, the data kept
by that node won't be available to the rest of the cluster.
This adapter depends on a local cache adapter (primary storage); it adds
a thin layer on top of it in order to distribute requests across a group
of nodes, where the local cache is assumed to be running already.
PG2 is used under the hood by the adapter to manage the cluster nodes.
When the partitioned cache is started in a node, it creates a PG2 group
and joins it (the cache supervisor PID is joined to the group). Then,
when a function is invoked, the adapter picks a node from the node list
(using the PG2 group members), and then the function is executed on that
node. In the same way, when the supervisor process of the partitioned cache
dies, the PID of that process is automatically removed from the PG2 group;
this is why it's recommended to use a consistent hashing algorithm for the
node selector.
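
The PG2 bookkeeping described above comes down to a handful of `:pg2` calls. A minimal sketch of the idea, using an illustrative module name and the cache module itself as the group name (not the adapter's actual internals):

    defmodule ClusterSketch do
      # Join the given process (e.g. the cache supervisor) to a PG2 group
      # named after the cache, creating the group if it doesn't exist yet.
      def join(cache, pid \\ self()) do
        :ok = :pg2.create(cache)
        :ok = :pg2.join(cache, pid)
      end

      # Derive the cluster node list from the group members; PG2 drops
      # members automatically when their processes die.
      def get_nodes(cache) do
        case :pg2.get_members(cache) do
          {:error, {:no_such_group, _}} -> []
          pids -> pids |> Enum.map(&node/1) |> Enum.uniq()
        end
      end
    end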
## Features
* Support for partitioned topology (Sharding Distribution Model)
* Support for transactions via Erlang global name registration facility
* Configurable hash-slot module to compute the node
## Options
@@ -37,8 +39,8 @@ defmodule Nebulex.Adapters.Partitioned do
the data in there. For example, you can set the `Nebulex.Adapters.Local`
as value, unless you want to provide another one.
* `:hash_slot` - The module that implements `Nebulex.Adapter.Hash`
behaviour. Defaults to `Nebulex.Adapter.Hash.keyslot/2`.
* `:hash_slot` - The module that implements `Nebulex.Adapter.HashSlot`
behaviour.
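A hedged sketch of a custom hash-slot module, assuming the `Nebulex.Adapter.HashSlot` behaviour keeps the `keyslot/2` callback (key and range, returning a slot in `0..range-1`) that `Nebulex.Adapter.Hash` exposed; the module name is illustrative:

    defmodule MyApp.PHash2Slot do
      @behaviour Nebulex.Adapter.HashSlot

      @impl true
      def keyslot(key, range) do
        # Plain :erlang.phash2/2 spreads keys uniformly; a production
        # implementation might prefer a consistent-hashing scheme so that
        # topology changes remap as few keys as possible.
        :erlang.phash2(key, range)
      end
    end

It would then be wired in via the cache configuration, e.g. `hash_slot: MyApp.PHash2Slot`.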
## Runtime options
@@ -57,30 +59,30 @@
## Example
`Nebulex.Cache` is the wrapper around the cache. We can define the local
and partitioned cache as follows:
defmodule MyApp.LocalCache do
  use Nebulex.Cache,
    otp_app: :my_app,
    adapter: Nebulex.Adapters.Local
end
`Nebulex.Cache` is the wrapper around the cache. We can define the
partitioned cache as follows:
defmodule MyApp.PartitionedCache do
  use Nebulex.Cache,
    otp_app: :my_app,
    adapter: Nebulex.Adapters.Partitioned

  defmodule Primary do
    use Nebulex.Cache,
      otp_app: :my_app,
      adapter: Nebulex.Adapters.Local
  end
end
Where the configuration for the cache must be in your application environment,
usually defined in your `config/config.exs`:
config :my_app, MyApp.LocalCache,
config :my_app, MyApp.PartitionedCache.Primary,
  n_shards: 2,
  gc_interval: 3600

config :my_app, MyApp.PartitionedCache,
  primary: MyApp.LocalCache
  primary: MyApp.PartitionedCache.Primary
For more information about the usage, check out `Nebulex.Cache`.
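
As a quick smoke test, assuming the cache and its `Primary` are started under the application's supervision tree and expose the usual key/value callbacks (`put/3` and `get/2` here; exact callback names may differ across Nebulex versions):

    # Writes are routed to the node that owns the key's hash slot;
    # reads follow the same route, so every node observes the same value.
    MyApp.PartitionedCache.put("settings", %{theme: :dark})
    MyApp.PartitionedCache.get("settings")
    #=> %{theme: :dark}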
@@ -119,6 +121,9 @@ defmodule Nebulex.Adapters.Partitioned do
# Inherit default transaction implementation
use Nebulex.Adapter.Transaction

# Inherit default keyslot callback
use Nebulex.Adapter.HashSlot

# Provide Cache Implementation
@behaviour Nebulex.Adapter
@behaviour Nebulex.Adapter.Queryable
@@ -131,7 +136,7 @@ defmodule Nebulex.Adapters.Partitioned do
defmacro __before_compile__(env) do
otp_app = Module.get_attribute(env.module, :otp_app)
config = Module.get_attribute(env.module, :config)
hash_slot = Keyword.get(config, :hash_slot)
hash_slot = Keyword.get(config, :hash_slot, __MODULE__)
task_supervisor = Module.concat([env.module, TaskSupervisor])

unless primary = Keyword.get(config, :primary) do
@@ -142,7 +147,7 @@

quote do
alias Nebulex.Adapters.Local.Generation
alias Nebulex.Adapters.Partitioned.Cluster
alias Nebulex.Cache.Cluster

def __primary__, do: unquote(primary)

@@ -165,6 +170,7 @@ defmodule Nebulex.Adapters.Partitioned do
def init(opts) do
cache = Keyword.fetch!(opts, :cache)
task_sup_opts = Keyword.get(opts, :task_supervisor_opts, [])

{:ok, [{Task.Supervisor, [name: cache.__task_sup__] ++ task_sup_opts}]}
end
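
The `Task.Supervisor` child spec returned here is presumably what the adapter uses to execute requests on other nodes; `:task_supervisor_opts` allows tuning it. A hedged configuration example, assuming the options passed to `init/1` carry the cache configuration:

    config :my_app, MyApp.PartitionedCache,
      primary: MyApp.PartitionedCache.Primary,
      task_supervisor_opts: [max_restarts: 5, max_seconds: 10]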

