Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Feature/log disconnected rebased #987

Merged
merged 6 commits into from Oct 9, 2014
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
2 changes: 1 addition & 1 deletion dialyzer.ignore-warnings.ee
@@ -1,6 +1,6 @@
# Errors
Function get_block_remote/5 has no local return
riak_cs_config.erl:212: Function will never be called
riak_cs_config.erl:214: Function will never be called
# Warnings
Unknown functions:
app_helper:get_prop_or_env/3
Expand Down
4 changes: 3 additions & 1 deletion src/riak_cs_access_archiver.erl
Expand Up @@ -208,8 +208,10 @@ store(User, RcPid, Record, Slice) ->
[User, Slice]);
{error, Reason} ->
ok = lager:error("Access archiver storage failed (~p), "
"stats for ~s ~p were lost",
"stats for ~s ~p were lost.",
[Reason, User, Slice]),
riak_cs_pbc:check_connection_status(MasterPbc,
"riak_cs_access_archiver:store/4"),
{error, Reason};
{'EXIT', {noproc, _}} ->
%% just haven't gotten the 'DOWN' yet
Expand Down
3 changes: 3 additions & 0 deletions src/riak_cs_block_server.erl
Expand Up @@ -386,6 +386,9 @@ constrained_delete(RcPid, RiakObject, BlockId) ->

%% Follow-up step for a block delete result (presumably the outcome of
%% the constrained delete attempted just before -- confirm at the caller).
%%
%%  * `{error, {unsatisfied_constraint, _, _}}': delete the object again
%%    through the block PBC connection of `RcPid' and return that result.
%%  * any other `{error, Reason}': log a warning and hand the error
%%    tuple back unchanged.
%%  * everything else: return `ok'.
secondary_delete_check({error, {unsatisfied_constraint, _, _}}, RcPid, RiakObject) ->
    BlockPbc = block_pbc(RcPid),
    riakc_pb_socket:delete_obj(BlockPbc, RiakObject);
secondary_delete_check({error, Cause} = Failure, _RcPid, _RiakObject) ->
    _ = lager:warning("Constrained block deletion failed. Reason: ~p", [Cause]),
    Failure;
secondary_delete_check(_Result, _RcPid, _RiakObject) ->
    ok.

Expand Down
1 change: 1 addition & 0 deletions src/riak_cs_bucket.erl
Expand Up @@ -262,6 +262,7 @@ iterate_csbuckets(RcPid, Acc0, Fun, Cont0) ->
_ ->
iterate_csbuckets(RcPid, Acc2, Fun, Cont)
end;

Error ->
_ = lager:error("iterating CS buckets: ~p", [Error]),
{error, {Error, Acc0}}
Expand Down
21 changes: 21 additions & 0 deletions src/riak_cs_config.erl
Expand Up @@ -51,6 +51,8 @@
set_user_buckets_prune_time/1,
riak_host_port/0,
connect_timeout/0,
queue_if_disconnected/0,
auto_reconnect/0,
is_multibag_enabled/0,
max_buckets_per_user/0
]).
Expand Down Expand Up @@ -307,6 +309,25 @@ connect_timeout() ->
10000
end.

%% @doc Build the `auto_reconnect' client connection option from the
%% `riakc_auto_reconnect' key of the `riak_cs' application environment.
%% Only an explicit `false' turns the option off; `true', any other
%% value, or a missing key all yield `true'.
-spec auto_reconnect() -> [{auto_reconnect, boolean()}].
auto_reconnect() ->
    Enabled = application:get_env(riak_cs, riakc_auto_reconnect) =/= {ok, false},
    [{auto_reconnect, Enabled}].

%% @doc Build the `queue_if_disconnected' client connection option from
%% the `riakc_queue_if_disconnected' key of the `riak_cs' application
%% environment. When the key is unset (or not a boolean) an empty list
%% is returned, so riak-erlang-client falls back to its own default.
-spec queue_if_disconnected() -> [{queue_if_disconnected, boolean()}].
queue_if_disconnected() ->
    case application:get_env(riak_cs, riakc_queue_if_disconnected) of
        {ok, Flag} when is_boolean(Flag) ->
            [{queue_if_disconnected, Flag}];
        _NotConfigured ->
            []
    end.

%% @doc `true' when the `bags' key is set (to any value) in the
%% `riak_cs_multibag' application environment, `false' otherwise.
-spec is_multibag_enabled() -> boolean().
is_multibag_enabled() ->
    case application:get_env(riak_cs_multibag, bags) of
        undefined -> false;
        _Configured -> true
    end.
Expand Down
29 changes: 15 additions & 14 deletions src/riak_cs_gc.erl
Expand Up @@ -365,20 +365,21 @@ move_manifests_to_gc_bucket(Manifests, RcPid) ->
Key = generate_key(),
ManifestSet = build_manifest_set(Manifests),
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
ObjectToWrite = case riakc_pb_socket:get(ManifestPbc, ?GC_BUCKET, Key) of
{error, notfound} ->
%% There was no previous value, so we'll
%% create a new riak object and write it
riakc_obj:new(?GC_BUCKET, Key, riak_cs_utils:encode_term(ManifestSet));
{ok, PreviousObject} ->
%% There is a value currently stored here,
%% so resolve all the siblings and add the
%% new set in as well. Write this
%% value back to riak
Resolved = decode_and_merge_siblings(PreviousObject, ManifestSet),
riak_cs_utils:update_obj_value(PreviousObject,
riak_cs_utils:encode_term(Resolved))
end,
ObjectToWrite =
case riakc_pb_socket:get(ManifestPbc, ?GC_BUCKET, Key) of
{error, notfound} ->
%% There was no previous value, so we'll
%% create a new riak object and write it
riakc_obj:new(?GC_BUCKET, Key, riak_cs_utils:encode_term(ManifestSet));
{ok, PreviousObject} ->
%% There is a value currently stored here,
%% so resolve all the siblings and add the
%% new set in as well. Write this
%% value back to riak
Resolved = decode_and_merge_siblings(PreviousObject, ManifestSet),
riak_cs_utils:update_obj_value(PreviousObject,
riak_cs_utils:encode_term(Resolved))
end,

%% Create a set from the list of manifests
_ = lager:debug("Manifests scheduled for deletion: ~p", [ManifestSet]),
Expand Down
13 changes: 12 additions & 1 deletion src/riak_cs_gc_key_list.erl
Expand Up @@ -151,11 +151,22 @@ gc_index_query(RcPid, EndTime, BatchSize, Continuation, UsePaginatedIndexes) ->
[]
end,
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
EpochStart = riak_cs_gc:epoch_start(),
QueryResult = riakc_pb_socket:get_index_range(
ManifestPbc,
?GC_BUCKET, ?KEY_INDEX,
riak_cs_gc:epoch_start(), EndTime,
EpochStart, EndTime,
Options),

case QueryResult of
{error, disconnected} ->
_ = lager:warning("GC index query ~p to ~p failed.",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

riak_cs_pbc:check_connection_status already logs the connection status, so this log seems unnecessary.
A general {error, Reason} is logged in the eligible_manifest_keys clause ({{error, Reason}, EndTime}, _) ->.

[EpochStart, EndTime]),
riak_cs_pbc:check_connection_status(ManifestPbc, gc_index_query);
_ ->
ok
end,

{QueryResult, EndTime}.

-ifdef(TEST).
Expand Down
3 changes: 2 additions & 1 deletion src/riak_cs_gc_worker.erl
Expand Up @@ -224,8 +224,9 @@ fetch_next_fileset(ManifestSetKey, RcPid) ->
Error;
{error, Reason}=Error ->
_ = lager:info("Error occurred trying to read the fileset"
"for ~p for gc. Reason: ~p",
" for ~p for gc. Reason: ~p",
[ManifestSetKey, Reason]),
riak_cs_pbc:check_connection_status(ManifestPbc, fetch_next_fileset),
Error
end.

Expand Down
9 changes: 8 additions & 1 deletion src/riak_cs_manifest.erl
Expand Up @@ -107,7 +107,14 @@ get_manifests_raw(RcPid, Bucket, Key) ->
ManifestBucket = riak_cs_utils:to_bucket_name(objects, Bucket),
ok = riak_cs_riak_client:set_bucket_name(RcPid, Bucket),
{ok, ManifestPbc} = riak_cs_riak_client:manifest_pbc(RcPid),
riakc_pb_socket:get(ManifestPbc, ManifestBucket, Key).
case riakc_pb_socket:get(ManifestPbc, ManifestBucket, Key) of
{ok, _} = Result -> Result;
{error, disconnected} ->
riak_cs_pbc:check_connection_status(ManifestPbc, get_manifests_raw),
{error, disconnected};
Error ->
Error
end.

gc_deleted_while_writing_manifests(Object, Manifests, Bucket, Key, RcPid) ->
UUIDs = riak_cs_manifest_utils:deleted_while_writing(Manifests),
Expand Down
18 changes: 17 additions & 1 deletion src/riak_cs_pbc.erl
Expand Up @@ -28,7 +28,8 @@
put/3,
put_with_no_meta/2,
put_with_no_meta/3,
list_keys/2]).
list_keys/2,
check_connection_status/2]).
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

One extra white space char.


%% @doc Get an object from Riak
-spec get_object(pid(), binary(), binary()) ->
Expand Down Expand Up @@ -85,3 +86,18 @@ list_keys(PbcPid, BucketName) ->
{error, _}=Error ->
Error
end.

%% @doc Best-effort diagnostic: log a warning describing the connection
%% status of the PB client `Pbc' unless it reports itself as connected.
%% `Where' is only included in the log message to identify the call
%% site. The function returns normally (it does not throw), so the
%% contract must not be `no_return()'; the result is `ok' or whatever
%% the logging call returns, hence `any()'.
-spec check_connection_status(pid(), term()) -> any().
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

From http://www.erlang.org/doc/reference_manual/typespec.html,

Some functions in Erlang are not meant to return; either because they define servers or because they are used to throw exceptions as the function below:

  my_error(Err) -> erlang:throw({error, Err}).

For such functions we recommend the use of the special no_return() type for their "return", via a contract of the form:

 -spec my_error(term()) -> no_return().

This function returns ok, so just ok or any() (if one does not care about return values) seems better.
But, I know, you are far better than me with respect to type theory, so please deny/correct me 😅

%% Ask the PB client whether it is connected and log a warning for any
%% answer other than `true'. Exceptions of every class raised inside the
%% `try' body are caught and logged the same way, so this diagnostic
%% helper does not crash its caller on that path.
check_connection_status(Pbc, Where) ->
    try
        case riakc_pb_socket:is_connected(Pbc) of
            true ->
                ok;
            Status ->
                _ = lager:warning("Connection status of ~p at ~p: ~p",
                                  [Pbc, Where, Status])
        end
    catch
        Class:Reason ->
            _ = lager:warning("Connection status of ~p at ~p: ~p",
                              [Pbc, Where, {Class, Reason}])
    end.
5 changes: 3 additions & 2 deletions src/riak_cs_riak_client.erl
Expand Up @@ -56,7 +56,7 @@
-define(SERVER, ?MODULE).

-record(state, {
master_pbc,
master_pbc :: undefined | pid(),
bucket_name,
bucket_obj
}).
Expand Down Expand Up @@ -282,7 +282,8 @@ get_user_with_pbc(MasterPbc, Key) ->
%% be able to properly resolve conflicts).
KeepDeletedBuckets = false,
{ok, {Obj, KeepDeletedBuckets}};
{error, _} ->
{error, Reason0} ->
_ = lager:warning("Fetching user record with strong option failed: ~p", [Reason0]),
WeakOptions = [{r, quorum}, {pr, one}, {notfound_ok, false}],
case riakc_pb_socket:get(MasterPbc, ?USER_BUCKET, Key, WeakOptions) of
{ok, Obj} ->
Expand Down
6 changes: 4 additions & 2 deletions src/riak_cs_riakc_pool_worker.erl
Expand Up @@ -53,8 +53,10 @@ start_link(Args) ->
undefined ->
10000
end,
StartOptions = [{connect_timeout, Timeout},
{auto_reconnect, true}],
StartOptions =
[{connect_timeout, Timeout}]
++ riak_cs_config:auto_reconnect()
++ riak_cs_config:queue_if_disconnected(),
riakc_pb_socket:start_link(Address, Port, StartOptions).

stop(undefined) ->
Expand Down
4 changes: 4 additions & 0 deletions src/riak_cs_wm_common.erl
Expand Up @@ -218,6 +218,10 @@ maybe_create_user({error, no_user_key}=Error, _, _, _, _, _) ->
%% Anonymous access may be authorized by ACL or policy afterwards,
%% no logging here.
Error;
maybe_create_user({error, disconnected}=Error, _, _, _, _, RcPid) ->
{ok, MasterPid} = riak_cs_riak_client:master_pbc(RcPid),
riak_cs_pbc:check_connection_status(MasterPid, maybe_create_user),
Error;
maybe_create_user({error, Reason}=Error, _, Api, _, _, _) ->
_ = lager:error("Retrieval of user record for ~p failed. Reason: ~p",
[Api, Reason]),
Expand Down
3 changes: 3 additions & 0 deletions test/riak_cs_gc_single_run_eqc.erl
Expand Up @@ -269,6 +269,7 @@ dummy_start_delete_fsm(_Node, [_RcPid, {_UUID, ?MANIFEST{bkey={_, K}}=_Manifest}
%% Install the mocks for fileset fetch and block deletion:
%% `riak_cs_pbc' is mocked in passthrough mode with `get_object/3'
%% stubbed, and `riakc_pb_socket' gets stubs for `is_connected/1' and
%% `delete_obj/2'. Expectations are installed in the listed order.
meck_fileset_get_and_delete() ->
    meck:new(riak_cs_pbc, [passthrough]),
    Expectations =
        [{riak_cs_pbc, get_object, fun dummy_get_object/3},
         {riakc_pb_socket, is_connected, fun always_true/1},
         {riakc_pb_socket, delete_obj, fun dummy_delete_object/2}],
    lists:foreach(fun({Mod, Name, Stub}) -> meck:expect(Mod, Name, Stub) end,
                  Expectations).

dummy_get_object(_Pbc, <<"riak-cs-gc">>=B, K) ->
Expand All @@ -283,6 +284,8 @@ dummy_get_object(_Pbc, <<"riak-cs-gc">>=B, K) ->
dummy_get_object(_Pbc, _B, _K) ->
error.

%% Stub that ignores its single argument and always answers `true'.
always_true(_Ignored) ->
    true.

dummy_delete_object(_Pbc, RiakObj) ->
Key = riakc_obj:key(RiakObj),
case re:run(Key, <<"^error:in_block_delete/">>) of
Expand Down