Skip to content

Commit

Permalink
server/grpc_service: make the lock for UpdateServiceGCSafePoint smaller (tikv#5128)
Browse files Browse the repository at this point in the history

close tikv#5019

Signed-off-by: shirly <AndreMouche@126.com>

Co-authored-by: Ti Chi Robot <ti-community-prow-bot@tidb.io>
  • Loading branch information
2 people authored and CabinfeverB committed Jul 14, 2022
1 parent eac035d commit fb0e669
Show file tree
Hide file tree
Showing 4 changed files with 135 additions and 56 deletions.
63 changes: 42 additions & 21 deletions server/gc/safepoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,42 +15,35 @@
package gc

import (
"math"
"time"

"github.com/tikv/pd/pkg/syncutil"
"github.com/tikv/pd/server/storage/endpoint"
)

// SafePointManager is the manager for safePoint of GC and services
// SafePointManager is the manager for safePoint of GC and services.
type SafePointManager struct {
*gcSafePointManager
// TODO add ServiceSafepointManager
}

// NewSafepointManager creates a SafePointManager of GC and services
func NewSafepointManager(store endpoint.GCSafePointStorage) *SafePointManager {
return &SafePointManager{
newGCSafePointManager(store),
}
gcLock syncutil.Mutex
serviceGCLock syncutil.Mutex
store endpoint.GCSafePointStorage
}

type gcSafePointManager struct {
syncutil.Mutex
store endpoint.GCSafePointStorage
}

func newGCSafePointManager(store endpoint.GCSafePointStorage) *gcSafePointManager {
return &gcSafePointManager{store: store}
// NewSafePointManager builds a SafePointManager for GC and service safe points
// that reads and writes through the given store. Both locks start unlocked
// (zero value).
func NewSafePointManager(store endpoint.GCSafePointStorage) *SafePointManager {
	manager := new(SafePointManager)
	manager.store = store
	return manager
}

// LoadGCSafePoint loads current GC safe point from storage.
func (manager *gcSafePointManager) LoadGCSafePoint() (uint64, error) {
func (manager *SafePointManager) LoadGCSafePoint() (uint64, error) {
return manager.store.LoadGCSafePoint()
}

// UpdateGCSafePoint updates the safepoint if it is greater than the previous one
// it returns the old safepoint in the storage.
func (manager *gcSafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafePoint uint64, err error) {
manager.Lock()
defer manager.Unlock()
func (manager *SafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSafePoint uint64, err error) {
manager.gcLock.Lock()
defer manager.gcLock.Unlock()
// TODO: cache the safepoint in the storage.
oldSafePoint, err = manager.store.LoadGCSafePoint()
if err != nil {
Expand All @@ -62,3 +55,31 @@ func (manager *gcSafePointManager) UpdateGCSafePoint(newSafePoint uint64) (oldSa
err = manager.store.SaveGCSafePoint(newSafePoint)
return
}

// UpdateServiceGCSafePoint updates the safe point of a single service.
//
// The whole load/check/save sequence runs under serviceGCLock, so concurrent
// callers are serialized. The current minimum service safe point is loaded
// from the store first; the update is skipped (updated == false) when loading
// fails, when ttl is not positive, or when newSafePoint is below that minimum.
//
// Otherwise a ServiceSafePoint expiring at now.Unix()+ttl is saved, with the
// expiry saturated to math.MaxInt64 when the sum would not fit. If the caller
// is the service that held the minimum before the save, the minimum is loaded
// again so the returned value reflects the update.
//
// It returns the minimum service safe point, whether the save happened, and
// any storage error. A failed save returns a nil safe point.
func (manager *SafePointManager) UpdateServiceGCSafePoint(serviceID string, newSafePoint uint64, ttl int64, now time.Time) (minServiceSafePoint *endpoint.ServiceSafePoint, updated bool, err error) {
	manager.serviceGCLock.Lock()
	defer manager.serviceGCLock.Unlock()

	minServiceSafePoint, err = manager.store.LoadMinServiceGCSafePoint(now)
	if err != nil {
		return minServiceSafePoint, false, err
	}
	if ttl <= 0 || newSafePoint < minServiceSafePoint.SafePoint {
		return minServiceSafePoint, false, nil
	}

	// Saturate the expiry instead of letting now+ttl wrap around.
	nowUnix := now.Unix()
	expiredAt := int64(math.MaxInt64)
	if ttl < math.MaxInt64-nowUnix {
		expiredAt = nowUnix + ttl
	}

	err = manager.store.SaveServiceGCSafePoint(&endpoint.ServiceSafePoint{
		ServiceID: serviceID,
		ExpiredAt: expiredAt,
		SafePoint: newSafePoint,
	})
	if err != nil {
		return nil, false, err
	}

	// Only the previous minimum holder can have moved the minimum; everyone
	// else keeps the value loaded before the save.
	if serviceID != minServiceSafePoint.ServiceID {
		return minServiceSafePoint, true, nil
	}
	minServiceSafePoint, err = manager.store.LoadMinServiceGCSafePoint(now)
	return minServiceSafePoint, true, err
}
88 changes: 86 additions & 2 deletions server/gc/safepoint_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,10 @@
package gc

import (
"math"
"sync"
"testing"
"time"

"github.com/stretchr/testify/require"
"github.com/tikv/pd/server/storage/endpoint"
Expand All @@ -28,7 +30,7 @@ func newGCStorage() endpoint.GCSafePointStorage {
}

func TestGCSafePointUpdateSequentially(t *testing.T) {
gcSafePointManager := newGCSafePointManager(newGCStorage())
gcSafePointManager := NewSafePointManager(newGCStorage())
re := require.New(t)
curSafePoint := uint64(0)
// update gc safePoint with asc value.
Expand Down Expand Up @@ -57,7 +59,7 @@ func TestGCSafePointUpdateSequentially(t *testing.T) {
}

func TestGCSafePointUpdateCurrently(t *testing.T) {
gcSafePointManager := newGCSafePointManager(newGCStorage())
gcSafePointManager := NewSafePointManager(newGCStorage())
maxSafePoint := uint64(1000)
wg := sync.WaitGroup{}
re := require.New(t)
Expand All @@ -78,3 +80,85 @@ func TestGCSafePointUpdateCurrently(t *testing.T) {
re.NoError(err)
re.Equal(maxSafePoint, safePoint)
}

// TestServiceGCSafePointUpdate exercises UpdateServiceGCSafePoint with five
// concurrent callers followed by a sequential phase.
//
// The concurrent calls only record their results; every assertion runs on the
// test goroutine after wg.Wait(). require's assertions call t.FailNow, which
// must not be invoked from goroutines other than the one running the test.
func TestServiceGCSafePointUpdate(t *testing.T) {
	re := require.New(t)
	manager := NewSafePointManager(newGCStorage())
	gcworkerServiceID := "gc_worker"
	cdcServiceID := "cdc"
	brServiceID := "br"
	cdcServiceSafePoint := uint64(10)
	gcWorkerSafePoint := uint64(8)
	brSafePoint := uint64(15)

	// callResult holds the outcome of one concurrent update so it can be
	// checked later from the test goroutine.
	type callResult struct {
		min     *endpoint.ServiceSafePoint
		updated bool
		err     error
	}
	var (
		wg                sync.WaitGroup
		cdcResult         callResult
		brResult          callResult
		gcWorkerResult    callResult
		finiteTTLResult   callResult
		negativeTTLResult callResult
	)
	// Each goroutine writes to its own callResult; wg.Wait() orders those
	// writes before the reads below.
	update := func(res *callResult, serviceID string, safePoint uint64, ttl int64) {
		defer wg.Done()
		res.min, res.updated, res.err = manager.UpdateServiceGCSafePoint(serviceID, safePoint, ttl, time.Now())
	}

	wg.Add(5)
	// Updating the safepoint for cdc to 10 should succeed.
	go update(&cdcResult, cdcServiceID, cdcServiceSafePoint, 10000)
	// Updating the safepoint for br to 15 should succeed.
	go update(&brResult, brServiceID, brSafePoint, 10000)
	// Updating the safepoint for gc_worker to 8 with an infinite TTL should succeed.
	go update(&gcWorkerResult, gcworkerServiceID, gcWorkerSafePoint, math.MaxInt64)
	// Updating gc_worker's safepoint with a TTL that is not infinite should fail.
	go update(&finiteTTLResult, gcworkerServiceID, 10000, 10)
	// Updating a safepoint with a negative TTL should be ignored.
	go update(&negativeTTLResult, brServiceID, uint64(10000), int64(-100))
	wg.Wait()

	re.NoError(cdcResult.err)
	re.True(cdcResult.updated)
	// The service will init the service safepoint to 0 (<10 for cdc) for gc_worker.
	re.Equal(gcworkerServiceID, cdcResult.min.ServiceID)

	re.NoError(brResult.err)
	re.True(brResult.updated)
	// The service will init the service safepoint to 0 (<15 for br) for gc_worker.
	re.Equal(gcworkerServiceID, brResult.min.ServiceID)

	// Check the error explicitly before dereferencing the returned safepoint.
	re.NoError(gcWorkerResult.err)
	re.True(gcWorkerResult.updated)
	// The current min safepoint should be 8 for gc_worker (cdc is 10).
	re.Equal(gcWorkerSafePoint, gcWorkerResult.min.SafePoint)
	re.Equal(gcworkerServiceID, gcWorkerResult.min.ServiceID)

	re.Error(finiteTTLResult.err)
	re.False(finiteTTLResult.updated)

	re.NoError(negativeTTLResult.err)
	re.False(negativeTTLResult.updated)

	// Update safepoint to 15 (>10 for cdc) for gc_worker; cdc becomes the min.
	gcWorkerSafePoint = uint64(15)
	minSafePoint, updated, err := manager.UpdateServiceGCSafePoint(gcworkerServiceID, gcWorkerSafePoint, math.MaxInt64, time.Now())
	re.NoError(err)
	re.True(updated)
	re.Equal(cdcServiceID, minSafePoint.ServiceID)
	re.Equal(cdcServiceSafePoint, minSafePoint.SafePoint)

	// The value shouldn't be updated when the new safepoint is smaller than the min safepoint.
	brTTL := int64(100)
	brSafePoint = minSafePoint.SafePoint - 5
	minSafePoint, updated, err = manager.UpdateServiceGCSafePoint(brServiceID, brSafePoint, brTTL, time.Now())
	re.NoError(err)
	re.False(updated)

	// A safepoint above the min is accepted.
	brSafePoint = minSafePoint.SafePoint + 10
	_, updated, err = manager.UpdateServiceGCSafePoint(brServiceID, brSafePoint, brTTL, time.Now())
	re.NoError(err)
	re.True(updated)
}
34 changes: 6 additions & 28 deletions server/grpc_service.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,7 +18,6 @@ import (
"context"
"fmt"
"io"
"math"
"strconv"
"sync/atomic"
"time"
Expand Down Expand Up @@ -1358,8 +1357,6 @@ func (s *GrpcServer) UpdateGCSafePoint(ctx context.Context, request *pdpb.Update

// UpdateServiceGCSafePoint update the safepoint for specific service
func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb.UpdateServiceGCSafePointRequest) (*pdpb.UpdateServiceGCSafePointResponse, error) {
s.serviceSafePointLock.Lock()
defer s.serviceSafePointLock.Unlock()
fn := func(ctx context.Context, client *grpc.ClientConn) (interface{}, error) {
return pdpb.NewPDClient(client).UpdateServiceGCSafePoint(ctx, request)
}
Expand All @@ -1385,36 +1382,17 @@ func (s *GrpcServer) UpdateServiceGCSafePoint(ctx context.Context, request *pdpb
return nil, err
}
now, _ := tsoutil.ParseTimestamp(nowTSO)
min, err := storage.LoadMinServiceGCSafePoint(now)
serviceID := string(request.ServiceId)
min, updated, err := s.gcSafePointManager.UpdateServiceGCSafePoint(serviceID, request.GetSafePoint(), request.GetTTL(), now)
if err != nil {
return nil, err
}

if request.TTL > 0 && request.SafePoint >= min.SafePoint {
ssp := &endpoint.ServiceSafePoint{
ServiceID: string(request.ServiceId),
ExpiredAt: now.Unix() + request.TTL,
SafePoint: request.SafePoint,
}
if math.MaxInt64-now.Unix() <= request.TTL {
ssp.ExpiredAt = math.MaxInt64
}
if err := storage.SaveServiceGCSafePoint(ssp); err != nil {
return nil, err
}
if updated {
log.Info("update service GC safe point",
zap.String("service-id", ssp.ServiceID),
zap.Int64("expire-at", ssp.ExpiredAt),
zap.Uint64("safepoint", ssp.SafePoint))
// If the min safepoint is updated, load the next one
if string(request.ServiceId) == min.ServiceID {
min, err = storage.LoadMinServiceGCSafePoint(now)
if err != nil {
return nil, err
}
}
zap.String("service-id", serviceID),
zap.Int64("expire-at", now.Unix()+request.GetTTL()),
zap.Uint64("safepoint", request.GetSafePoint()))
}

return &pdpb.UpdateServiceGCSafePointResponse{
Header: s.header(),
ServiceId: []byte(min.ServiceID),
Expand Down
6 changes: 1 addition & 5 deletions server/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,7 +46,6 @@ import (
"github.com/tikv/pd/pkg/grpcutil"
"github.com/tikv/pd/pkg/logutil"
"github.com/tikv/pd/pkg/ratelimit"
"github.com/tikv/pd/pkg/syncutil"
"github.com/tikv/pd/pkg/systimemon"
"github.com/tikv/pd/pkg/typeutil"
"github.com/tikv/pd/server/cluster"
Expand Down Expand Up @@ -146,9 +145,6 @@ type Server struct {
startCallbacks []func()
closeCallbacks []func()

// serviceSafePointLock is a lock for UpdateServiceGCSafePoint
serviceSafePointLock syncutil.Mutex

// hot region history info storeage
hotRegionStorage *storage.HotRegionStorage
// Store as map[string]*grpc.ClientConn
Expand Down Expand Up @@ -404,7 +400,7 @@ func (s *Server) startServer(ctx context.Context) error {
}
defaultStorage := storage.NewStorageWithEtcdBackend(s.client, s.rootPath)
s.storage = storage.NewCoreStorage(defaultStorage, regionStorage)
s.gcSafePointManager = gc.NewSafepointManager(s.storage)
s.gcSafePointManager = gc.NewSafePointManager(s.storage)
s.basicCluster = core.NewBasicCluster()
s.cluster = cluster.NewRaftCluster(ctx, s.clusterID, syncer.NewRegionSyncer(s), s.client, s.httpClient)
s.hbStreams = hbstream.NewHeartbeatStreams(ctx, s.clusterID, s.cluster)
Expand Down

0 comments on commit fb0e669

Please sign in to comment.