Skip to content

Commit

Permalink
[#3] Add counters support – incr function to handle increments and …
Browse files Browse the repository at this point in the history
…decrements
  • Loading branch information
cabol committed Aug 23, 2017
1 parent 3316e17 commit 9421adf
Show file tree
Hide file tree
Showing 10 changed files with 196 additions and 17 deletions.
5 changes: 5 additions & 0 deletions bench/dist_bench.exs
Original file line number Diff line number Diff line change
Expand Up @@ -85,4 +85,9 @@ defmodule DistBench do
Dist.update(:non_existent, 1, &Dist.update_fun/1)
:ok
end

bench "incr" do
Dist.incr(bench_context, 1)
:ok
end
end
5 changes: 5 additions & 0 deletions bench/local_bench.exs
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,9 @@ defmodule LocalBench do
Cache.update(bench_context, 1, &(&1))
:ok
end

bench "incr" do
Cache.incr(bench_context, 1)
:ok
end
end
7 changes: 7 additions & 0 deletions lib/nebulex/adapter.ex
Original file line number Diff line number Diff line change
Expand Up @@ -112,4 +112,11 @@ defmodule Nebulex.Adapter do
See `Nebulex.Cache.update/4`.
"""
@callback update(cache, key, initial :: value, (value -> value), opts) :: value | no_return

@doc """
Updates (increment or decrement) the counter mapped to the given `key`.
See `Nebulex.Cache.incr/3`.
"""
@callback incr(cache, key, incr :: integer, opts) :: integer | no_return
end
5 changes: 5 additions & 0 deletions lib/nebulex/adapters/dist.ex
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,11 @@ defmodule Nebulex.Adapters.Dist do
call(cache, :update, [key, initial, fun, opts])
end

@doc false
def incr(cache, key, incr, opts) do
call(cache, :incr, [key, incr, opts])
end

## Private Functions

defp call(cache, fun, [key | _] = args) do
Expand Down
60 changes: 47 additions & 13 deletions lib/nebulex/adapters/local.ex
Original file line number Diff line number Diff line change
Expand Up @@ -149,17 +149,17 @@ defmodule Nebulex.Adapters.Local do
end
end

defp fetch(cache, gen, key, opts, fun \\ :get) do
Local
|> apply(fun, [gen, key, nil, cache.__state__])
defp fetch(cache, gen, key, opts, fun \\ &local_get/4) do
fun
|> apply([gen, key, nil, cache.__state__])
|> validate_vsn(:get, opts)
|> validate_ttl(gen, cache)
|> validate_return(opts)
end

defp retrieve([newest | olders], cache, key, opts) do
Enum.reduce_while(olders, {newest, nil}, fn(gen, {newer, _}) ->
if object = fetch(cache, gen, key, ret_obj(opts), :pop) do
if object = fetch(cache, gen, key, ret_obj(opts), &local_pop/4) do
{:halt, {gen, do_set(object, newer, cache, opts)}}
else
{:cont, {gen, nil}}
Expand Down Expand Up @@ -197,9 +197,9 @@ defmodule Nebulex.Adapters.Local do
|> validate_return(opts)
end

defp set_object(object, gen, cache) do
_ = Local.put(gen, object.key, object, cache.__state__)
object
defp set_object(obj, gen, cache) do
_ = Local.insert(gen, {obj.key, obj.value, obj.version, obj.ttl}, cache.__state__)
obj
end

@doc false
Expand Down Expand Up @@ -258,16 +258,20 @@ defmodule Nebulex.Adapters.Local do

@doc false
def keys(cache) do
ms = [{{:"$1", :_, :_, :_}, [], [:"$1"]}]

cache.__metadata__.generations
|> Enum.reduce([], fn(gen, acc) -> Local.keys(gen, cache.__state__) ++ acc end)
|> Enum.reduce([], fn(gen, acc) -> Local.select(gen, ms, cache.__state__) ++ acc end)
|> :lists.usort()
end

@doc false
def reduce(cache, acc_in, fun, opts) do
Enum.reduce(cache.__metadata__.generations, acc_in, fn(gen, acc) ->
Local.foldl(fn({key, object}, fold_acc) ->
return = validate_return(object, opts)
Local.foldl(fn({key, val, vsn, ttl}, fold_acc) ->
return =
%Object{key: key, value: val, version: vsn, ttl: ttl}
|> validate_return(opts)
fun.({key, return}, fold_acc)
end, acc, gen, cache.__state__)
end)
Expand All @@ -277,8 +281,11 @@ defmodule Nebulex.Adapters.Local do
def to_map(cache, opts) do
match_spec =
case Keyword.get(opts, :return, :key) do
:object -> [{{:"$1", :"$2"}, [], [{{:"$1", :"$2"}}]}]
_ -> [{{:"$1", %{value: :"$2"}}, [], [{{:"$1", :"$2"}}]}]
:object ->
object_match = %Object{key: :"$1", value: :"$2", version: :"$3", ttl: :"$4"}
[{{:"$1", :"$2", :"$3", :"$4"}, [], [{{:"$1", object_match}}]}]
_ ->
[{{:"$1", :"$2", :_, :_}, [], [{{:"$1", :"$2"}}]}]
end

Enum.reduce(cache.__metadata__.generations, %{}, fn(gen, acc) ->
Expand Down Expand Up @@ -336,6 +343,19 @@ defmodule Nebulex.Adapters.Local do
end
end

@doc false
def incr(cache, key, incr, opts) do
ttl = seconds_since_epoch(opts[:ttl])

try do
cache.__metadata__.generations
|> hd()
|> Local.update_counter(key, {2, incr}, {key, 0, nil, ttl}, cache.__state__)
rescue
_e -> raise ArgumentError, "key #{inspect(key)} has not a valid integer value"
end
end

@doc false
def transaction(cache, opts, fun) do
keys = opts[:keys] || []
Expand All @@ -346,6 +366,20 @@ defmodule Nebulex.Adapters.Local do

## Helpers

defp local_get(tab, key, default, state) do
case Local.lookup(tab, key, state) do
[] -> default
[{^key, val, vsn, ttl}] -> %Object{key: key, value: val, version: vsn, ttl: ttl}
end
end

def local_pop(tab, key, default, state) do
case Local.take(tab, key, state) do
[] -> default
[{^key, val, vsn, ttl}] -> %Object{key: key, value: val, version: vsn, ttl: ttl}
end
end

defp on_conflict(nil, :get, _, _),
do: nil
defp on_conflict(:replace, :set, cached, object),
Expand Down Expand Up @@ -393,7 +427,7 @@ defmodule Nebulex.Adapters.Local do

defp validate_return(nil, _),
do: nil
defp validate_return(object, opts) do
defp validate_return(%Object{} = object, opts) do
case Keyword.get(opts, :return, :value) do
:object -> object
:value -> object.value
Expand Down
16 changes: 16 additions & 0 deletions lib/nebulex/adapters/multilevel.ex
Original file line number Diff line number Diff line change
Expand Up @@ -391,6 +391,22 @@ defmodule Nebulex.Adapters.Multilevel do
eval(cache, :update, [key, initial, fun, opts], opts)
end

@doc """
Updates (increment or decrement) the counter mapped to the given `key`.
## Options
* `:level` - Check shared options in module documentation.
## Example
# Entry is set at first cache level
MyCache.incr("foo", "bar")
"""
def incr(cache, key, incr, opts) do
eval(cache, :incr, [key, incr, opts], opts)
end

@doc """
Runs the given function inside a transaction.
Expand Down
33 changes: 32 additions & 1 deletion lib/nebulex/cache.ex
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ defmodule Nebulex.Cache do
end

def get!(key, opts \\ []) do
case execute(:get, [key, opts]) do
case get(key, opts) do
nil -> raise KeyError, key: key, term: __MODULE__
val -> val
end
Expand Down Expand Up @@ -178,6 +178,12 @@ defmodule Nebulex.Cache do
execute(:update, [key, initial, fun, opts])
end

def incr(key, incr \\ 1, opts \\ [])
def incr(key, incr, opts) when is_integer(incr),
do: execute(:incr, [key, incr, opts])
def incr(_key, incr, _opts),
do: raise ArgumentError, "the incr must be an integer, got: #{inspect incr}"

if function_exported?(@adapter, :transaction, 3) do
def transaction(fun, opts \\ []) do
execute(:transaction, [opts, fun])
Expand Down Expand Up @@ -649,6 +655,31 @@ defmodule Nebulex.Cache do
"""
@callback update(key, initial :: value, (value -> value), opts) :: value | no_return

@doc """
Updates (increment or decrement) the counter mapped to the given `key`.
If `incr >= 0` then the current value is incremented by that amount,
otherwise the current value is decremented.
If `incr` is not a valid integer, then an `ArgumentError` exception
is raised.
## Options
See the "Shared options" section at the module documentation.
## Examples
1 = MyCache.incr(:a)
3 = MyCache.incr(:a, 2)
2 = MyCache.incr(:a, -1)
%Nebulex.Object{key: :a, value: 2} = MyCache.incr(:a, 0)
"""
@callback incr(key, incr :: integer, opts) :: integer | no_return

@doc """
Runs the given function inside a transaction.
Expand Down
26 changes: 25 additions & 1 deletion test/nebulex/adapters/local_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ defmodule Nebulex.Adapters.LocalTest do
use ExUnit.Case, async: true
use Nebulex.CacheTest, cache: Nebulex.TestCache.Local

alias Nebulex.Object
alias Nebulex.TestCache.Local, as: TestCache

setup do
Expand Down Expand Up @@ -78,6 +79,24 @@ defmodule Nebulex.Adapters.LocalTest do
end
end

test "incr with update" do
TestCache.new_generation

assert TestCache.incr(:counter) == 1
assert TestCache.incr(:counter) == 2

assert TestCache.get_and_update(:counter, &({&1, &1 * 2})) == {2, 4}
assert TestCache.incr(:counter) == 5

assert TestCache.update(:counter, 1, &(&1 * 2)) == 10
assert TestCache.incr(:counter, -10) == 0

TestCache.set("foo", "bar")
assert_raise ArgumentError, fn ->
TestCache.incr("foo")
end
end

test "push generations" do
# create 1st generation
TestCache.new_generation
Expand Down Expand Up @@ -135,5 +154,10 @@ defmodule Nebulex.Adapters.LocalTest do
|> get_from(key)
end

defp get_from(gen, key), do: ExShards.Local.get(gen, key, nil, TestCache.__state__)
defp get_from(gen, key) do
case ExShards.Local.lookup(gen, key, TestCache.__state__) do
[] -> nil
[{^key, val, vsn, ttl}] -> %Object{key: key, value: val, version: vsn, ttl: ttl}
end
end
end
33 changes: 31 additions & 2 deletions test/shared/cache_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -221,6 +221,36 @@ defmodule Nebulex.CacheTest do
end
end

test "incr" do
@cache.new_generation

assert @cache.incr(:counter) == 1
assert @cache.incr(:counter) == 2
assert @cache.incr(:counter, 2) == 4
assert @cache.incr(:counter, 3) == 7
assert @cache.incr(:counter, 0) == 7

assert @cache.get(:counter) == 7

assert @cache.incr(:counter, -1) == 6
assert @cache.incr(:counter, -1) == 5
assert @cache.incr(:counter, -2) == 3
assert @cache.incr(:counter, -3) == 0

expected_counter_obj = %Object{key: :counter, value: 0, ttl: :infinity}
assert @cache.get(:counter, return: :object) == expected_counter_obj

assert @cache.incr(:counter_with_ttl, 1, ttl: 1) == 1
assert @cache.incr(:counter_with_ttl) == 2
assert @cache.get(:counter_with_ttl) == 2
_ = :timer.sleep(1010)
refute @cache.get(:counter_with_ttl)

assert_raise ArgumentError, fn ->
@cache.incr(:counter, "foo")
end
end

test "key expiration with ttl" do
@cache.new_generation

Expand All @@ -229,8 +259,7 @@ defmodule Nebulex.CacheTest do
|> @cache.set(11, ttl: 1, return: :key)
|> @cache.get!

_ = :timer.sleep 1010

_ = :timer.sleep(1010)
refute @cache.get(1)
end

Expand Down
23 changes: 23 additions & 0 deletions test/shared/multilevel_test.exs
Original file line number Diff line number Diff line change
Expand Up @@ -233,6 +233,29 @@ defmodule Nebulex.MultilevelTest do
assert @l3.get(2) == 4
end

test "incr" do
assert @cache.incr(1) == 1
assert @l1.get(1) == 1
refute @l2.get(1)
refute @l3.get(1)

assert @cache.incr(2, 2, level: 2) == 2
assert @l2.get(2) == 2
refute @l1.get(2)
refute @l3.get(2)

assert @cache.incr(3, 3, level: :all) == 3
assert @l1.get(3) == 3
assert @l2.get(3) == 3
assert @l3.get(3) == 3

assert @cache.incr(4, 5, level: :all) == 5
assert @cache.incr(4, -5, level: :all) == 0
assert @l1.get(4) == 0
assert @l2.get(4) == 0
assert @l3.get(4) == 0
end

test "transaction" do
refute @cache.transaction fn ->
@cache.set(1, 11, return: :key)
Expand Down

0 comments on commit 9421adf

Please sign in to comment.