Skip to content

Commit

Permalink
Merge pull request RichardKnop#459 from WingGao/go-redis
Browse files Browse the repository at this point in the history
support redis cluster
  • Loading branch information
RichardKnop committed Oct 8, 2019
2 parents b18ab3f + e946a73 commit edf30ef
Show file tree
Hide file tree
Showing 8 changed files with 890 additions and 10 deletions.
1 change: 1 addition & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -204,6 +204,7 @@ For example:

1. `redis://localhost:6379`, or with password `redis://password@localhost:6379`
2. `redis+socket://password@/path/to/file.sock:/0`
3. cluster `redis://host1:port1,host2:port2,host3:port3`

##### Memcache

Expand Down
3 changes: 3 additions & 0 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -6,11 +6,14 @@ require (
github.com/RichardKnop/redsync v1.2.0
github.com/aws/aws-sdk-go v1.17.2
github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737
github.com/go-redis/redis v6.15.2+incompatible
github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gomodule/redigo v2.0.0+incompatible
github.com/google/uuid v1.1.0
github.com/kelseyhightower/envconfig v1.3.0
github.com/onsi/ginkgo v1.10.0 // indirect
github.com/onsi/gomega v1.7.0 // indirect
github.com/opentracing/opentracing-go v1.0.2
github.com/pkg/errors v0.8.1
github.com/streadway/amqp v0.0.0-20190214183023-884228600bc9
Expand Down
17 changes: 17 additions & 0 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -28,9 +28,13 @@ github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c
github.com/davecgh/go-spew v1.1.1/go.mod h1:J7Y8YcW2NihsgmVo/mv3lAwl/skON4iLHjSsI+c5H38=
github.com/dustin/go-humanize v1.0.0/go.mod h1:HtrtbFcZ19U5GC7JDqmcUSB87Iq5E25KnS6fMYU6eOk=
github.com/flynn/go-shlex v0.0.0-20150515145356-3f9db97f8568/go.mod h1:xEzjJPgXI435gkrCt3MPfRiAkVrwSbHsst4LCFVfpJc=
github.com/fsnotify/fsnotify v1.4.7 h1:IXs+QLmnXW2CcXuY+8Mzv/fWEsPGWxqefPtCP5CnV9I=
github.com/fsnotify/fsnotify v1.4.7/go.mod h1:jwhsz4b93w/PPRr/qN1Yymfu8t87LnFCMoQvtojpjFo=
github.com/ghodss/yaml v1.0.0/go.mod h1:4dBDuWmgqj2HViK6kFavaiC9ZROes6MMH2rRYeMEF04=
github.com/gliderlabs/ssh v0.1.1/go.mod h1:U7qILu1NlMHj9FlMhZLlkCdDnU1DBEAqr0aevW3Awn0=
github.com/go-redis/redis v6.15.2+incompatible h1:9SpNVG76gr6InJGxoZ6IuuxaCOQwDAhzyXg+Bs+0Sb4=
github.com/go-redis/redis v6.15.2+incompatible/go.mod h1:NAIEuMOZ/fxfXJIrKDQDz8wamY7mA7PouImQ2Jvg6kA=
github.com/go-redis/redis/v7 v7.0.0-beta.4/go.mod h1:xhhSbUMTsleRPur+Vgx9sUHtyN33bdjxY+9/0n9Ig8s=
github.com/go-stack/stack v1.8.0 h1:5SgMzNM5HxrEjV0ww2lTmX6E2Izsfxas4+YHWRs3Lsk=
github.com/go-stack/stack v1.8.0/go.mod h1:v0f6uXyyMGvRgIKkXu+yp6POWl0qKG85gN/melR3HDY=
github.com/gogo/protobuf v1.1.1/go.mod h1:r8qH/GZQm5c6nD/R0oafs1akxWv10x8SbQlK7atdtwQ=
Expand Down Expand Up @@ -62,6 +66,8 @@ github.com/gopherjs/gopherjs v0.0.0-20181017120253-0766667cb4d1/go.mod h1:wJfORR
github.com/gregjones/httpcache v0.0.0-20180305231024-9cad4c3443a7/go.mod h1:FecbI9+v66THATjSRHfNgh1IVFe/9kFxbXtjV0ctIMA=
github.com/grpc-ecosystem/grpc-gateway v1.5.0/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/grpc-ecosystem/grpc-gateway v1.6.2/go.mod h1:RSKVYQBd5MCa4OVpNdGskqpgL2+G+NZTnrVHpWWfpdw=
github.com/hpcloud/tail v1.0.0 h1:nfCOvKYfkgYP8hkirhJocXT2+zOD8yUNjXaWfTlyFKI=
github.com/hpcloud/tail v1.0.0/go.mod h1:ab1qPbhIpdTxEkNHXyeSf5vhxWSCs/tWer42PpOxQnU=
github.com/jellevandenhooff/dkim v0.0.0-20150330215556-f50fe3d243e1/go.mod h1:E0B/fFc00Y+Rasa88328GlI/XbtyysCtTHZS8h7IrBU=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af h1:pmfjZENx5imkbgOkpRUYLnmbU7UEFbjtDA2hxJ1ichM=
github.com/jmespath/go-jmespath v0.0.0-20180206201540-c2b33e8439af/go.mod h1:Nht3zPeWKUH0NzdCt2Blrr5ys8VGpn0CEB0cQHVjt7k=
Expand All @@ -78,6 +84,13 @@ github.com/matttproud/golang_protobuf_extensions v1.0.1/go.mod h1:D8He9yQNgCq6Z5
github.com/microcosm-cc/bluemonday v1.0.1/go.mod h1:hsXNsILzKxV+sX77C5b8FSuKF00vh2OMYv+xgHpAMF4=
github.com/neelance/astrewrite v0.0.0-20160511093645-99348263ae86/go.mod h1:kHJEU3ofeGjhHklVoIGuVj85JJwZ6kWPaJwCIxgnFmo=
github.com/neelance/sourcemap v0.0.0-20151028013722-8c68805598ab/go.mod h1:Qr6/a/Q4r9LP1IltGz7tA7iOK1WonHEYhu1HRBA7ZiM=
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.8.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/ginkgo v1.10.0 h1:XaTQmdKecIbwNHpzOIy0XMoEG5bmv/n0OVyaF1NKUdo=
github.com/onsi/ginkgo v1.10.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.5.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/onsi/gomega v1.7.0 h1:XPnZz8VVBHjVsy1vzJmRwIcSwiUO+JFfrv/xGiigmME=
github.com/onsi/gomega v1.7.0/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
github.com/opentracing/opentracing-go v1.0.2 h1:3jA2P6O1F9UOrWVpwrIo17pu01KWvNWg4X946/Y5Zwg=
github.com/opentracing/opentracing-go v1.0.2/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
github.com/openzipkin/zipkin-go v0.1.1/go.mod h1:NtoC/o8u3JlF1lSlyPNswIbeQH9bJTmOf0Erfk+hxe8=
Expand Down Expand Up @@ -221,7 +234,11 @@ gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405 h1:yhCVgyC4o1eVCa2tZl7eS0r+
gopkg.in/check.v1 v0.0.0-20161208181325-20d25e280405/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127 h1:qIbj1fsPNlZgppZ+VLlY7N33q108Sa+fhmuc+sWQYwY=
gopkg.in/check.v1 v1.0.0-20180628173108-788fd7840127/go.mod h1:Co6ibVJAznAaIkqp8huTwlJQCZ016jof/cbN4VW5Yz0=
gopkg.in/fsnotify.v1 v1.4.7 h1:xOHLXZwVvI9hhs+cLKq5+I5onOuwQLhQwiu63xxlHs4=
gopkg.in/fsnotify.v1 v1.4.7/go.mod h1:Tz8NjZHkW78fSQdbUxIjBTcgA1z1m8ZHf0WmKUhAMys=
gopkg.in/inf.v0 v0.9.1/go.mod h1:cWUDdTG/fYaXco+Dcufb5Vnc6Gp2YChqWtbxRZE0mXw=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7 h1:uRGJdciOHaEIrze2W8Q3AKkepLTh2hOroT7a+7czfdQ=
gopkg.in/tomb.v1 v1.0.0-20141024135613-dd632973f1e7/go.mod h1:dt/ZhP58zS4L8KSrWDmTeBkI65Dw0HsyUHuEVlX15mw=
gopkg.in/yaml.v2 v2.2.1 h1:mUhvW9EsL+naU5Q3cakzfE91YhliOondGd6ZrsDBHQE=
gopkg.in/yaml.v2 v2.2.1/go.mod h1:hI93XBmqTisBFMUTm0b8Fm+jr3Dg1NNxqwp+5A1VGuI=
gopkg.in/yaml.v2 v2.2.2 h1:ZCJp+EgiOT7lHqUV2J862kp8Qj64Jo6az82+3Td9dZw=
Expand Down
299 changes: 299 additions & 0 deletions v1/backends/redis/goredis.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,299 @@
package redis

import (
"bytes"
"encoding/json"
"github.com/go-redis/redis"
"sync"
"time"

"github.com/RichardKnop/machinery/v1/backends/iface"
"github.com/RichardKnop/machinery/v1/common"
"github.com/RichardKnop/machinery/v1/config"
"github.com/RichardKnop/machinery/v1/log"
"github.com/RichardKnop/machinery/v1/tasks"
"github.com/RichardKnop/redsync"
)

// Backend represents a Redis result backend
type BackendGR struct {
common.Backend
rclient redis.UniversalClient
host string
password string
db int
// If set, path to a socket file overrides hostname
socketPath string
redsync *redsync.Redsync
redisOnce sync.Once
}

// New creates Backend instance
func NewGR(cnf *config.Config, addrs []string, db int) iface.Backend {
b := &BackendGR{
Backend: common.NewBackend(cnf),
}
ropt := &redis.UniversalOptions{
Addrs: addrs,
DB: db,
}
b.rclient = redis.NewUniversalClient(ropt)
return b
}

// InitGroup creates and saves a group meta data object
func (b *BackendGR) InitGroup(groupUUID string, taskUUIDs []string) error {
groupMeta := &tasks.GroupMeta{
GroupUUID: groupUUID,
TaskUUIDs: taskUUIDs,
CreatedAt: time.Now().UTC(),
}

encoded, err := json.Marshal(groupMeta)
if err != nil {
return err
}

err = b.rclient.Set(groupUUID, encoded, 0).Err()
if err != nil {
return err
}

return b.setExpirationTime(groupUUID)
}

// GroupCompleted returns true if all tasks in a group finished
func (b *BackendGR) GroupCompleted(groupUUID string, groupTaskCount int) (bool, error) {
groupMeta, err := b.getGroupMeta(groupUUID)
if err != nil {
return false, err
}

taskStates, err := b.getStates(groupMeta.TaskUUIDs...)
if err != nil {
return false, err
}

var countSuccessTasks = 0
for _, taskState := range taskStates {
if taskState.IsCompleted() {
countSuccessTasks++
}
}

return countSuccessTasks == groupTaskCount, nil
}

// GroupTaskStates returns states of all tasks in the group
func (b *BackendGR) GroupTaskStates(groupUUID string, groupTaskCount int) ([]*tasks.TaskState, error) {
groupMeta, err := b.getGroupMeta(groupUUID)
if err != nil {
return []*tasks.TaskState{}, err
}

return b.getStates(groupMeta.TaskUUIDs...)
}

// TriggerChord flags chord as triggered in the backend storage to make sure
// chord is never trigerred multiple times. Returns a boolean flag to indicate
// whether the worker should trigger chord (true) or no if it has been triggered
// already (false)
func (b *BackendGR) TriggerChord(groupUUID string) (bool, error) {
m := b.redsync.NewMutex("TriggerChordMutex")
if err := m.Lock(); err != nil {
return false, err
}
defer m.Unlock()

groupMeta, err := b.getGroupMeta(groupUUID)
if err != nil {
return false, err
}

// Chord has already been triggered, return false (should not trigger again)
if groupMeta.ChordTriggered {
return false, nil
}

// Set flag to true
groupMeta.ChordTriggered = true

// Update the group meta
encoded, err := json.Marshal(&groupMeta)
if err != nil {
return false, err
}

err = b.rclient.Set(groupUUID, encoded, 0).Err()
if err != nil {
return false, err
}

return true, b.setExpirationTime(groupUUID)
}

func (b *BackendGR) mergeNewTaskState(newState *tasks.TaskState) {
state, err := b.GetState(newState.TaskUUID)
if err == nil {
newState.CreatedAt = state.CreatedAt
newState.TaskName = state.TaskName
}
}

// SetStatePending updates task state to PENDING
func (b *BackendGR) SetStatePending(signature *tasks.Signature) error {
taskState := tasks.NewPendingTaskState(signature)
return b.updateState(taskState)
}

// SetStateReceived updates task state to RECEIVED
func (b *BackendGR) SetStateReceived(signature *tasks.Signature) error {
taskState := tasks.NewReceivedTaskState(signature)
b.mergeNewTaskState(taskState)
return b.updateState(taskState)
}

// SetStateStarted updates task state to STARTED
func (b *BackendGR) SetStateStarted(signature *tasks.Signature) error {
taskState := tasks.NewStartedTaskState(signature)
b.mergeNewTaskState(taskState)
return b.updateState(taskState)
}

// SetStateRetry updates task state to RETRY
func (b *BackendGR) SetStateRetry(signature *tasks.Signature) error {
taskState := tasks.NewRetryTaskState(signature)
b.mergeNewTaskState(taskState)
return b.updateState(taskState)
}

// SetStateSuccess updates task state to SUCCESS
func (b *BackendGR) SetStateSuccess(signature *tasks.Signature, results []*tasks.TaskResult) error {
taskState := tasks.NewSuccessTaskState(signature, results)
b.mergeNewTaskState(taskState)
return b.updateState(taskState)
}

// SetStateFailure updates task state to FAILURE
func (b *BackendGR) SetStateFailure(signature *tasks.Signature, err string) error {
taskState := tasks.NewFailureTaskState(signature, err)
b.mergeNewTaskState(taskState)
return b.updateState(taskState)
}

// GetState returns the latest task state
func (b *BackendGR) GetState(taskUUID string) (*tasks.TaskState, error) {

item, err := b.rclient.Get(taskUUID).Bytes()
if err != nil {
return nil, err
}
state := new(tasks.TaskState)
decoder := json.NewDecoder(bytes.NewReader(item))
decoder.UseNumber()
if err := decoder.Decode(state); err != nil {
return nil, err
}

return state, nil
}

// PurgeState deletes stored task state
func (b *BackendGR) PurgeState(taskUUID string) error {
err := b.rclient.Del(taskUUID).Err()
if err != nil {
return err
}

return nil
}

// PurgeGroupMeta deletes stored group meta data
func (b *BackendGR) PurgeGroupMeta(groupUUID string) error {
err := b.rclient.Del(groupUUID).Err()
if err != nil {
return err
}

return nil
}

// getGroupMeta retrieves group meta data, convenience function to avoid repetition
func (b *BackendGR) getGroupMeta(groupUUID string) (*tasks.GroupMeta, error) {
item, err := b.rclient.Get(groupUUID).Bytes()
if err != nil {
return nil, err
}

groupMeta := new(tasks.GroupMeta)
decoder := json.NewDecoder(bytes.NewReader(item))
decoder.UseNumber()
if err := decoder.Decode(groupMeta); err != nil {
return nil, err
}

return groupMeta, nil
}

// getStates returns multiple task states
func (b *BackendGR) getStates(taskUUIDs ...string) ([]*tasks.TaskState, error) {
taskStates := make([]*tasks.TaskState, len(taskUUIDs))
// to avoid CROSSSLOT error, use pipeline
cmders, err := b.rclient.Pipelined(func(pipeliner redis.Pipeliner) error {
for _, uuid := range taskUUIDs {
pipeliner.Get(uuid)
}
return nil
})
if err != nil {
return taskStates, err
}
for i, cmder := range cmders {
stateBytes, err1 := cmder.(*redis.StringCmd).Bytes()
if err1 != nil {
return taskStates, err1
}
taskState := new(tasks.TaskState)
decoder := json.NewDecoder(bytes.NewReader(stateBytes))
decoder.UseNumber()
if err1 = decoder.Decode(taskState); err1 != nil {
log.ERROR.Print(err1)
return taskStates, err1
}
taskStates[i] = taskState
}

return taskStates, nil
}

// updateState saves current task state
func (b *BackendGR) updateState(taskState *tasks.TaskState) error {
encoded, err := json.Marshal(taskState)
if err != nil {
return err
}

_, err = b.rclient.Set(taskState.TaskUUID, encoded, 0).Result()
if err != nil {
return err
}

return b.setExpirationTime(taskState.TaskUUID)
}

// setExpirationTime sets expiration timestamp on a stored task state
func (b *BackendGR) setExpirationTime(key string) error {
expiresIn := b.GetConfig().ResultsExpireIn
if expiresIn == 0 {
// // expire results after 1 hour by default
expiresIn = config.DefaultResultsExpireIn
}
expirationTimestamp := time.Now().Add(time.Duration(expiresIn) * time.Second)

_, err := b.rclient.ExpireAt(key, expirationTimestamp).Result()
if err != nil {
return err
}

return nil
}
Loading

0 comments on commit edf30ef

Please sign in to comment.