Skip to content

Commit

Permalink
Merge 209ad5f into 0ae08c9
Browse files Browse the repository at this point in the history
  • Loading branch information
pashagolub committed Feb 22, 2022
2 parents 0ae08c9 + 209ad5f commit 44bdc1d
Show file tree
Hide file tree
Showing 3 changed files with 23 additions and 28 deletions.
7 changes: 1 addition & 6 deletions internal/scheduler/chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -156,12 +156,7 @@ func (sch *Scheduler) chainWorker(ctx context.Context, chains <-chan Chain) {
}

func getTimeoutContext(ctx context.Context, t1 int, t2 int) (context.Context, context.CancelFunc) {
var timeout int
if t1 > t2 {
timeout = t1
} else {
timeout = t2
}
timeout := Max(t1, t2)
if timeout > 0 {
return context.WithTimeout(ctx, time.Millisecond*time.Duration(timeout))
}
Expand Down
6 changes: 4 additions & 2 deletions internal/scheduler/interval_chain.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,14 +26,16 @@ func (ichain IntervalChain) isListed(ichains []IntervalChain) bool {
// SendIntervalChain sends interval chain to the channel for workers
func (sch *Scheduler) SendIntervalChain(c IntervalChain) {
select {
case sch.intervalChainsChan <- c:
case sch.ichainsChan <- c:
sch.l.WithField("chain", c.ChainID).Debug("Sent interval chain to the execution channel")
default:
sch.l.WithField("chain", c.ChainID).Error("Failed to send interval chain to the execution channel")
}
}

func (sch *Scheduler) isValid(ichain IntervalChain) bool {
sch.intervalChainMutex.Lock()
defer sch.intervalChainMutex.Unlock()
return (IntervalChain{}) != sch.intervalChains[ichain.ChainID]
}

Expand All @@ -54,7 +56,6 @@ func (sch *Scheduler) reschedule(ctx context.Context, ichain IntervalChain) {
}

func (sch *Scheduler) retrieveIntervalChainsAndRun(ctx context.Context) {
sch.intervalChainMutex.Lock()
ichains := []IntervalChain{}
err := sch.pgengine.SelectIntervalChains(ctx, &ichains)
if err != nil {
Expand All @@ -64,6 +65,7 @@ func (sch *Scheduler) retrieveIntervalChainsAndRun(ctx context.Context) {
}

// delete chains that are not returned from the database
sch.intervalChainMutex.Lock()
for id, ichain := range sch.intervalChains {
if !ichain.isListed(ichains) {
delete(sch.intervalChains, id)
Expand Down
38 changes: 18 additions & 20 deletions internal/scheduler/scheduler.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,23 +30,21 @@ const (

// Scheduler is the main class for running the tasks
type Scheduler struct {
l log.LoggerIface
chainsChan chan Chain // channel for passing chains to workers
pgengine *pgengine.PgEngine
pgengine *pgengine.PgEngine
l log.LoggerIface
chainsChan chan Chain // channel for passing chains to workers
ichainsChan chan IntervalChain // channel for passing interval chains to workers

exclusiveMutex sync.RWMutex //read-write mutex for running regular and exclusive chains

// activeChains holds the map of chain ID with context cancel() function, so we can abort chain by request
activeChains map[int]func()
activeChains map[int]func() // map of chain ID with context cancel() function to abort chain by request
activeChainMutex sync.Mutex

// map of active chains, updated every minute
intervalChains map[int]IntervalChain
// create channel for passing interval chains to workers
intervalChainsChan chan IntervalChain
intervalChains map[int]IntervalChain // map of active chains, updated every minute
intervalChainMutex sync.Mutex
shutdown chan struct{} // closed when shutdown is called
status RunStatus

shutdown chan struct{} // closed when shutdown is called
status RunStatus
}

// Max returns the maximum number of two arguments
Expand All @@ -60,14 +58,14 @@ func Max(x, y int) int {
// New returns a new instance of Scheduler
func New(pge *pgengine.PgEngine, logger log.LoggerIface) *Scheduler {
return &Scheduler{
l: logger,
pgengine: pge,
chainsChan: make(chan Chain, Max(minChannelCapacity, pge.Resource.CronWorkers*2)),
intervalChainsChan: make(chan IntervalChain, Max(minChannelCapacity, pge.Resource.IntervalWorkers*2)),
activeChains: make(map[int]func()), //holds cancel() functions to stop chains
intervalChains: make(map[int]IntervalChain),
shutdown: make(chan struct{}),
status: RunningStatus,
l: logger,
pgengine: pge,
chainsChan: make(chan Chain, Max(minChannelCapacity, pge.Resource.CronWorkers*2)),
ichainsChan: make(chan IntervalChain, Max(minChannelCapacity, pge.Resource.IntervalWorkers*2)),
activeChains: make(map[int]func()), //holds cancel() functions to stop chains
intervalChains: make(map[int]IntervalChain),
shutdown: make(chan struct{}),
status: RunningStatus,
}
}

Expand Down Expand Up @@ -98,7 +96,7 @@ func (sch *Scheduler) Run(ctx context.Context) RunStatus {
for w := 1; w <= sch.Config().Resource.IntervalWorkers; w++ {
workerCtx, cancel := context.WithCancel(ctx)
defer cancel()
go sch.intervalChainWorker(workerCtx, sch.intervalChainsChan)
go sch.intervalChainWorker(workerCtx, sch.ichainsChan)
}
ctx = log.WithLogger(ctx, sch.l)

Expand Down

0 comments on commit 44bdc1d

Please sign in to comment.