Skip to content

feat: Delegate SetDescription process to PeerConnection class #749

New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Merged
merged 7 commits into from
Aug 30, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
288 changes: 251 additions & 37 deletions com.unity.renderstreaming/Runtime/Scripts/PeerConnection.cs
Original file line number Diff line number Diff line change
@@ -1,52 +1,104 @@
using System;
using System.Collections;
using System.Collections.Generic;
using Unity.WebRTC;
using UnityEngine;
using UnityEngine.Assertions;

namespace Unity.RenderStreaming
{
internal class PeerConnection : IDisposable
{
public readonly RTCPeerConnection peer;
public readonly bool polite;
public delegate void OnConnectEvent();

public bool makingOffer;
public bool ignoreOffer;
public bool srdAnswerPending;
public bool makingAnswer;
public delegate void OnDisconnectEvent();

bool disposed = false;
public delegate void OnDataChannelEvent(RTCDataChannel channel);

public delegate void OnTrackEvent(RTCTrackEvent trackEvent);

public delegate void SendOfferEvent(RTCSessionDescription description);

public delegate void SendAnswerEvent(RTCSessionDescription description);

public delegate void SendCandidateEvent(RTCIceCandidate candidate);

public OnConnectEvent OnConnectHandler;
public OnDisconnectEvent OnDisconnectHandler;
public OnDataChannelEvent OnDataChannelHandler;
public OnTrackEvent OnTrackEventHandler;
public SendOfferEvent SendOfferHandler;
public SendAnswerEvent SendAnswerHandler;
public SendCandidateEvent SendCandidateHandler;

public RTCPeerConnection peer => _peer;

/// <summary>
///
///
/// </summary>
public bool waitingAnswer
{
get => _waitingAnswer;
set {
private set
{
_waitingAnswer = value;
timeSinceStartWaitingAnswer =
_waitingAnswer ? UnityEngine.Time.realtimeSinceStartup : 0;
_timeSinceStartWaitingAnswer =
_waitingAnswer ? Time.realtimeSinceStartup : 0;
}
}

/// <summary>
/// see Time.realtimeSinceStartup
/// </summary>
public float timeSinceStartWaitingAnswer { get; private set; }
private readonly RTCPeerConnection _peer;
private readonly bool _polite;
private readonly Func<IEnumerator, Coroutine> _startCoroutine;
private readonly Action<Coroutine> _stopCoroutine;
private readonly HashSet<WeakReference<Coroutine>> _processingCoroutineList = new HashSet<WeakReference<Coroutine>>();

// resend offer
private readonly float _resendInterval;
private bool _waitingAnswer;
private float _timeSinceStartWaitingAnswer;

// processing set description
private bool _processingSetDescription;

// processing got description
private bool _ignoreOffer;
private bool _srdAnswerPending;


public PeerConnection(RTCPeerConnection peer, bool polite)
private bool _disposed = false;

public PeerConnection(bool polite, RTCConfiguration config, float resendInterval, Func<IEnumerator, Coroutine> startCoroutine, Action<Coroutine> stopCoroutine)
{
this.peer = peer;
this.polite = polite;
_polite = polite;
_resendInterval = resendInterval;
_startCoroutine = startCoroutine;
_stopCoroutine = stopCoroutine;

_peer = new RTCPeerConnection(ref config);
_peer.OnDataChannel = channel => OnDataChannelHandler?.Invoke(channel);
_peer.OnIceCandidate = candidate => SendCandidateHandler?.Invoke(candidate);
_peer.OnTrack = trackEvent => OnTrackEventHandler?.Invoke(trackEvent);
_peer.OnConnectionStateChange = state =>
{
switch (state)
{
case RTCPeerConnectionState.Connected:
OnConnectHandler?.Invoke();
break;
case RTCPeerConnectionState.Disconnected:
OnDisconnectHandler?.Invoke();
break;
}
};
_peer.OnNegotiationNeeded = () => StartCoroutine(OnNegotiationNeeded());
}

/// <summary>
///
/// </summary>
public void RestartTimerForWaitingAnswer()
private void StartCoroutine(IEnumerator enumerator)
{
timeSinceStartWaitingAnswer = UnityEngine.Time.realtimeSinceStartup;
var co = _startCoroutine(enumerator);
_processingCoroutineList.RemoveWhere(x => !x.TryGetTarget(out _));
_processingCoroutineList.Add(new WeakReference<Coroutine>(co));
}

~PeerConnection()
Expand All @@ -56,30 +108,192 @@ public void RestartTimerForWaitingAnswer()

public override string ToString()
{
var str = polite ? "polite" : "impolite";
return $"[{str}-{base.ToString()}]";
var str = _polite ? "polite" : "impolite";
return
$"[{str}-{nameof(PeerConnection)} {nameof(_peer.ConnectionState)}:{_peer.ConnectionState} {nameof(_peer.IceConnectionState)}:{_peer.IceConnectionState} {nameof(_peer.SignalingState)}:{_peer.SignalingState} {nameof(_peer.GatheringState)}:{_peer.GatheringState}]";
}

public void Dispose()
{
if (peer == null)
if (_disposed)
return;

foreach (var weakCo in _processingCoroutineList)
{
if (weakCo.TryGetTarget(out var co))
{
_stopCoroutine?.Invoke(co);
}
}
_processingCoroutineList.Clear();

if (_peer != null)
{
_peer.OnTrack = null;
_peer.OnDataChannel = null;
_peer.OnIceCandidate = null;
_peer.OnNegotiationNeeded = null;
_peer.OnConnectionStateChange = null;
_peer.OnIceConnectionChange = null;
_peer.OnIceGatheringStateChange = null;
_peer.Dispose();
}

_disposed = true;
GC.SuppressFinalize(this);
}

private IEnumerator OnNegotiationNeeded()
{
var waitProcessSetDescription = new WaitWhile(() => _processingSetDescription);
yield return waitProcessSetDescription;
SendOffer();
}

public bool IsConnected()
{
return _peer.ConnectionState == RTCPeerConnectionState.Connected;
}

public bool IsStable()
{
return _peer.SignalingState == RTCSignalingState.Stable ||
(_peer.SignalingState == RTCSignalingState.HaveLocalOffer && _srdAnswerPending);
}

public void SendOffer()
{
if (_processingSetDescription)
{
Debug.LogWarning($"{this} already processing other set description");
return;
}
if (disposed)

if (!IsStable())
{
if (!_waitingAnswer)
{
throw new InvalidOperationException(
$"{this} sendoffer needs in stable state, current state is {_peer.SignalingState}");
}

var timeout = _timeSinceStartWaitingAnswer + _resendInterval;

if (timeout < Time.realtimeSinceStartup)
{
SendOfferHandler?.Invoke(_peer.LocalDescription);
_timeSinceStartWaitingAnswer = Time.realtimeSinceStartup;
}
return;
}

peer.OnTrack = null;
peer.OnDataChannel = null;
peer.OnIceCandidate = null;
peer.OnNegotiationNeeded = null;
peer.OnConnectionStateChange = null;
peer.OnIceConnectionChange = null;
peer.OnIceGatheringStateChange = null;
peer.Dispose();
StartCoroutine(SendOfferCoroutine());
}

disposed = true;
GC.SuppressFinalize(this);
private IEnumerator SendOfferCoroutine()
{
Assert.AreEqual(_peer.SignalingState, RTCSignalingState.Stable);
Assert.AreEqual(_processingSetDescription, false);
Assert.AreEqual(waitingAnswer, false);

_processingSetDescription = true;

var opLocalDesc = _peer.SetLocalDescription();
yield return opLocalDesc;

if (opLocalDesc.IsError)
{
Debug.LogError($"{this} {opLocalDesc.Error.message}");
_processingSetDescription = false;
yield break;
}

Assert.AreEqual(_peer.LocalDescription.type, RTCSdpType.Offer);
Assert.AreEqual(_peer.SignalingState, RTCSignalingState.HaveLocalOffer);
_processingSetDescription = false;
waitingAnswer = true;

SendOfferHandler?.Invoke(_peer.LocalDescription);
}

public void SendAnswer()
{
if (_processingSetDescription)
{
Debug.LogWarning($"{this} already processing other set description");
return;
}

StartCoroutine(SendAnswerCoroutine());
}

private IEnumerator SendAnswerCoroutine()
{
Assert.AreEqual(_peer.SignalingState, RTCSignalingState.HaveRemoteOffer);
Assert.AreEqual(_processingSetDescription, false);

_processingSetDescription = true;

var opLocalDesc = _peer.SetLocalDescription();
yield return opLocalDesc;

if (opLocalDesc.IsError)
{
Debug.LogError($"{this} {opLocalDesc.Error.message}");
_processingSetDescription = false;
yield break;
}

Assert.AreEqual(_peer.LocalDescription.type, RTCSdpType.Answer);
Assert.AreEqual(_peer.SignalingState, RTCSignalingState.Stable);
_processingSetDescription = false;

SendAnswerHandler?.Invoke(_peer.LocalDescription);
}

public IEnumerator OnGotDescription(RTCSessionDescription description, Action onComplete)
{
var waitOtherProcess = new WaitWhile(() => _processingSetDescription);
yield return waitOtherProcess;

_ignoreOffer = description.type == RTCSdpType.Offer && !_polite && (_processingSetDescription || !IsStable());

if (_ignoreOffer)
{
Debug.LogWarning($"{this} glare - ignoreOffer.");
yield break;
}

waitingAnswer = false;
_srdAnswerPending = description.type == RTCSdpType.Answer;
_processingSetDescription = true;

var remoteDescOp = _peer.SetRemoteDescription(ref description);
yield return remoteDescOp;
if (remoteDescOp.IsError)
{
Debug.LogError($"{this} {remoteDescOp.Error.message}");
_srdAnswerPending = false;
_processingSetDescription = false;
yield break;
}

_srdAnswerPending = false;
_processingSetDescription = false;
onComplete?.Invoke();
}

public bool OnGotIceCandidate(RTCIceCandidate candidate)
{
if (!_peer.AddIceCandidate(candidate))
{
if (!_ignoreOffer)
Debug.LogWarning($"{this} this candidate can't accept on state.");

return false;
}

return true;
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -162,6 +162,7 @@ private void _Run(
config = _conf,
signaling = _signaling,
startCoroutine = StartCoroutine,
stopCoroutine = StopCoroutine,
resentOfferInterval = interval,
};
var _handlers = (handlers ?? this.handlers.AsEnumerable()).Where(_ => _ != null);
Expand Down
Loading