Skip to content

Commit

Permalink
inital work on luwak sending byte ranges for bz://487
Browse files Browse the repository at this point in the history
supports sending a single range, and has some hooks for implementing multiple ranges next
  • Loading branch information
beerriot committed Aug 5, 2010
1 parent b39201a commit 75313a1
Showing 1 changed file with 80 additions and 21 deletions.
101 changes: 80 additions & 21 deletions src/luwak_wm_file.erl
Original file line number Diff line number Diff line change
Expand Up @@ -105,6 +105,7 @@
create_path/2,
process_post/2,
produce_doc_body/2,
produce_single_byterange/2,
accept_doc_body/2,
delete_resource/2
]).
Expand All @@ -122,6 +123,9 @@
-include_lib("riak_kv/src/riak_kv_wm_raw.hrl").
-include_lib("luwak.hrl").

-define(HEAD_RANGE, "Range").
-define(HEAD_CRANGE, "Content-Range").

%% @spec init(proplist()) -> {ok, context()}
%% @doc Initialize this resource. This function extracts the
%% 'prefix' property from the dispatch args.
Expand Down Expand Up @@ -256,15 +260,24 @@ content_types_provided(RD, Ctx=#ctx{key=undefined}) ->
content_types_provided(RD, Ctx=#ctx{method=Method}=Ctx) when Method =:= 'PUT';
Method =:= 'POST' ->
{ContentType, _} = extract_content_type(RD),
{[{ContentType, produce_doc_body}], RD, Ctx};
{ctypes_or_byteranges(ContentType, RD), RD, Ctx};
content_types_provided(RD, Ctx0) ->
case defined_attribute(Ctx0, ?MD_CTYPE) of
{undefined, Ctx} ->
{[{"application/octet-stream", produce_doc_body}], RD, Ctx};
{ctypes_or_byteranges("application/octet-stream", RD), RD, Ctx};
{Ctype, Ctx} ->
{[{Ctype, produce_doc_body}], RD, Ctx}
{ctypes_or_byteranges(Ctype, RD), RD, Ctx}
end.

ctypes_or_byteranges(CType, RD) ->
case extract_ranges(RD) of
[] ->
[{CType, produce_doc_body}];
[_] ->
[{CType, produce_single_byterange}];
[_|_] ->
[{"multipart/byteranges", produce_byteranges}]
end.
%% TODO: range queries will require use of multipart/byteranges

%% @spec charsets_provided(reqdata(), context()) ->
%% {no_charset|[{Charset::string(), Producer::function()}],
Expand Down Expand Up @@ -544,33 +557,79 @@ extract_user_meta(RD) ->
end,
mochiweb_headers:to_list(wrq:req_headers(RD))).

%% @spec extract_ranges(reqdata()) -> [byterange()]
%% @type byterange() = {integer(), integer()}
%% |{integer(), eof}
%% |{suffix, integer()}
extract_ranges(RD) ->
case wrq:get_req_header(?HEAD_RANGE, RD) of
undefined ->
[];
RawHeader ->
RawRanges = string:tokens(RawHeader, ", "),
%% TODO: merge overlapping ranges
[ parse_range(R) || R <- RawRanges ]
end.

parse_range([$-|R]) ->
{suffix, list_to_integer(R)};
parse_range(R) ->
case string:tokens(R, "-") of
[Start] ->
{list_to_integer(Start), eof};
[Start, End] ->
{list_to_integer(Start), list_to_integer(End)}
end.

concrete_range(C, H, {suffix, Length}) ->
FileLength = luwak_file:length(C, H),
{FileLength-Length, Length};
concrete_range(C, H, {Offset, eof}) ->
FileLength = luwak_file:length(C, H),
{Offset, FileLength-Offset};
concrete_range(_C, _H, {Start, End}) ->
%% HTTP byte range is inclusive, luwak's is not
{Start, 1+End-Start}.

%% @spec produce_doc_body(reqdata(), context()) -> {binary(), reqdata(), context()}
%% @doc Extract the value of the document, and place it in the response
%% body of the request.
produce_doc_body(RD, Ctx=#ctx{handle={ok, H}, client=C}) ->
Attr = luwak_file:get_attributes(H),
UserMetaRD = case dict:find(?MD_USERMETA, Attr) of
{ok, UserMeta} ->
lists:foldl(fun({K,V},Acc) ->
wrq:merge_resp_headers([{K,V}],Acc)
end,
RD, UserMeta);
error -> RD
end,
{send_file(C, H), UserMetaRD, Ctx}.

send_file(Client, Handle) ->
Stream = luwak_get_stream:start(Client, Handle, 0,
luwak_file:length(Client, Handle)),
{stream, (send_file_helper(Stream, 0))()}.
{send_file(C, H, 0, luwak_file:length(C, H)),
add_user_metadata(RD, H),
Ctx}.

produce_single_byterange(RD, Ctx=#ctx{handle={ok, H}, client=C}) ->
{Start, End} = concrete_range(C, H, hd(extract_ranges(RD))),
FileLength = luwak_file:length(C, H),
RangeHead = io_lib:format("~b-~b/~b", [Start, Start+End-1, FileLength]),
CLRD = wrq:set_resp_header(?HEAD_CRANGE, RangeHead, RD),
{send_file(C, H, Start, End),
add_user_metadata(CLRD, H),
Ctx}.

add_user_metadata(RD, Handle) ->
Attr = luwak_file:get_attributes(Handle),
case dict:find(?MD_USERMETA, Attr) of
{ok, UserMeta} ->
lists:foldl(fun({K,V},Acc) ->
wrq:merge_resp_headers([{K,V}],Acc)
end,
RD, UserMeta);
error -> RD
end.

send_file(Client, Handle, Start, End) ->
Stream = luwak_get_stream:start(Client, Handle, Start, End),
{stream, (send_file_helper(Stream))()}.

-define(STREAM_TIMEOUT, 5000).

send_file_helper(Stream, Count) ->
send_file_helper(Stream) ->
fun() ->
case luwak_get_stream:recv(Stream, ?STREAM_TIMEOUT) of
{Data, _Offset} when is_binary(Data) ->
{Data, send_file_helper(Stream, Count+size(Data))};
{Data, send_file_helper(Stream)};
eos ->
{<<>>, done};
closed ->
Expand Down

0 comments on commit 75313a1

Please sign in to comment.