Commit
Merge pull request #1280 from basho/bugfix/cleanup-manifests
Handle socket error by catching badmatch

Reviewed-by: shino
borshop committed Jan 14, 2016
2 parents aa74720 + 6f5e135 commit 8018a32
Showing 7 changed files with 166 additions and 6 deletions.
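The core of the change, condensed into one hedged sketch (the wrapper name handle_normal_put_sketch is illustrative; accept_streambody/4, wrq:stream_req_body/2, riak_cs_lfs_utils:block_size/0 and riak_cs_put_fsm:force_stop/1 are the existing functions touched by this diff): a socket failure while streaming the request body surfaces as a badmatch, the put FSM is force-stopped, and the manifest FSM hands the half-written manifest over to GC.

handle_normal_put_sketch(RD, Ctx, Pid) ->
    try
        %% mochiweb_socket:recv() failures inside the streaming loop
        %% surface here as a badmatch
        accept_streambody(RD, Ctx, Pid,
                          wrq:stream_req_body(RD, riak_cs_lfs_utils:block_size()))
    catch
        Type:Error ->
            %% force_stop asks the manifest FSM to GC this upload's UUID,
            %% so the half-written 'writing' manifest moves to the GC
            %% bucket instead of lingering forever
            _ = riak_cs_put_fsm:force_stop(Pid),
            error({Type, Error})
    end.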
36 changes: 35 additions & 1 deletion riak_test/src/rtcs_object.erl
@@ -22,14 +22,48 @@

-compile(export_all).
-include_lib("eunit/include/eunit.hrl").
-include_lib("erlcloud/include/erlcloud_aws.hrl").

upload(UserConfig, normal, B, K) ->
Content = crypto:rand_bytes(mb(4)),
erlcloud_s3:put_object(B, K, Content, UserConfig),
{B, K, Content};
upload(UserConfig, multipart, B, K) ->
Content = rtcs_multipart:multipart_upload(B, K, [mb(10), 400], UserConfig),
{B, K, Content}.
{B, K, Content};
upload(UserConfig, {normal_partial, CL, Actual}, B, K) when is_list(K),
CL >= Actual ->
%% Dumbest handmade S3 PUT client:
%% send a partial payload to the socket, then close it abruptly
Host = io_lib:format("~s.s3.amazonaws.com", [B]),
Date = httpd_util:rfc1123_date(erlang:localtime()),
%% Fake checksum; this request should fail even if the whole payload were sent
MD5 = "1B2M2Y8AsgTpgAmY7PhCfg==",
ToSign = ["PUT", $\n, MD5, $\n, "application/octet-stream", $\n,
Date, $\n, [], $/, B, $/, K, []],
lager:debug("String to Sign: ~s", [ToSign]),
Sig = base64:encode_to_string(crypto:hmac(
sha,
UserConfig#aws_config.secret_access_key,
ToSign)),
Auth = io_lib:format("Authorization: AWS ~s:~s",
[UserConfig#aws_config.access_key_id, Sig]),
{ok, Sock} = gen_tcp:connect("127.0.0.1", 15018, [{active, false}]),
FirstLine = io_lib:format("PUT /~s HTTP/1.1", [K]),
Binary = binary:copy(<<"*">>, Actual),
ReqHdr = [FirstLine, $\n, "Host: ", Host, $\n, Auth, $\n,
"Content-Length: ", integer_to_list(CL), $\n,
"Content-Md5: ", MD5, $\n,
"Content-Type: application/octet-stream", $\n,
"Date: ", Date, $\n],
lager:info("~s", [iolist_to_binary(ReqHdr)]),
case gen_tcp:send(Sock, [ReqHdr, $\n, Binary]) of
ok ->
%% Let the caller handle the socket: close it or keep sending
{ok, Sock};
Error ->
Error
end.

upload(UserConfig, normal_copy, B, DstK, SrcK) ->
?assertEqual([{copy_source_version_id, "false"}, {version_id, "null"}],
75 changes: 75 additions & 0 deletions riak_test/tests/regression_tests.erl
@@ -28,6 +28,7 @@

-export([confirm/0]).
-include_lib("eunit/include/eunit.hrl").
-include("riak_cs.hrl").

-define(TEST_BUCKET_CS347, "test-bucket-cs347").

@@ -38,6 +39,7 @@ confirm() ->
ok = verify_cs347(SetupInfo, "test-bucket-cs347"),
ok = verify_cs436(SetupInfo, "test-bucket-cs436"),
ok = verify_cs512(UserConfig, "test-bucket-cs512"),
ok = verify_cs770(SetupInfo, "test-bucket-cs770"),

%% Append your next regression tests here

@@ -145,6 +147,79 @@ verify_cs512(UserConfig, BucketName) ->
assert_notfound(UserConfig,BucketName),
ok.

verify_cs770({UserConfig, {RiakNodes, _, _}}, BucketName) ->
%% Put an object and cancel the upload midway
?assertEqual(ok, erlcloud_s3:create_bucket(BucketName, UserConfig)),
Key = "foobar",
lager:debug("starting cs770 verification: ~s ~s", [BucketName, Key]),

{ok, Socket} = rtcs_object:upload(UserConfig,
{normal_partial, 3*1024*1024, 1024*1024},
BucketName, Key),

[[{UUID, M}]] = get_manifests(RiakNodes, BucketName, Key),

%% Even if CS is smart enough to remove a canceled upload, the
%% socket is still alive at this point, so no cancellation logic
%% should have been triggered yet.
?assertEqual(writing, M?MANIFEST.state),
lager:debug("UUID of ~s ~s: ~p", [BucketName, Key, UUID]),

%% Emulate socket error with {error, closed} at server
ok = gen_tcp:close(Socket),
%% This wait is just for convenience
timer:sleep(1000),
rt:wait_until(fun() ->
[[{UUID, Mx}]] = get_manifests(RiakNodes, BucketName, Key),
scheduled_delete =:= Mx?MANIFEST.state
end, 8, 4096),

Pbc = rtcs:pbc(RiakNodes, objects, BucketName),

%% Verify that the manifest is also stored in the GC bucket
Ms = all_manifests_in_gc_bucket(Pbc),
lager:info("Retrieved ~p manifets from GC bucket", [length(Ms)]),
?assertMatch(
[{UUID, _}],
lists:filter(fun({UUID0, M1}) when UUID0 =:= UUID ->
?assertEqual(pending_delete, M1?MANIFEST.state),
true;
({UUID0, _}) ->
lager:debug("UUID=~p / ~p",
[mochihex:to_hex(UUID0), mochihex:to_hex(UUID)]),
false;
(_Other) ->
lager:error("Unexpected: ~p", [_Other]),
false
end, Ms)),

lager:info("cs770 verification ok", []),
?assertEqual(ok, erlcloud_s3:delete_bucket(BucketName, UserConfig)),
ok.

all_manifests_in_gc_bucket(Pbc) ->
{ok, Keys} = riakc_pb_socket:list_keys(Pbc, ?GC_BUCKET),
Ms = rt:pmap(fun(K) ->
{ok, O} = riakc_pb_socket:get(Pbc, <<"riak-cs-gc">>, K),
Some = [binary_to_term(V) || {_, V} <- riakc_obj:get_contents(O),
V =/= <<>>],
twop_set:to_list(twop_set:resolve(Some))
end, Keys),
%% lager:debug("All manifests in GC buckets: ~p", [Ms]),
lists:flatten(Ms).

get_manifests(RiakNodes, BucketName, Key) ->
rt:wait_until(fun() ->
case rc_helper:get_riakc_obj(RiakNodes, objects, BucketName, Key) of
{ok, _} -> true;
Error -> Error
end
end, 8, 500),
{ok, Obj} = rc_helper:get_riakc_obj(RiakNodes, objects, BucketName, Key),
%% Assuming no tombstones.
[binary_to_term(V) || {_, V} <- riakc_obj:get_contents(Obj),
V =/= <<>>].

put_and_get(UserConfig, BucketName, Data) ->
erlcloud_s3:put_object(BucketName, ?KEY, Data, UserConfig),
Props = erlcloud_s3:get_object(BucketName, ?KEY, UserConfig),
3 changes: 3 additions & 0 deletions riak_test/tests/upgrade_downgrade_test.erl
@@ -51,6 +51,9 @@ confirm() ->
ok = rt:upgrade(RiakNode, RiakCurrentVsn),
rt:wait_for_service(RiakNode, riak_kv),
ok = rtcs_config:upgrade_cs(N, AdminCreds),
rtcs:set_advanced_conf({cs, current, N},
[{riak_cs,
[{riak_host, {"127.0.0.1", rtcs_config:pb_port(1)}}]}]),
rtcs_exec:start_cs(N, current)
end
|| RiakNode <- RiakNodes],
35 changes: 33 additions & 2 deletions src/riak_cs_manifest_fsm.erl
@@ -41,6 +41,7 @@
update_manifest/2,
update_manifests/2,
delete_specific_manifest/2,
gc_specific_manifest/2,
update_manifest_with_confirmation/2,
update_manifests_with_confirmation/2,
maybe_stop_manifest_fsm/1,
@@ -127,6 +128,14 @@ update_manifest(Pid, Manifest) ->
delete_specific_manifest(Pid, UUID) ->
gen_fsm:sync_send_event(Pid, {delete_manifest, UUID}, infinity).

%% @doc Delete a specific manifest version from a manifest and update
%% the manifest value in riak (or delete the manifest key from riak if
%% no manifest versions remain), and additionally move the deleted
%% version into the GC bucket.
-spec gc_specific_manifest(pid(), binary()) -> ok | {error, term()}.
gc_specific_manifest(Pid, UUID) ->
gen_fsm:sync_send_event(Pid, {gc_specific_manifest, UUID}, infinity).

-spec update_manifests_with_confirmation(pid(), orddict:orddict()) -> ok | {error, term()}.
update_manifests_with_confirmation(Pid, Manifests) ->
gen_fsm:sync_send_event(Pid, {update_manifests_with_confirmation, Manifests},
@@ -196,7 +205,6 @@ waiting_update_command({update_manifests, WrappedManifests},
WrappedManifests),
{next_state, waiting_update_command, State#state{riak_object=undefined, manifests=undefined}}.


waiting_command(get_manifests, _From, State) ->
{Reply, NewState} = handle_get_manifests(State),
{reply, Reply, waiting_update_command, NewState};
@@ -244,7 +252,30 @@ waiting_update_command({update_manifests_with_confirmation, WrappedManifests}, _
WrappedManifests)
end,
{reply, Reply, waiting_update_command, State#state{riak_object=undefined,
manifests=undefined}}.
manifests=undefined}};
waiting_update_command({gc_specific_manifest, UUID}, _From,
#state{
riak_object = RiakObj0,
bucket = Bucket,
key = Key,
riak_client = RcPid
} = State) ->
%% put_fsm issues this event from force_stop when an upload fails
Res = case RiakObj0 of
undefined ->
case riak_cs_manifest:get_manifests(RcPid, Bucket, Key) of
{ok, RiakObj, _} ->
riak_cs_gc:gc_specific_manifests([UUID], RiakObj,
Bucket, Key, RcPid);
Error ->
Error
end;
RiakObj ->
riak_cs_gc:gc_specific_manifests([UUID], RiakObj,
Bucket, Key, RcPid)
end,
{stop, normal, Res, State}.

handle_event(_Event, StateName, State) ->
{next_state, StateName, State}.

7 changes: 5 additions & 2 deletions src/riak_cs_put_fsm.erl
@@ -348,8 +348,11 @@ handle_event(_Event, StateName, State) ->
handle_sync_event(current_state, _From, StateName, State) ->
Reply = {StateName, State},
{reply, Reply, StateName, State};
handle_sync_event(force_stop, _From, _StateName, State) ->
{stop, normal, ok, State};
handle_sync_event(force_stop, _From, _StateName, State = #state{mani_pid=ManiPid,
uuid=UUID}) ->
Res = riak_cs_manifest_fsm:gc_specific_manifest(ManiPid, UUID),
lager:debug("Manifest collection on upload failure: ~p", [Res]),
{stop, normal, Res, State};
handle_sync_event(_Event, _From, StateName, State) ->
Reply = ok,
{reply, Reply, StateName, State}.
13 changes: 12 additions & 1 deletion src/riak_cs_wm_object.erl
@@ -343,7 +343,18 @@ handle_normal_put(RD, Ctx) ->
Args = [{Bucket, list_to_binary(Key), Size, list_to_binary(ContentType),
Metadata, BlockSize, ACL, timer:seconds(60), self(), RcPid}],
{ok, Pid} = riak_cs_put_fsm_sup:start_put_fsm(node(), Args),
accept_streambody(RD, Ctx, Pid, wrq:stream_req_body(RD, riak_cs_lfs_utils:block_size())).
try
accept_streambody(RD, Ctx, Pid,
wrq:stream_req_body(RD, riak_cs_lfs_utils:block_size()))
catch
Type:Error ->
%% Catch the badmatch raised when mochiweb_socket:recv() returns
%% {error, einval}, {error, closed} or similar; any such error means
%% this manifest can no longer be completed, so hand it over to GC
Res = riak_cs_put_fsm:force_stop(Pid),
_ = lager:debug("PUT FSM force_stop: ~p Reason: ~p", [Res, {Type, Error}]),
error({Type, Error})
end.

%% @doc the head is PUT copy path
-spec handle_copy_put(#wm_reqdata{}, #context{}, binary(), binary()) ->
3 changes: 3 additions & 0 deletions src/riak_cs_wm_object_upload_part.erl
@@ -280,6 +280,9 @@ prepare_part_upload(RD, #context{riak_client=RcPid,
case SrcManifest of
undefined ->
BlockSize = riak_cs_lfs_utils:block_size(),
%% No badmatch error from mochiweb_socket:recv() is caught
%% here, because parts left in the writing state can be
%% collected via the Multipart Abort API.
accept_streambody(RD, Ctx, PutPid,
wrq:stream_req_body(RD, BlockSize));
_ ->
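For completeness, a condensed usage sketch of the new {normal_partial, ContentLength, BytesSent} upload helper, distilled from verify_cs770 above (the bucket and key literals are illustrative):

    {ok, Sock} = rtcs_object:upload(UserConfig,
                                    {normal_partial, 3*1024*1024, 1024*1024},
                                    "test-bucket-cs770", "foobar"),
    %% While the socket stays open, the manifest remains in the 'writing' state.
    ok = gen_tcp:close(Sock),
    %% Once the server sees {error, closed}, the manifest is expected to reach
    %% scheduled_delete, with a pending_delete copy in the GC bucket
    %% (verify_cs770 asserts exactly this).

Closing the socket is what makes the server-side mochiweb_socket:recv() fail and exercise the new catch clause in riak_cs_wm_object.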

6 comments on commit 8018a32

@shino (Contributor) commented on 8018a32, Jan 18, 2016

+1

@borshop (Contributor Author)

saw approval from shino at 8018a32

@borshop (Contributor Author)

merging basho/riak_cs/2.1 = 8018a32 into borshop-integration-1287-2.1

@borshop (Contributor Author)

basho/riak_cs/2.1 = 8018a32 merged ok, testing candidate = c0c1012

@borshop (Contributor Author)

@borshop (Contributor Author)

fast-forwarding develop to borshop-integration-1287-2.1 = c0c1012
