-
Notifications
You must be signed in to change notification settings - Fork 1
/
scheduler.go
112 lines (98 loc) · 3.17 KB
/
scheduler.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
// Copyright (C) 2019-2023, Ava Labs, Inc. All rights reserved.
// See the file LICENSE for licensing terms.
package scheduler
import (
"time"
"go.uber.org/zap"
"github.com/DioneProtocol/odysseygo/snow/engine/common"
"github.com/DioneProtocol/odysseygo/utils/logging"
)
// Scheduler controls when build-block notifications coming from a VM are
// passed on to the engine. The implementation in this file is created by [New].
type Scheduler interface {
// Dispatch runs the scheduling loop. [startTime] is the earliest time at
// which messages from the VM start being passed on to the engine. In the
// implementation in this file the call blocks until [Close] is called.
Dispatch(startTime time.Time)
// SetBuildBlockTime replaces the time before which messages from the VM are
// withheld from the engine with [t].
//
// Client must guarantee that [SetBuildBlockTime]
// is never called after [Close]
SetBuildBlockTime(t time.Time)
// Close signals the scheduling loop started by [Dispatch] to return.
Close()
}
// scheduler implements [Scheduler]. It receives notifications from a VM that
// it wants its engine to call the VM's BuildBlock method, and delivers the
// notification to the engine only when the engine should call BuildBlock.
// Namely, when this node is allowed to propose a block under the congestion
// control mechanism.
type scheduler struct {
// Used to report messages that are dropped because [toEngine] is full
log logging.Logger
// The VM sends a message on this channel when it wants to tell the engine
// that the engine should call the VM's BuildBlock method
fromVM <-chan common.Message
// The scheduler sends a message on this channel to notify the engine that
// it should call its VM's BuildBlock method
toEngine chan<- common.Message
// When we receive a message on this channel, it means that we must refrain
// from telling the engine to call its VM's BuildBlock method until the
// given time. Closing this channel tells Dispatch to return.
newBuildBlockTime chan time.Time
}
// New builds a scheduler that relays messages to [toEngine].
//
// It returns the scheduler together with the send-only channel on which the VM
// should post its messages. That channel is buffered with the same capacity as
// [toEngine]. New does not start the scheduling loop; the caller is expected
// to run Dispatch.
func New(log logging.Logger, toEngine chan<- common.Message) (Scheduler, chan<- common.Message) {
	// The scheduler keeps the receiving side of this channel while the caller
	// is handed the sending side.
	fromVM := make(chan common.Message, cap(toEngine))

	// Unbuffered: a sender blocks until the dispatch loop picks the value up.
	rescheduled := make(chan time.Time)

	sched := &scheduler{
		log:               log,
		fromVM:            fromVM,
		toEngine:          toEngine,
		newBuildBlockTime: rescheduled,
	}
	return sched, fromVM
}
// Dispatch runs the scheduling loop and only returns once Close has been
// called.
//
// The loop alternates between two phases:
//
//  1. Waiting: nothing is read from the VM channel until the timer armed for
//     [buildBlockTime] (or for a later value handed to SetBuildBlockTime)
//     fires.
//  2. Forwarding: every message from the VM is offered to the engine. A
//     message is dropped, with a debug log entry, when the engine channel
//     cannot take it immediately.
//
// A value received on the reschedule channel in either phase re-arms the timer
// and sends the loop back to the waiting phase.
func (s *scheduler) Dispatch(buildBlockTime time.Time) {
	wakeup := time.NewTimer(time.Until(buildBlockTime))

	for {
		// Waiting phase.
		select {
		case <-wakeup.C:
			// The timer fired (and [wakeup.C] is now drained): move on to
			// the forwarding phase below.
		case next, open := <-s.newBuildBlockTime:
			// The timer has not been consumed on this path, so stop it and
			// drain [wakeup.C] if it fired in the meantime.
			if stopped := wakeup.Stop(); !stopped {
				<-wakeup.C
			}
			if !open {
				// s.Close() was called
				return
			}
			// A new notification time was requested; wait for that instead.
			wakeup.Reset(time.Until(next))
			continue
		}

		// Forwarding phase.
	forward:
		for {
			select {
			case msg := <-s.fromVM:
				// Relay the VM's request to build a block without ever
				// blocking on the engine.
				select {
				case s.toEngine <- msg:
				default:
					// The engine's channel is full: drop the message rather
					// than risk a deadlock.
					s.log.Debug("dropping message from VM",
						zap.String("reason", "channel to engine is full"),
						zap.Stringer("messageString", msg),
					)
				}
			case next, open := <-s.newBuildBlockTime:
				if !open {
					// s.Close() was called
					return
				}
				// [wakeup.C] was drained when the waiting phase ended, so
				// the timer can be reset directly.
				wakeup.Reset(time.Until(next))
				break forward
			}
		}
	}
}
// SetBuildBlockTime hands [t] to the dispatch loop as the new time before
// which messages from the VM are withheld from the engine.
//
// The send blocks until the value is accepted on the reschedule channel; with
// the unbuffered channel created by New that means until Dispatch receives it.
// It must not be called after Close, because sending on a closed channel
// panics.
func (s *scheduler) SetBuildBlockTime(t time.Time) {
	updates := s.newBuildBlockTime
	updates <- t
}
// Close shuts the scheduler down by closing the reschedule channel, which
// Dispatch treats as the signal to return.
//
// Calling Close a second time panics, as does calling SetBuildBlockTime
// afterwards, because the channel is already closed.
func (s *scheduler) Close() {
	done := s.newBuildBlockTime
	close(done)
}