Skip to content

Commit

Permalink
code reorg
Browse files Browse the repository at this point in the history
  • Loading branch information
Dieterbe committed Oct 9, 2018
1 parent 4f71a9d commit 07188ca
Showing 1 changed file with 42 additions and 42 deletions.
84 changes: 42 additions & 42 deletions input/amqp.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,10 +12,6 @@ import (
"github.com/streadway/amqp"
)

// amqpConnector is a function that connects an instance of *Amqp so
// it will receive messages
type amqpConnector func(a *Amqp) (<-chan amqp.Delivery, error)

// Closables are things that have a .Close() method (channels & connections)
type Closable interface {
Close() error
Expand Down Expand Up @@ -56,44 +52,6 @@ func NewAMQP(config cfg.Config, dispatcher Dispatcher, connect amqpConnector) *A
}
}

// connects an instance of Amqp and returns the message channel
func AMQPConnector(a *Amqp) (<-chan amqp.Delivery, error) {
log.Notice("dialing AMQP: %v", a.uri)
conn, err := amqp.Dial(a.uri.String())
if err != nil {
return nil, err
}
a.conn = conn

amqpChan, err := conn.Channel()
if err != nil {
a.conn.Close()
return nil, err
}
a.channel = amqpChan

// queue name will be random, as in the python implementation
q, err := amqpChan.QueueDeclare(a.config.Amqp.Amqp_queue, a.config.Amqp.Amqp_durable, false, a.config.Amqp.Amqp_exclusive, false, nil)
if err != nil {
a.close()
return nil, err
}

err = amqpChan.QueueBind(q.Name, a.config.Amqp.Amqp_key, a.config.Amqp.Amqp_exchange, false, nil)
if err != nil {
a.close()
return nil, err
}

c, err := amqpChan.Consume(q.Name, "carbon-relay-ng", true, a.config.Amqp.Amqp_exclusive, true, false, nil)
if err != nil {
a.close()
return nil, err
}

return c, nil
}

func (a *Amqp) Name() string {
return "amqp"
}
Expand Down Expand Up @@ -174,3 +132,45 @@ func (a *Amqp) consumeAMQP(c <-chan amqp.Delivery) {
}
}
}

// amqpConnector is a function that connects an instance of *Amqp so
// it will receive messages
type amqpConnector func(a *Amqp) (<-chan amqp.Delivery, error)

// connects an instance of Amqp and returns the message channel
func AMQPConnector(a *Amqp) (<-chan amqp.Delivery, error) {
log.Notice("dialing AMQP: %v", a.uri)
conn, err := amqp.Dial(a.uri.String())
if err != nil {
return nil, err
}
a.conn = conn

amqpChan, err := conn.Channel()
if err != nil {
a.conn.Close()
return nil, err
}
a.channel = amqpChan

// queue name will be random, as in the python implementation
q, err := amqpChan.QueueDeclare(a.config.Amqp.Amqp_queue, a.config.Amqp.Amqp_durable, false, a.config.Amqp.Amqp_exclusive, false, nil)
if err != nil {
a.close()
return nil, err
}

err = amqpChan.QueueBind(q.Name, a.config.Amqp.Amqp_key, a.config.Amqp.Amqp_exchange, false, nil)
if err != nil {
a.close()
return nil, err
}

c, err := amqpChan.Consume(q.Name, "carbon-relay-ng", true, a.config.Amqp.Amqp_exclusive, true, false, nil)
if err != nil {
a.close()
return nil, err
}

return c, nil
}

0 comments on commit 07188ca

Please sign in to comment.