Permalink
Browse files

initial commit

  • Loading branch information...
maxlapshin committed Nov 4, 2011
0 parents commit e7b1f583c52a5f0e59ad9cc3f40dd47082452e7c
Showing with 301 additions and 0 deletions.
  1. +1 −0 .gitignore
  2. +12 −0 src/rack.app.src
  3. +67 −0 src/rack.erl
  4. +63 −0 src/rack_handler.erl
  5. +41 −0 src/rack_sup.erl
  6. +117 −0 src/rack_worker.erl
@@ -0,0 +1 @@
+ebin
@@ -0,0 +1,12 @@
+{application, rack,
+ [
+ {description, ""},
+ {vsn, "1"},
+ {registered, []},
+ {applications, [
+ kernel,
+ stdlib
+ ]},
+ {mod, { rack, []}},
+ {env, []}
+ ]}.
@@ -0,0 +1,67 @@
+-module(rack).
+
+-behaviour(application).
+
+%% Application callbacks
+-export([start/2, stop/1]).
+
+-export([find_worker/1, start_rack/1, start_rack/2]).
+
+%% ===================================================================
+%% Application callbacks
+%% ===================================================================
+
+start(_StartType, _StartArgs) ->
+ rack_sup:start_link().
+
+stop(_State) ->
+ ok.
+
+
+find_worker(Path) ->
+ start_rack(Path),
+ Pids = [Pid || {_, Pid, _, _} <- supervisor:which_children(worker_id(Path))],
+ N = random:uniform(length(Pids)),
+ {ok, lists:nth(N, Pids)}.
+
+
+start_rack(Path) ->
+ case erlang:whereis(rack_sup) of
+ undefined -> application:start(rack);
+ _ -> ok
+ end,
+ case erlang:whereis(worker_id(Path)) of
+ undefined ->
+ start_rack(Path, [{workers, 4}]);
+ Pid ->
+ {ok, Pid}
+ end.
+
+worker_id(Path) ->
+ list_to_atom(lists:flatten(io_lib:format("rack_worker_~s", [Path]))).
+
+worker_id(Path, N) ->
+ list_to_atom(lists:flatten(io_lib:format("rack_worker_~s_~p", [Path, N]))).
+
+start_rack(Path, Options) ->
+ Id = worker_id(Path),
+ Workers = proplists:get_value(workers, Options, 1),
+ supervisor:start_child(rack_sup, {
+ Id,
+ {supervisor, start_link, [{local, Id}, rack_sup, [rack_worker_pool_sup]]},
+ permanent,
+ infinity,
+ supervisor,
+ []
+ }),
+ lists:foreach(fun(N) ->
+ SubId = worker_id(Path, N),
+ supervisor:start_child(Id, {
+ SubId,
+ {rack_worker, start_link, [[{path,Path}]]},
+ permanent,
+ 1000,
+ worker,
+ [rack_worker]
+ })
+ end, lists:seq(1,Workers)).
@@ -0,0 +1,63 @@
+-module(rack_handler).
+-author('Max Lapshin <max@maxidoors.ru>').
+
+-export([init/3, handle/2, terminate/2]).
+
+-record(state, {
+ path
+}).
+
+init({_Transport, http}, Req, Options) ->
+ Path = proplists:get_value(path, Options, "./priv"),
+ {ok, Req, #state{path = Path}}.
+
+
+join(Tokens, <<"/">>) ->
+ iolist_to_binary([[<<"/">>, PT] || PT <- Tokens, PT =/= <<"..">> andalso PT =/= <<"">>]);
+
+join(Tokens, Sep) ->
+ iolist_to_binary(string:join([binary_to_list(PT) || PT <- Tokens, PT =/= <<"..">> andalso PT =/= <<"">>], Sep)).
+
+translate_headers(Headers) ->
+ lists:foldl(fun
+ ({'Host', _}, Acc) ->
+ Acc;
+ ({K,V}, Acc) when is_binary(K) ->
+ Name = "HTTP_" ++ re:replace(string:to_upper(binary_to_list(K)), "\\-", "_"),
+ [{list_to_binary(Name), V}|Acc];
+ ({K,V}, Acc) when is_atom(K) ->
+ Name = "HTTP_" ++ re:replace(string:to_upper(atom_to_list(K)), "\\-", "_"),
+ [{list_to_binary(Name), V}|Acc]
+ end, [], Headers).
+
+handle(Req, #state{path = Path} = State) ->
+ {RequestMethod, Req1} = cowboy_http_req:method(Req),
+ {ScriptName, Req2} = cowboy_http_req:path(Req1),
+ {PathInfo, Req3} = cowboy_http_req:path_info(Req2),
+ {QueryString, Req4} = cowboy_http_req:raw_qs(Req3),
+ {ServerName, Req5} = cowboy_http_req:host(Req4),
+ {ServerPort, Req6} = cowboy_http_req:port(Req5),
+ {RequestHeaders, _} = cowboy_http_req:headers(Req6),
+ {ok, Body, Req7} = case RequestMethod of
+ 'POST' -> cowboy_http_req:body(Req6);
+ _ -> {ok, <<"">>, Req6}
+ end,
+
+ RackSession = [
+ {<<"REQUEST_METHOD">>, atom_to_binary(RequestMethod, latin1)},
+ {<<"SCRIPT_NAME">>, join(lists:sublist(ScriptName, length(ScriptName) - length(PathInfo)), <<"/">>)},
+ {<<"PATH_INFO">>, join(PathInfo, <<"/">>)},
+ {<<"QUERY_STRING">>, QueryString},
+ {<<"SERVER_NAME">>, join(ServerName, ".")},
+ {<<"SERVER_PORT">>, list_to_binary(integer_to_list(ServerPort))},
+ {<<"HTTP_HOST">>, <<(join(ServerName, "."))/binary, ":", (list_to_binary(integer_to_list(ServerPort)))/binary>>}
+ ] ++ translate_headers(RequestHeaders),
+
+ {ok, {Status, ReplyHeaders, ReplyBody}} = rack_worker:request(Path, RackSession, Body),
+
+ {ok, Req8} = cowboy_http_req:reply(Status, ReplyHeaders, ReplyBody, Req7),
+ {ok, Req8, State}.
+
+
+terminate(_,_) ->
+ ok.
@@ -0,0 +1,41 @@
+
+-module(rack_sup).
+
+-behaviour(supervisor).
+
+%% API
+-export([start_link/0]).
+
+%% Supervisor callbacks
+-export([init/1]).
+
+%% Helper macro for declaring children of supervisor
+-define(CHILD(I, Type), {I, {I, start_link, []}, permanent, 5000, Type, [I]}).
+
+%% ===================================================================
+%% API functions
+%% ===================================================================
+
+start_link() ->
+ supervisor:start_link({local, ?MODULE}, ?MODULE, []).
+
+%% ===================================================================
+%% Supervisor callbacks
+%% ===================================================================
+
+init([rack_worker_pool_sup]) ->
+ {ok, {{one_for_one, 5, 10}, []}};
+
+init([]) ->
+ Supervisors = [
+ % { rack_worker_sup,
+ % {rack_worker,start_link,[]},
+ % permanent, % Restart = permanent | transient | temporary
+ % 1000, % Shutdown = brutal_kill | int() >= 0 | infinity
+ % worker, % Type = worker | supervisor
+ % [rack_worker] % Modules = [Module] | dynamic
+ % }
+
+ ],
+ {ok, { {one_for_one, 5, 10}, Supervisors} }.
+
@@ -0,0 +1,117 @@
+-module(rack_worker).
+-author('Max Lapshin <max@maxidoors.ru>').
+
+-export([start_link/0, start_link/1, request/3]).
+
+-export([init/1, handle_call/3, handle_info/2, terminate/2]).
+
+start_link() ->
+ start_link([]).
+
+start_link(Options) ->
+ gen_server:start_link(?MODULE, [Options], []).
+
+request(Path, Headers, Body) when is_binary(Path) ->
+ request(binary_to_list(Path), Headers, Body);
+
+request(Path, Headers, Body) when is_list(Path) ->
+ {ok, Pid} = rack:find_worker(Path),
+ request(Pid, Headers, Body);
+
+request(Pid, Headers, Body) when is_pid(Pid) ->
+ gen_server:call(Pid, {request, Headers, Body}, 30000).
+
+
+-record(state, {
+ queue,
+ port,
+ timeout,
+ timer,
+ options,
+ max_len,
+ path,
+ reply
+}).
+
+init([Options]) ->
+ Path = proplists:get_value(path, Options, "./priv"),
+ Timeout = proplists:get_value(timeout, Options, 30000),
+
+ State = #state{
+ queue = queue:new(),
+ path = Path,
+ timeout = Timeout,
+ max_len = proplists:get_value(max_len, Options, 1000),
+ options = Options
+ },
+
+ {ok, restart_worker(State)}.
+
+restart_worker(#state{port = OldPort, path = Path} = State) ->
+ (catch erlang:port_close(OldPort)),
+ Port = erlang:open_port({spawn, "./priv/worker.rb "++Path}, [use_stdio,binary,exit_status,{packet,4}]),
+ io:format("Start Rack worker at path ~s~n", [Path]),
+ try_start_request(State#state{port = Port}).
+
+
+handle_call({request, _H, _} = Request, From, #state{queue = Queue, max_len = MaxLen} = State) ->
+ case queue:len(Queue) of
+ Len when Len >= MaxLen ->
+ {reply, {error, busy}, State};
+ _ ->
+ State1 = try_start_request(State#state{queue = queue:in({Request, From}, Queue)}),
+ {noreply, State1}
+ end.
+
+try_start_request(#state{reply = undefined, queue = Queue} = State) ->
+ case queue:out(Queue) of
+ {{value, Element}, Queue1} ->
+ start_request(Element, State#state{queue = Queue1});
+ {empty, _} ->
+ State
+ end;
+
+try_start_request(#state{} = State) ->
+ State.
+
+start_request({{request, Headers, Body}, From}, #state{port = Port, timeout = Timeout} = State) ->
+ Packed = iolist_to_binary([<<(length(Headers)):32>>,[
+ <<(size(Key)):32, Key/binary, (size(Value)):32, Value/binary>> || {Key, Value} <- Headers
+ ], <<(size(Body)):32>>, Body]),
+ port_command(Port, Packed),
+ Timer = timer:send_after(Timeout, kill_request),
+ State#state{reply = From, timer = Timer}.
+
+
+
+handle_info({Port, {data, Bin}}, #state{port = Port, reply = Reply, timer = Timer} = State) ->
+ timer:cancel(Timer),
+ <<Status:32, HeadersCount:32, Rest/binary>> = Bin,
+ {Headers, BodyType, RawBody} = extract_headers(Rest, HeadersCount, []),
+ Body = case BodyType of
+ file -> {ok, B} = file:read_file(binary_to_list(RawBody)), B;
+ raw -> RawBody
+ end,
+ gen_server:reply(Reply, {ok, {Status, Headers, Body}}),
+ {noreply, try_start_request(State#state{reply = undefined, timer = undefined})};
+
+handle_info(kill_request, #state{reply = Reply} = State) ->
+ (catch gen_server:reply(Reply, {error, timeout})),
+ {noreply, restart_worker(State#state{reply = undefined, timer = undefined})};
+
+handle_info({Port, {exit_status, _Status}}, #state{port = Port} = State) ->
+ {noreply, restart_worker(State)}.
+
+
+extract_headers(<<BodyFlag, BodyLen:32, Body:BodyLen/binary>>, 0, Acc) ->
+ BodyType = case BodyFlag of
+ 1 -> file;
+ 0 -> raw
+ end,
+ {lists:reverse(Acc), BodyType, Body};
+
+extract_headers(<<KeyLen:32, Key:KeyLen/binary, ValueLen:32, Value:ValueLen/binary, Rest/binary>>, HeadersCount, Acc) ->
+ extract_headers(Rest, HeadersCount - 1, [{Key, Value}|Acc]).
+
+
+terminate(_, _) -> ok.

0 comments on commit e7b1f58

Please sign in to comment.