Skip to content
This repository has been archived by the owner. It is now read-only.
Permalink
Browse files
Initial implementation
There are a few places we could be more efficient in managing the ets
table for event listeners but this simple approach should be able to
give us a rough idea on how fast we can pump messages through this sort
of design.
  • Loading branch information
davisp authored and rnewson committed Jul 30, 2014
1 parent 73b38e8 commit cc616a77717730ac4dddc2940aff51f584385e6f
Showing 9 changed files with 527 additions and 0 deletions.
@@ -0,0 +1,2 @@
deps/
ebin/
@@ -0,0 +1,22 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

{application, couch_event, [
{description, "Event notification system for Apache CouchDB"},
{vsn, git},
{registered, [
couch_event_sup,
couch_event_server
]},
{applications, [kernel, stdlib, couch_log, config]},
{mod, {couch_event_app, []}}
]}.
@@ -0,0 +1,45 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_event).

-export([
register/2,
register_all/1,
unregister/2,
unregister_all/1,
notify/2
]).


-define(REGISTRY, couch_event_registry).
-define(DIST, couch_event_dist).


register(Pid, DbName) ->
gen_server:call(?REGISTRY, {register, Pid, DbName}).


register_all(Pid) ->
gen_server:call(?REGISTRY, {register, Pid, all_dbs}).


unregister(Pid, DbName) ->
gen_server:call(?REGISTRY, {unregister, Pid, DbName}).


unregister_all(Pid) ->
gen_server:call(?REGISTRY, {unregister, Pid}).


notify(DbName, Event) ->
gen_server:cast(?DIST, {DbName, Event}).
@@ -0,0 +1,27 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_event_app).
-behavior(application).

-export([
start/2,
stop/1
]).


start(_StartType, _StartArgs) ->
couch_event_sup2:start_link().


stop(_State) ->
ok.
@@ -0,0 +1,83 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_event_dist).
-behavior(gen_server).


-export([
start_link/0
]).

-export([
init/1,
terminate/2,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3
]).


-include("couch_event_int.hrl").


-record(st, {
batch_size
}).


start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, nil, []).


init(_) ->
{ok, #st{batch_size=25}}.


terminate(_Reason, _St) ->
ok.


handle_call(Msg, From, St) ->
couch_log:notice("~s ignoring call ~w from ~w", [?MODULE, Msg, From]),
{reply, ignored, St}.


handle_cast({DbName, Event}, #st{batch_size=BS}=St) when is_binary(DbName) ->
P1 = #client{dbname=DbName, _='_'},
notify_clients(ets:select(?REGISTRY_TABLE, P1, BS), DbName, Event),
P2 = #client{dbname=all_dbs, _='_'},
notify_clients(ets:select(?REGISTRY_TABLE, P2, BS), DbName, Event),
{noreply, St};

handle_cast(Msg, St) ->
couch_log:notice("~s ignoring cast ~w", [?MODULE, Msg]),
{noreply, St}.


handle_info(Msg, St) ->
couch_log:notice("~s ignoring info ~w", [?MODULE, Msg]),
{noreply, St}.


code_change(_OldVsn, St, _Extra) ->
{ok, St}.


notify_clients('$end_of_table', _DbName, _Event) ->
ok;
notify_clients({Clients, Cont}, DbName, Event) ->
lists:foreach(fun(#client{pid=Pid}) ->
Pid ! {'$couch_event', DbName, Event}
end, Clients),
notify_clients(ets:select(Cont), DbName, Event).
@@ -0,0 +1,19 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-define(REGISTRY_TABLE, couch_event_registry).

-record(client, {
dbname,
pid,
ref
}).
@@ -0,0 +1,169 @@
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.

-module(couch_event_listener).
-behavior(gen_server).


-export([
start/3,
start/4,
start_link/3,
start_link/4
]).

-export([
behaviour_info/1
]).

-export([
init/1,
terminate/2,
handle_call/3,
handle_cast/2,
handle_info/2,
code_change/3
]).


behaviour_info(callbacks) ->
[
{init,1},
{terminate/2},
{handle_event/2}
];
behaviour_info(_) ->
undefined.


start(Mod, Args, Options) ->
gen_server:start(?MODULE, {Mod, Args}, Options).


start(Name, Mod, Args, Options) ->
gen_server:start(Name, ?MODULE, {Mod, Args}, Options).


start_link(Mod, Args, Options) ->
gen_server:start_link(?MODULE, {Mod, Args}, Options).


start_link(Name, Mod, Args, Options) ->
gen_server:start_link(Name, ?MODULE, {Mod, Args}, Options).


init({Mod, Args}) ->
case Mod:init(Args) of
{ok, St} ->
{ok, {Mod, St}};
{ok, St, Timeout} ->
{ok, {Mod, St}, Timeout};
{stop, Reason} ->
{stop, Reason};
ignore ->
ignore;
Else ->
erlang:error({bad_return, Else})
end.


terminate(Reason, {Mod, St}) ->
Mod:terminate(Reason, St).


handle_call(Msg, From, {Mod, St}) ->
case erlang:function_exported(Mod, handle_call, 3) of
true ->
case Mod:handle_call(Msg, From, St) of
{reply, Reply, NewState} ->
{reply, Reply, {Mod, NewState}};
{reply, Reply, NewState, Timeout} ->
{reply, Reply, {Mod, NewState}, Timeout};
{noreply, NewState} ->
{noreply, {Mod, NewState}};
{noreply, NewState, Timeout} ->
{noreply, {Mod, NewState}, Timeout};
{stop, Reason, Reply, NewState} ->
{stop, Reason, Reply, {Mod, NewState}};
{stop, Reason, NewState} ->
{stop, Reason, {Mod, NewState}};
Else ->
erlang:error({bad_return, Else})
end;
false ->
{stop, {invalid_call, Msg}, invalid_call, St}
end.


handle_cast(Msg, {Mod, St}) ->
case erlang:function_exported(Mod, handle_cast, 2) of
true ->
case Mod:handle_cast(Msg, St) of
{noreply, NewState} ->
{noreply, {Mod, NewState}};
{noreply, NewState, Timeout} ->
{noreply, {Mod, NewState}, Timeout};
{stop, Reason, NewState} ->
{stop, Reason, {Mod, NewState}};
Else ->
erlang:error({bad_return, Else})
end;
false ->
{stop, {invalid_cast, Msg}, St}
end.


handle_info({'$couch_event', DbName, Event}, {Mod, St}) ->
case Mod:handle_event(DbName, Event, St) of
{noreply, NewState} ->
{noreply, {Mod, NewState}};
{noreply, NewState, Timeout} ->
{noreply, {Mod, NewState}, Timeout};
{stop, Reason, NewState} ->
{stop, Reason, {Mod, NewState}};
Else ->
erlang:error({bad_return, Else})
end;

handle_info(Msg, {Mod, St}) ->
case erlang:function_export(Mod, handle_info, 2) of
true ->
case Mod:handle_info(Msg, St) of
{noreply, NewState} ->
{noreply, {Mod, NewState}};
{noreply, NewState, Timeout} ->
{noreply, {Mod, NewState}, Timeout};
{stop, Reason, NewState} ->
{stop, Reason, {Mod, NewState}};
Else ->
erlang:error({bad_return, Else})
end;
false ->
{stop, {invalid_info, Msg}, St}
end.


code_change(OldVsn, {Mod, St}, Extra) ->
case erlang:function_exported(Mod, code_change, 3) of
true ->
case Mod:code_change(OldVsn, St, Extra) of
{ok, NewState} ->
{ok, {Mod, NewState}};
{error, Reason} ->
{error, Reason};
Else ->
erlang:error({bad_return, Else})
end;
false ->
{ok, {Mod, St}}
end.

0 comments on commit cc616a7

Please sign in to comment.