Skip to content

Commit

Permalink
Merge branch 'bugfix/resurrection' into release/1.3
Browse files Browse the repository at this point in the history
  • Loading branch information
andrewjstone committed Apr 2, 2013
2 parents f1b68b3 + 49cb5d2 commit c077ff1
Show file tree
Hide file tree
Showing 5 changed files with 148 additions and 50 deletions.
56 changes: 56 additions & 0 deletions riak_test/tests/cs512_regression_test.erl
@@ -0,0 +1,56 @@
%% ---------------------------------------------------------------------
%%
%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% ---------------------------------------------------------------------

-module(cs512_regression_test).

-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").

-define(BUCKET, "riak-test-bucket").
-define(KEY, "test-key").

confirm() ->
{ok, UserConfig} = setup(),
put_and_get(UserConfig, <<"OLD">>),
put_and_get(UserConfig, <<"NEW">>),
delete(UserConfig),
assert_notfound(UserConfig),
pass.

put_and_get(UserConfig, Data) ->
erlcloud_s3:put_object(?BUCKET, ?KEY, Data, UserConfig),
Props = erlcloud_s3:get_object(?BUCKET, ?KEY, UserConfig),
?assertEqual(proplists:get_value(content, Props), Data).

delete(UserConfig) ->
erlcloud_s3:delete_object(?BUCKET, ?KEY, UserConfig).

assert_notfound(UserConfig) ->
?assertException(_,
{aws_error,{http_error,404,"Object Not Found",<<>>}},
erlcloud_s3:get_object(?BUCKET, ?KEY, UserConfig)).

setup() ->
{UserConfig, _} = rtcs:setup(4),
?assertEqual([{buckets, []}], erlcloud_s3:list_buckets(UserConfig)),
?assertEqual(ok, erlcloud_s3:create_bucket(?BUCKET, UserConfig)),
?assertMatch([{buckets, [[{name, ?BUCKET}, _]]}],
erlcloud_s3:list_buckets(UserConfig)),
{ok, UserConfig}.
102 changes: 80 additions & 22 deletions src/riak_cs_gc.erl
Expand Up @@ -32,7 +32,7 @@
-export([decode_and_merge_siblings/2,
gc_interval/0,
gc_retry_interval/0,
gc_active_manifests/5,
gc_active_manifests/3,
gc_specific_manifests/5,
epoch_start/0,
leeway_seconds/0,
Expand All @@ -43,30 +43,88 @@
%%% Public API
%%%===================================================================

gc_active_manifests(Manifests, RiakObject, Bucket, Key, RiakcPid) ->
case riak_cs_manifest_utils:active_manifest(Manifests) of
{ok, M} ->
case riak_cs_mp_utils:clean_multipart_unused_parts(M, RiakcPid) of
same ->
ActiveUUIDs = [M?MANIFEST.uuid],
GCManiResponse = gc_specific_manifests(ActiveUUIDs,
RiakObject,
Bucket, Key,
RiakcPid),
return_active_uuids_from_gc_response(GCManiResponse,
ActiveUUIDs);
updated ->
updated
%% @doc Keep requesting manifests until there are no more active manifests or
%% there is an error. This requires the following to be occur:
%% 1) All previously active multipart manifests have had their unused parts cleaned
%% and become active+multipart_clean
%% 2) All active manifests and active+multipart_clean manifests for multipart are GC'd
%%
%% Note that any error is irrespective of the current position of the GC states.
%% Some manifests may have been GC'd and then an error occurs. In this case the
%% client will only get the error response.
-spec gc_active_manifests(binary(), binary(), pid()) ->
{ok, [binary()]} | {error, term()}.
gc_active_manifests(Bucket, Key, RiakcPid) ->
gc_active_manifests(Bucket, Key, RiakcPid, []).

%% @private
-spec gc_active_manifests(binary(), binary(), pid(), [binary]) ->
{ok, [binary()]} | {error, term()}.
gc_active_manifests(Bucket, Key, RiakcPid, UUIDs) ->
case get_active_manifests(Bucket, Key, RiakcPid) of
{ok, _RiakObject, []} -> {ok, UUIDs};
{ok, RiakObject, Manifests} ->
UnchangedManifests = clean_manifests(Manifests, RiakcPid),
case gc_manifests(UnchangedManifests, RiakObject, Bucket, Key, RiakcPid) of
{error, _}=Error -> Error;
NewUUIDs -> gc_active_manifests(Bucket, Key, RiakcPid, UUIDs ++ NewUUIDs)
end;
_ ->
{ok, []}
{error, notfound} ->{ok, UUIDs};
{error, _}=Error -> Error
end.

%% @private
return_active_uuids_from_gc_response({ok, _RiakObject}, ActiveUUIDs) ->
{ok, ActiveUUIDs};
return_active_uuids_from_gc_response({error, _Error}=Error, _ActiveUUIDs) ->
Error.
-spec get_active_manifests(binary(), binary(), pid()) ->
{ok, riakc_obj:riakc_obj(), [lfs_manifest()]} | {error, term()}.
get_active_manifests(Bucket, Key, RiakcPid) ->
active_manifests(riak_cs_utils:get_manifests(RiakcPid, Bucket, Key)).

-spec active_manifests({ok, riakc_obj:riakc_obj(), [lfs_manifest()]}) ->
{ok, riakc_obj:riakc_obj(), [lfs_manifest()]};
({error, term()}) ->
{error, term()}.
active_manifests({ok, RiakObject, Manifests}) ->
{ok, RiakObject, riak_cs_manifest_utils:active_manifests(Manifests)};
active_manifests({error, _}=Error) -> Error.

-spec clean_manifests([lfs_manifest()], pid()) -> [lfs_manifest()].
clean_manifests(ActiveManifests, RiakcPid) ->
[M || M <- ActiveManifests, clean_multipart_manifest(M, RiakcPid)].

-spec clean_multipart_manifest(lfs_manifest(), pid()) -> true | false.
clean_multipart_manifest(M, RiakcPid) ->
is_multipart_clean(riak_cs_mp_utils:clean_multipart_unused_parts(M, RiakcPid)).

is_multipart_clean(same) ->
true;
is_multipart_clean(updated) ->
false.

-spec gc_manifests(Manifests :: [lfs_manifest()],
RiakObject :: riakc_obj:riakc_obj(),
Bucket :: binary(),
Key :: binary(),
RiakcPid :: pid()) ->
[binary()] | {error, term()}.
gc_manifests(Manifests, RiakObject, Bucket, Key, RiakcPid) ->
catch lists:foldl(fun(M, UUIDs) ->
gc_manifest(M, RiakObject, Bucket, Key, RiakcPid, UUIDs)
end, [], Manifests).

-spec gc_manifest(M :: lfs_manifest(),
RiakObject :: riakc_obj:riakc_obj(),
Bucket :: binary(),
Key :: binary(),
RiakcPid :: pid(),
UUIDs :: [binary()]) ->
[binary()] | no_return().
gc_manifest(M, RiakObject, Bucket, Key, RiakcPid, UUIDs) ->
UUID = M?MANIFEST.uuid,
check(gc_specific_manifests([UUID], RiakObject, Bucket, Key, RiakcPid), [UUID | UUIDs]).

check({ok, _}, Val) ->
Val;
check({error, _}=Error, _Val) ->
throw(Error).

%% @private
-spec gc_specific_manifests(UUIDsToMark :: [binary()],
Expand Down
9 changes: 9 additions & 0 deletions src/riak_cs_manifest_utils.erl
Expand Up @@ -31,6 +31,7 @@
%% export Public API
-export([new_dict/2,
active_manifest/1,
active_manifests/1,
active_and_writing_manifests/1,
overwritten_UUIDs/1,
mark_pending_delete/2,
Expand Down Expand Up @@ -64,6 +65,11 @@ active_manifest(Dict) ->
{ok, Manifest}
end.

%% @doc Return all active manifests
-spec active_manifests(orddict:orddict()) -> [lfs_manifest()] | [].
active_manifests(Dict) ->
lists:filter(fun manifest_is_active/1, orddict_values(Dict)).

%% @doc Return a list of all manifests in the
%% `active' or `writing' state
-spec active_and_writing_manifests(orddict:orddict()) -> [lfs_manifest()].
Expand Down Expand Up @@ -298,6 +304,9 @@ leeway_elapsed(Timestamp) ->
orddict_values(OrdDict) ->
[V || {_K, V} <- orddict:to_list(OrdDict)].

manifest_is_active(?MANIFEST{state=active}) -> true;
manifest_is_active(_Manifest) -> false.

%% NOTE: This is a foldl function, initial acc = no_active_manifest
most_recent_active_manifest(Manifest=?MANIFEST{state=active}, no_active_manifest) ->
Manifest;
Expand Down
29 changes: 2 additions & 27 deletions src/riak_cs_utils.erl
Expand Up @@ -339,22 +339,8 @@ delete_bucket(User, UserObj, Bucket, RiakPid) ->
-spec delete_object(binary(), binary(), pid()) ->
{ok, [binary()]} | {error, term()}.
delete_object(Bucket, Key, RiakcPid) ->
StartTime = os:timestamp(),
DoIt = fun() ->
maybe_gc_active_manifests(
get_manifests(RiakcPid, Bucket, Key), Bucket, Key, StartTime, RiakcPid)
end,
case DoIt() of
updated ->
%% Some multipart upload parts were deleted in
%% a minor transition from active state to
%% active + props=[multipart_clean|...] state.
%% The Riak object that get_manifests returned
%% is invalid, so retry once.
DoIt();
Else ->
Else
end.
ok = riak_cs_stats:update_with_start(object_delete, os:timestamp()),
riak_cs_gc:gc_active_manifests(Bucket, Key, RiakcPid).

-spec encode_term(term()) -> binary().
encode_term(Term) ->
Expand All @@ -369,17 +355,6 @@ encode_term(Term) ->
use_t2b_compression() ->
get_env(riak_cs, compress_terms, ?COMPRESS_TERMS).

%% @private
maybe_gc_active_manifests({ok, RiakObject, Manifests}, Bucket, Key, StartTime, RiakcPid) ->
R = riak_cs_gc:gc_active_manifests(Manifests, RiakObject, Bucket, Key, RiakcPid),
ok = riak_cs_stats:update_with_start(object_delete, StartTime),
R;
maybe_gc_active_manifests({error, notfound}, _Bucket, _Key, _StartTime, _RiakcPid) ->
{ok, []};
maybe_gc_active_manifests({error, _Reason}=Error, _Bucket, _Key, _StartTime, _RiakcPid) ->
Error.


%% Get the root bucket name for either a Riak CS object
%% bucket or the data block bucket name.
-spec from_bucket_name(binary()) -> {'blocks' | 'objects', binary()}.
Expand Down
2 changes: 1 addition & 1 deletion src/riak_cs_wm_object.erl
Expand Up @@ -237,7 +237,7 @@ delete_resource(RD, Ctx=#context{local_context=LocalCtx,

%% @private
handle_delete_object({error, Error}, UserName, BFile_str, RD, Ctx) ->
_ = lager:error("delete object failed with reason: ", [Error]),
_ = lager:error("delete object failed with reason: ~p", [Error]),
riak_cs_dtrace:dt_object_return(?MODULE, <<"object_delete">>, [0], [UserName, BFile_str]),
{false, RD, Ctx};
handle_delete_object({ok, _UUIDsMarkedforDelete}, UserName, BFile_str, RD, Ctx) ->
Expand Down

0 comments on commit c077ff1

Please sign in to comment.