-
Notifications
You must be signed in to change notification settings - Fork 5.9k
/
VideoStreamListener.java
executable file
·207 lines (162 loc) · 7.25 KB
/
VideoStreamListener.java
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
/**
* BigBlueButton open source conferencing system - http://www.bigbluebutton.org/
*
* Copyright (c) 2012 BigBlueButton Inc. and by respective authors (see below).
*
* This program is free software; you can redistribute it and/or modify it under the
* terms of the GNU Lesser General Public License as published by the Free Software
* Foundation; either version 3.0 of the License, or (at your option) any later
* version.
*
* BigBlueButton is distributed in the hope that it will be useful, but WITHOUT ANY
* WARRANTY; without even the implied warranty of MERCHANTABILITY or FITNESS FOR A
* PARTICULAR PURPOSE. See the GNU Lesser General Public License for more details.
*
* You should have received a copy of the GNU Lesser General Public License along
* with BigBlueButton; if not, see <http://www.gnu.org/licenses/>.
*
*/
package org.bigbluebutton.app.video;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;
import org.apache.mina.core.buffer.IoBuffer;
import org.red5.server.api.scheduling.IScheduledJob;
import org.red5.server.api.scheduling.ISchedulingService;
import org.red5.server.api.scope.IScope;
import org.red5.server.api.stream.IBroadcastStream;
import org.red5.server.api.stream.IStreamListener;
import org.red5.server.api.stream.IStreamPacket;
import org.red5.server.net.rtmp.event.VideoData;
import org.red5.server.scheduling.QuartzSchedulingService;
import org.slf4j.Logger;
import org.red5.logging.Red5LoggerFactory;
import com.google.gson.Gson;
/**
* Class to listen for the first video packet of the webcam.
* We need to listen for the first packet and send a startWebcamEvent.
* The reason is that when starting the webcam, sometimes Flash Player
* needs to prompt the user for permission to access the webcam. However,
* while waiting for the user to click OK to the prompt, Red5 has already
* called the startBroadcast method which we take as the start of the recording.
* When the user finally clicks OK, the packets then start to flow through.
* This introduces a delay between the moment we assume the recording starts
* and the moment the webcam actually publishes video packets. When we ingest
* and process the video and multiplex the audio, the video and audio will
* be out of sync by at least this amount of delay.
* @author Richard Alam
*
*/
public class VideoStreamListener implements IStreamListener {
    private static final Logger log = Red5LoggerFactory.getLogger(VideoStreamListener.class, "video");

    // One shared serializer for the structured log payloads instead of a new one per log line.
    private static final Gson GSON = new Gson();

    private EventRecordingService recordingService;
    private volatile boolean firstPacketReceived = false;

    // Maximum time between video packets, in milliseconds (compared against
    // System.currentTimeMillis() deltas and used as the scheduler interval).
    private int videoTimeout = 10000;

    private long firstPacketTime = 0L;

    // Number of non-empty video packets seen so far. Volatile so the timeout job
    // reads a current, untorn value. The increment itself is not atomic:
    // assumes packets of one stream are delivered by one thread at a time -- TODO confirm.
    private volatile long packetCount = 0L;

    // Last time video was received (wall clock, ms), not the video timestamp.
    // Volatile because it is written in packetReceived and read by TimeoutJob.
    private volatile long lastVideoTime;

    private String userId;

    // Stream being observed
    private String streamId;

    // if this stream is recorded or not
    private boolean record;

    // Scheduler
    private QuartzSchedulingService scheduler;

    // Name of the scheduled timeout job, as returned by the scheduler
    private String timeoutJobName;

    private volatile boolean publishing = false;

    private volatile boolean streamPaused = false;

    private String meetingId;

    /**
     * Creates a listener for one webcam stream.
     *
     * @param meetingId     id of the meeting the stream belongs to
     * @param streamId      id of the stream being observed
     * @param record        whether a start event should be recorded; {@code null} is treated as {@code false}
     * @param userId        id of the user publishing the stream
     * @param packetTimeout maximum time between video packets, in milliseconds
     * @param scheduler     scheduler used to run the packet-timeout job
     */
    public VideoStreamListener(String meetingId, String streamId, Boolean record,
            String userId, int packetTimeout,
            QuartzSchedulingService scheduler) {
        this.meetingId = meetingId;
        this.streamId = streamId;
        // Avoid an auto-unboxing NullPointerException when the caller passes null.
        this.record = Boolean.TRUE.equals(record);
        this.videoTimeout = packetTimeout;
        this.userId = userId;
        this.scheduler = scheduler;
    }

    /**
     * Returns a millisecond timestamp derived from {@link System#nanoTime()}.
     * It is therefore not wall-clock time.
     */
    private Long genTimestamp() {
        return TimeUnit.NANOSECONDS.toMillis(System.nanoTime());
    }

    /**
     * Handles a packet of the observed stream. Empty packets and non-video
     * packets are ignored. On the first video packet the timeout job is
     * scheduled and, if recording is enabled, a StartWebcamShareEvent is
     * recorded. If the stream had been flagged as paused, a restart is logged.
     *
     * @param stream the broadcast stream the packet belongs to
     * @param packet the received packet
     */
    @Override
    public void packetReceived(IBroadcastStream stream, IStreamPacket packet) {
        IoBuffer buf = packet.getData();
        if (buf != null) {
            buf.rewind();
        }

        if (buf == null || buf.remaining() == 0) {
            return;
        }

        if (!(packet instanceof VideoData)) {
            return;
        }

        // Remember when the previous packet arrived BEFORE overwriting it, so the
        // pause duration below is measured against the last packet, not this one.
        long now = System.currentTimeMillis();
        long previousVideoTime = lastVideoTime;

        // keep track of last time video was received
        lastVideoTime = now;
        packetCount++;

        if (!firstPacketReceived) {
            firstPacketReceived = true;
            publishing = true;
            firstPacketTime = now;

            // start the worker to monitor if we are still receiving video packets
            timeoutJobName = scheduler.addScheduledJob(videoTimeout, new TimeoutJob());

            if (record) {
                recordStartEvent(stream);
            }
        }

        if (streamPaused) {
            streamPaused = false;
            long numSeconds = (now - previousVideoTime) / 1000;

            Map<String, Object> logData = new HashMap<String, Object>();
            logData.put("meetingId", meetingId);
            logData.put("userId", userId);
            logData.put("stream", stream.getPublishedName());
            logData.put("packetCount", packetCount);
            logData.put("publishing", publishing);
            logData.put("pausedFor (sec)", numSeconds);

            String logStr = GSON.toJson(logData);
            log.warn("Video stream restarted. data={}", logStr);
        }
    }

    /** Records the StartWebcamShareEvent for the given stream. */
    private void recordStartEvent(IBroadcastStream stream) {
        EventRecordingService service = recordingService;
        if (service == null) {
            // The service is injected through a setter and may not have been set yet;
            // do not let that break packet handling.
            log.error("No recording service set; StartWebcamShareEvent not recorded. meetingId={} stream={}",
                    meetingId, stream.getPublishedName());
            return;
        }

        Map<String, String> event = new HashMap<String, String>();
        event.put("module", "WEBCAM");
        event.put("timestamp", genTimestamp().toString());
        event.put("meetingId", meetingId);
        event.put("stream", stream.getPublishedName());
        event.put("eventName", "StartWebcamShareEvent");
        service.record(meetingId, event);
    }

    /**
     * Sets the service used to record the webcam start event.
     *
     * @param s the recording service
     */
    public void setEventRecordingService(EventRecordingService s) {
        recordingService = s;
    }

    /**
     * Marks the stream as no longer publishing. The timeout job removes itself
     * from the scheduler the next time it runs.
     */
    public void streamStopped() {
        this.publishing = false;
    }

    /**
     * Scheduled job that logs a warning when no video packet has arrived within
     * {@code videoTimeout} ms, and unschedules itself once the stream is no
     * longer publishing.
     */
    private class TimeoutJob implements IScheduledJob {

        @Override
        public void execute(ISchedulingService service) {
            Map<String, Object> logData = new HashMap<String, Object>();
            logData.put("meetingId", meetingId);
            logData.put("userId", userId);
            logData.put("stream", streamId);
            logData.put("packetCount", packetCount);
            logData.put("publishing", publishing);

            long now = System.currentTimeMillis();
            long sinceLastPacket = now - lastVideoTime;

            // Flag the pause only once; packetReceived clears the flag on the next packet.
            if (sinceLastPacket > videoTimeout && !streamPaused) {
                streamPaused = true;
                logData.put("lastPacketTime (sec)", sinceLastPacket / 1000);
                log.warn("Video packet timeout. data={}", GSON.toJson(logData));
            }

            if (!publishing) {
                log.warn("Removing scheduled job. data={}", GSON.toJson(logData));
                // remove the scheduled job
                scheduler.removeScheduledJob(timeoutJobName);
            }
        }
    }
}