Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

List objects FSM v2 bugfix and tests #788

4 changes: 3 additions & 1 deletion .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -17,4 +17,6 @@ current_counterexample.eqc
.client_test/
priv/riak_cs_drv.so
pkg.vars.config
client_tests/python/ceph_tests/s3-tests
client_tests/python/ceph_tests/s3-tests
.eqc-info
.riak_cs_list_objects_fsm_v2_eqc.txt
13 changes: 12 additions & 1 deletion include/list_objects.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,9 @@
%% see also: http://docs.amazonwebservices.com/AmazonS3/latest/API/RESTBucketGET.html
%% non mandatory keys have `| undefined' as a
%% type option.
%%
%% This record actually does not need to be versioned,
%% as it's never persisted.
-record(list_objects_request_v1, {
%% the name of the bucket
name :: binary(),
Expand All @@ -39,6 +42,10 @@
-type list_object_request() :: #list_objects_request_v1{}.
-define(LOREQ, #list_objects_request_v1).

-type next_marker() :: 'undefined' | binary().

%% This record actually does not need to be versioned,
%% as it's never persisted.
-record(list_objects_response_v1, {
%% Params just echoed back from the request --------------------------

Expand All @@ -55,9 +62,13 @@
%% a binary to group keys by
delimiter :: binary() | undefined,

%% the key to start with
%% the marker used in the _request_
marker :: binary() | undefined,

%% the (optional) marker to use for pagination
%% in the _next_ request
next_marker :: next_marker(),

%% The actual response -----------------------------------------------
is_truncated :: boolean(),

Expand Down
55 changes: 55 additions & 0 deletions riak_test/tests/cs_781_regression_test.erl
Original file line number Diff line number Diff line change
@@ -0,0 +1,55 @@
%% ---------------------------------------------------------------------
%%
%% Copyright (c) 2007-2013 Basho Technologies, Inc. All Rights Reserved.
%%
%% This file is provided to you under the Apache License,
%% Version 2.0 (the "License"); you may not use this file
%% except in compliance with the License. You may obtain
%% a copy of the License at
%%
%% http://www.apache.org/licenses/LICENSE-2.0
%%
%% Unless required by applicable law or agreed to in writing,
%% software distributed under the License is distributed on an
%% "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
%% KIND, either express or implied. See the License for the
%% specific language governing permissions and limitations
%% under the License.
%%
%% ---------------------------------------------------------------------

-module(cs_781_regression_test).
%% @doc Integration test for [https://github.com/basho/riak_cs/issues/781]

-compile(export_all).
-include_lib("eunit/include/eunit.hrl").


-export([confirm/0]).

%% @doc riak_test entry point. Builds the riak, stanchion and cs
%% configuration (with `fold_objects_for_list_keys' enabled for cs),
%% hands it to `rtcs:setup/2', and runs the regression scenario with
%% the user configuration that setup returns.
confirm() ->
    RiakEntry = {riak, rtcs:riak_config()},
    StanchionEntry = {stanchion, rtcs:stanchion_config()},
    CsEntry = {cs, rtcs:cs_config([{fold_objects_for_list_keys, true}])},
    %% NOTE(review): the `4' is presumably the number of nodes to
    %% deploy -- confirm against rtcs:setup/2.
    {UserConfig, {_RiakNodes, _CSNodes, _Stanchion}} =
        rtcs:setup(4, [RiakEntry, StanchionEntry, CsEntry]),
    run_test(UserConfig).

-define(TEST_BUCKET_1, "cs-781-test-bucket-1").

%% @doc Render `Int' as a decimal string, left-padded with zeros to a
%% field width of four characters (e.g. `1' becomes `"0001"').
format_int(Int) ->
    lists:flatten(io_lib:format("~4..0B", [Int])).

%% @doc Regression scenario for riak_cs issue 781: create the test
%% bucket, upload `Count' objects keyed by zero-padded integers, delete
%% the first two keys, then assert that an unparameterised object
%% listing is still reported as truncated.
%%
%% NOTE(review): 1003 objects minus 2 deletions is presumably chosen
%% relative to the listing FSM's fold batch size and the default
%% max-keys limit -- confirm against the issue description.
run_test(UserConfig) ->
    ?assertEqual(ok, erlcloud_s3:create_bucket(?TEST_BUCKET_1, UserConfig)),
    Count = 1003,
    %% `crypto:rand_bytes/1' is deprecated and has been removed from
    %% later OTP releases; `crypto:strong_rand_bytes/1' is the supported
    %% replacement. The uploads are run purely for their side effect,
    %% so iterate with `lists:foreach/2' instead of building a list of
    %% discarded results.
    lists:foreach(
      fun(X) ->
              erlcloud_s3:put_object(?TEST_BUCKET_1,
                                     format_int(X),
                                     crypto:strong_rand_bytes(100),
                                     UserConfig)
      end,
      lists:seq(1, Count)),
    erlcloud_s3:delete_object(?TEST_BUCKET_1, format_int(1), UserConfig),
    erlcloud_s3:delete_object(?TEST_BUCKET_1, format_int(2), UserConfig),
    Listing = erlcloud_s3:list_objects(?TEST_BUCKET_1, [], UserConfig),
    ?assertEqual(true, proplists:get_value(is_truncated, Listing)).

6 changes: 4 additions & 2 deletions src/riak_cs_list_objects.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,7 +28,7 @@
%% API
-export([new_request/1,
new_request/3,
new_response/4,
new_response/5,
manifest_to_keycontent/1]).

%%%===================================================================
Expand Down Expand Up @@ -68,6 +68,7 @@ process_options_helper({marker, Val}, Req) ->

-spec new_response(list_object_request(),
IsTruncated :: boolean(),
NextMarker :: next_marker(),
CommonPrefixes :: list(list_objects_common_prefixes()),
ObjectContents :: list(list_objects_key_content())) ->
list_object_response().
Expand All @@ -76,12 +77,13 @@ new_response(?LOREQ{name=Name,
prefix=Prefix,
delimiter=Delimiter,
marker=Marker},
IsTruncated, CommonPrefixes, ObjectContents) ->
IsTruncated, NextMarker, CommonPrefixes, ObjectContents) ->
?LORESP{name=Name,
max_keys=MaxKeys,
prefix=Prefix,
delimiter=Delimiter,
marker=Marker,
next_marker=NextMarker,
is_truncated=IsTruncated,
contents=ObjectContents,
common_prefixes=CommonPrefixes}.
Expand Down
2 changes: 1 addition & 1 deletion src/riak_cs_list_objects_fsm.erl
Original file line number Diff line number Diff line change
Expand Up @@ -422,7 +422,7 @@ make_response(Request=?LOREQ{max_keys=NumKeysRequested},
riak_cs_list_objects_utils:untagged_manifest_and_prefix(SlicedTaggedItems),
KeyContents = lists:map(fun riak_cs_list_objects:manifest_to_keycontent/1,
NewManis),
riak_cs_list_objects:new_response(Request, IsTruncated, NewPrefixes,
riak_cs_list_objects:new_response(Request, IsTruncated, undefined, NewPrefixes,
KeyContents).

-spec next_mr_query_spec(list(),
Expand Down
117 changes: 82 additions & 35 deletions src/riak_cs_list_objects_fsm_v2.erl
Original file line number Diff line number Diff line change
Expand Up @@ -37,7 +37,8 @@
-endif.

%% API
-export([start_link/2]).
-export([start_link/2,
start_link/3]).

%% Observability
-export([]).
Expand Down Expand Up @@ -73,7 +74,7 @@
-record(state, {riakc_pid :: pid(),
req :: list_object_request(),
reply_ref :: undefined | {pid(), any()},
key_multiplier :: float(),
fold_objects_batch_size :: pos_integer(),
object_list_req_id :: undefined | reference(),
reached_end_of_keyspace=false :: boolean(),
object_buffer=[] :: list(),
Expand Down Expand Up @@ -111,22 +112,24 @@
-spec start_link(pid(), list_object_request()) ->
{ok, pid()} | {error, term()}.
start_link(RiakcPid, ListKeysRequest) ->
gen_fsm:start_link(?MODULE, [RiakcPid, ListKeysRequest], []).
FoldObjectsBatchSize = 1002,
start_link(RiakcPid, ListKeysRequest, FoldObjectsBatchSize).

%% @doc Start the FSM linked to the caller with an explicit
%% `FoldObjectsBatchSize'. Values below 2 are raised to 2 before being
%% passed on as the third init argument.
-spec start_link(pid(), list_object_request(), pos_integer()) ->
                        {ok, pid()} | {error, term()}.
start_link(RiakcPid, ListKeysRequest, FoldObjectsBatchSize) ->
    InitArgs = [RiakcPid,
                ListKeysRequest,
                erlang:max(2, FoldObjectsBatchSize)],
    gen_fsm:start_link(?MODULE, InitArgs, []).

%%%===================================================================
%%% gen_fsm callbacks
%%%===================================================================

-spec init(list()) -> {ok, prepare, state(), 0}.
init([RiakcPid, Request]) ->
%% TODO: this should not be hardcoded. Maybe there should
%% be two `start_link' arities, and one will use a default
%% val from app.config and the other will explicitly
%% take a val
KeyMultiplier = riak_cs_list_objects_utils:get_key_list_multiplier(),
init([RiakcPid, Request, FoldObjectsBatchSize]) ->

State = #state{riakc_pid=RiakcPid,
key_multiplier=KeyMultiplier,
fold_objects_batch_size=FoldObjectsBatchSize,
req=Request},
{ok, prepare, State, 0}.

Expand Down Expand Up @@ -199,7 +202,8 @@ handle_done(State=#state{object_buffer=ObjectBuffer,
objects=PrevObjects,
last_request_num_keys_requested=NumKeysRequested,
common_prefixes=CommonPrefixes,
req=Request=?LOREQ{max_keys=UserMaxKeys}}) ->
req=Request}) ->

ObjectBufferLength = length(ObjectBuffer),
RangeUpdatedStateData =
update_profiling_and_last_request(State, ObjectBuffer, ObjectBufferLength),
Expand All @@ -211,23 +215,16 @@ handle_done(State=#state{object_buffer=ObjectBuffer,
NewObjects = PrevObjects ++ Active,
ObjectPrefixTuple = {NewObjects, CommonPrefixes},

ObjectPrefixTuple2 =
{NewManis, NewPrefixes} =
riak_cs_list_objects_utils:filter_prefix_keys(ObjectPrefixTuple, Request),
ReachedEnd = ObjectBufferLength < NumKeysRequested,

Truncated = truncated(UserMaxKeys, ObjectPrefixTuple2),
SlicedTaggedItems =
riak_cs_list_objects_utils:manifests_and_prefix_slice(ObjectPrefixTuple2,
UserMaxKeys),

{NewManis, NewPrefixes} =
riak_cs_list_objects_utils:untagged_manifest_and_prefix(SlicedTaggedItems),

NewStateData = RangeUpdatedStateData#state{objects=NewManis,
common_prefixes=NewPrefixes,
reached_end_of_keyspace=ReachedEnd,
object_buffer=[]},
respond(NewStateData, NewManis, NewPrefixes, Truncated).
respond(NewStateData, NewManis, NewPrefixes).

-spec update_profiling_and_last_request(state(), list(), integer()) ->
state().
Expand All @@ -236,16 +233,26 @@ update_profiling_and_last_request(State, ObjectBuffer, ObjectBufferLength) ->
ObjectBufferLength),
update_last_request_state(State2, ObjectBuffer).

-spec respond(state(), list(), ordsets:ordset(), boolean()) ->
-spec respond(state(), list(), ordsets:ordset()) ->
fsm_state_return().
respond(StateData=#state{req=Request},
Manifests, Prefixes, Truncated) ->
respond(StateData=#state{req=Request=?LOREQ{max_keys=UserMaxKeys,
delimiter=Delimiter}},
Manifests, Prefixes) ->
case enough_results(StateData) of
true ->
Truncated = truncated(UserMaxKeys, {Manifests, Prefixes}),
SlicedTaggedItems =
riak_cs_list_objects_utils:manifests_and_prefix_slice({Manifests, Prefixes},
UserMaxKeys),
NextMarker = next_marker(Delimiter, SlicedTaggedItems),

{NewManis, NewPrefixes} =
riak_cs_list_objects_utils:untagged_manifest_and_prefix(SlicedTaggedItems),
Response =
response_from_manifests_and_common_prefixes(Request,
Truncated,
{Manifests, Prefixes}),
NextMarker,
{NewManis, NewPrefixes}),
try_reply({ok, Response}, StateData);
false ->
RiakcPid = StateData#state.riakc_pid,
Expand All @@ -271,31 +278,52 @@ enough_results(#state{req=?LOREQ{max_keys=UserMaxKeys},
objects=Objects,
common_prefixes=CommonPrefixes}) ->
riak_cs_list_objects_utils:manifests_and_prefix_length({Objects, CommonPrefixes})
>= UserMaxKeys
> UserMaxKeys
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would you mind adding some comments here?
I wonder why there is no equals sign when the function's name says "enough results".

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, good catch. I'll update in the morning.

orelse EndOfKeyspace.

%% @doc Compute the marker to hand back for the next page of results.
%% Yields `undefined' when the request carried no delimiter or when
%% the sliced result list is empty; otherwise the marker is derived
%% from the last element of the list.
-spec next_marker(undefined | binary(),
                  riak_cs_list_objects_utils:tagged_item_list()) ->
                         next_marker().
next_marker(undefined, _Items) ->
    undefined;
next_marker(_Delimiter, []) ->
    undefined;
next_marker(_Delimiter, Items) ->
    LastItem = lists:last(Items),
    next_marker_from_element(LastItem).

%% @doc Extract the marker value from a single tagged item: the prefix
%% itself for a `prefix' item, or the object key for a `manifest' item
%% (taken from the manifest's `bkey' or from the enclosing
%% `{Key, Manifest}' pair, depending on the item's shape).
-spec next_marker_from_element(riak_cs_list_objects_utils:tagged_item()) ->
                                      next_marker().
next_marker_from_element({prefix, CommonPrefix}) ->
    CommonPrefix;
next_marker_from_element({manifest, ?MANIFEST{bkey={_Bucket, ObjectKey}}}) ->
    ObjectKey;
next_marker_from_element({manifest, {ObjectKey, ?MANIFEST{}}}) ->
    ObjectKey.

response_from_manifests_and_common_prefixes(Request,
Truncated,
NextMarker,
{Manifests, CommonPrefixes}) ->
KeyContent = lists:map(fun riak_cs_list_objects:manifest_to_keycontent/1,
Manifests),
riak_cs_list_objects:new_response(Request, Truncated, CommonPrefixes,
riak_cs_list_objects:new_response(Request, Truncated, NextMarker,
CommonPrefixes,
KeyContent).

-spec make_2i_request(pid(), state()) ->
{state(), {ok, reference()} | {error, term()}}.
make_2i_request(RiakcPid, State=#state{req=?LOREQ{name=BucketName}}) ->
make_2i_request(RiakcPid, State=#state{req=?LOREQ{name=BucketName},
fold_objects_batch_size=BatchSize}) ->
ManifestBucket = riak_cs_utils:to_bucket_name(objects, BucketName),
StartKey = make_start_key(State),
EndKey = big_end_key(128),
NumResults = 1002,
NewStateData = State#state{last_request_start_key=StartKey,
last_request_num_keys_requested=NumResults},
last_request_num_keys_requested=BatchSize},
NewStateData2 = update_profiling_state_with_start(NewStateData,
StartKey,
EndKey,
os:timestamp()),
Opts = [{max_results, NumResults},
Opts = [{max_results, BatchSize},
{start_key, StartKey},
{end_key, EndKey}],
FoldResult = riakc_pb_socket:cs_bucket_fold(RiakcPid,
Expand Down Expand Up @@ -367,22 +395,41 @@ common_prefix_from_key(Key, Prefix, Delimiter) ->
make_start_key(#state{object_list_ranges=[], req=Request}) ->
make_start_key_from_marker(Request);
make_start_key(State=#state{object_list_ranges=PrevRanges,
common_prefixes=CommonPrefixes,
req=?LOREQ{prefix=Prefix,
delimiter=Delimiter}}) ->
Key = element(2, lists:last(PrevRanges)),
case last_result_is_common_prefix(State) of
true ->
Key = element(2, lists:last(PrevRanges)),
LastPrefix = common_prefix_from_key(Key, Prefix, Delimiter),
skip_past_prefix_and_delimiter(LastPrefix);
case ordsets:is_element(LastPrefix, CommonPrefixes) of
true ->
skip_past_prefix_and_delimiter(LastPrefix);
false ->
Key
end;
false ->
element(2, lists:last(PrevRanges))
Key
end.

-spec make_start_key_from_marker(list_object_request()) -> binary().
make_start_key_from_marker(?LOREQ{marker=undefined}) ->
<<0:8/integer>>;
make_start_key_from_marker(?LOREQ{marker=Marker}) ->
Marker.
make_start_key_from_marker(?LOREQ{marker=Marker,
delimiter=undefined}) ->
Marker;
make_start_key_from_marker(?LOREQ{marker=Marker,
delimiter=Delimiter}) ->
DelSize = byte_size(Delimiter),
case binary:longest_common_suffix([Marker, Delimiter]) of
DelSize ->
%% when the `Marker' itself ends with the delimiter,
%% then we should skip past the entire set of keys
%% that would be rolled up into that common delimiter
skip_past_prefix_and_delimiter(Marker);
_Else ->
Marker
end.

big_end_key(NumBytes) ->
MaxByte = <<255:8/integer>>,
Expand Down
9 changes: 7 additions & 2 deletions src/riak_cs_xml.erl
Original file line number Diff line number Diff line change
Expand Up @@ -109,8 +109,13 @@ list_objects_response_to_xml(Resp) ->
Prefix <- Resp?LORESP.common_prefixes],
Contents = [make_external_node('Name', Resp?LORESP.name),
make_external_node('Prefix', Resp?LORESP.prefix),
make_external_node('Marker', Resp?LORESP.marker),
make_external_node('MaxKeys', Resp?LORESP.max_keys),
make_external_node('Marker', Resp?LORESP.marker)] ++
%% use a list-comprehension trick to only include
%% the `NextMarker' element if it's not `undefined'
[make_external_node('NextMarker', NextMarker) ||
NextMarker <- [Resp?LORESP.next_marker],
NextMarker =/= undefined] ++
[make_external_node('MaxKeys', Resp?LORESP.max_keys),
make_external_node('Delimiter', Resp?LORESP.delimiter),
make_external_node('IsTruncated', Resp?LORESP.is_truncated)] ++
KeyContents ++ CommonPrefixes,
Expand Down