Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
Browse files

WHISTLE-1505: use a timer to shutdown media files, instead of the tim…

…eout return of the gen_server
  • Loading branch information...
commit 9fef4cd21a32e4f70ea5a22c116114ba6a930bcd 1 parent abd7345
@jamesaimonetti jamesaimonetti authored
Showing with 39 additions and 13 deletions.
  1. +39 −13 whistle_apps/apps/media_mgr/src/media_file.erl
View
52 whistle_apps/apps/media_mgr/src/media_file.erl
@@ -23,6 +23,7 @@
-include("media.hrl").
-define(TIMEOUT_LIFETIME, 600000).
+-define(TIMEOUT_MESSAGE, {'$media_file', message_timeout}).
-record(state, {
db :: ne_binary()
@@ -33,6 +34,7 @@
,stream_ref :: reference()
,status :: 'streaming' | 'ready'
,reqs :: [{pid(), reference()},...] | []
+ ,timer_ref :: reference()
}).
%%%===================================================================
@@ -46,7 +48,8 @@
%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
--spec start_link/4 :: (ne_binary(), ne_binary(), ne_binary(), wh_json:json_object()) -> {'ok', pid()}.
+-spec start_link/4 :: (ne_binary(), ne_binary(), ne_binary(), wh_json:json_object()) ->
+ startlink_ret().
start_link(Id, Doc, Attach, Meta) ->
gen_server:start_link(?MODULE, [Id, Doc, Attach, Meta], []).
@@ -77,7 +80,7 @@ init([Id|Rest]) ->
init(wh_util:format_account_id(Id, encoded), Rest).
init(Db, [Doc, Attach, Meta]) ->
- put(callid, ?LOG_SYSTEM_ID),
+ put(callid, <<Db/binary, "/", Doc/binary, "/", Attach/binary>>),
lager:debug("streaming ~s/~s/~s", [Db, Doc, Attach]),
{ok, Ref} = couch_mgr:stream_attachment(Db, Doc, Attach),
@@ -92,8 +95,9 @@ init(Db, [Doc, Attach, Meta]) ->
,status=streaming
,contents = <<>>
,reqs = [] %% buffer requests until file has completed streaming
+ ,timer_ref=start_timer()
}
- ,?TIMEOUT_LIFETIME}.
+ }.
%%--------------------------------------------------------------------
%% @private
@@ -109,15 +113,22 @@ init(Db, [Doc, Attach, Meta]) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_call(single, _From, #state{meta=Meta, contents=Contents, status=ready}=State) ->
+handle_call(single, _From, #state{meta=Meta
+ ,contents=Contents
+ ,status=ready
+ ,timer_ref=Ref
+ }=State) ->
%% doesn't currently check whether we're still streaming in from the DB
lager:debug("returning media contents"),
- {reply, {Meta, Contents}, State, ?TIMEOUT_LIFETIME};
-handle_call(single, From, #state{reqs=Reqs, status=streaming}=State) ->
+ _ = stop_timer(Ref),
+ {reply, {Meta, Contents}, State#state{timer_ref=start_timer()}};
+handle_call(single, From, #state{reqs=Reqs
+ ,status=streaming
+ }=State) ->
lager:debug("file not ready for ~p, queueing", [From]),
{noreply, State#state{reqs=[From | Reqs]}};
handle_call(continuous, _From, #state{}=State) ->
- {reply, ok, State, ?TIMEOUT_LIFETIME}.
+ {reply, ok, State}.
%%--------------------------------------------------------------------
%% @private
@@ -142,18 +153,28 @@ handle_cast(_Msg, State) ->
%% {stop, Reason, State}
%% @end
%%--------------------------------------------------------------------
-handle_info(timeout, State) ->
+handle_info({timeout, TRef, ?TIMEOUT_MESSAGE}, #state{timer_ref=TRef}=State) ->
lager:debug("timeout expired, going down"),
{stop, normal, State};
-handle_info({Ref, done}, #state{stream_ref=Ref, reqs=Reqs, contents=Contents, meta=Meta}=State) ->
+handle_info({Ref, done}, #state{stream_ref=Ref
+ ,reqs=Reqs
+ ,contents=Contents
+ ,meta=Meta
+ ,timer_ref=TRef
+ }=State) ->
+ _ = stop_timer(TRef),
Res = {Meta, Contents},
_ = [gen_server:reply(From, Res) || From <- Reqs],
lager:debug("finished receiving file contents"),
- {noreply, State#state{status=ready}, hibernate};
-handle_info({Ref, {ok, Bin}}, #state{stream_ref=Ref, contents=Contents}=State) ->
+ {noreply, State#state{status=ready
+ ,timer_ref=start_timer()
+ }};
+handle_info({Ref, {ok, Bin}}, #state{stream_ref=Ref
+ ,contents=Contents
+ }=State) ->
lager:debug("recv ~b bytes", [byte_size(Bin)]),
- {noreply, State#state{contents = <<Contents/binary, Bin/binary>>}};
+ {noreply, State#state{contents = <<Contents/binary, Bin/binary>>}, hibernate};
handle_info({Ref, {error, E}}, #state{stream_ref=Ref}=State) ->
lager:debug("recv stream error: ~p", [E]),
{stop, normal, State};
@@ -173,7 +194,7 @@ handle_info(_Info, State) ->
%% @end
%%--------------------------------------------------------------------
terminate(_Reason, _State) ->
- ok.
+ lager:debug("media file going down: ~p", [_Reason]).
%%--------------------------------------------------------------------
%% @private
@@ -189,3 +210,8 @@ code_change(_OldVsn, State, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
+start_timer() ->
+ erlang:start_timer(?TIMEOUT_LIFETIME, self(), ?TIMEOUT_MESSAGE).
+stop_timer(Ref) when is_reference(Ref) ->
+ erlang:cancel_timer(Ref);
+stop_timer(_) -> ok.
Please sign in to comment.
Something went wrong with that request. Please try again.