Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

fix: delayed subscriber offer handling #666

Merged
merged 4 commits into from
May 8, 2024
Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
37 changes: 30 additions & 7 deletions packages/stream_video/lib/src/call/session/call_session.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@ import 'dart:io';

import 'package:collection/collection.dart';
import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc;
import 'package:rxdart/rxdart.dart';
import 'package:webrtc_interface/webrtc_interface.dart';

import '../../../protobuf/video/sfu/event/events.pb.dart' as sfu_events;
Expand Down Expand Up @@ -82,7 +83,9 @@ class CallSession extends Disposable {
final OnFullReconnectNeeded onFullReconnectNeeded;

RtcManager? rtcManager;
StreamSubscription<SfuEvent>? eventsSubscription;

BehaviorSubject<RtcManager>? _rtcManagerSubject;
StreamSubscription<SfuEvent>? _eventsSubscription;
StreamSubscription<Map<String, dynamic>>? _statsSubscription;
Timer? _peerConnectionCheckTimer;

Expand All @@ -106,8 +109,25 @@ class CallSession extends Disposable {
Future<Result<None>> start() async {
try {
_logger.d(() => '[start] no args');
await eventsSubscription?.cancel();
eventsSubscription = sfuWS.events.listen(_onSfuEvent);

await _eventsSubscription?.cancel();
await _rtcManagerSubject?.close();

_rtcManagerSubject = BehaviorSubject();

// Buffer sfu events until rtc manager is set
final bufferedStream =
sfuWS.events.asStream().buffer(_rtcManagerSubject!);

// Handle buffered events and then listen to sfu events as normal
_eventsSubscription = bufferedStream.asyncExpand((bufferedEvents) async* {
for (final event in bufferedEvents) {
await _onSfuEvent(event);
}

yield* sfuWS.events.asStream();
}).listen(_onSfuEvent);

final wsResult = await sfuWS.connect();
if (wsResult.isFailure) {
_logger.e(() => '[start] ws connect failed: $wsResult');
Expand Down Expand Up @@ -150,6 +170,8 @@ class CallSession extends Disposable {
..onRemoteTrackReceived = _onRemoteTrackReceived
..onStatsReceived = _onStatsReceived;

_rtcManagerSubject!.add(rtcManager!);

await _statsSubscription?.cancel();
_statsSubscription = rtcManager?.statsStream.listen((rawStats) {
sfuClient.sendStats(
Expand Down Expand Up @@ -183,8 +205,8 @@ class CallSession extends Disposable {
final genericSdp = await RtcManager.getGenericSdp();
_logger.v(() => '[fastReconnect] genericSdp.len: ${genericSdp.length}');

await eventsSubscription?.cancel();
eventsSubscription = sfuWS.events.listen(_onSfuEvent);
await _eventsSubscription?.cancel();
_eventsSubscription = sfuWS.events.listen(_onSfuEvent);
await sfuWS.connect();

sfuWS.send(
Expand Down Expand Up @@ -234,8 +256,8 @@ class CallSession extends Disposable {
_logger.d(() => '[dispose] no args');
await _stats.close();
await _saBuffer.cancel();
await eventsSubscription?.cancel();
eventsSubscription = null;
await _eventsSubscription?.cancel();
_eventsSubscription = null;
await _statsSubscription?.cancel();
_statsSubscription = null;
await sfuWS.disconnect();
Expand Down Expand Up @@ -395,6 +417,7 @@ class CallSession extends Disposable {
Future<void> _onSubscriberOffer(SfuSubscriberOfferEvent event) async {
final offerSdp = event.sdp;
_logger.i(() => '[onSubscriberOffer] event: $event');

final answerSdp = await rtcManager?.onSubscriberOffer(offerSdp);
if (answerSdp == null) {
_logger.w(() => '[onSubscriberOffer] rejected (answerSdp is null)');
Expand Down
Loading