Skip to content

Commit

Permalink
Fix the service configuration option response_timeout_immediate_max w…
Browse files Browse the repository at this point in the history
…hen request_name_lookup is set to async and a lookup timeout occurs. Fix cloudi_queue:send when PatternPid is provided as undefined.
  • Loading branch information
okeuday committed Sep 2, 2017
1 parent 9ce26f2 commit f1e73be
Show file tree
Hide file tree
Showing 4 changed files with 118 additions and 47 deletions.
6 changes: 6 additions & 0 deletions src/ChangeLog
Original file line number Diff line number Diff line change
@@ -1,6 +1,12 @@
# -*- coding: utf-8; tab-width: 4; -*-
# ex: set fenc=utf-8 sts=4 ts=4 et nomod:

2017-09-02 Michael Truog <mjtruog [at] gmail (dot) com>

* Fix the service configuration option response_timeout_immediate_max
when request_name_lookup is set to async and a lookup timeout occurs
* Fix cloudi_queue:send when PatternPid is provided as undefined

2017-09-01 Michael Truog <mjtruog [at] gmail (dot) com>

* Fix release script to always succeed with stop (for openrc-run)
Expand Down
20 changes: 20 additions & 0 deletions src/lib/cloudi_core/src/cloudi_core_i_services_external.erl
Original file line number Diff line number Diff line change
Expand Up @@ -592,6 +592,8 @@ handle_event(EventType, EventContent, StateName, State) ->
dest_allow = DestAllow,
options = #config_service_options{
request_name_lookup = RequestNameLookup,
response_timeout_immediate_max =
ResponseTimeoutImmediateMax,
scope = Scope,
aspects_request_after = AspectsAfter}} = State) ->
true = is_list(NextName) andalso is_integer(hd(NextName)),
Expand Down Expand Up @@ -624,6 +626,14 @@ handle_event(EventType, EventContent, StateName, State) ->
ok;
{error, _}
when RequestNameLookup =:= async ->
if
NewTimeout >= ResponseTimeoutImmediateMax ->
Source ! {'cloudi_service_return_async',
Name, Pattern, <<>>, <<>>,
NewTimeout, TransId, Source};
true ->
ok
end,
ok;
{error, _}
when NewTimeout >= ?FORWARD_ASYNC_INTERVAL ->
Expand Down Expand Up @@ -678,6 +688,8 @@ handle_event(EventType, EventContent, StateName, State) ->
dest_allow = DestAllow,
options = #config_service_options{
request_name_lookup = RequestNameLookup,
response_timeout_immediate_max =
ResponseTimeoutImmediateMax,
scope = Scope,
aspects_request_after = AspectsAfter}} = State) ->
true = is_list(NextName) andalso is_integer(hd(NextName)),
Expand Down Expand Up @@ -709,6 +721,14 @@ handle_event(EventType, EventContent, StateName, State) ->
ok;
{error, _}
when RequestNameLookup =:= async ->
if
NewTimeout >= ResponseTimeoutImmediateMax ->
Source ! {'cloudi_service_return_sync',
Name, Pattern, <<>>, <<>>,
NewTimeout, TransId, Source};
true ->
ok
end,
ok;
{error, _}
when NewTimeout >= ?FORWARD_SYNC_INTERVAL ->
Expand Down
94 changes: 60 additions & 34 deletions src/lib/cloudi_core/src/cloudi_core_i_services_internal.erl
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@
%%%
%%% @author Michael Truog <mjtruog [at] gmail (dot) com>
%%% @copyright 2011-2017 Michael Truog
%%% @version 1.7.1 {@date} {@time}
%%% @version 1.7.2 {@date} {@time}
%%%------------------------------------------------------------------------

-module(cloudi_core_i_services_internal).
Expand Down Expand Up @@ -877,9 +877,9 @@ handle_info({'cloudi_service_request_success', RequestResponse,
Source ! T;
{'cloudi_service_return_sync', _, _, _, _, _, _, Source} = T ->
Source ! T;
{'cloudi_service_forward_async_retry', _, _, _, _, _, _, _} = T ->
{'cloudi_service_forward_async_retry', _, _, _, _, _, _, _, _, _} = T ->
Dispatcher ! T;
{'cloudi_service_forward_sync_retry', _, _, _, _, _, _, _} = T ->
{'cloudi_service_forward_sync_retry', _, _, _, _, _, _, _, _, _} = T ->
Dispatcher ! T
end,
NewState = process_queues(State#state{service_state = NewServiceState}),
Expand Down Expand Up @@ -968,35 +968,48 @@ handle_info({'cloudi_service_mcast_async_active_retry',
Timeout, Priority,
Client, State));

handle_info({'cloudi_service_forward_async_retry',
Name, RequestInfo, Request, Timeout, Priority, TransId, Source},
handle_info({'cloudi_service_forward_async_retry', Name, Pattern,
NextName, NextRequestInfo, NextRequest,
Timeout, Priority, TransId, Source},
#state{dest_refresh = DestRefresh,
cpg_data = Groups,
dest_deny = DestDeny,
dest_allow = DestAllow,
options = #config_service_options{
request_name_lookup = RequestNameLookup,
response_timeout_immediate_max =
ResponseTimeoutImmediateMax,
scope = Scope}} = State) ->
case destination_allowed(Name, DestDeny, DestAllow) of
case destination_allowed(NextName, DestDeny, DestAllow) of
true ->
case destination_get(DestRefresh, Scope, Name, Source,
case destination_get(DestRefresh, Scope, NextName, Source,
Groups, Timeout) of
{error, timeout} ->
ok;
{error, _} when RequestNameLookup =:= async ->
if
Timeout >= ResponseTimeoutImmediateMax ->
Source ! {'cloudi_service_return_async',
Name, Pattern, <<>>, <<>>,
Timeout, TransId, Source};
true ->
ok
end,
ok;
{error, _} when Timeout >= ?FORWARD_ASYNC_INTERVAL ->
erlang:send_after(?FORWARD_ASYNC_INTERVAL, self(),
{'cloudi_service_forward_async_retry',
Name, RequestInfo, Request,
Name, Pattern,
NextName, NextRequestInfo, NextRequest,
Timeout - ?FORWARD_ASYNC_INTERVAL,
Priority, TransId, Source}),
ok;
{error, _} ->
ok;
{ok, NextPattern, NextPid} when Timeout >= ?FORWARD_DELTA ->
NextPid ! {'cloudi_service_send_async', Name, NextPattern,
RequestInfo, Request,
NextPid ! {'cloudi_service_send_async',
NextName, NextPattern,
NextRequestInfo, NextRequest,
Timeout - ?FORWARD_DELTA,
Priority, TransId, Source};
_ ->
Expand All @@ -1007,35 +1020,48 @@ handle_info({'cloudi_service_forward_async_retry',
end,
hibernate_check({noreply, State});

handle_info({'cloudi_service_forward_sync_retry', Name, RequestInfo, Request,
handle_info({'cloudi_service_forward_sync_retry', Name, Pattern,
NextName, NextRequestInfo, NextRequest,
Timeout, Priority, TransId, Source},
#state{dest_refresh = DestRefresh,
cpg_data = Groups,
dest_deny = DestDeny,
dest_allow = DestAllow,
options = #config_service_options{
request_name_lookup = RequestNameLookup,
response_timeout_immediate_max =
ResponseTimeoutImmediateMax,
scope = Scope}} = State) ->
case destination_allowed(Name, DestDeny, DestAllow) of
case destination_allowed(NextName, DestDeny, DestAllow) of
true ->
case destination_get(DestRefresh, Scope, Name, Source,
case destination_get(DestRefresh, Scope, NextName, Source,
Groups, Timeout) of
{error, timeout} ->
ok;
{error, _} when RequestNameLookup =:= async ->
if
Timeout >= ResponseTimeoutImmediateMax ->
Source ! {'cloudi_service_return_sync',
Name, Pattern, <<>>, <<>>,
Timeout, TransId, Source};
true ->
ok
end,
ok;
{error, _} when Timeout >= ?FORWARD_SYNC_INTERVAL ->
erlang:send_after(?FORWARD_SYNC_INTERVAL, self(),
{'cloudi_service_forward_sync_retry',
Name, RequestInfo, Request,
Name, Pattern,
NextName, NextRequestInfo, NextRequest,
Timeout - ?FORWARD_SYNC_INTERVAL,
Priority, TransId, Source}),
ok;
{error, _} ->
ok;
{ok, NextPattern, NextPid} when Timeout >= ?FORWARD_DELTA ->
NextPid ! {'cloudi_service_send_sync', Name, NextPattern,
RequestInfo, Request,
NextPid ! {'cloudi_service_send_sync',
NextName, NextPattern,
NextRequestInfo, NextRequest,
Timeout - ?FORWARD_DELTA,
Priority, TransId, Source};
_ ->
Expand Down Expand Up @@ -2396,8 +2422,8 @@ handle_module_request(Type, Name, Pattern, RequestInfo, Request,
NewServiceState}
end;
{'cloudi_service_request_success',
{ForwardType, NextName,
NextRequestInfo, NextRequest,
{ForwardType, Name, Pattern,
NextName, NextRequestInfo, NextRequest,
NextTimeout, NextPriority, TransId, Source},
NewServiceState}
when ForwardType =:= 'cloudi_service_forward_async_retry';
Expand All @@ -2420,8 +2446,8 @@ handle_module_request(Type, Name, Pattern, RequestInfo, Request,
NextTimeout
end,
{'cloudi_service_request_success',
{ForwardType, NextName,
NextRequestInfo, NextRequest,
{ForwardType, Name, Pattern,
NextName, NextRequestInfo, NextRequest,
NewTimeout, NextPriority, TransId, Source},
FinalServiceState};
{stop, Reason, FinalServiceState} ->
Expand Down Expand Up @@ -2527,15 +2553,15 @@ handle_module_request_f('send_async', Name, Pattern, RequestInfo, Request,
{forward, NextName, NextRequestInfo, NextRequest,
NextTimeout, NextPriority, NewServiceState} ->
{'cloudi_service_request_success',
{'cloudi_service_forward_async_retry', NextName,
NextRequestInfo, NextRequest,
{'cloudi_service_forward_async_retry', Name, Pattern,
NextName, NextRequestInfo, NextRequest,
NextTimeout, NextPriority, TransId, Source},
NewServiceState};
{forward, NextName, NextRequestInfo, NextRequest,
NewServiceState} ->
{'cloudi_service_request_success',
{'cloudi_service_forward_async_retry', NextName,
NextRequestInfo, NextRequest,
{'cloudi_service_forward_async_retry', Name, Pattern,
NextName, NextRequestInfo, NextRequest,
Timeout, Priority, TransId, Source},
NewServiceState};
{noreply, NewServiceState} ->
Expand Down Expand Up @@ -2607,8 +2633,8 @@ handle_module_request_f('send_async', Name, Pattern, RequestInfo, Request,
NextTimeout, NextPriority, TransId, Source}}
when ForwardType =:= 'cloudi_service_forward_async_retry' ->
{'cloudi_service_request_success',
{ForwardType, NextName,
NextRequestInfo, NextRequest,
{ForwardType, Name, Pattern,
NextName, NextRequestInfo, NextRequest,
NextTimeout, NextPriority, TransId, Source},
ServiceState};
ErrorType:Error ->
Expand Down Expand Up @@ -2674,15 +2700,15 @@ handle_module_request_f('send_sync', Name, Pattern, RequestInfo, Request,
{forward, NextName, NextRequestInfo, NextRequest,
NextTimeout, NextPriority, NewServiceState} ->
{'cloudi_service_request_success',
{'cloudi_service_forward_sync_retry', NextName,
NextRequestInfo, NextRequest,
{'cloudi_service_forward_sync_retry', Name, Pattern,
NextName, NextRequestInfo, NextRequest,
NextTimeout, NextPriority, TransId, Source},
NewServiceState};
{forward, NextName, NextRequestInfo, NextRequest,
NewServiceState} ->
{'cloudi_service_request_success',
{'cloudi_service_forward_sync_retry', NextName,
NextRequestInfo, NextRequest,
{'cloudi_service_forward_sync_retry', Name, Pattern,
NextName, NextRequestInfo, NextRequest,
Timeout, Priority, TransId, Source},
NewServiceState};
{noreply, NewServiceState} ->
Expand Down Expand Up @@ -2754,8 +2780,8 @@ handle_module_request_f('send_sync', Name, Pattern, RequestInfo, Request,
NextTimeout, NextPriority, TransId, Source}}
when ForwardType =:= 'cloudi_service_forward_sync_retry' ->
{'cloudi_service_request_success',
{ForwardType, NextName,
NextRequestInfo, NextRequest,
{ForwardType, Name, Pattern,
NextName, NextRequestInfo, NextRequest,
NextTimeout, NextPriority, TransId, Source},
ServiceState};
ErrorType:Error ->
Expand Down Expand Up @@ -3527,9 +3553,9 @@ duo_handle_info({'cloudi_service_request_success', RequestResponse,
Source ! T;
{'cloudi_service_return_sync', _, _, _, _, _, _, Source} = T ->
Source ! T;
{'cloudi_service_forward_async_retry', _, _, _, _, _, _, _} = T ->
{'cloudi_service_forward_async_retry', _, _, _, _, _, _, _, _, _} = T ->
Dispatcher ! T;
{'cloudi_service_forward_sync_retry', _, _, _, _, _, _, _} = T ->
{'cloudi_service_forward_sync_retry', _, _, _, _, _, _, _, _, _} = T ->
Dispatcher ! T
end,
NewState = State#state_duo{service_state = NewServiceState},
Expand Down
45 changes: 32 additions & 13 deletions src/lib/cloudi_core/src/cloudi_queue.erl
Original file line number Diff line number Diff line change
Expand Up @@ -48,7 +48,7 @@
%%%
%%% @author Michael Truog <mjtruog [at] gmail (dot) com>
%%% @copyright 2015-2017 Michael Truog
%%% @version 1.7.1 {@date} {@time}
%%% @version 1.7.2 {@date} {@time}
%%%------------------------------------------------------------------------

-module(cloudi_queue).
Expand Down Expand Up @@ -291,7 +291,7 @@ send(Dispatcher, Name, Request, Timeout, State) ->
Name :: cloudi_service:service_name(),
Request :: cloudi_service:request(),
Timeout :: cloudi_service:timeout_milliseconds(),
PatternPid :: cloudi_service:pattern_pid(),
PatternPid :: cloudi_service:pattern_pid() | undefined,
State :: state()) ->
{ok, NewState :: state()} |
{{error, Reason :: cloudi_service:error_reason()}, NewState :: state()}.
Expand Down Expand Up @@ -349,17 +349,9 @@ send(Dispatcher, Name, RequestInfo, Request, Timeout, Priority, PatternPid,
case validate(RequestInfoF, RequestF,
RequestInfo, Request) of
true ->
case cloudi_service:send_async_active(Dispatcher, Name,
RequestInfo, Request,
Timeout, Priority,
PatternPid) of
{ok, TransId} ->
RequestState = #request{name = Name,
request_info = RequestInfo,
request = Request,
timeout = Timeout,
priority = Priority,
pattern_pid = PatternPid},
case send_async(Dispatcher, Name, RequestInfo, Request,
Timeout, Priority, PatternPid) of
{ok, TransId, RequestState} ->
{ok, State#cloudi_queue{requests = maps:put(TransId,
RequestState,
Requests)}};
Expand Down Expand Up @@ -458,6 +450,33 @@ validate(RInfoF, undefined, RInfo, _) ->
validate(RInfoF, RF, RInfo, R) ->
validate_f_return(RInfoF(RInfo)) andalso validate_f_return(RF(RInfo, R)).

send_async(Dispatcher, Name, RequestInfo, Request,
Timeout, Priority, undefined) ->
case cloudi_service:get_pid(Dispatcher, Name, Timeout) of
{ok, PatternPid} ->
send_async(Dispatcher, Name, RequestInfo, Request,
Timeout, Priority, PatternPid);
{error, _} = Error ->
Error
end;
send_async(Dispatcher, Name, RequestInfo, Request,
Timeout, Priority, PatternPid) ->
case cloudi_service:send_async_active(Dispatcher, Name,
RequestInfo, Request,
Timeout, Priority,
PatternPid) of
{ok, TransId} ->
RequestState = #request{name = Name,
request_info = RequestInfo,
request = Request,
timeout = Timeout,
priority = Priority,
pattern_pid = PatternPid},
{ok, TransId, RequestState};
{error, _} = Error ->
Error
end.

failure(false, _, _, FailureList) ->
FailureList;
failure(true, MaxCount, MaxPeriod, FailureList) ->
Expand Down

0 comments on commit f1e73be

Please sign in to comment.