feat: remove task and host gc ttl (#1735)

Signed-off-by: Gaius <gaius.qi@gmail.com>
gaius-qi committed Oct 10, 2022
1 parent 0b36d30 commit 088f4e6
Showing 17 changed files with 129 additions and 117 deletions.
10 changes: 3 additions & 7 deletions deploy/docker-compose/template/scheduler.template.yaml
@@ -47,17 +47,13 @@ scheduler:
   # GC metadata configuration.
   gc:
     # peerGCInterval is peer's gc interval.
-    peerGCInterval: 10m
+    peerGCInterval: 10s
     # peerTTL is peer's TTL duration.
-    peerTTL: 12h
+    peerTTL: 24h
     # taskGCInterval is task's gc interval.
     taskGCInterval: 10m
-    # taskTTL is task's TTL duration.
-    taskTTL: 24h
     # hostGCInterval is host's gc interval.
-    hostGCInterval: 30m
-    # hostTTL is host's TTL duration.
-    hostTTL: 48h
+    hostGCInterval: 1h
 
   # Dynamic data configuration.
   dynConfig:
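Note: the duration values in this template ("10s", "24h", "10m", "1h") use Go duration syntax. A minimal standalone sketch of how such strings resolve, separate from the scheduler's actual config loader:

package main

import (
	"fmt"
	"time"
)

func main() {
	// Values taken from the gc block above; time.ParseDuration accepts
	// unit suffixes such as "s", "m", and "h".
	for _, v := range []string{"10s", "24h", "10m", "1h"} {
		d, err := time.ParseDuration(v)
		if err != nil {
			panic(err)
		}
		fmt.Printf("%s -> %v\n", v, d)
	}
}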
20 changes: 6 additions & 14 deletions scheduler/config/config.go
@@ -147,14 +147,8 @@ type GCConfig struct {
 	// Task gc interval.
 	TaskGCInterval time.Duration `yaml:"taskGCInterval" mapstructure:"taskGCInterval"`
 
-	// Task time to live.
-	TaskTTL time.Duration `yaml:"taskTTL" mapstructure:"taskTTL"`
-
 	// Host gc interval.
 	HostGCInterval time.Duration `yaml:"hostGCInterval" mapstructure:"hostGCInterval"`
-
-	// Host time to live.
-	HostTTL time.Duration `yaml:"hostTTL" mapstructure:"hostTTL"`
 }
 
 type DynConfig struct {
@@ -305,9 +299,7 @@ func New() *Config {
 			PeerGCInterval: DefaultSchedulerPeerGCInterval,
 			PeerTTL:        DefaultSchedulerPeerTTL,
 			TaskGCInterval: DefaultSchedulerTaskGCInterval,
-			TaskTTL:        DefaultSchedulerTaskTTL,
 			HostGCInterval: DefaultSchedulerHostGCInterval,
-			HostTTL:        DefaultSchedulerHostTTL,
 		},
 		Training: TrainingConfig{
 			Enable: false,
@@ -393,20 +385,20 @@ func (cfg *Config) Validate() error {
 		return errors.New("scheduler requires parameter retryInterval")
 	}
 
+	if cfg.Scheduler.GC.PeerGCInterval <= 0 {
+		return errors.New("scheduler requires parameter peerGCInterval")
+	}
+
 	if cfg.Scheduler.GC.PeerTTL <= 0 {
 		return errors.New("scheduler requires parameter peerTTL")
 	}
 
-	if cfg.Scheduler.GC.PeerGCInterval <= 0 {
-		return errors.New("scheduler requires parameter peerGCInterval")
-	}
-
 	if cfg.Scheduler.GC.TaskGCInterval <= 0 {
 		return errors.New("scheduler requires parameter taskGCInterval")
 	}
 
-	if cfg.Scheduler.GC.TaskTTL <= 0 {
-		return errors.New("scheduler requires parameter taskTTL")
+	if cfg.Scheduler.GC.HostGCInterval <= 0 {
+		return errors.New("scheduler requires parameter hostGCInterval")
 	}
 
 	if cfg.Scheduler.Training.Enable {
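Taken together, the hunks above leave GCConfig with a TTL only for peers; tasks and hosts keep just their GC intervals. A compile-ready sketch of the resulting struct, assembled from this diff (the peer field comments and tags are assumed to mirror the task and host ones):

package config

import "time"

type GCConfig struct {
	// Peer gc interval.
	PeerGCInterval time.Duration `yaml:"peerGCInterval" mapstructure:"peerGCInterval"`

	// Peer time to live.
	PeerTTL time.Duration `yaml:"peerTTL" mapstructure:"peerTTL"`

	// Task gc interval.
	TaskGCInterval time.Duration `yaml:"taskGCInterval" mapstructure:"taskGCInterval"`

	// Host gc interval.
	HostGCInterval time.Duration `yaml:"hostGCInterval" mapstructure:"hostGCInterval"`
}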
2 changes: 0 additions & 2 deletions scheduler/config/config_test.go
@@ -40,9 +40,7 @@ func TestConfig_Load(t *testing.T) {
 				PeerGCInterval: 1 * time.Minute,
 				PeerTTL:        5 * time.Minute,
 				TaskGCInterval: 1 * time.Minute,
-				TaskTTL:        10 * time.Minute,
 				HostGCInterval: 1 * time.Minute,
-				HostTTL:        10 * time.Minute,
 			},
 			Training: TrainingConfig{
 				Enable: true,
12 changes: 3 additions & 9 deletions scheduler/config/constants.go
@@ -59,22 +59,16 @@ const (
 	DefaultSchedulerRetryInterval = 50 * time.Millisecond
 
 	// DefaultSchedulerPeerGCInterval is default interval for peer gc.
-	DefaultSchedulerPeerGCInterval = 10 * time.Minute
+	DefaultSchedulerPeerGCInterval = 10 * time.Second
 
 	// DefaultSchedulerPeerTTL is default ttl for peer.
 	DefaultSchedulerPeerTTL = 24 * time.Hour
 
 	// DefaultSchedulerTaskGCInterval is default interval for task gc.
-	DefaultSchedulerTaskGCInterval = 10 * time.Minute
-
-	// DefaultSchedulerTaskTTL is default ttl for task.
-	DefaultSchedulerTaskTTL = 24 * time.Hour
+	DefaultSchedulerTaskGCInterval = 30 * time.Minute
 
 	// DefaultSchedulerHostGCInterval is default interval for host gc.
-	DefaultSchedulerHostGCInterval = 30 * time.Minute
-
-	// DefaultSchedulerHostTTL is default ttl for host.
-	DefaultSchedulerHostTTL = 48 * time.Hour
+	DefaultSchedulerHostGCInterval = 1 * time.Hour
 
 	// DefaultRefreshModelInterval is model refresh interval.
 	DefaultRefreshModelInterval = 168 * time.Hour
2 changes: 0 additions & 2 deletions scheduler/config/testdata/scheduler.yaml
@@ -18,9 +18,7 @@ scheduler:
     peerGCInterval: 60000000000
     peerTTL: 300000000000
     taskGCInterval: 60000000000
-    taskTTL: 600000000000
     hostGCInterval: 60000000000
-    hostTTL: 600000000000
   training:
     enable: true
     enableAutoRefresh: true
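The integers in this fixture are nanosecond counts, the underlying unit of Go's time.Duration, so 60000000000 is one minute and 300000000000 is five minutes, matching the expectations in config_test.go above. A quick check:

package main

import (
	"fmt"
	"time"
)

func main() {
	// time.Duration counts nanoseconds, so the fixture's integers decode
	// directly to the durations asserted in config_test.go.
	fmt.Println(time.Duration(60000000000) == 1*time.Minute)  // true
	fmt.Println(time.Duration(300000000000) == 5*time.Minute) // true
}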
15 changes: 6 additions & 9 deletions scheduler/resource/host_manager.go
@@ -20,7 +20,6 @@ package resource
 
 import (
 	"sync"
-	"time"
 
 	pkggc "d7y.io/dragonfly/v2/pkg/gc"
 	"d7y.io/dragonfly/v2/scheduler/config"
@@ -53,16 +52,12 @@ type HostManager interface {
 type hostManager struct {
 	// Host sync map.
 	*sync.Map
-
-	// Host time to live.
-	ttl time.Duration
 }
 
 // New host manager interface.
 func newHostManager(cfg *config.GCConfig, gc pkggc.GC) (HostManager, error) {
 	h := &hostManager{
 		Map: &sync.Map{},
-		ttl: cfg.HostTTL,
 	}
 
 	if err := gc.Add(pkggc.Task{
@@ -101,11 +96,13 @@ func (h *hostManager) Delete(key string) {
 
 func (h *hostManager) RunGC() error {
 	h.Map.Range(func(_, value any) bool {
-		host := value.(*Host)
-		elapsed := time.Since(host.UpdateAt.Load())
+		host, ok := value.(*Host)
+		if !ok {
+			host.Log.Error("invalid host")
+			return true
+		}
 
-		if elapsed > h.ttl &&
-			host.PeerCount.Load() == 0 &&
+		if host.PeerCount.Load() == 0 &&
 			host.UploadPeerCount.Load() == 0 &&
 			host.Type == HostTypeNormal {
 			host.Log.Info("host has been reclaimed")
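With hostTTL removed, RunGC above no longer compares elapsed time against a deadline; a host is reclaimed purely by state. A hypothetical standalone restatement of the new predicate (HostType and HostTypeNormal are stubbed here for illustration):

package main

import "fmt"

// HostType stands in for the scheduler's host type enum; only the value
// needed by this sketch is defined.
type HostType int

const HostTypeNormal HostType = 0

// shouldReclaimHost restates the new RunGC condition: no peers, no
// uploading peers, and a normal (non-seed) host.
func shouldReclaimHost(peerCount, uploadPeerCount int32, typ HostType) bool {
	return peerCount == 0 && uploadPeerCount == 0 && typ == HostTypeNormal
}

func main() {
	fmt.Println(shouldReclaimHost(0, 0, HostTypeNormal)) // true: idle host is reclaimed
	fmt.Println(shouldReclaimHost(3, 0, HostTypeNormal)) // false: host still has peers
}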
1 change: 0 additions & 1 deletion scheduler/resource/host_manager_test.go
@@ -34,7 +34,6 @@ import (
 var (
 	mockHostGCConfig = &config.GCConfig{
 		HostGCInterval: 1 * time.Second,
-		HostTTL:        1 * time.Microsecond,
 	}
 )

43 changes: 29 additions & 14 deletions scheduler/resource/peer_manager.go
@@ -126,30 +126,41 @@ func (p *peerManager) Delete(key string) {
 func (p *peerManager) RunGC() error {
 	p.Map.Range(func(_, value any) bool {
 		peer := value.(*Peer)
-		elapsed := time.Since(peer.UpdateAt.Load())
 
+		// If the peer state is PeerStateLeave,
+		// peer will be reclaimed.
+		if peer.FSM.Is(PeerStateLeave) {
+			p.Delete(peer.ID)
+			peer.Log.Info("peer has been reclaimed")
+			return true
+		}
+
+		// If the peer's elapsed exceeds the ttl,
+		// first set the peer state to PeerStateLeave and then delete peer.
+		elapsed := time.Since(peer.UpdateAt.Load())
 		if elapsed > p.ttl {
-			// If the status is PeerStateLeave,
-			// clear peer information.
-			if peer.FSM.Is(PeerStateLeave) {
-				p.Delete(peer.ID)
-				peer.Log.Info("peer has been reclaimed")
-				return true
-			}
-
-			// If the peer is not leave,
-			// first change the state to PeerEventLeave.
 			if err := peer.FSM.Event(PeerEventLeave); err != nil {
 				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
 				return true
 			}
 
-			peer.Log.Info("gc causes the peer to leave")
+			peer.Log.Info("peer elapsed exceeds the ttl, causing the peer to leave")
 			return true
 		}
 
+		// If the peer's state is PeerStateFailed,
+		// first set the peer state to PeerStateLeave and then delete peer.
+		if peer.FSM.Is(PeerStateFailed) {
+			if err := peer.FSM.Event(PeerEventLeave); err != nil {
+				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
+				return true
+			}
+
+			peer.Log.Info("peer state is PeerStateFailed, causing the peer to leave")
+		}
+
 		// If no peer exists in the dag of the task,
 		// delete the peer.
 		degree, err := peer.Task.PeerDegree(peer.ID)
@@ -160,12 +171,16 @@ func (p *peerManager) RunGC() error {
 		}
 
 		// If the task dag size exceeds the limit,
-		// then delete peers which state is PeerStateSucceeded and PeerStateFailed,
-		// and degree is zero.
+		// then set the peer state to PeerStateLeave which state is
+		// PeerStateSucceeded and PeerStateFailed, and degree is zero.
 		if peer.Task.PeerCount() > PeerCountLimitForTask &&
 			(peer.FSM.Is(PeerStateSucceeded) || peer.FSM.Is(PeerStateFailed)) && degree == 0 {
-			p.Delete(peer.ID)
-			peer.Log.Info("peer has been reclaimed")
+			if err := peer.FSM.Event(PeerEventLeave); err != nil {
+				peer.Log.Errorf("peer fsm event failed: %s", err.Error())
+				return true
+			}
+
+			peer.Log.Info("task dag size exceeds the limit, causing the peer to leave")
 			return true
 		}
 
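The reworked RunGC now separates immediate deletion from a two-pass leave: only peers already in PeerStateLeave are removed from the map, while the TTL, PeerStateFailed, and dag-size branches fire PeerEventLeave so that a later GC pass deletes them. A simplified, hypothetical restatement of that decision ladder (this helper does not exist in the scheduler, and the real code also handles FSM transition errors):

package main

import "fmt"

type gcAction int

const (
	gcNone   gcAction = iota
	gcDelete // already PeerStateLeave: remove from the map now
	gcLeave  // fire PeerEventLeave: removed on a later GC pass
)

// classify restates the branch order of the new RunGC with plain booleans.
func classify(isLeave, ttlExceeded, isFailed, dagOverLimit, terminalState bool, degree int) gcAction {
	switch {
	case isLeave:
		return gcDelete
	case ttlExceeded:
		return gcLeave
	case isFailed:
		return gcLeave
	case dagOverLimit && terminalState && degree == 0:
		return gcLeave
	default:
		return gcNone
	}
}

func main() {
	fmt.Println(classify(false, true, false, false, false, 1)) // 2 (gcLeave): ttl exceeded
	fmt.Println(classify(true, false, false, false, false, 0)) // 1 (gcDelete): reclaimed now
}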
26 changes: 24 additions & 2 deletions scheduler/resource/peer_manager_test.go
@@ -364,6 +364,27 @@ func TestPeerManager_RunGC(t *testing.T) {
 				assert.Equal(ok, false)
 			},
 		},
+		{
+			name: "peer state is PeerStateFailed",
+			gcConfig: &config.GCConfig{
+				PeerGCInterval: 1 * time.Second,
+				PeerTTL:        1 * time.Microsecond,
+			},
+			mock: func(m *gc.MockGCMockRecorder) {
+				m.Add(gomock.Any()).Return(nil).Times(1)
+			},
+			expect: func(t *testing.T, peerManager PeerManager, mockHost *Host, mockTask *Task, mockPeer *Peer) {
+				assert := assert.New(t)
+				peerManager.Store(mockPeer)
+				mockPeer.FSM.SetState(PeerStateFailed)
+				err := peerManager.RunGC()
+				assert.NoError(err)
+
+				peer, ok := peerManager.Load(mockPeer.ID)
+				assert.Equal(ok, true)
+				assert.Equal(peer.FSM.Current(), PeerStateLeave)
+			},
+		},
 		{
 			name: "peer gets degree failed",
 			gcConfig: &config.GCConfig{
@@ -407,8 +428,9 @@ func TestPeerManager_RunGC(t *testing.T) {
 				err := peerManager.RunGC()
 				assert.NoError(err)
 
-				_, ok := peerManager.Load(mockPeer.ID)
-				assert.Equal(ok, false)
+				peer, ok := peerManager.Load(mockPeer.ID)
+				assert.Equal(ok, true)
+				assert.Equal(peer.FSM.Current(), PeerStateLeave)
 			},
 		},
 	}
2 changes: 0 additions & 2 deletions scheduler/resource/resource_test.go
@@ -141,9 +141,7 @@ func TestResource_New(t *testing.T) {
 					PeerGCInterval: 100,
 					PeerTTL:        1000,
 					TaskGCInterval: 100,
-					TaskTTL:        1000,
 					HostGCInterval: 100,
-					HostTTL:        1000,
 				},
 			},
 			SeedPeer: config.SeedPeerConfig{
4 changes: 3 additions & 1 deletion scheduler/resource/task.go
@@ -330,7 +330,9 @@ func (t *Task) HasAvailablePeer() bool {
 			continue
 		}
 
-		if peer.FSM.Is(PeerStateSucceeded) {
+		if peer.FSM.Is(PeerStateSucceeded) ||
+			peer.FSM.Is(PeerStateRunning) ||
+			peer.FSM.Is(PeerStateBackToSource) {
 			hasAvailablePeer = true
 			break
 		}
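This widening matters once taskTTL is gone: task liveness now hinges on peer state, so peers that are still downloading (running, or falling back to the source) keep a task available, not only peers that already succeeded. A hypothetical reduction of the check to a pure function:

package main

import "fmt"

// peerState stands in for the scheduler's FSM state names.
type peerState string

const (
	peerStateSucceeded    peerState = "Succeeded"
	peerStateRunning      peerState = "Running"
	peerStateBackToSource peerState = "BackToSource"
	peerStateFailed       peerState = "Failed"
)

// isAvailable mirrors the widened condition in HasAvailablePeer.
func isAvailable(s peerState) bool {
	switch s {
	case peerStateSucceeded, peerStateRunning, peerStateBackToSource:
		return true
	default:
		return false
	}
}

func main() {
	fmt.Println(isAvailable(peerStateRunning)) // true: in-flight peers now count
	fmt.Println(isAvailable(peerStateFailed))  // false
}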
18 changes: 9 additions & 9 deletions scheduler/resource/task_manager.go
@@ -20,7 +20,6 @@ package resource
 
 import (
 	"sync"
-	"time"
 
 	pkggc "d7y.io/dragonfly/v2/pkg/gc"
 	"d7y.io/dragonfly/v2/scheduler/config"
@@ -53,16 +52,12 @@ type TaskManager interface {
 type taskManager struct {
 	// Task sync map.
 	*sync.Map
-
-	// Task time to live.
-	ttl time.Duration
 }
 
 // New task manager interface.
 func newTaskManager(cfg *config.GCConfig, gc pkggc.GC) (TaskManager, error) {
 	t := &taskManager{
 		Map: &sync.Map{},
-		ttl: cfg.TaskTTL,
 	}
 
 	if err := gc.Add(pkggc.Task{
@@ -101,10 +96,14 @@ func (t *taskManager) Delete(key string) {
 
 func (t *taskManager) RunGC() error {
 	t.Map.Range(func(_, value any) bool {
-		task := value.(*Task)
-		elapsed := time.Since(task.UpdateAt.Load())
+		task, ok := value.(*Task)
+		if !ok {
+			task.Log.Error("invalid task")
+			return true
+		}
 
-		if elapsed > t.ttl && task.FSM.Is(TaskStateLeave) {
+		// If task state is TaskStateLeave, it will be reclaimed.
+		if task.FSM.Is(TaskStateLeave) {
 			task.Log.Info("task has been reclaimed")
 			t.Delete(task.ID)
 			return true
@@ -116,9 +115,10 @@ func (t *taskManager) RunGC() error {
 				task.Log.Errorf("task fsm event failed: %s", err.Error())
 				return true
 			}
+
+			task.Log.Info("task peer count is zero, causing the task to leave")
 		}
 
-		task.Log.Info("gc causes the task to leave")
 		return true
 	})
 
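Task reclamation is now state-driven and two-pass: a GC pass that finds a task in TaskStateLeave deletes it, and a task whose peers are gone is moved to leave so the following pass removes it (the exact trigger is collapsed in this view, but the new log line shows it fires when the task's peer count is zero). A hypothetical condensation of that flow:

package main

import "fmt"

// gcTask condenses the new taskManager.RunGC flow; state and peerCount
// stand in for the task FSM and its peer bookkeeping.
func gcTask(state string, peerCount int) (deleteNow, fireLeave bool) {
	// A task already marked leave is reclaimed immediately.
	if state == "Leave" {
		return true, false
	}
	// An empty task is marked leave, to be reclaimed on the next pass.
	if peerCount == 0 {
		return false, true
	}
	return false, false
}

func main() {
	fmt.Println(gcTask("Succeeded", 0)) // false true: marked to leave
	fmt.Println(gcTask("Leave", 0))     // true false: reclaimed now
}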
1 change: 0 additions & 1 deletion scheduler/resource/task_manager_test.go
@@ -34,7 +34,6 @@ import (
 var (
 	mockTaskGCConfig = &config.GCConfig{
 		TaskGCInterval: 1 * time.Second,
-		TaskTTL:        1 * time.Microsecond,
 	}
 )

