Skip to content

Commit

Permalink
sqlliveness: add timeouts to heartbeats
Browse files Browse the repository at this point in the history
Previously, sqlliveness heartbeat operations could block on the transactions
that were involved. This change introduces some timeouts of the length of the
heartbeat during the create and refresh operations.

This change also adds a `log.Fatal()` if the session expires while
trying to extend it.

Resolves #85541
Resolves #85540

Release note: None

Release justification: low-risk bugfix to existing functionality
  • Loading branch information
dhartunian authored and aadityasondhi committed Sep 7, 2022
1 parent 0197ada commit 8eeebb5
Show file tree
Hide file tree
Showing 4 changed files with 167 additions and 6 deletions.
2 changes: 2 additions & 0 deletions pkg/sql/sqlliveness/slinstance/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ go_library(
"//pkg/settings",
"//pkg/settings/cluster",
"//pkg/sql/sqlliveness",
"//pkg/util/contextutil",
"//pkg/util/grpcutil",
"//pkg/util/hlc",
"//pkg/util/log",
Expand All @@ -35,6 +36,7 @@ go_test(
"//pkg/settings/cluster",
"//pkg/sql/sqlliveness",
"//pkg/sql/sqlliveness/slstorage",
"//pkg/util/contextutil",
"//pkg/util/hlc",
"//pkg/util/leaktest",
"//pkg/util/log",
Expand Down
64 changes: 61 additions & 3 deletions pkg/sql/sqlliveness/slinstance/slinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

var (
Expand Down Expand Up @@ -103,6 +104,7 @@ func (s *session) invokeSessionExpiryCallbacks(ctx context.Context) {
for _, callback := range s.mu.sessionExpiryCallbacks {
callback(ctx)
}
log.Fatal(ctx, "sqlliveness session expired")
}

func (s *session) setExpiration(exp hlc.Timestamp) {
Expand Down Expand Up @@ -237,6 +239,56 @@ func (l *Instance) extendSession(ctx context.Context, s *session) (bool, error)
return true, nil
}

type TimeoutError struct{}

func (t TimeoutError) Error() string {
return "session timeout"
}

var _ error = &TimeoutError{}

func (l *Instance) createSessionWithTimeout(
ctx context.Context, timeout time.Duration,
) (*session, error) {
var s *session
var err error
var createChan = make(chan struct{})
go func() {
s, err = l.createSession(ctx)
close(createChan)
}()

t := timeutil.NewTimer()
t.Reset(timeout)
select {
case <-t.C:
return nil, &TimeoutError{}
case <-createChan:
return s, err
}
}

func (l *Instance) extendSessionWithTimeout(
ctx context.Context, s *session, timeout time.Duration,
) (bool, error) {
var found bool
var err error
var extendChan = make(chan struct{})
go func() {
found, err = l.extendSession(ctx, s)
close(extendChan)
}()

t := timeutil.NewTimer()
t.Reset(timeout)
select {
case <-t.C:
return false, &TimeoutError{}
case <-extendChan:
return found, err
}
}

func (l *Instance) heartbeatLoop(ctx context.Context) {
defer func() {
log.Warning(ctx, "exiting heartbeat loop")
Expand All @@ -253,8 +305,10 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
t.Read = true
s, _ := l.getSessionOrBlockCh()
if s == nil {
newSession, err := l.createSession(ctx)
var newSession *session
newSession, err := l.createSessionWithTimeout(ctx, l.hb())
if err != nil {
log.Errorf(ctx, "sqlliveness failed to create new session: %v", err)
func() {
l.mu.Lock()
defer l.mu.Unlock()
Expand All @@ -270,12 +324,16 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
t.Reset(l.hb())
continue
}
found, err := l.extendSession(ctx, s)
if err != nil {
found, err := l.extendSessionWithTimeout(ctx, s, l.hb())
if err != nil && !errors.HasType(err, (*TimeoutError)(nil)) {
// Unable to extend session due to unknown error.
// Clear and stop heartbeat loop.
log.Errorf(ctx, "sqlliveness failed to extend session: %v", err)
l.clearSession(ctx)
return
}
if !found {
// No existing session found, immediately create one.
l.clearSession(ctx)
// Start next loop iteration immediately to insert a new session.
t.Reset(0)
Expand Down
101 changes: 99 additions & 2 deletions pkg/sql/sqlliveness/slinstance/slinstance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func TestSQLInstance(t *testing.T) {
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Microsecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, time.Microsecond)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 500*time.Microsecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 250*time.Microsecond)

fakeStorage := slstorage.NewFakeStorage()
sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
Expand Down Expand Up @@ -91,3 +91,100 @@ func TestSQLInstance(t *testing.T) {
_, err = sqlInstance.Session(ctx)
require.Error(t, err)
}

// TestSQLInstanceDeadlines tests that we have proper deadlines set on the
// create and extend session operations. This is done by inserting delays into
// the fake storage layer and ensuring that no sessions get created because the
// timeouts are constantly triggered.
func TestSQLInstanceDeadlines(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, stopper := context.Background(), stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */)
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 500*time.Microsecond)
// Must be shorter than the storage sleep amount below
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 250*time.Microsecond)

fakeStorage := slstorage.NewFakeStorage()
fakeStorage.InsertSleep = 2 * time.Millisecond
sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
sqlInstance.Start(ctx)

require.Never(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
if err != nil {
return false
}
return true
},
1*time.Millisecond, 100*time.Microsecond,
)
}

func TestSQLInstanceDeadlinesExtend(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, stopper := context.Background(), stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */)
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 0*time.Microsecond)
// Must be shorter than the storage sleep amount below
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 250*time.Microsecond)

fakeStorage := slstorage.NewFakeStorage()
sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
sqlInstance.Start(ctx)

require.Eventually(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
if err != nil {
return false
}
return true
},
1*time.Millisecond, 100*time.Microsecond,
)

require.Never(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
if err != nil {
return true
}
return false
},
1*time.Millisecond, 100*time.Microsecond,
)

fakeStorage.InsertSleep = 2 * time.Millisecond

require.Eventually(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
if err != nil {
return true
}
return false
},
1*time.Millisecond, 100*time.Microsecond,
)
}
6 changes: 5 additions & 1 deletion pkg/sql/sqlliveness/slstorage/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ package slstorage

import (
"context"
"time"

"github.com/cockroachdb/cockroach/pkg/sql/sqlliveness"
"github.com/cockroachdb/cockroach/pkg/util/hlc"
Expand All @@ -21,7 +22,8 @@ import (

// FakeStorage implements the sqlliveness.Storage interface.
type FakeStorage struct {
mu struct {
InsertSleep time.Duration
mu struct {
syncutil.Mutex
sessions map[sqlliveness.SessionID]hlc.Timestamp
}
Expand Down Expand Up @@ -50,6 +52,7 @@ func (s *FakeStorage) Insert(
) error {
s.mu.Lock()
defer s.mu.Unlock()
time.Sleep(s.InsertSleep)
if _, ok := s.mu.sessions[sid]; ok {
return errors.Errorf("session %s already exists", sid)
}
Expand All @@ -63,6 +66,7 @@ func (s *FakeStorage) Update(
) (bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
time.Sleep(s.InsertSleep)
if _, ok := s.mu.sessions[sid]; !ok {
return false, nil
}
Expand Down

0 comments on commit 8eeebb5

Please sign in to comment.