Skip to content

Commit

Permalink
v1.51: support mp3 as well as mpeg content types, rework how wavs are…
Browse files Browse the repository at this point in the history
… served as well
  • Loading branch information
James Aimonetti committed Mar 23, 2012
1 parent ca0a3c0 commit cffcef6
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 37 deletions.
21 changes: 13 additions & 8 deletions whistle_apps/apps/media_mgr/src/media_file.erl
Expand Up @@ -46,8 +46,8 @@
%% @spec start_link() -> {ok, Pid} | ignore | {error, Error}
%% @end
%%--------------------------------------------------------------------
start_link(Db, Doc, Attach, Meta) ->
gen_server:start_link(?MODULE, [Db, Doc, Attach, Meta], []).
start_link(Id, Doc, Attach, Meta) ->
gen_server:start_link(?MODULE, [Id, Doc, Attach, Meta], []).

single(Srv) ->
gen_server:call(Srv, single).
Expand All @@ -70,7 +70,11 @@ continuous(Srv) ->
%% {stop, Reason}
%% @end
%%--------------------------------------------------------------------
init([Db, Doc, Attach, Meta]) ->
init([Id, Doc, Attach, Meta]) ->
put(callid, ?LOG_SYSTEM_ID),
Db = wh_util:format_account_id(Id, encoded),

lager:debug("streaming ~s/~s/~s", [Db, Doc, Attach]),
{ok, Ref} = couch_mgr:stream_attachment(Db, Doc, Attach),
{ok
,#state{
Expand Down Expand Up @@ -101,10 +105,11 @@ init([Db, Doc, Attach, Meta]) ->
%%--------------------------------------------------------------------
handle_call(single, _From, #state{meta=Meta, contents=Contents, status=ready}=State) ->
%% doesn't currently check whether we're still streaming in from the DB
lager:debug("returning media contents"),
{reply, {Meta, Contents}, State, ?TIMEOUT_LIFETIME};
handle_call(single, From, #state{reqs=Reqs, status=streaming}=State) ->
lager:debug("file not ready for ~p", [From]),
{noreply, State#state{reqs=[From, Reqs]}};
lager:debug("file not ready for ~p, queueing", [From]),
{noreply, State#state{reqs=[From | Reqs]}};
handle_call(continuous, _From, #state{}=State) ->
{reply, ok, State, ?TIMEOUT_LIFETIME}.

Expand Down Expand Up @@ -143,9 +148,9 @@ handle_info({Ref, done}, #state{stream_ref=Ref, reqs=Reqs, contents=Contents, me
handle_info({Ref, {ok, Bin}}, #state{stream_ref=Ref, contents=Contents}=State) ->
lager:debug("recv ~b bytes", [byte_size(Bin)]),
{noreply, State#state{contents = <<Contents/binary, Bin/binary>>}};
handle_info({Ref, {error, _E}}, #state{stream_ref=Ref}=State) ->
lager:debug("recv stream error: ~p", [_E]),
{noreply, State, hibernate};
handle_info({Ref, {error, E}}, #state{stream_ref=Ref}=State) ->
lager:debug("recv stream error: ~p", [E]),
{stop, normal, State};
handle_info(_Info, State) ->
lager:debug("unhandled message: ~p", [_Info]),
{noreply, State, hibernate}.
Expand Down
6 changes: 3 additions & 3 deletions whistle_apps/apps/media_mgr/src/media_files_sup.erl
Expand Up @@ -20,7 +20,7 @@

-define(SERVER, ?MODULE).

-define(CHILD(Name, Id, Doc, Attachment, Meta), {Name, {media_file, start_link, [Id, Doc, Attachment, Meta]}, permanent, 5000, worker, [media_file]}).
-define(CHILD(Name, Id, Doc, Attachment, Meta), {Name, {media_file, start_link, [Id, Doc, Attachment, Meta]}, temporary, 5000, worker, [media_file]}).

%%%===================================================================
%%% API functions
Expand All @@ -37,14 +37,14 @@ start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).

find_file_server(Id, Doc, Attachment) ->
Name = wh_util:to_hex(list_to_binary([Id,Doc,Attachment])),
Name = [Id,Doc,Attachment],
case [P||{N,P,_,_} <- supervisor:which_children(?MODULE), N =:= Name] of
[] -> {error, no_file_server};
[P] -> {ok, P}
end.

find_file_server(Id, Doc, Attachment, Meta) ->
Name = wh_util:to_hex(list_to_binary([Id,Doc,Attachment])),
Name = [Id,Doc,Attachment],
case supervisor:start_child(?MODULE, ?CHILD(Name, Id, Doc, Attachment, Meta)) of
{ok, _Pid}=OK -> OK;
{error, {already_started, Pid}} -> {ok, Pid};
Expand Down
79 changes: 53 additions & 26 deletions whistle_apps/apps/media_mgr/src/media_single.erl
Expand Up @@ -13,37 +13,50 @@
-include("media.hrl").

init({_Transport, _Proto}, Req0, _Opts) ->
put(callid, ?LOG_SYSTEM_ID),
put(callid, wh_util:rand_hex_binary(16)),

{[Id, Doc, Attachment], Req1} = cowboy_http_req:path_info(Req0),

lager:debug("fetch ~s/~s/~s", [Id, Doc, Attachment]),
lager:debug("fetching ~s/~s/~s", [Id, Doc, Attachment]),

{ok, Pid} = media_files_sup:find_file_server(Id, Doc, Attachment),
lager:debug("found media file server: ~p", [Pid]),
{ok, Req1, media_file:single(Pid)}.
case media_files_sup:find_file_server(Id, Doc, Attachment) of
{ok, Pid} ->
{ok, Req1, media_file:single(Pid)};
{error, _} ->
lager:debug("missing file server"),
{ok, Req2} = cowboy_http_req:reply(404, Req1),
{shutdown, Req2, ok}
end.

handle(Req0, {Meta, Bin}) ->
Size = byte_size(Bin),
ChunkSize = case Size of S when S > ?CHUNKSIZE -> ?CHUNKSIZE; S -> S end,

lager:debug("size: ~b chunk: ~b", [Size, ChunkSize]),

ContentType = wh_json:get_value(<<"content_type">>, Meta),
MediaName = wh_json:get_value(<<"media_name">>, Meta, <<>>),
Url = wh_json:get_value(<<"url">>, Meta, <<>>),

Pad = (ContentType =:= <<"audio/mpeg">>) orelse
(ContentType =:= <<"audio/mp3">>),

{ok, Req1} = cowboy_http_req:set_resp_body_fun(Size
,fun() ->
stream(Req0, ChunkSize, Bin, get_shout_header(MediaName, Url), Pad)
end
,Req0),
{ok, Req2} = case ContentType of
<<"audio/x-wav">> ->
{ok, Req1} = set_resp_headers(Req0, ContentType),

cowboy_http_req:set_resp_body_fun(Size
,fun() ->
stream(Req1, ChunkSize, Bin, undefined, false)
end
,Req1);
CT when CT =:= <<"audio/mpeg">> orelse CT =:= <<"audio/mp3">> ->
{ok, Req1} = set_resp_headers(Req0, ChunkSize, ContentType, MediaName, Url),

cowboy_http_req:set_resp_body_fun(Size
,fun() ->
stream(Req1, ChunkSize, Bin, get_shout_header(MediaName, Url), true)
end
,Req1)
end,

{ok, Req2} = set_resp_headers(Req1, ChunkSize, ContentType, MediaName, Url),
{ok, Req3} = cowboy_http_req:reply(200, Req2),

{ok, Req3, ok}.

terminate(_Req, _State) ->
Expand All @@ -52,38 +65,42 @@ terminate(_Req, _State) ->
stream(Req, ChunkSize, Bin, Header, ToPad) ->
{ok, Transport, Socket} = cowboy_http_req:transport(Req),

lager:debug("streaming in ~b bytes (use padding: ~p)", [ChunkSize, ToPad]),
send_chunks(Transport, Socket, ChunkSize, Bin, {0, Header}, ToPad).
send_chunks(Transport, Socket, ChunkSize, Bin, Header, ToPad).

send_chunks(_, _, _, <<>>, _, _) ->
lager:debug("nothing to send");
send_chunks(Transport, Socket, ChunkSize, Bin, Header, ToPad) ->
try erlang:split_binary(Bin, ChunkSize) of
{Send, Rest} ->
lager:debug("sending another chunk (~b left, hdr: ~b)", [byte_size(Rest), element(1, Header)]),
write_chunk(Transport, Socket, Send, Header),
send_chunks(Transport, Socket, ChunkSize, Rest, bump(Header), ToPad)
catch
_:_ ->
lager:debug("sending last bit (~b to send, hdr: ~b)", [byte_size(Bin), element(1, Header)]),
write_data(Transport, Socket, ChunkSize, Bin, Header, ToPad)
end.

%% When we know we have the whole chunk, just send it + header
write_chunk(Transport, Socket, Bin, undefined) ->
lager:debug("writing ~b bytes", [byte_size(Bin)]),
Transport:send(Socket, Bin);
write_chunk(Transport, Socket, Bin, Header) ->
lager:debug("writing ~b bytes", [byte_size(Bin)+byte_size(the_header(Header))]),
Transport:send(Socket, [Bin, the_header(Header)]).

%% when we have less than the chunk size to send, possibly pad it
write_data(Transport, Socket, ChunkSize, Bin, Header, true) ->
Size = byte_size(Bin),
H = the_header(Header),
PaddingAmount = ChunkSize-Size-byte_size(H),

lager:debug("padding ~b bytes", [PaddingAmount]),
Padding = binary:copy(<<0>>, ChunkSize-Size-byte_size(H)),

Padding = binary:copy(<<0>>, PaddingAmount),
lager:debug("writing ~b bytes", [Size+byte_size(H)+byte_size(Padding)]),
Transport:send(Socket, [Bin, H, Padding]);
write_data(Transport, Socket, _, Bin, undefined, _) ->
lager:debug("writing ~b byptes", [byte_size(Bin)]),
Transport:send(Socket, Bin);
write_data(Transport, Socket, _, Bin, Header, false) ->
lager:debug("writing ~b bytes", [byte_size(Bin)+byte_size(the_header(Header))]),
Transport:send(Socket, [Bin, the_header(Header)]).

bump(undefined) -> undefined;
Expand All @@ -97,13 +114,23 @@ the_header({K, H}) ->
_ -> <<0>>
end.

set_resp_headers(Req, <<"audio/x-wav">>) ->
lists:foldl(fun({K,V}, {ok, Req0Acc}) ->
cowboy_http_req:set_resp_header(K, V, Req0Acc)
end, {ok, Req}, [
{<<"Server">>, list_to_binary([?APP_NAME, "/", ?APP_VERSION])}
,{<<"Content-Type">>, <<"audio/x-wav">>}
]
).

-spec set_resp_headers/5 :: (#http_req{}, pos_integer(), ne_binary(), ne_binary(), ne_binary()) -> {'ok', #http_req{}}.
set_resp_headers(Req, ChunkSize, ContentType, MediaName, Url) ->
lists:foldl(fun({K,V}, {ok, Req0Acc}) ->
cowboy_http_req:set_resp_header(K, V, Req0Acc)
end, {ok, Req}, [
{<<"icy-notice1">>, <<"MediaMgr<BR>">>}
{<<"Server">>, list_to_binary([?APP_NAME, "/", ?APP_VERSION])}
,{<<"Content-Type">>, <<"audio/mpeg">>}
,{<<"icy-notice1">>, <<"MediaMgr">>}
,{<<"icy-name">>, MediaName}
,{<<"icy-genre">>, <<"Whistle Media">>}
,{<<"icy-url">>, Url}
Expand All @@ -114,12 +141,12 @@ set_resp_headers(Req, ChunkSize, ContentType, MediaName, Url) ->
]
).

-spec get_shout_header/2 :: (ne_binary(), ne_binary()) -> ne_binary().
-spec get_shout_header/2 :: (ne_binary(), ne_binary()) -> {0, ne_binary()}.
get_shout_header(MediaName, Url) ->
Bin = list_to_binary(["StreamTitle='",MediaName
,"';StreamUrl='",Url,"';"
]),
Nblocks = ((byte_size(Bin) - 1) div 16) + 1,
NPad = Nblocks*16 - byte_size(Bin),
Extra = lists:duplicate(NPad, 0),
list_to_binary([Nblocks, Bin, Extra]).
{0, list_to_binary([Nblocks, Bin, Extra])}.

0 comments on commit cffcef6

Please sign in to comment.