Skip to content

Commit

Permalink
Better gen_stage implementation this time around
Browse files Browse the repository at this point in the history
  • Loading branch information
craigp committed Sep 28, 2016
1 parent 6a897f9 commit 57a1967
Show file tree
Hide file tree
Showing 8 changed files with 212 additions and 80 deletions.
7 changes: 5 additions & 2 deletions lib/slack_logger_backend.ex
Original file line number Diff line number Diff line change
Expand Up @@ -53,13 +53,16 @@ defmodule SlackLoggerBackend do
"""

use Application
alias SlackLoggerBackend.Pool
alias SlackLoggerBackend.{Pool, Formatter, Producer, Consumer}

@doc false
def start(_type, _args) do
import Supervisor.Spec, warn: false
children = [
worker(Pool, [20])
worker(Producer, []),
worker(Formatter, [10, 5]),
worker(Consumer, [10, 5]),
worker(Pool, [10])
]
opts = [strategy: :one_for_one, name: SlackLoggerBackend.Supervisor]
Supervisor.start_link(children, opts)
Expand Down
43 changes: 43 additions & 0 deletions lib/slack_logger_backend/consumer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,43 @@
alias Experimental.GenStage

defmodule SlackLoggerBackend.Consumer do

@moduledoc """
Consumes logger events and pushes them onto the worker pool to send to Slack.
"""
use GenStage
alias SlackLoggerBackend.{Formatter, Pool}

@doc false
def start_link(max_demand, min_demand) do
GenStage.start_link(__MODULE__, {max_demand, min_demand}, name: __MODULE__)
end

@doc false
def init({max_demand, min_demand}) do
{:consumer, %{},
subscribe_to: [{Formatter, max_demand: max_demand, min_demand: min_demand}]}
end

@doc false
def handle_events([], _from, interval) do
process_events([], interval)
end

@doc false
def handle_events(events, _from, state) do
events
|> Enum.filter(fn evt -> evt != :empty end)
|> process_events(state)
end

defp process_events([], state) do
{:noreply, [], state}
end

defp process_events([{url, json}|events], state) do
Pool.post(url, json)
process_events(events, state)
end

end
81 changes: 81 additions & 0 deletions lib/slack_logger_backend/format_helper.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
defmodule SlackLoggerBackend.FormatHelper do

@moduledoc """
Simple formatter for Slack messages.
"""

import Poison, only: [encode: 1]

@doc """
Formats a log event for Slack.
"""
def format_event({level, message, module, function, file, line}) do
IO.puts "MESSAGE: #{message}"
{:ok, event} = %{attachments: [%{
fallback: "An #{level} level event has occurred: #{message}",
pretext: message,
fields: [%{
title: "Level",
value: level,
short: true
}, %{
title: "Module",
value: module,
short: true
}, %{
title: "Function",
value: function,
short: true
}, %{
title: "File",
value: file,
short: true
}, %{
title: "Line",
value: line,
short: true
}]
}]}
|> encode
event
end

@doc """
Formats a log event for Slack.
"""
def format_event({level, message, application, module, function, file, line}) do
IO.puts "MESSAGE: #{message}"
{:ok, event} = %{attachments: [%{
fallback: "An #{level} level event has occurred: #{message}",
pretext: message,
fields: [%{
title: "Level",
value: level,
short: true
}, %{
title: "Application",
value: application,
short: true
}, %{
title: "Module",
value: module,
short: true
}, %{
title: "Function",
value: function,
short: true
}, %{
title: "File",
value: file,
short: true
}, %{
title: "Line",
value: line,
short: true
}]
}]}
|> encode
event
end

end
87 changes: 22 additions & 65 deletions lib/slack_logger_backend/formatter.ex
Original file line number Diff line number Diff line change
@@ -1,79 +1,36 @@
alias Experimental.GenStage

defmodule SlackLoggerBackend.Formatter do

@moduledoc """
Simple formatter for Slack messages.
Formats log events into pretty Slack messages.
"""

import Poison, only: [encode: 1]
use GenStage
alias SlackLoggerBackend.{Producer, FormatHelper}

@doc """
Formats a log event for Slack.
"""
def format_event({level, message, module, function, file, line}) do
{:ok, event} = %{attachments: [%{
fallback: "An #{level} level event has occurred: #{message}",
pretext: message,
fields: [%{
title: "Level",
value: level,
short: true
}, %{
title: "Module",
value: module,
short: true
}, %{
title: "Function",
value: function,
short: true
}, %{
title: "File",
value: file,
short: true
}, %{
title: "Line",
value: line,
short: true
}]
}]}
|> encode
event
@doc false
def start_link(max_demand, min_demand) do
GenStage.start_link(__MODULE__, {max_demand, min_demand}, name: __MODULE__)
end

@doc false
def init({max_demand, min_demand}) do
{:producer_consumer, %{},
subscribe_to: [{Producer, max_demand: max_demand, min_demand: min_demand}]}
end

@doc false
def handle_events(events, _from, state) do
events = Enum.map(events, &format_event/1)
{:noreply, events, state}
end

@doc """
Formats a log event for Slack.
"""
def format_event({level, message, application, module, function, file, line}) do
{:ok, event} = %{attachments: [%{
fallback: "An #{level} level event has occurred: #{message}",
pretext: message,
fields: [%{
title: "Level",
value: level,
short: true
}, %{
title: "Application",
value: application,
short: true
}, %{
title: "Module",
value: module,
short: true
}, %{
title: "Function",
value: function,
short: true
}, %{
title: "File",
value: file,
short: true
}, %{
title: "Line",
value: line,
short: true
}]
}]}
|> encode
event
def format_event({url, event}) do
{url, FormatHelper.format_event(event)}
end

end
11 changes: 2 additions & 9 deletions lib/slack_logger_backend/logger.ex
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,7 @@ defmodule SlackLoggerBackend.Logger do
"""

use GenEvent
alias SlackLoggerBackend.{Formatter, Pool}
import Formatter, only: [format_event: 1]
alias SlackLoggerBackend.Producer

@env_webhook "SLACK_LOGGER_WEBHOOK_URL"
@default_log_levels [:error]
Expand Down Expand Up @@ -72,26 +71,20 @@ defmodule SlackLoggerBackend.Logger do

defp handle_event(level, message, [pid: _, application: application, module: module, function: function, file: file, line: line]) do
{level, message, application, module, function, file, line}
|> format_event
|> send_event
end

defp handle_event(level, message, [pid: _, module: module, function: function, file: file, line: line]) do
{level, message, module, function, file, line}
|> format_event
|> send_event
end

defp handle_event(_, _, _, _) do
:noop
end

defp handle_event(_, _, _) do
:noop
end

defp send_event(event) do
Pool.post(get_url, event)
Producer.add_event({get_url, event})
end

end
55 changes: 55 additions & 0 deletions lib/slack_logger_backend/producer.ex
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
alias Experimental.GenStage

defmodule SlackLoggerBackend.Producer do

@moduledoc """
Produces logger events to be consumed and send to Slack.
"""
use GenStage

@doc false
def start_link do
GenStage.start_link(__MODULE__, :ok, name: __MODULE__)
end

@doc false
def init(:ok) do
{:producer, {:queue.new, 0}}
end

@doc false
def handle_cast({:add, event}, {queue, demand}) when demand > 0 do
{:noreply, [event], {queue, demand - 1}}
end

@doc false
def handle_cast({:add, event}, {queue, demand}) do
{:noreply, [], {:queue.in(event, queue), demand}}
end

@doc false
def handle_demand(incoming_demand, {queue, demand}) when incoming_demand > 0 do
dispatch_events(queue, incoming_demand + demand, [])
end

@doc """
Adds a logger event to the queue for sending to Slack.
"""
def add_event(event) do
GenStage.cast(__MODULE__, {:add, event})
end

defp dispatch_events(queue, demand, events) when demand > 0 do
case :queue.out(queue) do
{:empty, queue} ->
{:noreply, events, {queue, demand}}
{{:value, event}, queue} ->
dispatch_events(queue, demand - 1, [event|events])
end
end

defp dispatch_events(queue, demand, events) do
{:noreply, events, {queue, demand}}
end

end
2 changes: 1 addition & 1 deletion mix.exs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ defmodule SlackLoggerBackend.Mixfile do
[
app: :slack_logger_backend,
description: "A logger backend for posting errors to Slack.",
version: "0.1.11",
version: "0.1.12",
build_embedded: Mix.env == :prod,
start_permanent: Mix.env == :prod,
deps: deps,
Expand Down
6 changes: 3 additions & 3 deletions mix.lock
Original file line number Diff line number Diff line change
Expand Up @@ -6,14 +6,14 @@
"credo": {:hex, :credo, "0.4.11", "03a64e9d53309b7132556284dda0be57ba1013885725124cfea7748d740c6170", [:mix], [{:bunt, "~> 0.1.6", [hex: :bunt, optional: false]}]},
"dialyxir": {:hex, :dialyxir, "0.3.5", "eaba092549e044c76f83165978979f60110dc58dd5b92fd952bf2312f64e9b14", [:mix], []},
"earmark": {:hex, :earmark, "1.0.1", "2c2cd903bfdc3de3f189bd9a8d4569a075b88a8981ded9a0d95672f6e2b63141", [:mix], []},
"ex_doc": {:hex, :ex_doc, "0.13.1", "658dbfc8cc5b0fac192f0f3254efe66ee6294200804a291549e0aeb052053bba", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, optional: false]}]},
"ex_doc": {:hex, :ex_doc, "0.13.2", "1059a588d2ad3ffab25a0b85c58abf08e437d3e7a9124ac255e1d15cec68ab79", [:mix], [{:earmark, "~> 1.0", [hex: :earmark, optional: false]}]},
"excoveralls": {:hex, :excoveralls, "0.5.6", "35a903f6f78619ee7f951448dddfbef094b3a0d8581657afaf66465bc930468e", [:mix], [{:exjsx, "~> 3.0", [hex: :exjsx, optional: false]}, {:hackney, ">= 0.12.0", [hex: :hackney, optional: false]}]},
"exjsx": {:hex, :exjsx, "3.2.0", "7136cc739ace295fc74c378f33699e5145bead4fdc1b4799822d0287489136fb", [:mix], [{:jsx, "~> 2.6.2", [hex: :jsx, optional: false]}]},
"gen_stage": {:hex, :gen_stage, "0.5.0", "758068f3a81286e342a609d668e9624714c2e4d43cc26699ead04d1680fde6c6", [:mix], []},
"hackney": {:hex, :hackney, "1.6.1", "ddd22d42db2b50e6a155439c8811b8f6df61a4395de10509714ad2751c6da817", [:rebar3], [{:certifi, "0.4.0", [hex: :certifi, optional: false]}, {:idna, "1.2.0", [hex: :idna, optional: false]}, {:metrics, "1.0.1", [hex: :metrics, optional: false]}, {:mimerl, "1.0.2", [hex: :mimerl, optional: false]}, {:ssl_verify_fun, "1.1.0", [hex: :ssl_verify_fun, optional: false]}]},
"httpoison": {:hex, :httpoison, "0.9.1", "6c2b4eaf2588a6f3ef29663d28c992531ca3f0bc832a97e0359bc822978e1c5d", [:mix], [{:hackney, "~> 1.6.0", [hex: :hackney, optional: false]}]},
"httpoison": {:hex, :httpoison, "0.9.2", "a211a8e87403a043c41218e64df250d321f236ac57f786c6a0ccf3e9e817c819", [:mix], [{:hackney, "~> 1.6.0", [hex: :hackney, optional: false]}]},
"idna": {:hex, :idna, "1.2.0", "ac62ee99da068f43c50dc69acf700e03a62a348360126260e87f2b54eced86b2", [:rebar3], []},
"inch_ex": {:hex, :inch_ex, "0.5.3", "39f11e96181ab7edc9c508a836b33b5d9a8ec0859f56886852db3d5708889ae7", [:mix], [{:poison, "~> 1.5 or ~> 2.0", [hex: :poison, optional: false]}]},
"inch_ex": {:hex, :inch_ex, "0.5.4", "a2b032ad141a335a0a119f49b157b36326f5928d16a1d129b0f582398fdc25d2", [:mix], [{:poison, "~> 1.5 or ~> 2.0", [hex: :poison, optional: false]}]},
"jsx": {:hex, :jsx, "2.6.2", "213721e058da0587a4bce3cc8a00ff6684ced229c8f9223245c6ff2c88fbaa5a", [:mix, :rebar], []},
"metrics": {:hex, :metrics, "1.0.1", "25f094dea2cda98213cecc3aeff09e940299d950904393b2a29d191c346a8486", [:rebar3], []},
"mime": {:hex, :mime, "1.0.1", "05c393850524767d13a53627df71beeebb016205eb43bfbd92d14d24ec7a1b51", [:mix], []},
Expand Down

0 comments on commit 57a1967

Please sign in to comment.