Skip to content

Commit

Permalink
Works for stats.
Browse files Browse the repository at this point in the history
  • Loading branch information
dustin committed Mar 31, 2011
1 parent a34e2f0 commit 4aaa40a
Show file tree
Hide file tree
Showing 6 changed files with 258 additions and 0 deletions.
1 change: 1 addition & 0 deletions Emakefile
@@ -0,0 +1 @@
{ ['src/*'], [ {i, "include" }, {outdir, "ebin"}, debug_info ]}.
7 changes: 7 additions & 0 deletions Makefile
@@ -0,0 +1,7 @@
all: ebin compile

ebin:
mkdir -p ebin

compile:
erl -pa build -noinput +B -eval 'case make:all([{i, "'${COUCH_SRC}'"}]) of up_to_date -> halt(0); error -> halt(1) end.'
64 changes: 64 additions & 0 deletions include/mc_constants.hrl
@@ -0,0 +1,64 @@
-define(HEADER_LEN, 24).
-define(REQ_MAGIC, 16#80).
-define(RES_MAGIC, 16#81).

% Command codes.
-define(GET, 16#00).
-define(SET, 16#01).
-define(ADD, 16#02).
-define(REPLACE, 16#03).
-define(DELETE, 16#04).
-define(INCREMENT, 16#05).
-define(DECREMENT, 16#06).
-define(QUIT, 16#07).
-define(FLUSH, 16#08).
-define(GETQ, 16#09).
-define(NOOP, 16#0a).
-define(VERSION, 16#0b).
-define(GETK, 16#0c).
-define(GETKQ, 16#0d).
-define(APPEND, 16#0e).
-define(PREPEND, 16#0f).
-define(STAT, 16#10).
-define(SETQ, 16#11).
-define(ADDQ, 16#12).
-define(REPLACEQ, 16#13).
-define(DELETEQ, 16#14).
-define(INCREMENTQ, 16#15).
-define(DECREMENTQ, 16#16).
-define(QUITQ, 16#17).
-define(FLUSHQ, 16#18).
-define(APPENDQ, 16#19).
-define(PREPENDQ, 16#1a).
-define(RGET, 16#30).
-define(RSET, 16#31).
-define(RSETQ, 16#32).
-define(RAPPEND, 16#33).
-define(RAPPENDQ, 16#34).
-define(RPREPEND, 16#35).
-define(RPREPENDQ, 16#36).
-define(RDELETE, 16#37).
-define(RDELETEQ, 16#38).
-define(RINCR, 16#39).
-define(RINCRQ, 16#3a).
-define(RDECR, 16#3b).
-define(RDECRQ, 16#3c).

% Response status codes.
-define(SUCCESS, 16#00).
-define(KEY_ENOENT, 16#01).
-define(KEY_EEXISTS, 16#02).
-define(E2BIG, 16#03).
-define(EINVAL, 16#04).
-define(NOT_STORED, 16#05).
-define(DELTA_BADVAL, 16#06).
-define(UNKNOWN_COMMAND, 16#81).
-define(ENOMEM, 16#82).

-record(mc_response, {
status=0,
extra,
key,
body,
cas=0
}).
68 changes: 68 additions & 0 deletions src/mc_connection.erl
@@ -0,0 +1,68 @@
-module (mc_connection).

-export([loop/2]).
-export([respond/4]).

-include("mc_constants.hrl").

bin_size(undefined) -> 0;
bin_size(List) when is_list(List) -> bin_size(list_to_binary(List));
bin_size(Binary) -> size(Binary).

xmit(_Socket, undefined) -> ok;
xmit(Socket, List) when is_list(List) -> xmit(Socket, list_to_binary(List));
xmit(Socket, Data) -> gen_tcp:send(Socket, Data).

respond(Socket, OpCode, Opaque, Res) ->
KeyLen = bin_size(Res#mc_response.key),
ExtraLen = bin_size(Res#mc_response.extra),
BodyLen = bin_size(Res#mc_response.body) + (KeyLen + ExtraLen),
Status = Res#mc_response.status,
CAS = Res#mc_response.cas,
ok = gen_tcp:send(Socket, <<?RES_MAGIC, OpCode:8, KeyLen:16,
ExtraLen:8, 0:8, Status:16,
BodyLen:32, Opaque:32, CAS:64>>),
ok = xmit(Socket, Res#mc_response.extra),
ok = xmit(Socket, Res#mc_response.key),
ok = xmit(Socket, Res#mc_response.body).

% Read-data special cases a 0 size to just return an empty binary.
read_data(_Socket, 0, _ForWhat) -> <<>>;
read_data(Socket, N, ForWhat) ->
error_logger:info_msg("Reading ~p bytes of ~p~n", [N, ForWhat]),
{ok, Data} = gen_tcp:recv(Socket, N),
Data.

process_message(Socket, StorageServer, {ok, <<?REQ_MAGIC:8, ?STAT:8, KeyLen:16,
ExtraLen:8, 0:8, 0:16,
BodyLen:32,
Opaque:32,
CAS:64>>}) ->
error_logger:info_msg("Got a stat request for ~p.~n", [StorageServer]),

Extra = read_data(Socket, ExtraLen, extra),
Key = read_data(Socket, KeyLen, key),
Body = read_data(Socket, BodyLen - (KeyLen + ExtraLen), body),

% Hand the request off to the server.
gen_server:cast(StorageServer, {?STAT, Extra, Key, Body, CAS, Socket, Opaque});
process_message(Socket, StorageServer, {ok, <<?REQ_MAGIC:8, OpCode:8, KeyLen:16,
ExtraLen:8, 0:8, 0:16,
BodyLen:32,
Opaque:32,
CAS:64>>}) ->
error_logger:info_msg("Got message of type ~p to give to ~p.~n",
[OpCode, StorageServer]),

Extra = read_data(Socket, ExtraLen, extra),
Key = read_data(Socket, KeyLen, key),
Body = read_data(Socket, BodyLen - (KeyLen + ExtraLen), body),

% Hand the request off to the server.
Res = gen_server:call(StorageServer, {OpCode, Extra, Key, Body, CAS}),

respond(Socket, OpCode, Opaque, Res).

loop(Socket, Handler) ->
process_message(Socket, Handler, gen_tcp:recv(Socket, ?HEADER_LEN)),
loop(Socket, Handler).
82 changes: 82 additions & 0 deletions src/mc_daemon.erl
@@ -0,0 +1,82 @@
-module(mc_daemon).

-behaviour(gen_server).

%% API
-export([start_link/0]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

-define(SERVER, ?MODULE).

-include("couch_db.hrl").
-include("mc_constants.hrl").

-record(state, {mc_serv}).

start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).

init([]) ->
?LOG_INFO("MC daemon: starting.", []),
{ok, S} = mc_tcp_listener:start_link(11213, self()),
{ok, #state{mc_serv=S}}.

handle_call({_OpCode, _Header, _Key, _Body, _CAS}, _From, State) ->
{reply, #mc_response{status=?UNKNOWN_COMMAND, body="WTF, mate?"}, State};

handle_call(Request, _From, State) ->
?LOG_DEBUG("MC daemon: got call.", [Request]),
Reply = ok,
{reply, Reply, State}.

mk_stat(K, V) -> #mc_response{key=K, body=V}.

round_value(Val) when not is_number(Val) ->
Val;
round_value(Val) when Val == 0 ->
integer_to_list(Val);
round_value(Val) when is_float(Val) ->
lists:flatten(io_lib:format("~.6f", [Val]));
round_value(Val) ->
lists:flatten(io_lib:format("~p", [Val])).

%% float_to_list(erlang:round(Val * 1000.0) / 1000.0).

emit_stat_prop(_Socket, _Opaque, _Prefix, _Key, null) ->
ok;
emit_stat_prop(Socket, Opaque, Prefix, Key, Value) ->
mc_connection:respond(Socket, ?STAT, Opaque,
mk_stat(Prefix ++ atom_to_list(Key),
round_value(Value))).

stats_section(Socket, Opaque, {Values}, Prefix, 0) ->
lists:foreach(fun(StatKey) -> emit_stat_prop(Socket, Opaque, Prefix, StatKey,
proplists:get_value(StatKey, Values))
end, proplists:get_keys(Values));
stats_section(Socket, Opaque, {Values}, Prefix, N) ->
lists:foreach(fun(StatKey) ->
stats_section(Socket, Opaque,
proplists:get_value(StatKey, Values),
Prefix ++ atom_to_list(StatKey) ++ ":", N - 1)
end, proplists:get_keys(Values)).


handle_cast({?STAT, _Extra, _Key, _Body, _CAS, Socket, Opaque}, State) ->
{AllStats} = couch_stats_aggregator:all(),
?LOG_INFO("Got stats: ~p~n", [proplists:get_keys(AllStats)]),
stats_section(Socket, Opaque, {AllStats}, "", 2),
mc_connection:respond(Socket, ?STAT, Opaque, mk_stat("", "")),
{noreply, State};
handle_cast(_Msg, State) ->
{noreply, State}.
handle_info(_Info, State) ->
{noreply, State}.

terminate(_Reason, _State) ->
ok.

code_change(_OldVsn, State, _Extra) ->
{ok, State}.
36 changes: 36 additions & 0 deletions src/mc_tcp_listener.erl
@@ -0,0 +1,36 @@
-module (mc_tcp_listener).

-export([start/1, start/2, start_link/1, start_link/2, init/2]).

% Starting the server
start(Handler) ->
start(11211, Handler).

start(PortNum, Handler) when is_integer(PortNum) ->
{ok, spawn(?MODULE, init, [PortNum, Handler])}.

start_link(Handler) ->
start_link(11211, Handler).

start_link(PortNum, Handler) when is_integer(PortNum) ->
{ok, spawn_link(?MODULE, init, [PortNum, Handler])}.


%
% The server itself
%

% server self-init
init(PortNum, StorageServer) ->
{ok, LS} = gen_tcp:listen(PortNum, [binary,
{reuseaddr, true},
{packet, raw},
{active, false}]),
accept_loop(LS, StorageServer).

% Accept incoming connections
accept_loop(LS, StorageServer) ->
{ok, NS} = gen_tcp:accept(LS),
Pid = spawn(mc_connection, loop, [NS, StorageServer]),
gen_tcp:controlling_process(NS, Pid),
accept_loop(LS, StorageServer).

0 comments on commit 4aaa40a

Please sign in to comment.