Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

expose private variables for brokers/backends #144

Merged
merged 1 commit into from
Jul 14, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
26 changes: 13 additions & 13 deletions amqp_backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,32 +15,32 @@ import (
// AMQPCeleryBackend CeleryBackend for AMQP
type AMQPCeleryBackend struct {
*amqp.Channel
connection *amqp.Connection
host string
Connection *amqp.Connection
Host string
}

// NewAMQPCeleryBackend creates new AMQPCeleryBackend
func NewAMQPCeleryBackend(host string) *AMQPCeleryBackend {
backend := NewAMQPCeleryBackendByConnAndChannel(NewAMQPConnection(host))
backend.Host = host
return backend
}

// NewAMQPCeleryBackendByConnAndChannel creates new AMQPCeleryBackend by AMQP connection and channel
func NewAMQPCeleryBackendByConnAndChannel(conn *amqp.Connection, channel *amqp.Channel) *AMQPCeleryBackend {
backend := &AMQPCeleryBackend{
Channel: channel,
connection: conn,
Connection: conn,
}
return backend
}

// NewAMQPCeleryBackend creates new AMQPCeleryBackend
func NewAMQPCeleryBackend(host string) *AMQPCeleryBackend {
backend := NewAMQPCeleryBackendByConnAndChannel(NewAMQPConnection(host))
backend.host = host
return backend
}

// Reconnect reconnects to AMQP server
func (b *AMQPCeleryBackend) Reconnect() {
b.connection.Close()
conn, channel := NewAMQPConnection(b.host)
b.Connection.Close()
conn, channel := NewAMQPConnection(b.Host)
b.Channel = channel
b.connection = conn
b.Connection = conn
}

// GetResult retrieves result from AMQP queue
Expand Down
34 changes: 17 additions & 17 deletions amqp_broker.go
Original file line number Diff line number Diff line change
Expand Up @@ -49,11 +49,11 @@ func NewAMQPQueue(name string) *AMQPQueue {
//AMQPCeleryBroker is RedisBroker for AMQP
type AMQPCeleryBroker struct {
*amqp.Channel
connection *amqp.Connection
exchange *AMQPExchange
queue *AMQPQueue
Connection *amqp.Connection
Exchange *AMQPExchange
Queue *AMQPQueue
consumingChannel <-chan amqp.Delivery
rate int
Rate int
}

// NewAMQPConnection creates new AMQP channel
Expand All @@ -79,18 +79,18 @@ func NewAMQPCeleryBroker(host string) *AMQPCeleryBroker {
func NewAMQPCeleryBrokerByConnAndChannel(conn *amqp.Connection, channel *amqp.Channel) *AMQPCeleryBroker {
broker := &AMQPCeleryBroker{
Channel: channel,
connection: conn,
exchange: NewAMQPExchange("default"),
queue: NewAMQPQueue("celery"),
rate: 4,
Connection: conn,
Exchange: NewAMQPExchange("default"),
Queue: NewAMQPQueue("celery"),
Rate: 4,
}
if err := broker.CreateExchange(); err != nil {
panic(err)
}
if err := broker.CreateQueue(); err != nil {
panic(err)
}
if err := broker.Qos(broker.rate, 0, false); err != nil {
if err := broker.Qos(broker.Rate, 0, false); err != nil {
panic(err)
}
if err := broker.StartConsumingChannel(); err != nil {
Expand All @@ -101,7 +101,7 @@ func NewAMQPCeleryBrokerByConnAndChannel(conn *amqp.Connection, channel *amqp.Ch

// StartConsumingChannel spawns receiving channel on AMQP queue
func (b *AMQPCeleryBroker) StartConsumingChannel() error {
channel, err := b.Consume(b.queue.Name, "", false, false, false, false, nil)
channel, err := b.Consume(b.Queue.Name, "", false, false, false, false, nil)
if err != nil {
return err
}
Expand Down Expand Up @@ -176,10 +176,10 @@ func (b *AMQPCeleryBroker) GetTaskMessage() (*TaskMessage, error) {
// CreateExchange declares AMQP exchange with stored configuration
func (b *AMQPCeleryBroker) CreateExchange() error {
return b.ExchangeDeclare(
b.exchange.Name,
b.exchange.Type,
b.exchange.Durable,
b.exchange.AutoDelete,
b.Exchange.Name,
b.Exchange.Type,
b.Exchange.Durable,
b.Exchange.AutoDelete,
false,
false,
nil,
Expand All @@ -189,9 +189,9 @@ func (b *AMQPCeleryBroker) CreateExchange() error {
// CreateQueue declares AMQP Queue with stored configuration
func (b *AMQPCeleryBroker) CreateQueue() error {
_, err := b.QueueDeclare(
b.queue.Name,
b.queue.Durable,
b.queue.AutoDelete,
b.Queue.Name,
b.Queue.Durable,
b.Queue.AutoDelete,
false,
false,
nil,
Expand Down
4 changes: 2 additions & 2 deletions broker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -52,7 +52,7 @@ func TestBrokerRedisSend(t *testing.T) {
}
conn := tc.broker.Get()
defer conn.Close()
messageJSON, err := conn.Do("BRPOP", tc.broker.queueName, "1")
messageJSON, err := conn.Do("BRPOP", tc.broker.QueueName, "1")
if err != nil || messageJSON == nil {
t.Errorf("test '%s': failed to get celery message from broker: %v", tc.name, err)
releaseCeleryMessage(celeryMessage)
Expand Down Expand Up @@ -106,7 +106,7 @@ func TestBrokerRedisGet(t *testing.T) {
}
conn := tc.broker.Get()
defer conn.Close()
_, err = conn.Do("LPUSH", tc.broker.queueName, jsonBytes)
_, err = conn.Do("LPUSH", tc.broker.QueueName, jsonBytes)
if err != nil {
t.Errorf("test '%s': failed to push celery message to redis: %v", tc.name, err)
releaseCeleryMessage(celeryMessage)
Expand Down
32 changes: 3 additions & 29 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -3,36 +3,10 @@ module github.com/gocelery/gocelery
go 1.13

require (
github.com/Djarvur/go-err113 v0.1.0 // indirect
github.com/fsnotify/fsnotify v1.4.9 // indirect
github.com/gogo/protobuf v1.3.1 // indirect
github.com/golangci/golangci-lint v1.28.3 // indirect
github.com/golangci/misspell v0.3.5 // indirect
github.com/golangci/revgrep v0.0.0-20180812185044-276a5c0a1039 // indirect
github.com/gomodule/redigo v2.0.0+incompatible
github.com/gostaticanalysis/analysisutil v0.1.0 // indirect
github.com/jirfag/go-printf-func-name v0.0.0-20200119135958-7558a9eaa5af // indirect
github.com/kr/pretty v0.1.0 // indirect
github.com/kyoh86/exportloopref v0.1.7 // indirect
github.com/mitchellh/mapstructure v1.3.2 // indirect
github.com/nishanths/exhaustive v0.0.0-20200708172631-8866003e3856 // indirect
github.com/pelletier/go-toml v1.8.0 // indirect
github.com/quasilyte/go-ruleguard v0.1.2 // indirect
github.com/quasilyte/regex/syntax v0.0.0-20200419152657-af9db7f4a3ab // indirect
github.com/kr/text v0.2.0 // indirect
github.com/niemeyer/pretty v0.0.0-20200227124842-a10e7caefd8e // indirect
github.com/satori/go.uuid v1.2.1-0.20181028125025-b2ce2384e17b
github.com/spf13/afero v1.3.1 // indirect
github.com/spf13/cast v1.3.1 // indirect
github.com/spf13/jwalterweatherman v1.1.0 // indirect
github.com/streadway/amqp v0.0.0-20190827072141-edfb9018d271
github.com/stretchr/objx v0.2.0 // indirect
github.com/tdakkota/asciicheck v0.0.0-20200416200610-e657995f937b // indirect
github.com/timakin/bodyclose v0.0.0-20200424151742-cb6215831a94 // indirect
golang.org/x/sys v0.0.0-20200625212154-ddb9806d33ae // indirect
golang.org/x/text v0.3.3 // indirect
golang.org/x/tools v0.0.0-20200713011307-fd294ab11aed // indirect
gopkg.in/ini.v1 v1.57.0 // indirect
gopkg.in/yaml.v3 v3.0.0-20200615113413-eeeca48fe776 // indirect
mvdan.cc/gofumpt v0.0.0-20200709182408-4fd085cb6d5f // indirect
mvdan.cc/unparam v0.0.0-20200501210554-b37ab49443f7 // indirect
sourcegraph.com/sqs/pbtypes v1.0.0 // indirect
gopkg.in/check.v1 v1.0.0-20200227125254-8fa46927fb4f // indirect
)
Loading