Skip to content
Browse files

use new {stream, Size, Fun} webmachine body to do all body production…

… (bz://487)
  • Loading branch information...
1 parent 3350e38 commit 83a65915cc4ff1c09c19f9ac6e125105396f6a06 Bryan Fink committed
Showing with 14 additions and 127 deletions.
  1. +1 −1 rebar.config
  2. +13 −126 src/luwak_wm_file.erl
View
2 rebar.config
@@ -3,7 +3,7 @@
{deps, [{skerl, "0\.1",
{hg, "ssh://hg@bitbucket.org/basho/skerl", "1233b8462076"}},
{webmachine, "1.7.1",
- {hg, "http://bitbucket.org/basho/webmachine", "webmachine-1.7.1"}},
+ {hg, "http://bitbucket.org/basho/webmachine", "f103deda4daf"}},
{riak_kv, "0.12.0",
{hg, "http://bitbucket.org/basho/riak_kv", "92b956595db4"}}]}.
View
139 src/luwak_wm_file.erl
@@ -105,8 +105,6 @@
create_path/2,
process_post/2,
produce_doc_body/2,
- produce_single_byterange/2,
- produce_byteranges/2,
accept_doc_body/2,
delete_resource/2
]).
@@ -117,8 +115,7 @@
prefix, %% string() - prefix for resource uris
handle, %% {ok, riak_object()}|{error, term()}
%% - the object found
- method, %% atom() - HTTP method for the request
- ranges %% [byterange()] - parsed Range header
+ method %% atom() - HTTP method for the request
}).
-include_lib("webmachine/include/webmachine.hrl").
@@ -240,12 +237,7 @@ malformed_request(RD, Ctx) ->
RD)),
HCtx};
_ ->
- case extract_ranges(RD) of
- {ok, Ranges} ->
- {false, RD, HCtx#ctx{ranges=Ranges}};
- {error, Invalid} ->
- {true, invalid_range_message(RD, Invalid), HCtx}
- end
+ {false, RD, HCtx}
end.
%% @spec content_types_provided(reqdata(), context()) ->
@@ -260,22 +252,15 @@ content_types_provided(RD, Ctx=#ctx{key=undefined}) ->
content_types_provided(RD, Ctx=#ctx{method=Method}=Ctx) when Method =:= 'PUT';
Method =:= 'POST' ->
{ContentType, _} = extract_content_type(RD),
- {ctypes_or_byteranges(ContentType, Ctx), RD, Ctx};
+ {[{ContentType, produce_doc_body}], RD, Ctx};
content_types_provided(RD, Ctx0) ->
case defined_attribute(Ctx0, ?MD_CTYPE) of
{undefined, Ctx} ->
- {ctypes_or_byteranges("application/octet-stream", Ctx), RD, Ctx};
+ {[{"application/octet-stream", produce_doc_body}], RD, Ctx};
{Ctype, Ctx} ->
- {ctypes_or_byteranges(Ctype, Ctx), RD, Ctx}
+ {[{Ctype, produce_doc_body}], RD, Ctx}
end.
-ctypes_or_byteranges(CType, #ctx{ranges=[]}) ->
- [{CType, produce_doc_body}];
-ctypes_or_byteranges(CType, #ctx{ranges=[_]}) ->
- [{CType, produce_single_byterange}];
-ctypes_or_byteranges(_CType, #ctx{ranges=[_|_]}) ->
- [{"multipart/byteranges", produce_byteranges}].
-
%% @spec charsets_provided(reqdata(), context()) ->
%% {no_charset|[{Charset::string(), Producer::function()}],
%% reqdata(), context()}
@@ -554,80 +539,14 @@ extract_user_meta(RD) ->
end,
mochiweb_headers:to_list(wrq:req_headers(RD))).
-%% @spec extract_ranges(reqdata()) -> {ok, [byterange()]}|{error, [string()]}
-%% @type byterange() = {integer(), integer()}
-%% |{integer(), eof}
-%% |{suffix, integer()}
-extract_ranges(RD) ->
- case wrq:get_req_header(?HEAD_RANGE, RD) of
- undefined ->
- {ok, []};
- "bytes="++RawHeader ->
- RawRanges = string:tokens(RawHeader, ", "),
- Parsed = [ {catch parse_range(R), R} || R <- RawRanges ],
- case [ R || {{'EXIT',_}, R} <- Parsed ] of
- [] ->
- %% TODO: merge overlapping ranges
- {ok, [ P || {P, _} <- Parsed ]};
- Errors ->
- {error, Errors}
- end;
- Invalid ->
- {error, [Invalid]}
- end.
-
-parse_range([$-|R]) ->
- {suffix, list_to_integer(R)};
-parse_range(R) ->
- case string:tokens(R, "-") of
- [Start] ->
- {list_to_integer(Start), eof};
- [Start, End] ->
- {list_to_integer(Start), list_to_integer(End)}
- end.
-
-concrete_range(C, H, {suffix, Length}) ->
- FileLength = luwak_file:length(C, H),
- {FileLength-Length, Length};
-concrete_range(C, H, {Offset, eof}) ->
- FileLength = luwak_file:length(C, H),
- {Offset, FileLength-Offset};
-concrete_range(_C, _H, {Start, End}) ->
- %% HTTP byte range is inclusive, luwak's is not
- {Start, 1+End-Start}.
-
%% @spec produce_doc_body(reqdata(), context()) -> {binary(), reqdata(), context()}
%% @doc Extract the value of the document, and place it in the response
%% body of the request.
produce_doc_body(RD, Ctx=#ctx{handle={ok, H}, client=C}) ->
- {send_file(C, H, 0, luwak_file:length(C, H)),
+ {{stream, luwak_file:length(C, H), file_sender(C, H)},
add_user_metadata(RD, H),
Ctx}.
-produce_single_byterange(RD, Ctx=#ctx{handle={ok, H},
- client=C,
- ranges=[Range]}) ->
- {Start, End} = concrete_range(C, H, Range),
- FileLength = luwak_file:length(C, H),
- RangeHead = content_range_header(Start, End, FileLength),
- CLRD = wrq:set_resp_header(?HEAD_CRANGE, RangeHead, RD),
- {send_file(C, H, Start, End),
- add_user_metadata(CLRD, H),
- Ctx}.
-
-produce_byteranges(RD, Ctx=#ctx{handle={ok, H},
- client=C,
- ranges=Ranges}) ->
- CRanges = [ concrete_range(C, H, R) || R <- Ranges ],
- FileLength = luwak_file:length(C, H),
- Boundary = riak_core_util:unique_id_62(),
- BRD = wrq:set_resp_header(?HEAD_CTYPE,
- "multipart/byteranges; boundary="++Boundary,
- RD),
- {{stream, multi_send_file(C, H, FileLength, Boundary, CRanges)},
- add_user_metadata(BRD, H),
- Ctx}.
-
add_user_metadata(RD, Handle) ->
Attr = luwak_file:get_attributes(Handle),
case dict:find(?MD_USERMETA, Attr) of
@@ -639,12 +558,13 @@ add_user_metadata(RD, Handle) ->
error -> RD
end.
-content_range_header(Start, End, Length) ->
- io_lib:format("bytes=~b-~b/~b", [Start, Start+End-1, Length]).
-
-send_file(Client, Handle, Start, End) ->
- Stream = luwak_get_stream:start(Client, Handle, Start, End),
- {stream, (send_file_helper(Stream))()}.
+file_sender(C, H) ->
+ fun(Start, End) ->
+ %% HTTP specifies the last byte to send,
+ %% but luwak wants a number of bytes after offset
+ Stream = luwak_get_stream:start(C, H, Start, 1+End-Start),
+ (send_file_helper(Stream))()
+ end.
-define(STREAM_TIMEOUT, 5000).
@@ -662,32 +582,6 @@ send_file_helper(Stream) ->
end
end.
-%% basic strategy of multi_send_file is to wrap up calls to
-%% send_file in thunks, so that as each range finishes sending
-%% we can start the next one
-multi_send_file(_C, _H, _Length, Boundary, []) ->
- {[<<"\r\n--">>, Boundary, <<"--\r\n">>], done};
-multi_send_file(C, H, Length, Boundary, [{Start, End}|Rest]) ->
- {stream, {Data, Thunk}} = send_file(C, H, Start, End),
- {[multipart_header(Boundary, Start, End, Length), Data],
- fun() ->
- multi_send_helper(C, H, Length, Boundary, Rest, Thunk)
- end}.
-
-multi_send_helper(C, H, Length, Boundary, Rest, done) ->
- multi_send_file(C, H, Length, Boundary, Rest);
-multi_send_helper(C, H, Length, Boundary, Rest, Thunk) ->
- {Data, NewThunk} = Thunk(),
- {Data,
- fun() ->
- multi_send_helper(C, H, Length, Boundary, Rest, NewThunk)
- end}.
-
-multipart_header(Boundary, Start, End, Length) ->
- [<<"\r\n--">>, Boundary, <<"\r\n">>,
- ?HEAD_CRANGE, <<": ">>,
- content_range_header(Start, End, Length), <<"\r\n\r\n">>].
-
%% @spec ensure_doc(context()) -> context()
%% @doc Ensure that the 'doc' field of the context() has been filled
%% with the result of a riak_client:get request. This is a
@@ -760,10 +654,3 @@ send_precommit_error(RD, Reason) ->
Reason
end,
wrq:append_to_response_body(Error, RD1).
-
-invalid_range_message(RD, Ranges) ->
- RD1 = wrq:set_resp_header("Content-Type", "text/plain", RD),
- wrq:append_to_response_body(
- io_lib:format("The Range header specified invalid ranges: ~p",
- [Ranges]),
- RD1).

0 comments on commit 83a6591

Please sign in to comment.
Something went wrong with that request. Please try again.