Skip to content

Commit

Permalink
Major improvements:
Browse files Browse the repository at this point in the history
 * Corrects packet reading to read buffer first.
 * Use {active,once} to only read packets when we're interested in
   them
 * Implement enough query response code to execute simple queries.
  • Loading branch information
archaelus committed Jan 6, 2009
1 parent a8800a5 commit 932930d
Showing 1 changed file with 99 additions and 16 deletions.
115 changes: 99 additions & 16 deletions src/mysql_client.erl
Expand Up @@ -6,7 +6,10 @@
-module(mysql_client).
-behaviour(plain_fsm).

-export([spawn_link/5]).
-export([spawn_link/5
,squery/2
]).

-export([data_vsn/0, code_change/3]).
-include("plain_fsm.hrl").

Expand All @@ -26,6 +29,8 @@
error_logger:error_msg("(~p ~p:~p) " ++ Format,
[self(), ?MODULE, ?LINE | Args])).

-define(CALL_TAG, '$mysql_call').

spawn_link(Host, Port, Username, Password, Options) ->
crypto:start(),
Conf = #conf{host=Host,
Expand All @@ -38,6 +43,15 @@ spawn_link(Host, Port, Username, Password, Options) ->
connect(#state{conf=Conf})
end).

call(Pid, Message) ->
gen:call(Pid, ?CALL_TAG, Message).

reply(From, Reply) ->
gen:reply(From, Reply).

squery(Pid, Query) ->
call(Pid, {squery, Query}).

%%====================================================================
%% FSM States
%%====================================================================
Expand All @@ -46,7 +60,9 @@ connect(S = #state{sock=undefined,
conf=#conf{host=Host,
port=Port}}) ->
case gen_tcp:connect(Host, Port,
[{active, true}, {packet, raw}, binary]) of
[{active, once},
{packet, raw},
binary]) of
{ok, Sock} ->
wait_handshake(S#state{sock=Sock});
{error, Reason} ->
Expand Down Expand Up @@ -80,39 +96,106 @@ wait_handshake(S = #state{conf=#conf{username=Username,
wait_handshake_response(S = #state{seq=Seq}) ->
{Bytes, _S1} = read_packet(S),
{packet, Seq, Resp, <<>>} = mysql_proto:decode(response, Bytes),
?INFO("Got response, ~p~n", [Resp]),
erlang:exit(nyi).
case Resp of
{response, ok, _} ->
connected(S#state{seq=0});
{response, error, _, Msg} ->
erlang:exit({error, Msg})
end.

connected(S = #state{}) ->
plain_fsm:extended_receive(
receive
{?CALL_TAG, From, Msg} ->
handle_call(Msg, From, S);
{tcp_closed, _Sock} ->
erlang:exit(socket_closed);
{tcp_data, _Sock, Data} ->
erlang:exit({unexpected_tcp_data, Data});
Message ->
erlang:exit({unexpected_message, Message})
end).

handle_call({squery, Q}, From, S) ->
State = send({command, 'query', [{sql, Q}]}, reset_seq(S)),
sent_query(From, State).

sent_query(From, State) ->
{Bytes, NewState} = read_packet(State),
case mysql_proto:decode(result_set_header, Bytes) of
{packet, _Seq, {result_set_header, FieldCount, _Extra}, _Rest} ->
read_fields(FieldCount, From, NewState)
end.

read_fields(FieldCount, From, State) ->
read_fields(FieldCount, From, [], State).

read_fields(FieldsLeft, From, Acc, State) when FieldsLeft > 0 ->
{Bytes, NewState} = read_packet(State),
case mysql_proto:decode(field, Bytes) of
{packet, _Seq, {field, Info}, _Rest} ->
read_fields(FieldsLeft - 1,
From,
[ {proplists:get_value(name, Info), Info} | Acc],
NewState)
end;
read_fields(0, From, FieldAcc, State) ->
{Bytes, NewState} = read_packet(State),
case mysql_proto:decode(field, Bytes) of
{packet, _Seq, {end_of_fields, _Info}, _Rest} ->
read_rows([], FieldAcc, From, NewState)
end.

read_rows(Acc, FieldInfo, From, State) ->
{Bytes, NewState} = read_packet(State),
case mysql_proto:decode(row, Bytes) of
{packet, _Seq, {row, Columns}, _Rest} ->
read_rows([ list_to_tuple(Columns) | Acc],
FieldInfo, From, NewState);
{packet, _Seq, {row_eof, _Info}, _Rest} ->
reply(From, {query_result, FieldInfo, lists:reverse(Acc)}),
connected(State)
end.
%%====================================================================
%% Internal Functions
%%====================================================================

send(Response, State = #state{sock=Sock, seq=Seq}) ->
RespBytes = mysql_proto:encode(Response),
Bytes = mysql_proto:encode_packet(Seq, RespBytes),
send(Message, State = #state{sock=Sock, seq=Seq}) ->
MsgBytes = mysql_proto:encode(Message),
Bytes = mysql_proto:encode_packet(Seq, MsgBytes),
?INFO("Sent ~P~n", [Bytes, 10000]),
case gen_tcp:send(Sock, Bytes) of
ok -> State#state{seq=Seq+1};
Err -> erlang:exit({send_error, Err})
end.

read_packet(S = #state{buf=Buf}) ->
case mysql_proto:decode(packet, Buf) of
{packet, Bytes, Rest} ->
{Bytes, S#state{buf=Rest}};
{incomplete, Rest} ->
read_wire_packet(S#state{buf=Rest});
{incomplete, _SizeNeeded, Rest} ->
read_wire_packet(S#state{buf=Rest})
end.

read_wire_packet(S = #state{sock=Sock, buf=Buf}) ->
ok = inet:setopts(Sock, [{active, once}]),
receive
{tcp_closed, _Sock} ->
erlang:exit(normal);
{tcp, _Sock, Data} ->
{tcp, Sock, Data} ->
?INFO("Received ~P~n", [Data, 10000]),
NewBuf = iolist_to_binary([Buf, Data]),
case mysql_proto:decode(packet, NewBuf) of
{packet, Bytes, Rest} ->
{Bytes, S#state{buf=Rest}};
{incomplete, Rest} ->
read_packet(S#state{buf=Rest});
{incomplete, _SizeNeeded, Rest} ->
read_packet(S#state{buf=Rest})
end
read_packet(S#state{buf=NewBuf})
end.

reset_seq(S = #state{}) ->
S#state{seq=0}.

incr_seq(S = #state{seq=Seq}) ->
S#state{seq=Seq + 1}.

code_change(_OldVsn, _State, _Extra) ->
{ok, {newstate, data_vsn()}}.

0 comments on commit 932930d

Please sign in to comment.