Skip to content

Commit

Permalink
[#116] Instrument the partitioned adapter with Telemetry events
Browse files Browse the repository at this point in the history
  • Loading branch information
cabol committed Apr 25, 2021
1 parent b67e460 commit 4b19a8f
Showing 1 changed file with 89 additions and 46 deletions.
135 changes: 89 additions & 46 deletions lib/nebulex/adapters/partitioned.ex
Original file line number Diff line number Diff line change
Expand Up @@ -225,9 +225,9 @@ defmodule Nebulex.Adapters.Partitioned do
# Inherit default keyslot implementation
use Nebulex.Adapter.Keyslot

import Nebulex.Adapter
import Nebulex.Helpers

alias Nebulex.Adapter
alias Nebulex.Cache.Cluster
alias Nebulex.RPC

Expand Down Expand Up @@ -265,7 +265,7 @@ defmodule Nebulex.Adapters.Partitioned do
A convenience function to get the node of the given `key`.
"""
def get_node(key) do
Adapter.with_meta(get_dynamic_cache(), fn _adapter, %{name: name, keyslot: keyslot} ->
with_meta(get_dynamic_cache(), fn _adapter, %{name: name, keyslot: keyslot} ->
Cluster.get_node(name, key, keyslot)
end)
end
Expand Down Expand Up @@ -371,6 +371,12 @@ defmodule Nebulex.Adapters.Partitioned do

@impl true
def get_all(adapter_meta, keys, opts) do
with_span(adapter_meta, :get_all, fn ->
do_get_all(adapter_meta, keys, opts)
end)
end

defp do_get_all(adapter_meta, keys, opts) do
map_reduce(
keys,
adapter_meta,
Expand Down Expand Up @@ -410,11 +416,15 @@ defmodule Nebulex.Adapters.Partitioned do

@impl true
def put_all(adapter_meta, entries, _ttl, :put, opts) do
do_put_all(:put_all, adapter_meta, entries, opts)
with_span(adapter_meta, :put_all, fn ->
do_put_all(:put_all, adapter_meta, entries, opts)
end)
end

def put_all(adapter_meta, entries, _ttl, :put_new, opts) do
do_put_all(:put_new_all, adapter_meta, entries, opts)
with_span(adapter_meta, :put_new_all, fn ->
do_put_all(:put_new_all, adapter_meta, entries, opts)
end)
end

def do_put_all(action, adapter_meta, entries, opts) do
Expand Down Expand Up @@ -492,62 +502,93 @@ defmodule Nebulex.Adapters.Partitioned do

@impl true
def execute(%{name: name, task_sup: task_sup} = adapter_meta, operation, query, opts) do
reducer =
case operation do
:all -> &List.flatten/1
_ -> &Enum.sum/1
end
with_span(adapter_meta, operation, fn ->
reducer =
case operation do
:all -> &List.flatten/1
_ -> &Enum.sum/1
end

task_sup
|> RPC.multi_call(
Cluster.get_nodes(name),
__MODULE__,
:with_dynamic_cache,
[adapter_meta, operation, [query, opts]],
opts
)
|> handle_rpc_multi_call(operation, reducer)
task_sup
|> RPC.multi_call(
Cluster.get_nodes(name),
__MODULE__,
:with_dynamic_cache,
[adapter_meta, operation, [query, opts]],
opts
)
|> handle_rpc_multi_call(operation, reducer)
end)
end

@impl true
def stream(%{name: name, task_sup: task_sup} = adapter_meta, query, opts) do
Stream.resource(
fn ->
Cluster.get_nodes(name)
end,
fn
[] ->
{:halt, []}

[node | nodes] ->
elements =
rpc_call(
task_sup,
node,
__MODULE__,
:eval_stream,
[adapter_meta, query, opts],
opts
)

{elements, nodes}
end,
& &1
)
with_span(adapter_meta, :stream, fn ->
Stream.resource(
fn ->
Cluster.get_nodes(name)
end,
fn
[] ->
{:halt, []}

[node | nodes] ->
elements =
rpc_call(
task_sup,
node,
__MODULE__,
:eval_stream,
[adapter_meta, query, opts],
opts
)

{elements, nodes}
end,
& &1
)
end)
end

## Nebulex.Adapter.Persistence

@impl true
def dump(adapter_meta, path, opts) do
with_span(adapter_meta, :dump, fn ->
super(adapter_meta, path, opts)
end)
end

@impl true
def load(adapter_meta, path, opts) do
with_span(adapter_meta, :load, fn ->
super(adapter_meta, path, opts)
end)
end

## Nebulex.Adapter.Transaction

@impl true
def transaction(%{name: name} = adapter_meta, opts, fun) do
super(adapter_meta, Keyword.put(opts, :nodes, Cluster.get_nodes(name)), fun)
with_span(adapter_meta, :transaction, fn ->
super(adapter_meta, Keyword.put(opts, :nodes, Cluster.get_nodes(name)), fun)
end)
end

@impl true
def in_transaction?(adapter_meta) do
with_span(adapter_meta, :in_transaction?, fn ->
super(adapter_meta)
end)
end

## Nebulex.Adapter.Stats

@impl true
def stats(adapter_meta) do
with_dynamic_cache(adapter_meta, :stats, [])
with_span(adapter_meta, :stats, fn ->
with_dynamic_cache(adapter_meta, :stats, [])
end)
end

## Helpers
Expand Down Expand Up @@ -582,9 +623,11 @@ defmodule Nebulex.Adapters.Partitioned do
end

defp call(adapter_meta, key, fun, args, opts \\ []) do
adapter_meta
|> get_node(key)
|> rpc_call(adapter_meta, fun, args, opts)
with_span(adapter_meta, fun, fn ->
adapter_meta
|> get_node(key)
|> rpc_call(adapter_meta, fun, args, opts)
end)
end

defp rpc_call(node, %{task_sup: task_sup} = meta, fun, args, opts) do
Expand Down

0 comments on commit 4b19a8f

Please sign in to comment.