Skip to content
GitHub no longer supports this web browser. Learn more about the browsers we support.
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Telemetry integration #663

Merged
merged 11 commits into from May 7, 2019
@@ -16,9 +16,11 @@ defmodule Absinthe.Blueprint do
schema: nil,
adapter: nil,
# Added by phases
telemetry: %{},
flags: %{},
errors: [],
input: nil,
source: nil,
execution: %Blueprint.Execution{},
result: %{}

@@ -31,8 +33,11 @@ defmodule Absinthe.Blueprint do
schema: nil | Absinthe.Schema.t(),
adapter: nil | Absinthe.Adapter.t(),
# Added by phases
telemetry: map,
errors: [Absinthe.Phase.Error.t()],
flags: flags_t,
input: nil | Absinthe.Language.Document.t(),
source: nil | String.t() | Absinthe.Language.Source.t(),
execution: Blueprint.Execution.t(),
result: result_t
}
@@ -0,0 +1,58 @@
defmodule Absinthe.Middleware.Telemetry do
@moduledoc """
Gather and report telemetry about an individual field resolution
"""
@field_start [:absinthe, :resolve, :field, :start]
@field [:absinthe, :resolve, :field]

@behaviour Absinthe.Middleware

@impl Absinthe.Middleware
def call(resolution, _) do
id = :erlang.unique_integer()
start_time = System.system_time()
start_time_mono = System.monotonic_time()

:telemetry.execute(@field_start, %{start_time: start_time}, %{id: id})

%{
resolution
| middleware:
resolution.middleware ++
[
{{__MODULE__, :on_complete},
%{
id: id,
start_time: start_time,
start_time_mono: start_time_mono,
middleware: resolution.middleware
}}
]
This conversation was marked as resolved by binaryseed

This comment has been minimized.

Copy link
@benwilson512

benwilson512 Jan 8, 2019

Contributor

💯

}
end

def on_complete(
%{state: :resolved} = resolution,
%{
id: id,
start_time: start_time,
start_time_mono: start_time_mono,
middleware: middleware
}
) do
end_time_mono = System.monotonic_time()

:telemetry.execute(
@field,
%{duration: end_time_mono - start_time_mono},
%{
id: id,
start_time: start_time,
middleware: middleware,
resolution: resolution
}
)

resolution
end
end
@@ -0,0 +1,12 @@
defmodule Absinthe.Phase.Init do
@moduledoc false

use Absinthe.Phase

alias Absinthe.{Blueprint, Language, Phase}

@spec run(Language.Source.t(), Keyword.t()) :: Phase.result_t()
def run(input, _options \\ []) do
{:ok, %Blueprint{input: input}}
end
end
@@ -5,7 +5,8 @@ defmodule Absinthe.Phase.Parse do

alias Absinthe.{Language, Phase}

@spec run(Language.Source.t(), Keyword.t()) :: Phase.result_t()
@type input_t :: Language.Source.t() | Blueprint.t()
@spec run(input_t, Keyword.t()) :: Phase.result_t()
def run(input, options \\ [])

def run(%Absinthe.Blueprint{} = blueprint, options) do
@@ -28,7 +28,11 @@ defmodule Absinthe.Phase.Subscription.SubscribeSelf do
for field_key <- field_keys,
do: Absinthe.Subscription.subscribe(pubsub, field_key, subscription_id, blueprint)

{:replace, blueprint, [{Phase.Subscription.Result, topic: subscription_id}]}
{:replace, blueprint,
[
{Phase.Subscription.Result, topic: subscription_id},
{Phase.Telemetry, [:execute, :operation, options]}

This comment has been minimized.

Copy link
@benwilson512

benwilson512 May 7, 2019

Contributor

Nice catch.

]}
else
{:error, error} ->
blueprint = update_in(blueprint.execution.validation_errors, &[error | &1])
@@ -0,0 +1,88 @@
defmodule Absinthe.Phase.Telemetry do
@moduledoc """
Gather and report telemetry about an operation.
"""
@operation_start [:absinthe, :execute, :operation, :start]
@operation [:absinthe, :execute, :operation]

@subscription_start [:absinthe, :subscription, :publish, :start]
@subscription [:absinthe, :subscription, :publish]

use Absinthe.Phase

def run(blueprint, [:execute, :operation, :start]) do
id = :erlang.unique_integer()
start_time = System.system_time()
start_time_mono = System.monotonic_time()

:telemetry.execute(@operation_start, %{start_time: start_time}, %{id: id})

{:ok,
%{
blueprint
| source: blueprint.input,
telemetry: %{
id: id,
start_time: start_time,
start_time_mono: start_time_mono
}
}}
end

def run(blueprint, [:subscription, :publish, :start]) do
id = :erlang.unique_integer()
start_time = System.system_time()
start_time_mono = System.monotonic_time()

:telemetry.execute(@subscription_start, %{start_time: start_time}, %{id: id})

{:ok,
%{
blueprint
| telemetry: %{
id: id,
start_time: start_time,
start_time_mono: start_time_mono
}
}}
end

def run(blueprint, [:subscription, :publish]) do
end_time_mono = System.monotonic_time()

with %{id: id, start_time: start_time, start_time_mono: start_time_mono} <-
blueprint.telemetry do
:telemetry.execute(
@subscription,
%{duration: end_time_mono - start_time_mono},
%{
id: id,
start_time: start_time,
blueprint: blueprint
}
)
end

{:ok, blueprint}
end

def run(blueprint, [:execute, :operation, options]) do
end_time_mono = System.monotonic_time()

with %{id: id, start_time: start_time, start_time_mono: start_time_mono} <-
blueprint.telemetry do
:telemetry.execute(
@operation,
%{duration: end_time_mono - start_time_mono},
%{
id: id,
start_time: start_time,
blueprint: blueprint,
options: options
}
)
end

{:ok, blueprint}
end
end
@@ -45,6 +45,8 @@ defmodule Absinthe.Pipeline do

[
# Parse Document
Phase.Init,
{Phase.Telemetry, [:execute, :operation, :start]},
{Phase.Parse, options},
# Convert to Blueprint
{Phase.Blueprint, options},
@@ -104,7 +106,8 @@ defmodule Absinthe.Pipeline do
{Phase.Subscription.SubscribeSelf, options},
{Phase.Document.Execution.Resolution, options},
# Format Result
Phase.Document.Result
Phase.Document.Result,
{Phase.Telemetry, [:execute, :operation, options]}
]
end

@@ -8,6 +8,7 @@ defmodule Absinthe.Pipeline.BatchResolver do
def run([], _), do: []

def run([bp | _] = blueprints, options) do
{initial_phases, options} = Keyword.pop(options, :initial_phases, [])
schema = Keyword.fetch!(options, :schema)
plugins = schema.plugins()

@@ -24,17 +25,16 @@ defmodule Absinthe.Pipeline.BatchResolver do
result: nil
}

resolution_phase = {Execution.Resolution, [plugin_callbacks: false] ++ options}
resolution_phase = [{Execution.Resolution, [plugin_callbacks: false] ++ options}]
phases = initial_phases ++ [resolution_phase]

do_resolve(blueprints, [resolution_phase], exec, plugins, resolution_phase, options)
do_resolve(blueprints, phases, exec, plugins, resolution_phase, options)
end

defp init(blueprints, attr) do
Enum.reduce(blueprints, %{}, &Map.merge(Map.fetch!(&1.execution, attr), &2))
end

# defp update()

defp do_resolve(blueprints, phases, exec, plugins, resolution_phase_template, options) do
exec =
Enum.reduce(plugins, exec, fn plugin, exec ->
@@ -536,6 +536,7 @@ defmodule Absinthe.Schema.Notation do
|> recordable!(:resolve, @placement[:resolve])

quote do
meta :absinthe_telemetry, true
middleware Absinthe.Resolution, unquote(func_ast)
end
end
@@ -1684,8 +1685,12 @@ defmodule Absinthe.Schema.Notation do
[{Absinthe.Middleware.MapGet, identifier}]
end

def __ensure_middleware__(middleware, _field, _object) do
middleware
def __ensure_middleware__(middleware, field, _object) do
if Absinthe.Type.meta(field, :absinthe_telemetry) do
[{Absinthe.Middleware.Telemetry, []} | middleware]
else
middleware
end
end

defp reverse_with_descs(attrs, descs, acc \\ [])
@@ -44,10 +44,17 @@ defmodule Absinthe.Subscription.Local do

defp run_docset(pubsub, docs_and_topics) do
{topics, docs} = Enum.unzip(docs_and_topics)
docs = BatchResolver.run(docs, schema: hd(docs).schema, abort_on_error: false)

docs =
BatchResolver.run(docs,
schema: hd(docs).schema,
abort_on_error: false,
initial_phases: [{Absinthe.Phase.Telemetry, [:subscription, :publish, :start]}]
)

pipeline = [
Absinthe.Phase.Document.Result
Absinthe.Phase.Document.Result,
{Absinthe.Phase.Telemetry, [:subscription, :publish]}
]

for {doc, {topic, key_strategy}} <- Enum.zip(docs, topics), doc != :error do
@@ -58,17 +58,18 @@ defmodule Absinthe.Mixfile do
defp elixirc_paths(_), do: ["lib"]

def application do
[applications: [:logger]]
[extra_applications: [:logger]]
end

defp deps do
[
{:benchee, ">= 0.0.0", only: :dev},
{:nimble_parsec, "~> 0.4"},
{:telemetry, "~> 0.4.0"},
{:dataloader, "~> 1.0.0", optional: true},
{:decimal, "~> 1.0", optional: true},
{:ex_doc, "~> 0.20", only: :dev},
{:benchee, ">= 0.0.0", only: :dev},
{:dialyxir, "~> 1.0.0-rc.6", only: [:dev], runtime: false},
{:decimal, "~> 1.0", optional: true},
{:phoenix_pubsub, ">= 0.0.0", only: :test},
{:mix_test_watch, "~> 0.4.1", only: [:test, :dev]}
]
@@ -16,4 +16,5 @@
"mix_test_watch": {:hex, :mix_test_watch, "0.4.1", "a98a84c795623f1ba020324f4354cf30e7120ba4dab65f9c2ae300f830a25f75", [:mix], [{:fs, "~> 0.9.1", [hex: :fs, repo: "hexpm", optional: false]}], "hexpm"},
"nimble_parsec": {:hex, :nimble_parsec, "0.5.0", "90e2eca3d0266e5c53f8fbe0079694740b9c91b6747f2b7e3c5d21966bba8300", [:mix], [], "hexpm"},
"phoenix_pubsub": {:hex, :phoenix_pubsub, "1.1.0", "d55e25ff1ff8ea2f9964638366dfd6e361c52dedfd50019353598d11d4441d14", [:mix], [], "hexpm"},
"telemetry": {:hex, :telemetry, "0.4.0", "8339bee3fa8b91cb84d14c2935f8ecf399ccd87301ad6da6b71c09553834b2ab", [:rebar3], [], "hexpm"},
}
ProTip! Use n and p to navigate between commits in a pull request.
You can’t perform that action at this time.