Skip to content

Commit

Permalink
[v14] backports for new ws paradigm (#38070)
Browse files Browse the repository at this point in the history
* Introduce a new streaming player API (#31754)

This new API can be used to play back sessions of any type.
The player accepts a session ID and a streamer, and provides
the caller with an API for playback controls (speed, play/pause,
seek, etc) as well as a channel that receives events with the
proper timing delay applied.

The design for this change is discussed in RFD 91.

Updates #10578
Updates #10579
Updates gravitational/teleport-private#665
Updates gravitational/teleport-private#1024

* Convert the desktop sesssion player to the new player API (#34070)

This makes a few changes to the player API to ensure that errors
are correctly propagated.

* Switch desktop playback to gorilla/websocket (#36405)

We use gorilla/websocket throughout the app, but desktop playback
leveraged x/net/websocket instead. Convert to gorilla so that we
are consistent and use the same library everywhere websockets are
used.

* Read the bearer token over websocket endpoints instead of query parameter (#37520)

* Read the bearer token over WS endpoints

use the request context, not session

Dont pass websocket by context

lint

resolve some comments

Add TestWSAuthenticateRequest

Close ws in handler

deprecation notices, doc

resolve comments

resolve comments

give a longer read/write deadline

dont set write deadline, ws endpoints never did before and it breaks things

convert frontend to use ws access token

Resolove comments, move to using an explicit state

fix ci

reset read deadline

prettier

* update connectToHost

* linter

* read errors from websocket

* missing /ws on ttyWsAddr and fix wrong onmessage

* fix race in test

* lint

* skip TestTerminal as it takes 11 seconds to run

* dont skip the test

* resolve apiserver comments

* Add an AuthenticatedWebSocket class

* convert other clients to use AuthenticatedWebSocket

* Converts `AuthenticatedWebSocket` into drop-in replacement for `WebSocket` (#37699)

* Converts `AuthenticatedWebSocket` into drop-in replacement for `WebSocket`
that automatically goes through Teleport's custom authentication process
before facilitating any caller-defined communication.

This also reverts previous-`WebSocket` users to their original state
(sans the code for passing the bearer token in the query string),
swapping in `AuthenticatedWebSocket` in place of `WebSocket`.

* Create a single authnWsUpgrader with a comment justifying why we turn off CORS

* recieving to receiving

* resolve comments

---------

Co-authored-by: Isaiah Becker-Mayer <isaiah@goteleport.com>

* Updates `desktopPlaybackHandle` to new ws paradigm (#37981)

* Updates `desktopPlaybackHandle` to new ws paradigm

This was mistakenly left out of #37520.
This commit also refactors `WithClusterAuthWebSocket` slightly for easier
comprehension, and updates the vite config to facilitate the new websocket
endpoints in development mode.

* Update lib/web/apiserver.go

Co-authored-by: Zac Bergquist <zac.bergquist@goteleport.com>

---------

Co-authored-by: Zac Bergquist <zac.bergquist@goteleport.com>

* polishing off merge errors

* fixes Path in makeTerminal to reflect new /ws suffix

---------

Co-authored-by: Zac Bergquist <zac.bergquist@goteleport.com>
Co-authored-by: Alex McGrath <alex.mcgrath@goteleport.com>
  • Loading branch information
3 people committed Feb 13, 2024
1 parent ebef9e9 commit df87f9d
Show file tree
Hide file tree
Showing 22 changed files with 1,461 additions and 417 deletions.
17 changes: 12 additions & 5 deletions lib/events/auditlog.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"errors"
"fmt"
"io"
"io/fs"
"os"
"path/filepath"
"sort"
Expand Down Expand Up @@ -985,7 +986,9 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID
if rmErr := os.Remove(tarballPath); rmErr != nil {
l.log.WithError(rmErr).Warningf("Failed to remove file %v.", tarballPath)
}

if errors.Is(err, fs.ErrNotExist) {
err = trace.NotFound("a recording for session %v was not found", sessionID)
}
e <- trace.Wrap(err)
return c, e
}
Expand All @@ -1003,7 +1006,7 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID
for {
if ctx.Err() != nil {
e <- trace.Wrap(ctx.Err())
break
return
}

event, err := protoReader.Read(ctx)
Expand All @@ -1013,12 +1016,16 @@ func (l *AuditLog) StreamSessionEvents(ctx context.Context, sessionID session.ID
} else {
close(c)
}

break
return
}

if event.GetIndex() >= startIndex {
c <- event
select {
case c <- event:
case <-ctx.Done():
e <- trace.Wrap(ctx.Err())
return
}
}
}
}()
Expand Down
332 changes: 332 additions & 0 deletions lib/player/player.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,332 @@
// Copyright 2023 Gravitational, Inc
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

// Package player includes an API to play back recorded sessions.
package player

import (
"context"
"errors"
"math"
"sync/atomic"
"time"

"github.com/gravitational/trace"
"github.com/jonboulle/clockwork"
"github.com/sirupsen/logrus"

"github.com/gravitational/teleport/api/types/events"
"github.com/gravitational/teleport/lib/session"
)

// Player is used to stream recorded sessions over a channel.
type Player struct {
// read only config fields
clock clockwork.Clock
log logrus.FieldLogger
sessionID session.ID
streamer Streamer

speed atomic.Value // playback speed (1.0 for normal speed)
lastPlayed atomic.Int64 // timestamp of most recently played event

// advanceTo is used to implement fast-forward and rewind.
// During normal operation, it is set to [normalPlayback].
//
// When set to a positive value the player is seeking forward
// in time (and plays events as quickly as possible).
//
// When set to a negative value, the player needs to "rewind"
// by starting the stream over from the beginning and then
// seeking forward to the rewind point.
advanceTo atomic.Int64

emit chan events.AuditEvent
done chan struct{}

// playPause holds a channel to be closed when
// the player transitions from paused to playing,
// or nil if the player is already playing.
//
// This approach mimics a "select-able" condition variable
// and is inspired by "Rethinking Classical Concurrency Patterns"
// by Bryan C. Mills (GopherCon 2018): https://www.youtube.com/watch?v=5zXAHh5tJqQ
playPause chan chan struct{}

// err holds the error (if any) encountered during playback
err error
}

const normalPlayback = math.MinInt64

// Streamer is the underlying streamer that provides
// access to recorded session events.
type Streamer interface {
StreamSessionEvents(
ctx context.Context,
sessionID session.ID,
startIndex int64,
) (chan events.AuditEvent, chan error)
}

// Config configures a session player.
type Config struct {
Clock clockwork.Clock
Log logrus.FieldLogger
SessionID session.ID
Streamer Streamer
}

func New(cfg *Config) (*Player, error) {
if cfg.Streamer == nil {
return nil, trace.BadParameter("missing Streamer")
}

if cfg.SessionID == "" {
return nil, trace.BadParameter("missing SessionID")
}

clk := cfg.Clock
if clk == nil {
clk = clockwork.NewRealClock()
}

var log logrus.FieldLogger = cfg.Log
if log == nil {
log = logrus.New().WithField(trace.Component, "player")
}

p := &Player{
clock: clk,
log: log,
sessionID: cfg.SessionID,
streamer: cfg.Streamer,
emit: make(chan events.AuditEvent, 64),
playPause: make(chan chan struct{}, 1),
done: make(chan struct{}),
}

p.speed.Store(float64(defaultPlaybackSpeed))
p.advanceTo.Store(normalPlayback)

// start in a paused state
p.playPause <- make(chan struct{})

go p.stream()

return p, nil
}

// errClosed is an internal error that is used to signal
// that the player has been closed
var errClosed = errors.New("player closed")

const (
minPlaybackSpeed = 0.25
defaultPlaybackSpeed = 1.0
maxPlaybackSpeed = 16
)

// SetSpeed adjusts the playback speed of the player.
// It can be called at any time (the player can be in a playing
// or paused state). A speed of 1.0 plays back at regular speed,
// while a speed of 2.0 plays back twice as fast as originally
// recorded. Valid speeds range from 0.25 to 16.0.
func (p *Player) SetSpeed(s float64) error {
if s < minPlaybackSpeed || s > maxPlaybackSpeed {
return trace.BadParameter("speed %v is out of range", s)
}
p.speed.Store(s)
return nil
}

func (p *Player) stream() {
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

eventsC, errC := p.streamer.StreamSessionEvents(ctx, p.sessionID, 0)
lastDelay := int64(0)
for {
select {
case <-p.done:
close(p.emit)
return
case err := <-errC:
p.log.Warn(err)
p.err = err
close(p.emit)
return
case evt := <-eventsC:
if evt == nil {
p.log.Debugf("reached end of playback for session %v", p.sessionID)
close(p.emit)
return
}

if err := p.waitWhilePaused(); err != nil {
p.log.Warn(err)
close(p.emit)
return
}

currentDelay := getDelay(evt)
if currentDelay > 0 && currentDelay > lastDelay {
switch adv := p.advanceTo.Load(); {
case adv >= currentDelay:
// no timing delay necessary, we are fast forwarding
break
case adv < 0 && adv != normalPlayback:
// any negative value other than normalPlayback means
// we rewind (by restarting the stream and seeking forward
// to the rewind point)
p.advanceTo.Store(adv * -1)
go p.stream()
return
default:
if adv != normalPlayback {
p.advanceTo.Store(normalPlayback)

// we're catching back up to real time, so the delay
// is calculated not from the last event but from the
// time we were advanced to
lastDelay = adv
}
if err := p.applyDelay(time.Duration(currentDelay-lastDelay) * time.Millisecond); err != nil {
close(p.emit)
return
}
}

lastDelay = currentDelay
}

select {
case p.emit <- evt:
p.lastPlayed.Store(currentDelay)
default:
p.log.Warnf("dropped event %v, reader too slow", evt.GetID())
}
}
}
}

// Close shuts down the player and cancels any streams that are
// in progress.
func (p *Player) Close() error {
close(p.done)
return nil
}

// C returns a read only channel of recorded session events.
// The player manages the timing of events and writes them to the channel
// when they should be rendered. The channel is closed when the player
// has reached the end of playback.
func (p *Player) C() <-chan events.AuditEvent {
return p.emit
}

// Err returns the error (if any) that occurred during playback.
// It should only be called after the channel returned by [C] is
// closed.
func (p *Player) Err() error {
return p.err
}

// Pause temporarily stops the player from emitting events.
// It is a no-op if playback is currently paused.
func (p *Player) Pause() error {
p.setPlaying(false)
return nil
}

// Play starts emitting events. It is used to start playback
// for the first time and to resume playing after the player
// is paused.
func (p *Player) Play() error {
p.setPlaying(true)
return nil
}

// SetPos sets playback to a specific time, expressed as a duration
// from the beginning of the session. A duration of 0 restarts playback
// from the beginning. A duration greater than the length of the session
// will cause playback to rapidly advance to the end of the recording.
func (p *Player) SetPos(d time.Duration) error {
if d.Milliseconds() < p.lastPlayed.Load() {
// if we're rewinding we store a negative value
d = -1 * d
}
p.advanceTo.Store(d.Milliseconds())
return nil
}

// applyDelay "sleeps" for d in a manner that
// can be canceled
func (p *Player) applyDelay(d time.Duration) error {
scaled := float64(d) / p.speed.Load().(float64)
select {
case <-p.done:
return errClosed
case <-p.clock.After(time.Duration(scaled)):
return nil
}
}

func (p *Player) setPlaying(play bool) {
ch := <-p.playPause
alreadyPlaying := ch == nil

if alreadyPlaying && !play {
ch = make(chan struct{})
} else if !alreadyPlaying && play {
// signal waiters who are paused that it's time to resume playing
close(ch)
ch = nil
}

p.playPause <- ch
}

// waitWhilePaused blocks while the player is in a paused state.
// It returns immediately if the player is currently playing.
func (p *Player) waitWhilePaused() error {
ch := <-p.playPause
p.playPause <- ch

if alreadyPlaying := ch == nil; !alreadyPlaying {
select {
case <-p.done:
return errClosed
case <-ch:
}
}
return nil
}

// LastPlayed returns the time of the last played event,
// expressed as milliseconds since the start of the session.
func (p *Player) LastPlayed() int64 {
return p.lastPlayed.Load()
}

func getDelay(e events.AuditEvent) int64 {
switch x := e.(type) {
case *events.DesktopRecording:
return x.DelayMilliseconds
case *events.SessionPrint:
return x.DelayMilliseconds
default:
return int64(0)
}
}

0 comments on commit df87f9d

Please sign in to comment.