Skip to content
This repository has been archived by the owner on Feb 27, 2023. It is now read-only.

Commit

Permalink
add implement of preheat
Browse files Browse the repository at this point in the history
Signed-off-by: 玉海 <yuhai.lyh@alibaba-inc.com>
  • Loading branch information
ansinlee committed Aug 10, 2020
1 parent 4906cd1 commit 1e77334
Show file tree
Hide file tree
Showing 10 changed files with 855 additions and 6 deletions.
53 changes: 53 additions & 0 deletions supernode/daemon/mgr/preheat/base_preaheater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,53 @@
package preheat

import (
"sync"

"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
)

var _ Preheater = &BasePreheater{}

type BasePreheater struct {}

/**
* The type of this preheater
*/
func (p *BasePreheater) Type() string {
panic("not implement")
}

/**
* Create a worker to preheat the task.
*/
func (p *BasePreheater) NewWorker(task *mgr.PreheatTask , service *PreheatService) IWorker {
panic("not implement")
}

/**
* cancel the running task
*/
func (p *BasePreheater) Cancel(id string) {
woker, ok := workerMap.Load(id)
if !ok {
return
}
woker.(IWorker).Stop()
}

/**
* remove a running preheat task
*/
func (p *BasePreheater) Remove(id string) {
p.Cancel(id)
workerMap.Delete(id)
}

/**
* add a worker to workerMap.
*/
func (p *BasePreheater) addWorker(id string, worker IWorker) {
workerMap.Store(id, worker)
}

var workerMap = new(sync.Map)
100 changes: 100 additions & 0 deletions supernode/daemon/mgr/preheat/base_worker.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,100 @@
package preheat

import (
"runtime/debug"
"sync/atomic"
"time"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
)

const TIMEOUT = 30 * 60;

var _ IWorker = &BaseWorker{}

type IWorker interface{
Run()
Stop()
query() chan error
preRun() bool
failed(errMsg string)
afterRun()
}

type BaseWorker struct {
Task *mgr.PreheatTask
Preheater Preheater
PreheatService *PreheatService
stop *atomic.Value
worker IWorker
}

func newBaseWorker(task *mgr.PreheatTask, preheater Preheater, preheatService *PreheatService) *BaseWorker {
worker := &BaseWorker{
Task: task,
Preheater: preheater,
PreheatService: preheatService,
stop: new(atomic.Value),
}
worker.worker = worker
return worker
}

func (w *BaseWorker) Run() {
go func() {
defer func(){
e := recover()
if e != nil {
debug.PrintStack()
}
}()

if w.worker.preRun() {
timer := time.NewTimer(time.Second*TIMEOUT)
ch := w.worker.query()
select {
case <-timer.C:
w.worker.failed("timeout")
case err := <-ch:
if err != nil {
w.worker.failed(err.Error())
}
}
}
w.worker.afterRun()
}()
}

func (w *BaseWorker) Stop() {
w.stop.Store(true)
}

func (w *BaseWorker) isRunning() bool {
return w.stop.Load() == nil
}

func (w *BaseWorker) preRun() bool {
panic("not implement")
}

func (w *BaseWorker) afterRun() {
w.Preheater.Remove(w.Task.ID)
}

func (w *BaseWorker) query() chan error {
panic("not implement")
}

func (w *BaseWorker) succeed() {
w.Task.FinishTime = time.Now().UnixNano()/int64(time.Millisecond)
w.Task.Status = types.PreheatStatusSUCCESS
w.PreheatService.Update(w.Task.ID, w.Task)
}

func (w *BaseWorker) failed(errMsg string) {
w.Task.FinishTime = time.Now().UnixNano()/int64(time.Millisecond)
w.Task.Status = types.PreheatStatusFAILED
w.Task.ErrorMsg = errMsg
w.PreheatService.Update(w.Task.ID, w.Task)
}
93 changes: 93 additions & 0 deletions supernode/daemon/mgr/preheat/file_preaheater.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,93 @@
package preheat

import (
"errors"
"github.com/sirupsen/logrus"
"time"

"github.com/dragonflyoss/Dragonfly/apis/types"
"github.com/dragonflyoss/Dragonfly/supernode/daemon/mgr"
)

func init() {
RegisterPreheater("file", &FilePreheat{BasePreheater:new(BasePreheater)})
logrus.StandardLogger().SetLevel(logrus.DebugLevel)
}

type FilePreheat struct {
*BasePreheater
}

func (p *FilePreheat) Type() string {
return "file"
}

/**
* Create a worker to preheat the task.
*/
func (p *FilePreheat) NewWorker(task *mgr.PreheatTask , service *PreheatService) IWorker {
worker := &FileWorker{BaseWorker: newBaseWorker(task, p, service)}
worker.worker = worker
p.addWorker(task.ID, worker)
return worker
}

type FileWorker struct {
*BaseWorker
progress *PreheatProgress
}

func (w *FileWorker) preRun() bool {
w.Task.Status = types.PreheatStatusRUNNING
w.PreheatService.Update(w.Task.ID, w.Task)
var err error
w.progress, err = w.PreheatService.ExecutePreheat(w.Task)
if err != nil {
w.failed(err.Error())
return false
}
return true
}

func (w *FileWorker) afterRun() {
if w.progress != nil {
w.progress.cmd.Process.Kill()
}
w.BaseWorker.afterRun()
}

func (w *FileWorker) query() chan error {
result := make(chan error, 1)
go func(){
time.Sleep(time.Second*2)
for w.isRunning() {
if w.Task.FinishTime > 0 {
w.Preheater.Cancel(w.Task.ID)
return
}
if w.progress == nil {
w.succeed()
return
}
status := w.progress.cmd.ProcessState
if status != nil && status.Exited() {
if !status.Success() {
errMsg := "dfget failed:" + status.String()
w.failed(errMsg)
w.Preheater.Cancel(w.Task.ID)
result <- errors.New(errMsg)
return
} else {
w.succeed()
w.Preheater.Cancel(w.Task.ID)
result <- nil
return
}
}

time.Sleep(time.Second*10)
}
}()
return result
}

Loading

0 comments on commit 1e77334

Please sign in to comment.