Skip to content
This repository
Browse code

Serialize registration and deregistration through a gen_server process.

  • Loading branch information...
commit d5f707f42f0f3b71c763981915ef05d6bfb3d37d 1 parent feb15a8
Sean Cribbs authored August 20, 2012
1  rebar.config
... ...
@@ -1,5 +1,6 @@
1 1
 {cover_enabled, true}.
2 2
 {erl_opts, [warnings_as_errors, {parse_transform, lager_transform}]}.
  3
+{eunit_opts, [verbose]}.
3 4
 {deps, [
4 5
         {lager, ".*", {git, "git://github.com/basho/lager.git", "master"}},
5 6
         {riak_pb, ".*", {git, "git://github.com/basho/riak_pb.git", "master"}},
13  src/riak_api_app.erl
@@ -43,20 +43,9 @@
43 43
 start(_Type, _StartArgs) ->
44 44
     riak_core_util:start_app_deps(riak_api),
45 45
 
46  
-    %% TODO: cluster_info registration. What do we expose?
47  
-    %% catch cluster_info:register_app(riak_api_cinfo),
48  
-
49  
-    ok = riak_api_pb_service:register(?SERVICES),
50  
-
51 46
     case riak_api_sup:start_link() of
52 47
         {ok, Pid} ->
53  
-            %% TODO: Is it necessary to register the service? We might
54  
-            %% want to use the registration to cause service_up events
55  
-            %% and then propagate config information for client
56  
-            %% auto-config.
57  
-            %% riak_core:register(riak_api, []),
58  
-            %% register stats
59  
-            riak_core:register(riak_api, [{stat_mod, riak_api_stat}]),
  48
+            ok = riak_api_pb_service:register(?SERVICES),
60 49
             {ok, Pid};
61 50
         {error, Reason} ->
62 51
             {error, Reason}
246  src/riak_api_pb_registrar.erl
... ...
@@ -0,0 +1,246 @@
  1
+%% -------------------------------------------------------------------
  2
+%%
  3
+%% riak_api_pb_registrar: Riak Client APIs Protocol Buffers Service Registration
  4
+%%
  5
+%% Copyright (c) 2007-2010 Basho Technologies, Inc.  All Rights Reserved.
  6
+%%
  7
+%% This file is provided to you under the Apache License,
  8
+%% Version 2.0 (the "License"); you may not use this file
  9
+%% except in compliance with the License.  You may obtain
  10
+%% a copy of the License at
  11
+%%
  12
+%%   http://www.apache.org/licenses/LICENSE-2.0
  13
+%%
  14
+%% Unless required by applicable law or agreed to in writing,
  15
+%% software distributed under the License is distributed on an
  16
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
  17
+%% KIND, either express or implied.  See the License for the
  18
+%% specific language governing permissions and limitations
  19
+%% under the License.
  20
+%%
  21
+%% -------------------------------------------------------------------
  22
+
  23
+%% @doc Encapsulates the Protocol Buffers service registration and
  24
+%% deregistration as a gen_server process. This is used to serialize
  25
+%% write access to the registration table so that it is less prone to
  26
+%% race-conditions.
  27
+
  28
+-module(riak_api_pb_registrar).
  29
+
  30
+-behaviour(gen_server).
  31
+
  32
+-ifdef(TEST).
  33
+-include_lib("eunit/include/eunit.hrl").
  34
+-compile([export_all]).
  35
+-endif.
  36
+
  37
+-define(SERVER, ?MODULE).
  38
+
  39
+%% External exports
  40
+-export([
  41
+         start_link/0,
  42
+         register/1,
  43
+         deregister/1
  44
+        ]).
  45
+
  46
+%% gen_server callbacks
  47
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
  48
+
  49
+-import(riak_api_pb_service, [services/0, dispatch_table/0]).
  50
+
  51
+%%--------------------------------------------------------------------
  52
+%%% Public API
  53
+%%--------------------------------------------------------------------
  54
+-spec start_link() -> {ok, pid()}.
  55
+start_link() ->
  56
+    gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
  57
+
  58
+-spec register([riak_api_pb_service:registration()]) -> ok | {error, Reason::term()}.
  59
+register(Registrations) ->
  60
+    gen_server:call(?SERVER, {register, Registrations}, infinity).
  61
+
  62
+-spec deregister([riak_api_pb_service:registration()]) -> ok | {error, Reason::term()}.
  63
+deregister(Registrations) ->
  64
+    gen_server:call(?SERVER, {deregister, Registrations}, infinity).
  65
+
  66
+%%--------------------------------------------------------------------
  67
+%%% gen_server callbacks
  68
+%%--------------------------------------------------------------------
  69
+init([]) ->
  70
+    {ok, undefined}.
  71
+
  72
+handle_call({register, Registrations}, _From, State) ->
  73
+    Reply = do_register(Registrations),
  74
+    {reply, Reply, State};
  75
+handle_call({deregister, Registrations}, _From, State) ->
  76
+    Reply = do_deregister(Registrations),
  77
+    {reply, Reply, State}.
  78
+
  79
+handle_cast(_Msg, State) ->
  80
+    {noreply, State}.
  81
+
  82
+handle_info(_Info, State) ->
  83
+    {noreply, State}.
  84
+
  85
+terminate(_Reason, _State) ->
  86
+    ok.
  87
+
  88
+code_change(_OldVsn, State, _Extra) ->
  89
+    {ok, State}.
  90
+
  91
+%%--------------------------------------------------------------------
  92
+%%% Internal functions
  93
+%%--------------------------------------------------------------------
  94
+
  95
+do_register([]) ->
  96
+    ok;
  97
+do_register([{Module, MinCode, MaxCode}|Rest]) ->
  98
+    case do_register(Module, MinCode, MaxCode) of
  99
+        ok ->
  100
+            do_register(Rest);
  101
+        Error ->
  102
+            Error
  103
+    end.
  104
+
  105
+do_register(_Module, MinCode, MaxCode) when MinCode > MaxCode orelse
  106
+MinCode < 1 orelse
  107
+MaxCode < 1 ->
  108
+    {error, invalid_message_code_range};
  109
+do_register(Module, MinCode, MaxCode) ->
  110
+    Registrations = dispatch_table(),
  111
+    IsRegistered = fun(I) -> dict:is_key(I, Registrations) end,
  112
+    CodeRange = lists:seq(MinCode, MaxCode),
  113
+    case lists:filter(IsRegistered, CodeRange) of
  114
+        [] ->
  115
+            NewRegs = lists:foldl(fun(I, D) ->
  116
+                                          dict:store(I, Module, D)
  117
+                                  end, Registrations, CodeRange),
  118
+            application:set_env(riak_api, services, NewRegs),
  119
+            riak_api_pb_sup:service_registered(Module),
  120
+            ok;
  121
+        AlreadyClaimed ->
  122
+            {error, {already_registered, AlreadyClaimed}}
  123
+    end.
  124
+
  125
+
  126
+do_deregister([]) ->
  127
+    ok;
  128
+do_deregister([{Module, MinCode, MaxCode}|Rest]) ->
  129
+    case do_deregister(Module, MinCode, MaxCode) of
  130
+        ok ->
  131
+            do_deregister(Rest);
  132
+        Other ->
  133
+            Other
  134
+    end.
  135
+
  136
+do_deregister(_Module, MinCode, MaxCode) when MinCode > MaxCode orelse
  137
+MinCode < 1 orelse
  138
+MaxCode < 1 ->
  139
+    {error, invalid_message_code_range};
  140
+do_deregister(Module, MinCode, MaxCode) ->
  141
+    Registrations = dispatch_table(),
  142
+    CodeRange = lists:seq(MinCode, MaxCode),
  143
+    %% Figure out whether all of the codes can be deregistered.
  144
+    Mapper = fun(I) ->
  145
+                     case dict:find(I, Registrations) of
  146
+                         error ->
  147
+                             {error, {unregistered, I}};
  148
+                         {ok, Module} ->
  149
+                             I;
  150
+                         {ok, _OtherModule} ->
  151
+                             {error, {not_owned, I}}
  152
+                     end
  153
+             end,
  154
+    ToRemove = [ Mapper(I) || I <- CodeRange ],
  155
+    case ToRemove of
  156
+        CodeRange ->
  157
+            %% All codes are valid, so remove them, set the env and
  158
+            %% notify active server processes.
  159
+            NewRegs = lists:foldl(fun dict:erase/2, Registrations, CodeRange),
  160
+            application:set_env(riak_api, services, NewRegs),
  161
+            riak_api_pb_sup:service_registered(Module),
  162
+            ok;
  163
+        _ ->
  164
+            %% There was at least one error, return it.
  165
+            lists:keyfind(error, 1, ToRemove)
  166
+    end.
  167
+
  168
+
  169
+
  170
+-ifdef(TEST).
  171
+
  172
+test_start() ->
  173
+    gen_server:start({local, ?SERVER}, ?MODULE, [], []).
  174
+
  175
+setup() ->
  176
+    OldServices = app_helper:get_env(riak_api, services, dict:new()),
  177
+    application:set_env(riak_api, services, dict:new()),
  178
+    {ok, Pid} = test_start(),
  179
+    {Pid, OldServices}.
  180
+
  181
+cleanup({Pid, Services}) ->
  182
+    exit(Pid, kill),
  183
+    application:set_env(riak_api, services, Services).
  184
+
  185
+deregister_test_() ->
  186
+    {foreach,
  187
+     fun setup/0,
  188
+     fun cleanup/1,
  189
+     [
  190
+      %% Deregister a previously registered service
  191
+      ?_assertEqual(ok, begin
  192
+                            ok = riak_api_pb_service:register(foo, 1, 2),
  193
+                            riak_api_pb_service:deregister(foo, 1, 2)
  194
+                        end),
  195
+      %% Invalid deregistration: range is invalid
  196
+      ?_assertEqual({error, invalid_message_code_range}, riak_api_pb_service:deregister(foo, 2, 1)),
  197
+      %% Invalid deregistration: unregistered range
  198
+      ?_assertEqual({error, {unregistered, 1}}, riak_api_pb_service:deregister(foo, 1, 1)),
  199
+      %% Invalid deregistration: registered to other service
  200
+      ?_assertEqual({error, {not_owned, 1}}, begin
  201
+                                                 ok = riak_api_pb_service:register(foo, 1, 2),
  202
+                                                 riak_api_pb_service:deregister(bar, 1)
  203
+                                             end),
  204
+      %% Deregister multiple
  205
+      ?_assertEqual(ok, begin
  206
+                            ok = register([{foo, 1, 2}, {bar, 3, 4}]),
  207
+                            deregister([{bar, 3, 4}, {foo, 1, 2}])
  208
+                        end)
  209
+     ]}.
  210
+
  211
+register_test_() ->
  212
+    {foreach,
  213
+     fun setup/0,
  214
+     fun cleanup/1,
  215
+     [
  216
+      %% Valid registration range
  217
+      ?_assertEqual(foo, begin
  218
+                             ok = riak_api_pb_service:register(foo,1,2),
  219
+                             dict:fetch(1, dispatch_table())
  220
+                         end),
  221
+      %% Registration ranges that are invalid
  222
+      ?_assertEqual({error, invalid_message_code_range},
  223
+                    riak_api_pb_service:register(foo, 2, 1)),
  224
+      ?_assertEqual({error, {already_registered, [1, 2]}},
  225
+                    begin
  226
+                        ok = riak_api_pb_service:register(foo, 1, 2),
  227
+                        riak_api_pb_service:register(bar, 1, 3)
  228
+                    end),
  229
+      %% Register multiple
  230
+      ?_assertEqual(ok, register([{foo, 1, 2}, {bar, 3, 4}]))
  231
+     ]}.
  232
+
  233
+services_test_() ->
  234
+    {setup,
  235
+     fun setup/0,
  236
+     fun cleanup/1,
  237
+     [
  238
+      ?_assertEqual([], services()),
  239
+      ?_assertEqual([bar, foo], begin
  240
+                                    riak_api_pb_service:register(foo, 1, 2),
  241
+                                    riak_api_pb_service:register(bar, 3, 4),
  242
+                                    services()
  243
+                                end)
  244
+     ]}.
  245
+
  246
+-endif.
147  src/riak_api_pb_service.erl
@@ -31,11 +31,7 @@
31 31
 %% @end
32 32
 
33 33
 -module(riak_api_pb_service).
34  
-
35  
--ifdef(TEST).
36  
--include_lib("eunit/include/eunit.hrl").
37  
--compile([export_all, {no_auto_import, [register/2]}]).
38  
--endif.
  34
+-compile([{no_auto_import, [register/2]}]).
39 35
 
40 36
 %% Behaviour API
41 37
 -export([behaviour_info/1]).
@@ -52,6 +48,10 @@
52 48
 -export([dispatch_table/0,
53 49
          services/0]).
54 50
 
  51
+-type registration() :: {Service::module(), MinCode::pos_integer(), MaxCode::pos_integer()}.
  52
+
  53
+-export_type([registration/0]).
  54
+
55 55
 %% @doc Behaviour information callback. PB API services must implement
56 56
 %% the given functions.
57 57
 behaviour_info(callbacks) ->
@@ -65,24 +65,18 @@ behaviour_info(_) ->
65 65
 
66 66
 %% @doc Registers a number of services at once.
67 67
 %% @see register/3
68  
--type registration() :: {Service::module(), MinCode::pos_integer(), MaxCode::pos_integer()}.
69 68
 -spec register([registration()]) -> ok | {error, Reason::term()}.
70 69
 register([]) ->
71 70
     ok;
72  
-register([{Module, MinCode, MaxCode}|Rest]) ->
73  
-    case register(Module, MinCode, MaxCode) of
74  
-        ok ->
75  
-            register(Rest);
76  
-        Other ->
77  
-            Other
78  
-    end.
  71
+register(List) ->
  72
+    riak_api_pb_registrar:register(List).
79 73
 
80 74
 %% @doc Registers a service module for a given message code.
81 75
 %% @equiv register(Module, Code, Code)
82 76
 %% @see register/3
83 77
 -spec register(Module::module(), Code::pos_integer()) -> ok | {error, Err::term()}.
84 78
 register(Module, Code) ->
85  
-    register(Module, Code, Code).
  79
+    register([{Module, Code, Code}]).
86 80
 
87 81
 %% @doc Registers a service module for a given range of message
88 82
 %% codes. The service module must implement the behaviour and be able
@@ -90,25 +84,8 @@ register(Module, Code) ->
90 84
 %% codes.  Service modules should be registered before the riak_api
91 85
 %% application starts.
92 86
 -spec register(Module::module(), MinCode::pos_integer(), MaxCode::pos_integer()) -> ok | {error, Err::term()}.
93  
-register(_Module, MinCode, MaxCode) when MinCode > MaxCode orelse
94  
-                                         MinCode < 1 orelse
95  
-                                         MaxCode < 1 ->
96  
-    {error, invalid_message_code_range};
97 87
 register(Module, MinCode, MaxCode) ->
98  
-    Registrations = dispatch_table(),
99  
-    IsRegistered = fun(I) -> dict:is_key(I, Registrations) end,
100  
-    CodeRange = lists:seq(MinCode, MaxCode),
101  
-    case lists:filter(IsRegistered, CodeRange) of
102  
-        [] ->
103  
-            NewRegs = lists:foldl(fun(I, D) ->
104  
-                                          dict:store(I, Module, D)
105  
-                                  end, Registrations, CodeRange),
106  
-            application:set_env(riak_api, services, NewRegs),
107  
-            riak_api_pb_sup:service_registered(Module),
108  
-            ok;
109  
-        AlreadyClaimed ->
110  
-            {error, {already_registered, AlreadyClaimed}}
111  
-    end.
  88
+    register([{Module, MinCode, MaxCode}]).
112 89
 
113 90
 %% @doc Removes the registration of a number of services modules at
114 91
 %% once.
@@ -116,47 +93,22 @@ register(Module, MinCode, MaxCode) ->
116 93
 -spec deregister([registration()]) -> ok | {error, Reason::term()}.
117 94
 deregister([]) ->
118 95
     ok;
119  
-deregister([{Module, MinCode, MaxCode}|Rest]) ->
120  
-    case deregister(Module, MinCode, MaxCode) of
121  
-        ok ->
122  
-            deregister(Rest);
123  
-        Other ->
124  
-            Other
125  
-    end.
  96
+deregister(List) ->
  97
+    riak_api_pb_registrar:deregister(List).
126 98
 
127 99
 %% @doc Removes the registration of a previously-registered service
128 100
 %% module. Inputs will be validated such that the registered module
129 101
 %% must match the one being removed.
130 102
 -spec deregister(Module::module(), Code::pos_integer()) -> ok | {error, Err::term()}.
131 103
 deregister(Module, Code) ->
132  
-    Registrations = dispatch_table(),
133  
-    case dict:find(Code, Registrations) of
134  
-        error ->
135  
-            {error, {unregistered, Code}};
136  
-        {ok, Module} ->
137  
-            NewRegs = dict:erase(Code, Registrations),
138  
-            application:set_env(riak_api, services, NewRegs),
139  
-            riak_api_pb_sup:service_registered(Module),
140  
-            ok;
141  
-        {ok, _OtherModule} ->
142  
-            {error, {not_owned, Code}}
143  
-    end.
  104
+    deregister([{Module, Code, Code}]).
144 105
 
145 106
 %% @doc Removes the registration of a previously-registered service
146 107
 %% module.
147 108
 %% @see deregister/2
148 109
 -spec deregister(Module::module(), MinCode::pos_integer(), MaxCode::pos_integer()) -> ok | {error, Err::term()}.
149  
-deregister(_Module, MinCode, MaxCode) when MaxCode < MinCode ->
150  
-    {error, invalid_message_code_range};
151  
-deregister(Module, Code, Code) ->
152  
-    deregister(Module, Code);
153 110
 deregister(Module, MinCode, MaxCode) ->
154  
-    case deregister(Module, MinCode) of
155  
-        ok ->
156  
-            deregister(Module, MinCode + 1, MaxCode);
157  
-        Error ->
158  
-            Error
159  
-    end.
  111
+    deregister([{Module, MinCode, MaxCode}]).
160 112
 
161 113
 %% @doc Returns the current mappings from message codes to service
162 114
 %% modules. This is called by riak_api_pb_socket on startup so that
@@ -170,76 +122,3 @@ dispatch_table() ->
170 122
 -spec services() -> [ module() ].
171 123
 services() ->
172 124
     lists:usort([ V || {_K,V} <- dict:to_list(dispatch_table()) ]).
173  
-
174  
--ifdef(TEST).
175  
-
176  
-setup() ->
177  
-    OldServices = app_helper:get_env(riak_api, services, dict:new()),
178  
-    application:set_env(riak_api, services, dict:new()),
179  
-    OldServices.
180  
-
181  
-cleanup(Services) ->
182  
-    application:set_env(riak_api, services, Services).
183  
-
184  
-deregister_test_() ->
185  
-    {foreach,
186  
-     fun setup/0,
187  
-     fun cleanup/1,
188  
-     [
189  
-      %% Deregister a previously registered service
190  
-      ?_assertEqual(ok, begin
191  
-                            ok = register(foo, 1, 2),
192  
-                            deregister(foo, 1, 2)
193  
-                        end),
194  
-      %% Invalid deregistration: range is invalid
195  
-      ?_assertEqual({error, invalid_message_code_range}, deregister(foo, 2, 1)),
196  
-      %% Invalid deregistration: unregistered range
197  
-      ?_assertEqual({error, {unregistered, 1}}, deregister(foo, 1, 1)),
198  
-      %% Invalid deregistration: registered to other service
199  
-      ?_assertEqual({error, {not_owned, 1}}, begin
200  
-                                                 ok = register(foo, 1, 2),
201  
-                                                 deregister(bar, 1)
202  
-                                             end),
203  
-      %% Deregister multiple
204  
-      ?_assertEqual(ok, begin
205  
-                            ok = register([{foo, 1, 2}, {bar, 3, 4}]),
206  
-                            deregister([{bar, 3, 4}, {foo, 1, 2}])
207  
-                        end)
208  
-     ]}.
209  
-
210  
-register_test_() ->
211  
-    {foreach,
212  
-     fun setup/0,
213  
-     fun cleanup/1,
214  
-     [
215  
-      %% Valid registration range
216  
-      ?_assertEqual(foo, begin
217  
-                             ok = register(foo,1,2),
218  
-                             dict:fetch(1, dispatch_table())
219  
-                         end),
220  
-      %% Registration ranges that are invalid
221  
-      ?_assertEqual({error, invalid_message_code_range},
222  
-                    register(foo, 2, 1)),
223  
-      ?_assertEqual({error, {already_registered, [1, 2]}},
224  
-                    begin
225  
-                        ok = register(foo, 1, 2),
226  
-                        register(bar, 1, 3)
227  
-                    end),
228  
-      %% Register multiple
229  
-      ?_assertEqual(ok, register([{foo, 1, 2}, {bar, 3, 4}]))
230  
-     ]}.
231  
-
232  
-services_test_() ->
233  
-    {setup,
234  
-     fun setup/0,
235  
-     fun cleanup/1,
236  
-     [
237  
-      ?_assertEqual([], services()),
238  
-      ?_assertEqual([bar, foo], begin
239  
-                                    register(foo, 1, 2),
240  
-                                    register(bar, 3, 4),
241  
-                                    services()
242  
-                                end)
243  
-     ]}.
244  
-
245  
--endif.
15  src/riak_api_sup.erl
@@ -47,10 +47,11 @@ init([]) ->
47 47
     Port = riak_api_pb_listener:get_port(),
48 48
     IP = riak_api_pb_listener:get_ip(),
49 49
     IsPbConfigured = (Port /= undefined) andalso (IP /= undefined),
50  
-    Processes = if IsPbConfigured ->
51  
-                        [?CHILD(riak_api_pb_sup, supervisor),
52  
-                         ?CHILD(riak_api_pb_listener, worker, [IP, Port])];
53  
-                   true ->
54  
-                        []
55  
-                end,
56  
-    {ok, {{one_for_one, 10, 10}, Processes}}.
  50
+    Registrar = ?CHILD(riak_api_pb_registrar, worker),
  51
+    NetworkProcesses = if IsPbConfigured ->
  52
+                               [?CHILD(riak_api_pb_sup, supervisor),
  53
+                                ?CHILD(riak_api_pb_listener, worker, [IP, Port])];
  54
+                          true ->
  55
+                               []
  56
+                       end,
  57
+    {ok, {{one_for_one, 10, 10}, [Registrar|NetworkProcesses]}}.
2  test/pb_service_test.erl
@@ -85,10 +85,10 @@ setup() ->
85 85
     application:set_env(riak_api, services, dict:new()),
86 86
     application:set_env(riak_api, pb_ip, "127.0.0.1"),
87 87
     application:set_env(riak_api, pb_port, 32767),
88  
-    riak_api_pb_service:register(?MODULE, ?MSGMIN, ?MSGMAX),
89 88
 
90 89
     [ application:start(A) || A <- Deps ],
91 90
     wait_for_port(),
  91
+    riak_api_pb_service:register(?MODULE, ?MSGMIN, ?MSGMAX),
92 92
     {OldServices, OldHost, OldPort, Deps}.
93 93
 
94 94
 cleanup({S, H, P, Deps}) ->

0 notes on commit d5f707f

Please sign in to comment.
Something went wrong with that request. Please try again.