Skip to content

Commit

Permalink
Merge pull request #42 from benonymus/benonymus/fix_warnings
Browse files Browse the repository at this point in the history
Address most of the warnings
  • Loading branch information
chen-anders committed Dec 13, 2023
2 parents 4035cd4 + a5cd929 commit 1617c6a
Show file tree
Hide file tree
Showing 9 changed files with 113 additions and 192 deletions.
72 changes: 22 additions & 50 deletions lib/nsq/conn_info.ex
Original file line number Diff line number Diff line change
Expand Up @@ -4,142 +4,115 @@ defmodule NSQ.ConnInfo do
connection ID.
"""
def conn_id(parent, {host, port} = _nsqd) do
"parent:#{inspect parent}:conn:#{host}:#{port}"
"parent:#{inspect(parent)}:conn:#{host}:#{port}"
end


@doc """
Given a `conn` object return by `Consumer.get_connections`, return the
Given a `conn` object return by `Consumer.get_connections` or connection state object, return the
connection ID.
"""
def conn_id({conn_id, conn_pid} = _conn) when is_pid(conn_pid) do
conn_id
end


@doc """
Given a connection state object, return the connection ID.
"""
def conn_id(%{parent: parent, nsqd: {host, port}} = _conn_state) do
conn_id(parent, {host, port})
end


@doc """
Get info for all connections in a map like `%{conn_id: %{... data ...}}`.
"""
def all(agent_pid) when is_pid(agent_pid) do
Agent.get(agent_pid, fn(data) -> data end)
Agent.get(agent_pid, fn data -> data end)
end


@doc """
`func` is passed `conn_info` for each connection.
"""
def reduce(agent_pid, start_acc, func) do
Agent.get agent_pid, fn(all_conn_info) ->
Agent.get(agent_pid, fn all_conn_info ->
Enum.reduce(all_conn_info, start_acc, func)
end
end)
end


@doc """
Get info for the connection matching `conn_id`.
"""
def fetch(agent_pid, conn_id) when is_pid(agent_pid) do
Agent.get(agent_pid, fn(data) -> data[conn_id] || %{} end)
Agent.get(agent_pid, fn data -> data[conn_id] || %{} end)
end


@doc false
def fetch(%{conn_info_pid: agent_pid}, conn_id) do
fetch(agent_pid, conn_id)
end


@doc """
Get specific data for the connection, e.g.:
[rdy_count, last_rdy] = fetch(pid, "conn_id", [:rdy_count, :last_rdy])
rdy_count = fetch(pid, "conn_id", :rdy_count)
"""
def fetch(agent_pid, conn_id, keys) when is_pid(agent_pid) do
Agent.get agent_pid, fn(data) ->
Agent.get(agent_pid, fn data ->
conn_map = data[conn_id] || %{}

if is_list(keys) do
Enum.map keys, &Map.get(conn_map, &1)
Enum.map(keys, &Map.get(conn_map, &1))
else
Map.get(conn_map, keys)
end
end
end)
end


@doc false
def fetch(%{conn_info_pid: agent_pid} = _state, {conn_id, _conn_pid} = _conn, keys) do
fetch(agent_pid, conn_id, keys)
end


@doc false
def fetch(%{conn_info_pid: agent_pid} = _state, conn_id, keys) do
fetch(agent_pid, conn_id, keys)
end


@doc """
Update the info for a specific connection matching `conn_id`. `conn_info` is
passed to `func`, and the result of `func` is saved as the new `conn_info`.
Update the info for a specific connection matching `conn_id`.
If a function is supplied `conn_info`is passed to `func`, and the result of `func` is saved as the new `conn_info`.
if a map is supplied, the map is merged into the existing conn_info.
"""
def update(agent_pid, conn_id, func) when is_pid(agent_pid) and is_function(func) do
Agent.update agent_pid, fn(data) ->
Agent.update(agent_pid, fn data ->
Map.put(data, conn_id, func.(data[conn_id] || %{}))
end
end)
end


@doc """
Update the info for a specific connection matching `conn_id`. The map is
merged into the existing conn_info.
"""
def update(agent_pid, conn_id, map) when is_pid(agent_pid) and is_map(map) do
Agent.update agent_pid, fn(data) ->
Agent.update(agent_pid, fn data ->
new_conn_data = Map.merge(data[conn_id] || %{}, map)
Map.put(data, conn_id, new_conn_data)
end
end)
end


@doc false
def update(%{conn_info_pid: agent_pid}, conn_id, func) do
update(agent_pid, conn_id, func)
end


@doc false
def update(%{conn_info_pid: agent_pid, parent: parent, nsqd: nsqd}, func) do
update(agent_pid, conn_id(parent, nsqd), func)
end


@doc """
Delete connection info matching `conn_id`. This should be called when a
connection is terminated.
"""
def delete(agent_pid, conn_id) when is_pid(agent_pid) do
Agent.update(agent_pid, fn(data) -> Map.delete(data, conn_id) end)
Agent.update(agent_pid, fn data -> Map.delete(data, conn_id) end)
end


@doc false
def delete(%{conn_info_pid: agent_pid}, conn_id) do
delete(agent_pid, conn_id)
end


@spec init(map) :: any
def init(state) do
update state, %{
update(state, %{
max_rdy: state.max_rdy,
rdy_count: 0,
last_rdy: 0,
Expand All @@ -149,14 +122,13 @@ defmodule NSQ.ConnInfo do
finished_count: 0,
requeued_count: 0,
failed_count: 0,
backoff_count: 0,
}
backoff_count: 0
})
end


@spec now :: integer
defp now do
{megasec, sec, microsec} = :os.timestamp
{megasec, sec, microsec} = :os.timestamp()
1_000_000 * megasec + sec + microsec / 1_000_000
end
end
13 changes: 7 additions & 6 deletions lib/nsq/connection/supervisor.ex
Original file line number Diff line number Diff line change
Expand Up @@ -11,18 +11,17 @@ defmodule NSQ.Connection.Supervisor do
use Supervisor
alias NSQ.ConnInfo, as: ConnInfo


# ------------------------------------------------------- #
# Behaviour Implementation #
# ------------------------------------------------------- #
def start_link(opts \\ []) do
Supervisor.start_link(__MODULE__, :ok, opts)
end


def start_child(parent, nsqd, parent_state \\ nil, opts \\ []) do
parent_state = parent_state || GenServer.call(parent, :state)
conn_sup_pid = parent_state.conn_sup_pid

args = [
parent,
nsqd,
Expand All @@ -32,18 +31,20 @@ defmodule NSQ.Connection.Supervisor do
parent_state.conn_info_pid,
parent_state.event_manager_pid
]

conn_id = ConnInfo.conn_id(parent, nsqd)

# When using nsqlookupd, we expect connections will be naturally
# rediscovered if they fail.
opts = [restart: :temporary, id: conn_id] ++ opts
config =
[id: conn_id, start: {NSQ.Connection, :start_link, args}, restart: :temporary] ++ opts

child = Map.new(config)

child = worker(NSQ.Connection, args, opts)
Supervisor.start_child(conn_sup_pid, child)
end


def init(:ok) do
supervise([], strategy: :one_for_one)
Supervisor.init([], strategy: :one_for_one)
end
end

0 comments on commit 1617c6a

Please sign in to comment.