Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: Kafka broker #449

Open
wants to merge 15 commits into
base: master
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
49 changes: 37 additions & 12 deletions README.md
Expand Up @@ -181,6 +181,31 @@ cnf := &config.Config{
}
```

##### Kafka

`Broker` url is not applicable to Kafka broker so initialize worker with `config.KafaConfig`.

```go
cnf := &config.Config{
DefaultQueue: "KAFKA_TOPIC_TO_LISTEN",
ResultBackend: "YOUR_BACKEND_URL",
Kafka: &config.KafkaConfig{
// Kafka consumer group name.
Group: "KAFKA_GROUP",
// Kafka broker URLs.
Addrs: []string{"127.0.0.1:9092"},
// Client ID used by Kafka client (optional).
ClientID: "KAFKA_CLIENT_ID",
// Consume from newest if enabled, bydefault it consumes from oldest.
OffsetNewest: true,
// Set compression format (optional). Allowed values are `gzip`, `lz4`, `snappy`, `zstd`.
Compression: "gzip",
// Set compression level (optional).
CompressionLevel: 5,
},
}
```

#### DefaultQueue

Default queue name, e.g. `machinery_tasks`.
Expand Down Expand Up @@ -599,18 +624,18 @@ If you configure a result backend, the task states and results will be persisted

```go
const (
// StatePending - initial state of a task
StatePending = "PENDING"
// StateReceived - when task is received by a worker
StateReceived = "RECEIVED"
// StateStarted - when the worker starts processing the task
StateStarted = "STARTED"
// StateRetry - when failed task has been scheduled for retry
StateRetry = "RETRY"
// StateSuccess - when the task is processed successfully
StateSuccess = "SUCCESS"
// StateFailure - when processing of the task fails
StateFailure = "FAILURE"
// StatePending - initial state of a task
StatePending = "PENDING"
// StateReceived - when task is received by a worker
StateReceived = "RECEIVED"
// StateStarted - when the worker starts processing the task
StateStarted = "STARTED"
// StateRetry - when failed task has been scheduled for retry
StateRetry = "RETRY"
// StateSuccess - when the task is processed successfully
StateSuccess = "SUCCESS"
// StateFailure - when processing of the task fails
StateFailure = "FAILURE"
)
```

Expand Down
40 changes: 25 additions & 15 deletions go.mod
@@ -1,32 +1,42 @@
module github.com/RichardKnop/machinery

require (
cloud.google.com/go v0.36.0
cloud.google.com/go v0.82.0 // indirect
cloud.google.com/go/pubsub v1.10.3
github.com/DataDog/zstd v1.4.1 // indirect
github.com/RichardKnop/logging v0.0.0-20181101035820-b1d5d44c82d6
github.com/RichardKnop/redsync v1.2.0
github.com/Shopify/sarama v1.23.1
github.com/aws/aws-sdk-go v1.17.2
github.com/bradfitz/gomemcache v0.0.0-20180710155616-bc664df96737
github.com/eapache/go-resiliency v1.2.0 // indirect
github.com/frankban/quicktest v1.13.0 // indirect
github.com/go-redis/redis/v8 v8.11.3
github.com/go-redsync/redsync/v4 v4.4.1
github.com/go-stack/stack v1.8.0 // indirect
github.com/golang/snappy v0.0.1 // indirect
github.com/gocql/gocql v0.0.0-20200221113847-372a19b1a852
github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect
github.com/gomodule/redigo v2.0.0+incompatible
github.com/google/uuid v1.1.0
github.com/google/uuid v1.1.2
github.com/jcmturner/gofork v1.0.0 // indirect
github.com/kelseyhightower/envconfig v1.3.0
github.com/opentracing/opentracing-go v1.0.2
github.com/pierrec/lz4 v2.2.5+incompatible // indirect
github.com/pkg/errors v0.8.1
github.com/rcrowley/go-metrics v0.0.0-20190706150252-9beb055b7962 // indirect
github.com/streadway/amqp v0.0.0-20190214183023-884228600bc9
github.com/stretchr/testify v1.3.0
github.com/stvp/tempredis v0.0.0-20181119212430-b82af8480203 // indirect
github.com/stretchr/testify v1.6.1
github.com/tidwall/pretty v0.0.0-20180105212114-65a9db5fad51 // indirect
github.com/urfave/cli v1.20.0
github.com/xdg/scram v0.0.0-20180814205039-7eeb5667e42c // indirect
github.com/xdg/stringprep v1.0.0 // indirect
go.mongodb.org/mongo-driver v1.0.0
go.opencensus.io v0.19.0 // indirect
golang.org/x/crypto v0.0.0-20190219172222-a4c6cb3142f2 // indirect
golang.org/x/net v0.0.0-20190213061140-3a22650c66bd // indirect
golang.org/x/oauth2 v0.0.0-20190220154721-9b3c75971fc9 // indirect
golang.org/x/sys v0.0.0-20190221075227-b4e8571b14e0 // indirect
google.golang.org/genproto v0.0.0-20190219182410-082222b4a5c5 // indirect
google.golang.org/grpc v1.18.0 // indirect
gopkg.in/yaml.v2 v2.2.2
golang.org/x/net v0.0.0-20210521195947-fe42d452be8f // indirect
golang.org/x/sys v0.0.0-20210521203332-0cec03c779c1 // indirect
google.golang.org/api v0.47.0 // indirect
google.golang.org/genproto v0.0.0-20210521181308-5ccab8a35a9a // indirect
google.golang.org/grpc v1.38.0 // indirect
gopkg.in/jcmturner/goidentity.v3 v3.0.0 // indirect
gopkg.in/jcmturner/gokrb5.v7 v7.3.0 // indirect
gopkg.in/yaml.v2 v2.4.0
)

go 1.13