Skip to content
This repository has been archived by the owner on Jan 23, 2019. It is now read-only.

Commit

Permalink
Fix reschedule isuues (#22)
Browse files Browse the repository at this point in the history
* remove rescheduling job while running worker stopped

* Seperate jobDone, jobFailed, jobFailedWithException
Support CanDOTimeout

* remove dependency from db & fix some error

* Server and worker restart support
 - set timeout for every running job
 - monitor all running job & remove if timeout expire

* report to client that job failed with timeout exception

* generic db call for all.

* DB test fixed

* fix

* Merge Pull Request mikespook/gearman-go#75, Add sync lock to create job functions

Signed-off-by: sadlil <sadlil@appscode.com>
  • Loading branch information
ashiquzzaman33 authored and sadlil committed Mar 9, 2017
1 parent e009f36 commit 8ddd88c
Show file tree
Hide file tree
Showing 12 changed files with 563 additions and 345 deletions.
2 changes: 2 additions & 0 deletions client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -222,6 +222,8 @@ func (client *Client) do(funcname string, data []byte, flag rt.PT) (handle strin
return "", ErrLostConn
}
var result = make(chan handleOrError, 1)
client.Lock()
defer client.Unlock()
client.innerHandler.put("c", func(resp *Response) {
if resp.DataType == rt.PT_Error {
err = getError(resp.Data)
Expand Down
37 changes: 26 additions & 11 deletions client/client_test.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
package client

import (
"fmt"
"sync"
"testing"
"time"

Expand Down Expand Up @@ -48,7 +50,7 @@ func TestClientEcho(t *testing.T) {
}

func TestClientDoBg(t *testing.T) {
handle, err := client.DoBg("ToUpper", []byte("abcdef"), rt.JobNormal)
handle, err := client.DoBg("scheduledJobTest", []byte("abcdef"), rt.JobNormal)
if err != nil {
t.Error(err)
return
Expand All @@ -61,7 +63,7 @@ func TestClientDoBg(t *testing.T) {
}

func TestClientDoCron(t *testing.T) {
handle, err := client.DoCron("scheduledJobTest", "* * * * *", []byte("test data"))
handle, err := client.DoCron("scheduledJobTest", "* * * * 5", []byte("test data"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -73,7 +75,7 @@ func TestClientDoCron(t *testing.T) {
}

func TestClientDoAt(t *testing.T) {
handle, err := client.DoAt("scheduledJobTest", time.Now().Add(10*time.Second).Unix(), []byte("test data"))
handle, err := client.DoAt("scheduledJobTest", time.Now().Add(20*time.Second).Unix(), []byte("test data"))
if err != nil {
t.Fatal(err)
}
Expand All @@ -85,17 +87,28 @@ func TestClientDoAt(t *testing.T) {
}

func TestClientDo(t *testing.T) {
var wg sync.WaitGroup = sync.WaitGroup{}
wg.Add(1)
jobHandler := func(job *Response) {
str := string(job.Data)
if str == "ABCDEF" {
t.Log(str)
} else {
t.Errorf("Invalid data: %s", job.Data)
switch job.DataType {
case rt.PT_WorkComplete:
t.Log("Work complete, handle ", job.Handle)
wg.Done()
case rt.PT_WorkException, rt.PT_WorkFail:
t.Log("Work fail, handle ", job.Handle, " cause: ", string(job.Data))
wg.Done()
case rt.PT_WorkData:
t.Logf("Work data: %+v", string(job.Data))
case rt.PT_WorkStatus:
status, err := job.Status()
if err != nil {
t.Error(err)
}
fmt.Println("Work status, num: %v, denom: %v", status.Numerator, status.Denominator)
}
return
}
handle, err := client.Do("ToUpper", []byte("abcdef"),
rt.JobLow, jobHandler)
handle, err := client.Do("scheduledJobTest", []byte("abcdef"),
rt.JobHigh, jobHandler)
if err != nil {
t.Error(err)
return
Expand All @@ -105,6 +118,8 @@ func TestClientDo(t *testing.T) {
} else {
t.Log(handle)
}
wg.Wait()

}

func TestClientStatus(t *testing.T) {
Expand Down
20 changes: 18 additions & 2 deletions pkg/runtime/job.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@ const (
PRIORITY_HIGH = 1

JobPrefix = "H:"
SchedJobPrefix = "S:"
CronJobPrefix = "S:"
EpochTimePrefix = "UTC-"
)

Expand All @@ -22,7 +22,7 @@ type Job struct {
Denominator int `json:"denominator,omitempty"`
CreateAt time.Time `json:"created_at,omitempty"`
ProcessAt time.Time `json:"process_at,omitempty"`
TimeoutSec int `json:"timeout_sec,omitempty"`
TimeoutSec int32 `json:"timeout_sec,omitempty"`
CreateBy int64 `json:"created_by,omitempty"` //client sessionId
ProcessBy int64 `json:"process_by,omitempty"` //worker sessionId
FuncName string `json:"function_name,omitempty"`
Expand All @@ -42,3 +42,19 @@ type CronJob struct {
SuccessfulRun int `json:"successful_run,omitempty"`
FailedRun int `json:"failed_run,omitempty"`
}

func (c *Job) Key() string {
return c.Handle
}

func (c *Job) Prefix() string {
return JobPrefix
}

func (c *CronJob) Key() string {
return c.Handle
}

func (c *CronJob) Prefix() string {
return CronJobPrefix
}
5 changes: 3 additions & 2 deletions pkg/runtime/protocol.go
Original file line number Diff line number Diff line change
Expand Up @@ -82,8 +82,9 @@ const (
Req = 5391697
ReqStr = "\x00REQ"
// \x00RES
Res = 5391699
ResStr = "\x00RES"
Res = 5391699
ResStr = "\x00RES"
DefaultTimeout = 20 // 20 Seconds

HANDLE_SHAKE_HEADER_LENGTH = 12
)
Expand Down
Loading

0 comments on commit 8ddd88c

Please sign in to comment.