Skip to content

Commit

Permalink
add parse query using PEG and tests
Browse files Browse the repository at this point in the history
  • Loading branch information
Manuel Rubio committed Jul 17, 2013
1 parent 9130ce9 commit 806f635
Show file tree
Hide file tree
Showing 17 changed files with 969 additions and 44 deletions.
5 changes: 4 additions & 1 deletion .gitignore
Expand Up @@ -2,4 +2,7 @@
deps
*.o
*.beam
*.plt
*.plt
ebin/
log/
src/mysql.erl
1 change: 1 addition & 0 deletions README.org
Expand Up @@ -19,6 +19,7 @@
#+BEGIN_EXAMPLE
{myproto, [
{handler, my_dummy_handler},
{parse_query, true},
{server_sign, <<"5.5-myproto">>},
{port, 3306}
]}.
Expand Down
26 changes: 20 additions & 6 deletions include/myproto.hrl
Expand Up @@ -121,14 +121,17 @@
-define(CLIENT_CONNECT_ATTRS, 16#100000).
-define(CLIENT_PLUGIN_AUTH_LENENC_CLIENT_DATA, 16#200000).

-include("sql.hrl").

-record(request, {
command :: integer(),
info :: string(),
info :: string() | sql(),
continue = false :: boolean(),
id = 0 :: integer()
}).

-type request() :: #request{}.

-record(user, {
name :: binary(),
password :: binary(),
Expand All @@ -137,26 +140,37 @@
charset :: binary()
}).

-type user() :: #user{}.

-record(response, {
status = 0 :: integer(),
id = 0 :: integer(),
affected_rows = 0 :: integer(), %% as var_integer
last_insert_id = 0 :: integer(), %% as var_integer
status_flags = 0 :: integer(),
warnings = 0 :: integer(), %% only with protocol 4.1
info = <<"">> :: binary(),
info = <<>> :: binary(),
error_code = 0 :: integer(),
error_info = <<"">> :: binary()
error_info = <<>> :: binary()
}).

-type response() :: #response{}.

-record(column, {
schema = <<"">> :: binary(),
table = <<"">> :: binary(),
schema = <<>> :: binary(),
table = <<>> :: binary(),
name :: binary(),
charset = ?UTF8_GENERAL_CI :: integer(),
length :: integer(),
type :: integer(),
flags = 0 :: integer(),
decimals = 0 :: integer(),
default = <<"">> :: ( binary() | integer() )
default = <<>> :: ( binary() | integer() )
}).

-type column() :: #column{}.

-type user_string() :: binary().
-type password() :: binary().
-type hash() :: binary().

41 changes: 41 additions & 0 deletions include/sql.hrl
@@ -0,0 +1,41 @@
%% -*- erlang; utf-8 -*-

% COMMON
-record(table, {name, alias}).
-record(all, {table}).
-record(subquery, {name, subquery }).
-record(key, {alias, name, table}).
-record(value, {name, value}).
-record(condition, {nexo, op1, op2}).
-record(function, {name, params, alias}).
-record(operation, {type, op1, op2}).
-record(variable, {name, label, scope}).

% SHOW
-record(show, {type, full, from}).

-type show() :: #show{}.

% SELECT
-record(select, {params, tables, conditions, group, order, limit, offset}).
-record(order, {key, sort}).

-type select() :: #select{}.

% UPDATE
-record(update, {table, set, conditions}).
-record(set, {key, value}).

-type update() :: #update{}.

% DELETE
-record(delete, {table, conditions}).

-type delete() :: #delete{}.

% INSERT
-record(insert, {table, values}).

-type insert() :: #insert{}.

-type sql() :: show() | select() | update() | delete() | insert().
7 changes: 6 additions & 1 deletion rebar.config
@@ -1,4 +1,9 @@
{erl_opts, [{parse_transform, lager_transform}]}.
{deps, [
{lager, ".*", {git, "git://github.com/basho/lager.git", master}}
{lager, ".*", {git, "git://github.com/basho/lager.git", master}},
{neotoma, ".*", {git, "git://github.com/seancribbs/neotoma.git", master}}
]}.
{eunit_opts, [verbose, {report,{eunit_surefire,[{dir,"."}]}}]}.

{cover_enabled, true}.
{cover_print_enable, true}.
10 changes: 5 additions & 5 deletions src/gen_myproto.erl
Expand Up @@ -3,17 +3,17 @@
-include("../include/myproto.hrl").

-callback check_pass(
User::binary(),
Hash::binary(),
Password::binary()
User::user_string(),
Hash::hash(),
Password::password()
) ->
{ok, binary(), State::term()} |
{ok, password(), State::term()} |
{error, Reason::binary()} |
{error, Code::integer(), Reason::binary()}.


-callback execute(
Query :: #request{},
Query :: request(),
State :: term()
) ->
{#response{}, State::term()}.
Expand Down
17 changes: 9 additions & 8 deletions src/my_acceptor.erl
Expand Up @@ -5,24 +5,25 @@

-define(SERVER, ?MODULE).

-export([start_link/2]).
-export([start_link/3]).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2,
terminate/2, code_change/3]).

-record(state, {
lsocket :: get_tcp:socket(),
id :: integer(), %% counter connection ID
handler :: atom() %% handler for queries/auth
handler :: atom(), %% handler for queries/auth
parse_query :: boolean() %% if the query will be parsed or not
}).

start_link(Port, Handler) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Port, Handler], []).
start_link(Port, Handler, ParseQuery) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [Port, Handler, ParseQuery], []).

init([Port, Handler]) ->
init([Port, Handler, ParseQuery]) ->
Opts = [binary, {packet, 0}, {active, true}, {reuseaddr, true}],
case gen_tcp:listen(Port, Opts) of
{ok, LSocket} ->
{ok, #state{lsocket=LSocket, id=1, handler=Handler}, 0};
{ok, #state{lsocket=LSocket, id=1, handler=Handler, parse_query=ParseQuery}, 0};
{error, Reason} ->
{stop, Reason}
end.
Expand All @@ -34,9 +35,9 @@ handle_call(_Request, _From, State) ->
handle_cast(_Msg, State) ->
{noreply, State}.

handle_info(timeout, State=#state{lsocket=LSocket, id=Id, handler=Handler}) ->
handle_info(timeout, State=#state{lsocket=LSocket, id=Id, handler=Handler, parse_query=ParseQuery}) ->
{ok, Socket} = gen_tcp:accept(LSocket),
Res = my_request:start(Socket, Id, Handler),
Res = my_request:start(Socket, Id, Handler, ParseQuery),
lager:info("Incoming connection: ~p~n", [Res]),
{noreply, State#state{id=(Id+1) rem 16#100000000}, 0};

Expand Down
12 changes: 12 additions & 0 deletions src/my_datatypes.erl
Expand Up @@ -11,35 +11,47 @@

%% String.NUL

-spec string_nul_to_binary(String :: binary()) -> binary().

string_nul_to_binary(String) ->
list_to_binary(lists:takewhile(fun(X) ->
X =/= 0
end, binary_to_list(String))).

%% Varchars

-spec binary_to_varchar(Binary::binary()) -> binary().

binary_to_varchar(Binary) ->
Len = number_to_var_integer(byte_size(Binary)),
<<Len/binary, Binary/binary>>.

%% Fix Integers

-spec fix_integer_to_number(Size::integer(), Data::integer()) -> integer().

fix_integer_to_number(Size, Data) when is_integer(Size) andalso is_integer(Data) ->
BitSize = Size * 8,
<<Num:BitSize/little>> = Data,
Num.

-spec number_to_fix_integer(Size::integer(), Data::binary()) -> binary().

number_to_fix_integer(Size, Data) when is_integer(Size) andalso is_binary(Data) ->
BitSize = Size * 8,
<<Data:BitSize/little>>.

%% Var integers

-spec var_integer_to_number(Var::binary()) -> integer().

var_integer_to_number(<<16#fc, Data:16/little>>) -> Data;
var_integer_to_number(<<16#fd, Data:24/little>>) -> Data;
var_integer_to_number(<<16#fe, Data:64/little>>) -> Data;
var_integer_to_number(<<Data:8/little>>) -> Data.

-spec number_to_var_integer(Data::integer()) -> binary().

number_to_var_integer(Data) when is_integer(Data) andalso Data < 251 ->
<<Data:8>>;
number_to_var_integer(Data) when is_integer(Data) andalso Data < 16#10000 ->
Expand Down
5 changes: 3 additions & 2 deletions src/my_dummy_handler.erl
Expand Up @@ -11,13 +11,14 @@ check_pass(User, Hash, Password) ->
_ -> {error, <<"Password incorrect!">>}
end.

execute(#request{info = <<"select @@version_comment", _/binary>>}, State) ->
execute(#request{info = #select{params=[#variable{name = <<"version_comment">>}]}}, State) ->
Info = {
[#column{name = <<"@@version_comment">>, type=?TYPE_VARCHAR, length=20}],
[[<<"myproto 0.1">>]]
},
{#response{status=?STATUS_OK, info=Info}, State};
execute(_Request, State) ->
execute(Request, State) ->
lager:info("Request: ~p~n", [Request]),
Info = {
[
#column{name = <<"Info">>, type=?TYPE_VARCHAR, length=20},
Expand Down
51 changes: 36 additions & 15 deletions src/my_request.erl
Expand Up @@ -7,25 +7,31 @@

-include("../include/myproto.hrl").

-export([start/3, check_clean_pass/2, check_sha1_pass/2, sha1_hex/1, to_hex/1]).
-export([start/4, check_clean_pass/2, check_sha1_pass/2, sha1_hex/1, to_hex/1]).
-export([init/1, handle_sync_event/4, handle_event/3, handle_info/3,
terminate/3, code_change/4]).

-record(state, {
socket :: gen_tcp:socket(), %% TCP connection
id :: integer(), %% connection id
hash :: binary(), %% hash for auth
handler :: atom(), %% Handler for auth/queries
packet = <<>> :: binary(), %% Received packet
socket :: gen_tcp:socket(), %% TCP connection
id :: integer(), %% connection id
hash :: binary(), %% hash for auth
handler :: atom(), %% Handler for auth/queries
packet = <<>> :: binary(), %% Received packet
parse_query = false :: boolean(), %% parse the received string or not
handler_state
}).

%% API

-spec start(Socket :: gen_tcp:socket(), Id :: integer(), Handler :: atom()) -> {ok, pid()}.
-spec start(
Socket::gen_tcp:socket(),
Id::integer(),
Handler::atom(),
ParseQuery::boolean()
) -> {ok, pid()}.

start(Socket, Id, Handler) ->
{ok, Pid} = gen_fsm:start(?MODULE, [Socket, Id, Handler], []),
start(Socket, Id, Handler, ParseQuery) ->
{ok, Pid} = gen_fsm:start(?MODULE, [Socket, Id, Handler, ParseQuery], []),
gen_tcp:controlling_process(Socket, Pid),
inet:setopts(Socket, [{active, true}]),
{ok, Pid}.
Expand Down Expand Up @@ -60,7 +66,7 @@ check_clean_pass(Pass, Salt) ->

%% callbacks

init([Socket, Id, Handler]) ->
init([Socket, Id, Handler, ParseQuery]) ->
Hash = list_to_binary(
lists:map(fun
(0) -> 1;
Expand All @@ -75,7 +81,7 @@ init([Socket, Id, Handler]) ->
info=Hash
},
gen_tcp:send(Socket, my_packet:encode(Hello)),
{ok, auth, #state{socket=Socket, id=Id, hash=Hash, handler=Handler}}.
{ok, auth, #state{socket=Socket, id=Id, hash=Hash, handler=Handler, parse_query=ParseQuery}}.

handle_info({tcp,_Port, Info}, auth, StateData=#state{hash=Hash,socket=Socket,handler=Handler}) ->
#request{info=#user{
Expand Down Expand Up @@ -116,12 +122,27 @@ handle_info({tcp,_Port, Info}, auth, StateData=#state{hash=Hash,socket=Socket,ha
handle_info({tcp,_Port,Msg}, normal, #state{socket=Socket,handler=Handler,packet=Packet,handler_state=HandlerState}=StateData) ->
case my_packet:decode(Msg) of
#request{continue=true, info=Info}=Request ->
lager:info("Received (partial): ~p~n", [Request]),
lager:debug("Received (partial): ~p~n", [Request]),
{next_state, normal, StateData#state{packet = <<Packet/binary, Info/binary>>}};
#request{continue=false, id=Id, info=Info}=Request ->
lager:info("Received: ~p~n", [Request]),
{Response, NewHandlerState} = Handler:execute(Request#request{info = <<Packet/binary, Info/binary>>}, HandlerState),
lager:info("Response: ~p~n", [Response]),
lager:debug("Received: ~p~n", [Request]),
FullPacket = <<Packet/binary, Info/binary>>,
Query = case StateData#state.parse_query of
false -> Request#request{info = FullPacket};
true ->
case mysql:parse(FullPacket) of
{fail,Expected} ->
lager:error("SQL invalid: ~p~n", [Expected]),
Request#request{info = FullPacket};
{_, Extra, Where} ->
lager:error("SQL error: ~p ~p~n", [Extra, Where]),
Request#request{info = FullPacket};
Parsed ->
Request#request{info = Parsed}
end
end,
{Response, NewHandlerState} = Handler:execute(Query, HandlerState),
lager:debug("Response: ~p~n", [Response]),
gen_tcp:send(Socket, my_packet:encode(
Response#response{id = Id+1}
)),
Expand Down
6 changes: 5 additions & 1 deletion src/myproto_app.erl
Expand Up @@ -12,7 +12,11 @@
start(_StartType, _StartArgs) ->
{ok, Port} = application:get_env(myproto, port),
{ok, Handler} = application:get_env(myproto, handler),
myproto_sup:start_link(Port, Handler).
ParseQuery = case application:get_env(myproto, parse_query) of
{ok, PQ} -> PQ;
_ -> false
end,
myproto_sup:start_link(Port, Handler, ParseQuery).

stop(_State) ->
ok.

0 comments on commit 806f635

Please sign in to comment.