Skip to content

Commit

Permalink
More work on some tests. Need to find a way to test pipeline execution
Browse files Browse the repository at this point in the history
  • Loading branch information
michelvocks committed Mar 6, 2018
1 parent 3c17fc6 commit ed1682f
Show file tree
Hide file tree
Showing 4 changed files with 87 additions and 73 deletions.
63 changes: 21 additions & 42 deletions scheduler/scheduler.go
Expand Up @@ -78,6 +78,7 @@ func (s *Scheduler) work() {

// Mark the scheduled run as running
r.Status = gaia.RunRunning
r.RunDate = time.Now()

// Update entry in store
err := s.storeService.PipelinePutRun(&r)
Expand Down Expand Up @@ -105,8 +106,25 @@ func (s *Scheduler) work() {
continue
}

// Start pipeline run process
s.executePipeline(pipeline, &r)
// Get all jobs
r.Jobs, err = s.getPipelineJobs(pipeline)
if err != nil {
gaia.Cfg.Logger.Error("cannot get pipeline jobs before execution", "error", err.Error())

// Update store
r.Status = gaia.RunFailed
s.storeService.PipelinePutRun(&r)
continue
}

// Check if this pipeline has jobs declared
if len(r.Jobs) == 0 {
continue
}

// Schedule jobs and execute them.
// Also update the run in the store.
s.scheduleJobsByPriority(&r, pipeline)
}
}

Expand Down Expand Up @@ -176,33 +194,6 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline) (*gaia.PipelineRun, error
return &run, s.storeService.PipelinePutRun(&run)
}

// executePipeline executes the given pipeline and updates it status periodically.
func (s *Scheduler) executePipeline(p *gaia.Pipeline, r *gaia.PipelineRun) {
// Set start time
r.RunDate = time.Now()

// Get all jobs
var err error
r.Jobs, err = s.getPipelineJobs(p)
if err != nil {
gaia.Cfg.Logger.Error("cannot get pipeline jobs before execution", "error", err.Error())

// Update store
r.Status = gaia.RunFailed
s.storeService.PipelinePutRun(r)
return
}

// Check if this pipeline has jobs declared
if len(r.Jobs) == 0 {
return
}

// Schedule jobs and execute them.
// Also update the run in the store.
s.scheduleJobsByPriority(r, p)
}

// executeJob executes a single job.
// This method is blocking.
func executeJob(job *gaia.Job, p *gaia.Pipeline, wg *sync.WaitGroup, triggerSave chan bool) {
Expand All @@ -211,13 +202,6 @@ func executeJob(job *gaia.Job, p *gaia.Pipeline, wg *sync.WaitGroup, triggerSave
triggerSave <- true
}()

// In testmode we do not test this.
// TODO: Bad testing. Fix this asap!
if gaia.Cfg.TestMode {
job.Status = gaia.JobSuccess
return
}

// Set Job to running
job.Status = gaia.JobRunning

Expand Down Expand Up @@ -276,7 +260,7 @@ func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline
var wg sync.WaitGroup
triggerSave := make(chan bool)
for id, job := range r.Jobs {
if job.Priority == lowestPrio {
if job.Priority == lowestPrio && job.Status == gaia.JobWaitingExec {
// Increase wait group by one
wg.Add(1)

Expand Down Expand Up @@ -320,11 +304,6 @@ func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline
// getJobResultsAndStore
func (s *Scheduler) getJobResultsAndStore(triggerSave chan bool, r *gaia.PipelineRun) {
for _ = range triggerSave {
// TODO: Bad testing. Fix this asap!
if gaia.Cfg.TestMode {
continue
}

// Store update
s.storeService.PipelinePutRun(r)
}
Expand Down
67 changes: 48 additions & 19 deletions scheduler/scheduler_test.go
@@ -1,7 +1,9 @@
package scheduler

import (
"fmt"
"hash/fnv"
"os"
"testing"

"github.com/gaia-pipeline/gaia"
Expand All @@ -11,39 +13,77 @@ import (

func TestScheduleJobsByPriority(t *testing.T) {
gaia.Cfg = &gaia.Config{}
gaia.Cfg.TestMode = true
storeInstance := store.NewStore()
gaia.Cfg.DataPath = "data"
gaia.Cfg.Bolt.Path = "test.db"
gaia.Cfg.Bolt.Mode = 0600

// Create test folder
err := os.MkdirAll(gaia.Cfg.DataPath, 0700)
if err != nil {
fmt.Printf("cannot create data folder: %s\n", err.Error())
t.Fatal(err)
}

if err = storeInstance.Init(); err != nil {
t.Fatal(err)
}
p, r := prepareTestData()
s := NewScheduler(storeInstance)
s.scheduleJobsByPriority(r, p)

// Iterate jobs
for _, job := range r.Jobs {
if job.Status != gaia.JobSuccess {
t.Fatalf("job status should be success but was %s", string(job.Status))
} else {
t.Logf("Job %s has been executed...", job.Title)
}
}

// cleanup
err = os.Remove("data/test.db")
if err != nil {
t.Fatal(err)
}
err = os.Remove("data")
if err != nil {
t.Fatal(err)
}
}

func prepareTestData() (pipeline *gaia.Pipeline, pipelineRun *gaia.PipelineRun) {
job1 := gaia.Job{
ID: hash("Job1"),
Title: "Job1",
Priority: 0,
Status: gaia.JobWaitingExec,
Status: gaia.JobSuccess,
}
job2 := gaia.Job{
ID: hash("Job2"),
Title: "Job2",
Priority: 10,
Status: gaia.JobWaitingExec,
Status: gaia.JobSuccess,
}
job3 := gaia.Job{
ID: hash("Job3"),
Title: "Job3",
Priority: 20,
Status: gaia.JobWaitingExec,
Status: gaia.JobSuccess,
}
job4 := gaia.Job{
ID: hash("Job4"),
Title: "Job4",
Priority: 20,
Status: gaia.JobWaitingExec,
Status: gaia.JobSuccess,
}

p := &gaia.Pipeline{
pipeline = &gaia.Pipeline{
ID: 1,
Name: "Test Pipeline",
Type: gaia.GOLANG,
}
r := &gaia.PipelineRun{
pipelineRun = &gaia.PipelineRun{
ID: 1,
PipelineID: 1,
Status: gaia.RunNotScheduled,
Expand All @@ -55,18 +95,7 @@ func TestScheduleJobsByPriority(t *testing.T) {
job4,
},
}

s := NewScheduler(store.NewStore())
s.scheduleJobsByPriority(r, p)

// Iterate jobs
for _, job := range r.Jobs {
if job.Status != gaia.JobSuccess {
t.Fatalf("job status should be success but was %s", string(job.Status))
} else {
t.Logf("Job %s has been executed...", job.Title)
}
}
return
}

// hash hashes the given string.
Expand Down
4 changes: 2 additions & 2 deletions store/store.go
Expand Up @@ -54,12 +54,12 @@ func (s *Store) Init() error {
s.db = db

// Setup database
return setupDatabase(s)
return s.setupDatabase()
}

// setupDatabase create all buckets in the db.
// Additionally, it makes sure that the admin user exists.
func setupDatabase(s *Store) error {
func (s *Store) setupDatabase() error {
// Create bucket if not exists function
var bucketName []byte
c := func(tx *bolt.Tx) error {
Expand Down
26 changes: 16 additions & 10 deletions store/store_test.go
Expand Up @@ -9,19 +9,25 @@ import (
)

var store *Store
var config *gaia.Config

func TestMain(m *testing.M) {
store = NewStore()
config = &gaia.Config{}
config.DataPath = "data"
config.Bolt.Path = "test.db"
config.Bolt.Mode = 0600
gaia.Cfg = &gaia.Config{}
gaia.Cfg.DataPath = "data"
gaia.Cfg.Bolt.Path = "test.db"
gaia.Cfg.Bolt.Mode = 0600

// Create test folder
err := os.MkdirAll(gaia.Cfg.DataPath, 0700)
if err != nil {
fmt.Printf("cannot create data folder: %s\n", err.Error())
os.Exit(1)
}

r := m.Run()

// cleanup
err := os.Remove("data")
err = os.Remove("data")
if err != nil {
fmt.Printf("cannot remove data folder: %s\n", err.Error())
r = 1
Expand All @@ -30,7 +36,7 @@ func TestMain(m *testing.M) {
}

func TestInit(t *testing.T) {
err := store.Init(config)
err := store.Init()
if err != nil {
t.Fatal(err)
}
Expand All @@ -43,7 +49,7 @@ func TestInit(t *testing.T) {
}

func TestUserGet(t *testing.T) {
err := store.Init(config)
err := store.Init()
if err != nil {
t.Fatal(err)
}
Expand Down Expand Up @@ -81,7 +87,7 @@ func TestUserGet(t *testing.T) {
}

func TestUserPut(t *testing.T) {
err := store.Init(config)
err := store.Init()
if err != nil {
t.Fatal(err)
}
Expand All @@ -103,7 +109,7 @@ func TestUserPut(t *testing.T) {
}

func TestUserAuth(t *testing.T) {
err := store.Init(config)
err := store.Init()
if err != nil {
t.Fatal(err)
}
Expand Down

0 comments on commit ed1682f

Please sign in to comment.