Permalink
Browse files

Merge branch '2.0'

  • Loading branch information...
2 parents ce34d49 + cc98fff commit 34d90ffb662961d4797f5694aaac40869c22a8f3 @jaredmorrow jaredmorrow committed Sep 4, 2014
View
1 .gitignore
@@ -5,3 +5,4 @@ priv/*
*.beam
doc
ebin
+.local_dialyzer_plt
View
19 Makefile
@@ -0,0 +1,19 @@
+.PHONY: deps
+
+all: compile
+
+deps:
+ @./rebar get-deps
+
+compile: deps
+ @./rebar compile
+
+clean:
+ @./rebar clean
+
+REPO = riak_api
+
+DIALYZER_APPS = kernel stdlib sasl erts ssl tools os_mon runtime_tools crypto inets \
+ xmerl webtool snmp public_key mnesia eunit syntax_tools compiler
+
+include tools.mk
View
4 README.md
@@ -17,9 +17,9 @@ We encourage contributions to `riak_api` from the community.
the repository.
```
- git clone git@github.com:yourusername/riak_kv.git
+ git clone git@github.com:yourusername/riak_api.git
# or
- git remote add mine git@github.com:yourusername/riak_kv.git
+ git remote add mine git@github.com:yourusername/riak_api.git
```
3. Create a topic branch for your change.
View
178 priv/riak_api.schema
@@ -0,0 +1,178 @@
+%%-*- mode: erlang -*-
+%% HTTP Listeners
+%% @doc listener.http.<name> is an IP address and TCP port that the Riak
+%% HTTP interface will bind.
+{mapping, "listener.http.$name", "riak_api.http", [
+ {default, {"{{web_ip}}",{{web_port}} }},
+ {datatype, ip},
+ {include_default, "internal"}
+]}.
+
+{translation,
+ "riak_api.http",
+ fun(Conf) ->
+ HTTP = cuttlefish_variable:filter_by_prefix("listener.http", Conf),
+ [ IP || {_, IP} <- HTTP]
+ end
+}.
+
+%% @doc listener.protobuf.<name> is an IP address and TCP port that the Riak
+%% Protocol Buffers interface will bind.
+{mapping, "listener.protobuf.$name", "riak_api.pb", [
+ {default, {"{{pb_ip}}",{{pb_port}} }},
+ {datatype, ip},
+ {include_default, "internal"}
+]}.
+
+{translation,
+ "riak_api.pb",
+ fun(Conf) ->
+ PB = cuttlefish_variable:filter_by_prefix("listener.protobuf", Conf),
+ [ IP || {_, IP} <- PB]
+ end
+}.
+
+%% @doc The maximum length to which the queue of pending connections
+%% may grow. If set, it must be an integer > 0. If you anticipate a
+%% huge number of connections being initialized *simultaneously*, set
+%% this number higher.
+{mapping, "protobuf.backlog", "riak_api.pb_backlog", [
+ {datatype, integer},
+ {default, 128},
+ {commented, 128},
+ {validators, ["greater_than_zero"]}
+]}.
+
+{validator,
+ "greater_than_zero",
+ "must be greater than zero",
+ fun(Value) -> Value > 0 end}.
+
+%% @doc Turns off Nagle's algorithm for Protocol Buffers
+%% connections. This is equivalent to setting the TCP_NODELAY option
+%% on the socket.
+{mapping, "protobuf.nagle", "riak_api.disable_pb_nagle", [
+ {datatype, {flag, off, on}},
+ {default, off},
+ hidden
+]}.
+
+%% @doc listener.https.<name> is an IP address and TCP port that the Riak
+%% HTTPS interface will bind.
+{mapping, "listener.https.$name", "riak_api.https", [
+ {commented, {"{{web_ip}}",{{web_port}} }},
+ {datatype, ip},
+ {include_default, "internal"}
+]}.
+
+{translation,
+ "riak_api.https",
+ fun(Conf) ->
+ HTTPS = cuttlefish_variable:filter_by_prefix("listener.https", Conf),
+ [ IP || {_, IP} <- HTTPS]
+ end
+}.
+
+%% @doc Whether to prefer the order in which the server lists its
+%% ciphers. When set to 'off', the client's preferred cipher order
+%% dictates which cipher is chosen.
+{mapping, "honor_cipher_order", "riak_api.honor_cipher_order", [
+ {datatype, {enum, [on, off]}},
+ {default, on},
+ hidden
+]}.
+
+{translation,
+ "riak_api.honor_cipher_order",
+ fun(Conf) ->
+ OTPVer = erlang:system_info(otp_release),
+ CipherOrder = cuttlefish:conf_get("honor_cipher_order", Conf),
+ %% This is only available, as of December 2013, in basho patched R16B02,
+ %% so disable it if the VM is not patched by basho. This can be revised
+ %% for R17, when this patch is expected to be present mainline.
+ %% The basho patched OTP can be found at:
+ %% https://github.com/basho/otp/tree/OTP_R16B02_basho3
+ case {CipherOrder, string:str(OTPVer, "basho")} of
+ {_, 0} -> false;
+ {on, _} -> true;
+ {off, _} -> false
+ end
+ end
+}.
+
+%% @doc Determine which SSL/TLS versions are allowed. By default only TLS 1.2
+%% is allowed, but other versions can be enabled if clients don't support the
+%% latest TLS standard. It is *strongly* recommended that SSLv3 is not enabled
+%% unless absolutely necessary. More than one protocol can be enabled at once.
+{mapping, "tls_protocols.sslv3", "riak_api.tls_protocols", [
+ {datatype, {enum, [on, off]}},
+ {default, off},
+ hidden
+]}.
+
+%% @see tls_protocols.sslv3
+{mapping, "tls_protocols.tlsv1", "riak_api.tls_protocols", [
+ {datatype, {enum, [on, off]}},
+ {default, off},
+ hidden
+]}.
+
+%% @see tls_protocols.sslv3
+{mapping, "tls_protocols.tlsv1.1", "riak_api.tls_protocols", [
+ {datatype, {enum, [on, off]}},
+ {default, off},
+ hidden
+]}.
+
+%% @see tls_protocols.sslv3
+{mapping, "tls_protocols.tlsv1.2", "riak_api.tls_protocols", [
+ {datatype, {enum, [on, off]}},
+ {default, on},
+ hidden
+]}.
+
+{translation,
+ "riak_api.tls_protocols",
+ fun(Conf) ->
+ Protocols = cuttlefish_variable:filter_by_prefix("tls_protocols", Conf),
+ [begin
+ case Key of
+ ["tls_protocols","sslv3"] ->
+ sslv3;
+ ["tls_protocols","tlsv1"] ->
+ tlsv1;
+ ["tls_protocols","tlsv1", "1"] ->
+ 'tlsv1.1';
+ ["tls_protocols","tlsv1", "2"] ->
+ 'tlsv1.2'
+ end
+ end || {Key, Value} <- Protocols, Value == on]
+ end
+}.
+
+%% @doc Whether to check the CRL of a client certificate. This defaults to
+%% on but some CAs may not maintain or define a CRL, so this can be disabled
+%% if no CRL is available.
+{mapping, "check_crl", "riak_api.check_crl", [
+ {datatype, {enum, [on, off]}},
+ {default, on},
+ hidden
+]}.
+
+{translation,
+ "riak_api.check_crl",
+ fun(Conf) ->
+ OTPVer = erlang:system_info(otp_release),
+ CheckCRL = cuttlefish:conf_get("check_crl", Conf),
+ %% CRL checking is broken in mainline OTP as of R16B02, so Riak will ship
+ %% with a patched SSL/public_key to fix it. This means that we don't want
+ %% to pass this option to a vanilla OTP.
+ %% The basho patched OTP can be found at:
+ %% https://github.com/basho/otp/tree/OTP_R16B02_basho3
+ case {CheckCRL, string:str(OTPVer, "basho")} of
+ {_, 0} -> false;
+ {on, _} -> true;
+ {off, _} -> false
+ end
+ end
+}.
View
BIN rebar
Binary file not shown.
View
7 rebar.config
@@ -2,6 +2,9 @@
{erl_opts, [warnings_as_errors, {parse_transform, lager_transform}]}.
{eunit_opts, [verbose]}.
{deps, [
- {riak_pb, "1.4.1.1", {git, "git://github.com/basho/riak_pb.git", {tag, "1.4.1.1"}}},
- {riak_core, ".*", {git, "git://github.com/basho/riak_core.git", {branch, "master"}}}
+ {riak_pb, "2.0.0.16", {git, "git://github.com/basho/riak_pb.git", {tag, "2.0.0.16"}}},
+ {webmachine, "1.10.5", {git, "git://github.com/basho/webmachine.git", {tag, "1.10.5"}}},
+ {riak_core, ".*", {git, "git://github.com/basho/riak_core.git", {tag, "2.0.0"}}}
]}.
+
+{xref_checks, [undefined_function_calls]}.
View
2 src/riak_api.app.src
@@ -3,7 +3,7 @@
{application, riak_api,
[
{description, "Riak Client APIs"},
- {vsn, "1.4.1"},
+ {vsn, git},
{applications, [
kernel,
stdlib,
View
3 src/riak_api_app.erl
@@ -32,7 +32,8 @@
%% Note: Riak core cannot register this itself,
%% because it is started before riak_api.
{riak_core_pb_bucket, 19, 22},
- {riak_core_pb_bucket, 29, 30}
+ {riak_core_pb_bucket, 29, 30},
+ {riak_core_pb_bucket_type, 31, 33}
]).
%% @doc The application:start callback.
View
6 src/riak_api_pb_listener.erl
@@ -44,12 +44,12 @@ init([PortNum]) ->
%% @doc Preferred socket options for the listener.
-spec sock_opts() -> [gen_tcp:option()].
sock_opts() ->
- BackLog = app_helper:get_env(riak_api, pb_backlog, 5),
- NoDelay = app_helper:get_env(riak_api, disable_pb_nagle, false),
+ BackLog = app_helper:get_env(riak_api, pb_backlog, 128),
+ NoDelay = app_helper:get_env(riak_api, disable_pb_nagle, true),
[binary, {packet, raw}, {reuseaddr, true}, {backlog, BackLog}, {nodelay, NoDelay}].
%% @doc The handle_call/3 gen_nb_server callback. Unused.
--spec handle_call(term(), pid(), #state{}) -> {reply, term(), #state{}}.
+-spec handle_call(term(), {pid(),_}, #state{}) -> {reply, term(), #state{}}.
handle_call(_Req, _From, State) ->
{reply, not_implemented, State}.
View
12 src/riak_api_pb_registrar.erl
@@ -87,7 +87,7 @@ deregister(Registrations) ->
%% @doc Atomically swap currently registered module with `NewModule'.
-spec swap(module(), pos_integer(), pos_integer()) -> ok | {error, Reason::term()}.
swap(NewModule, MinCode, MaxCode) ->
- gen_server:call(?SERVER, {swap, NewModule, MinCode, MaxCode}, infinity).
+ gen_server:call(?SERVER, {swap, {NewModule, MinCode, MaxCode}}, infinity).
%% @doc Sets the heir of the registrations table on behalf of the
%% helper process.
@@ -129,7 +129,7 @@ handle_call({register, Registrations}, _From, State) ->
handle_call({deregister, Registrations}, _From, State) ->
Reply = do_deregister(Registrations),
{reply, Reply, State};
-handle_call({swap, NewModule, MinCode, MaxCode}, _From, State) ->
+handle_call({swap, {NewModule, MinCode, MaxCode}}, _From, State) ->
Reply = do_swap(NewModule, MinCode, MaxCode),
{reply, Reply, State}.
@@ -226,7 +226,7 @@ do_deregister(Module, MinCode, MaxCode) ->
case ToRemove of
CodeRange ->
%% All codes are valid, so remove them.
- [ ets:delete(?ETS_NAME, Code) || Code <- CodeRange ],
+ _ = [ ets:delete(?ETS_NAME, Code) || Code <- CodeRange ],
riak_api_pb_sup:service_registered(Module),
ok;
_ ->
@@ -360,6 +360,7 @@ registration_inheritance_test_() ->
?_test(begin
Outer = self(),
%% Helper = whereis(riak_api_pb_registration_helper),
+ ?assertEqual(ok, riak_api_pb_service:register(bar, 1000, 1000)),
Registrar = whereis(?MODULE),
exit(Registrar, brutal_kill),
meck:new(riak_api_pb_registration_helper, [passthrough]),
@@ -377,11 +378,16 @@ registration_inheritance_test_() ->
spawn(fun() ->
Outer ! {101, riak_api_pb_service:register(foo, 101, 101)}
end),
+ spawn(fun() ->
+ Outer ! {1000, riak_api_pb_service:swap(foo, 1000, 1000)}
+ end),
?assertEqual(ok, receive {100, Msg} -> Msg after 1500 -> fail end),
?assertEqual(ok, receive {101, Msg} -> Msg after 1500 -> fail end),
+ ?assertEqual(ok, receive {1000, Msg} -> Msg after 1500 -> fail end),
meck:unload(),
exit(NewReg, brutal_kill)
end)
+
]}.
-endif.
View
2 src/riak_api_pb_registration_helper.erl
@@ -82,7 +82,7 @@ init([]) ->
undefined ->
%% Table does not exist, so we create the table and wait
%% for the registrar to claim it.
- ets:new(?ETS_NAME, ?ETS_OPTS),
+ ?ETS_NAME = ets:new(?ETS_NAME, ?ETS_OPTS),
{ok, undefined};
List when is_list(List) ->
%% This process must have been restarted, because the table
View
391 src/riak_api_pb_server.erl
@@ -30,17 +30,30 @@
-include_lib("eunit/include/eunit.hrl").
-endif.
--behaviour(gen_server).
+-include_lib("riak_pb/include/riak_pb.hrl").
+-include_lib("public_key/include/public_key.hrl").
--export([start_link/0, set_socket/2]).
+-behaviour(gen_fsm).
--export([init/1, handle_call/3, handle_cast/2, handle_info/2,
- terminate/2, code_change/3]).
+%% API
+-export([start_link/0, set_socket/2, service_registered/2]).
+
+%% States
+-export([wait_for_socket/2, wait_for_socket/3, wait_for_tls/2, wait_for_tls/3,
+ wait_for_auth/2, wait_for_auth/3, connected/2, connected/3]).
+
+-export([init/1, handle_event/3, handle_sync_event/4, handle_info/3,
+ terminate/3, code_change/4]).
-record(state, {
- socket :: port(), % socket
+ transport = {gen_tcp, inet} :: {gen_tcp, inet} | {ssl, ssl},
+ socket :: port() | ssl:sslsocket(), % socket
req, % current request
states :: orddict:orddict(), % per-service connection state
+ peername :: undefined | {inet:ip_address(), pos_integer()},
+ common_name :: undefined | string(),
+ security,
+ retries = 3,
inbuffer = <<>>, % when an incomplete message comes in, we have to unpack it ourselves
outbuffer = riak_api_pb_frame:new() :: riak_api_pb_frame:buffer() % frame buffer which we can use to optimize TCP sends
}).
@@ -55,79 +68,256 @@
%% @doc Starts a PB server, ready to service a single socket.
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
- gen_server:start_link(?MODULE, [], []).
+ gen_fsm:start_link(?MODULE, [], []).
%% @doc Sets the socket to service for this server.
-spec set_socket(pid(), port()) -> ok.
set_socket(Pid, Socket) ->
- gen_server:call(Pid, {set_socket, Socket}, infinity).
+ gen_fsm:sync_send_event(Pid, {set_socket, Socket}, infinity).
+
+%% @doc Notifies the server process of a newly registered PB service.
+-spec service_registered(pid(), module()) -> ok.
+service_registered(Pid, Mod) ->
+ gen_fsm:send_all_state_event(Pid, {registered, Mod}).
%% @doc The gen_server init/1 callback, initializes the
%% riak_api_pb_server.
--spec init(list()) -> {ok, #state{}}.
+-spec init(list()) -> {ok, wait_for_socket, #state{}}.
init([]) ->
riak_api_stat:update(pbc_connect),
ServiceStates = lists:foldl(fun(Service, States) ->
orddict:store(Service, Service:init(), States)
end,
- orddict:new(), riak_api_pb_registrar:services()),
- {ok, #state{states=ServiceStates}}.
-
-%% @doc The handle_call/3 gen_server callback.
--spec handle_call(Message::term(), From::{pid(),term()}, State::#state{}) -> {reply, Message::term(), NewState::#state{}}.
-handle_call({set_socket, Socket}, _From, State) ->
- inet:setopts(Socket, [{active, once}]),
- {reply, ok, State#state{socket = Socket}}.
-
-%% @doc The handle_cast/2 gen_server callback.
--spec handle_cast(Message::term(), State::#state{}) -> {noreply, NewState::#state{}, timeout()}.
-handle_cast({registered, Service}, #state{states=ServiceStates}=State) ->
+ orddict:new(),
+ riak_api_pb_registrar:services()),
+ {ok, wait_for_socket, #state{states=ServiceStates}}.
+
+wait_for_socket(_Event, State) ->
+ {next_state, wait_for_socket, State}.
+
+wait_for_socket({set_socket, Socket}, _From, State=#state{transport={_Transport,Control}}) ->
+ case Control:peername(Socket) of
+ {ok, PeerInfo} ->
+ Control:setopts(Socket, [{active, once}]),
+ %% check if security is enabled, if it is wait for TLS, otherwise go
+ %% straight into connected state
+ case riak_core_security:is_enabled() of
+ true ->
+ {reply, ok, wait_for_tls, State#state{socket=Socket,
+ peername=PeerInfo}};
+ false ->
+ {reply, ok, connected, State#state{socket=Socket,
+ peername=PeerInfo}}
+ end;
+ {error, Reason} ->
+ lager:debug("Could not get PB socket peername: ~p", [Reason]),
+ %% It's not really "ok", but there's no reason for the
+ %% listener to crash just because this socket had an
+ %% error. See riak_api#54.
+ {stop, normal, ok, State}
+ end;
+wait_for_socket(_Event, _From, State) ->
+ {reply, unknown_message, wait_for_socket, State}.
+
+wait_for_tls({msg, MsgCode, _MsgData}, State=#state{socket=Socket,
+ transport={Transport, _Control}}) ->
+ case riak_pb_codec:msg_code(rpbstarttls) of
+ MsgCode ->
+ %% got STARTTLS msg, send ACK back to client
+ Transport:send(Socket, <<1:32/unsigned-big, MsgCode:8>>),
+ %% now do the SSL handshake
+ case ssl:ssl_accept(Socket, riak_api_ssl:options()) of
+ {ok, NewSocket} ->
+ CommonName = case ssl:peercert(NewSocket) of
+ {ok, Cert} ->
+ OTPCert = public_key:pkix_decode_cert(Cert, otp),
+ riak_core_ssl_util:get_common_name(OTPCert);
+ {error, _Reason} ->
+ undefined
+ end,
+ lager:debug("STARTTLS succeeded, peer's common name was ~p",
+ [CommonName]),
+ {next_state, wait_for_auth,
+ State#state{socket=NewSocket, common_name=CommonName, transport={ssl,ssl}}};
+ {error, Reason} ->
+ lager:warning("STARTTLS with client ~s failed: ~p",
+ [format_peername(State#state.peername), Reason]),
+ {stop, {error, {startls_failed, Reason}}, State}
+ end;
+ _ ->
+ lager:debug("Client sent unexpected message code ~p", [MsgCode]),
+ State1 = send_error_and_flush("Security is enabled, please STARTTLS first",
+ State),
+ {next_state, wait_for_tls, State1}
+ end;
+wait_for_tls(_Event, State) ->
+ {next_state, wait_for_tls, State}.
+
+wait_for_tls(_Event, _From, State) ->
+ {reply, unknown_message, wait_for_tls, State}.
+
+wait_for_auth({msg, MsgCode, MsgData}, State=#state{socket=Socket,
+ transport={Transport,_Control}}) ->
+ case riak_pb_codec:msg_code(rpbauthreq) of
+ MsgCode ->
+ %% got AUTH message, try to validate credentials
+ AuthReq = riak_pb_codec:decode(MsgCode, MsgData),
+ User = AuthReq#rpbauthreq.user,
+ Password = AuthReq#rpbauthreq.password,
+ {PeerIP, _PeerPort} = State#state.peername,
+ case riak_core_security:authenticate(User, Password, [{ip,
+ PeerIP},
+ {common_name,
+ State#state.common_name}]) of
+ {ok, SecurityContext} ->
+ lager:debug("authentication for ~p from ~p succeeded",
+ [User, PeerIP]),
+ AuthResp = riak_pb_codec:msg_code(rpbauthresp),
+ Transport:send(Socket, <<1:32/unsigned-big, AuthResp:8>>),
+ {next_state, connected,
+ State#state{security=SecurityContext}};
+ {error, Reason} ->
+ %% Allow the client to reauthenticate, I guess?
+
+ %% Add a delay to make brute-force attempts more annoying
+ timer:sleep(5000),
+ State1 = send_error_and_flush("Authentication failed",
+ State),
+ lager:debug("authentication for ~p from ~p failed: ~p",
+ [User, PeerIP, Reason]),
+ case State#state.retries of
+ N when N =< 1 ->
+ %% no more chances
+ {stop, normal, State};
+ Retries ->
+ {next_state, wait_for_auth,
+ State1#state{retries=Retries-1}}
+ end
+ end;
+ _ ->
+ State1 = send_error_and_flush("Security is enabled, please "
+ "authenticate first", State),
+ {next_state, wait_for_auth, State1}
+ end;
+wait_for_auth(_Event, State) ->
+ {next_state, wait_for_auth, State}.
+
+wait_for_auth(_Event, _From, State) ->
+ {reply, unknown_message, wait_for_auth, State}.
+
+connected(timeout, State=#state{outbuffer=Buffer}) ->
+ %% Flush any protocol messages that have been buffering
+ {ok, Data, NewBuffer} = riak_api_pb_frame:flush(Buffer),
+ {next_state, connected, flush(Data, State#state{outbuffer=NewBuffer})};
+connected({msg, MsgCode, MsgData}, State=#state{states=ServiceStates}) ->
+ try
+ %% First find the appropriate service module to dispatch
+ NewState = case riak_api_pb_registrar:lookup(MsgCode) of
+ {ok, Service} ->
+ ServiceState = orddict:fetch(Service, ServiceStates),
+ %% Decode the message according to the service
+ case Service:decode(MsgCode, MsgData) of
+ {ok, Message} ->
+ %% Process the message
+ process_message(Service, Message, ServiceState, State);
+ {ok, Message, Permissions} ->
+ case State#state.security of
+ undefined ->
+ process_message(Service, Message, ServiceState, State);
+ SecCtx ->
+ case riak_core_security:check_permissions(
+ Permissions, SecCtx) of
+ {true, NewCtx} ->
+ process_message(Service, Message,
+ ServiceState,
+ State#state{security=NewCtx});
+ {false, Error, NewCtx} ->
+ send_error(Error,
+ [],
+ State#state{security=NewCtx})
+ end
+ end;
+ {error, Reason} ->
+ send_error("Message decoding error: ~p", [Reason], State)
+ end;
+ error ->
+ case riak_pb_codec:msg_code(rpbstarttls) of
+ MsgCode ->
+ send_error("Security not enabled; STARTTLS not allowed.", State);
+ _ ->
+ send_error("Unknown message code: ~p", [MsgCode], State)
+ end
+ end,
+ {next_state, connected, NewState}
+ catch
+ %% Tell the client we errored before closing the connection.
+ Type:Failure ->
+ Trace = erlang:get_stacktrace(),
+ FState = send_error_and_flush({format, "Error processing incoming message: ~p:~p:~p",
+ [Type, Failure, Trace]}, State),
+ {stop, {Type, Failure, Trace}, FState}
+ end;
+connected(_Event, State) ->
+ {next_state, connected, State}.
+
+connected(_Event, _From, State) ->
+ {reply, unknown_message, connected, State}.
+
+%% @doc The handle_event/3 gen_fsm callback.
+handle_event({registered, Service}, StateName, #state{states=ServiceStates}=State) ->
%% When a new service is registered after a client connection is
%% already established, update the internal state to support the
%% new capabilities.
case orddict:is_key(Service, ServiceStates) of
true ->
%% This is an existing service registering
%% disjoint message codes
- {noreply, State, 0};
+ {next_state, StateName, State, 0};
false ->
%% This is a new service registering
- {noreply, State#state{states=orddict:store(Service, Service:init(), ServiceStates)}, 0}
+ {next_state, StateName,
+ State#state{states=orddict:store(Service, Service:init(),
+ ServiceStates)}, 0}
end;
-handle_cast(_Msg, State) ->
- {noreply, State, 0}.
+handle_event(_Msg, StateName, State) ->
+ {next_state, StateName, State, 0}.
-%% @doc The handle_info/2 gen_server callback.
--spec handle_info(Message::term(), State::#state{}) -> {noreply, NewState::#state{}} | {stop, Reason::atom(), NewState::#state{}}.
-handle_info(timeout, #state{outbuffer=Buffer}=State) ->
- %% Flush any protocol messages that have been buffering
- {ok, Data, NewBuffer} = riak_api_pb_frame:flush(Buffer),
- {noreply, flush(Data, State#state{outbuffer=NewBuffer})};
-handle_info({tcp_closed, Socket}, State=#state{socket=Socket}) ->
+handle_sync_event(_Event, _From, StateName, State) ->
+ {reply, unknown_message, StateName, State}.
+
+%% @doc The handle_info/3 gen_fsm callback.
+handle_info({tcp_closed, Socket}, _SN, State=#state{socket=Socket}) ->
{stop, normal, State};
-handle_info({tcp_error, Socket, _Reason}, State=#state{socket=Socket}) ->
+handle_info({ssl_closed, Socket}, _SN, State=#state{socket=Socket}) ->
{stop, normal, State};
-handle_info({tcp, _Sock, Bin}, State=#state{req=undefined,
- inbuffer=InBuffer}) ->
+handle_info({tcp_error, Socket, _Reason}, _SN, State=#state{socket=Socket}) ->
+ {stop, normal, State};
+handle_info({ssl_error, Socket, _Reason}, _SN, State=#state{socket=Socket}) ->
+ {stop, normal, State};
+handle_info({Proto, Socket, Bin}, StateName, State=#state{req=undefined,
+ socket=Socket,
+ inbuffer=InBuffer}) when
+ Proto == tcp; Proto == ssl ->
%% Because we do our own outbound framing, we need to do our own
%% inbound deframing.
NewBuffer = <<InBuffer/binary, Bin/binary>>,
- decode_buffer(State#state{inbuffer=NewBuffer});
-handle_info({tcp, _Sock, _Data}, State) ->
+ decode_buffer(StateName, State#state{inbuffer=NewBuffer});
+handle_info({Proto, Socket, _Data}, _SN, State=#state{socket=Socket}) when
+ Proto == tcp; Proto == ssl ->
%% req =/= undefined: received a new request while another was in
%% progress -> Error
lager:debug("Received a new PB socket request"
" while another was in progress"),
State1 = send_error_and_flush("Cannot send another request while one is in progress", State),
{stop, normal, State1};
-handle_info(StreamMessage, #state{req={Service,ReqId,StreamState}}=State) ->
+handle_info(StreamMessage, StateName, #state{req={Service,ReqId,StreamState}}=State) ->
%% Handle streaming messages from other processes. This should
%% help avoid creating extra middlemen. Naturally, this is only
%% valid when a streaming request has started, other messages will
%% be ignored.
try
NewState = process_stream(Service, ReqId, StreamMessage, StreamState, State),
- {noreply, NewState, 0}
+ {next_state, StateName, NewState, 0}
catch
%% Tell the client we errored before closing the connection.
Type:Reason ->
@@ -136,82 +326,57 @@ handle_info(StreamMessage, #state{req={Service,ReqId,StreamState}}=State) ->
[Type, Reason, Trace]}, State),
{stop, {Type, Reason, Trace}, FState}
end;
-handle_info(Message, State) ->
+handle_info(Message, StateName, State) ->
%% Throw out messages we don't care about, but log them
lager:error("Unrecognized message ~p", [Message]),
- {noreply, State, 0}.
+ {next_state, StateName, State, 0}.
%% @doc The gen_server terminate/2 callback, called when shutting down
%% the server.
--spec terminate(Reason, State) -> ok when
+-spec terminate(Reason, StateName, State) -> ok when
Reason :: normal | shutdown | {shutdown,term()} | term(),
+ StateName :: atom(),
State :: #state{}.
-terminate(_Reason, _State) ->
+terminate(_Reason, _StateName, _State) ->
ok.
%% @doc The gen_server code_change/3 callback, called when performing
%% a hot code upgrade on the server. Currently unused.
--spec code_change(OldVsn, State, Extra) -> {ok, State} | {error, Reason} when
+-spec code_change(OldVsn, StateName, State, Extra) -> {ok, StateName, State} when
OldVsn :: Vsn | {down, Vsn},
Vsn :: term(),
+ StateName :: atom(),
State :: #state{},
- Extra :: term(),
- Reason :: term().
-code_change(_OldVsn,State,_Extra) ->
- {ok, State}.
+ Extra :: term().
+code_change(_OldVsn, StateName, State, _Extra) ->
+ {ok, StateName, State}.
%% ===================================================================
%% Internal functions
%% ===================================================================
-decode_buffer(State=#state{socket=Socket,
- inbuffer=Buffer}) ->
+decode_buffer(StateName, State=#state{socket=Socket,
+ transport={_Transport,Control},
+ inbuffer=Buffer}) ->
case erlang:decode_packet(4, Buffer, []) of
{ok, <<MsgCode:8, MsgData/binary>>, Rest} ->
- case handle_message(MsgCode, MsgData, State) of
- {ok, NewState} ->
- decode_buffer(NewState#state{inbuffer=Rest});
+ case ?MODULE:StateName({msg, MsgCode, MsgData}, State) of
+ {next_state, NewStateName, NewState} ->
+ decode_buffer(NewStateName, NewState#state{inbuffer=Rest});
Stop ->
Stop
end;
{ok, Binary, Rest} ->
lager:error("Unexpected message format! Message: ~p, Rest: ~p", [Binary, Rest]),
{stop, badmessage, State};
{more, _Length} ->
- inet:setopts(Socket, [{active, once}]),
- {noreply, State, 0};
+ Control:setopts(Socket, [{active, once}]),
+ {next_state, StateName, State, 0};
{error, Reason} ->
FState = send_error_and_flush({format, "Invalid message packet, reason: ~p", [Reason]},
State#state{inbuffer= <<>>}),
- {noreply, FState, 0}
- end.
-
-handle_message(MsgCode, MsgData, State=#state{states=ServiceStates}) ->
- try
- %% First find the appropriate service module to dispatch
- NewState = case riak_api_pb_registrar:lookup(MsgCode) of
- {ok, Service} ->
- %% Decode the message according to the service
- case Service:decode(MsgCode, MsgData) of
- {ok, Message} ->
- %% Process the message
- ServiceState = orddict:fetch(Service, ServiceStates),
- process_message(Service, Message, ServiceState, State);
- {error, Reason} ->
- send_error("Message decoding error: ~p", [Reason], State)
- end;
- error ->
- send_error("Unknown message code.", State)
- end,
- {ok, NewState}
- catch
- %% Tell the client we errored before closing the connection.
- Type:Failure ->
- Trace = erlang:get_stacktrace(),
- FState = send_error_and_flush({format, "Error processing incoming message: ~p:~p:~p",
- [Type, Failure, Trace]}, State),
- {stop, {Type, Failure, Trace}, FState}
+ {next_state, StateName, FState, 0}
end.
@@ -329,7 +494,7 @@ send_error(Message, State) when is_list(Message) orelse is_binary(Message) ->
send_message(Packet, State).
%% @doc Formats the terms with the given string and then sends an
-%% error message to the client.
+%% error message to the client.
-spec send_error(io:format(), list(), #state{}) -> #state{}.
send_error(Format, Terms, State) ->
send_error(io_lib:format(Format, Terms), State).
@@ -346,8 +511,8 @@ send_all(Service, [Reply|Rest], State) ->
flush([], State) ->
%% The buffer was empty, so do a no-op.
State;
-flush(IoData, #state{socket=Sock}=State) ->
- gen_tcp:send(Sock, IoData),
+flush(IoData, #state{socket=Sock, transport={Transport,_Control}}=State) ->
+ Transport:send(Sock, IoData),
State.
%% @doc Sends an error and immediately flushes the message buffer.
@@ -356,3 +521,59 @@ send_error_and_flush(Error, State) ->
State1 = send_error(Error, State),
{ok, Data, NewBuffer} = riak_api_pb_frame:flush(State1#state.outbuffer),
flush(Data, State1#state{outbuffer=NewBuffer}).
+
+format_peername({IP, Port}) ->
+ io_lib:format("~s:~B", [inet_parse:ntoa(IP), Port]).
+
+-ifdef(TEST).
+
+-include("riak_api_pb_registrar.hrl").
+
+receive_closed_socket_test_() ->
+ {setup,
+ fun() ->
+ %% Create the registration table so the server will start up.
+ try ets:new(?ETS_NAME, ?ETS_OPTS) of
+ ?ETS_NAME -> true
+ catch
+ _:badarg -> false
+ end
+ end,
+ fun(true) -> ets:delete(?ETS_NAME);
+ (_) -> ok
+ end,
+ ?_test(
+ begin
+ %% Pretend that we're a listener, listen on any port
+ {ok, Listen} = gen_tcp:listen(0, []),
+ {ok, {Address, Port}} = inet:sockname(Listen),
+
+ %% Connect as a client
+ {ok, ClientSocket} = gen_tcp:connect(Address, Port, []),
+
+ %% Accept the socket, start a server, give it over to the server,
+ %% then have the client close the socket.
+ {ok, ServerSocket} = gen_tcp:accept(Listen),
+ {ok, Server} = gen_fsm:start(?MODULE, [], []),
+ MRef = monitor(process, Server),
+ ok = gen_tcp:controlling_process(ServerSocket, Server),
+ ok = gen_tcp:close(ClientSocket),
+ timer:sleep(1),
+
+ %% The call to set_socket should reply ok, but shutdown the
+ %% server, not crash and propagate back to the listener process.
+ ?assertEqual(ok, set_socket(Server, ServerSocket)),
+ receive
+ {'DOWN', MRef, process, Server, _} -> ok
+ after 5000 ->
+ %% We shouldn't miss the DOWN message, but let's
+ %% just check that the process is stopped now.
+ ?assertNot(erlang:is_process_alive(Server))
+ end,
+
+ %% Close the listening socket
+ gen_tcp:close(Listen)
+ end
+ )}.
+
+-endif.
View
48 src/riak_api_pb_service.erl
@@ -63,19 +63,23 @@
%% ```
%% decode(Code, Message) ->
%% {ok, DecodedMessage} |
+%% {ok, DecodedMessage, PermAndTarget} |
%% {error, Reason}.
%%
%% Code = non_neg_integer()
%% Message = binary()
%% DecodedMessage = Reason = term()
+%% PermAndTarget = perm_and_target()
%% '''
%%
%% The `decode/2' callback is handed a message code and wire message
%% that is registered to this service and should decode it into an
%% Erlang term that can be handled by the `process/2' callback. If the
%% message does not decode properly, it should return an `error' tuple
-%% with an appropriate reason. Most services will simply delegate
-%% encoding to the `riak_pb' application.
+%% with an appropriate reason. The decoded message may optionally
+%% include a permission and target tuple used by the security system
+%% to restrict access to the operation. Most services will simply
+%% delegate encoding to the `riak_pb' application.
%%
%% ```
%% encode(Message) ->
@@ -158,9 +162,6 @@
-module(riak_api_pb_service).
-compile([{no_auto_import, [register/2]}]).
-%% Behaviour API
--export([behaviour_info/1]).
-
%% Service-provider API
-export([register/1,
register/2,
@@ -174,16 +175,33 @@
-export_type([registration/0]).
-%% @doc Behaviour information callback. PB API services must implement
-%% the given functions.
-behaviour_info(callbacks) ->
- [{init,0},
- {decode,2},
- {encode,1},
- {process,2},
- {process_stream,3}];
-behaviour_info(_) ->
- undefined.
+-callback init() -> State :: term().
+
+-type perm_and_target() :: {Permission :: string(), Target :: term()}.
+-callback decode(Code :: non_neg_integer(), Message :: binary()) ->
+ {ok, DecodedMessage :: term()} |
+ {ok, DecodedMessage :: term(), perm_and_target()} |
+ {error, Reason :: term()}.
+
+-callback encode(Message :: term()) ->
+ {ok, EncodedMessage :: iodata()} |
+ term().
+
+-type process_error() :: iodata() |
+ {format, term()} |
+ {format, io:format(), [term()]}.
+
+-callback process(Message :: term(), State :: term()) ->
+ {reply, ReplyMessage :: term(), NewState :: term()} |
+ {reply, {stream, ReqId :: term()}, NewState :: term()} |
+ {error, Error :: process_error(), NewState :: term()}.
+
+-callback process_stream(Message :: term(), ReqId :: term(), State :: term()) ->
+ {reply, Reply :: [term()] | term(), NewState :: term()} |
+ {ignore, NewState :: term()} |
+ {done, Reply :: [term()] | term(), NewState :: term()} |
+ {done, NewState :: term()} |
+ {error, Error :: process_error(), NewState :: term()}.
%% @doc Registers a number of services at once.
%% @see register/3
View
4 src/riak_api_pb_sup.erl
@@ -40,8 +40,8 @@ service_registered(Mod) ->
undefined ->
ok;
_ ->
- [ gen_server:cast(Pid, {registered, Mod}) ||
- {_,Pid,_,_} <- supervisor:which_children(?MODULE) ],
+ _ = [ riak_api_pb_server:service_registered(Pid, Mod) ||
+ {_,Pid,_,_} <- supervisor:which_children(?MODULE) ],
ok
end.
View
276 src/riak_api_ssl.erl
@@ -0,0 +1,276 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_api_ssl: configuration for SSL/TLS connections over PB and HTTP
+%%
+%% Copyright (c) 2013-2014 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc Configuration and validation routines for SSL/TLS connections
+%% to clients.
+-module(riak_api_ssl).
+
+-export([options/0]).
+-include_lib("public_key/include/public_key.hrl").
+
+%% @doc Returns a list of common options for SSL/TLS connections.
+-spec options() -> [ssl:ssl_option()].
+options() ->
+ CoreSSL = app_helper:get_env(riak_core, ssl),
+ CACertFile = proplists:get_value(cacertfile, CoreSSL),
+ CertFile = proplists:get_value(certfile, CoreSSL),
+ KeyFile = proplists:get_value(keyfile, CoreSSL),
+ Versions = app_helper:get_env(riak_api, tls_protocols, ['tlsv1.2']),
+ HonorCipherOrder = app_helper:get_env(riak_api, honor_cipher_order, false),
+ CheckCRL = app_helper:get_env(riak_api, check_crl, false),
+
+ {Ciphers, _} = riak_core_ssl_util:parse_ciphers(riak_core_security:get_ciphers()),
+ CACerts = riak_core_ssl_util:load_certs(CACertFile),
+
+ [{certfile, CertFile},
+ {keyfile, KeyFile},
+ {cacerts, CACerts},
+ {ciphers, Ciphers},
+ {versions, Versions},
+ %% force peer validation, even though
+ %% we don't care if the peer doesn't
+ %% send a certificate
+ {verify, verify_peer},
+ {reuse_sessions, false} %% required!
+ ] ++
+ %% conditionally include the honor cipher order, don't pass it if it
+ %% disabled because it will crash any
+ %% OTP installs that lack the patch to
+ %% implement honor_cipher_order
+ [{honor_cipher_order, true} || HonorCipherOrder ] ++
+ %% if we're validating CRLs, define a
+ %% verify_fun for them.
+ [{verify_fun, {fun validate_function/3, {CACerts, []}}} || CheckCRL ].
+
+
+
+%% @doc Validator function for SSL negotiation.
+%%
+validate_function(Cert, valid_peer, State) ->
+ lager:debug("validing peer ~p with ~p intermediate certs",
+ [riak_core_ssl_util:get_common_name(Cert),
+ length(element(2, State))]),
+ %% peer certificate validated, now check the CRL
+ Res = (catch check_crl(Cert, State)),
+ lager:debug("CRL validate result for ~p: ~p",
+ [riak_core_ssl_util:get_common_name(Cert), Res]),
+ {Res, State};
+validate_function(Cert, valid, {TrustedCAs, IntermediateCerts}=State) ->
+ case public_key:pkix_is_self_signed(Cert) of
+ true ->
+ %% this is a root cert, no CRL
+ {valid, {TrustedCAs, [Cert|IntermediateCerts]}};
+ false ->
+ %% check is valid CA certificate, add to the list of
+ %% intermediates
+ Res = (catch check_crl(Cert, State)),
+ lager:debug("CRL intermediate CA validate result for ~p: ~p",
+ [riak_core_ssl_util:get_common_name(Cert), Res]),
+ {Res, {TrustedCAs, [Cert|IntermediateCerts]}}
+ end;
+validate_function(_Cert, _Event, State) ->
+ {valid, State}.
+
+%% @doc Given a certificate, find CRL distribution points for the given
+%% certificate, fetch, and attempt to validate each CRL through
+%% issuer_function/4.
+%%
+check_crl(Cert, State) ->
+ %% pull the CRL distribution point(s) out of the certificate, if any
+ case pubkey_cert:select_extension(?'id-ce-cRLDistributionPoints',
+ pubkey_cert:extensions_list(Cert#'OTPCertificate'.tbsCertificate#'OTPTBSCertificate'.extensions)) of
+ undefined ->
+ lager:debug("no CRL distribution points for ~p",
+ [riak_core_ssl_util:get_common_name(Cert)]),
+ %% fail; we can't validate if there's no CRL
+ no_crl;
+ CRLExtension ->
+ CRLDistPoints = CRLExtension#'Extension'.extnValue,
+ DPointsAndCRLs = lists:foldl(fun(Point, Acc) ->
+ %% try to read the CRL over http or from a
+ %% local file
+ case fetch_point(Point) of
+ not_available ->
+ Acc;
+ Res ->
+ [{Point, Res} | Acc]
+ end
+ end, [], CRLDistPoints),
+ public_key:pkix_crls_validate(Cert,
+ DPointsAndCRLs,
+ [{issuer_fun,
+ {fun issuer_function/4, State}}])
+ end.
+
+%% @doc Given a list of distribution points for CRLs, certificates and
+%% both trusted and intermediary certificates, attempt to build and
+%% authority chain back via build_chain to verify that it is valid.
+%%
+issuer_function(_DP, CRL, _Issuer, {TrustedCAs, IntermediateCerts}) ->
+ %% XXX the 'Issuer' we get passed here is the AuthorityKeyIdentifier,
+ %% which we are not currently smart enough to understand
+ %% Read the CA certs out of the file
+ Certs = [public_key:pkix_decode_cert(DER, otp) || DER <- TrustedCAs],
+ %% get the real issuer out of the CRL
+ Issuer = public_key:pkix_normalize_name(
+ pubkey_cert_records:transform(
+ CRL#'CertificateList'.tbsCertList#'TBSCertList'.issuer, decode)),
+ %% assume certificates are ordered from root to tip
+ case find_issuer(Issuer, IntermediateCerts ++ Certs) of
+ undefined ->
+ lager:debug("unable to find certificate matching CRL issuer ~p",
+ [Issuer]),
+ error;
+ IssuerCert ->
+ case build_chain({public_key:pkix_encode('OTPCertificate',
+ IssuerCert,
+ otp),
+ IssuerCert}, IntermediateCerts, Certs, []) of
+ undefined ->
+ error;
+ {OTPCert, Path} ->
+ {ok, OTPCert, Path}
+ end
+ end.
+
+%% @doc Attempt to build authority chain back using intermediary
+%% certificates, falling back on trusted certificates if the
+%% intermediary chain of certificates does not fully extend to the
+%% root.
+%%
+%% Returns: {RootCA :: #OTPCertificate{}, Chain :: [der_encoded()]}
+%%
+build_chain({DER, Cert}, IntCerts, TrustedCerts, Acc) ->
+ %% check if this cert is self-signed, if it is, we've reached the
+ %% root of the chain
+ Issuer = public_key:pkix_normalize_name(
+ Cert#'OTPCertificate'.tbsCertificate#'OTPTBSCertificate'.issuer),
+ Subject = public_key:pkix_normalize_name(
+ Cert#'OTPCertificate'.tbsCertificate#'OTPTBSCertificate'.subject),
+ case Issuer == Subject of
+ true ->
+ case find_issuer(Issuer, TrustedCerts) of
+ undefined ->
+ undefined;
+ TrustedCert ->
+ %% return the cert from the trusted list, to prevent
+ %% issuer spoofing
+ {TrustedCert,
+ [public_key:pkix_encode(
+ 'OTPCertificate', TrustedCert, otp)|Acc]}
+ end;
+ false ->
+ Match = lists:foldl(
+ fun(C, undefined) ->
+ S = public_key:pkix_normalize_name(C#'OTPCertificate'.tbsCertificate#'OTPTBSCertificate'.subject),
+ %% compare the subject to the current issuer
+ case Issuer == S of
+ true ->
+ %% we've found our man
+ {public_key:pkix_encode('OTPCertificate', C, otp), C};
+ false ->
+ undefined
+ end;
+ (_E, A) ->
+ %% already matched
+ A
+ end, undefined, IntCerts),
+ case Match of
+ undefined when IntCerts /= TrustedCerts ->
+ %% continue the chain by using the trusted CAs
+ lager:debug("Ran out of intermediate certs, switching to trusted certs~n"),
+ build_chain({DER, Cert}, TrustedCerts, TrustedCerts, Acc);
+ undefined ->
+ lager:debug("Can't construct chain of trust beyond ~p",
+ [riak_core_ssl_util:get_common_name(Cert)]),
+ %% can't find the current cert's issuer
+ undefined;
+ Match ->
+ build_chain(Match, IntCerts, TrustedCerts, [DER|Acc])
+ end
+ end.
+
+%% @doc Given a certificate and a list of trusted or intermediary
+%% certificates, attempt to find a match in the list or bail with
+%% undefined.
+find_issuer(Issuer, Certs) ->
+ lists:foldl(
+ fun(OTPCert, undefined) ->
+ %% check if this certificate matches the issuer
+ Normal = public_key:pkix_normalize_name(
+ OTPCert#'OTPCertificate'.tbsCertificate#'OTPTBSCertificate'.subject),
+ case Normal == Issuer of
+ true ->
+ OTPCert;
+ false ->
+ undefined
+ end;
+ (_E, Acc) ->
+ %% already found a match
+ Acc
+ end, undefined, Certs).
+
+%% @doc Find distribution points for a given CRL and then attempt to
+%% fetch the CRL from the first available.
+fetch_point(#'DistributionPoint'{distributionPoint={fullName, Names}}) ->
+ Decoded = [{NameType,
+ pubkey_cert_records:transform(Name, decode)}
+ || {NameType, Name} <- Names],
+ fetch(Decoded).
+
+%% @doc Given a list of locations to retrieve a CRL from, attempt to
+%% retrieve either from a file or http resource and bail as soon as
+%% it can be found.
+%%
+%% Currently, only hand a armored PEM or DER encoded file, with
+%% defaulting to DER.
+%%
+fetch([]) ->
+ not_available;
+fetch([{uniformResourceIdentifier, "file://"++_File}|Rest]) ->
+ lager:debug("fetching CRLs from file URIs is not supported"),
+ fetch(Rest);
+fetch([{uniformResourceIdentifier, "http"++_=URL}|Rest]) ->
+ lager:debug("getting CRL from ~p~n", [URL]),
+ _ = inets:start(),
+ case httpc:request(get, {URL, []}, [], [{body_format, binary}]) of
+ {ok, {_Status, _Headers, Body}} ->
+ case Body of
+ <<"-----BEGIN", _/binary>> ->
+ [{'CertificateList',
+ DER, _}=CertList] = public_key:pem_decode(Body),
+ {DER, public_key:pem_entry_decode(CertList)};
+ _ ->
+ %% assume DER encoded
+ CertList = public_key:pem_entry_decode(
+ {'CertificateList', Body, not_encrypted}),
+ {Body, CertList}
+ end;
+ {error, _Reason} ->
+ lager:debug("failed to get CRL ~p~n", [_Reason]),
+ fetch(Rest)
+ end;
+fetch([Loc|Rest]) ->
+ %% unsupported CRL location
+ lager:debug("unable to fetch CRL from unsupported location ~p",
+ [Loc]),
+ fetch(Rest).
View
6 src/riak_api_stat.erl
@@ -45,8 +45,8 @@ start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
register_stats() ->
- [(catch folsom_metrics:delete_metric({?APP, Name})) || {Name, _Type} <- stats()],
- [register_stat(stat_name(Stat), Type) || {Stat, Type} <- stats()],
+ _ = [(catch folsom_metrics:delete_metric({?APP, Name})) || {Name, _Type} <- stats()],
+ _ = [register_stat(stat_name(Stat), Type) || {Stat, Type} <- stats()],
riak_core_stat_cache:register_app(?APP, {?MODULE, produce_stats, []}).
%% @doc Return current aggregation of all stats.
@@ -111,7 +111,7 @@ stat_name(Name) when is_atom(Name) ->
register_stat(Name, spiral) ->
folsom_metrics:new_spiral(Name);
register_stat(Name, {function, _Module, _Function}=Fun) ->
- folsom_metrics:new_gauge(Name),
+ ok = folsom_metrics:new_gauge(Name),
folsom_metrics:notify({Name, Fun}).
active_pb_connects() ->
View
44 src/riak_api_sup.erl
@@ -31,10 +31,10 @@
-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
-define(CHILD(I, Type, Args), {I, {I, start_link, Args}, permanent, 5000, Type, [I]}).
--define(LNAME(IP, Port), lists:flatten(io_lib:format("~p:~p", [IP, Port]))).
--define(LISTENER(IP, Port), {?LNAME(IP, Port),
- {riak_api_pb_listener, start_link, [IP, Port]},
- permanent, 5000, worker, [riak_api_pb_listener]}).
+-define(LNAME(IP, Port), lists:flatten(io_lib:format("pb://~p:~p", [IP, Port]))).
+-define(PB_LISTENER(IP, Port), {?LNAME(IP, Port),
+ {riak_api_pb_listener, start_link, [IP, Port]},
+ permanent, 5000, worker, [riak_api_pb_listener]}).
%% @doc Starts the supervisor.
-spec start_link() -> {ok, pid()} | {error, term()}.
start_link() ->
@@ -47,18 +47,32 @@ start_link() ->
MaxT :: pos_integer(),
ChildSpec :: supervisor:child_spec().
init([]) ->
- Listeners = riak_api_pb_listener:get_listeners(),
Helper = ?CHILD(riak_api_pb_registration_helper, worker),
Registrar = ?CHILD(riak_api_pb_registrar, worker),
- NetworkProcesses = if Listeners /= [] ->
- [?CHILD(riak_api_pb_sup, supervisor)] ++
- listener_specs(Listeners);
- true ->
- lager:info("No PB listeners were configured,"
- " PB connections will be disabled."),
- []
- end,
+ PBProcesses = pb_processes(riak_api_pb_listener:get_listeners()),
+ WebProcesses = web_processes(riak_api_web:get_listeners()),
+ NetworkProcesses = PBProcesses ++ WebProcesses,
{ok, {{one_for_one, 10, 10}, [Helper, Registrar|NetworkProcesses]}}.
-listener_specs(Pairs) ->
- [ ?LISTENER(IP, Port) || {IP, Port} <- Pairs ].
+%% Generates child specs from the HTTP/HTTPS listener configuration.
+%% @private
+web_processes([]) ->
+ lager:info("No HTTP/HTTPS listeners were configured, HTTP connections will be disabled."),
+ [];
+web_processes(Listeners) ->
+ lists:flatten([ web_listener_spec(Scheme, Binding) ||
+ {Scheme, Binding} <- Listeners ]).
+
+web_listener_spec(Scheme, Binding) ->
+ riak_api_web:binding_config(Scheme, Binding).
+
+%% Generates child specs from the PB listener configuration.
+%% @private
+pb_processes([]) ->
+ lager:info("No PB listeners were configured, PB connections will be disabled."),
+ [];
+pb_processes(Listeners) ->
+ [?CHILD(riak_api_pb_sup, supervisor)| pb_listener_specs(Listeners)].
+
+pb_listener_specs(Pairs) ->
+ [ ?PB_LISTENER(IP, Port) || {IP, Port} <- Pairs ].
View
86 src/riak_api_web.erl
@@ -0,0 +1,86 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_api_web: setup Riak's HTTP interface
+%%
+%% Copyright (c) 2007-2010 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc Convenience functions for setting up the HTTP interface
+%% of Riak.
+-module(riak_api_web).
+
+
+-export([get_listeners/0,
+ binding_config/2]).
+
+get_listeners() ->
+ get_listeners(http) ++ get_listeners(https).
+
+get_listeners(Scheme) ->
+ Listeners = case app_helper:try_envs([{riak_api, Scheme},
+ {riak_core, Scheme}], []) of
+ {riak_api, Scheme, List} when is_list(List) ->
+ List;
+ {riak_core, Scheme, List} when is_list(List) ->
+ lager:warning("Setting riak_core/~s is deprecated, please use riak_api/~s", [Scheme, Scheme]),
+ List;
+ _ ->
+ []
+ end,
+ lists:usort([ {Scheme, Binding} || Binding <- Listeners ]).
+
+binding_config(Scheme, Binding) ->
+ {Ip, Port} = Binding,
+ Name = spec_name(Scheme, Ip, Port),
+ Config = spec_from_binding(Scheme, Name, Binding),
+
+ {Name,
+ {webmachine_mochiweb, start, [Config]},
+ permanent, 5000, worker, [mochiweb_socket_server]}.
+
+spec_from_binding(http, Name, {Ip, Port}) ->
+ lists:flatten([{name, Name},
+ {ip, Ip},
+ {port, Port},
+ {nodelay, true}],
+ common_config());
+
+spec_from_binding(https, Name, {Ip, Port}) ->
+ lists:flatten([{name, Name},
+ {ip, Ip},
+ {port, Port},
+ {ssl, true},
+ {ssl_opts, riak_api_ssl:options()},
+ {nodelay, true}],
+ common_config()).
+
+spec_name(Scheme, Ip, Port) ->
+ FormattedIP = if is_tuple(Ip); tuple_size(Ip) == 4 ->
+ inet_parse:ntoa(Ip);
+ is_tuple(Ip); tuple_size(Ip) == 8 ->
+ [$[, inet_parse:ntoa(Ip), $]];
+ true -> Ip
+ end,
+ lists:flatten(io_lib:format("~s://~s:~p", [Scheme, FormattedIP, Port])).
+
+common_config() ->
+ [{log_dir, app_helper:get_env(riak_api, http_logdir,
+ app_helper:get_env(riak_core, platform_log_dir, "log"))},
+ {backlog, 128},
+ {dispatch, [{[], riak_api_wm_urlmap, []}
+ ]}].
View
39 src/riak_api_web_security.erl
@@ -0,0 +1,39 @@
+%% @doc Some security helper functions for Riak API endpoints
+-module(riak_api_web_security).
+
+-export([is_authorized/1]).
+
+%% @doc Check if the user is authorized
+-spec is_authorized(any()) -> {true, any()} | false | insecure.
+is_authorized(ReqData) ->
+ case riak_core_security:is_enabled() of
+ true ->
+ Scheme = wrq:scheme(ReqData),
+ case Scheme == https of
+ true ->
+ case wrq:get_req_header("Authorization", ReqData) of
+ "Basic " ++ Base64 ->
+ UserPass = base64:decode_to_string(Base64),
+ [User, Pass] = [list_to_binary(X) || X <-
+ string:tokens(UserPass, ":")],
+ {ok, Peer} = inet_parse:address(wrq:peer(ReqData)),
+ case riak_core_security:authenticate(User, Pass,
+ [{ip, Peer}])
+ of
+ {ok, Sec} ->
+ {true, Sec};
+ {error, _} ->
+ false
+ end;
+ _ ->
+ false
+ end;
+ false ->
+ %% security is enabled, but they're connecting over HTTP.
+ %% which means if they authed, the credentials would be in
+ %% plaintext
+ insecure
+ end;
+ false ->
+ {true, undefined} %% no security context
+ end.
View
81 src/riak_api_wm_urlmap.erl
@@ -0,0 +1,81 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_api_wm_urlmap: expose the roots of registered Webmachine resources
+%%
+%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc This module provides a Webmachine resource that lists the
+%% URLs for other resources available on this host.
+%%
+%% Links to Riak resources will be added to the Link header in
+%% the form:
+%%```
+%% <URL>; rel="RESOURCE_NAME"
+%%'''
+%% HTML output of this resource is a list of link tags like:
+%%```
+%% <a href="URL">RESOURCE_NAME</a>
+%%'''
+%% JSON output of this resource in an object with elements like:
+%%```
+%% "RESOURCE_NAME":"URL"
+%%'''
+-module(riak_api_wm_urlmap).
+-export([
+ init/1,
+ resource_exists/2,
+ content_types_provided/2,
+ to_html/2,
+ to_json/2
+ ]).
+
+-include_lib("webmachine/include/webmachine.hrl").
+
+init([]) ->
+ {ok, service_list()}.
+
+resource_exists(RD, Services) ->
+ {true, add_link_header(RD, Services), Services}.
+
+add_link_header(RD, Services) ->
+ wrq:set_resp_header(
+ "Link",
+ string:join([ ["<",Uri,">; rel=\"",Resource,"\""]
+ || {Resource, Uri} <- Services ],
+ ","),
+ RD).
+
+content_types_provided(RD, Services) ->
+ {[{"text/html", to_html},{"application/json", to_json}], RD, Services}.
+
+to_html(RD, Services) ->
+ {["<html><body><ul>",
+ [ ["<li><a href=\"", Uri, "\">", Resource, "</a></li>"]
+ || {Resource, Uri} <- Services ],
+ "</ul></body></html>"],
+ RD, Services}.
+
+to_json(RD, Services) ->
+ {mochijson:encode({struct, Services}), RD, Services}.
+
+service_list() ->
+ Dispatch = webmachine_router:get_routes(),
+ lists:usort(
+ [{atom_to_list(Resource), "/"++UriBase}
+ || {[UriBase|_], Resource, _} <- Dispatch]).
View
52 src/riak_core_pb_bucket.erl
@@ -57,32 +57,66 @@ init() ->
%% @doc decode/2 callback. Decodes an incoming message.
decode(Code, Bin) when Code == 19; Code == 21; Code == 29 ->
- {ok, riak_pb_codec:decode(Code, Bin)}.
+ Msg = riak_pb_codec:decode(Code, Bin),
+ case Msg of
+ #rpbgetbucketreq{type =T, bucket =B} ->
+ Bucket = bucket_type(T, B),
+ {ok, Msg, {"riak_core.get_bucket", Bucket}};
+ #rpbsetbucketreq{type=T, bucket=B} ->
+ Bucket = bucket_type(T, B),
+ {ok, Msg, {"riak_core.set_bucket", Bucket}};
+ #rpbresetbucketreq{type=T, bucket=B} ->
+ %% reset is just a fancy set
+ Bucket = bucket_type(T, B),
+ {ok, Msg, {"riak_core.set_bucket", Bucket}}
+ end.
%% @doc encode/1 callback. Encodes an outgoing response message.
encode(Message) ->
{ok, riak_pb_codec:encode(Message)}.
%% Get bucket properties
-process(#rpbgetbucketreq{bucket=B}, State) ->
- Props = riak_core_bucket:get_bucket(B),
- PbProps = riak_pb_codec:encode_bucket_props(Props),
- {reply, #rpbgetbucketresp{props = PbProps}, State};
+process(#rpbgetbucketreq{type=T, bucket=B}, State) ->
+ Bucket = maybe_create_bucket_type(T, B),
+ case riak_core_bucket:get_bucket(Bucket) of
+ {error, no_type} ->
+ {error, {format, "No bucket-type named '~s'", [T]}, State};
+ Props ->
+ PbProps = riak_pb_codec:encode_bucket_props(Props),
+ {reply, #rpbgetbucketresp{props = PbProps}, State}
+ end;
%% Set bucket properties
-process(#rpbsetbucketreq{bucket=B, props = PbProps}, State) ->
+process(#rpbsetbucketreq{type=T, bucket=B, props=PbProps}, State) ->
Props = riak_pb_codec:decode_bucket_props(PbProps),
- case riak_core_bucket:set_bucket(B, Props) of
+ Bucket = maybe_create_bucket_type(T, B),
+ case riak_core_bucket:set_bucket(Bucket, Props) of
ok ->
{reply, rpbsetbucketresp, State};
+ {error, no_type} ->
+ {error, {format, "No bucket-type named '~s'", [T]}, State};
{error, Details} ->
{error, {format, "Invalid bucket properties: ~p", [Details]}, State}
end;
%% Reset bucket properties
-process(#rpbresetbucketreq{bucket=B}, State) ->
- riak_core_bucket:reset_bucket(B),
+process(#rpbresetbucketreq{type = T, bucket=B}, State) ->
+ Bucket = maybe_create_bucket_type(T, B),
+ riak_core_bucket:reset_bucket(Bucket),
{reply, rpbresetbucketresp, State}.
process_stream(_, _, State) ->
{ignore, State}.
+
+maybe_create_bucket_type(<<"default">>, Bucket) ->
+ Bucket;
+maybe_create_bucket_type(undefined, Bucket) ->
+ Bucket;
+maybe_create_bucket_type(Type, Bucket) ->
+ {Type, Bucket}.
+
+%% always construct {Type, Bucket} tuple, filling in default type if needed
+bucket_type(undefined, B) ->
+ {<<"default">>, B};
+bucket_type(T, B) ->
+ {T, B}.
View
84 src/riak_core_pb_bucket_type.erl
@@ -0,0 +1,84 @@
+%% -------------------------------------------------------------------
+%%
+%% riak_core_pb_bucket_type: Expose Core bucket type functionality to Protocol Buffers
+%%
+%% Copyright (c) 2012 Basho Technologies, Inc. All Rights Reserved.
+%%
+%% This file is provided to you under the Apache License,
+%% Version 2.0 (the "License"); you may not use this file
+%% except in compliance with the License. You may obtain
+%% a copy of the License at
+%%
+%% http://www.apache.org/licenses/LICENSE-2.0
+%%
+%% Unless required by applicable law or agreed to in writing,
+%% software distributed under the License is distributed on an
+%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+%% KIND, either express or implied. See the License for the
+%% specific language governing permissions and limitations
+%% under the License.
+%%
+%% -------------------------------------------------------------------
+
+%% @doc <p>The Bucket type PB service for Riak Core. This is included
+%% in the Riak API application because of startup-time constraints.
+%% This service covers the following request messages:</p>
+%%
+%% <pre>
+%% 31 - RpbGetBucketTypeReq
+%% 32 - RpbSetBucketTypeReq
+%% </pre>
+%%
+%% @end
+-module(riak_core_pb_bucket_type).
+
+-behaviour(riak_api_pb_service).
+
+-export([init/0,
+ decode/2,
+ encode/1,
+ process/2,
+ process_stream/3]).
+
+-include_lib("riak_pb/include/riak_pb.hrl").
+
+init() ->
+ undefined.
+
+%% @doc decode/2 callback. Decodes an incoming message.
+decode(Code, Bin) when Code == 31; Code == 32 ->
+ Msg = riak_pb_codec:decode(Code, Bin),
+ case Msg of
+ #rpbgetbuckettypereq{type=T} ->
+ {ok, Msg, {"riak_core.get_bucket_type", T}};
+ #rpbsetbuckettypereq{type=T} ->
+ {ok, Msg, {"riak_core.set_bucket_type", T}}
+ end.
+
+%% @doc encode/1 callback. Encodes an outgoing response message.
+encode(Message) ->
+ {ok, riak_pb_codec:encode(Message)}.
+
+%% Get bucket type properties
+process(#rpbgetbuckettypereq{type = T}, State) ->
+ case riak_core_bucket_type:get(T) of
+ undefined ->
+ {error, {format, "Invalid bucket type: ~p", [T]}, State};
+ Props ->
+ PbProps = riak_pb_codec:encode_bucket_props(Props),
+ {reply, #rpbgetbucketresp{props = PbProps}, State}
+ end;
+
+%% Set bucket type properties
+process(#rpbsetbuckettypereq{type = T, props = PbProps}, State) ->
+ Props = riak_pb_codec:decode_bucket_props(PbProps),
+ case riak_core_bucket_type:update(T, Props) of
+ ok ->
+ {reply, rpbsetbucketresp, State};
+ {error, Details} ->
+ {error, {format, "Invalid bucket properties: ~p", [Details]}, State}
+ end.
+
+process_stream(_, _, State) ->
+ {ignore, State}.
+
View
64 test/pb_service_test.erl
@@ -86,31 +86,34 @@ process_stream(_, _, State) ->
%% Eunit tests
%% ===================================================================
setup() ->
- Deps = resolve_deps(riak_api), %% Implicitly loads apps
+ application:load(lager),
+ application:load(riak_api),
- application:set_env(sasl, sasl_error_logger, {file, "pb_service_test_sasl.log"}),
error_logger:tty(false),
- error_logger:logfile({open, "pb_service_test.log"}),
-
application:set_env(lager, handlers, [{lager_file_backend, [{"pb_service_test.log", debug, 10485760, "$D0", 5}]}]),
application:set_env(lager, error_logger_redirect, true),
- application:set_env(riak_core, handoff_port, 0),
+ %% Need riak_core.security capability, let's fake it
+ ets:new(riak_capability_ets, [named_table, {read_concurrency, true}]),
+ ets:insert(riak_capability_ets, {{riak_core, security}, false}),
OldListeners = app_helper:get_env(riak_api, pb, [{"127.0.0.1", 8087}]),
application:set_env(riak_api, pb, [{"127.0.0.1", 32767}]),
- [ application:start(A) || A <- Deps ],
- riak_core:wait_for_application(riak_api),
+ lager:start(),
+ {ok, Sup} = riak_api_sup:start_link(),
+ unlink(Sup),
wait_for_port(),
riak_api_pb_service:register(?MODULE, ?MSGMIN, ?MSGMAX),
riak_api_pb_service:register(?MODULE, 111),
- {OldListeners, Deps}.
+ {OldListeners, Sup}.
+
-cleanup({L, Deps}) ->
- [ application:stop(A) || A <- lists:reverse(Deps), not is_otp_base_app(A) ],
- wait_for_application_shutdown(riak_api),
+cleanup({L, Sup}) ->
+ ets:delete(riak_capability_ets),
+ exit(Sup, normal),
application:set_env(riak_api, pb, L),
+ application:stop(lager),
ok.
request_multi(Payloads) when is_list(Payloads) ->
@@ -270,42 +273,3 @@ wait_for_port(TRef) ->
lager:debug("PB port is up"),
ok
end.
-
-wait_for_application_shutdown(App) ->
- case lists:keymember(App, 1, application:which_applications()) of
- true ->
- timer:sleep(250),
- wait_for_application_shutdown(App);
- false ->
- ok
- end.
-
-%% The following three functions build a list of dependent
-%% applications. They will not handle circular or mutually-dependent
-%% applications.
-dep_apps(App) ->
- application:load(App),
- {ok, Apps} = application:get_key(App, applications),
- Apps.
-
-all_deps(App, Deps) ->
- [[ all_deps(Dep, [App|Deps]) || Dep <- dep_apps(App),
- not lists:member(Dep, Deps)], App].
-
-resolve_deps(App) ->
- DepList = all_deps(App, []),
- {AppOrder, _} = lists:foldl(fun(A,{List,Set}) ->
- case sets:is_element(A, Set) of
- true ->
- {List, Set};
- false ->
- {List ++ [A], sets:add_element(A, Set)}
- end
- end,
- {[], sets:new()},
- lists:flatten(DepList)),
- AppOrder.
-
-is_otp_base_app(kernel) -> true;
-is_otp_base_app(stdlib) -> true;
-is_otp_base_app(_) -> false.
View
70 test/riak_api_schema_tests.erl
@@ -0,0 +1,70 @@
+-module(riak_api_schema_tests).
+-include_lib("eunit/include/eunit.hrl").
+-compile(export_all).
+
+%% basic schema test will check to make sure that all defaults from the schema
+%% make it into the generated app.config
+basic_schema_test() ->
+ %% The defaults are defined in ../priv/riak_api.schema. it is the file under test.
+ Config = cuttlefish_unit:generate_templated_config("../priv/riak_api.schema", [], context()),
+ cuttlefish_unit:assert_config(Config, "riak_api.http", []),
+ cuttlefish_unit:assert_config(Config, "riak_api.pb", []),
+ cuttlefish_unit:assert_not_configured(Config, "riak_api.https"),
+ cuttlefish_unit:assert_config(Config, "riak_api.pb_backlog", 128),
+ cuttlefish_unit:assert_config(Config, "riak_api.disable_pb_nagle", true),
+ cuttlefish_unit:assert_config(Config, "riak_api.honor_cipher_order", basho_vm(true, false)),
+ cuttlefish_unit:assert_config(Config, "riak_api.tls_protocols", ['tlsv1.2']),
+ cuttlefish_unit:assert_config(Config, "riak_api.check_crl", basho_vm(true, false)),
+ ok.
+
+override_schema_test() ->
+ %% Conf represents the riak.conf file that would be read in by cuttlefish.
+ %% this proplists is what would be output by the conf_parse module
+ Conf = [
+ {["listener", "http", "internal"], "127.0.0.2:8000"},
+ {["listener", "http", "external"], "127.0.0.3:8000"},
+ {["listener", "protobuf", "internal"], "127.0.0.8:3000"},
+ {["listener", "protobuf", "external"], "127.0.0.9:3000"},
+ {["listener", "https", "internal"], "127.0.0.12:443"},
+ {["listener", "https", "external"], "127.0.0.13:443"},
+ {["protobuf", "backlog"], 64},
+ {["protobuf", "nagle"], on},
+ {["honor_cipher_order"], off},
+ {["tls_protocols", "sslv3"], on},
+ {["tls_protocols", "tlsv1"], on},
+ {["tls_protocols", "tlsv1", "1"], on},
+ {["tls_protocols", "tlsv1", "2"], off},
+ {["check_crl"], off}
+ ],
+ Config = cuttlefish_unit:generate_templated_config("../priv/riak_api.schema", Conf, context()),
+
+
+ cuttlefish_unit:assert_config(Config, "riak_api.http", [{"127.0.0.3", 8000}, {"127.0.0.2", 8000}]),
+ cuttlefish_unit:assert_config(Config, "riak_api.pb", [{"127.0.0.9", 3000}, {"127.0.0.8", 3000}]),
+ cuttlefish_unit:assert_config(Config, "riak_api.https", [{"127.0.0.13", 443}, {"127.0.0.12", 443}]),
+ cuttlefish_unit:assert_config(Config, "riak_api.pb_backlog", 64),
+ cuttlefish_unit:assert_config(Config, "riak_api.disable_pb_nagle", false),
+ cuttlefish_unit:assert_config(Config, "riak_api.tls_protocols", ['tlsv1.1', tlsv1, sslv3]),
+ cuttlefish_unit:assert_config(Config, "riak_api.check_crl", false),
+ ok.
+
+%% this context() represents the substitution variables that rebar
+%% will use during the build process. riak_core's schema file is
+%% written with some {{mustache_vars}} for substitution during
+%% packaging cuttlefish doesn't have a great time parsing those, so we
+%% perform the substitutions first, because that's how it would work
+%% in real life.
+context() ->
+ [
+ {web_ip, "127.0.0.1"},
+ {web_port, 8098},
+ {pb_ip, "127.0.0.1"},
+ {pb_port, 8087}
+ ].
+
+basho_vm(ExpectedIfBasho, ExpectedIfNot) ->
+ OTPVer = erlang:system_info(otp_release),
+ case string:str(OTPVer, "basho") of
+ 0 -> ExpectedIfNot;
+ _ -> ExpectedIfBasho
+ end.
View
41 test/riak_api_test_util.erl
@@ -0,0 +1,41 @@
+-module(riak_api_test_util).
+-compile(export_all).
+
+wait_for_application_shutdown(App) ->
+ case lists:keymember(App, 1, application:which_applications()) of
+ true ->
+ timer:sleep(250),
+ wait_for_application_shutdown(App);
+ false ->
+ ok
+ end.
+
+%% The following three functions build a list of dependent
+%% applications. They will not handle circular or mutually-dependent
+%% applications.
+dep_apps(App) ->
+ application:load(App),
+ {ok, Apps} = application:get_key(App, applications),
+ Apps.
+
+all_deps(App, Deps) ->
+ [[ all_deps(Dep, [App|Deps]) || Dep <- dep_apps(App),
+ not lists:member(Dep, Deps)], App].
+
+resolve_deps(App) ->
+ DepList = all_deps(App, []),
+ {AppOrder, _} = lists:foldl(fun(A,{List,Set}) ->
+ case sets:is_element(A, Set) of
+ true ->
+ {List, Set};
+ false ->
+ {List ++ [A], sets:add_element(A, Set)}
+ end
+ end,
+ {[], sets:new()},
+ lists:flatten(DepList)),
+ AppOrder.
+
+is_otp_base_app(kernel) -> true;
+is_otp_base_app(stdlib) -> true;
+is_otp_base_app(_) -> false.
View
68 tools.mk
@@ -0,0 +1,68 @@
+REBAR ?= ./rebar
+
+compile-no-deps:
+ ${REBAR} compile skip_deps=true
+
+test: compile
+ ${REBAR} eunit skip_deps=true
+
+docs:
+ ${REBAR} doc skip_deps=true
+
+xref: compile
+ ${REBAR} xref skip_deps=true
+
+PLT ?= $(HOME)/.combo_dialyzer_plt
+LOCAL_PLT = .local_dialyzer_plt
+DIALYZER_FLAGS ?= -Wunmatched_returns
+
+${PLT}: compile
+ @if [ -f $(PLT) ]; then \
+ dialyzer --check_plt --plt $(PLT) --apps $(DIALYZER_APPS) && \
+ dialyzer --add_to_plt --plt $(PLT) --output_plt $(PLT) --apps $(DIALYZER_APPS) ; test $$? -ne 1; \
+ else \
+ dialyzer --build_plt --output_plt $(PLT) --apps $(DIALYZER_APPS); test $$? -ne 1; \
+ fi
+
+${LOCAL_PLT}: compile
+ @if [ -d deps ]; then \
+ if [ -f $(LOCAL_PLT) ]; then \
+ dialyzer --check_plt --plt $(LOCAL_PLT) deps/*/ebin && \
+ dialyzer --add_to_plt --plt $(LOCAL_PLT) --output_plt $(LOCAL_PLT) deps/*/ebin ; test $$? -ne 1; \
+ else \
+ dialyzer --build_plt --output_plt $(LOCAL_PLT) deps/*/ebin ; test $$? -ne 1; \
+ fi \
+ fi
+
+dialyzer-run:
+ @echo "==> $(shell basename $(shell pwd)) (dialyzer)"
+ @if [ -f $(LOCAL_PLT) ]; then \
+ PLTS="$(PLT) $(LOCAL_PLT)"; \
+ else \
+ PLTS=$(PLT); \
+ fi; \
+ if [ -f dialyzer.ignore-warnings ]; then \
+ if [ $$(grep -cvE '[^[:space:]]' dialyzer.ignore-warnings) -ne 0 ]; then \
+ echo "ERROR: dialyzer.ignore-warnings contains a blank/empty line, this will match all messages!"; \
+ exit 1; \
+ fi; \
+ dialyzer $(DIALYZER_FLAGS) --plts $${PLTS} -c ebin > dialyzer_warnings ; \
+ egrep -v "^\s*(done|Checking|Proceeding|Compiling)" dialyzer_warnings | grep -F -f dialyzer.ignore-warnings -v > dialyzer_unhandled_warnings ; \
+ cat dialyzer_unhandled_warnings ; \
+ [ $$(cat dialyzer_unhandled_warnings | wc -l) -eq 0 ] ; \
+ else \
+ dialyzer $(DIALYZER_FLAGS) --plts $${PLTS} -c ebin; \
+ fi
+
+dialyzer-quick: compile-no-deps dialyzer-run
+
+dialyzer: ${PLT} ${LOCAL_PLT} dialyzer-run
+
+cleanplt:
+ @echo
+ @echo "Are you sure? It takes several minutes to re-build."
+ @echo Deleting $(PLT) and $(LOCAL_PLT) in 5 seconds.
+ @echo
+ sleep 5
+ rm $(PLT)
+ rm $(LOCAL_PLT)

0 comments on commit 34d90ff

Please sign in to comment.