Skip to content

Commit

Permalink
fixing bugs
Browse files Browse the repository at this point in the history
  • Loading branch information
jkvor committed Feb 22, 2011
1 parent 6725d07 commit ed33c28
Show file tree
Hide file tree
Showing 3 changed files with 89 additions and 78 deletions.
79 changes: 7 additions & 72 deletions src/nsync.erl
Original file line number Diff line number Diff line change
Expand Up @@ -64,17 +64,19 @@ handle_cast(_Msg, State) ->
handle_info({tcp, Socket, Data}, #state{callback=Callback,
caller_pid=CallerPid,
socket=Socket,
map=Map,
state=loading,
rdb_state=RdbState}=State) ->
NewState =
case rdb_load:packet(RdbState, Data, Callback) of
{error, eof} ->
{eof, Rest} ->
case CallerPid of
undefined -> ok;
_ -> CallerPid ! {self(), load_complete}
end,
nsync_utils:do_callback(Callback, [{load, eof}]),
State#state{state=up};
{ok, Rest1} = redis_text_proto:parse_commands(Rest, Callback, Map),
State#state{state=up, buffer=Rest1};
RdbState1 ->
State#state{rdb_state=RdbState1}
end,
Expand All @@ -85,7 +87,7 @@ handle_info({tcp, Socket, Data}, #state{callback=Callback,
socket=Socket,
buffer=Buffer,
map=Map}=State) ->
{ok, Rest} = parse_commands(<<Buffer/binary, Data/binary>>, Callback, Map),
{ok, Rest} = redis_text_proto:parse_commands(<<Buffer/binary, Data/binary>>, Callback, Map),
inet:setopts(Socket, [{active, once}]),
{noreply, State#state{buffer=Rest}};

Expand Down Expand Up @@ -172,6 +174,8 @@ authenticate(Socket, Auth) ->
case gen_tcp:recv(Socket, 0, ?TIMEOUT) of
{ok, <<"OK\r\n">>} ->
ok;
{ok, <<"+OK\r\n">>} ->
ok;
Error ->
Error
end;
Expand Down Expand Up @@ -211,72 +215,3 @@ init_map() ->
init_sync(Socket) ->
gen_tcp:send(Socket, <<"SYNC\r\n">>).

parse_commands(<<>>, _Callback, _Map) ->
{ok, <<>>};

parse_commands(Data, Callback, Map) ->
parse_commands(Data, Callback, Map, Data).

parse_commands(<<"*", Rest/binary>>, Callback, Map, Orig) ->
case parse_num(Rest, <<>>) of
{ok, Num, Rest1} ->
case parse_num_commands(Rest1, Num, []) of
{ok, [Cmd|Args], Rest2} ->
dispatch_cmd(Cmd, Args, Callback, Map),
parse_commands(Rest2, Callback, Map);
{error, eof} ->
{ok, Orig}
end;
{error, eof} ->
{ok, Orig}
end.

dispatch_cmd(Cmd, Args, Callback, Map) ->
Cmd1 = string:to_lower(binary_to_list(Cmd)),
case dict:find(Cmd1, Map) of
{ok, Mod} ->
case nsync_utils:do_callback(Callback, {cmd, Cmd1, Args}) of
undefined ->
ok;
Tid ->
Mod:handle(Cmd1, Args, Tid)
end;
error ->
catch nsync_utils:do_callback(Callback, {error, {unhandled_command, Cmd1}})
end.

parse_num(<<"\r\n", Rest/binary>>, Acc) ->
{ok, list_to_integer(binary_to_list(Acc)), Rest};

parse_num(<<"\r", _Rest/binary>>, _Acc) ->
{error, eof};

parse_num(<<>>, _Acc) ->
{error, eof};

parse_num(<<Char, Rest/binary>>, Acc) ->
parse_num(Rest, <<Acc/binary, Char>>).

parse_num_commands(Rest, 0, Acc) ->
{ok, lists:reverse(Acc), Rest};

parse_num_commands(<<"$", Rest/binary>>, Num, Acc) ->
case parse_num(Rest, <<>>) of
{ok, Size, Rest1} ->
case read_string(Size, Rest1) of
{ok, Cmd, Rest2} ->
parse_num_commands(Rest2, Num-1, [Cmd|Acc]);
{error, eof} ->
{error, eof}
end;
{error, eof} ->
{error, eof}
end.

read_string(Size, Data) ->
case Data of
<<Cmd:Size/binary, "\r\n", Rest/binary>> ->
{ok, Cmd, Rest};
_ ->
{error, eof}
end.
17 changes: 11 additions & 6 deletions src/rdb_load.erl
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,8 @@ packet(#state{buffer=Buffer}, Data, Callback) ->
case parse(<<Buffer/binary, Data/binary>>, Callback) of
{ok, Rest} ->
#state{buffer = Rest};
{error, eof} ->
{error, eof}
{eof, Rest} ->
{eof, Rest}
end.

parse_len(<<"$", Rest/binary>>) ->
Expand All @@ -38,11 +38,16 @@ parse(Data, Callback) ->
{ok, Type, Rest} = rdb_type(Data),
parse(Type, Rest, Callback).

parse(?REDIS_EXPIRETIME, _Data, _Callback) ->
exit("WTF is expire time?");
parse(?REDIS_EXPIRETIME, Data, Callback) ->
case Data of
<<_Time:32/unsigned-integer, Rest/binary>> ->
parse(Rest, Callback);
_ ->
{ok, <<?REDIS_EXPIRETIME, Data/binary>>}
end;

parse(?REDIS_EOF, <<>>, _Callback) ->
{error, eof};
parse(?REDIS_EOF, Rest, _Callback) ->
{eof, Rest};

parse(?REDIS_SELECTDB, Data, Callback) ->
case catch rdb_len(Data) of
Expand Down
71 changes: 71 additions & 0 deletions src/redis_text_proto.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,71 @@
-module(redis_text_proto).
-export([parse_commands/3]).

-include("nsync.hrl").

parse_commands(<<>>, _Callback, _Map) ->
{ok, <<>>};

parse_commands(<<"*", Rest/binary>>, Callback, Map) ->
case parse_num(Rest, <<>>) of
{ok, Num, Rest1} ->
case parse_num_commands(Rest1, Num, []) of
{ok, [Cmd|Args], Rest2} ->
dispatch_cmd(Cmd, Args, Callback, Map),
parse_commands(Rest2, Callback, Map);
{error, eof} ->
{ok, <<"*", Rest/binary>>}
end;
{error, eof} ->
{ok, <<"*", Rest/binary>>}
end.

dispatch_cmd(Cmd, Args, Callback, Map) ->
Cmd1 = string:to_lower(binary_to_list(Cmd)),
case dict:find(Cmd1, Map) of
{ok, Mod} ->
case nsync_utils:do_callback(Callback, [{cmd, Cmd1, Args}]) of
undefined ->
ok;
Tid ->
Mod:handle(Cmd1, Args, Tid)
end;
error ->
catch nsync_utils:do_callback(Callback, [{error, {unhandled_command, Cmd1}}])
end.

parse_num(<<"\r\n", Rest/binary>>, Acc) ->
{ok, list_to_integer(binary_to_list(Acc)), Rest};

parse_num(<<"\r", _Rest/binary>>, _Acc) ->
{error, eof};

parse_num(<<>>, _Acc) ->
{error, eof};

parse_num(<<Char, Rest/binary>>, Acc) ->
parse_num(Rest, <<Acc/binary, Char>>).

parse_num_commands(Rest, 0, Acc) ->
{ok, lists:reverse(Acc), Rest};

parse_num_commands(<<"$", Rest/binary>>, Num, Acc) ->
case parse_num(Rest, <<>>) of
{ok, Size, Rest1} ->
case read_string(Size, Rest1) of
{ok, Cmd, Rest2} ->
parse_num_commands(Rest2, Num-1, [Cmd|Acc]);
{error, eof} ->
{error, eof}
end;
{error, eof} ->
{error, eof}
end.

read_string(Size, Data) ->
case Data of
<<Cmd:Size/binary, "\r\n", Rest/binary>> ->
{ok, Cmd, Rest};
_ ->
{error, eof}
end.

0 comments on commit ed33c28

Please sign in to comment.