diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 48b6e165..77b0f346 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -78,6 +78,7 @@ func (s *Scheduler) work() { // Mark the scheduled run as running r.Status = gaia.RunRunning + r.RunDate = time.Now() // Update entry in store err := s.storeService.PipelinePutRun(&r) @@ -105,8 +106,25 @@ func (s *Scheduler) work() { continue } - // Start pipeline run process - s.executePipeline(pipeline, &r) + // Get all jobs + r.Jobs, err = s.getPipelineJobs(pipeline) + if err != nil { + gaia.Cfg.Logger.Error("cannot get pipeline jobs before execution", "error", err.Error()) + + // Update store + r.Status = gaia.RunFailed + s.storeService.PipelinePutRun(&r) + continue + } + + // Check if this pipeline has jobs declared + if len(r.Jobs) == 0 { + continue + } + + // Schedule jobs and execute them. + // Also update the run in the store. + s.scheduleJobsByPriority(&r, pipeline) } } @@ -176,33 +194,6 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline) (*gaia.PipelineRun, error return &run, s.storeService.PipelinePutRun(&run) } -// executePipeline executes the given pipeline and updates it status periodically. -func (s *Scheduler) executePipeline(p *gaia.Pipeline, r *gaia.PipelineRun) { - // Set start time - r.RunDate = time.Now() - - // Get all jobs - var err error - r.Jobs, err = s.getPipelineJobs(p) - if err != nil { - gaia.Cfg.Logger.Error("cannot get pipeline jobs before execution", "error", err.Error()) - - // Update store - r.Status = gaia.RunFailed - s.storeService.PipelinePutRun(r) - return - } - - // Check if this pipeline has jobs declared - if len(r.Jobs) == 0 { - return - } - - // Schedule jobs and execute them. - // Also update the run in the store. - s.scheduleJobsByPriority(r, p) -} - // executeJob executes a single job. // This method is blocking. func executeJob(job *gaia.Job, p *gaia.Pipeline, wg *sync.WaitGroup, triggerSave chan bool) { @@ -211,13 +202,6 @@ func executeJob(job *gaia.Job, p *gaia.Pipeline, wg *sync.WaitGroup, triggerSave triggerSave <- true }() - // In testmode we do not test this. - // TODO: Bad testing. Fix this asap! - if gaia.Cfg.TestMode { - job.Status = gaia.JobSuccess - return - } - // Set Job to running job.Status = gaia.JobRunning @@ -276,7 +260,7 @@ func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline var wg sync.WaitGroup triggerSave := make(chan bool) for id, job := range r.Jobs { - if job.Priority == lowestPrio { + if job.Priority == lowestPrio && job.Status == gaia.JobWaitingExec { // Increase wait group by one wg.Add(1) @@ -320,11 +304,6 @@ func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline // getJobResultsAndStore func (s *Scheduler) getJobResultsAndStore(triggerSave chan bool, r *gaia.PipelineRun) { for _ = range triggerSave { - // TODO: Bad testing. Fix this asap! - if gaia.Cfg.TestMode { - continue - } - // Store update s.storeService.PipelinePutRun(r) } diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index 027bd70e..15d87eec 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -1,7 +1,9 @@ package scheduler import ( + "fmt" "hash/fnv" + "os" "testing" "github.com/gaia-pipeline/gaia" @@ -11,39 +13,77 @@ import ( func TestScheduleJobsByPriority(t *testing.T) { gaia.Cfg = &gaia.Config{} - gaia.Cfg.TestMode = true + storeInstance := store.NewStore() + gaia.Cfg.DataPath = "data" + gaia.Cfg.Bolt.Path = "test.db" + gaia.Cfg.Bolt.Mode = 0600 + // Create test folder + err := os.MkdirAll(gaia.Cfg.DataPath, 0700) + if err != nil { + fmt.Printf("cannot create data folder: %s\n", err.Error()) + t.Fatal(err) + } + + if err = storeInstance.Init(); err != nil { + t.Fatal(err) + } + p, r := prepareTestData() + s := NewScheduler(storeInstance) + s.scheduleJobsByPriority(r, p) + + // Iterate jobs + for _, job := range r.Jobs { + if job.Status != gaia.JobSuccess { + t.Fatalf("job status should be success but was %s", string(job.Status)) + } else { + t.Logf("Job %s has been executed...", job.Title) + } + } + + // cleanup + err = os.Remove("data/test.db") + if err != nil { + t.Fatal(err) + } + err = os.Remove("data") + if err != nil { + t.Fatal(err) + } +} + +func prepareTestData() (pipeline *gaia.Pipeline, pipelineRun *gaia.PipelineRun) { job1 := gaia.Job{ ID: hash("Job1"), Title: "Job1", Priority: 0, - Status: gaia.JobWaitingExec, + Status: gaia.JobSuccess, } job2 := gaia.Job{ ID: hash("Job2"), Title: "Job2", Priority: 10, - Status: gaia.JobWaitingExec, + Status: gaia.JobSuccess, } job3 := gaia.Job{ ID: hash("Job3"), Title: "Job3", Priority: 20, - Status: gaia.JobWaitingExec, + Status: gaia.JobSuccess, } job4 := gaia.Job{ ID: hash("Job4"), Title: "Job4", Priority: 20, - Status: gaia.JobWaitingExec, + Status: gaia.JobSuccess, } - p := &gaia.Pipeline{ + pipeline = &gaia.Pipeline{ ID: 1, Name: "Test Pipeline", Type: gaia.GOLANG, } - r := &gaia.PipelineRun{ + pipelineRun = &gaia.PipelineRun{ ID: 1, PipelineID: 1, Status: gaia.RunNotScheduled, @@ -55,18 +95,7 @@ func TestScheduleJobsByPriority(t *testing.T) { job4, }, } - - s := NewScheduler(store.NewStore()) - s.scheduleJobsByPriority(r, p) - - // Iterate jobs - for _, job := range r.Jobs { - if job.Status != gaia.JobSuccess { - t.Fatalf("job status should be success but was %s", string(job.Status)) - } else { - t.Logf("Job %s has been executed...", job.Title) - } - } + return } // hash hashes the given string. diff --git a/store/store.go b/store/store.go index 505774b0..78b8c450 100644 --- a/store/store.go +++ b/store/store.go @@ -54,12 +54,12 @@ func (s *Store) Init() error { s.db = db // Setup database - return setupDatabase(s) + return s.setupDatabase() } // setupDatabase create all buckets in the db. // Additionally, it makes sure that the admin user exists. -func setupDatabase(s *Store) error { +func (s *Store) setupDatabase() error { // Create bucket if not exists function var bucketName []byte c := func(tx *bolt.Tx) error { diff --git a/store/store_test.go b/store/store_test.go index 1f6cc099..38541b47 100644 --- a/store/store_test.go +++ b/store/store_test.go @@ -9,19 +9,25 @@ import ( ) var store *Store -var config *gaia.Config func TestMain(m *testing.M) { store = NewStore() - config = &gaia.Config{} - config.DataPath = "data" - config.Bolt.Path = "test.db" - config.Bolt.Mode = 0600 + gaia.Cfg = &gaia.Config{} + gaia.Cfg.DataPath = "data" + gaia.Cfg.Bolt.Path = "test.db" + gaia.Cfg.Bolt.Mode = 0600 + + // Create test folder + err := os.MkdirAll(gaia.Cfg.DataPath, 0700) + if err != nil { + fmt.Printf("cannot create data folder: %s\n", err.Error()) + os.Exit(1) + } r := m.Run() // cleanup - err := os.Remove("data") + err = os.Remove("data") if err != nil { fmt.Printf("cannot remove data folder: %s\n", err.Error()) r = 1 @@ -30,7 +36,7 @@ func TestMain(m *testing.M) { } func TestInit(t *testing.T) { - err := store.Init(config) + err := store.Init() if err != nil { t.Fatal(err) } @@ -43,7 +49,7 @@ func TestInit(t *testing.T) { } func TestUserGet(t *testing.T) { - err := store.Init(config) + err := store.Init() if err != nil { t.Fatal(err) } @@ -81,7 +87,7 @@ func TestUserGet(t *testing.T) { } func TestUserPut(t *testing.T) { - err := store.Init(config) + err := store.Init() if err != nil { t.Fatal(err) } @@ -103,7 +109,7 @@ func TestUserPut(t *testing.T) { } func TestUserAuth(t *testing.T) { - err := store.Init(config) + err := store.Init() if err != nil { t.Fatal(err) }