-
Notifications
You must be signed in to change notification settings - Fork 0
/
scribe_endpoint.ex
152 lines (125 loc) · 3.89 KB
/
scribe_endpoint.ex
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
defmodule Membrane.RTC.Engine.Endpoint.Scribe do
@moduledoc """
Membrane RTC Engine endpoint that routes incoming audio tracks into `Scribe.Filter`.

The endpoint subscribes to every audio track announced by the engine in a
`{:new_tracks, tracks}` notification. When the input pad of a subscribed track is
added, a per-track chain is spawned (track receiver, depayloader, Opus decoder,
resampler and raw-audio parser) that produces mono 16 kHz `:f32le` audio and links
it to the shared `:scribe_filter` child, whose output goes to a `Membrane.Debug.Sink`.

When a pad is removed and no tracks remain in the state, the bin notifies its
parent with `:finished`.
"""
use Membrane.Bin
require Logger
alias Membrane.RTC.Engine
alias Membrane.RTC.Engine.Track
alias Membrane.RTC.Engine.Endpoint.WebRTC.TrackReceiver
alias Membrane.RTC.Engine.Endpoint.Scribe.Filter
alias Membrane.RawAudio
# Bin options: the endpoint keeps the pid of its parent Engine so that it can call
# `Engine.subscribe/3` when new tracks are announced.
def_options rtc_engine: [
spec: pid(),
description: "Pid of the parent Engine"
]
# Dynamic (`:on_request`) input pad accepting RTP; the callbacks below match it as
# `Pad.ref(:input, track_id)`, i.e. the pad reference carries the track id.
def_input_pad :input,
availability: :on_request,
accepted_format: Membrane.RTP
# Builds the initial state and spawns the part of the pipeline shared by all tracks.
#
# The state holds the engine pid taken from the options, the raw-audio format every
# track is converted to (mono, 16 kHz, `:f32le`) and an initially empty map of
# subscribed tracks. The parent is notified with `:ready` right away.
@impl true
def handle_init(_ctx, opts) do
  target_format = %RawAudio{sample_rate: 16_000, sample_format: :f32le, channels: 1}

  initial_state = %{
    rtc_engine: opts.rtc_engine,
    stream_format: target_format,
    tracks: %{}
  }

  # Shared tail: the filter receives audio from every track and feeds a debug sink.
  shared_tail =
    child(:scribe_filter, Filter)
    |> child(:sink, %Membrane.Debug.Sink{})

  {[notify_parent: :ready, spec: [shared_tail]], initial_state}
end
# Spawns the per-track processing chain when the input pad of a track is added.
#
# The track must already be present in `state.tracks` (it is stored there after a
# successful subscription); otherwise `Map.fetch!/2` raises. Every child is named
# `{kind, track_id}` so that it can be removed when the pad goes away.
@impl true
def handle_pad_added(Pad.ref(:input, track_id) = pad, _ctx, state) do
  track = Map.fetch!(state.tracks, track_id)
  target_format = state.stream_format

  receiver = %TrackReceiver{track: track, initial_target_variant: :high}

  # Bumblebee Audio requires 16kHz f32le format
  resampler = %Membrane.FFmpeg.SWResample.Converter{output_stream_format: target_format}

  # Adds timestamps to the audio frames
  timestamper = %Membrane.RawAudioParser{stream_format: target_format, overwrite_pts?: true}

  track_chain =
    bin_input(pad)
    |> child({:track_receiver, track_id}, receiver)
    |> child({:depayloader, track_id}, Track.get_depayloader(track))
    |> child({:opus_decoder, track_id}, Membrane.Opus.Decoder)
    |> child({:converter, track_id}, resampler)
    |> child({:parser, track_id}, timestamper)
    |> via_in(Pad.ref(:input, track_id))
    |> get_child(:scribe_filter)

  {[spec: [track_chain]], state}
end
# Tears down the per-track chain once the input pad of a track has been removed.
#
# The track is dropped from `state.tracks` and the five children named
# `{kind, track_id}` are removed. If no tracks remain afterwards, the parent is
# additionally notified with `:finished`.
@impl true
def handle_pad_removed(Pad.ref(:input, track_id), _ctx, state) do
  remaining_tracks = Map.delete(state.tracks, track_id)

  per_track_children =
    for kind <- [:track_receiver, :depayloader, :opus_decoder, :converter, :parser] do
      {kind, track_id}
    end

  teardown = [remove_children: per_track_children]

  actions =
    case map_size(remaining_tracks) do
      0 -> teardown ++ [notify_parent: :finished]
      _more -> teardown
    end

  {actions, %{state | tracks: remaining_tracks}}
end
# Reacts to notifications sent by the parent engine.
#
# `{:new_tracks, tracks}` subscribes to every track whose `type` is `:audio`.
# Successfully subscribed tracks are stored in `state.tracks` under their id; a track
# reported as `:invalid_track_id` is logged and skipped; any other error raises.
# Every other notification returns no actions and leaves the state unchanged.
@impl true
def handle_parent_notification({:new_tracks, tracks}, ctx, state) do
  # The match fails unless the bin is named `{:endpoint, id}`.
  {:endpoint, endpoint_id} = ctx.name

  audio_tracks = Enum.filter(tracks, &(&1.type == :audio))

  updated_state =
    Enum.reduce(audio_tracks, state, fn track, acc ->
      subscribe_to_track(track, acc, endpoint_id)
    end)

  {[], updated_state}
end

def handle_parent_notification({event, _payload}, _ctx, state)
    when event in [:ready, :new_endpoint, :endpoint_removed, :remove_tracks] do
  # Known engine notifications that need no reaction from this endpoint.
  {[], state}
end

def handle_parent_notification(_notification, _ctx, state) do
  # Anything else is ignored without logging.
  {[], state}
end

# Subscribes to a single track and returns the state, updated on success.
defp subscribe_to_track(track, state, endpoint_id) do
  case Engine.subscribe(state.rtc_engine, endpoint_id, track.id) do
    :ok ->
      put_in(state, [:tracks, track.id], track)

    {:error, :invalid_track_id} ->
      Logger.info("""
      Couldn't subscribe to the track: #{inspect(track.id)}. No such track.
      It had to be removed just after publishing it. Ignoring.
      """)

      state

    {:error, reason} ->
      raise "Couldn't subscribe to the track: #{inspect(track.id)}. Reason: #{inspect(reason)}"
  end
end
# End-of-stream reported by any child on any pad produces no actions and leaves
# the state untouched.
@impl true
def handle_element_end_of_stream(_child, _pad, _ctx, state), do: {[], state}
end