From ab48faa9b7203f0d15e6843b351cca14bd38bad1 Mon Sep 17 00:00:00 2001 From: Michel Vocks Date: Tue, 17 Jul 2018 07:50:39 +0200 Subject: [PATCH] One pipeline run now starts exactly one pipeline process. Added scheduler tests. Introduced an interface for pipeline which makes testing easier. Removed single job log view. --- cmd/gaia/main.go | 6 +- frontend/client/views/pipeline/detail.vue | 13 +- frontend/client/views/pipeline/log.vue | 51 ++---- gaia.go | 3 + handlers/pipeline_run.go | 95 ++-------- plugin/plugin.go | 32 ++-- scheduler/scheduler.go | 195 +++++++++++---------- scheduler/scheduler_test.go | 200 +++++++++++++++++++--- 8 files changed, 334 insertions(+), 261 deletions(-) diff --git a/cmd/gaia/main.go b/cmd/gaia/main.go index 3550c860..08faa46a 100644 --- a/cmd/gaia/main.go +++ b/cmd/gaia/main.go @@ -13,6 +13,7 @@ import ( "github.com/gaia-pipeline/gaia" "github.com/gaia-pipeline/gaia/handlers" "github.com/gaia-pipeline/gaia/pipeline" + "github.com/gaia-pipeline/gaia/plugin" scheduler "github.com/gaia-pipeline/gaia/scheduler" "github.com/gaia-pipeline/gaia/store" hclog "github.com/hashicorp/go-hclog" @@ -132,8 +133,11 @@ func main() { os.Exit(1) } + // Create new plugin system + pS := &plugin.Plugin{} + // Initialize scheduler - scheduler := scheduler.NewScheduler(store) + scheduler := scheduler.NewScheduler(store, pS) err = scheduler.Init() if err != nil { gaia.Cfg.Logger.Error("cannot initialize scheduler:", "error", err.Error()) diff --git a/frontend/client/views/pipeline/detail.vue b/frontend/client/views/pipeline/detail.vue index 99d91cd2..8191704d 100755 --- a/frontend/client/views/pipeline/detail.vue +++ b/frontend/client/views/pipeline/detail.vue @@ -90,7 +90,6 @@ export default { } ], runsRows: [], - job: null, pipelineViewOptions: { physics: { stabilization: true }, layout: { @@ -333,11 +332,6 @@ export default { // Create vis network // We have to move out the instance out of vue because of https://github.com/almende/vis/issues/2567 window.pipelineView = new Vis.Network(container, data, this.pipelineViewOptions) - - // Create an selectNode event - window.pipelineView.on('selectNode', function (params) { - this.job = this.nodes.get(params.nodes[0]) - }.bind(this)) } }, @@ -358,13 +352,8 @@ export default { }, jobLog () { - var jobid = null - if (this.job) { - jobid = this.job.internalID - } - // Route - this.$router.push({path: '/pipeline/log', query: { pipelineid: this.pipelineID, runid: this.runID, jobid: jobid }}) + this.$router.push({path: '/pipeline/log', query: { pipelineid: this.pipelineID, runid: this.runID }}) }, startPipeline (pipelineid) { diff --git a/frontend/client/views/pipeline/log.vue b/frontend/client/views/pipeline/log.vue index a9c765f0..38cad099 100755 --- a/frontend/client/views/pipeline/log.vue +++ b/frontend/client/views/pipeline/log.vue @@ -22,9 +22,7 @@ export default { logText: '', jobRunning: true, runID: null, - pipelineID: null, - jobID: null, - currentPath: '' + pipelineID: null } }, @@ -36,26 +34,28 @@ export default { this.fetchData() // periodically update dashboard - this.intervalID = setInterval(function () { + var intervalID = setInterval(function () { this.fetchData() }.bind(this), 3000) - this.currentPath = this.$route.path + + // Append interval id to store + this.$store.commit('appendInterval', intervalID) }, watch: { '$route': 'fetchData' }, + destroyed () { + this.$store.commit('clearIntervals') + }, + components: { Message }, methods: { fetchData () { - if (this.$route.path !== this.currentPath) { - this.$store.commit('clearIntervals') - } - // look up required url parameters this.pipelineID = this.$route.query.pipelineid this.runID = this.$route.query.runid @@ -63,37 +63,18 @@ export default { return } - // job id is optional. If ommitted, all logs from all jobs - // are displayed. - this.jobID = this.$route.query.jobid - this.$http - .get('/api/v1/pipelinerun/' + this.pipelineID + '/' + this.runID + '/log', { - showProgressBar: false, - params: { - jobid: this.jobID - } - }) + .get('/api/v1/pipelinerun/' + this.pipelineID + '/' + this.runID + '/log', { showProgressBar: false }) .then(response => { if (response.data) { - // Check if we got multiple objects - var finished = true - this.logText = '' - for (let i = 0, l = response.data.length; i < l; i++) { - // We add the received log - this.logText += response.data[i].log - - // LF does not work for HTML. Replace with
- this.logText = this.logText.replace(/\n/g, '
') - - // Job not finished? - if (!response.data[i].finished) { - finished = false - } - } + // We add the received log + this.logText = response.data.log + + // LF does not work for HTML. Replace with
+ this.logText = this.logText.replace(/\n/g, '
') // All jobs finished. Stop interval. - if (finished && response.data.length > 0) { + if (response.data.finished) { this.jobRunning = false clearInterval(this.intervalID) } diff --git a/gaia.go b/gaia.go index 9e3818a9..32171d7c 100644 --- a/gaia.go +++ b/gaia.go @@ -66,6 +66,9 @@ const ( // LogsFolderName represents the Name of the logs folder in pipeline run folder LogsFolderName = "logs" + + // LogsFileName represents the file name of the logs output + LogsFileName = "output.log" ) // User is the user object diff --git a/handlers/pipeline_run.go b/handlers/pipeline_run.go index 2d250f3c..bb202da3 100644 --- a/handlers/pipeline_run.go +++ b/handlers/pipeline_run.go @@ -5,7 +5,6 @@ import ( "net/http" "os" "path/filepath" - "sort" "strconv" "github.com/gaia-pipeline/gaia" @@ -84,20 +83,15 @@ func PipelineGetLatestRun(c echo.Context) error { return c.JSON(http.StatusOK, run) } -// GetJobLogs returns jobs for a given job. -// If no jobID is given, a collection of all jobs logs will be returned. +// GetJobLogs returns logs from a pipeline run. // // Required parameters: // pipelineid - Related pipeline id // pipelinerunid - Related pipeline run id -// -// Optional parameters: -// jobid - Job id func GetJobLogs(c echo.Context) error { // Get parameters and validate pipelineID := c.Param("pipelineid") pipelineRunID := c.Param("runid") - jobID := c.QueryParam("jobid") // Transform pipelineid to int p, err := strconv.Atoi(pipelineID) @@ -111,92 +105,31 @@ func GetJobLogs(c echo.Context) error { return c.String(http.StatusBadRequest, "invalid pipeline run id given") } - // Get pipeline run from store run, err := storeService.PipelineGetRunByPipelineIDAndID(p, r) if err != nil { return c.String(http.StatusBadRequest, "cannot find pipeline run with given pipeline id and pipeline run id") } - // jobID is not empty, just return the logs from this job - if jobID != "" { - for _, job := range run.Jobs { - if strconv.FormatUint(uint64(job.ID), 10) == jobID { - // Get logs - jL, err := getLogs(pipelineID, pipelineRunID, jobID, false) - if err != nil { - return c.String(http.StatusBadRequest, err.Error()) - } - - // Check if job is finished - if job.Status == gaia.JobSuccess || job.Status == gaia.JobFailed { - jL.Finished = true - } - - // We always return an array. - // It makes a bit easier in the frontend. - jobLogsList := []jobLogs{} - jobLogsList = append(jobLogsList, *jL) - return c.JSON(http.StatusOK, jobLogsList) - } - } + // Create return object + jL := jobLogs{} - // Logs for given job id not found - return c.String(http.StatusBadRequest, "cannot find job with given job id") + // Determine if job has been finished + if run.Status == gaia.RunFailed || run.Status == gaia.RunSuccess { + jL.Finished = true } - // Sort the slice. This is important for the order of the returned logs. - sort.Slice(run.Jobs, func(i, j int) bool { - return run.Jobs[i].Priority < run.Jobs[j].Priority - }) - - // Return a collection of all logs - jobs := []jobLogs{} - for _, job := range run.Jobs { - // Get logs - jL, err := getLogs(pipelineID, pipelineRunID, strconv.FormatUint(uint64(job.ID), 10), true) + // Check if log file exists + logFilePath := filepath.Join(gaia.Cfg.WorkspacePath, pipelineID, pipelineRunID, gaia.LogsFolderName, gaia.LogsFileName) + if _, err := os.Stat(logFilePath); err == nil { + content, err := ioutil.ReadFile(logFilePath) if err != nil { - return c.String(http.StatusBadRequest, err.Error()) - } - - // No error but also no job logs. Job must be in the queue. - // We skip it so no error will break things. - if jL == nil { - continue - } - - // Check if job is finished - if job.Status == gaia.JobSuccess || job.Status == gaia.JobFailed { - jL.Finished = true + return c.String(http.StatusInternalServerError, "cannot read pipeline run log file") } - jobs = append(jobs, *jL) + // Convert logs + jL.Log = string(content) } // Return logs - return c.JSON(http.StatusOK, jobs) -} - -func getLogs(pipelineID, pipelineRunID, jobID string, getAllJobLogs bool) (*jobLogs, error) { - // Lookup log file - logFilePath := filepath.Join(gaia.Cfg.WorkspacePath, pipelineID, pipelineRunID, gaia.LogsFolderName, jobID) - - // We only check if logs exist when a specific job log was requested. - // If we don't do this, get all job logs will fail during a pipeline run. - if _, err := os.Stat(logFilePath); os.IsNotExist(err) { - if !getAllJobLogs { - return nil, err - } - return nil, nil - } - - // Read file - content, err := ioutil.ReadFile(logFilePath) - if err != nil { - return nil, err - } - - // Create return struct - return &jobLogs{ - Log: string(content), - }, nil + return c.JSON(http.StatusOK, jL) } diff --git a/plugin/plugin.go b/plugin/plugin.go index 3d5416b0..0a133967 100644 --- a/plugin/plugin.go +++ b/plugin/plugin.go @@ -8,6 +8,7 @@ import ( "os/exec" "github.com/gaia-pipeline/gaia" + "github.com/gaia-pipeline/gaia/scheduler" "github.com/gaia-pipeline/protobuf" plugin "github.com/hashicorp/go-plugin" ) @@ -45,23 +46,29 @@ type Plugin struct { // NewPlugin creates a new instance of Plugin. // One Plugin instance represents one connection to a plugin. -// -// It expects the start command to start the plugin and the log path (including file) -// where the output should be logged to. -func NewPlugin(command *exec.Cmd, logPath *string) (p *Plugin, err error) { - // Allocate - p = &Plugin{} +func (p *Plugin) NewPlugin() scheduler.Plugin { + return &Plugin{} +} +// Connect prepares the log path, starts the plugin, initiates the +// gRPC connection and looks up the plugin. +// It's up to the caller to call plugin.Close to shutdown the plugin +// and close the gRPC connection. +// +// It expects the start command for the plugin and the path where +// the log file should be stored. +func (p *Plugin) Connect(command *exec.Cmd, logPath *string) error { // Create log file and open it. // We will close this file in the close method. if logPath != nil { + var err error p.logFile, err = os.OpenFile( *logPath, os.O_CREATE|os.O_WRONLY, 0666, ) if err != nil { - return nil, err + return err } } @@ -77,13 +84,6 @@ func NewPlugin(command *exec.Cmd, logPath *string) (p *Plugin, err error) { Stderr: p.writer, }) - return p, nil -} - -// Connect starts the plugin, initiates the gRPC connection and looks up the plugin. -// It's up to the caller to call plugin.Close to shutdown the plugin -// and close the gRPC connection. -func (p *Plugin) Connect() error { // Connect via gRPC gRPCClient, err := p.client.Client() if err != nil { @@ -116,6 +116,10 @@ func (p *Plugin) Execute(j *gaia.Job) error { // Execute the job _, err := p.pluginConn.ExecuteJob(job) + + // Flush logs + p.writer.Flush() + return err } diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go index 5e6ee19b..092c5e27 100644 --- a/scheduler/scheduler.go +++ b/scheduler/scheduler.go @@ -10,7 +10,6 @@ import ( "time" "github.com/gaia-pipeline/gaia" - "github.com/gaia-pipeline/gaia/plugin" "github.com/gaia-pipeline/gaia/store" uuid "github.com/satori/go.uuid" ) @@ -30,6 +29,26 @@ var ( errCreateCMDForPipeline = errors.New("could not create execute command for plugin") ) +// Plugin represents the plugin implementation which is used +// during scheduling and execution. +type Plugin interface { + // NewPlugin creates a new instance of plugin + NewPlugin() Plugin + + // Connect initializes the connection with the execution command + // and the log path wbere the logs should be stored. + Connect(command *exec.Cmd, logPath *string) error + + // Execute executes one job of a pipeline. + Execute(j *gaia.Job) error + + // GetJobs returns all real jobs from the pipeline. + GetJobs() ([]gaia.Job, error) + + // Close closes the connection and cleansup open file writes. + Close() +} + // Scheduler represents the schuler object type Scheduler struct { // buffered channel which is used as queue @@ -38,14 +57,18 @@ type Scheduler struct { // storeService is an instance of store. // Use this to talk to the store. storeService *store.Store + + // pluginSystem is the used plugin system. + pluginSystem Plugin } // NewScheduler creates a new instance of Scheduler. -func NewScheduler(store *store.Store) *Scheduler { +func NewScheduler(store *store.Store, pS Plugin) *Scheduler { // Create new scheduler s := &Scheduler{ scheduledRuns: make(chan gaia.PipelineRun, schedulerBufferLimit), storeService: store, + pluginSystem: pS, } return s @@ -79,76 +102,86 @@ func (s *Scheduler) Init() error { return nil } -// work takes work from the scheduled run buffer channel -// and executes the pipeline. Then repeats. +// work takes work from the scheduled run buffer channel and starts +// the preparation and execution of the pipeline. Then repeats. func (s *Scheduler) work() { // This worker never stops working. for { // Take one scheduled run, block if there are no scheduled pipelines r := <-s.scheduledRuns - // Mark the scheduled run as running - r.Status = gaia.RunRunning - r.StartDate = time.Now() + // Prepare execution and start it + s.prepareAndExec(&r) + } +} - // Update entry in store - err := s.storeService.PipelinePutRun(&r) - if err != nil { - gaia.Cfg.Logger.Debug("could not put pipeline run into store during executing work", "error", err.Error()) - continue - } +// prepareAndExec does the real preparation and start the execution. +func (s *Scheduler) prepareAndExec(r *gaia.PipelineRun) { + // Mark the scheduled run as running + r.Status = gaia.RunRunning + r.StartDate = time.Now() - // Get related pipeline from pipeline run - pipeline, err := s.storeService.PipelineGet(r.PipelineID) - if err != nil { - gaia.Cfg.Logger.Debug("cannot access pipeline during execution", "error", err.Error()) - r.Status = gaia.RunFailed - } else if pipeline == nil { - gaia.Cfg.Logger.Debug("wanted to execute job for pipeline which does not exist", "run", r) - r.Status = gaia.RunFailed - } + // Update entry in store + err := s.storeService.PipelinePutRun(r) + if err != nil { + gaia.Cfg.Logger.Debug("could not put pipeline run into store during executing work", "error", err.Error()) + return + } - if r.Status == gaia.RunFailed { - // Update entry in store - err = s.storeService.PipelinePutRun(&r) - if err != nil { - gaia.Cfg.Logger.Debug("could not put pipeline run into store during executing work", "error", err.Error()) - } - continue - } + // Get related pipeline from pipeline run + pipeline, _ := s.storeService.PipelineGet(r.PipelineID) - // Get all jobs - r.Jobs, err = s.getPipelineJobs(pipeline) - if err != nil { - gaia.Cfg.Logger.Error("cannot get pipeline jobs before execution", "error", err.Error()) + // Get all jobs + r.Jobs, err = s.getPipelineJobs(pipeline) + if err != nil { + gaia.Cfg.Logger.Error("cannot get pipeline jobs before execution", "error", err.Error()) - // Update store - r.Status = gaia.RunFailed - s.storeService.PipelinePutRun(&r) - continue - } + // Update store + r.Status = gaia.RunFailed + s.storeService.PipelinePutRun(r) + return + } - // Check if this pipeline has jobs declared - if len(r.Jobs) == 0 { - // Finish pipeline run - s.finishPipelineRun(&r, gaia.RunSuccess) - continue - } + // Check if this pipeline has jobs declared + if len(r.Jobs) == 0 { + // Finish pipeline run + s.finishPipelineRun(r, gaia.RunSuccess) + return + } - // Create logs folder for this run - path := filepath.Join(gaia.Cfg.WorkspacePath, strconv.Itoa(r.PipelineID), strconv.Itoa(r.ID), gaia.LogsFolderName) - err = os.MkdirAll(path, 0700) - if err != nil { - gaia.Cfg.Logger.Error("cannot create pipeline run folder", "error", err.Error(), "path", path) - } + // Create logs folder for this run + path := filepath.Join(gaia.Cfg.WorkspacePath, strconv.Itoa(r.PipelineID), strconv.Itoa(r.ID), gaia.LogsFolderName) + err = os.MkdirAll(path, 0700) + if err != nil { + gaia.Cfg.Logger.Error("cannot create pipeline run folder", "error", err.Error(), "path", path) + } - // Schedule jobs and execute them. - // Also update the run in the store. - s.scheduleJobsByPriority(&r, pipeline) + // Create the start command for the pipeline + c := createPipelineCmd(pipeline) + if c == nil { + gaia.Cfg.Logger.Debug("cannot create pipeline start command", "error", errCreateCMDForPipeline.Error()) + s.finishPipelineRun(r, gaia.RunFailed) + return } + + // Create new plugin instance + pS := s.pluginSystem.NewPlugin() + + // Connect to plugin(pipeline) + path = filepath.Join(path, gaia.LogsFileName) + if err := pS.Connect(c, &path); err != nil { + gaia.Cfg.Logger.Debug("cannot connect to pipeline", "error", err.Error(), "pipeline", pipeline) + s.finishPipelineRun(r, gaia.RunFailed) + return + } + defer pS.Close() + + // Schedule jobs and execute them. + // Also update the run in the store. + s.scheduleJobsByPriority(r, pipeline, pS) } -// schedule looks in the store for new work to do and schedules it. +// schedule looks in the store for new work and schedules it. func (s *Scheduler) schedule() { // Do we have space left in our buffer? if len(s.scheduledRuns) >= schedulerBufferLimit { @@ -216,40 +249,18 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline) (*gaia.PipelineRun, error // executeJob executes a single job. // This method is blocking. -func executeJob(job *gaia.Job, p *gaia.Pipeline, logPath string, wg *sync.WaitGroup, triggerSave chan bool) { +func executeJob(job *gaia.Job, pS Plugin, wg *sync.WaitGroup, triggerSave chan bool) { defer wg.Done() defer func() { triggerSave <- true }() - // Set Job to running + // Set Job to running and trigger save job.Status = gaia.JobRunning - - // Create the start command for the pipeline - c := createPipelineCmd(p) - if c == nil { - gaia.Cfg.Logger.Debug("cannot execute pipeline job", "error", errCreateCMDForPipeline.Error(), "job", job) - job.Status = gaia.JobFailed - return - } - - // Create new plugin instance - pC, err := plugin.NewPlugin(c, &logPath) - if err != nil { - gaia.Cfg.Logger.Error("cannot initiate plugin before job execution", "error", err.Error()) - return - } - - // Connect to plugin(pipeline) - if err := pC.Connect(); err != nil { - gaia.Cfg.Logger.Debug("cannot connect to pipeline", "error", err.Error(), "pipeline", p) - job.Status = gaia.JobFailed - return - } - defer pC.Close() + triggerSave <- true // Execute job - if err := pC.Execute(job); err != nil { + if err := pS.Execute(job); err != nil { // TODO: Show it to user gaia.Cfg.Logger.Debug("error during job execution", "error", err.Error(), "job", job) job.Status = gaia.JobFailed @@ -262,7 +273,7 @@ func executeJob(job *gaia.Job, p *gaia.Pipeline, logPath string, wg *sync.WaitGr // scheduleJobsByPriority schedules the given jobs by their respective // priority. This method is designed to be recursive and blocking. // If jobs have the same priority, they will be executed in parallel. -func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline) { +func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline, pS Plugin) { // Do a prescheduling and set it to the first waiting job var lowestPrio int64 for _, job := range r.Jobs { @@ -289,9 +300,7 @@ func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline wg.Add(1) // Execute this job in a separate goroutine - path := filepath.Join(gaia.Cfg.WorkspacePath, strconv.Itoa(r.PipelineID), strconv.Itoa(r.ID), gaia.LogsFolderName) - path = filepath.Join(path, strconv.FormatUint(uint64(job.ID), 10)) - go executeJob(&r.Jobs[id], p, path, &wg, triggerSave) + go executeJob(&r.Jobs[id], pS, &wg, triggerSave) } } @@ -322,7 +331,7 @@ func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline } // Run scheduleJobsByPriority again until all jobs have been executed - s.scheduleJobsByPriority(r, p) + s.scheduleJobsByPriority(r, p, pS) } // getJobResultsAndStore @@ -342,21 +351,17 @@ func (s *Scheduler) getPipelineJobs(p *gaia.Pipeline) ([]gaia.Job, error) { return nil, errCreateCMDForPipeline } - // Create new plugin instance - pC, err := plugin.NewPlugin(c, nil) - if err != nil { - gaia.Cfg.Logger.Error("cannot initiate plugin", "error", err.Error()) - return nil, err - } + // Create new Plugin instance + pS := s.pluginSystem.NewPlugin() // Connect to plugin(pipeline) - if err := pC.Connect(); err != nil { + if err := pS.Connect(c, nil); err != nil { gaia.Cfg.Logger.Debug("cannot connect to pipeline", "error", err.Error(), "pipeline", p) return nil, err } - defer pC.Close() + defer pS.Close() - return pC.GetJobs() + return pS.GetJobs() } // SetPipelineJobs uses the plugin system to get all jobs from the given pipeline. diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go index f36069a0..86d39fdd 100644 --- a/scheduler/scheduler_test.go +++ b/scheduler/scheduler_test.go @@ -1,35 +1,82 @@ package scheduler import ( - "fmt" "hash/fnv" "os" + "os/exec" + "path/filepath" "testing" + "time" "github.com/gaia-pipeline/gaia" "github.com/gaia-pipeline/gaia/store" + hclog "github.com/hashicorp/go-hclog" uuid "github.com/satori/go.uuid" ) -func TestScheduleJobsByPriority(t *testing.T) { +type PluginFake struct { + // Fake struct + jobs []gaia.Job +} + +var pluginFake *PluginFake + +func (p *PluginFake) NewPlugin() Plugin { + return &PluginFake{} +} +func (p *PluginFake) Connect(cmd *exec.Cmd, logPath *string) error { return nil } +func (p *PluginFake) Execute(j *gaia.Job) error { return nil } +func (p *PluginFake) GetJobs() ([]gaia.Job, error) { return pluginFake.jobs, nil } +func (p *PluginFake) Close() {} + +func TestInit(t *testing.T) { gaia.Cfg = &gaia.Config{} storeInstance := store.NewStore() - gaia.Cfg.DataPath = "data" + gaia.Cfg.DataPath = os.TempDir() + gaia.Cfg.WorkspacePath = filepath.Join(os.TempDir(), "tmp") gaia.Cfg.Bolt.Mode = 0600 - - // Create test folder - err := os.MkdirAll(gaia.Cfg.DataPath, 0700) + gaia.Cfg.Logger = hclog.New(&hclog.LoggerOptions{ + Level: hclog.Trace, + Output: hclog.DefaultOutput, + Name: "Gaia", + }) + gaia.Cfg.Worker = "2" + if err := storeInstance.Init(); err != nil { + t.Fatal(err) + } + pluginFake = &PluginFake{} + s := NewScheduler(storeInstance, pluginFake) + err := s.Init() + if err != nil { + t.Fatal(err) + } + err = os.Remove(filepath.Join(os.TempDir(), "gaia.db")) if err != nil { - fmt.Printf("cannot create data folder: %s\n", err.Error()) t.Fatal(err) } +} + +func TestPrepareAndExec(t *testing.T) { + gaia.Cfg = &gaia.Config{} + storeInstance := store.NewStore() + gaia.Cfg.DataPath = os.TempDir() + gaia.Cfg.WorkspacePath = filepath.Join(os.TempDir(), "tmp") + gaia.Cfg.Bolt.Mode = 0600 + gaia.Cfg.Logger = hclog.New(&hclog.LoggerOptions{ + Level: hclog.Trace, + Output: hclog.DefaultOutput, + Name: "Gaia", + }) - if err = storeInstance.Init(); err != nil { + if err := storeInstance.Init(); err != nil { t.Fatal(err) } p, r := prepareTestData() - s := NewScheduler(storeInstance) - s.scheduleJobsByPriority(r, p) + storeInstance.PipelinePut(p) + pluginFake = &PluginFake{} + pluginFake.jobs = p.Jobs + s := NewScheduler(storeInstance, pluginFake) + s.prepareAndExec(r) // Iterate jobs for _, job := range r.Jobs { @@ -39,13 +86,120 @@ func TestScheduleJobsByPriority(t *testing.T) { t.Logf("Job %s has been executed...", job.Title) } } + err := os.Remove(filepath.Join(os.TempDir(), "gaia.db")) + if err != nil { + t.Fatal(err) + } +} + +func TestSchedulePipeline(t *testing.T) { + gaia.Cfg = &gaia.Config{} + storeInstance := store.NewStore() + gaia.Cfg.DataPath = os.TempDir() + gaia.Cfg.WorkspacePath = filepath.Join(os.TempDir(), "tmp") + gaia.Cfg.Bolt.Mode = 0600 + gaia.Cfg.Logger = hclog.New(&hclog.LoggerOptions{ + Level: hclog.Trace, + Output: hclog.DefaultOutput, + Name: "Gaia", + }) + gaia.Cfg.Worker = "2" + if err := storeInstance.Init(); err != nil { + t.Fatal(err) + } + p, _ := prepareTestData() + storeInstance.PipelinePut(p) + pluginFake = &PluginFake{} + pluginFake.jobs = p.Jobs + s := NewScheduler(storeInstance, pluginFake) + err := s.Init() + if err != nil { + t.Fatal(err) + } + _, err = s.SchedulePipeline(p) + if err != nil { + t.Fatal(err) + } + + err = os.Remove(filepath.Join(os.TempDir(), "gaia.db")) + if err != nil { + t.Fatal(err) + } +} + +func TestSchedule(t *testing.T) { + gaia.Cfg = &gaia.Config{} + storeInstance := store.NewStore() + gaia.Cfg.DataPath = os.TempDir() + gaia.Cfg.WorkspacePath = filepath.Join(os.TempDir(), "tmp") + gaia.Cfg.Bolt.Mode = 0600 + gaia.Cfg.Logger = hclog.New(&hclog.LoggerOptions{ + Level: hclog.Trace, + Output: hclog.DefaultOutput, + Name: "Gaia", + }) + gaia.Cfg.Worker = "2" + if err := storeInstance.Init(); err != nil { + t.Fatal(err) + } + p, _ := prepareTestData() + storeInstance.PipelinePut(p) + pluginFake = &PluginFake{} + pluginFake.jobs = p.Jobs + s := NewScheduler(storeInstance, pluginFake) + err := s.Init() + if err != nil { + t.Fatal(err) + } + _, err = s.SchedulePipeline(p) + if err != nil { + t.Fatal(err) + } + // Wait some time to pickup work and finish. + // We have to wait at least 3 seconds for scheduler tick interval. + time.Sleep(5 * time.Second) + r, err := storeInstance.PipelineGetRunByPipelineIDAndID(p.ID, 1) + if err != nil { + t.Fatal(err) + } + for _, job := range r.Jobs { + if job.Status != gaia.JobSuccess { + t.Fatalf("Job %s has status %s but should be %s!\n", job.Title, string(job.Status), string(gaia.JobSuccess)) + } + } + err = os.Remove(filepath.Join(os.TempDir(), "gaia.db")) + if err != nil { + t.Fatal(err) + } +} - // cleanup - err = os.Remove("data/gaia.db") +func TestSetPipelineJobs(t *testing.T) { + gaia.Cfg = &gaia.Config{} + storeInstance := store.NewStore() + gaia.Cfg.DataPath = os.TempDir() + gaia.Cfg.WorkspacePath = filepath.Join(os.TempDir(), "tmp") + gaia.Cfg.Bolt.Mode = 0600 + gaia.Cfg.Logger = hclog.New(&hclog.LoggerOptions{ + Level: hclog.Trace, + Output: hclog.DefaultOutput, + Name: "Gaia", + }) + if err := storeInstance.Init(); err != nil { + t.Fatal(err) + } + p, _ := prepareTestData() + pluginFake = &PluginFake{} + pluginFake.jobs = p.Jobs + p.Jobs = nil + s := NewScheduler(storeInstance, pluginFake) + err := s.SetPipelineJobs(p) if err != nil { t.Fatal(err) } - err = os.Remove("data") + if len(p.Jobs) != 4 { + t.Fatalf("Number of jobs should be 4 but was %d\n", len(p.Jobs)) + } + err = os.Remove(filepath.Join(os.TempDir(), "gaia.db")) if err != nil { t.Fatal(err) } @@ -56,37 +210,31 @@ func prepareTestData() (pipeline *gaia.Pipeline, pipelineRun *gaia.PipelineRun) ID: hash("Job1"), Title: "Job1", Priority: 0, - Status: gaia.JobSuccess, + Status: gaia.JobWaitingExec, } job2 := gaia.Job{ ID: hash("Job2"), Title: "Job2", Priority: 10, - Status: gaia.JobSuccess, + Status: gaia.JobWaitingExec, } job3 := gaia.Job{ ID: hash("Job3"), Title: "Job3", Priority: 20, - Status: gaia.JobSuccess, + Status: gaia.JobWaitingExec, } job4 := gaia.Job{ ID: hash("Job4"), Title: "Job4", Priority: 20, - Status: gaia.JobSuccess, + Status: gaia.JobWaitingExec, } pipeline = &gaia.Pipeline{ ID: 1, Name: "Test Pipeline", Type: gaia.PTypeGolang, - } - pipelineRun = &gaia.PipelineRun{ - ID: 1, - PipelineID: 1, - Status: gaia.RunNotScheduled, - UniqueID: uuid.Must(uuid.NewV4(), nil).String(), Jobs: []gaia.Job{ job1, job2, @@ -94,6 +242,12 @@ func prepareTestData() (pipeline *gaia.Pipeline, pipelineRun *gaia.PipelineRun) job4, }, } + pipelineRun = &gaia.PipelineRun{ + ID: 1, + PipelineID: 1, + Status: gaia.RunNotScheduled, + UniqueID: uuid.Must(uuid.NewV4(), nil).String(), + } return }