diff --git a/riak_test/src/rtcs_object.erl b/riak_test/src/rtcs_object.erl index f76bd3e48..b2128bae4 100644 --- a/riak_test/src/rtcs_object.erl +++ b/riak_test/src/rtcs_object.erl @@ -22,6 +22,7 @@ -compile(export_all). -include_lib("eunit/include/eunit.hrl"). +-include_lib("erlcloud/include/erlcloud_aws.hrl"). upload(UserConfig, normal, B, K) -> Content = crypto:rand_bytes(mb(4)), @@ -29,7 +30,40 @@ upload(UserConfig, normal, B, K) -> {B, K, Content}; upload(UserConfig, multipart, B, K) -> Content = rtcs_multipart:multipart_upload(B, K, [mb(10), 400], UserConfig), - {B, K, Content}. + {B, K, Content}; +upload(UserConfig, {normal_partial, CL, Actual}, B, K) when is_list(K), + CL >= Actual -> + %% Dumbest handmade S3 PUT Client + %% Send partial payload to the socket and suddenly close + Host = io_lib:format("~s.s3.amazonaws.com", [B]), + Date = httpd_util:rfc1123_date(erlang:localtime()), + %% Fake checksum, this request should fail if all payloads were sent + MD5 = "1B2M2Y8AsgTpgAmY7PhCfg==", + ToSign = ["PUT", $\n, MD5, $\n, "application/octet-stream", $\n, + Date, $\n, [], $/, B, $/, K, []], + lager:debug("String to Sign: ~s", [ToSign]), + Sig = base64:encode_to_string(crypto:hmac( + sha, + UserConfig#aws_config.secret_access_key, + ToSign)), + Auth = io_lib:format("Authorization: AWS ~s:~s", + [UserConfig#aws_config.access_key_id, Sig]), + {ok, Sock} = gen_tcp:connect("127.0.0.1", 15018, [{active, false}]), + FirstLine = io_lib:format("PUT /~s HTTP/1.1", [K]), + Binary = binary:copy(<<"*">>, Actual), + ReqHdr = [FirstLine, $\n, "Host: ", Host, $\n, Auth, $\n, + "Content-Length: ", integer_to_list(CL), $\n, + "Content-Md5: ", MD5, $\n, + "Content-Type: application/octet-stream", $\n, + "Date: ", Date, $\n], + lager:info("~s", [iolist_to_binary(ReqHdr)]), + case gen_tcp:send(Sock, [ReqHdr, $\n, Binary]) of + ok -> + %% Let caller handle the socket call, either close or continue + {ok, Sock}; + Error -> + Error + end. upload(UserConfig, normal_copy, B, DstK, SrcK) -> ?assertEqual([{copy_source_version_id, "false"}, {version_id, "null"}], diff --git a/riak_test/tests/regression_tests.erl b/riak_test/tests/regression_tests.erl index ae88dba19..55dbcc653 100644 --- a/riak_test/tests/regression_tests.erl +++ b/riak_test/tests/regression_tests.erl @@ -28,6 +28,7 @@ -export([confirm/0]). -include_lib("eunit/include/eunit.hrl"). +-include("riak_cs.hrl"). -define(TEST_BUCKET_CS347, "test-bucket-cs347"). @@ -38,6 +39,7 @@ confirm() -> ok = verify_cs347(SetupInfo, "test-bucket-cs347"), ok = verify_cs436(SetupInfo, "test-bucket-cs436"), ok = verify_cs512(UserConfig, "test-bucket-cs512"), + ok = verify_cs770(SetupInfo, "test-bucket-cs770"), %% Append your next regression tests here @@ -145,6 +147,79 @@ verify_cs512(UserConfig, BucketName) -> assert_notfound(UserConfig,BucketName), ok. +verify_cs770({UserConfig, {RiakNodes, _, _}}, BucketName) -> + %% put object and cancel it; + ?assertEqual(ok, erlcloud_s3:create_bucket(BucketName, UserConfig)), + Key = "foobar", + lager:debug("starting cs770 verification: ~s ~s", [BucketName, Key]), + + {ok, Socket} = rtcs_object:upload(UserConfig, + {normal_partial, 3*1024*1024, 1024*1024}, + BucketName, Key), + + [[{UUID, M}]] = get_manifests(RiakNodes, BucketName, Key), + + %% Even if CS is smart enough to remove canceled upload, at this + %% time the socket will be still alive, so no cancellation logic + %% shouldn't be triggerred. + ?assertEqual(writing, M?MANIFEST.state), + lager:debug("UUID of ~s ~s: ~p", [BucketName, Key, UUID]), + + %% Emulate socket error with {error, closed} at server + ok = gen_tcp:close(Socket), + %% This wait is just for convenience + timer:sleep(1000), + rt:wait_until(fun() -> + [[{UUID, Mx}]] = get_manifests(RiakNodes, BucketName, Key), + scheduled_delete =:= Mx?MANIFEST.state + end, 8, 4096), + + Pbc = rtcs:pbc(RiakNodes, objects, BucketName), + + %% verify that object is also stored in latest GC bucket + Ms = all_manifests_in_gc_bucket(Pbc), + lager:info("Retrieved ~p manifets from GC bucket", [length(Ms)]), + ?assertMatch( + [{UUID, _}], + lists:filter(fun({UUID0, M1}) when UUID0 =:= UUID -> + ?assertEqual(pending_delete, M1?MANIFEST.state), + true; + ({UUID0, _}) -> + lager:debug("UUID=~p / ~p", + [mochihex:to_hex(UUID0), mochihex:to_hex(UUID)]), + false; + (_Other) -> + lager:error("Unexpected: ~p", [_Other]), + false + end, Ms)), + + lager:info("cs770 verification ok", []), + ?assertEqual(ok, erlcloud_s3:delete_bucket(BucketName, UserConfig)), + ok. + +all_manifests_in_gc_bucket(Pbc) -> + {ok, Keys} = riakc_pb_socket:list_keys(Pbc, ?GC_BUCKET), + Ms = rt:pmap(fun(K) -> + {ok, O} = riakc_pb_socket:get(Pbc, <<"riak-cs-gc">>, K), + Some = [binary_to_term(V) || {_, V} <- riakc_obj:get_contents(O), + V =/= <<>>], + twop_set:to_list(twop_set:resolve(Some)) + end, Keys), + %% lager:debug("All manifests in GC buckets: ~p", [Ms]), + lists:flatten(Ms). + +get_manifests(RiakNodes, BucketName, Key) -> + rt:wait_until(fun() -> + case rc_helper:get_riakc_obj(RiakNodes, objects, BucketName, Key) of + {ok, _} -> true; + Error -> Error + end + end, 8, 500), + {ok, Obj} = rc_helper:get_riakc_obj(RiakNodes, objects, BucketName, Key), + %% Assuming no tombstone; + [binary_to_term(V) || {_, V} <- riakc_obj:get_contents(Obj), + V =/= <<>>]. + put_and_get(UserConfig, BucketName, Data) -> erlcloud_s3:put_object(BucketName, ?KEY, Data, UserConfig), Props = erlcloud_s3:get_object(BucketName, ?KEY, UserConfig), diff --git a/riak_test/tests/upgrade_downgrade_test.erl b/riak_test/tests/upgrade_downgrade_test.erl index 5e82cfa0d..cff44d5a7 100644 --- a/riak_test/tests/upgrade_downgrade_test.erl +++ b/riak_test/tests/upgrade_downgrade_test.erl @@ -51,6 +51,9 @@ confirm() -> ok = rt:upgrade(RiakNode, RiakCurrentVsn), rt:wait_for_service(RiakNode, riak_kv), ok = rtcs_config:upgrade_cs(N, AdminCreds), + rtcs:set_advanced_conf({cs, current, N}, + [{riak_cs, + [{riak_host, {"127.0.0.1", rtcs_config:pb_port(1)}}]}]), rtcs_exec:start_cs(N, current) end || RiakNode <- RiakNodes], diff --git a/src/riak_cs_manifest_fsm.erl b/src/riak_cs_manifest_fsm.erl index f5b65bfe1..de193073f 100644 --- a/src/riak_cs_manifest_fsm.erl +++ b/src/riak_cs_manifest_fsm.erl @@ -41,6 +41,7 @@ update_manifest/2, update_manifests/2, delete_specific_manifest/2, + gc_specific_manifest/2, update_manifest_with_confirmation/2, update_manifests_with_confirmation/2, maybe_stop_manifest_fsm/1, @@ -127,6 +128,14 @@ update_manifest(Pid, Manifest) -> delete_specific_manifest(Pid, UUID) -> gen_fsm:sync_send_event(Pid, {delete_manifest, UUID}, infinity). +%% @doc Not only delete a specific manifest version from a manifest +%% and update the manifest value in riak or delete the manifest key +%% from riak if there are no manifest versions remaining, but also +%% moves them into GC bucket. +-spec gc_specific_manifest(pid(), binary()) -> ok | {error, term()}. +gc_specific_manifest(Pid, UUID) -> + gen_fsm:sync_send_event(Pid, {gc_specific_manifest, UUID}, infinity). + -spec update_manifests_with_confirmation(pid(), orddict:orddict()) -> ok | {error, term()}. update_manifests_with_confirmation(Pid, Manifests) -> gen_fsm:sync_send_event(Pid, {update_manifests_with_confirmation, Manifests}, @@ -196,7 +205,6 @@ waiting_update_command({update_manifests, WrappedManifests}, WrappedManifests), {next_state, waiting_update_command, State#state{riak_object=undefined, manifests=undefined}}. - waiting_command(get_manifests, _From, State) -> {Reply, NewState} = handle_get_manifests(State), {reply, Reply, waiting_update_command, NewState}; @@ -244,7 +252,30 @@ waiting_update_command({update_manifests_with_confirmation, WrappedManifests}, _ WrappedManifests) end, {reply, Reply, waiting_update_command, State#state{riak_object=undefined, - manifests=undefined}}. + manifests=undefined}}; +waiting_update_command({gc_specific_manifest, UUID}, _From, + #state{ + riak_object = RiakObj0, + bucket = Bucket, + key = Key, + riak_client = RcPid + } = State) -> + %% put_fsm has issued delete_manifest caused by force_stop + Res = case RiakObj0 of + undefined -> + case riak_cs_manifest:get_manifests(RcPid, Bucket, Key) of + {ok, RiakObj, _} -> + riak_cs_gc:gc_specific_manifests([UUID], RiakObj, + Bucket, Key, RcPid); + Error -> + Error + end; + RiakObj -> + riak_cs_gc:gc_specific_manifests([UUID], RiakObj, + Bucket, Key, RcPid) + end, + {stop, normal, Res, State}. + handle_event(_Event, StateName, State) -> {next_state, StateName, State}. diff --git a/src/riak_cs_put_fsm.erl b/src/riak_cs_put_fsm.erl index 3a99fc35b..3646e2b50 100644 --- a/src/riak_cs_put_fsm.erl +++ b/src/riak_cs_put_fsm.erl @@ -348,8 +348,11 @@ handle_event(_Event, StateName, State) -> handle_sync_event(current_state, _From, StateName, State) -> Reply = {StateName, State}, {reply, Reply, StateName, State}; -handle_sync_event(force_stop, _From, _StateName, State) -> - {stop, normal, ok, State}; +handle_sync_event(force_stop, _From, _StateName, State = #state{mani_pid=ManiPid, + uuid=UUID}) -> + Res = riak_cs_manifest_fsm:gc_specific_manifest(ManiPid, UUID), + lager:debug("Manifest collection on upload failure: ~p", [Res]), + {stop, normal, Res, State}; handle_sync_event(_Event, _From, StateName, State) -> Reply = ok, {reply, Reply, StateName, State}. diff --git a/src/riak_cs_wm_object.erl b/src/riak_cs_wm_object.erl index f6e8a584e..e8254e292 100644 --- a/src/riak_cs_wm_object.erl +++ b/src/riak_cs_wm_object.erl @@ -343,7 +343,18 @@ handle_normal_put(RD, Ctx) -> Args = [{Bucket, list_to_binary(Key), Size, list_to_binary(ContentType), Metadata, BlockSize, ACL, timer:seconds(60), self(), RcPid}], {ok, Pid} = riak_cs_put_fsm_sup:start_put_fsm(node(), Args), - accept_streambody(RD, Ctx, Pid, wrq:stream_req_body(RD, riak_cs_lfs_utils:block_size())). + try + accept_streambody(RD, Ctx, Pid, + wrq:stream_req_body(RD, riak_cs_lfs_utils:block_size())) + catch + Type:Error -> + %% Want to catch mochiweb_socket:recv() returns {error, + %% einval} or disconnected stuff, any errors prevents this + %% manifests from being uploaded anymore + Res = riak_cs_put_fsm:force_stop(Pid), + _ = lager:debug("PUT FSM force_stop: ~p Reason: ~p", [Res, {Type, Error}]), + error({Type, Error}) + end. %% @doc the head is PUT copy path -spec handle_copy_put(#wm_reqdata{}, #context{}, binary(), binary()) -> diff --git a/src/riak_cs_wm_object_upload_part.erl b/src/riak_cs_wm_object_upload_part.erl index 089c96e95..174733c54 100644 --- a/src/riak_cs_wm_object_upload_part.erl +++ b/src/riak_cs_wm_object_upload_part.erl @@ -280,6 +280,9 @@ prepare_part_upload(RD, #context{riak_client=RcPid, case SrcManifest of undefined -> BlockSize = riak_cs_lfs_utils:block_size(), + %% No badmatch errow by machiweb_socket:recv() + %% will be catched here because writing state + %% parts can be collected in Multipart Abort API. accept_streambody(RD, Ctx, PutPid, wrq:stream_req_body(RD, BlockSize)); _ ->