Permalink
Browse files

early ketama integration.

Makefile needs more love, but I guess I will write a rakefile instead.
  • Loading branch information...
1 parent 6e0421e commit 91eea8dfa1865f488d51719ca6dc41b4296b5dda @cstar committed Apr 8, 2009
Showing with 364 additions and 96 deletions.
  1. +12 −1 Makefile
  2. +1 −76 README
  3. +76 −0 README.orig
  4. +6 −0 ketama.servers
  5. +139 −0 src/ketama.erl
  6. +53 −0 src/ketama_erlang_driver.c
  7. +77 −19 src/merle.erl
View
@@ -1,12 +1,21 @@
LIBDIR=`erl -eval 'io:format("~s~n", [code:lib_dir()])' -s init stop -noshell`
APP_NAME="merle"
VSN="0.3"
+CFLAGS+=-fPIC -W -Wall -Werror
-all: compile
+all: compile drivermacos
+
+macos: compile drivermacos
docs:
erl -noshell -run edoc_run application "'$(APP_NAME)'" '"."' '$(VSN)' -s init stop
+drivermacos:
+ gcc $(CFLAGS) -DMACOSX -I. -O3 -lm -lketama -o priv/ketama_erlang_driver src/ketama_erlang_driver.c
+
+driver:
+ gcc $(CFLAGS) -I. -O3 -lm -lketama -o priv/ketama_erlang_driver src/ketama_erlang_driver.c
+
compile:
@mkdir -p ebin
@erl -make
@@ -20,4 +29,6 @@ test: all
install: all
mkdir -p ${LIBDIR}/${APP_NAME}-${VSN}/ebin
+ mkdir -p ${LIBDIR}/${APP_NAME}-${VSN}/priv
+ install priv/ketama_erlang_driver $(LIBDIR)/${APP_NAME}-${VSN}/priv/ketama_erlang_driver
for i in ebin/*.beam; do install $$i $(LIBDIR)/${APP_NAME}-${VSN}/$$i ; done
View
77 README
@@ -1,76 +1 @@
-merle : An erlang based memcached client.
-
-Version : 0.3
-
-Author : Joe Williams <joe@joetify.com>
-Contributors : Nick Gerakines <nick@gerakines.net>
-
-Info : http://github.com/joewilliams/merle/
-
-merle uses LShift's gen_server2 module/behavior for faster message queues.
-http://hg.rabbitmq.com/rabbitmq-server/file/b95f2fd4e3f6/src/gen_server2.erl
-
-This code is available as Open Source Software under the MIT license.
-
-
-Features:
-* Support for stats, version, getkey, getskey, delete, set, add, replace, cas, flushall, verbosity
-
-Notes:
-* Uses term_to_binary and binary_to_term to serialize/deserialize Erlang terms before sending/receiving them. This allows for native Erlang terms to be returned from memcached but doesn't play well using other languages after setting values with merle or using merle to get values set by other languages.
-
-Usage:
-
-* Connecting to memcached *
-
-Using defaults:
-
-> merle:connect().
-
-Set your own:
-
-> merle:connect("HOSTNAME", 11211).
-
-
-* A few operations *
-
-> merle:set(a, asdf).
-ok
-> merle:getkey(a).
-asdf
-
-> merle:set(a, asdf).
-ok
-> merle:getskey(a).
-[4,asdf]
-> merle:cas(a, 4, asdfasdf).
-ok
-> merle:getskey(a).
-[5,asdfasdf]
-
-> merle:delete(a).
-ok
-
-* Informational commands *
-
-> merle:version().
-["VERSION 1.2.6"]
-
-> merle:stats(slabs).
-["STAT 1:chunk_size 104","STAT 1:chunks_per_page 10082",
- "STAT 1:total_pages 1","STAT 1:total_chunks 10082",
- "STAT 1:used_chunks 10081","STAT 1:free_chunks 1",
- "STAT 1:free_chunks_end 10080","STAT active_slabs 1",
- "STAT total_malloced 1048528","END"]
-
-> merle:stats().
-["STAT pid 27195","STAT uptime 497","STAT time 1232843046",
- "STAT version 1.2.6","STAT pointer_size 64",
- "STAT rusage_user 0.000000","STAT rusage_system 0.008000",
- "STAT curr_items 1","STAT total_items 5","STAT bytes 83",
- "STAT curr_connections 2","STAT total_connections 5",
- "STAT connection_structures 3","STAT cmd_get 5",
- "STAT cmd_set 5","STAT get_hits 5","STAT get_misses 0",
- "STAT evictions 0","STAT bytes_read 216",
- "STAT bytes_written 468","STAT limit_maxbytes 67108864",
- "STAT threads 1","END"]
+ketama client integrated
View
@@ -0,0 +1,76 @@
+merle : An erlang based memcached client.
+
+Version : 0.3
+
+Author : Joe Williams <joe@joetify.com>
+Contributors : Nick Gerakines <nick@gerakines.net>
+
+Info : http://github.com/joewilliams/merle/
+
+merle uses LShift's gen_server2 module/behavior for faster message queues.
+http://hg.rabbitmq.com/rabbitmq-server/file/b95f2fd4e3f6/src/gen_server2.erl
+
+This code is available as Open Source Software under the MIT license.
+
+
+Features:
+* Support for stats, version, getkey, getskey, delete, set, add, replace, cas, flushall, verbosity
+
+Notes:
+* Uses term_to_binary and binary_to_term to serialize/deserialize Erlang terms before sending/receiving them. This allows for native Erlang terms to be returned from memcached but doesn't play well using other languages after setting values with merle or using merle to get values set by other languages.
+
+Usage:
+
+* Connecting to memcached *
+
+Using defaults:
+
+> merle:connect().
+
+Set your own:
+
+> merle:connect("HOSTNAME", 11211).
+
+
+* A few operations *
+
+> merle:set(a, asdf).
+ok
+> merle:getkey(a).
+asdf
+
+> merle:set(a, asdf).
+ok
+> merle:getskey(a).
+[4,asdf]
+> merle:cas(a, 4, asdfasdf).
+ok
+> merle:getskey(a).
+[5,asdfasdf]
+
+> merle:delete(a).
+ok
+
+* Informational commands *
+
+> merle:version().
+["VERSION 1.2.6"]
+
+> merle:stats(slabs).
+["STAT 1:chunk_size 104","STAT 1:chunks_per_page 10082",
+ "STAT 1:total_pages 1","STAT 1:total_chunks 10082",
+ "STAT 1:used_chunks 10081","STAT 1:free_chunks 1",
+ "STAT 1:free_chunks_end 10080","STAT active_slabs 1",
+ "STAT total_malloced 1048528","END"]
+
+> merle:stats().
+["STAT pid 27195","STAT uptime 497","STAT time 1232843046",
+ "STAT version 1.2.6","STAT pointer_size 64",
+ "STAT rusage_user 0.000000","STAT rusage_system 0.008000",
+ "STAT curr_items 1","STAT total_items 5","STAT bytes 83",
+ "STAT curr_connections 2","STAT total_connections 5",
+ "STAT connection_structures 3","STAT cmd_get 5",
+ "STAT cmd_set 5","STAT get_hits 5","STAT get_misses 0",
+ "STAT evictions 0","STAT bytes_read 216",
+ "STAT bytes_written 468","STAT limit_maxbytes 67108864",
+ "STAT threads 1","END"]
View
@@ -0,0 +1,6 @@
+#------ Server ------- -Mem-#
+#255.255.255.255:65535 66666#
+127.0.0.1:11211 10
+127.0.0.1:11212 10
+127.0.0.1:11213 10
+#127.0.0.1:11214 10
View
@@ -0,0 +1,139 @@
+%%%-------------------------------------------------------------------
+%%% File : ketama.erl
+%%% Author : Richard Jones <rj@last.fm>
+%%% Description : Port driver for libketama hasing
+%%%-------------------------------------------------------------------
+-module(ketama).
+
+-behaviour(gen_server).
+
+%% API
+-export([start_link/0, start_link/1, start_link/2, getserver/1]).
+
+%% gen_server callbacks
+-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
+ terminate/2, code_change/3]).
+
+-record(state, {port, filename, last_modified, exe}).
+
+%%====================================================================
+%% API
+%%====================================================================
+%%--------------------------------------------------------------------
+%% Function: start_link() -> {ok,Pid} | ignore | {error,Error}
+%% Description: Starts the server
+%%--------------------------------------------------------------------
+start_link() ->
+ start_link("/web/site/GLOBAL/ketama.servers").
+
+start_link(ServersFile) ->
+ Bin = code:priv_dir(merle)++"/"++"ketama_erlang_driver",
+ start_link(ServersFile, Bin).
+
+start_link(ServersFile, BinPath) ->
+ gen_server:start_link({local, ?MODULE}, ?MODULE, [ServersFile, BinPath], []).
+
+getserver(Key) ->
+ gen_server:call(?MODULE, {getserver, Key}).
+
+%%====================================================================
+%% gen_server callbacks
+%%====================================================================
+
+%%--------------------------------------------------------------------
+%% Function: init(Args) -> {ok, State} |
+%% {ok, State, Timeout} |
+%% ignore |
+%% {stop, Reason}
+%% Description: Initiates the server
+%% ServersFile: ketama.servers list
+%% BinPath: path to ketama_erlang_driver binary
+%%--------------------------------------------------------------------
+init([ServersFile, BinPath]) ->
+ Exe = BinPath ++ " " ++ ServersFile,
+ Port = open_port({spawn, Exe}, [binary, {packet, 1}, use_stdio]),
+ LastMod = filelib:last_modified(ServersFile),
+ {ok, #state{port=Port,filename=ServersFile,last_modified=LastMod, exe=Exe}}.
+
+%%--------------------------------------------------------------------
+%% Function: %% handle_call(Request, From, State) -> {reply, Reply, State} |
+%% {reply, Reply, State, Timeout} |
+%% {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, Reply, State} |
+%% {stop, Reason, State}
+%% Description: Handling call messages
+%%--------------------------------------------------------------------
+handle_call({getserver, Key}, _From, #state{port=Port, filename=ServersFile,last_modified=LastMod, exe=Exe}=State) ->
+ ModDate = filelib:last_modified(ServersFile),
+ NewState = case (ModDate /= LastMod) of
+ true ->
+ error_logger:info_msg("Ketama : Reloading driver (server file changed changed)", []),
+ port_close(Port),
+ NewPort = open_port({spawn, Exe}, [binary, {packet, 1}, use_stdio]),
+ #state{port=NewPort,filename=ServersFile,last_modified=ModDate, exe=Exe};
+ false ->
+ State
+ end,
+ Port2 = NewState#state.port,
+ Port2 ! {self(), {command, Key}},
+ receive
+ {Port2, {data, Data}} ->
+ error_logger:info_msg("Received data : ~p", [Data]),
+ {reply, Data, NewState}
+ after 1000 -> % if it takes this long, you have serious issues.
+ error_logger:info_msg("Timeout on ~p", [Key]),
+ {stop, ketama_port_timeout, NewState}
+ end;
+
+handle_call(Msg, From, State)->
+ error_logger:info_msg("Received call ~p from ~p", [Msg, From]),
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_cast(Msg, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling cast messages
+%%--------------------------------------------------------------------
+handle_cast(Msg, State) ->
+ error_logger:info_msg("Received cast ~p", [Msg]),
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: handle_info(Info, State) -> {noreply, State} |
+%% {noreply, State, Timeout} |
+%% {stop, Reason, State}
+%% Description: Handling all non call/cast messages
+%%--------------------------------------------------------------------
+handle_info({'EXIT', Port, Reason}, #state{port = Port} = State) ->
+ {stop, {port_terminated, Reason}, State};
+
+handle_info(Msg, State)->
+ error_logger:info_msg("Received info ~p", [Msg]),
+ {noreply, State}.
+
+%%--------------------------------------------------------------------
+%% Function: terminate(Reason, State) -> void()
+%% Description: This function is called by a gen_server when it is about to
+%% terminate. It should be the opposite of Module:init/1 and do any necessary
+%% cleaning up. When it returns, the gen_server terminates with Reason.
+%% The return value is ignored.
+%%--------------------------------------------------------------------
+terminate({port_terminated, _Reason}, _State) ->
+ ok;
+
+terminate(_Reason, #state{port = Port} = _State) ->
+ port_close(Port).
+
+%%--------------------------------------------------------------------
+%% Func: code_change(OldVsn, State, Extra) -> {ok, NewState}
+%% Description: Convert process state when code is changed
+%%--------------------------------------------------------------------
+code_change(_OldVsn, State, _Extra) ->
+ {ok, State}.
+
+%%--------------------------------------------------------------------
+%%% Internal functions
+%%--------------------------------------------------------------------
+
View
@@ -0,0 +1,53 @@
+/*
+ * Expects a one-byte length header, followed by a key (<255bytes)
+ * Returns an ip:port string with 1 byte len header *
+ *
+ */
+#include <ketama.h>
+#include <stdio.h>
+#include <stdlib.h>
+#include <unistd.h>
+#include <string.h>
+
+typedef unsigned char byte;
+
+
+int read_exact(byte *buf, int len)
+{
+ int i, got = 0;
+ do {
+ if((i=read(0,buf+got, len-got))<=0) return i;
+ got += i;
+ } while(got<len);
+ return len;
+}
+
+int main(int argc, char **argv)
+{
+ if(argc==1){
+ printf("Usage: %s <ketama.servers file>\n", *argv);
+ return 1;
+ }
+
+ ketama_continuum c;
+ char *fname = *++argv;
+ mcs *m;
+ ketama_roll( &c, fname );
+ byte len;
+ byte buffer[256];
+ while ( 1 ) {
+ if( 1 != read_exact(&len, 1) ) break;
+ if( (int)len >= 255 ) break;
+ read_exact((byte *)&buffer, (int)len);
+ buffer[len] = '\0';
+
+ m = ketama_get_server( (char *) &buffer, c );
+ sprintf((char *)&buffer, "%s",m->ip);
+ int respleni = strlen(m->ip);
+ char l = (0xff & respleni);
+ write(1, &l, 1);
+ write(1, (char*)&buffer, respleni);
+ }
+
+ return 0;
+}
Oops, something went wrong.

0 comments on commit 91eea8d

Please sign in to comment.