Skip to content

Commit

Permalink
add support for attachments.
Browse files Browse the repository at this point in the history
- open_attachment
- open_attachment_async
- save_attachment
- delete_attachment
  • Loading branch information
benoitc committed Jun 27, 2011
1 parent b5a29b5 commit 311ecbc
Show file tree
Hide file tree
Showing 2 changed files with 395 additions and 2 deletions.
354 changes: 352 additions & 2 deletions couchc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -31,7 +31,14 @@
delete_docs/2, delete_docs/3,
db_exec/2, db_admin_exec/2,
all/1, all/2, all/3,
fold/2, fold/3, fold/4]).
fold/2, fold/3, fold/4,
open_attachment/3, open_attachment/4,
open_attachment_async/3, open_attachment_async/4,
get_attachment_part/1, get_attachment_part/2,
collect_attachment/1, collect_attachment/2,
collect_attachment_ranges/1, collect_attachment_ranges/2,
save_attachment/4, save_attachment/5,
delete_attachment/3, delete_attachment/4]).


get_uuid() ->
Expand Down Expand Up @@ -532,7 +539,195 @@ fold(Db, {DName, VName}, Fun, Options) ->
fold(_, _, _, _) ->
{error, {bad_view_name, <<"bad view name">>}}.

%% utility functions

open_attachment(Db, DocId, FileName) ->
open_attachment(Db, DocId, FileName, []).

open_attachment(Db, DocId, FileName, Options) ->
case open_attachment_async(Db, DocId, FileName, Options) of
{ok, Pid, ranges} ->
{ok, collect_attachment_ranges(Pid)};
{ok, Pid, {content_md5, _}=Md5} ->
{ok, collect_attachment(Pid), Md5};
{ok, Pid} ->
{ok, Pid, collect_attachment(Pid)};
Error ->
Error
end.

open_attachment_async(Db, DocId, FileName) ->
open_attachment_async(Db, DocId, FileName, []).

open_attachment_async(Db, DocId, FileName, Options) ->
FileName1 = couch_util:to_binary(FileName),
Rev = proplists:get_value(rev, Options, nil),
DocId1 = couch_util:to_binary(DocId),
case open_doc_rev1(Db, DocId1, Rev, Options) of
{ok, Doc} ->
#doc{atts=Atts} = Doc,
case [A || A <- Atts, A#att.name == FileName1] of
[] ->
{error, {not_found, "Document is missing attachment"}};
[#att{type=Type, encoding=Enc, disk_len=DiskLen,
att_len=AttLen}=Att] ->
case acceptable_encoding(Options) of
{error, Error} ->
{error, Error};
EncList ->
AcceptsAttEnc = lists:member(Enc, EncList),
Len = case {Enc, AcceptsAttEnc} of
{identity, _} ->
DiskLen;
{_, false} when DiskLen =/= AttLen ->
DiskLen;
{_, true} ->
AttLen;
_ ->
undefined
end,
AttFun = case AcceptsAttEnc of
false ->
fun couch_doc:att_foldl_decode/3;
true ->
fun couch_doc:att_foldl/3
end,
case Len of
undefined ->
Pid = spawn(fun() ->
stream_attachment(AttFun, Att)
end),
{ok, Pid, []};
_ ->
Ranges = parse_ranges(proplists:get_value(ranges, Options), Len),
case {Enc, Ranges} of
{identity, Ranges} when is_list(Ranges) ->
Pid = spawn(fun() ->
stream_attachment_ranges(Type,
Len, Att, Ranges)
end),
{ok, Pid, ranges};
_ ->
Pid = spawn(fun() ->
stream_attachment(AttFun, Att)
end),
if Enc =:= identity orelse AcceptsAttEnc =:= true ->
{ok, Pid, {content_md5,
base64:encode(Att#att.md5)}};
true ->
{ok, Pid}
end
end
end

end
end;
{error, conflict} ->
{error, not_found};
Error ->
Error
end.

get_attachment_part(Pid) ->
get_attachment_part(Pid, infinity).

get_attachment_part(Pid, Timeout) ->
Pid ! {ack, self()},
receive
{attachment_done, Pid} ->
{ok, done};
{attachment_range, Pid, {Type, From, To, Len}} ->
Pid ! {ack, self()},
{ok, {new_range, {Type, From, To, Len}}};
{attachment_range_part, RangePart} ->
Pid ! {ack, self()},
{ok, {range_part, RangePart}};
{attachment_part, Pid, Part} ->
Pid ! {ack, self()},
{ok, Part}
after Timeout ->
kill_attachment_streamer(Pid)
end.

save_attachment(Db, DocId, FileName, Payload) ->
save_attachment(Db, DocId, FileName, Payload, []).

save_attachment(Db, DocId, FileName, Payload, Options) ->
FileName1 = couch_util:to_binary(FileName),
CType = couch_util:to_binary(proplists:get_value(content_type,
Options, <<"application/octet-stream">>)),
CLen = proplists:get_value(content_len, Options),
Md5 = couch_util:to_binary(proplists:get_value(content_md5, Options,
<<>>)),
Encoding = proplists:get_value(encoding, Options, identity),
NewAtt = [#att{
name = FileName1,
type = CType,
data = Payload,
att_len = CLen,
md5 = Md5,
encoding = Encoding}],

db_exec(Db, fun(Db0) ->
Result = case proplists:get_value(rev, Options) of
undefined ->
case couch_db:open_doc(Db0, DocId, Options) of
{ok, Doc0} ->
Doc0;
_ ->
#doc{id=DocId}
end;
Rev ->
Rev1 = couch_doc:parse_rev(Rev),
case couch_db:open_doc_revs(Db0, DocId, [Rev1], []) of
{ok, [{ok, Doc0}]} ->
Doc0;
{ok, [{{not_found, missing}, Rev1}]} ->
{error, conflict};
{ok, [Error]} ->
{error, Error}
end
end,
case Result of
{error, _} = Error1 ->
Error1;
Doc ->
#doc{atts=Atts} = Doc,
DocEdited = Doc#doc{
atts = NewAtt ++ [A || A <- Atts, A#att.name /=
FileName1]
},
{ok, UpdatedRev} = couch_db:update_doc(Db0, DocEdited, []),
UpdatedRevStr = couch_doc:rev_to_str(UpdatedRev),
{ok, DocId, UpdatedRevStr}
end
end).

delete_attachment(Db, DocId, FileName) ->
delete_attachment(Db, DocId, FileName, []).

delete_attachment(Db, DocId, FileName, Options) ->
FileName1 = couch_util:to_binary(FileName),
Rev = proplists:get_value(rev, Options, nil),
DocId1 = couch_util:to_binary(DocId),
NewAtt = [],

case open_doc_rev1(Db, DocId1, Rev, Options) of
{ok, Doc} ->
#doc{atts=Atts} = Doc,
DocEdited = Doc#doc{
atts = NewAtt ++ [A || A <- Atts, A#att.name /=
FileName1]
},
db_exec(Db, fun(Db0) ->
{ok, UpdatedRev} = couch_db:update_doc(Db0, DocEdited, []),
UpdatedRevStr = couch_doc:rev_to_str(UpdatedRev),
{ok, DocId, UpdatedRevStr}
end);
Error ->
Error
end.

%% utility functionss

db_exec(#cdb{name=DbName,options=Options}, Fun) ->
case couch_db:open(dbname(DbName), Options) of
Expand All @@ -555,7 +750,162 @@ db_admin_exec(Db, Fun) ->
end
end).

collect_attachment(Pid) ->
collect_attachment(Pid, []).

collect_attachment(Pid, Acc) ->
case get_attachment_part(Pid) of
{ok, done} ->
iolist_to_binary(lists:reverse(Acc));
{ok, {Bin, _}} ->
collect_attachment(Pid, [Bin|Acc]);
Error ->
Error
end.


collect_attachment_ranges(Pid) ->
collect_attachment_ranges(Pid, orddict:new()).

collect_attachment_ranges(Pid, Ranges) ->
case get_attachment_part(Pid) of
{ok, done} ->
RangesList = dict:to_list(Ranges),
lists:map(fun({_, {From, To, Type, Len, Acc}}) ->
Bin = iolist_to_binary(lists:reverse(Acc)),
{From, To, Type, Len, Bin}
end, RangesList);
{ok, range_part, {{From, To, Type, Len}, Bin, _}}->
NewRanges = case dict:find({From, To}, Ranges) of
{ok, {Type, From, To, Len, Acc}} ->
dict:store({From, To}, {Type, From, To, Len,
[Bin|Acc]}, Ranges);
_ ->
dict:store({From, To}, {Type, From, To, Len,
[Bin]}, Ranges)
end,
collect_attachment_ranges(Pid, NewRanges);
Error ->
Error
end.

%% private functions

stream_attachment(AttFun, Att) ->
Self = self(),
AttFun(Att, fun(Bins, _) ->
BinSegment = list_to_binary(Bins),
receive
{ack, From} ->
From ! {attachment_part, Self, {BinSegment,
size(BinSegment)}}
end,
{ok, []}
end, []),
receive
{ack, From} ->
From ! {attachment_done, Self}
end.

stream_attachment_ranges(Type, Len, Att, Ranges) ->
Self = self(),
lists:foreach(fun({From, To}) ->
receive
{ack, From} ->
From ! {attachment_range, Self, {Type,
From, To, Len}}
end,
couch_doc:range_att_foldl(Att, From, To + 1, fun(Bins, _) ->
BinSegment = list_to_binary(Bins),
receive
{ack, CFrom} ->
CFrom ! {attachment_range_part, Self, {{From, To,
Type, Len}, BinSegment,
size(BinSegment)}}
end
end, {ok, []})
end, Ranges),
receive
{ack, From} ->
From ! {attachment_done, Self}
end.

kill_attachment_streamer(Pid) ->
erlang:monitor(Pid),
unlink(Pid),
exit(Pid, timeout),
receive
{'DOWN', _, process, Pid, timeout} ->
{error, timeout};
{'DOWN', _, process, Pid, Reason} ->
erlang:error(Reason)
end.


parse_ranges(undefined, _Len) ->
undefined;
parse_ranges(Ranges, Len) ->
parse_ranges(Ranges, Len, []).

parse_ranges([], _Len, Acc) ->
lists:reverse(Acc);
parse_ranges([{From, To}|_], _Len, _Acc) when is_integer(From) andalso is_integer(To) andalso To < From ->
throw(requested_range_not_satisfiable);
parse_ranges([{From, To}|Rest], Len, Acc) when is_integer(To) andalso To >= Len ->
parse_ranges([{From, Len-1}] ++ Rest, Len, Acc);
parse_ranges([{none, To}|Rest], Len, Acc) ->
parse_ranges([{Len - To, Len - 1}] ++ Rest, Len, Acc);
parse_ranges([{From, none}|Rest], Len, Acc) ->
parse_ranges([{From, Len - 1}] ++ Rest, Len, Acc);
parse_ranges([{From,To}|Rest], Len, Acc) ->
parse_ranges(Rest, Len, [{From, To}] ++ Acc).


acceptable_encoding(Options) ->
case proplists:get_value(accepted_encoding, Options) of
undefined ->
[identity];
EncList ->
EncList1 = lists:foldl(fun(Enc, Acc) ->
case lists:member(Enc, [gzip, identity]) of
true ->
[Enc|Acc];
_ ->
Acc
end
end, [], EncList),
case EncList1 of
[] ->
{error, bad_accept_encoding_value};
_ ->
EncList1
end
end.

open_doc_rev1(Db, DocId, nil, Options) ->
db_exec(Db, fun(Db0) ->
case couch_db:open_doc(Db0, DocId, Options) of
{ok, Doc} ->
{ok, Doc};
Error ->
{error, Error}
end
end);
open_doc_rev1(Db, DocId, Rev, Options) ->
db_exec(Db, fun(Db0) ->
Rev1 = couch_doc:parse_rev(Rev),
Options1 = proplists:delete(rev, Options),
case couch_db:open_doc_revs(Db0, DocId, [Rev1], Options1) of
{ok, [{ok, Doc}]} ->
{ok, Doc};
{ok, [{{not_found, missing}, Rev}]} ->
{error, [{{not_found, missing}, Rev}]};
{ok, Else} ->
{error, Else}
end
end).


fold_map_view(View, Group, Fun, Db, QueryArgs, nil) ->
#view_query_args{
limit = Limit,
Expand Down
Loading

0 comments on commit 311ecbc

Please sign in to comment.