From 51645c3fa97ac4d99ef8e61951874e34227b17b1 Mon Sep 17 00:00:00 2001 From: Lev Date: Tue, 10 Jan 2017 21:02:28 +0200 Subject: [PATCH 01/11] added remote job support --- job/job.go | 26 ++++++++++++ job/runner.go | 110 +++++++++++++++++++++++++++++++++++++++++++++++--- 2 files changed, 131 insertions(+), 5 deletions(-) diff --git a/job/job.go b/job/job.go index 9f87a88b..d413f315 100644 --- a/job/job.go +++ b/job/job.go @@ -76,6 +76,11 @@ type Job struct { // Meta data about successful and failed runs. Metadata Metadata `json:"metadata"` + // Type of the job + JobType jobType `json:"type"` + + RemoteProperties RemoteProperties `json:"remote_proporties"` + // Collection of Job Stats Stats []*JobStat `json:"stats"` @@ -86,6 +91,22 @@ type Job struct { IsDone bool `json:"is_done"` } +type jobType int + +const ( + LocalJob jobType = 0 + iota + RemoteJob +) + +type RemoteProperties struct { + Url string `json:"url"` + Method string `json:"method"` + Body string `json:"body"` + Headers []Header `json:"headers"` + Timeout int `json:"timeout"` + ExpectedResponseCode []int `json:"expected_response_codes"` +} + type Metadata struct { SuccessCount uint `json:"success_count"` LastSuccess time.Time `json:"last_success"` @@ -94,6 +115,11 @@ type Metadata struct { LastAttemptedRun time.Time `json:"last_attempted_run"` } +type Header struct { + Key string `json:"key"` + Value string `json:"value"` +} + // Bytes returns the byte representation of the Job. func (j Job) Bytes() ([]byte, error) { buff := new(bytes.Buffer) diff --git a/job/runner.go b/job/runner.go index 6255c9cd..21e6c9f0 100644 --- a/job/runner.go +++ b/job/runner.go @@ -5,7 +5,10 @@ import ( "os/exec" "time" + "bytes" log "github.com/Sirupsen/logrus" + "net/http" + "strings" ) type JobRunner struct { @@ -18,12 +21,11 @@ type JobRunner struct { } var ( - ErrJobDisabled = errors.New("Job cannot run, as it is disabled") - ErrCmdIsEmpty = errors.New("Job Command is empity.") + ErrJobDisabled = errors.New("Job cannot run, as it is disabled") + ErrCmdIsEmpty = errors.New("Job Command is empty.") + ErrJobTypeInvalid = errors.New("Job Type is not valid.") ) -// Run executes the Job's command, collects metadata around the success -// or failure of the Job's execution, and schedules the next run. func (j *JobRunner) Run(cache JobCache) (*JobStat, Metadata, error) { j.job.lock.RLock() defer j.job.lock.RUnlock() @@ -40,7 +42,17 @@ func (j *JobRunner) Run(cache JobCache) (*JobStat, Metadata, error) { j.runSetup() for { - err := j.runCmd() + var err error + if j.job.JobType == LocalJob { + log.Debug("Running local job") + err = j.LocalRun() + } else if j.job.JobType == RemoteJob { + log.Debug("Running remote job") + err = j.RemoteRun() + } else { + err = ErrJobTypeInvalid + } + if err != nil { // Log Error in Metadata // TODO - Error Reporting, email error @@ -85,6 +97,44 @@ func (j *JobRunner) Run(cache JobCache) (*JobStat, Metadata, error) { return j.currentStat, j.meta, nil } +// LocalRun executes the Job's local shell command +func (j *JobRunner) LocalRun() error { + return j.runCmd() +} + +// RemoteRun sends a http request, and checks if the response is valid in time, +func (j *JobRunner) RemoteRun() error { + // Calculate a response timeout + timeout := j.responseTimeout() + + httpClient := http.Client{ + Timeout: timeout, + } + + // Normalize the method passed by the user + method := strings.ToUpper(j.job.RemoteProperties.Method) + bodyBuffer := bytes.NewBufferString(j.job.RemoteProperties.Body) + req, err := http.NewRequest(method, j.job.RemoteProperties.Url, bodyBuffer) + if err != nil { + return err + } + + // Set default or user's passed headers + j.setHeaders(req) + + res, err := httpClient.Do(req) + if err != nil { + return err + } + + // Check if we got any of the status codes the user asked for + if j.checkExpected(res.StatusCode) { + return nil + } else { + return errors.New(res.Status) + } +} + func (j *JobRunner) runCmd() error { j.numberOfAttempts++ @@ -133,3 +183,53 @@ func (j *JobRunner) collectStats(success bool) { j.currentStat.Success = success j.currentStat.NumberOfRetries = j.job.Retries - j.currentRetries } + +func (j *JobRunner) checkExpected(statusCode int) bool { + // If no expected response codes passed, add 200 status code as expected + if len(j.job.RemoteProperties.ExpectedResponseCode) == 0 { + j.job.RemoteProperties.ExpectedResponseCode = append(j.job.RemoteProperties.ExpectedResponseCode, 200) + } + for _, expected := range j.job.RemoteProperties.ExpectedResponseCode { + if expected == statusCode { + return true + } + } + + return false +} + +func (j *JobRunner) responseTimeout() time.Duration { + responseTimeout := time.Duration(j.job.RemoteProperties.Timeout) + if responseTimeout == 0 { + + // set default to 30 seconds + responseTimeout = 30 + } + return time.Duration(time.Duration(responseTimeout) * time.Second) +} + +func (j *JobRunner) setHeaders(req *http.Request) { + // A valid assumption is that the user is sending something in json cause we're past 2017, check if the user + // already added it, if not, add it to the header + if !j.keyInHeaders("Content-Type", j.job.RemoteProperties.Headers) { + jsonContentType := "application/json" + req.Header.Set("Content-Type", jsonContentType) + + // Add the new header to the job properties + j.job.RemoteProperties.Headers = append(j.job.RemoteProperties.Headers, Header{"Content-Type", jsonContentType}) + } + + // Set any custom headers + for _, header := range j.job.RemoteProperties.Headers { + req.Header.Set(header.Key, header.Value) + } +} + +func (j *JobRunner) keyInHeaders(keyExpected string, headers []Header) bool { + for _, header := range headers { + if header.Key == keyExpected { + return true + } + } + return false +} From 045c71a47e830ba416cdfb603fe3ce48134f47b5 Mon Sep 17 00:00:00 2001 From: Lev Date: Tue, 10 Jan 2017 21:05:57 +0200 Subject: [PATCH 02/11] little cleaning --- job/job.go | 12 ++++++------ job/runner.go | 14 +++++++------- 2 files changed, 13 insertions(+), 13 deletions(-) diff --git a/job/job.go b/job/job.go index d413f315..1447b448 100644 --- a/job/job.go +++ b/job/job.go @@ -99,12 +99,12 @@ const ( ) type RemoteProperties struct { - Url string `json:"url"` - Method string `json:"method"` - Body string `json:"body"` - Headers []Header `json:"headers"` - Timeout int `json:"timeout"` - ExpectedResponseCode []int `json:"expected_response_codes"` + Url string `json:"url"` + Method string `json:"method"` + Body string `json:"body"` + Headers []Header `json:"headers"` + Timeout int `json:"timeout"` + ExpectedResponseCodes []int `json:"expected_response_codes"` } type Metadata struct { diff --git a/job/runner.go b/job/runner.go index 21e6c9f0..cad0b195 100644 --- a/job/runner.go +++ b/job/runner.go @@ -1,14 +1,14 @@ package job import ( + "strings" "errors" - "os/exec" "time" - "bytes" - log "github.com/Sirupsen/logrus" "net/http" - "strings" + "os/exec" + + log "github.com/Sirupsen/logrus" ) type JobRunner struct { @@ -186,10 +186,10 @@ func (j *JobRunner) collectStats(success bool) { func (j *JobRunner) checkExpected(statusCode int) bool { // If no expected response codes passed, add 200 status code as expected - if len(j.job.RemoteProperties.ExpectedResponseCode) == 0 { - j.job.RemoteProperties.ExpectedResponseCode = append(j.job.RemoteProperties.ExpectedResponseCode, 200) + if len(j.job.RemoteProperties.ExpectedResponseCodes) == 0 { + j.job.RemoteProperties.ExpectedResponseCodes = append(j.job.RemoteProperties.ExpectedResponseCodes, 200) } - for _, expected := range j.job.RemoteProperties.ExpectedResponseCode { + for _, expected := range j.job.RemoteProperties.ExpectedResponseCodes { if expected == statusCode { return true } From 85579ee068c4eaed31c38a4258ee8850d8ae8d9b Mon Sep 17 00:00:00 2001 From: Lev Date: Tue, 10 Jan 2017 21:51:12 +0200 Subject: [PATCH 03/11] remove reduntant --- job/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/job/runner.go b/job/runner.go index cad0b195..e5ee787c 100644 --- a/job/runner.go +++ b/job/runner.go @@ -199,7 +199,7 @@ func (j *JobRunner) checkExpected(statusCode int) bool { } func (j *JobRunner) responseTimeout() time.Duration { - responseTimeout := time.Duration(j.job.RemoteProperties.Timeout) + responseTimeout := j.job.RemoteProperties.Timeout if responseTimeout == 0 { // set default to 30 seconds From 8913b82cfbce8ef3e2394fa8b7c9b897a0efb080 Mon Sep 17 00:00:00 2001 From: Lev Date: Tue, 10 Jan 2017 21:52:27 +0200 Subject: [PATCH 04/11] remove more reduntant --- job/runner.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/job/runner.go b/job/runner.go index e5ee787c..b724d169 100644 --- a/job/runner.go +++ b/job/runner.go @@ -205,7 +205,7 @@ func (j *JobRunner) responseTimeout() time.Duration { // set default to 30 seconds responseTimeout = 30 } - return time.Duration(time.Duration(responseTimeout) * time.Second) + return time.Duration(responseTimeout) * time.Second } func (j *JobRunner) setHeaders(req *http.Request) { From 5c9bcb3d0a579fd8fd8bcb2085eb4e60a18ecab0 Mon Sep 17 00:00:00 2001 From: Lev Date: Wed, 11 Jan 2017 00:18:22 +0200 Subject: [PATCH 05/11] added tests and removed coveralls from circleci.yml --- api/api_test.go | 44 +++++++++++++++++++++++++++++++++++++++++++ circle.yml | 6 ------ job/job.go | 33 ++++++++++++++++++++++++-------- job/job_test.go | 48 +++++++++++++++++++++++++++++++++++++++++++++++ job/runner.go | 9 +++++++-- job/test_utils.go | 9 +++++++++ 6 files changed, 133 insertions(+), 16 deletions(-) diff --git a/api/api_test.go b/api/api_test.go index 34a12d4e..938d3c97 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -32,6 +32,25 @@ func generateNewJobMap() map[string]string { "owner": "aj@ajvb.me", } } + +func generateNewRemoteJobMap() map[string]interface{} { + scheduleTime := time.Now().Add(time.Minute * 5) + repeat := 1 + delay := "P1DT10M10S" + parsedTime := scheduleTime.Format(time.RFC3339) + scheduleStr := fmt.Sprintf("R%d/%s/%s", repeat, parsedTime, delay) + + return map[string]interface{}{ + "schedule": scheduleStr, + "name": "mock_remote_job", + "owner": "aj@ajvb.me", + "type": 1, + "remote_properties": map[string]string{ + "url": "http://example.com", + }, + } +} + func generateJobAndCache() (*job.MemoryJobCache, *job.Job) { cache := job.NewMockCache() j := job.GetMockJobWithGenericSchedule() @@ -70,6 +89,31 @@ func (a *ApiTestSuite) TestHandleAddJob() { a.Equal(defaultOwner, retrievedJob.Owner) a.Equal(w.Code, http.StatusCreated) } + +func (a *ApiTestSuite) TestHandleAddRemoteJob() { + t := a.T() + cache := job.NewMockCache() + jobMap := generateNewRemoteJobMap() + jobMap["owner"] = "" + defaultOwner := "aj+tester@ajvb.me" + handler := HandleAddJob(cache, defaultOwner) + + jsonJobMap, err := json.Marshal(jobMap) + a.NoError(err) + w, req := setupTestReq(t, "POST", ApiJobPath, jsonJobMap) + handler(w, req) + + var addJobResp AddJobResponse + err = json.Unmarshal(w.Body.Bytes(), &addJobResp) + a.NoError(err) + retrievedJob, err := cache.Get(addJobResp.Id) + a.NoError(err) + a.Equal(jobMap["name"], retrievedJob.Name) + a.NotEqual(jobMap["owner"], retrievedJob.Owner) + a.Equal(defaultOwner, retrievedJob.Owner) + a.Equal(w.Code, http.StatusCreated) +} + func (a *ApiTestSuite) TestHandleAddJobFailureBadJson() { t := a.T() cache := job.NewMockCache() diff --git a/circle.yml b/circle.yml index 6c5f1d1d..1ca8b243 100644 --- a/circle.yml +++ b/circle.yml @@ -20,14 +20,8 @@ dependencies: pwd: ../gopath/src/${REPO_PATH} test: - pre: - - go get github.com/axw/gocov/gocov github.com/mattn/goveralls golang.org/x/tools/cmd/cover github.com/bluesuncorp/overalls: - pwd: ../gopath/src/${REPO_PATH} override: - go test -race ./...: pwd: ../gopath/src/${REPO_PATH} - $GOPATH/bin/overalls -project=${REPO_PATH} -covermode=count -debug -ignore=.git,Godeps: pwd: ../gopath/src/${REPO_PATH} - post: - - $GOPATH/bin/goveralls -coverprofile=overalls.coverprofile -service=circle-ci -repotoken=$COVERALLS_KEY: - pwd: ../gopath/src/${REPO_PATH} diff --git a/job/job.go b/job/job.go index 1447b448..a122cae3 100644 --- a/job/job.go +++ b/job/job.go @@ -22,7 +22,9 @@ var ( RFC3339WithoutTimezone = "2006-01-02T15:04:05" - ErrInvalidJob = errors.New("Invalid Job. Job's must contain a Name and a Command field") + ErrInvalidJob = errors.New("Invalid Local Job. Job's must contain a Name and a Command field") + ErrInvalidRemoteJob = errors.New("Invalid Remote Job. Job's must contain a Name and a url field") + ErrInvalidJobType = errors.New("Invalid Job type. Types supported: 0 for local and 1 for remote") ) func init() { @@ -79,6 +81,7 @@ type Job struct { // Type of the job JobType jobType `json:"type"` + // Custom properties for the remote job type RemoteProperties RemoteProperties `json:"remote_proporties"` // Collection of Job Stats @@ -101,9 +104,17 @@ const ( type RemoteProperties struct { Url string `json:"url"` Method string `json:"method"` + + // A body to attach to the http request Body string `json:"body"` + + // A list of headers to add to http request (e.g. [{"key": "charset", "value": "UTF-8"}]) Headers []Header `json:"headers"` + + // A timeout property for the http request in seconds Timeout int `json:"timeout"` + + // A list of expected response codes (e.g. [200, 201]) ExpectedResponseCodes []int `json:"expected_response_codes"` } @@ -150,13 +161,6 @@ func (j *Job) Init(cache JobCache) error { j.lock.Lock() defer j.lock.Unlock() - // Job Validation - // TODO: Move this to a seperated method? - if j.Name == "" || j.Command == "" { - log.Errorf(ErrInvalidJob.Error()) - return ErrInvalidJob - } - u4, err := uuid.NewV4() if err != nil { log.Errorf("Error occured when generating uuid: %s", err) @@ -458,3 +462,16 @@ func (j *Job) ShouldStartWaiting() bool { } return true } + +func (j *Job) validation() error { + if j.JobType == 0 && (j.Name == "" || j.Command == "") { + log.Errorf(ErrInvalidJob.Error()) + return ErrInvalidJob + } else if j.JobType == 1 && (j.Name == "" || j.RemoteProperties.Url == ""){ + log.Errorf(ErrInvalidRemoteJob.Error()) + return ErrInvalidRemoteJob + } else { + log.Errorf(ErrInvalidJobType.Error()) + return ErrInvalidJobType + } +} diff --git a/job/job_test.go b/job/job_test.go index c30d1ec0..03b274ab 100644 --- a/job/job_test.go +++ b/job/job_test.go @@ -5,6 +5,8 @@ import ( "strings" "testing" "time" + "net/http/httptest" + "net/http" "github.com/stretchr/testify/assert" ) @@ -927,3 +929,49 @@ func TestDependentJobsParentJobGetsDeleted(t *testing.T) { assert.True(t, len(j.ParentJobs) == 1) j.lock.RUnlock() } + +func TestRemoteJobRunner(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + fmt.Fprintln(w, "Hello, client") + })) + defer testServer.Close() + + mockRemoteJob := GetMockRemoteJob(RemoteProperties{ + Url: testServer.URL, + }) + + cache := NewMockCache() + mockRemoteJob.Run(cache) + assert.True(t, mockRemoteJob.Metadata.SuccessCount == 1) +} + +func TestRemoteJobBadStatus(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "something failed", http.StatusInternalServerError) + })) + defer testServer.Close() + + mockRemoteJob := GetMockRemoteJob(RemoteProperties{ + Url: testServer.URL, + }) + + cache := NewMockCache() + mockRemoteJob.Run(cache) + assert.True(t, mockRemoteJob.Metadata.SuccessCount == 0) +} + +func TestRemoteJobBadStatusSuccess(t *testing.T) { + testServer := httptest.NewServer(http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) { + http.Error(w, "something failed", http.StatusInternalServerError) + })) + defer testServer.Close() + + mockRemoteJob := GetMockRemoteJob(RemoteProperties{ + Url: testServer.URL, + ExpectedResponseCodes: []int{500}, + }) + + cache := NewMockCache() + mockRemoteJob.Run(cache) + assert.True(t, mockRemoteJob.Metadata.SuccessCount == 1) +} diff --git a/job/runner.go b/job/runner.go index b724d169..45139ef2 100644 --- a/job/runner.go +++ b/job/runner.go @@ -26,6 +26,8 @@ var ( ErrJobTypeInvalid = errors.New("Job Type is not valid.") ) +// Run calls the appropiate run function, collects metadata around the success +// or failure of the Job's execution, and schedules the next run. func (j *JobRunner) Run(cache JobCache) (*JobStat, Metadata, error) { j.job.lock.RLock() defer j.job.lock.RUnlock() @@ -122,6 +124,7 @@ func (j *JobRunner) RemoteRun() error { // Set default or user's passed headers j.setHeaders(req) + // Do the request res, err := httpClient.Do(req) if err != nil { return err @@ -198,6 +201,7 @@ func (j *JobRunner) checkExpected(statusCode int) bool { return false } +// responseTimeout sets a default timeout if none specified func (j *JobRunner) responseTimeout() time.Duration { responseTimeout := j.job.RemoteProperties.Timeout if responseTimeout == 0 { @@ -208,9 +212,9 @@ func (j *JobRunner) responseTimeout() time.Duration { return time.Duration(responseTimeout) * time.Second } +// setHeaders sets default and user specific headers to the http request func (j *JobRunner) setHeaders(req *http.Request) { - // A valid assumption is that the user is sending something in json cause we're past 2017, check if the user - // already added it, if not, add it to the header + // A valid assumption is that the user is sending something in json cause we're past 2017 if !j.keyInHeaders("Content-Type", j.job.RemoteProperties.Headers) { jsonContentType := "application/json" req.Header.Set("Content-Type", jsonContentType) @@ -225,6 +229,7 @@ func (j *JobRunner) setHeaders(req *http.Request) { } } +// keyInHeaders checks to see if a key of the name keyExpected exists in the headers list func (j *JobRunner) keyInHeaders(keyExpected string, headers []Header) bool { for _, header := range headers { if header.Key == keyExpected { diff --git a/job/test_utils.go b/job/test_utils.go index 21eb79e4..d835be42 100644 --- a/job/test_utils.go +++ b/job/test_utils.go @@ -38,6 +38,15 @@ func GetMockJob() *Job { } } +func GetMockRemoteJob(props RemoteProperties) *Job { + return &Job{ + Name: "mock_remote_job", + Command: "", + JobType: RemoteJob, + RemoteProperties: props, + } +} + func GetMockJobWithSchedule(repeat int, scheduleTime time.Time, delay string) *Job { genericMockJob := GetMockJob() From d81076ce9498fd34956309ae43cba351b96b93f0 Mon Sep 17 00:00:00 2001 From: Lev Date: Wed, 11 Jan 2017 00:35:22 +0200 Subject: [PATCH 06/11] actually validate and fix test --- api/api_test.go | 9 +-------- circle.yml | 2 -- job/job.go | 11 ++++++++++- 3 files changed, 11 insertions(+), 11 deletions(-) diff --git a/api/api_test.go b/api/api_test.go index 938d3c97..4c62efaa 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -34,18 +34,11 @@ func generateNewJobMap() map[string]string { } func generateNewRemoteJobMap() map[string]interface{} { - scheduleTime := time.Now().Add(time.Minute * 5) - repeat := 1 - delay := "P1DT10M10S" - parsedTime := scheduleTime.Format(time.RFC3339) - scheduleStr := fmt.Sprintf("R%d/%s/%s", repeat, parsedTime, delay) - return map[string]interface{}{ - "schedule": scheduleStr, "name": "mock_remote_job", "owner": "aj@ajvb.me", "type": 1, - "remote_properties": map[string]string{ + "remote_proporties": map[string]string{ "url": "http://example.com", }, } diff --git a/circle.yml b/circle.yml index 1ca8b243..cf7815eb 100644 --- a/circle.yml +++ b/circle.yml @@ -23,5 +23,3 @@ test: override: - go test -race ./...: pwd: ../gopath/src/${REPO_PATH} - - $GOPATH/bin/overalls -project=${REPO_PATH} -covermode=count -debug -ignore=.git,Godeps: - pwd: ../gopath/src/${REPO_PATH} diff --git a/job/job.go b/job/job.go index a122cae3..c0f7bc04 100644 --- a/job/job.go +++ b/job/job.go @@ -161,6 +161,12 @@ func (j *Job) Init(cache JobCache) error { j.lock.Lock() defer j.lock.Unlock() + //validate job type and params + err := j.validation() + if err != nil { + return err + } + u4, err := uuid.NewV4() if err != nil { log.Errorf("Error occured when generating uuid: %s", err) @@ -168,6 +174,7 @@ func (j *Job) Init(cache JobCache) error { } j.Id = u4.String() + // Add Job to the cache. err = cache.Set(j) if err != nil { @@ -470,8 +477,10 @@ func (j *Job) validation() error { } else if j.JobType == 1 && (j.Name == "" || j.RemoteProperties.Url == ""){ log.Errorf(ErrInvalidRemoteJob.Error()) return ErrInvalidRemoteJob - } else { + } else if j.JobType != 0 && j.JobType != 1 { log.Errorf(ErrInvalidJobType.Error()) return ErrInvalidJobType + } else { + return nil } } From d097aef7611d6532ed37b94f5740d543457e8295 Mon Sep 17 00:00:00 2001 From: Lev Date: Wed, 11 Jan 2017 09:09:49 +0200 Subject: [PATCH 07/11] changed to consts --- job/job.go | 6 +++--- job/runner.go | 4 ++-- 2 files changed, 5 insertions(+), 5 deletions(-) diff --git a/job/job.go b/job/job.go index c0f7bc04..cf735c83 100644 --- a/job/job.go +++ b/job/job.go @@ -471,13 +471,13 @@ func (j *Job) ShouldStartWaiting() bool { } func (j *Job) validation() error { - if j.JobType == 0 && (j.Name == "" || j.Command == "") { + if j.JobType == LocalJob && (j.Name == "" || j.Command == "") { log.Errorf(ErrInvalidJob.Error()) return ErrInvalidJob - } else if j.JobType == 1 && (j.Name == "" || j.RemoteProperties.Url == ""){ + } else if j.JobType == RemoteJob && (j.Name == "" || j.RemoteProperties.Url == ""){ log.Errorf(ErrInvalidRemoteJob.Error()) return ErrInvalidRemoteJob - } else if j.JobType != 0 && j.JobType != 1 { + } else if j.JobType != LocalJob && j.JobType != RemoteJob { log.Errorf(ErrInvalidJobType.Error()) return ErrInvalidJobType } else { diff --git a/job/runner.go b/job/runner.go index 45139ef2..179f2383 100644 --- a/job/runner.go +++ b/job/runner.go @@ -223,9 +223,9 @@ func (j *JobRunner) setHeaders(req *http.Request) { j.job.RemoteProperties.Headers = append(j.job.RemoteProperties.Headers, Header{"Content-Type", jsonContentType}) } - // Set any custom headers + // Add any custom headers for _, header := range j.job.RemoteProperties.Headers { - req.Header.Set(header.Key, header.Value) + req.Header.Add(header.Key, header.Value) } } From 9ab60ca4fb5c5e35169ae4a6cf18004d9d26fc6c Mon Sep 17 00:00:00 2001 From: Lev Date: Thu, 26 Jan 2017 20:18:51 +0200 Subject: [PATCH 08/11] changed according to pr comments --- api/api_test.go | 8 ++++---- job/job.go | 42 +++++++++++++++++++----------------------- job/runner.go | 32 +++++++++++--------------------- 3 files changed, 34 insertions(+), 48 deletions(-) diff --git a/api/api_test.go b/api/api_test.go index 4c62efaa..1c1124b0 100644 --- a/api/api_test.go +++ b/api/api_test.go @@ -35,10 +35,10 @@ func generateNewJobMap() map[string]string { func generateNewRemoteJobMap() map[string]interface{} { return map[string]interface{}{ - "name": "mock_remote_job", - "owner": "aj@ajvb.me", - "type": 1, - "remote_proporties": map[string]string{ + "name": "mock_remote_job", + "owner": "aj@ajvb.me", + "type": 1, + "remote_properties": map[string]string{ "url": "http://example.com", }, } diff --git a/job/job.go b/job/job.go index cf735c83..3fc968e1 100644 --- a/job/job.go +++ b/job/job.go @@ -5,6 +5,7 @@ import ( "encoding/gob" "errors" "fmt" + "net/http" "strconv" "strings" "sync" @@ -22,9 +23,9 @@ var ( RFC3339WithoutTimezone = "2006-01-02T15:04:05" - ErrInvalidJob = errors.New("Invalid Local Job. Job's must contain a Name and a Command field") + ErrInvalidJob = errors.New("Invalid Local Job. Job's must contain a Name and a Command field") ErrInvalidRemoteJob = errors.New("Invalid Remote Job. Job's must contain a Name and a url field") - ErrInvalidJobType = errors.New("Invalid Job type. Types supported: 0 for local and 1 for remote") + ErrInvalidJobType = errors.New("Invalid Job type. Types supported: 0 for local and 1 for remote") ) func init() { @@ -82,7 +83,7 @@ type Job struct { JobType jobType `json:"type"` // Custom properties for the remote job type - RemoteProperties RemoteProperties `json:"remote_proporties"` + RemoteProperties RemoteProperties `json:"remote_properties"` // Collection of Job Stats Stats []*JobStat `json:"stats"` @@ -97,25 +98,26 @@ type Job struct { type jobType int const ( - LocalJob jobType = 0 + iota + LocalJob jobType = iota RemoteJob ) +// RemoteProperties Custom properties for the remote job type type RemoteProperties struct { - Url string `json:"url"` - Method string `json:"method"` + Url string `json:"url"` + Method string `json:"method"` // A body to attach to the http request - Body string `json:"body"` + Body string `json:"body"` // A list of headers to add to http request (e.g. [{"key": "charset", "value": "UTF-8"}]) - Headers []Header `json:"headers"` + Headers http.Header `json:"headers"` // A timeout property for the http request in seconds - Timeout int `json:"timeout"` + Timeout int `json:"timeout"` // A list of expected response codes (e.g. [200, 201]) - ExpectedResponseCodes []int `json:"expected_response_codes"` + ExpectedResponseCodes []int `json:"expected_response_codes"` } type Metadata struct { @@ -126,11 +128,6 @@ type Metadata struct { LastAttemptedRun time.Time `json:"last_attempted_run"` } -type Header struct { - Key string `json:"key"` - Value string `json:"value"` -} - // Bytes returns the byte representation of the Job. func (j Job) Bytes() ([]byte, error) { buff := new(bytes.Buffer) @@ -174,7 +171,6 @@ func (j *Job) Init(cache JobCache) error { } j.Id = u4.String() - // Add Job to the cache. err = cache.Set(j) if err != nil { @@ -471,16 +467,16 @@ func (j *Job) ShouldStartWaiting() bool { } func (j *Job) validation() error { + var err error if j.JobType == LocalJob && (j.Name == "" || j.Command == "") { - log.Errorf(ErrInvalidJob.Error()) - return ErrInvalidJob - } else if j.JobType == RemoteJob && (j.Name == "" || j.RemoteProperties.Url == ""){ - log.Errorf(ErrInvalidRemoteJob.Error()) - return ErrInvalidRemoteJob + err = ErrInvalidJob + } else if j.JobType == RemoteJob && (j.Name == "" || j.RemoteProperties.Url == "") { + err = ErrInvalidRemoteJob } else if j.JobType != LocalJob && j.JobType != RemoteJob { - log.Errorf(ErrInvalidJobType.Error()) - return ErrInvalidJobType + err = ErrInvalidJobType } else { return nil } + log.Errorf(err.Error()) + return err } diff --git a/job/runner.go b/job/runner.go index 179f2383..4797a915 100644 --- a/job/runner.go +++ b/job/runner.go @@ -1,12 +1,12 @@ package job import ( - "strings" - "errors" - "time" "bytes" + "errors" "net/http" "os/exec" + "strings" + "time" log "github.com/Sirupsen/logrus" ) @@ -215,26 +215,16 @@ func (j *JobRunner) responseTimeout() time.Duration { // setHeaders sets default and user specific headers to the http request func (j *JobRunner) setHeaders(req *http.Request) { // A valid assumption is that the user is sending something in json cause we're past 2017 - if !j.keyInHeaders("Content-Type", j.job.RemoteProperties.Headers) { + if j.job.RemoteProperties.Headers["Content-Type"] == nil { jsonContentType := "application/json" - req.Header.Set("Content-Type", jsonContentType) - - // Add the new header to the job properties - j.job.RemoteProperties.Headers = append(j.job.RemoteProperties.Headers, Header{"Content-Type", jsonContentType}) - } - // Add any custom headers - for _, header := range j.job.RemoteProperties.Headers { - req.Header.Add(header.Key, header.Value) - } -} + // Set in the request header we are sending to remote host the newly header + req.Header.Set("Content-Type", jsonContentType) -// keyInHeaders checks to see if a key of the name keyExpected exists in the headers list -func (j *JobRunner) keyInHeaders(keyExpected string, headers []Header) bool { - for _, header := range headers { - if header.Key == keyExpected { - return true - } + // Create a new header for our job properties and set the default header + j.job.RemoteProperties.Headers = make(http.Header) + j.job.RemoteProperties.Headers.Set("Content-Type", jsonContentType) + } else { + req.Header = j.job.RemoteProperties.Headers } - return false } From c8e911c41f053dbfd9312a520128313c73300e4d Mon Sep 17 00:00:00 2001 From: Lev Date: Thu, 26 Jan 2017 21:00:31 +0200 Subject: [PATCH 09/11] fix to test, avoiding race --- job/job_test.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/job/job_test.go b/job/job_test.go index 03b274ab..1fa76669 100644 --- a/job/job_test.go +++ b/job/job_test.go @@ -273,9 +273,9 @@ func TestJobEpsilon(t *testing.T) { now := time.Now() + j.lock.RLock() assert.Equal(t, j.Metadata.SuccessCount, uint(0)) assert.Equal(t, j.Metadata.ErrorCount, uint(2)) - j.lock.RLock() assert.WithinDuration(t, j.Metadata.LastError, now, 4*time.Second) assert.WithinDuration(t, j.Metadata.LastAttemptedRun, now, 4*time.Second) assert.True(t, j.Metadata.LastSuccess.IsZero()) From 9b985c5c7a4364e7950fcf4abdc2e26ee17492a9 Mon Sep 17 00:00:00 2001 From: Lev Date: Thu, 26 Jan 2017 21:11:55 +0200 Subject: [PATCH 10/11] no need to init in seperate line --- job/runner.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/job/runner.go b/job/runner.go index 4797a915..3357b4f9 100644 --- a/job/runner.go +++ b/job/runner.go @@ -222,8 +222,7 @@ func (j *JobRunner) setHeaders(req *http.Request) { req.Header.Set("Content-Type", jsonContentType) // Create a new header for our job properties and set the default header - j.job.RemoteProperties.Headers = make(http.Header) - j.job.RemoteProperties.Headers.Set("Content-Type", jsonContentType) + j.job.RemoteProperties.Headers = http.Header{"Content-Type": []string{jsonContentType}} } else { req.Header = j.job.RemoteProperties.Headers } From c27d2cddf03818529f93960cb81b9271aa1bb3c8 Mon Sep 17 00:00:00 2001 From: Lev Date: Thu, 26 Jan 2017 21:59:07 +0200 Subject: [PATCH 11/11] dont reuse shParser --- job/job.go | 8 -------- job/runner.go | 11 ++++++++++- 2 files changed, 10 insertions(+), 9 deletions(-) diff --git a/job/job.go b/job/job.go index 3fc968e1..8f7295cf 100644 --- a/job/job.go +++ b/job/job.go @@ -14,13 +14,10 @@ import ( "github.com/ajvb/kala/utils/iso8601" log "github.com/Sirupsen/logrus" - "github.com/mattn/go-shellwords" "github.com/nu7hatch/gouuid" ) var ( - shParser = shellwords.NewParser() - RFC3339WithoutTimezone = "2006-01-02T15:04:05" ErrInvalidJob = errors.New("Invalid Local Job. Job's must contain a Name and a Command field") @@ -28,11 +25,6 @@ var ( ErrInvalidJobType = errors.New("Invalid Job type. Types supported: 0 for local and 1 for remote") ) -func init() { - shParser.ParseEnv = true - shParser.ParseBacktick = true -} - type Job struct { Name string `json:"name"` Id string `json:"id"` diff --git a/job/runner.go b/job/runner.go index 3357b4f9..1689dca4 100644 --- a/job/runner.go +++ b/job/runner.go @@ -9,6 +9,7 @@ import ( "time" log "github.com/Sirupsen/logrus" + "github.com/mattn/go-shellwords" ) type JobRunner struct { @@ -26,7 +27,7 @@ var ( ErrJobTypeInvalid = errors.New("Job Type is not valid.") ) -// Run calls the appropiate run function, collects metadata around the success +// Run calls the appropriate run function, collects metadata around the success // or failure of the Job's execution, and schedules the next run. func (j *JobRunner) Run(cache JobCache) (*JobStat, Metadata, error) { j.job.lock.RLock() @@ -138,10 +139,18 @@ func (j *JobRunner) RemoteRun() error { } } +func initShParser() *shellwords.Parser { + shParser := shellwords.NewParser() + shParser.ParseEnv = true + shParser.ParseBacktick = true + return shParser +} + func (j *JobRunner) runCmd() error { j.numberOfAttempts++ // Execute command + shParser := initShParser() args, err := shParser.Parse(j.job.Command) if err != nil { return err