Skip to content

Commit

Permalink
fix: delayed subscriber offer handling (#666)
Browse files Browse the repository at this point in the history
* fix for a failed subscriber offer handling when rtc manager is not initilaized yet

* fixed buffored stream

* Update packages/stream_video/lib/src/call/session/call_session.dart

---------

Co-authored-by: Deven Joshi <deven9852@gmail.com>
  • Loading branch information
Brazol and deven98 committed May 8, 2024
1 parent 9d3f46d commit 7cfea49
Showing 1 changed file with 30 additions and 7 deletions.
37 changes: 30 additions & 7 deletions packages/stream_video/lib/src/call/session/call_session.dart
Expand Up @@ -4,6 +4,7 @@ import 'dart:io';

import 'package:collection/collection.dart';
import 'package:flutter_webrtc/flutter_webrtc.dart' as rtc;
import 'package:rxdart/rxdart.dart';
import 'package:webrtc_interface/webrtc_interface.dart';

import '../../../protobuf/video/sfu/event/events.pb.dart' as sfu_events;
Expand Down Expand Up @@ -82,7 +83,9 @@ class CallSession extends Disposable {
final OnFullReconnectNeeded onFullReconnectNeeded;

RtcManager? rtcManager;
StreamSubscription<SfuEvent>? eventsSubscription;

BehaviorSubject<RtcManager>? _rtcManagerSubject;
StreamSubscription<SfuEvent>? _eventsSubscription;
StreamSubscription<Map<String, dynamic>>? _statsSubscription;
Timer? _peerConnectionCheckTimer;

Expand All @@ -106,8 +109,25 @@ class CallSession extends Disposable {
Future<Result<None>> start() async {
try {
_logger.d(() => '[start] no args');
await eventsSubscription?.cancel();
eventsSubscription = sfuWS.events.listen(_onSfuEvent);

await _eventsSubscription?.cancel();
await _rtcManagerSubject?.close();

_rtcManagerSubject = BehaviorSubject();

// Buffer sfu events until rtc manager is set
final bufferedStream =
sfuWS.events.asStream().buffer(_rtcManagerSubject!);

// Handle buffered events and then listen to sfu events as normal
_eventsSubscription = bufferedStream.asyncExpand((bufferedEvents) async* {
for (final event in bufferedEvents) {
await _onSfuEvent(event);
}

yield* sfuWS.events.asStream();
}).listen(_onSfuEvent);

final wsResult = await sfuWS.connect();
if (wsResult.isFailure) {
_logger.e(() => '[start] ws connect failed: $wsResult');
Expand Down Expand Up @@ -150,6 +170,8 @@ class CallSession extends Disposable {
..onRemoteTrackReceived = _onRemoteTrackReceived
..onStatsReceived = _onStatsReceived;

_rtcManagerSubject!.add(rtcManager!);

await _statsSubscription?.cancel();
_statsSubscription = rtcManager?.statsStream.listen((rawStats) {
sfuClient.sendStats(
Expand Down Expand Up @@ -183,8 +205,8 @@ class CallSession extends Disposable {
final genericSdp = await RtcManager.getGenericSdp();
_logger.v(() => '[fastReconnect] genericSdp.len: ${genericSdp.length}');

await eventsSubscription?.cancel();
eventsSubscription = sfuWS.events.listen(_onSfuEvent);
await _eventsSubscription?.cancel();
_eventsSubscription = sfuWS.events.listen(_onSfuEvent);
await sfuWS.connect();

sfuWS.send(
Expand Down Expand Up @@ -234,8 +256,8 @@ class CallSession extends Disposable {
_logger.d(() => '[dispose] no args');
await _stats.close();
await _saBuffer.cancel();
await eventsSubscription?.cancel();
eventsSubscription = null;
await _eventsSubscription?.cancel();
_eventsSubscription = null;
await _statsSubscription?.cancel();
_statsSubscription = null;
await sfuWS.disconnect();
Expand Down Expand Up @@ -395,6 +417,7 @@ class CallSession extends Disposable {
Future<void> _onSubscriberOffer(SfuSubscriberOfferEvent event) async {
final offerSdp = event.sdp;
_logger.i(() => '[onSubscriberOffer] event: $event');

final answerSdp = await rtcManager?.onSubscriberOffer(offerSdp);
if (answerSdp == null) {
_logger.w(() => '[onSubscriberOffer] rejected (answerSdp is null)');
Expand Down

0 comments on commit 7cfea49

Please sign in to comment.