Skip to content

Commit

Permalink
sqlliveness: add timeouts to heartbeats
Browse files Browse the repository at this point in the history
Previously, sqlliveness heartbeat operations could block on the transactions
that were involved. This change introduces some timeouts of the length of the
heartbeat during the create and refresh operations.

This change also adds a `log.Fatal()` if the session expires while
trying to extend it.

Resolves #85541
Resolves #85540

Release note: None

Release justification: low-risk bugfix to existing functionality
  • Loading branch information
dhartunian authored and aadityasondhi committed Sep 12, 2022
1 parent fd588f6 commit 934e219
Show file tree
Hide file tree
Showing 4 changed files with 185 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/sql/sqlliveness/slinstance/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
)

Expand Down
56 changes: 53 additions & 3 deletions pkg/sql/sqlliveness/slinstance/slinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

var (
Expand Down Expand Up @@ -103,6 +104,7 @@ func (s *session) invokeSessionExpiryCallbacks(ctx context.Context) {
for _, callback := range s.mu.sessionExpiryCallbacks {
callback(ctx)
}
log.Fatal(ctx, "sqlliveness session expired")
}

func (s *session) setExpiration(exp hlc.Timestamp) {
Expand Down Expand Up @@ -237,6 +239,48 @@ func (l *Instance) extendSession(ctx context.Context, s *session) (bool, error)
return true, nil
}

func (l *Instance) createSessionWithTimeout(
ctx context.Context, timeout time.Duration,
) (*session, error) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout) // nolint:context
defer cancel()
var s *session
var err error
createChan := make(chan struct{})
go func() {
s, err = l.createSession(ctx)
close(createChan)
}()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-createChan:
return s, err
}
}

func (l *Instance) extendSessionWithTimeout(
ctx context.Context, s *session, timeout time.Duration,
) (bool, error) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout) // nolint:context
defer cancel()
var found bool
var err error
extendChan := make(chan struct{})
go func() {
found, err = l.extendSession(ctx, s)
close(extendChan)
}()
select {
case <-ctx.Done():
return false, ctx.Err()
case <-extendChan:
return found, err
}
}

func (l *Instance) heartbeatLoop(ctx context.Context) {
defer func() {
log.Warning(ctx, "exiting heartbeat loop")
Expand All @@ -253,8 +297,10 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
t.Read = true
s, _ := l.getSessionOrBlockCh()
if s == nil {
newSession, err := l.createSession(ctx)
var newSession *session
newSession, err := l.createSessionWithTimeout(ctx, l.hb())
if err != nil {
log.Errorf(ctx, "sqlliveness failed to create new session: %v", err)
func() {
l.mu.Lock()
defer l.mu.Unlock()
Expand All @@ -270,12 +316,16 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
t.Reset(l.hb())
continue
}
found, err := l.extendSession(ctx, s)
if err != nil {
found, err := l.extendSessionWithTimeout(ctx, s, l.hb())
if err != nil && errors.Is(err, context.DeadlineExceeded) {
// Unable to extend session due to unknown error.
// Clear and stop heartbeat loop.
log.Errorf(ctx, "sqlliveness failed to extend session: %v", err)
l.clearSession(ctx)
return
}
if !found {
// No existing session found, immediately create one.
l.clearSession(ctx)
// Start next loop iteration immediately to insert a new session.
t.Reset(0)
Expand Down
121 changes: 119 additions & 2 deletions pkg/sql/sqlliveness/slinstance/slinstance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func TestSQLInstance(t *testing.T) {
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Microsecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, time.Microsecond)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 1*time.Millisecond)

fakeStorage := slstorage.NewFakeStorage()
sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
Expand Down Expand Up @@ -91,3 +91,120 @@ func TestSQLInstance(t *testing.T) {
_, err = sqlInstance.Session(ctx)
require.Error(t, err)
}

// TestSQLInstanceDeadlines tests that we have proper deadlines set on the
// create and extend session operations. This is done by inserting delays into
// the fake storage layer and ensuring that no sessions get created because the
// timeouts are constantly triggered.
func TestSQLInstanceDeadlines(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, stopper := context.Background(), stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */)
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 1*time.Millisecond)

fakeStorage := slstorage.NewFakeStorage()
// block the fake storage
fakeStorage.BlockCh = make(chan chan struct{})
cleanUpFunc := func() {
ch := <-fakeStorage.BlockCh
close(ch)
close(fakeStorage.BlockCh)
fakeStorage.BlockCh = nil
}
defer cleanUpFunc()

sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
sqlInstance.Start(ctx)

// verify that we do not create a session
require.Never(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
if err != nil {
return false
}
return true
},
10*time.Millisecond, 1*time.Millisecond,
)
}

func TestSQLInstanceDeadlinesExtend(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, stopper := context.Background(), stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */)
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Millisecond)
// Must be shorter than the storage sleep amount below
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 1*time.Millisecond)

fakeStorage := slstorage.NewFakeStorage()
sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
sqlInstance.Start(ctx)

// verify that eventually session is created successfully
require.Eventually(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
if err != nil {
return false
}
return true
},
10*time.Millisecond, 1*time.Millisecond,
)

// verify that session is also extended successfully a few times
require.Never(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
if err != nil {
return true
}
return false
},
10*time.Millisecond, 1*time.Millisecond,
)

// block the fake storage
fakeStorage.BlockCh = make(chan chan struct{})
cleanUpFunc := func() {
ch := <-fakeStorage.BlockCh
close(ch)
close(fakeStorage.BlockCh)
fakeStorage.BlockCh = nil
}
defer cleanUpFunc()

// expect subsequent create/extend calls to fail
require.Eventually(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
if err != nil {
return true
}
return false
},
10*time.Millisecond, 1*time.Millisecond,
)
}
13 changes: 12 additions & 1 deletion pkg/sql/sqlliveness/slstorage/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (

// FakeStorage implements the sqlliveness.Storage interface.
type FakeStorage struct {
mu struct {
BlockCh chan chan struct{}
mu struct {
syncutil.Mutex
sessions map[sqlliveness.SessionID]hlc.Timestamp
}
Expand Down Expand Up @@ -50,6 +51,11 @@ func (s *FakeStorage) Insert(
) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.BlockCh != nil {
ch := make(chan struct{})
s.BlockCh <- ch
<-ch
}
if _, ok := s.mu.sessions[sid]; ok {
return errors.Errorf("session %s already exists", sid)
}
Expand All @@ -63,6 +69,11 @@ func (s *FakeStorage) Update(
) (bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.BlockCh != nil {
ch := make(chan struct{})
s.BlockCh <- ch
<-ch
}
if _, ok := s.mu.sessions[sid]; !ok {
return false, nil
}
Expand Down

0 comments on commit 934e219

Please sign in to comment.