Skip to content
This repository has been archived by the owner on May 27, 2022. It is now read-only.

Commit

Permalink
Fixes
Browse files Browse the repository at this point in the history
  • Loading branch information
bieniusa committed Jul 2, 2019
1 parent 3384377 commit c04faea
Show file tree
Hide file tree
Showing 2 changed files with 13 additions and 16 deletions.
20 changes: 10 additions & 10 deletions src/antidotec_pb.erl
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ start_transaction(Pid, TimeStamp, TxnProperties) ->
true ->
{ok, {static, {TimeStamp, TxnProperties}}};
false ->
EncMsg = antidote_pb_codec:encode(start_transaction,
{TimeStamp, TxnProperties}),
EncMsg = antidote_pb_codec:encode({start_transaction,
{TimeStamp, TxnProperties}}),
Result = antidotec_pb_socket:call_infinity(Pid, {req, EncMsg, ?TIMEOUT}),
case Result of
{error, timeout} ->
Expand All @@ -64,7 +64,7 @@ start_transaction(Pid, TimeStamp, TxnProperties) ->

-spec abort_transaction(Pid::term(), TxId::term()) -> ok | {error, term()}.
abort_transaction(Pid, {interactive, TxId}) ->
EncMsg = antidote_pb_codec:encode(abort_transaction, TxId),
EncMsg = antidote_pb_codec:encode({abort_transaction, TxId}),
Result = antidotec_pb_socket:call_infinity(Pid, {req, EncMsg, ?TIMEOUT}),
case Result of
{error, timeout} -> {error, timeout};
Expand All @@ -79,7 +79,7 @@ abort_transaction(Pid, {interactive, TxId}) ->
-spec commit_transaction(Pid::term(), TxId::{interactive, term()} | {static, term()}) ->
{ok, term()} | {error, term()}.
commit_transaction(Pid, {interactive, TxId}) ->
EncMsg = antidote_pb_codec:encode(commit_transaction, TxId),
EncMsg = antidote_pb_codec:encode({commit_transaction, TxId}),
Result = antidotec_pb_socket:call_infinity(Pid, {req, EncMsg, ?TIMEOUT}),
case Result of
{error, timeout} -> {error, timeout};
Expand All @@ -98,7 +98,7 @@ commit_transaction(Pid, {static, _TxId}) ->

-spec update_objects(Pid::term(), Updates::[{term(), term(), term()}], TxId::term()) -> ok | {error, term()}.
update_objects(Pid, Updates, {interactive, TxId}) ->
EncMsg = antidote_pb_codec: encode(update_objects, {Updates, TxId}),
EncMsg = antidote_pb_codec: encode({update_objects, {Updates, TxId}}),
Result = antidotec_pb_socket: call_infinity(Pid, {req, EncMsg, ?TIMEOUT}),
case Result of
{error, timeout} -> {error, timeout};
Expand All @@ -112,8 +112,8 @@ update_objects(Pid, Updates, {interactive, TxId}) ->

update_objects(Pid, Updates, {static, TxId}) ->
{Clock, Properties} = TxId,
EncMsg = antidote_pb_codec:encode(static_update_objects,
{Clock, Properties, Updates}),
EncMsg = antidote_pb_codec:encode({static_update_objects,
{Clock, Properties, Updates}}),
Result = antidotec_pb_socket:call_infinity(Pid, {req, EncMsg, ?TIMEOUT}),
case Result of
{error, timeout} -> {error, timeout};
Expand Down Expand Up @@ -143,7 +143,7 @@ read_objects(Pid, Objects, Transaction) ->

-spec read_values(Pid::term(), Objects::[term()], TxId::term()) -> {ok, [term()]} | {error, term()}.
read_values(Pid, Objects, {interactive, TxId}) ->
EncMsg = antidote_pb_codec:encode(read_objects, {Objects, TxId}),
EncMsg = antidote_pb_codec:encode({read_objects, {Objects, TxId}}),
Result = antidotec_pb_socket:call_infinity(Pid, {req, EncMsg, ?TIMEOUT}),
case Result of
{error, timeout} -> {error, timeout};
Expand All @@ -157,8 +157,8 @@ read_values(Pid, Objects, {interactive, TxId}) ->
end;
read_values(Pid, Objects, {static, TxId}) ->
{Clock, Properties} = TxId,
EncMsg = antidote_pb_codec:encode(static_read_objects,
{Clock, Properties, Objects}),
EncMsg = antidote_pb_codec:encode({static_read_objects,
{Clock, Properties, Objects}}),
Result = antidotec_pb_socket:call_infinity(Pid, {req, EncMsg, ?TIMEOUT}),
case Result of
{error, timeout} -> {error, timeout};
Expand Down
9 changes: 3 additions & 6 deletions src/antidotec_pb_socket.erl
Original file line number Diff line number Diff line change
Expand Up @@ -30,10 +30,8 @@
-type address() :: string() | atom() | inet:ip_address().
%% The TCP port number of the Riak node's Protocol Buffers interface
-type portnum() :: non_neg_integer().
-type msg_id() :: non_neg_integer().
-type rpb_req() :: {tunneled, msg_id(), binary()} | atom() | tuple().

-record(request, {ref :: reference(), msg :: rpb_req(), from, timeout :: timeout(),
-record(request, {ref :: reference(), from, timeout :: timeout(),
tref :: reference() | undefined }).

-record(state, {
Expand Down Expand Up @@ -112,10 +110,9 @@ get_last_commit_time(Pid) ->
%% @private
handle_call({req, Msg, Timeout}, From, State) ->
Ref = make_ref(),
Req = #request{ref = Ref, msg = Msg, from = From, timeout = Timeout,
Req = #request{ref = Ref, from = From, timeout = Timeout,
tref = create_req_timer(Timeout, Ref)},
Pkt = antidote_pb_codec:encode_msg(Msg),
NewState = case gen_tcp:send(State#state.sock, Pkt) of
NewState = case gen_tcp:send(State#state.sock, Msg) of
ok ->
maybe_reply({noreply, State#state{active = Req}});
{error, Reason} ->
Expand Down

0 comments on commit c04faea

Please sign in to comment.