Skip to content

Commit

Permalink
[v15] kube: fix data race caused by improper usage of sync.WaitGroup (
Browse files Browse the repository at this point in the history
#43026)

* kube: fix data race caused by improper usage of `sync.WaitGroup`

This PR drops usage of `sync.WaitGroup` for the events goroutines because its
`Add` is called when the session starts - once all moderation criteria
are met - while the session can concurrently be closed by multiple readers.

This PR introduced a new `concurrentWaitGroup` based on `sync.Cond` to
do the job of keeping track of events goroutines when closing a session.

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>

* rename to weakWaitGroup and add docs

* fix typo

* call broadcast when wait has been called

---------

Signed-off-by: Tiago Silva <tiago.silva@goteleport.com>
  • Loading branch information
tigrato authored Jun 14, 2024
1 parent 6a46374 commit 041a1ac
Showing 1 changed file with 73 additions and 11 deletions.
84 changes: 73 additions & 11 deletions lib/kube/proxy/sess.go
Original file line number Diff line number Diff line change
Expand Up @@ -373,9 +373,11 @@ type session struct {
// reason is the reason for the session.
reason string

// eventsWaiter is used to wait for events to be emitted and goroutines closed
// weakEventsWaiter is used to wait for events to be emitted and goroutines closed
// when a session is closed.
eventsWaiter sync.WaitGroup
// Note: this is a weakWaitGroup and doesn't have the same guarantees as sync.WaitGroup.
// Please see the documentation for [weakWaitGroup] for more information.
weakEventsWaiter weakWaitGroup

streamContext context.Context
streamContextCancel context.CancelFunc
Expand Down Expand Up @@ -601,9 +603,9 @@ func (s *session) launch(ephemeralContainerStatus *corev1.ContainerStatus) (retu
s.forwarder.log.WithError(err).Warn("Failed to set up session start event - event will not be recorded")
}

s.eventsWaiter.Add(1)
s.weakEventsWaiter.Add(1)
go func() {
defer s.eventsWaiter.Done()
defer s.weakEventsWaiter.Done()
t := time.NewTimer(time.Until(s.expires))
defer t.Stop()

Expand Down Expand Up @@ -752,9 +754,9 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, eventPodMeta
// the events are emitted before closing the emitter/recorder.
// It might happen when a user disconnects or when a moderator forces an early
// termination.
s.eventsWaiter.Add(1)
s.weakEventsWaiter.Add(1)
onFinish := func(errExec error) {
defer s.eventsWaiter.Done()
defer s.weakEventsWaiter.Done()
s.mu.Lock()
defer s.mu.Unlock()

Expand Down Expand Up @@ -871,9 +873,9 @@ func (s *session) lockedSetupLaunch(request *remoteCommandRequest, eventPodMeta

// If the identity is verified with an MFA device, we enabled MFA-based presence for the session.
if s.PresenceEnabled {
s.eventsWaiter.Add(1)
s.weakEventsWaiter.Add(1)
go func() {
defer s.eventsWaiter.Done()
defer s.weakEventsWaiter.Done()
ticker := time.NewTicker(PresenceVerifyInterval)
defer ticker.Stop()

Expand Down Expand Up @@ -958,9 +960,9 @@ func (s *session) join(p *party, emitJoinEvent bool) error {
s.BroadcastMessage("User %v joined the session with participant mode: %v.", p.Ctx.User.GetName(), p.Mode)

if p.Mode == types.SessionModeratorMode {
s.eventsWaiter.Add(1)
s.weakEventsWaiter.Add(1)
go func() {
defer s.eventsWaiter.Done()
defer s.weakEventsWaiter.Done()
c := p.Client.forceTerminate()
select {
case <-c:
Expand Down Expand Up @@ -1287,7 +1289,7 @@ func (s *session) Close() error {
if recorder != nil {
// wait for events to be emitted before closing the recorder/emitter.
// If we close it immediately we will lose session.end events.
s.eventsWaiter.Wait()
s.weakEventsWaiter.Wait()
recorder.Close(s.forwarder.ctx)
}
})
Expand Down Expand Up @@ -1528,3 +1530,63 @@ func (s *session) retrieveEphemeralContainerCommand(ctx context.Context, usernam
}
return nil
}

// weakWaitGroup is a specialized synchronization primitive similar to sync.WaitGroup
// but with **relaxed** guarantees. Unlike sync.WaitGroup, weakWaitGroup does not ensure
// that the Wait() method will wait for all Add() calls to reach completion through Done()
// if they are called concurrently. This means that there is a potential leak in the
// synchronization of goroutines that are added to the weakWaitGroup and may be started
// after the Wait() method is called.
//
// Use Case:
// This weakWaitGroup is intended for scenarios where goroutines are initiated from
// various parts of the codebase concurrently and need to be awaited only if they started before
// a certain point in time, specifically before session.Close() is called. If a goroutine
// is initiated after session.Close() has been invoked, it will not be included in the wait process.
// It's the caller's responsibility to ensure that all goroutines started after Wait() returns end
// up being a no-op.
//
// Important Considerations:
//   - This implementation is UNSAFE as a general-purpose synchronization primitive.
//   - It does not guarantee that Wait() will account for all Add() calls, leading to potential
//     race conditions or goroutines that may not be properly awaited.
//   - The counter is not validated: unlike sync.WaitGroup, driving it below zero does not panic.
//   - Due to these limitations, weakWaitGroup should be used with extreme caution and only
//     in contexts where its relaxed guarantees are acceptable and safe.
//
// The zero value is ready to use. A weakWaitGroup must not be copied after first use.
//
// WARNING:
// This is not a substitute for sync.WaitGroup in situations requiring strong synchronization
// guarantees.
type weakWaitGroup struct {
	// cond wakes goroutines blocked in Wait. Its locker (cond.L) is set lazily to
	// &mu by the first Wait call that actually has to block, so a nil cond.L means
	// nobody has ever blocked and there is nobody to wake.
	cond sync.Cond
	// mu guards count and cond.L.
	mu sync.Mutex
	// count is the running sum of all Add deltas minus the number of Done calls.
	count int
}

// Add adds delta, which may be negative, to the counter. If the counter drops to
// zero or below, every goroutine currently blocked in Wait is released, so
// Add(-1) behaves exactly like Done.
func (c *weakWaitGroup) Add(delta int) {
	c.mu.Lock()
	defer c.mu.Unlock()
	c.count += delta
	// Wake waiters on any transition to "nothing outstanding", not only the one
	// performed by Done; otherwise a negative Add that reaches zero would leave
	// Wait blocked forever. The cond.L check skips the broadcast when no Wait
	// call has ever blocked.
	if c.count <= 0 && c.cond.L != nil {
		c.cond.Broadcast()
	}
}

// Done decrements the counter by one and releases blocked Wait callers once
// nothing is outstanding.
func (c *weakWaitGroup) Done() {
	c.Add(-1)
}

// Wait blocks until the counter is no longer positive. It returns immediately
// when nothing is outstanding at the time of the call. Add calls that race with
// Wait may or may not be accounted for.
func (c *weakWaitGroup) Wait() {
	c.mu.Lock()
	defer c.mu.Unlock()
	if c.count <= 0 {
		return
	}
	// Bind the condition variable to mu on first use so the zero value of
	// weakWaitGroup works without a constructor.
	if c.cond.L == nil {
		c.cond.L = &c.mu
	}
	// cond.Wait releases mu while parked and reacquires it before returning;
	// re-check the counter after every wake-up.
	for c.count > 0 {
		c.cond.Wait()
	}
}

0 comments on commit 041a1ac

Please sign in to comment.