diff --git a/cmd/gaia/main.go b/cmd/gaia/main.go
index 3550c860..08faa46a 100644
--- a/cmd/gaia/main.go
+++ b/cmd/gaia/main.go
@@ -13,6 +13,7 @@ import (
"github.com/gaia-pipeline/gaia"
"github.com/gaia-pipeline/gaia/handlers"
"github.com/gaia-pipeline/gaia/pipeline"
+ "github.com/gaia-pipeline/gaia/plugin"
scheduler "github.com/gaia-pipeline/gaia/scheduler"
"github.com/gaia-pipeline/gaia/store"
hclog "github.com/hashicorp/go-hclog"
@@ -132,8 +133,11 @@ func main() {
os.Exit(1)
}
+ // Create new plugin system
+ pS := &plugin.Plugin{}
+
// Initialize scheduler
- scheduler := scheduler.NewScheduler(store)
+ scheduler := scheduler.NewScheduler(store, pS)
err = scheduler.Init()
if err != nil {
gaia.Cfg.Logger.Error("cannot initialize scheduler:", "error", err.Error())
diff --git a/frontend/client/views/pipeline/detail.vue b/frontend/client/views/pipeline/detail.vue
index 99d91cd2..8191704d 100755
--- a/frontend/client/views/pipeline/detail.vue
+++ b/frontend/client/views/pipeline/detail.vue
@@ -90,7 +90,6 @@ export default {
}
],
runsRows: [],
- job: null,
pipelineViewOptions: {
physics: { stabilization: true },
layout: {
@@ -333,11 +332,6 @@ export default {
// Create vis network
// We have to move out the instance out of vue because of https://github.com/almende/vis/issues/2567
window.pipelineView = new Vis.Network(container, data, this.pipelineViewOptions)
-
- // Create an selectNode event
- window.pipelineView.on('selectNode', function (params) {
- this.job = this.nodes.get(params.nodes[0])
- }.bind(this))
}
},
@@ -358,13 +352,8 @@ export default {
},
jobLog () {
- var jobid = null
- if (this.job) {
- jobid = this.job.internalID
- }
-
// Route
- this.$router.push({path: '/pipeline/log', query: { pipelineid: this.pipelineID, runid: this.runID, jobid: jobid }})
+ this.$router.push({path: '/pipeline/log', query: { pipelineid: this.pipelineID, runid: this.runID }})
},
startPipeline (pipelineid) {
diff --git a/frontend/client/views/pipeline/log.vue b/frontend/client/views/pipeline/log.vue
index a9c765f0..38cad099 100755
--- a/frontend/client/views/pipeline/log.vue
+++ b/frontend/client/views/pipeline/log.vue
@@ -22,9 +22,7 @@ export default {
logText: '',
jobRunning: true,
runID: null,
- pipelineID: null,
- jobID: null,
- currentPath: ''
+ pipelineID: null
}
},
@@ -36,26 +34,28 @@ export default {
this.fetchData()
// periodically update dashboard
- this.intervalID = setInterval(function () {
+ var intervalID = setInterval(function () {
this.fetchData()
}.bind(this), 3000)
- this.currentPath = this.$route.path
+
+ // Append interval id to store
+ this.$store.commit('appendInterval', intervalID)
},
watch: {
'$route': 'fetchData'
},
+ destroyed () {
+ this.$store.commit('clearIntervals')
+ },
+
components: {
Message
},
methods: {
fetchData () {
- if (this.$route.path !== this.currentPath) {
- this.$store.commit('clearIntervals')
- }
-
// look up required url parameters
this.pipelineID = this.$route.query.pipelineid
this.runID = this.$route.query.runid
@@ -63,37 +63,18 @@ export default {
return
}
- // job id is optional. If ommitted, all logs from all jobs
- // are displayed.
- this.jobID = this.$route.query.jobid
-
this.$http
- .get('/api/v1/pipelinerun/' + this.pipelineID + '/' + this.runID + '/log', {
- showProgressBar: false,
- params: {
- jobid: this.jobID
- }
- })
+ .get('/api/v1/pipelinerun/' + this.pipelineID + '/' + this.runID + '/log', { showProgressBar: false })
.then(response => {
if (response.data) {
- // Check if we got multiple objects
- var finished = true
- this.logText = ''
- for (let i = 0, l = response.data.length; i < l; i++) {
- // We add the received log
- this.logText += response.data[i].log
-
-          // LF does not work for HTML. Replace with <br/>
-          this.logText = this.logText.replace(/\n/g, '<br/>')
-
- // Job not finished?
- if (!response.data[i].finished) {
- finished = false
- }
- }
+ // We add the received log
+ this.logText = response.data.log
+
+          // LF does not work for HTML. Replace with <br/>
+          this.logText = this.logText.replace(/\n/g, '<br/>')
// All jobs finished. Stop interval.
- if (finished && response.data.length > 0) {
+ if (response.data.finished) {
this.jobRunning = false
clearInterval(this.intervalID)
}
diff --git a/gaia.go b/gaia.go
index 9e3818a9..32171d7c 100644
--- a/gaia.go
+++ b/gaia.go
@@ -66,6 +66,9 @@ const (
// LogsFolderName represents the Name of the logs folder in pipeline run folder
LogsFolderName = "logs"
+
+ // LogsFileName represents the file name of the logs output
+ LogsFileName = "output.log"
)
// User is the user object
diff --git a/handlers/pipeline_run.go b/handlers/pipeline_run.go
index 2d250f3c..bb202da3 100644
--- a/handlers/pipeline_run.go
+++ b/handlers/pipeline_run.go
@@ -5,7 +5,6 @@ import (
"net/http"
"os"
"path/filepath"
- "sort"
"strconv"
"github.com/gaia-pipeline/gaia"
@@ -84,20 +83,15 @@ func PipelineGetLatestRun(c echo.Context) error {
return c.JSON(http.StatusOK, run)
}
-// GetJobLogs returns jobs for a given job.
-// If no jobID is given, a collection of all jobs logs will be returned.
+// GetJobLogs returns logs from a pipeline run.
//
// Required parameters:
// pipelineid - Related pipeline id
// pipelinerunid - Related pipeline run id
-//
-// Optional parameters:
-// jobid - Job id
func GetJobLogs(c echo.Context) error {
// Get parameters and validate
pipelineID := c.Param("pipelineid")
pipelineRunID := c.Param("runid")
- jobID := c.QueryParam("jobid")
// Transform pipelineid to int
p, err := strconv.Atoi(pipelineID)
@@ -111,92 +105,31 @@ func GetJobLogs(c echo.Context) error {
return c.String(http.StatusBadRequest, "invalid pipeline run id given")
}
- // Get pipeline run from store
run, err := storeService.PipelineGetRunByPipelineIDAndID(p, r)
if err != nil {
return c.String(http.StatusBadRequest, "cannot find pipeline run with given pipeline id and pipeline run id")
}
- // jobID is not empty, just return the logs from this job
- if jobID != "" {
- for _, job := range run.Jobs {
- if strconv.FormatUint(uint64(job.ID), 10) == jobID {
- // Get logs
- jL, err := getLogs(pipelineID, pipelineRunID, jobID, false)
- if err != nil {
- return c.String(http.StatusBadRequest, err.Error())
- }
-
- // Check if job is finished
- if job.Status == gaia.JobSuccess || job.Status == gaia.JobFailed {
- jL.Finished = true
- }
-
- // We always return an array.
- // It makes a bit easier in the frontend.
- jobLogsList := []jobLogs{}
- jobLogsList = append(jobLogsList, *jL)
- return c.JSON(http.StatusOK, jobLogsList)
- }
- }
+ // Create return object
+ jL := jobLogs{}
- // Logs for given job id not found
- return c.String(http.StatusBadRequest, "cannot find job with given job id")
+ // Determine if the pipeline run has finished
+ if run.Status == gaia.RunFailed || run.Status == gaia.RunSuccess {
+ jL.Finished = true
}
- // Sort the slice. This is important for the order of the returned logs.
- sort.Slice(run.Jobs, func(i, j int) bool {
- return run.Jobs[i].Priority < run.Jobs[j].Priority
- })
-
- // Return a collection of all logs
- jobs := []jobLogs{}
- for _, job := range run.Jobs {
- // Get logs
- jL, err := getLogs(pipelineID, pipelineRunID, strconv.FormatUint(uint64(job.ID), 10), true)
+ // Check if log file exists
+ logFilePath := filepath.Join(gaia.Cfg.WorkspacePath, pipelineID, pipelineRunID, gaia.LogsFolderName, gaia.LogsFileName)
+ if _, err := os.Stat(logFilePath); err == nil {
+ content, err := ioutil.ReadFile(logFilePath)
if err != nil {
- return c.String(http.StatusBadRequest, err.Error())
- }
-
- // No error but also no job logs. Job must be in the queue.
- // We skip it so no error will break things.
- if jL == nil {
- continue
- }
-
- // Check if job is finished
- if job.Status == gaia.JobSuccess || job.Status == gaia.JobFailed {
- jL.Finished = true
+ return c.String(http.StatusInternalServerError, "cannot read pipeline run log file")
}
- jobs = append(jobs, *jL)
+ // Convert logs
+ jL.Log = string(content)
}
// Return logs
- return c.JSON(http.StatusOK, jobs)
-}
-
-func getLogs(pipelineID, pipelineRunID, jobID string, getAllJobLogs bool) (*jobLogs, error) {
- // Lookup log file
- logFilePath := filepath.Join(gaia.Cfg.WorkspacePath, pipelineID, pipelineRunID, gaia.LogsFolderName, jobID)
-
- // We only check if logs exist when a specific job log was requested.
- // If we don't do this, get all job logs will fail during a pipeline run.
- if _, err := os.Stat(logFilePath); os.IsNotExist(err) {
- if !getAllJobLogs {
- return nil, err
- }
- return nil, nil
- }
-
- // Read file
- content, err := ioutil.ReadFile(logFilePath)
- if err != nil {
- return nil, err
- }
-
- // Create return struct
- return &jobLogs{
- Log: string(content),
- }, nil
+ return c.JSON(http.StatusOK, jL)
}
diff --git a/plugin/plugin.go b/plugin/plugin.go
index 3d5416b0..0a133967 100644
--- a/plugin/plugin.go
+++ b/plugin/plugin.go
@@ -8,6 +8,7 @@ import (
"os/exec"
"github.com/gaia-pipeline/gaia"
+ "github.com/gaia-pipeline/gaia/scheduler"
"github.com/gaia-pipeline/protobuf"
plugin "github.com/hashicorp/go-plugin"
)
@@ -45,23 +46,29 @@ type Plugin struct {
// NewPlugin creates a new instance of Plugin.
// One Plugin instance represents one connection to a plugin.
-//
-// It expects the start command to start the plugin and the log path (including file)
-// where the output should be logged to.
-func NewPlugin(command *exec.Cmd, logPath *string) (p *Plugin, err error) {
- // Allocate
- p = &Plugin{}
+func (p *Plugin) NewPlugin() scheduler.Plugin {
+ return &Plugin{}
+}
+// Connect prepares the log path, starts the plugin, initiates the
+// gRPC connection and looks up the plugin.
+// It's up to the caller to call plugin.Close to shutdown the plugin
+// and close the gRPC connection.
+//
+// It expects the start command for the plugin and the path where
+// the log file should be stored.
+func (p *Plugin) Connect(command *exec.Cmd, logPath *string) error {
// Create log file and open it.
// We will close this file in the close method.
if logPath != nil {
+ var err error
p.logFile, err = os.OpenFile(
*logPath,
os.O_CREATE|os.O_WRONLY,
0666,
)
if err != nil {
- return nil, err
+ return err
}
}
@@ -77,13 +84,6 @@ func NewPlugin(command *exec.Cmd, logPath *string) (p *Plugin, err error) {
Stderr: p.writer,
})
- return p, nil
-}
-
-// Connect starts the plugin, initiates the gRPC connection and looks up the plugin.
-// It's up to the caller to call plugin.Close to shutdown the plugin
-// and close the gRPC connection.
-func (p *Plugin) Connect() error {
// Connect via gRPC
gRPCClient, err := p.client.Client()
if err != nil {
@@ -116,6 +116,10 @@ func (p *Plugin) Execute(j *gaia.Job) error {
// Execute the job
_, err := p.pluginConn.ExecuteJob(job)
+
+ // Flush logs
+ p.writer.Flush()
+
return err
}
diff --git a/scheduler/scheduler.go b/scheduler/scheduler.go
index 5e6ee19b..092c5e27 100644
--- a/scheduler/scheduler.go
+++ b/scheduler/scheduler.go
@@ -10,7 +10,6 @@ import (
"time"
"github.com/gaia-pipeline/gaia"
- "github.com/gaia-pipeline/gaia/plugin"
"github.com/gaia-pipeline/gaia/store"
uuid "github.com/satori/go.uuid"
)
@@ -30,6 +29,26 @@ var (
errCreateCMDForPipeline = errors.New("could not create execute command for plugin")
)
+// Plugin represents the plugin implementation which is used
+// during scheduling and execution.
+type Plugin interface {
+ // NewPlugin creates a new instance of plugin
+ NewPlugin() Plugin
+
+ // Connect initializes the connection with the execution command
+ // and the log path where the logs should be stored.
+ Connect(command *exec.Cmd, logPath *string) error
+
+ // Execute executes one job of a pipeline.
+ Execute(j *gaia.Job) error
+
+ // GetJobs returns all real jobs from the pipeline.
+ GetJobs() ([]gaia.Job, error)
+
+ // Close closes the connection and cleans up open file writes.
+ Close()
+}
+
// Scheduler represents the schuler object
type Scheduler struct {
// buffered channel which is used as queue
@@ -38,14 +57,18 @@ type Scheduler struct {
// storeService is an instance of store.
// Use this to talk to the store.
storeService *store.Store
+
+ // pluginSystem is the used plugin system.
+ pluginSystem Plugin
}
// NewScheduler creates a new instance of Scheduler.
-func NewScheduler(store *store.Store) *Scheduler {
+func NewScheduler(store *store.Store, pS Plugin) *Scheduler {
// Create new scheduler
s := &Scheduler{
scheduledRuns: make(chan gaia.PipelineRun, schedulerBufferLimit),
storeService: store,
+ pluginSystem: pS,
}
return s
@@ -79,76 +102,86 @@ func (s *Scheduler) Init() error {
return nil
}
-// work takes work from the scheduled run buffer channel
-// and executes the pipeline. Then repeats.
+// work takes work from the scheduled run buffer channel and starts
+// the preparation and execution of the pipeline. Then repeats.
func (s *Scheduler) work() {
// This worker never stops working.
for {
// Take one scheduled run, block if there are no scheduled pipelines
r := <-s.scheduledRuns
- // Mark the scheduled run as running
- r.Status = gaia.RunRunning
- r.StartDate = time.Now()
+ // Prepare execution and start it
+ s.prepareAndExec(&r)
+ }
+}
- // Update entry in store
- err := s.storeService.PipelinePutRun(&r)
- if err != nil {
- gaia.Cfg.Logger.Debug("could not put pipeline run into store during executing work", "error", err.Error())
- continue
- }
+// prepareAndExec does the real preparation and start the execution.
+func (s *Scheduler) prepareAndExec(r *gaia.PipelineRun) {
+ // Mark the scheduled run as running
+ r.Status = gaia.RunRunning
+ r.StartDate = time.Now()
- // Get related pipeline from pipeline run
- pipeline, err := s.storeService.PipelineGet(r.PipelineID)
- if err != nil {
- gaia.Cfg.Logger.Debug("cannot access pipeline during execution", "error", err.Error())
- r.Status = gaia.RunFailed
- } else if pipeline == nil {
- gaia.Cfg.Logger.Debug("wanted to execute job for pipeline which does not exist", "run", r)
- r.Status = gaia.RunFailed
- }
+ // Update entry in store
+ err := s.storeService.PipelinePutRun(r)
+ if err != nil {
+ gaia.Cfg.Logger.Debug("could not put pipeline run into store during executing work", "error", err.Error())
+ return
+ }
- if r.Status == gaia.RunFailed {
- // Update entry in store
- err = s.storeService.PipelinePutRun(&r)
- if err != nil {
- gaia.Cfg.Logger.Debug("could not put pipeline run into store during executing work", "error", err.Error())
- }
- continue
- }
+ // Get related pipeline from pipeline run
+ pipeline, _ := s.storeService.PipelineGet(r.PipelineID)
- // Get all jobs
- r.Jobs, err = s.getPipelineJobs(pipeline)
- if err != nil {
- gaia.Cfg.Logger.Error("cannot get pipeline jobs before execution", "error", err.Error())
+ // Get all jobs
+ r.Jobs, err = s.getPipelineJobs(pipeline)
+ if err != nil {
+ gaia.Cfg.Logger.Error("cannot get pipeline jobs before execution", "error", err.Error())
- // Update store
- r.Status = gaia.RunFailed
- s.storeService.PipelinePutRun(&r)
- continue
- }
+ // Update store
+ r.Status = gaia.RunFailed
+ s.storeService.PipelinePutRun(r)
+ return
+ }
- // Check if this pipeline has jobs declared
- if len(r.Jobs) == 0 {
- // Finish pipeline run
- s.finishPipelineRun(&r, gaia.RunSuccess)
- continue
- }
+ // Check if this pipeline has jobs declared
+ if len(r.Jobs) == 0 {
+ // Finish pipeline run
+ s.finishPipelineRun(r, gaia.RunSuccess)
+ return
+ }
- // Create logs folder for this run
- path := filepath.Join(gaia.Cfg.WorkspacePath, strconv.Itoa(r.PipelineID), strconv.Itoa(r.ID), gaia.LogsFolderName)
- err = os.MkdirAll(path, 0700)
- if err != nil {
- gaia.Cfg.Logger.Error("cannot create pipeline run folder", "error", err.Error(), "path", path)
- }
+ // Create logs folder for this run
+ path := filepath.Join(gaia.Cfg.WorkspacePath, strconv.Itoa(r.PipelineID), strconv.Itoa(r.ID), gaia.LogsFolderName)
+ err = os.MkdirAll(path, 0700)
+ if err != nil {
+ gaia.Cfg.Logger.Error("cannot create pipeline run folder", "error", err.Error(), "path", path)
+ }
- // Schedule jobs and execute them.
- // Also update the run in the store.
- s.scheduleJobsByPriority(&r, pipeline)
+ // Create the start command for the pipeline
+ c := createPipelineCmd(pipeline)
+ if c == nil {
+ gaia.Cfg.Logger.Debug("cannot create pipeline start command", "error", errCreateCMDForPipeline.Error())
+ s.finishPipelineRun(r, gaia.RunFailed)
+ return
}
+
+ // Create new plugin instance
+ pS := s.pluginSystem.NewPlugin()
+
+ // Connect to plugin(pipeline)
+ path = filepath.Join(path, gaia.LogsFileName)
+ if err := pS.Connect(c, &path); err != nil {
+ gaia.Cfg.Logger.Debug("cannot connect to pipeline", "error", err.Error(), "pipeline", pipeline)
+ s.finishPipelineRun(r, gaia.RunFailed)
+ return
+ }
+ defer pS.Close()
+
+ // Schedule jobs and execute them.
+ // Also update the run in the store.
+ s.scheduleJobsByPriority(r, pipeline, pS)
}
-// schedule looks in the store for new work to do and schedules it.
+// schedule looks in the store for new work and schedules it.
func (s *Scheduler) schedule() {
// Do we have space left in our buffer?
if len(s.scheduledRuns) >= schedulerBufferLimit {
@@ -216,40 +249,18 @@ func (s *Scheduler) SchedulePipeline(p *gaia.Pipeline) (*gaia.PipelineRun, error
// executeJob executes a single job.
// This method is blocking.
-func executeJob(job *gaia.Job, p *gaia.Pipeline, logPath string, wg *sync.WaitGroup, triggerSave chan bool) {
+func executeJob(job *gaia.Job, pS Plugin, wg *sync.WaitGroup, triggerSave chan bool) {
defer wg.Done()
defer func() {
triggerSave <- true
}()
- // Set Job to running
+ // Set Job to running and trigger save
job.Status = gaia.JobRunning
-
- // Create the start command for the pipeline
- c := createPipelineCmd(p)
- if c == nil {
- gaia.Cfg.Logger.Debug("cannot execute pipeline job", "error", errCreateCMDForPipeline.Error(), "job", job)
- job.Status = gaia.JobFailed
- return
- }
-
- // Create new plugin instance
- pC, err := plugin.NewPlugin(c, &logPath)
- if err != nil {
- gaia.Cfg.Logger.Error("cannot initiate plugin before job execution", "error", err.Error())
- return
- }
-
- // Connect to plugin(pipeline)
- if err := pC.Connect(); err != nil {
- gaia.Cfg.Logger.Debug("cannot connect to pipeline", "error", err.Error(), "pipeline", p)
- job.Status = gaia.JobFailed
- return
- }
- defer pC.Close()
+ triggerSave <- true
// Execute job
- if err := pC.Execute(job); err != nil {
+ if err := pS.Execute(job); err != nil {
// TODO: Show it to user
gaia.Cfg.Logger.Debug("error during job execution", "error", err.Error(), "job", job)
job.Status = gaia.JobFailed
@@ -262,7 +273,7 @@ func executeJob(job *gaia.Job, p *gaia.Pipeline, logPath string, wg *sync.WaitGr
// scheduleJobsByPriority schedules the given jobs by their respective
// priority. This method is designed to be recursive and blocking.
// If jobs have the same priority, they will be executed in parallel.
-func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline) {
+func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline, pS Plugin) {
// Do a prescheduling and set it to the first waiting job
var lowestPrio int64
for _, job := range r.Jobs {
@@ -289,9 +300,7 @@ func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline
wg.Add(1)
// Execute this job in a separate goroutine
- path := filepath.Join(gaia.Cfg.WorkspacePath, strconv.Itoa(r.PipelineID), strconv.Itoa(r.ID), gaia.LogsFolderName)
- path = filepath.Join(path, strconv.FormatUint(uint64(job.ID), 10))
- go executeJob(&r.Jobs[id], p, path, &wg, triggerSave)
+ go executeJob(&r.Jobs[id], pS, &wg, triggerSave)
}
}
@@ -322,7 +331,7 @@ func (s *Scheduler) scheduleJobsByPriority(r *gaia.PipelineRun, p *gaia.Pipeline
}
// Run scheduleJobsByPriority again until all jobs have been executed
- s.scheduleJobsByPriority(r, p)
+ s.scheduleJobsByPriority(r, p, pS)
}
// getJobResultsAndStore
@@ -342,21 +351,17 @@ func (s *Scheduler) getPipelineJobs(p *gaia.Pipeline) ([]gaia.Job, error) {
return nil, errCreateCMDForPipeline
}
- // Create new plugin instance
- pC, err := plugin.NewPlugin(c, nil)
- if err != nil {
- gaia.Cfg.Logger.Error("cannot initiate plugin", "error", err.Error())
- return nil, err
- }
+ // Create new Plugin instance
+ pS := s.pluginSystem.NewPlugin()
// Connect to plugin(pipeline)
- if err := pC.Connect(); err != nil {
+ if err := pS.Connect(c, nil); err != nil {
gaia.Cfg.Logger.Debug("cannot connect to pipeline", "error", err.Error(), "pipeline", p)
return nil, err
}
- defer pC.Close()
+ defer pS.Close()
- return pC.GetJobs()
+ return pS.GetJobs()
}
// SetPipelineJobs uses the plugin system to get all jobs from the given pipeline.
diff --git a/scheduler/scheduler_test.go b/scheduler/scheduler_test.go
index f36069a0..86d39fdd 100644
--- a/scheduler/scheduler_test.go
+++ b/scheduler/scheduler_test.go
@@ -1,35 +1,82 @@
package scheduler
import (
- "fmt"
"hash/fnv"
"os"
+ "os/exec"
+ "path/filepath"
"testing"
+ "time"
"github.com/gaia-pipeline/gaia"
"github.com/gaia-pipeline/gaia/store"
+ hclog "github.com/hashicorp/go-hclog"
uuid "github.com/satori/go.uuid"
)
-func TestScheduleJobsByPriority(t *testing.T) {
+type PluginFake struct {
+ // Fake struct
+ jobs []gaia.Job
+}
+
+var pluginFake *PluginFake
+
+func (p *PluginFake) NewPlugin() Plugin {
+ return &PluginFake{}
+}
+func (p *PluginFake) Connect(cmd *exec.Cmd, logPath *string) error { return nil }
+func (p *PluginFake) Execute(j *gaia.Job) error { return nil }
+func (p *PluginFake) GetJobs() ([]gaia.Job, error) { return pluginFake.jobs, nil }
+func (p *PluginFake) Close() {}
+
+func TestInit(t *testing.T) {
gaia.Cfg = &gaia.Config{}
storeInstance := store.NewStore()
- gaia.Cfg.DataPath = "data"
+ gaia.Cfg.DataPath = os.TempDir()
+ gaia.Cfg.WorkspacePath = filepath.Join(os.TempDir(), "tmp")
gaia.Cfg.Bolt.Mode = 0600
-
- // Create test folder
- err := os.MkdirAll(gaia.Cfg.DataPath, 0700)
+ gaia.Cfg.Logger = hclog.New(&hclog.LoggerOptions{
+ Level: hclog.Trace,
+ Output: hclog.DefaultOutput,
+ Name: "Gaia",
+ })
+ gaia.Cfg.Worker = "2"
+ if err := storeInstance.Init(); err != nil {
+ t.Fatal(err)
+ }
+ pluginFake = &PluginFake{}
+ s := NewScheduler(storeInstance, pluginFake)
+ err := s.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ err = os.Remove(filepath.Join(os.TempDir(), "gaia.db"))
if err != nil {
- fmt.Printf("cannot create data folder: %s\n", err.Error())
t.Fatal(err)
}
+}
+
+func TestPrepareAndExec(t *testing.T) {
+ gaia.Cfg = &gaia.Config{}
+ storeInstance := store.NewStore()
+ gaia.Cfg.DataPath = os.TempDir()
+ gaia.Cfg.WorkspacePath = filepath.Join(os.TempDir(), "tmp")
+ gaia.Cfg.Bolt.Mode = 0600
+ gaia.Cfg.Logger = hclog.New(&hclog.LoggerOptions{
+ Level: hclog.Trace,
+ Output: hclog.DefaultOutput,
+ Name: "Gaia",
+ })
- if err = storeInstance.Init(); err != nil {
+ if err := storeInstance.Init(); err != nil {
t.Fatal(err)
}
p, r := prepareTestData()
- s := NewScheduler(storeInstance)
- s.scheduleJobsByPriority(r, p)
+ storeInstance.PipelinePut(p)
+ pluginFake = &PluginFake{}
+ pluginFake.jobs = p.Jobs
+ s := NewScheduler(storeInstance, pluginFake)
+ s.prepareAndExec(r)
// Iterate jobs
for _, job := range r.Jobs {
@@ -39,13 +86,120 @@ func TestScheduleJobsByPriority(t *testing.T) {
t.Logf("Job %s has been executed...", job.Title)
}
}
+ err := os.Remove(filepath.Join(os.TempDir(), "gaia.db"))
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestSchedulePipeline(t *testing.T) {
+ gaia.Cfg = &gaia.Config{}
+ storeInstance := store.NewStore()
+ gaia.Cfg.DataPath = os.TempDir()
+ gaia.Cfg.WorkspacePath = filepath.Join(os.TempDir(), "tmp")
+ gaia.Cfg.Bolt.Mode = 0600
+ gaia.Cfg.Logger = hclog.New(&hclog.LoggerOptions{
+ Level: hclog.Trace,
+ Output: hclog.DefaultOutput,
+ Name: "Gaia",
+ })
+ gaia.Cfg.Worker = "2"
+ if err := storeInstance.Init(); err != nil {
+ t.Fatal(err)
+ }
+ p, _ := prepareTestData()
+ storeInstance.PipelinePut(p)
+ pluginFake = &PluginFake{}
+ pluginFake.jobs = p.Jobs
+ s := NewScheduler(storeInstance, pluginFake)
+ err := s.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, err = s.SchedulePipeline(p)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ err = os.Remove(filepath.Join(os.TempDir(), "gaia.db"))
+ if err != nil {
+ t.Fatal(err)
+ }
+}
+
+func TestSchedule(t *testing.T) {
+ gaia.Cfg = &gaia.Config{}
+ storeInstance := store.NewStore()
+ gaia.Cfg.DataPath = os.TempDir()
+ gaia.Cfg.WorkspacePath = filepath.Join(os.TempDir(), "tmp")
+ gaia.Cfg.Bolt.Mode = 0600
+ gaia.Cfg.Logger = hclog.New(&hclog.LoggerOptions{
+ Level: hclog.Trace,
+ Output: hclog.DefaultOutput,
+ Name: "Gaia",
+ })
+ gaia.Cfg.Worker = "2"
+ if err := storeInstance.Init(); err != nil {
+ t.Fatal(err)
+ }
+ p, _ := prepareTestData()
+ storeInstance.PipelinePut(p)
+ pluginFake = &PluginFake{}
+ pluginFake.jobs = p.Jobs
+ s := NewScheduler(storeInstance, pluginFake)
+ err := s.Init()
+ if err != nil {
+ t.Fatal(err)
+ }
+ _, err = s.SchedulePipeline(p)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Wait some time to pickup work and finish.
+ // We have to wait at least 3 seconds for scheduler tick interval.
+ time.Sleep(5 * time.Second)
+ r, err := storeInstance.PipelineGetRunByPipelineIDAndID(p.ID, 1)
+ if err != nil {
+ t.Fatal(err)
+ }
+ for _, job := range r.Jobs {
+ if job.Status != gaia.JobSuccess {
+ t.Fatalf("Job %s has status %s but should be %s!\n", job.Title, string(job.Status), string(gaia.JobSuccess))
+ }
+ }
+ err = os.Remove(filepath.Join(os.TempDir(), "gaia.db"))
+ if err != nil {
+ t.Fatal(err)
+ }
+}
- // cleanup
- err = os.Remove("data/gaia.db")
+func TestSetPipelineJobs(t *testing.T) {
+ gaia.Cfg = &gaia.Config{}
+ storeInstance := store.NewStore()
+ gaia.Cfg.DataPath = os.TempDir()
+ gaia.Cfg.WorkspacePath = filepath.Join(os.TempDir(), "tmp")
+ gaia.Cfg.Bolt.Mode = 0600
+ gaia.Cfg.Logger = hclog.New(&hclog.LoggerOptions{
+ Level: hclog.Trace,
+ Output: hclog.DefaultOutput,
+ Name: "Gaia",
+ })
+ if err := storeInstance.Init(); err != nil {
+ t.Fatal(err)
+ }
+ p, _ := prepareTestData()
+ pluginFake = &PluginFake{}
+ pluginFake.jobs = p.Jobs
+ p.Jobs = nil
+ s := NewScheduler(storeInstance, pluginFake)
+ err := s.SetPipelineJobs(p)
if err != nil {
t.Fatal(err)
}
- err = os.Remove("data")
+ if len(p.Jobs) != 4 {
+ t.Fatalf("Number of jobs should be 4 but was %d\n", len(p.Jobs))
+ }
+ err = os.Remove(filepath.Join(os.TempDir(), "gaia.db"))
if err != nil {
t.Fatal(err)
}
@@ -56,37 +210,31 @@ func prepareTestData() (pipeline *gaia.Pipeline, pipelineRun *gaia.PipelineRun)
ID: hash("Job1"),
Title: "Job1",
Priority: 0,
- Status: gaia.JobSuccess,
+ Status: gaia.JobWaitingExec,
}
job2 := gaia.Job{
ID: hash("Job2"),
Title: "Job2",
Priority: 10,
- Status: gaia.JobSuccess,
+ Status: gaia.JobWaitingExec,
}
job3 := gaia.Job{
ID: hash("Job3"),
Title: "Job3",
Priority: 20,
- Status: gaia.JobSuccess,
+ Status: gaia.JobWaitingExec,
}
job4 := gaia.Job{
ID: hash("Job4"),
Title: "Job4",
Priority: 20,
- Status: gaia.JobSuccess,
+ Status: gaia.JobWaitingExec,
}
pipeline = &gaia.Pipeline{
ID: 1,
Name: "Test Pipeline",
Type: gaia.PTypeGolang,
- }
- pipelineRun = &gaia.PipelineRun{
- ID: 1,
- PipelineID: 1,
- Status: gaia.RunNotScheduled,
- UniqueID: uuid.Must(uuid.NewV4(), nil).String(),
Jobs: []gaia.Job{
job1,
job2,
@@ -94,6 +242,12 @@ func prepareTestData() (pipeline *gaia.Pipeline, pipelineRun *gaia.PipelineRun)
job4,
},
}
+ pipelineRun = &gaia.PipelineRun{
+ ID: 1,
+ PipelineID: 1,
+ Status: gaia.RunNotScheduled,
+ UniqueID: uuid.Must(uuid.NewV4(), nil).String(),
+ }
return
}