-
Notifications
You must be signed in to change notification settings - Fork 107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
RSDK-7403 - add remote rtp_passthrough support #3957
base: main
Are you sure you want to change the base?
Conversation
Warning your change may break code samples. If your change modifies any of the following functions please contact @viamrobotics/fleet-management. Thanks!
|
@@ -452,7 +456,8 @@ func (ss *StreamState) streamH264Passthrough(ctx context.Context) error { | |||
} | |||
|
|||
func (ss *StreamState) unsubscribeH264Passthrough(ctx context.Context, id rtppassthrough.SubscriptionID) error { | |||
cam, err := camera.FromRobot(ss.robot, ss.Stream.Name()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We need to turn the SDPTrackName back into a short name for the FromRobot
lookup to work if the camera is in a remote part
robot/web/web_c.go
Outdated
@@ -292,13 +292,13 @@ func (svc *webService) refreshVideoSources() { | |||
if err != nil { | |||
continue | |||
} | |||
existing, ok := svc.videoSources[validSDPTrackName(name)] | |||
existing, ok := svc.videoSources[cam.Name().SDPTrackName()] |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Changed from using private function to new public method on resource.Name
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
no behavior change, other than requiring injected cameras actually have names in test setup
resource/name_test.go
Outdated
@@ -0,0 +1,148 @@ | |||
package resource |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
I added 2 new functions to this package so I added some tests add added tests for the most common ways resource.Name s are created
@@ -294,6 +295,8 @@ func (c *Camera) SubscribeRTP( | |||
bufferSize int, | |||
packetsCB rtppassthrough.PacketCallback, | |||
) (rtppassthrough.Subscription, error) { | |||
golog.Global().Warnf("SubscribeRTP FAKE START %s", c.Name().String()) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
golog.Global().Warnf("OnTrack START %s pc: %p", trackRemote.StreamID(), pc) | ||
defer golog.Global().Warnf("OnTrack END %s pc: %p", trackRemote.StreamID(), pc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
@@ -81,8 +80,8 @@ type SharedConn struct { | |||
// set to nil before this channel is closed. | |||
peerConnFailed chan struct{} | |||
|
|||
resOnTrackMu sync.Mutex | |||
resOnTrackCBs map[resource.Name]OnTrackCB | |||
onTrackCBByTrackNameMu sync.Mutex |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
pull out the name changes into a separate PR
ss.logger.Warnf("AddStream START %s", req.Name) | ||
defer ss.logger.Warnf("AddStream END %s", req.Name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
@@ -204,12 +225,14 @@ func (ss *Server) AddStream(ctx context.Context, req *streampb.AddStreamRequest) | |||
|
|||
guard := rutils.NewGuard(func() { | |||
for _, sender := range ps.senders { | |||
golog.Global().Infof("calling RemoveTrack on %s pc: %p", sender.Track().StreamID(), pc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
ss.logger.Warnf("RemoveStream START %s", req.Name) | ||
defer ss.logger.Warnf("RemoveStream END %s", req.Name) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
|
||
if _, ok := ss.activePeerStreams[pc][req.Name]; !ok { | ||
return nil, errors.New("stream already inactive") | ||
} | ||
|
||
var errs error | ||
for _, sender := range ss.activePeerStreams[pc][req.Name].senders { | ||
// golog.Global().Infof("calling RemoveTrack on %s pc: %p", sender.Track().StreamID(), pc) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
ss.logger.Info("monitorStreamAvailable loop START") | ||
defer ss.logger.Info("monitorStreamAvailable loop END") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
// if ss.streamSource != streamSourceUnknown { | ||
// return fmt.Errorf("unexpected stream %s source %s", ss.Stream.Name(), ss.streamSource) | ||
// } | ||
// this is the first subscription, attempt passthrough | ||
// ss.logger.CDebugw(ctx, "attempting to subscribe to rtp_passthrough", "name", ss.Stream.Name()) | ||
// err := ss.streamH264Passthrough(ctx) | ||
// if err != nil { | ||
// ss.logger.CDebugw(ctx, "rtp_passthrough not possible, falling back to GoStream", "err", err.Error(), "name", ss.Stream.Name()) | ||
// // if passthrough failed, fall back to gostream based approach | ||
// ss.Stream.Start() | ||
// ss.streamSource = streamSourceGoStream | ||
// } | ||
// ss.activeClients++ | ||
// return nil | ||
// } | ||
|
||
// switch ss.streamSource { | ||
// case streamSourcePassthrough: | ||
// ss.logger.Debugw("continuing using rtp_passthrough", "name", ss.Stream.Name()) | ||
// // noop as we are already subscribed | ||
// case streamSourceGoStream: | ||
// ss.logger.Debugw("currently using gostream, trying upgrade to rtp_passthrough", "name", ss.Stream.Name()) | ||
// // attempt to cut over to passthrough | ||
// err := ss.streamH264Passthrough(ctx) | ||
// if err != nil { | ||
// ss.logger.Debugw("rtp_passthrough not possible, continuing with gostream", "err", err.Error(), "name", ss.Stream.Name()) | ||
// } | ||
// case streamSourceUnknown: | ||
// fallthrough | ||
// default: | ||
// err := fmt.Errorf("%s streamSource in unexpected state %s", ss.Stream.Name(), ss.streamSource) | ||
// ss.logger.Error(err.Error()) | ||
// return err | ||
// } | ||
// ss.activeClients++ |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
// case err != nil && ss.activeClients > 0: | ||
// ss.logger.Warn("camera no longer exists in resource graph, stopping subscriptions and setting active clients to 0") | ||
// // stop stream if the camera no longer exists | ||
// // noop if there is no stream source | ||
// ss.stopBasedOnSub() | ||
// // nil out subsription & stream source even if stopBasedOnSub didn't | ||
// ss.streamSourceSub = rtppassthrough.NilSubscription | ||
// ss.streamSource = streamSourceUnknown | ||
// ss.activeClients = 0 |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
components/camera/client.go
Outdated
c.logger.Warn("Close START") | ||
debug.PrintStack() | ||
defer c.logger.Warn("Close END") |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
c.redLog(fmt.Sprintf("SubscribeRTP START %s client: %p, pc: %p", c.Name().String(), c, c.conn.PeerConn())) | ||
defer c.redLog(fmt.Sprintf("SubscribeRTP END %s client: %p, pc: %p", c.Name().String(), c, c.conn.PeerConn())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
c.logger.CDebugw(ctx, "SubscribeRTP AddStream hit error", "subID", sub.ID.String(), "name", c.Name(), "err", err) | ||
defer sc.RemoveOnTrackSub(c.trackName()) | ||
|
||
c.redLog(fmt.Sprintf("SubscribeRTP AddStream CALL %s client: %p, pc: %p", c.trackName(), c, c.conn.PeerConn())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
return rtppassthrough.NilSubscription, err | ||
} | ||
c.redLog(fmt.Sprintf("SubscribeRTP AddStream RETURN %s client: %p, pc: %p", c.trackName(), c, c.conn.PeerConn())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
@@ -469,18 +524,20 @@ func (c *client) SubscribeRTP( | |||
|
|||
// To prevent that failure mode, we exit with an error if a track is not received within | |||
// the SubscribeRTP context. | |||
c.redLog(fmt.Sprintf("SubscribeRTP waiting for track %s client: %p, pc: %p", c.trackName(), c, c.conn.PeerConn())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
c.logger.Error(err) | ||
return rtppassthrough.NilSubscription, err | ||
case <-trackReceived: | ||
c.logger.Debug("received track") | ||
c.redLog(fmt.Sprintf("received track %s client: %p, pc: %p", c.trackName(), c, c.conn.PeerConn())) |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
c.logger.Debugw("SubscribeRTP: camera client", "name ", c.Name(), "parentID", parentID.String(), | ||
"OnTrack callback terminating as the client is closing") | ||
close(trackClosed) | ||
return | ||
default: | ||
} | ||
|
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
} | ||
// END TestWhyMustTimeoutOnReadRTP |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
|
||
// BEGIN TestWhyMustCallUnsubscribe | ||
// if we are talking to a remote viam-sever we need to call RemoveStream so that | ||
// the remote (which still has a healthy peer connection with the main part) | ||
// knows to stop the stream. If we don't do this there is a risk that | ||
// the remote will still leave the stream open, causing a newly reconfigured | ||
// main part client's AddStream call to receive an error that there is already | ||
// a live stream. | ||
|
||
// We don't need to do this in the case of modules as resource manager of the main part | ||
// will call `Close()` on them in the cases when the client would have Close() calld on it | ||
// which will terminate the stream. | ||
|
||
// if _, isRemote := c.conn.(*grpc.ReconfigurableClientConn); isRemote { | ||
// // NOTE: This is the ctx from `camera/client.Close()` which is called by resource manager. I don't know | ||
// // if it has a timeout. If it doesn't then this might block forever. | ||
// c.logger.CDebugw(ctx, "unsubscribeAll calling RemoveStream", "trackName", c.trackName()) | ||
// if _, err := c.streamClient.RemoveStream(ctx, &streampb.RemoveStreamRequest{Name: c.trackName()}); err != nil { | ||
// c.logger.CWarnw(ctx, "unsubscribeAll RemoveStream returned err", "trackName", c.trackName(), "err", err) | ||
// } | ||
// } | ||
// END TestWhyMustCallUnsubscribe |
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
remove
Changes:
rdk:component:camera/remote:cam-1
the SDP encoded short name isremote+cam-1
)rdk:component:camera/remote:cam-1
name used iscam-1
asremote
doesn't know that it's name isremote
from the perspective of the main part)Tracker
interface support toReconfigurableClientConn
so that peer connections from viam-server main & remote parts can stream WebRTC tracks to / from each otherTracker
interface to take strings rather thanresource.Name
s as keys for the track names as that is the lowest common denominator for how both module.Add|RemoveStream and stream.Add|RemoveStream endpoints workrobot/web/stream/state/state.go
to support looking up cameras in remote parts by decoding the SDP encoded short name back into a short nameFollow up ticket:
https://viam.atlassian.net/browse/RSDK-7668
Manual Test Plan:
Note:
I tested this manually with the following topology:
At every hop, the frequency of video pauses would increase. I believe this is due to the fact that there is variable latency on the office wifi network, where I see ping times swing between 5 ms and 200 ms per hop when there are a lot of people on the network. When the congestion is bad, the topology above does have noticeable pauses / starts. Future work could mitigate this by adding RTP packet buffering to help prevent pauses / starts at the cost of showing a video feed that is a few seconds stale.
Details:
This diagram shows the inputs to AddStream and the name of the track used in AddTrack & the name format convention used in the case of:
The fact that modules use the fully qualified component name as the input to AddStream and RemoveStream is a mistake I made in an earlier implementation. This change keeps that status quo to not break backward compatibility with existing modules.