Skip to content

Commit

Permalink
Merge pull request #144 from gocelery/public-var
Browse files Browse the repository at this point in the history
expose private variables for brokers/backends
  • Loading branch information
yoonsio committed Jul 14, 2020
2 parents dc280ff + cbf55ab commit df519d7
Show file tree
Hide file tree
Showing 6 changed files with 43 additions and 694 deletions.
26 changes: 13 additions & 13 deletions amqp_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,32 @@ import (
// AMQPCeleryBackend CeleryBackend for AMQP
type AMQPCeleryBackend struct {
*amqp.Channel
connection *amqp.Connection
host string
Connection *amqp.Connection
Host string
}

// NewAMQPCeleryBackend creates new AMQPCeleryBackend
func NewAMQPCeleryBackend(host string) *AMQPCeleryBackend {
backend := NewAMQPCeleryBackendByConnAndChannel(NewAMQPConnection(host))
backend.Host = host
return backend
}

// NewAMQPCeleryBackendByConnAndChannel creates new AMQPCeleryBackend by AMQP connection and channel
func NewAMQPCeleryBackendByConnAndChannel(conn *amqp.Connection, channel *amqp.Channel) *AMQPCeleryBackend {
backend := &AMQPCeleryBackend{
Channel: channel,
connection: conn,
Connection: conn,
}
return backend
}

// NewAMQPCeleryBackend creates new AMQPCeleryBackend
func NewAMQPCeleryBackend(host string) *AMQPCeleryBackend {
backend := NewAMQPCeleryBackendByConnAndChannel(NewAMQPConnection(host))
backend.host = host
return backend
}

// Reconnect reconnects to AMQP server
func (b *AMQPCeleryBackend) Reconnect() {
b.connection.Close()
conn, channel := NewAMQPConnection(b.host)
b.Connection.Close()
conn, channel := NewAMQPConnection(b.Host)
b.Channel = channel
b.connection = conn
b.Connection = conn
}

// GetResult retrieves result from AMQP queue
Expand Down
34 changes: 17 additions & 17 deletions amqp_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ func NewAMQPQueue(name string) *AMQPQueue {
//AMQPCeleryBroker is RedisBroker for AMQP
type AMQPCeleryBroker struct {
*amqp.Channel
connection *amqp.Connection
exchange *AMQPExchange
queue *AMQPQueue
Connection *amqp.Connection
Exchange *AMQPExchange
Queue *AMQPQueue
consumingChannel <-chan amqp.Delivery
rate int
Rate int
}

// NewAMQPConnection creates new AMQP channel
Expand All @@ -79,18 +79,18 @@ func NewAMQPCeleryBroker(host string) *AMQPCeleryBroker {
func NewAMQPCeleryBrokerByConnAndChannel(conn *amqp.Connection, channel *amqp.Channel) *AMQPCeleryBroker {
broker := &AMQPCeleryBroker{
Channel: channel,
connection: conn,
exchange: NewAMQPExchange("default"),
queue: NewAMQPQueue("celery"),
rate: 4,
Connection: conn,
Exchange: NewAMQPExchange("default"),
Queue: NewAMQPQueue("celery"),
Rate: 4,
}
if err := broker.CreateExchange(); err != nil {
panic(err)
}
if err := broker.CreateQueue(); err != nil {
panic(err)
}
if err := broker.Qos(broker.rate, 0, false); err != nil {
if err := broker.Qos(broker.Rate, 0, false); err != nil {
panic(err)
}
if err := broker.StartConsumingChannel(); err != nil {
Expand All @@ -101,7 +101,7 @@ func NewAMQPCeleryBrokerByConnAndChannel(conn *amqp.Connection, channel *amqp.Ch

// StartConsumingChannel spawns receiving channel on AMQP queue
func (b *AMQPCeleryBroker) StartConsumingChannel() error {
channel, err := b.Consume(b.queue.Name, "", false, false, false, false, nil)
channel, err := b.Consume(b.Queue.Name, "", false, false, false, false, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -176,10 +176,10 @@ func (b *AMQPCeleryBroker) GetTaskMessage() (*TaskMessage, error) {
// CreateExchange declares AMQP exchange with stored configuration
func (b *AMQPCeleryBroker) CreateExchange() error {
return b.ExchangeDeclare(
b.exchange.Name,
b.exchange.Type,
b.exchange.Durable,
b.exchange.AutoDelete,
b.Exchange.Name,
b.Exchange.Type,
b.Exchange.Durable,
b.Exchange.AutoDelete,
false,
false,
nil,
Expand All @@ -189,9 +189,9 @@ func (b *AMQPCeleryBroker) CreateExchange() error {
// CreateQueue declares AMQP Queue with stored configuration
func (b *AMQPCeleryBroker) CreateQueue() error {
_, err := b.QueueDeclare(
b.queue.Name,
b.queue.Durable,
b.queue.AutoDelete,
b.Queue.Name,
b.Queue.Durable,
b.Queue.AutoDelete,
false,
false,
nil,
Expand Down
4 changes: 2 additions & 2 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestBrokerRedisSend(t *testing.T) {
}
conn := tc.broker.Get()
defer conn.Close()
messageJSON, err := conn.Do("BRPOP", tc.broker.queueName, "1")
messageJSON, err := conn.Do("BRPOP", tc.broker.QueueName, "1")
if err != nil || messageJSON == nil {
t.Errorf("test '%s': failed to get celery message from broker: %v", tc.name, err)
releaseCeleryMessage(celeryMessage)
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestBrokerRedisGet(t *testing.T) {
}
conn := tc.broker.Get()
defer conn.Close()
_, err = conn.Do("LPUSH", tc.broker.queueName, jsonBytes)
_, err = conn.Do("LPUSH", tc.broker.QueueName, jsonBytes)
if err != nil {
t.Errorf("test '%s': failed to push celery message to redis: %v", tc.name, err)
releaseCeleryMessage(celeryMessage)
Expand Down
32 changes: 3 additions & 29 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,10 @@ module github.com/gocelery/gocelery
go 1.13

require (
github.com/Djarvur/go-err113 v0.1.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golangci/golangci-lint v1.28.3 // indirect
github.com/golangci/misspell v0.3.5 // indirect
github.com/golangci/revgrep v0.0.0-20180812185044-276a5c0a1039 // indirect
github.com/gomodule/redigo v2.0.0+incompatible
github.com/gostaticanalysis/analysisutil v0.1.0 // indirect
github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/kyoh86/exportloopref v0.1.7 // indirect
github.com/mitchellh/mapstructure v1.3.2 // indirect
github.com/nishanths/exhaustive v0.0.0-20200708172631-8866003e3856 // indirect
github.com/pelletier/go-toml v1.8.0 // indirect
github.com/quasilyte/go-ruleguard v0.1.2 // indirect
github.com/quasilyte/regex/syntax v0.0.0-20200419152657-af9db7f4a3ab // indirect
github.com/kr/text v0.2.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/spf13/afero v1.3.1 // indirect
github.com/spf13/cast v1.3.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/stretchr/objx v0.2.0 // indirect
github.com/tdakkota/asciicheck v0.0.0-20200416200610-e657995f937b // indirect
github.com/timakin/bodyclose v0.0.0-20200424151742-cb6215831a94 // indirect
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
golang.org/x/text v0.3.3 // indirect
golang.org/x/tools v0.0.0-20200713011307-fd294ab11aed // indirect
gopkg.in/ini.v1 v1.57.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
mvdan.cc/gofumpt v0.0.0-20200709182408-4fd085cb6d5f // indirect
mvdan.cc/unparam v0.0.0-20200501210554-b37ab49443f7 // indirect
sourcegraph.com/sqs/pbtypes v1.0.0 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
)
Loading

0 comments on commit df519d7

Please sign in to comment.