Permalink
Browse files

Use `is_binary/1`, not `is_bitstring/1`, for string guards

  • Loading branch information...
slashdotdash committed Jan 10, 2019
1 parent 0fe7d60 commit b147134eff8321ab95584275e86713b21216887f
@@ -21,7 +21,7 @@ defmodule Commanded.Aggregates.Supervisor do
Returns `{:ok, aggregate_uuid}` when a process is sucessfully started, or is
already running.
"""
def open_aggregate(aggregate_module, aggregate_uuid) when is_bitstring(aggregate_uuid) do
def open_aggregate(aggregate_module, aggregate_uuid) when is_binary(aggregate_uuid) do
Logger.debug(fn ->
"Locating aggregate process for `#{inspect(aggregate_module)}` with UUID " <>
inspect(aggregate_uuid)
@@ -230,7 +230,7 @@ defmodule Commanded.Commands.Router do
prefix when is_function(prefix, 0) ->
prefix

prefix when is_bitstring(prefix) ->
prefix when is_binary(prefix) ->
prefix

invalid ->
@@ -308,7 +308,7 @@ defmodule Commanded.Event.Handler do
def parse_name(module, name) when name in [nil, ""],
do: raise("#{inspect(module)} expects `:name` to be given")

def parse_name(_module, name) when is_bitstring(name), do: name
def parse_name(_module, name) when is_binary(name), do: name
def parse_name(_module, name), do: inspect(name)

@doc false
@@ -9,9 +9,7 @@ defmodule Commanded.Subscriptions do
@subscriptions_topic "subscriptions"
@ack_topic "ack_event"

defstruct streams_table: nil,
started_at: nil,
subscribers: []
defstruct [:streams_table, :started_at, subscribers: []]

def start_link(arg) do
GenServer.start_link(__MODULE__, arg, name: __MODULE__)
@@ -100,7 +98,11 @@ defmodule Commanded.Subscriptions do
{:reply, reply, state}
end

def handle_call({:subscribe, stream_uuid, stream_version, opts, pid}, _from, %Subscriptions{} = state) do
def handle_call(
{:subscribe, stream_uuid, stream_version, opts, pid},
_from,
%Subscriptions{} = state
) do
{consistency, exclude} = parse_opts(opts)

%Subscriptions{subscribers: subscribers} = state
@@ -127,10 +129,10 @@ defmodule Commanded.Subscriptions do
{:reply, :ok, state}
end

def handle_call({:unsubscribe, pid}, _from, %Subscriptions{subscribers: subscribers} = state) do
state = %Subscriptions{state |
subscribers: remove_by_pid(subscribers, pid),
}
def handle_call({:unsubscribe, pid}, _from, %Subscriptions{} = state) do
%Subscriptions{subscribers: subscribers} = state

state = %Subscriptions{state | subscribers: remove_by_pid(subscribers, pid)}

{:reply, :ok, state}
end
@@ -144,9 +146,7 @@ defmodule Commanded.Subscriptions do

:ets.insert(streams_table, {{name, stream_uuid}, stream_version, inserted_at_epoch})

state = %Subscriptions{state |
subscribers: notify_subscribers(stream_uuid, state),
}
state = %Subscriptions{state | subscribers: notify_subscribers(stream_uuid, state)}

{:noreply, state}
end
@@ -162,25 +162,31 @@ defmodule Commanded.Subscriptions do
{:noreply, state}
end

def handle_info({:DOWN, _ref, :process, pid, _reason}, %Subscriptions{subscribers: subscribers} = state) do
state = %Subscriptions{state |
subscribers: remove_by_pid(subscribers, pid),
}
def handle_info({:DOWN, _ref, :process, pid, _reason}, %Subscriptions{} = state) do
%Subscriptions{subscribers: subscribers} = state

state = %Subscriptions{state | subscribers: remove_by_pid(subscribers, pid)}

{:noreply, state}
end

defp initial_state do
%Subscriptions{
streams_table: :ets.new(:streams, [:set, :private]),
started_at: monotonic_time(),
started_at: monotonic_time()
}
end

defp subscriptions, do: PubSub.list(@subscriptions_topic)

# Have all subscriptions handled the event for the given stream and version
defp handled_by_all?(stream_uuid, stream_version, consistency, exclude, %Subscriptions{} = state) do
defp handled_by_all?(
stream_uuid,
stream_version,
consistency,
exclude,
%Subscriptions{} = state
) do
subscriptions()
|> Enum.reject(fn {_name, pid} -> MapSet.member?(exclude, pid) end)
|> Enum.filter(fn {name, _pid} ->
@@ -203,15 +209,16 @@ defmodule Commanded.Subscriptions do

defp remove_by_pid(subscribers, pid) do
Enum.reduce(subscribers, subscribers, fn
({^pid, _, _, _, _} = subscriber, subscribers) -> subscribers -- [subscriber]
(_subscriber, subscribers) -> subscribers
{^pid, _, _, _, _} = subscriber, subscribers -> subscribers -- [subscriber]
_subscriber, subscribers -> subscribers
end)
end

# Notify any subscribers waiting on a given stream if it is at the expected version
defp notify_subscribers(stream_uuid, %Subscriptions{subscribers: subscribers} = state) do
Enum.reduce(subscribers, subscribers, fn
({pid, ^stream_uuid, expected_stream_version, consistency, exclude} = subscriber, subscribers) ->
{pid, ^stream_uuid, expected_stream_version, consistency, exclude} = subscriber,
subscribers ->
case handled_by_all?(stream_uuid, expected_stream_version, consistency, exclude, state) do
true ->
notify_subscriber(pid, stream_uuid, expected_stream_version)
@@ -223,7 +230,8 @@ defmodule Commanded.Subscriptions do
subscribers
end

(_subscriber, subscribers) -> subscribers
_subscriber, subscribers ->
subscribers
end)
end

@@ -237,8 +245,10 @@ defmodule Commanded.Subscriptions do
end

# Delete subscription ack's that are older than the configured ttl
defp purge_expired_streams(ttl, %Subscriptions{streams_table: streams_table, started_at: started_at}) do
stale_epoch = monotonic_time() - started_at - (ttl / 1_000)
defp purge_expired_streams(ttl, %Subscriptions{} = state) do
%Subscriptions{streams_table: streams_table, started_at: started_at} = state

stale_epoch = monotonic_time() - started_at - ttl / 1_000

streams_table
|> :ets.select([{{:"$1", :"$2", :"$3"}, [{:"=<", :"$3", stale_epoch}], [:"$1"]}])
@@ -253,9 +263,10 @@ defmodule Commanded.Subscriptions do
end

defp parse_consistency(consistency) when consistency in [:eventual, :strong], do: consistency

defp parse_consistency(consistency) when is_list(consistency) do
Enum.map(consistency, fn
name when is_bitstring(name) ->
name when is_binary(name) ->
name

module when is_atom(module) ->
@@ -266,10 +277,11 @@ defmodule Commanded.Subscriptions do
end

invalid ->
raise "Invalid consistency: #{inspect invalid}"
raise "Invalid consistency: #{inspect(invalid)}"
end)
end
defp parse_consistency(invalid), do: raise "Invalid consistency: #{inspect invalid}"

defp parse_consistency(invalid), do: raise("Invalid consistency: #{inspect(invalid)}")

defp parse_exclude(exclude), do: exclude |> List.wrap() |> MapSet.new()

@@ -278,10 +290,12 @@ defmodule Commanded.Subscriptions do
@default_ttl :timer.hours(1)
@default_consistency_timeout :timer.seconds(5)

# time to live period for ack'd events before they can be safely purged in milliseconds
defp default_ttl,
do: Application.get_env(:commanded, :subscriptions_ttl, @default_ttl)
# Time to live (ttl) period for ack'd events before they can be safely purged (in milliseconds).
defp default_ttl do
Application.get_env(:commanded, :subscriptions_ttl, @default_ttl)
end

defp default_consistency_timeout,
do: Application.get_env(:commanded, :dispatch_consistency_timeout, @default_consistency_timeout)
defp default_consistency_timeout do
Application.get_env(:commanded, :dispatch_consistency_timeout, @default_consistency_timeout)
end
end

0 comments on commit b147134

Please sign in to comment.