Skip to content

Commit

Permalink
sqlliveness: add timeouts to heartbeats
Browse files Browse the repository at this point in the history
Previously, sqlliveness heartbeat operations could block on the transactions
that were involved. This change introduces some timeouts of the length of the
heartbeat during the create and refresh operations.

Resolves #85541

Release note: None

Release justification: low-risk bugfix to existing functionality
  • Loading branch information
dhartunian authored and aadityasondhi committed Sep 12, 2022
1 parent fd588f6 commit 34bf5ef
Show file tree
Hide file tree
Showing 5 changed files with 188 additions and 7 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -930,7 +930,7 @@ func TestSQLLivenessExemption(t *testing.T) {
// Make the tenant heartbeat like crazy.
ctx := context.Background()
//slinstance.DefaultTTL.Override(ctx, &st.SV, 20*time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &st.SV, time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &st.SV, 10*time.Millisecond)

_, tenantDB := serverutils.StartTenant(t, hostServer, base.TestTenantArgs{
TenantID: tenantID,
Expand Down
1 change: 1 addition & 0 deletions pkg/sql/sqlliveness/slinstance/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@ go_library(
"//pkg/util/syncutil",
"//pkg/util/timeutil",
"//pkg/util/uuid",
"@com_github_cockroachdb_errors//:errors",
],
)

Expand Down
55 changes: 52 additions & 3 deletions pkg/sql/sqlliveness/slinstance/slinstance.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (
"github.com/cockroachdb/cockroach/pkg/util/syncutil"
"github.com/cockroachdb/cockroach/pkg/util/timeutil"
"github.com/cockroachdb/cockroach/pkg/util/uuid"
"github.com/cockroachdb/errors"
)

var (
Expand Down Expand Up @@ -237,6 +238,48 @@ func (l *Instance) extendSession(ctx context.Context, s *session) (bool, error)
return true, nil
}

func (l *Instance) createSessionWithTimeout(
ctx context.Context, timeout time.Duration,
) (*session, error) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout) // nolint:context
defer cancel()
var s *session
var err error
createChan := make(chan struct{})
go func() {
s, err = l.createSession(ctx)
close(createChan)
}()
select {
case <-ctx.Done():
return nil, ctx.Err()
case <-createChan:
return s, err
}
}

func (l *Instance) extendSessionWithTimeout(
ctx context.Context, s *session, timeout time.Duration,
) (bool, error) {
var cancel context.CancelFunc
ctx, cancel = context.WithTimeout(ctx, timeout) // nolint:context
defer cancel()
var found bool
var err error
extendChan := make(chan struct{})
go func() {
found, err = l.extendSession(ctx, s)
close(extendChan)
}()
select {
case <-ctx.Done():
return false, ctx.Err()
case <-extendChan:
return found, err
}
}

func (l *Instance) heartbeatLoop(ctx context.Context) {
defer func() {
log.Warning(ctx, "exiting heartbeat loop")
Expand All @@ -253,8 +296,10 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
t.Read = true
s, _ := l.getSessionOrBlockCh()
if s == nil {
newSession, err := l.createSession(ctx)
var newSession *session
newSession, err := l.createSessionWithTimeout(ctx, l.hb())
if err != nil {
log.Errorf(ctx, "sqlliveness failed to create new session: %v", err)
func() {
l.mu.Lock()
defer l.mu.Unlock()
Expand All @@ -270,12 +315,16 @@ func (l *Instance) heartbeatLoop(ctx context.Context) {
t.Reset(l.hb())
continue
}
found, err := l.extendSession(ctx, s)
if err != nil {
found, err := l.extendSessionWithTimeout(ctx, s, l.hb())
if err != nil && errors.Is(err, context.DeadlineExceeded) {
// Unable to extend session due to unknown error.
// Clear and stop heartbeat loop.
log.Errorf(ctx, "sqlliveness failed to extend session: %v", err)
l.clearSession(ctx)
return
}
if !found {
// No existing session found, immediately create one.
l.clearSession(ctx)
// Start next loop iteration immediately to insert a new session.
t.Reset(0)
Expand Down
124 changes: 122 additions & 2 deletions pkg/sql/sqlliveness/slinstance/slinstance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,8 +40,8 @@ func TestSQLInstance(t *testing.T) {
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Microsecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, time.Microsecond)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 1*time.Millisecond)

fakeStorage := slstorage.NewFakeStorage()
sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
Expand Down Expand Up @@ -91,3 +91,123 @@ func TestSQLInstance(t *testing.T) {
_, err = sqlInstance.Session(ctx)
require.Error(t, err)
}

// TestSQLInstanceDeadlines tests that we have proper deadlines set on the
// create and extend session operations. This is done by blocking the fake
// storage layer and ensuring that no sessions get created because the
// timeouts are constantly triggered.
func TestSQLInstanceDeadlines(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, stopper := context.Background(), stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */)
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Millisecond)
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 1*time.Millisecond)

fakeStorage := slstorage.NewFakeStorage()
// block the fake storage
fakeStorage.BlockCh = make(chan chan struct{})
cleanUpFunc := func() {
ch := <-fakeStorage.BlockCh
close(ch)
close(fakeStorage.BlockCh)
fakeStorage.BlockCh = nil
}
defer cleanUpFunc()

sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
sqlInstance.Start(ctx)

// verify that we do not create a session
require.Never(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
if err != nil {
return false
}
return true
},
10*time.Millisecond, 1*time.Millisecond,
)
}

// TestSQLInstanceDeadlinesExtend tests that we have proper deadlines set on the
// create and extend session operations. This tests the case where the session is
// successfully created first and then blocks indefinitely.
func TestSQLInstanceDeadlinesExtend(t *testing.T) {
defer leaktest.AfterTest(t)()
defer log.Scope(t).Close(t)

ctx, stopper := context.Background(), stop.NewStopper()
defer stopper.Stop(ctx)

clock := hlc.NewClock(timeutil.NewManualTime(timeutil.Unix(0, 42)), time.Nanosecond /* maxOffset */)
settings := cluster.MakeTestingClusterSettingsWithVersions(
clusterversion.TestingBinaryVersion,
clusterversion.TestingBinaryMinSupportedVersion,
true /* initializeVersion */)
slinstance.DefaultTTL.Override(ctx, &settings.SV, 2*time.Millisecond)
// Must be shorter than the storage sleep amount below
slinstance.DefaultHeartBeat.Override(ctx, &settings.SV, 1*time.Millisecond)

fakeStorage := slstorage.NewFakeStorage()
sqlInstance := slinstance.NewSQLInstance(stopper, clock, fakeStorage, settings, nil)
sqlInstance.Start(ctx)

// verify that eventually session is created successfully
require.Eventually(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
if err != nil {
return false
}
return true
},
10*time.Millisecond, 1*time.Millisecond,
)

// verify that session is also extended successfully a few times
require.Never(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
if err != nil {
return true
}
return false
},
10*time.Millisecond, 1*time.Millisecond,
)

// block the fake storage
fakeStorage.BlockCh = make(chan chan struct{})
cleanUpFunc := func() {
ch := <-fakeStorage.BlockCh
close(ch)
close(fakeStorage.BlockCh)
fakeStorage.BlockCh = nil
}
defer cleanUpFunc()

// expect subsequent create/extend calls to fail
require.Eventually(
t,
func() bool {
_, err := sqlInstance.Session(ctx)
if err != nil {
return true
}
return false
},
10*time.Millisecond, 1*time.Millisecond,
)
}
13 changes: 12 additions & 1 deletion pkg/sql/sqlliveness/slstorage/test_helpers.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,7 +21,8 @@ import (

// FakeStorage implements the sqlliveness.Storage interface.
type FakeStorage struct {
mu struct {
BlockCh chan chan struct{}
mu struct {
syncutil.Mutex
sessions map[sqlliveness.SessionID]hlc.Timestamp
}
Expand Down Expand Up @@ -50,6 +51,11 @@ func (s *FakeStorage) Insert(
) error {
s.mu.Lock()
defer s.mu.Unlock()
if s.BlockCh != nil {
ch := make(chan struct{})
s.BlockCh <- ch
<-ch
}
if _, ok := s.mu.sessions[sid]; ok {
return errors.Errorf("session %s already exists", sid)
}
Expand All @@ -63,6 +69,11 @@ func (s *FakeStorage) Update(
) (bool, error) {
s.mu.Lock()
defer s.mu.Unlock()
if s.BlockCh != nil {
ch := make(chan struct{})
s.BlockCh <- ch
<-ch
}
if _, ok := s.mu.sessions[sid]; !ok {
return false, nil
}
Expand Down

0 comments on commit 34bf5ef

Please sign in to comment.