Skip to content

Commit

Permalink
Merge branch 'hmmr/develop/saml' of github.com:TI-Tokyo/riak_cs into …
Browse files Browse the repository at this point in the history
…hmmr/develop/saml
  • Loading branch information
Andriy Zavada committed Aug 19, 2023
2 parents 46a863e + f940e5a commit 0250228
Show file tree
Hide file tree
Showing 10 changed files with 80 additions and 64 deletions.
64 changes: 43 additions & 21 deletions apps/riak_cs/include/manifest.hrl
Original file line number Diff line number Diff line change
Expand Up @@ -26,12 +26,10 @@


-define(MANIFEST, #lfs_manifest_v5).
-define(MULTIPART_MANIFEST, #multipart_manifest_v1).
-define(MULTIPART_MANIFEST_RECNAME, multipart_manifest_v1).
-define(PART_MANIFEST, #part_manifest_v2).
-define(PART_MANIFEST_RECNAME, part_manifest_v2).
-define(MULTIPART_MANIFEST, #multipart_manifest_v2).
-define(PART_MANIFEST, #part_manifest_v3).
-define(MULTIPART_DESCR, #multipart_descr_v2).
-define(PART_DESCR, #part_descr_v1).
-define(PART_DESCR, #part_descr_v2).

-define(LFS_DEFAULT_OBJECT_VERSION, <<"null">>).

Expand Down Expand Up @@ -245,7 +243,7 @@
-record(part_manifest_v1, {
bucket :: binary(),
key :: binary(),
start_time :: non_neg_integer(),
start_time :: erlang:timestamp(),
part_number :: integer(),
part_id :: binary(),
content_length :: integer(),
Expand All @@ -257,6 +255,19 @@
bucket :: binary(),
key :: binary(),
%% new in v2
vsn = ?LFS_DEFAULT_OBJECT_VERSION :: binary(),
start_time :: erlang:timestamp(),
part_number :: integer(),
part_id :: binary(),
content_length :: integer(),
content_md5 :: undefined | binary(),
block_size :: integer()
}).

-record(part_manifest_v3, {
bucket :: binary(),
key :: binary(),

vsn = ?LFS_DEFAULT_OBJECT_VERSION :: binary(),

%% used to judge races between concurrent uploads
Expand All @@ -279,9 +290,19 @@
%% parts for the same upload id could have different block_sizes.
block_size :: integer()
}).
-type part_manifest() :: #part_manifest_v2{}.
-type part_manifest() :: #part_manifest_v3{}.


-record(multipart_manifest_v1, {
upload_id :: binary(),
owner :: acl_owner_old(),
parts = ordsets:new() :: ordsets:ordset(#part_manifest_v2{}),
done_parts = ordsets:new() :: ordsets:ordset({binary(), binary()}),
cleanup_parts = ordsets:new() :: ordsets:ordset(#part_manifest_v2{}),
props = [] :: proplists:proplist()
}).

-record(multipart_manifest_v2, {
upload_id :: binary(),
owner :: acl_owner(),

Expand All @@ -297,38 +318,31 @@
%% with this `upload_id' so far. A part
%% can be uploaded more than once with the same
%% part number. type = #part_manifest_vX
parts = ordsets:new() :: ordsets:ordset(?PART_MANIFEST{}),
parts = ordsets:new() :: ordsets:ordset(#part_manifest_v3{}),
%% List of UUIDs for parts that are done uploading.
%% The part number is redundant, so we only store
%% {UUID::binary(), PartETag::binary()} here.
done_parts = ordsets:new() :: ordsets:ordset({binary(), binary()}),
%% type = #part_manifest_vX
cleanup_parts = ordsets:new() :: ordsets:ordset(?PART_MANIFEST{}),
cleanup_parts = ordsets:new() :: ordsets:ordset(#part_manifest_v3{}),

%% a place to stuff future information
%% without having to change
%% the record format
props = [] :: proplists:proplist()
}).
-type multipart_manifest() :: #multipart_manifest_v1{}.

-type multipart_manifest() :: #multipart_manifest_v2{}.


%% Basis of list multipart uploads output
-record(multipart_descr_v1, {
%% Object key for the multipart upload
key :: binary(),

%% UUID of the multipart upload
upload_id :: binary(),

%% User that initiated the upload
owner_display :: string(),
owner_key_id :: string(),

%% storage class: no real options here
storage_class = standard,

%% Time that the upload was initiated
initiated :: string() %% conflict of func vs. type: riak_cs_wm_utils:iso_8601_datetime()
initiated :: string()
}).

-record(multipart_descr_v2, {
Expand All @@ -351,15 +365,23 @@

-type multipart_descr() :: #multipart_descr_v2{}.


%% Basis of multipart list parts output
-record(part_descr_v1, {
part_number :: integer(),
last_modified :: erlang:timestamp(),
etag :: binary(),
size :: integer()
}).

-record(part_descr_v2, {
part_number :: integer(),
last_modified :: non_neg_integer(),
etag :: binary(),
size :: integer()
}).

-type part_descr() :: #part_descr_v1{}.
-type part_descr() :: #part_descr_v2{}.


-endif.
2 changes: 1 addition & 1 deletion apps/riak_cs/src/riak_cs_acl.erl
Original file line number Diff line number Diff line change
Expand Up @@ -328,7 +328,7 @@ exprec_grant(Map) ->
acl_from_meta([]) ->
?ACL_UNDEF;
acl_from_meta([{?MD_ACL, Acl} | _]) ->
{ok, binary_to_term(Acl)};
{ok, upgrade_acl_record(binary_to_term(Acl))};
acl_from_meta([_ | RestMD]) ->
acl_from_meta(RestMD).

Expand Down
9 changes: 6 additions & 3 deletions apps/riak_cs/src/riak_cs_app.erl
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@
%% application API
-export([start/2,
stop/1]).
-export([atoms_for_check_bucket_props/0]).

-include("riak_cs.hrl").
-include_lib("riakc/include/riakc.hrl").
Expand Down Expand Up @@ -83,14 +84,16 @@ ensure_bucket_props(Pbc) ->
?IAM_POLICY_BUCKET,
?IAM_SAMLPROVIDER_BUCKET,
?TEMP_SESSIONS_BUCKET],
%% %% Put atoms into atom table to suppress warning logs in `check_bucket_props'
%% _PreciousAtoms = [riak_core_util, chash_std_keyfun,
%% riak_kv_wm_link_walker, mapreduce_linkfun],
[riakc_pb_socket:set_bucket(Pbc, B, [{allow_mult, true}]) || B <- BucketsWithMultiTrue],
[riakc_pb_socket:set_bucket(Pbc, B, [{allow_mult, false}]) || B <- BucketsWithMultiFalse],
?LOG_DEBUG("ensure_bucket_props done"),
ok.

%% Put atoms into atom table to suppress warning logs in `check_bucket_props'
atoms_for_check_bucket_props() ->
[riak_core_util, chash_std_keyfun,
riak_kv_wm_link_walker, mapreduce_linkfun].

check_admin_creds(Pbc) ->
case riak_cs_config:admin_creds() of
{ok, {<<"admin-key">>, _}} ->
Expand Down
10 changes: 1 addition & 9 deletions apps/riak_cs/src/riak_cs_block_server.erl
Original file line number Diff line number Diff line change
Expand Up @@ -345,9 +345,6 @@ handle_local_notfound(RcPid, FullBucket, FullKey, GetOptions2,
RetryFun({failure, [{local_quorum, Other}|ErrorReasons]})
end.

-spec get_block_local(riak_client(), binary(), binary(), list(),
timeout(), riak_cs_stats:key()) ->
{ok, binary()} | {error, term()}.
get_block_local(RcPid, FullBucket, FullKey, GetOptions, Timeout, StatsKey) ->
case riak_cs_pbc:get(block_pbc(RcPid), FullBucket, FullKey,
GetOptions, Timeout, StatsKey) of
Expand All @@ -359,9 +356,6 @@ get_block_local(RcPid, FullBucket, FullKey, GetOptions, Timeout, StatsKey) ->

-dialyzer([{no_match, get_block_remote/6}]).

-spec get_block_remote(riak_client(), binary(), binary(), binary(), get_options(),
riak_cs_stats:key()) ->
{ok, binary()} | {error, term()}.
get_block_remote(RcPid, FullBucket, FullKey, ClusterID, GetOptions0, StatsKey) ->
%% replace get_block_timeout with proxy_get_block_timeout
GetOptions = proplists:delete(timeout, GetOptions0),
Expand Down Expand Up @@ -497,8 +491,6 @@ full_bkey(Bucket, Key, UUID, BlockId) ->
find_md_usermeta(MD) ->
dict:find(?MD_USERMETA, MD).

-spec resolve_block_object(riakc_obj:riakc_obj(), riak_client()) ->
{ok, binary()} | {error, notfound}.
resolve_block_object(RObj, RcPid) ->
{{MD, Value}, NeedRepair} =
riak_cs_utils:resolve_robj_siblings(riakc_obj:get_contents(RObj)),
Expand All @@ -523,7 +515,7 @@ resolve_block_object(RObj, RcPid) ->
do_put_block(RBucket, RKey, VClock, Value, MD, RcPid,
[riakc, put_block_resolved], FailFun);
NeedRepair andalso not is_binary(Value) ->
logger:error("All checksums fail: ~P", [RObj, 200]);
logger:error("All checksums fail: ~P", [RObj]);
true ->
ok
end,
Expand Down
5 changes: 2 additions & 3 deletions apps/riak_cs/src/riak_cs_bucket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -238,8 +238,6 @@ fold_delete_uploads(Bucket, RcPid, [?MULTIPART_DESCR{key = VKey,
fold_all_buckets(Fun, Acc0, RcPid) when is_function(Fun) ->
iterate_csbuckets(RcPid, Acc0, Fun, undefined).

-spec iterate_csbuckets(riak_client(), term(), fun(), binary()|undefined) ->
{ok, term()} | {error, any()}.
iterate_csbuckets(RcPid, Acc0, Fun, Cont0) ->

Options = case Cont0 of
Expand All @@ -251,7 +249,8 @@ iterate_csbuckets(RcPid, Acc0, Fun, Cont0) ->
case riak_cs_pbc:get_index_range(MasterPbc, ?BUCKETS_BUCKET,
<<"$key">>, <<0>>, <<255>>,
Options, [riakc, get_cs_buckets_by_index]) of
{ok, ?INDEX_RESULTS{keys=BucketNames, continuation=Cont}} ->
{ok, ?INDEX_RESULTS{keys = BucketNames,
continuation = Cont}} ->
Foldfun = iterate_csbuckets_fold_fun(Fun),
Acc2 = lists:foldl(Foldfun, Acc0, BucketNames),
case Cont of
Expand Down
8 changes: 4 additions & 4 deletions apps/riak_cs/src/riak_cs_lfs_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -111,13 +111,13 @@ initial_blocks(ContentLength, BlockSize) ->
lists:seq(0, (UpperBound - 1)).

-spec initial_blocks(integer(), integer(), binary()) ->
[{binary(), integer()}].
[{binary(), integer()}].
initial_blocks(ContentLength, SafeBlockSize, UUID) ->
Bs = initial_blocks(ContentLength, SafeBlockSize),
[{UUID, B} || B <- Bs].

-spec range_blocks(integer(), integer(), integer(), binary()) ->
{[{binary(), integer()}], integer(), integer()}.
{[{binary(), integer()}], integer(), integer()}.
range_blocks(Start, End, SafeBlockSize, UUID) ->
SkipInitial = Start rem SafeBlockSize,
KeepFinal = (End rem SafeBlockSize) + 1,
Expand All @@ -128,7 +128,7 @@ range_blocks(Start, End, SafeBlockSize, UUID) ->
SkipInitial, KeepFinal}.

-spec block_sequences_for_manifest(lfs_manifest()) ->
ordsets:ordset({binary(), integer()}).
ordsets:ordset({binary(), integer()}).
block_sequences_for_manifest(?MANIFEST{uuid=UUID,
content_length=ContentLength}=Manifest)->
SafeBlockSize = safe_block_size_from_manifest(Manifest),
Expand All @@ -143,7 +143,7 @@ block_sequences_for_manifest(?MANIFEST{uuid=UUID,
end.

-spec block_sequences_for_manifest(lfs_manifest(), {integer(), integer()}) ->
{[{binary(), integer()}], integer(), integer()}.
{[{binary(), integer()}], integer(), integer()}.
block_sequences_for_manifest(?MANIFEST{uuid=UUID}=Manifest,
{Start, End})->
SafeBlockSize = safe_block_size_from_manifest(Manifest),
Expand Down
6 changes: 3 additions & 3 deletions apps/riak_cs/src/riak_cs_pbc.erl
Original file line number Diff line number Diff line change
Expand Up @@ -78,7 +78,7 @@ ping(PbcPid, Timeout, StatsKey) ->
%% @doc Get an object from Riak
-spec get_sans_stats(pid(), binary(), binary(), proplists:proplist(), timeout()) ->
{ok, riakc_obj:riakc_obj()} | {error, term()}.
get_sans_stats(PbcPid, BucketName, Key, Opts, Timeout) ->
get_sans_stats(PbcPid, BucketName, Key, Opts, Timeout) ->
riakc_pb_socket:get(PbcPid, BucketName, Key, Opts, Timeout).


Expand Down Expand Up @@ -125,8 +125,8 @@ get(Pbc, Bucket, Key, StatsItem) ->


-spec repl_get(pid(), binary(), binary(), binary(),
proplists:proplist(), timeout(), riak_cs_stats:key()) ->
{ok, riakc_obj:riakc_obj()} | {error, term()}.
proplists:proplist(), timeout(), riak_cs_stats:key()) ->
{ok, riakc_obj:riakc_obj()} | {error, term()}.
repl_get(PbcPid, BucketName, Key, ClusterID, Opts, Timeout, StatsKey) ->
?WITH_STATS(StatsKey,
riak_repl_pb_api:get(PbcPid, BucketName, Key, ClusterID, Opts, Timeout)).
Expand Down
5 changes: 3 additions & 2 deletions apps/riak_cs/src/riak_cs_riak_client.erl
Original file line number Diff line number Diff line change
Expand Up @@ -259,10 +259,11 @@ stop_pbc(Pbc) when is_pid(Pbc) ->

do_get_bucket(State) ->
case ensure_master_pbc(State) of
{ok, #state{master_pbc=MasterPbc, bucket_name=BucketName} = NewState} ->
{ok, #state{master_pbc = MasterPbc,
bucket_name = BucketName} = NewState} ->
case get_bucket_with_pbc(MasterPbc, BucketName) of
{ok, Obj} ->
{ok, NewState#state{bucket_obj=Obj}};
{ok, NewState#state{bucket_obj = Obj}};
{error, Reason} ->
{error, Reason, NewState}
end;
Expand Down
3 changes: 1 addition & 2 deletions apps/riak_cs/src/riak_cs_utils.erl
Original file line number Diff line number Diff line change
Expand Up @@ -389,8 +389,7 @@ pow(Base, Power, Acc) ->
-type resolve_ok() :: {term(), binary()}.
-type resolve_error() :: {atom(), atom()}.
-spec resolve_robj_siblings(RObj::term()) ->
{resolve_ok() | resolve_error(), NeedsRepair::boolean()}.

{resolve_ok() | resolve_error(), NeedsRepair::boolean()}.
resolve_robj_siblings(Cs) ->
[{BestRating, BestMDV}|Rest] = lists:sort([{rate_a_dict(MD, V), MDV} ||
{MD, V} = MDV <- Cs]),
Expand Down
Loading

0 comments on commit 0250228

Please sign in to comment.