Skip to content
Fetching contributors…
Cannot retrieve contributors at this time
313 lines (279 sloc) 9.68 KB
% Licensed under the Apache License, Version 2.0 (the "License"); you may not
% use this file except in compliance with the License. You may obtain a copy of
% the License at
%
% http://www.apache.org/licenses/LICENSE-2.0
%
% Unless required by applicable law or agreed to in writing, software
% distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
% WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
% License for the specific language governing permissions and limitations under
% the License.
-module(couch_stream).
-behaviour(gen_server).
% public API
-export([open/1, open/2, close/1]).
-export([foldl/4, foldl/5, foldl_decode/6, range_foldl/6]).
-export([copy_to_new_stream/3, write/2]).
% gen_server callbacks
-export([init/1, terminate/2, code_change/3]).
-export([handle_cast/2, handle_call/3, handle_info/2]).
-include("couch_db.hrl").
-define(DEFAULT_BUFFER_SIZE, 4096).
% State of one stream writer process (see init/1 and handle_call/3).
-record(stream,
% file handle that encoded blocks are appended to
{fd = 0,
% {Pos, Size} of every block appended so far, most recent first
written_pointers=[],
% data accepted by write/2 but not yet flushed, most recent first
buffer_list = [],
% total size in bytes of the data held in buffer_list
buffer_len = 0,
% a write that would push buffer_len above this value triggers a flush
max_buffer,
% total size in bytes of the (encoded) blocks appended to fd
written_len = 0,
% running md5 of the data as appended to fd, i.e. after encoding
md5,
% md5 of the content without any transformation applied (e.g. compression)
% needed for the attachment upload integrity check (ticket 558)
identity_md5,
% total size in bytes of the data passed to write/2, before encoding
identity_len = 0,
% identity | gzip, as selected by the encoding option in init/1
encoding,
% fun(Data) -> EncodedIoList, applied to each flushed buffer
encoding_fun,
% fun() -> TrailingIoList, called once when the stream is closed
end_encoding_fun
}).
%%% Interface functions %%%
% Starts a linked stream writer process for Fd with default options.
% Returns whatever gen_server:start_link/3 returns.
open(Fd) ->
    open(Fd, []).

% Starts a linked stream writer process for Fd.
%
% Options is a property list; init/1 reads the keys encoding,
% compression_level (via gzip_init/1) and buffer_size from it.
open(Fd, Options) ->
    InitArg = {Fd, Options},
    gen_server:start_link(couch_stream, InitArg, []).
% Flushes and stops the stream process, waiting without a timeout.
%
% Returns the reply built by the close clause of handle_call/3:
% {PointerList, StreamLen, IdentityLen, Md5, IdentityMd5}.
close(StreamPid) ->
    gen_server:call(StreamPid, close, infinity).
% Copies every block listed in PosList from Fd into a freshly opened
% stream on DestFd and returns the result of closing that stream.
%
% Blocks are copied in list order, without md5 verification (foldl/4).
copy_to_new_stream(Fd, PosList, DestFd) ->
    {ok, Dest} = open(DestFd),
    % Each block must be accepted by the destination stream; anything
    % other than ok crashes the copy.
    CopyBlock = fun(Bin, _Acc) ->
        ok = write(Dest, Bin)
    end,
    foldl(Fd, PosList, CopyBlock, ok),
    close(Dest).
% Folds Fun over the blocks of a stream, in list order, without any md5
% verification.
%
% Fd      - file handle passed to couch_file:pread_iolist/2.
% PosList - block pointers: bare positions or {Pos, Size} tuples (the
%           form this module's writer records on flush and on close).
% Fun     - fun(Bin, Acc) -> NewAcc, called once per block read.
% Acc     - initial accumulator; returned unchanged for an empty list.
%
% Crashes with badmatch if a read does not return {ok, Bin}.
foldl(_Fd, [], _Fun, Acc) ->
    Acc;
foldl(Fd, [{Pos, _Size} | Rest], Fun, Acc) ->
    % Only the position is needed to read a block back, so the size is
    % dropped here exactly as foldl/6 and foldl_decode/7 do.
    foldl(Fd, [Pos | Rest], Fun, Acc);
foldl(Fd, [Pos | Rest], Fun, Acc) ->
    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
    foldl(Fd, Rest, Fun, Fun(Bin, Acc)).
% Folds Fun over the blocks of a stream, verifying the content against
% Md5 when one is supplied.
%
% An empty binary as Md5 means "no checksum available": the fold then
% runs through foldl/4 without verification. Any other value is checked
% by foldl/6 once the last block has been read.
foldl(Fd, PosList, Md5, Fun, Acc) ->
    case Md5 of
        <<>> ->
            foldl(Fd, PosList, Fun, Acc);
        _ ->
            foldl(Fd, PosList, Md5, couch_util:md5_init(), Fun, Acc)
    end.
% Folds Fun over the decoded blocks of a stream stored with encoding Enc
% (gzip | identity), verifying Md5 against the data as stored on disk.
%
% The decoder's end fun runs only after the fold has returned; if the
% fold crashes (for example on an md5 mismatch) it is not called.
% Any other value of Enc crashes with case_clause.
foldl_decode(Fd, PosList, Md5, Enc, Fun, Acc) ->
    Decoder = case Enc of
        gzip -> ungzip_init();
        identity -> identity_enc_dec_funs()
    end,
    {Decode, Finish} = Decoder,
    Md5Ctx = couch_util:md5_init(),
    Result = foldl_decode(Decode, Fd, PosList, Md5, Md5Ctx, Fun, Acc),
    Finish(),
    Result.
% Folds Fun over the blocks of a stream while accumulating an md5 of the
% data read, and asserts that the final digest equals Md5.
%
% Md5Acc is the running md5 context. A digest mismatch crashes with
% badmatch. For the last block the digest is checked before Fun is
% applied to it.
foldl(_Fd, [], Md5, Md5Acc, _Fun, Acc) ->
    Md5 = couch_util:md5_final(Md5Acc),
    Acc;
foldl(Fd, [{Pos, _Size} | Rest], Md5, Md5Acc, Fun, Acc) -> % 0110 UPGRADE CODE
    % Pointers may carry a size; only the position is used here, so the
    % pointer is unwrapped and dispatched again.
    foldl(Fd, [Pos | Rest], Md5, Md5Acc, Fun, Acc);
foldl(Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
    Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, Bin)),
    Fun(Bin, Acc);
foldl(Fd, [Pos | Rest], Md5, Md5Acc, Fun, Acc) ->
    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
    Md5Acc2 = couch_util:md5_update(Md5Acc, Bin),
    Acc2 = Fun(Bin, Acc),
    foldl(Fd, Rest, Md5, Md5Acc2, Fun, Acc2).
% Folds Fun over the part of a stream selected by From and To.
% Delegates to range_foldl/7, starting at byte offset 0 of the stream.
range_foldl(Fd, PosList, From, To, Fun, Acc) ->
    StartOffset = 0,
    range_foldl(Fd, PosList, From, To, StartOffset, Fun, Acc).
% Folds Fun over the bytes of a stream that fall inside [From, To).
%
% Off is the stream offset of the first byte of the block at the head of
% the pointer list. Fun is called as Fun(Data, Acc) with the part of each
% block that lies in the range. Folding stops as soon as Off reaches To.
%
% Pointers are {Pos, Size} tuples or, for old-style attachments, bare
% integer positions whose size is only known after reading the block.
range_foldl(_Fd, _PosList, _From, To, Off, _Fun, Acc) when Off >= To ->
    Acc;
range_foldl(Fd, [Pos | Rest], From, To, Off, Fun, Acc) when is_integer(Pos) -> % old-style attachment
    % Read the block once just to learn its size, then dispatch again
    % with a sized pointer.
    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
    range_foldl(Fd, [{Pos, iolist_size(Bin)} | Rest], From, To, Off, Fun, Acc);
range_foldl(Fd, [{_Pos, Size} | Rest], From, To, Off, Fun, Acc) when From >= Off + Size ->
    % The block covers [Off, Off + Size). When From is at or beyond its
    % end, no byte of it is in range: skip it without reading it and
    % without calling Fun on an empty binary.
    range_foldl(Fd, Rest, From, To, Off + Size, Fun, Acc);
range_foldl(Fd, [{Pos, Size} | Rest], From, To, Off, Fun, Acc) ->
    {ok, Bin} = couch_file:pread_iolist(Fd, Pos),
    Bin1 = if
        From =< Off andalso To >= Off + Size ->
            % the whole block is covered
            Bin;
        true ->
            % Trim the bytes before From and the bytes at or after To.
            PrefixLen = clip(From - Off, 0, Size),
            PostfixLen = clip(Off + Size - To, 0, Size),
            MatchLen = Size - PrefixLen - PostfixLen,
            <<_Prefix:PrefixLen/binary, Match:MatchLen/binary, _Postfix:PostfixLen/binary>> =
                iolist_to_binary(Bin),
            Match
    end,
    range_foldl(Fd, Rest, From, To, Off + Size, Fun, Fun(Bin1, Acc)).
% Limits Value to the interval bounded by Lo and Hi.
% Returns Lo when Value is below Lo, Hi when Value is above Hi, and
% Value itself otherwise. The lower bound is checked first.
clip(Value, Lo, _Hi) when Value < Lo ->
    Lo;
clip(Value, _Lo, Hi) when Value > Hi ->
    Hi;
clip(Value, _Lo, _Hi) ->
    Value.
% Folds Fun over the decoded blocks of a stream.
%
% DecFun is applied to every block read from Fd and Fun receives its
% result. The md5 is accumulated over the blocks as stored (before
% decoding) and must equal Md5 at the end, otherwise the fold crashes
% with badmatch. For the last block the digest is checked before the
% block is decoded.
foldl_decode(_DecFun, _Fd, [], Md5, Md5Acc, _Fun, Acc) ->
    Md5 = couch_util:md5_final(Md5Acc),
    Acc;
foldl_decode(DecFun, Fd, [{Pos, _Size} | Rest], Md5, Md5Acc, Fun, Acc) ->
    % Sized pointers are unwrapped; only the position is needed.
    foldl_decode(DecFun, Fd, [Pos | Rest], Md5, Md5Acc, Fun, Acc);
foldl_decode(DecFun, Fd, [Pos], Md5, Md5Acc, Fun, Acc) ->
    {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
    Md5 = couch_util:md5_final(couch_util:md5_update(Md5Acc, EncBin)),
    Fun(DecFun(EncBin), Acc);
foldl_decode(DecFun, Fd, [Pos | Rest], Md5, Md5Acc, Fun, Acc) ->
    {ok, EncBin} = couch_file:pread_iolist(Fd, Pos),
    Bin = DecFun(EncBin),
    Md5Acc2 = couch_util:md5_update(Md5Acc, EncBin),
    Acc2 = Fun(Bin, Acc),
    foldl_decode(DecFun, Fd, Rest, Md5, Md5Acc2, Fun, Acc2).
% Builds the {EncodeFun, EndFun} pair used to compress a stream.
%
% Reads compression_level from Options (default 0). Levels 1 to 9 open a
% zlib deflate stream at that level; any other value, including the
% default, falls back to the identity funs.
%
% EncodeFun(Data) returns the compressed output produced so far.
% EndFun() finishes the deflate stream, releases zlib and returns the
% trailing compressed data.
gzip_init(Options) ->
    Level = couch_util:get_value(compression_level, Options, 0),
    case Level >= 1 andalso Level =< 9 of
        true ->
            Z = zlib:open(),
            ok = zlib:deflateInit(Z, Level),
            Deflate = fun(Data) ->
                zlib:deflate(Z, Data)
            end,
            Finish = fun() ->
                Last = zlib:deflate(Z, [], finish),
                ok = zlib:deflateEnd(Z),
                ok = zlib:close(Z),
                Last
            end,
            {Deflate, Finish};
        false ->
            identity_enc_dec_funs()
    end.
% Builds the {DecodeFun, EndFun} pair used to decompress a stream.
%
% Opens a zlib stream initialised with zlib:inflateInit/1.
% DecodeFun(Data) returns the inflated data; EndFun() ends the inflate
% stream and closes zlib, returning ok.
ungzip_init() ->
    Z = zlib:open(),
    zlib:inflateInit(Z),
    Inflate = fun(Data) ->
        zlib:inflate(Z, Data)
    end,
    Finish = fun() ->
        ok = zlib:inflateEnd(Z),
        ok = zlib:close(Z)
    end,
    {Inflate, Finish}.
% Returns the {DataFun, EndFun} pair for streams stored without any
% transformation: DataFun hands its argument back unchanged and EndFun
% contributes no trailing data (an empty list).
identity_enc_dec_funs() ->
    PassThrough = fun(Data) -> Data end,
    NoTrailer = fun() -> [] end,
    {PassThrough, NoTrailer}.
% Sends Bin to the stream process Pid and waits, without a timeout, for
% the reply. An empty binary is acknowledged locally with ok, without
% contacting the process.
write(Pid, Bin) ->
    case Bin of
        <<>> ->
            ok;
        _ ->
            gen_server:call(Pid, {write, Bin}, infinity)
    end.
% gen_server callback: builds the initial #stream{} state.
%
% Options read here:
%   encoding    - identity (default) or gzip; any other value crashes
%                 with case_clause.
%   buffer_size - flush threshold, defaults to ?DEFAULT_BUFFER_SIZE.
% For gzip the options are also handed to gzip_init/1.
init({Fd, Options}) ->
    Encoding = couch_util:get_value(encoding, Options, identity),
    EncoderFuns = case Encoding of
        identity -> identity_enc_dec_funs();
        gzip -> gzip_init(Options)
    end,
    {EncodingFun, EndEncodingFun} = EncoderFuns,
    % Two separate contexts: one for the data as written, one for the
    % untransformed input.
    Md5Ctx = couch_util:md5_init(),
    IdentityMd5Ctx = couch_util:md5_init(),
    MaxBuffer = couch_util:get_value(buffer_size, Options, ?DEFAULT_BUFFER_SIZE),
    Stream = #stream{
        fd = Fd,
        md5 = Md5Ctx,
        identity_md5 = IdentityMd5Ctx,
        encoding = Encoding,
        encoding_fun = EncodingFun,
        end_encoding_fun = EndEncodingFun,
        max_buffer = MaxBuffer
    },
    {ok, Stream}.
% gen_server callback: nothing is cleaned up here; always returns ok.
terminate(_Why, _State) ->
    ok.
% gen_server callback for synchronous requests.
%
% {write, Bin}
%   Accepts Bin and replies ok. While the buffered size plus the size of
%   Bin stays within max_buffer, Bin is only prepended to the buffer.
%   Otherwise the buffer together with Bin is encoded and, unless the
%   encoder produced nothing, appended to the file as one block whose
%   {Pos, Size} pointer is recorded.
%
% close
%   Encodes whatever is still buffered, adds the encoder's trailing data,
%   appends the result to the file if it is not empty, and stops the
%   process with reason normal. The reply is
%   {PointerList, StreamLen, IdentityLen, Md5, IdentityMd5}
%   where PointerList is in write order.
handle_call({write, Bin}, _From, Stream) ->
    BinSize = iolist_size(Bin),
    #stream{
        fd = Fd,
        written_len = WrittenLen,
        written_pointers = Written,
        buffer_len = BufferLen,
        buffer_list = Buffer,
        max_buffer = Max,
        md5 = Md5,
        identity_md5 = IdenMd5,
        identity_len = IdenLen,
        encoding = Encoding,
        encoding_fun = EncodingFun
    } = Stream,
    NewStream =
        if
            BinSize + BufferLen > Max ->
                % The buffer is kept newest-first; restore write order
                % and put the new data at the end.
                Pending = lists:reverse(Buffer, [Bin]),
                IdenMd5_2 = couch_util:md5_update(IdenMd5, Pending),
                {WrittenLen2, Md5_2, Written2} =
                    case EncodingFun(Pending) of
                        [] ->
                            % case where the encoder did some internal
                            % buffering (zlib does it for example)
                            {WrittenLen, Md5, Written};
                        Encoded ->
                            {ok, Pos, _} = couch_file:append_binary(Fd, Encoded),
                            EncodedSize = iolist_size(Encoded),
                            NextMd5 = case Encoding of
                                identity ->
                                    IdenMd5_2;
                                gzip ->
                                    couch_util:md5_update(Md5, Encoded)
                            end,
                            {WrittenLen + EncodedSize, NextMd5,
                             [{Pos, EncodedSize} | Written]}
                    end,
                Stream#stream{
                    written_len = WrittenLen2,
                    written_pointers = Written2,
                    buffer_list = [],
                    buffer_len = 0,
                    md5 = Md5_2,
                    identity_md5 = IdenMd5_2,
                    identity_len = IdenLen + BinSize
                };
            true ->
                Stream#stream{
                    buffer_list = [Bin | Buffer],
                    buffer_len = BufferLen + BinSize,
                    identity_len = IdenLen + BinSize
                }
        end,
    {reply, ok, NewStream};
handle_call(close, _From, Stream) ->
    #stream{
        fd = Fd,
        written_len = WrittenLen,
        written_pointers = Written,
        buffer_list = Buffer,
        md5 = Md5,
        identity_md5 = IdenMd5,
        identity_len = IdenLen,
        encoding = Encoding,
        encoding_fun = EncodingFun,
        end_encoding_fun = EndEncodingFun
    } = Stream,
    Pending = lists:reverse(Buffer),
    IdenMd5Final = couch_util:md5_final(couch_util:md5_update(IdenMd5, Pending)),
    % Remaining data followed by whatever the encoder emits on finish.
    Tail = EncodingFun(Pending) ++ EndEncodingFun(),
    Md5Final = case Encoding of
        identity ->
            IdenMd5Final;
        gzip ->
            couch_util:md5_final(couch_util:md5_update(Md5, Tail))
    end,
    {Pointers, StreamLen} =
        case Tail of
            [] ->
                {lists:reverse(Written), WrittenLen};
            _ ->
                {ok, Pos, _} = couch_file:append_binary(Fd, Tail),
                TailSize = iolist_size(Tail),
                {lists:reverse(Written, [{Pos, TailSize}]), WrittenLen + TailSize}
        end,
    Result = {Pointers, StreamLen, IdenLen, Md5Final, IdenMd5Final},
    {stop, normal, Result, Stream}.
% gen_server callback: casts are not part of the stream protocol; any
% cast is ignored and the state is kept as is.
handle_cast(_Request, Stream) ->
    {noreply, Stream}.
% gen_server callback: no state migration is performed on code upgrade;
% the state is returned unchanged.
code_change(_FromVsn, Stream, _Extra) ->
    {ok, Stream}.
% gen_server callback: any other message sent to the process is dropped
% and the state is kept as is.
handle_info(_Message, Stream) ->
    {noreply, Stream}.
Something went wrong with that request. Please try again.