Skip to content

Commit

Permalink
Changed pipeline id to autoincrement by store. Implemented first vers…
Browse files Browse the repository at this point in the history
…ion of scheduler
  • Loading branch information
michelvocks committed Mar 6, 2018
1 parent 27cde8d commit da1fe8d
Show file tree
Hide file tree
Showing 8 changed files with 192 additions and 37 deletions.
8 changes: 4 additions & 4 deletions cmd/gaia/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -71,22 +71,22 @@ func main() {
irisInstance = iris.New()

// Initialize store
s := store.NewStore()
err = s.Init()
store := store.NewStore()
err = store.Init()
if err != nil {
gaia.Cfg.Logger.Error("cannot initialize store", "error", err.Error())
os.Exit(1)
}

// Initialize handlers
err = handlers.InitHandlers(irisInstance, s)
err = handlers.InitHandlers(irisInstance, store)
if err != nil {
gaia.Cfg.Logger.Error("cannot initialize handlers", "error", err.Error())
os.Exit(1)
}

// Start ticker. Periodic job to check for new plugins.
pipeline.InitTicker()
pipeline.InitTicker(store)

// Start listen
irisInstance.Run(iris.Addr(":" + gaia.Cfg.ListenPort))
Expand Down
2 changes: 1 addition & 1 deletion gaia.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,7 @@ type User struct {

// Pipeline represents a single pipeline
type Pipeline struct {
ID string `json:"id,omitempty"`
ID int `json:"id,omitempty"`
Name string `json:"name,omitempty"`
Repo GitRepo `json:"repo,omitempty"`
Type PipelineType `json:"type,omitempty"`
Expand Down
14 changes: 13 additions & 1 deletion handlers/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,7 @@ package handlers

import (
"errors"
"strconv"
"strings"
"time"

Expand All @@ -17,6 +18,9 @@ var (

// errPipelineNotFound is thrown when a pipeline was not found with the given id
errPipelineNotFound = errors.New("pipeline not found with the given id")

// errInvalidPipelineID is thrown when the given pipeline id is not valid
errInvalidPipelineID = errors.New("the given pipeline id is not valid")
)

const (
Expand Down Expand Up @@ -214,7 +218,15 @@ func PipelineGetAll(ctx iris.Context) {

// PipelineGet accepts a pipeline id and returns the pipeline object.
func PipelineGet(ctx iris.Context) {
userID := ctx.Params().Get("id")
userIDStr := ctx.Params().Get("id")

// Convert string to int because id is int
userID, err := strconv.Atoi(userIDStr)
if err != nil {
ctx.StatusCode(iris.StatusBadRequest)
ctx.WriteString(errInvalidPipelineID.Error())
return
}

// Look up pipeline for the given id
for pipeline := range pipeline.GlobalActivePipelines.Iter() {
Expand Down
6 changes: 3 additions & 3 deletions pipeline/pipeline.go
Original file line number Diff line number Diff line change
Expand Up @@ -104,10 +104,10 @@ func (ap *ActivePipelines) Append(p gaia.Pipeline) {
ap.Pipelines = append(ap.Pipelines, p)
}

// Get looks up the pipeline with the given id.
func (ap *ActivePipelines) Get(id string) *gaia.Pipeline {
// GetByName looks up the pipeline by the given name.
func (ap *ActivePipelines) GetByName(n string) *gaia.Pipeline {
for pipeline := range ap.Iter() {
if pipeline.ID == id {
if pipeline.Name == n {
return &pipeline
}
}
Expand Down
54 changes: 54 additions & 0 deletions pipeline/scheduler.go
Original file line number Diff line number Diff line change
@@ -1,10 +1,64 @@
package pipeline

import (
"time"

"github.com/gaia-pipeline/gaia"
"github.com/gaia-pipeline/gaia/plugin"
)

const (
// Maximum buffer limit for scheduler
schedulerBufferLimit = 50

// schedulerIntervalSeconds defines the interval the scheduler will look
// for new work to schedule. Definition in seconds.
schedulerIntervalSeconds = 3
)

// Scheduler represents the schuler object
type Scheduler struct {
// buffered channel which is used as queue
pipelines chan gaia.Pipeline
}

// NewScheduler creates a new instance of Scheduler.
func NewScheduler() *Scheduler {
// Create new scheduler
s := &Scheduler{
pipelines: make(chan gaia.Pipeline, schedulerBufferLimit),
}

return s
}

// Init initializes the scheduler.
func (s *Scheduler) Init() {
// Create a periodic job that fills the scheduler with new pipelines.
schedulerJob := time.NewTicker(schedulerIntervalSeconds * time.Second)
go func() {
for {
select {
case <-schedulerJob.C:
checkActivePipelines()
}
}
}()
}

// Schedule looks in the store for new work to do and schedules it.
func (s *Scheduler) Schedule() {
// Do we have space left in our buffer?
if len(s.pipelines) >= schedulerBufferLimit {
// No space left. Exit.
gaia.Cfg.Logger.Debug("scheduler buffer overflow. Cannot schedule new pipelines...")
return
}

// TODO: Implement schedule

}

// setPipelineJobs uses the plugin system to get all
// jobs from the given pipeline.
// This function is blocking and might take some time.
Expand Down
79 changes: 51 additions & 28 deletions pipeline/ticker.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ import (
"time"

"github.com/gaia-pipeline/gaia"
uuid "github.com/satori/go.uuid"
"github.com/gaia-pipeline/gaia/store"
)

const (
Expand All @@ -19,12 +19,19 @@ const (
tickerIntervalSeconds = 5
)

// storeService is an instance of store.
// Use this to talk to the store.
var storeService *store.Store

// InitTicker inititates the pipeline ticker.
// This periodic job will check for new pipelines.
func InitTicker() {
func InitTicker(s *store.Store) {
// Init global active pipelines slice
GlobalActivePipelines = NewActivePipelines()

// Save store pointer
storeService = s

// Check immediately to make sure we fill the list as fast as possible.
checkActivePipelines()

Expand Down Expand Up @@ -66,7 +73,7 @@ func checkActivePipelines() {
pName := getRealPipelineName(n, pType)
if GlobalActivePipelines.Contains(pName) {
// If Md5Checksum is set, we should check if pipeline has been changed.
p := GlobalActivePipelines.Get(pName)
p := GlobalActivePipelines.GetByName(pName)
if p != nil && p.Md5Checksum != nil {
// Get MD5 Checksum
checksum, err := getMd5Checksum(gaia.Cfg.PipelinePath + string(os.PathSeparator) + file.Name())
Expand All @@ -78,7 +85,7 @@ func checkActivePipelines() {
// Pipeline has been changed?
if bytes.Compare(p.Md5Checksum, checksum) != 0 {
// Let us try again to start the plugin and receive all implemented jobs
setPipelineJobsTicker(p)
setPipelineJobs(p)

// Replace pipeline
if ok := GlobalActivePipelines.Replace(*p); !ok {
Expand All @@ -91,20 +98,51 @@ func checkActivePipelines() {
continue
}

// Create pipeline object and fill it with information
p := gaia.Pipeline{
ID: uuid.Must(uuid.NewV4()).String(),
Name: pName,
Type: pType,
ExecPath: gaia.Cfg.PipelinePath + string(os.PathSeparator) + file.Name(),
Created: time.Now(),
// Get pipeline from store.
pipeline, err := storeService.PipelineGetByName(pName)
if err != nil {
// If we have an error here we are in trouble.
gaia.Cfg.Logger.Error("cannot access pipelines bucket. Data corrupted?", "error", err.Error())
continue
}

// We couldn't finde the pipeline. Create a new one.
var shouldStore = false
if pipeline == nil {
// Create pipeline object and fill it with information
pipeline = &gaia.Pipeline{
Name: pName,
Type: pType,
ExecPath: gaia.Cfg.PipelinePath + string(os.PathSeparator) + file.Name(),
Created: time.Now(),
}

// We should store it
shouldStore = true
}

// We calculate a MD5 Checksum and store it.
// We use this to estimate if a pipeline has been changed.
pipeline.Md5Checksum, err = getMd5Checksum(pipeline.ExecPath)
if err != nil {
gaia.Cfg.Logger.Debug("cannot calculate md5 checksum for pipeline", "error", err.Error(), "pipeline", pipeline)
continue
}

// Let us try to start the plugin and receive all implemented jobs
setPipelineJobsTicker(&p)
setPipelineJobs(pipeline)

// Put pipeline into store only when it was new created.
if shouldStore {
storeService.PipelinePut(pipeline)
}

// We do not update the pipeline in store if it already exists there.
// We only updated the Md5 Checksum and the jobs but this is not importent
// to store and should not have any side effects.

// Append new pipeline
GlobalActivePipelines.Append(p)
GlobalActivePipelines.Append(*pipeline)
}
}
}
Expand Down Expand Up @@ -134,21 +172,6 @@ func getRealPipelineName(n string, pType gaia.PipelineType) string {
return strings.TrimSuffix(n, typeDelimiter+pType.String())
}

func setPipelineJobsTicker(p *gaia.Pipeline) {
err := setPipelineJobs(p)
if err != nil {
// We were not able to get jobs from the pipeline.
// We set the Md5Checksum for later to try it again.
p.Md5Checksum, err = getMd5Checksum(p.ExecPath)
if err != nil {
gaia.Cfg.Logger.Debug("cannot calculate md5 checksum for pipeline", "error", err.Error(), "pipeline", p)
}
} else {
// Reset md5 checksum in case we already set it
p.Md5Checksum = nil
}
}

func getMd5Checksum(file string) ([]byte, error) {
// Open file
f, err := os.Open(file)
Expand Down
58 changes: 58 additions & 0 deletions store/pipelines.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,3 +53,61 @@ func (s *Store) CreatePipelineGet() ([]gaia.CreatePipeline, error) {
})
})
}

// PipelinePut puts a pipeline into the store.
// On persist, the pipeline will get a unique id.
func (s *Store) PipelinePut(p *gaia.Pipeline) error {
return s.db.Update(func(tx *bolt.Tx) error {
// Get pipeline bucket
b := tx.Bucket(pipelineBucket)

// Generate ID for the pipeline.
id, err := b.NextSequence()
if err != nil {
return err
}
p.ID = int(id)

// Marshal pipeline data into bytes.
buf, err := json.Marshal(p)
if err != nil {
return err
}

// Persist bytes to pipelines bucket.
return b.Put(itob(p.ID), buf)
})
}

// PipelineGetByName looks up a pipeline by the given name.
// Returns nil if pipeline was not found.
func (s *Store) PipelineGetByName(n string) (*gaia.Pipeline, error) {
var pipeline *gaia.Pipeline

return pipeline, s.db.View(func(tx *bolt.Tx) error {
// Get bucket
b := tx.Bucket(pipelineBucket)

// Iterate all created pipelines.
return b.ForEach(func(k, v []byte) error {
// create single pipeline object
p := &gaia.Pipeline{}

// Unmarshal
err := json.Unmarshal(v, p)
if err != nil {
return err
}

// Is this pipeline we are looking for?
if p.Name == n {
pipeline = p
}

return nil
})
})
}

// PipelineGetScheduled returns the scheduled pipelines
//func (s *Store) PipelineGetScheduled() ([]gaia.Pipeline, error) {}
8 changes: 8 additions & 0 deletions store/store.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package store

import (
"encoding/binary"
"fmt"
"os"

Expand Down Expand Up @@ -104,3 +105,10 @@ func setupDatabase(s *Store) error {

return nil
}

// itob returns an 8-byte big endian representation of v.
func itob(v int) []byte {
b := make([]byte, 8)
binary.BigEndian.PutUint64(b, uint64(v))
return b
}

0 comments on commit da1fe8d

Please sign in to comment.