Skip to content

HTTPS clone URL

Subversion checkout URL

You can clone with HTTPS or Subversion.

Download ZIP
branch: master
Fetching contributors…

Cannot retrieve contributors at this time

167 lines (135 sloc) 5.401 kb
// Copyright (c) 2012, Sean Treadway, SoundCloud Ltd.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Source code and contact info at http://github.com/streadway/amqp
package amqp
import (
"time"
)
// A delivery from the server to a consumer started with Queue.Consume or
// Queue.Get.
type Delivery struct {
channel *Channel
Headers Table // Application or header exchange table
// Properties
ContentType string // MIME content type
ContentEncoding string // MIME content encoding
DeliveryMode uint8 // queue implemention use - non-persistent (1) or persistent (2)
Priority uint8 // queue implementation use - 0 to 9
CorrelationId string // application use - correlation identifier
ReplyTo string // application use - address to to reply to (ex: RPC)
Expiration string // implementation use - message expiration spec
MessageId string // application use - message identifier
Timestamp time.Time // application use - message timestamp
Type string // application use - message type name
UserId string // application use - creating user - should be authenticated user
AppId string // application use - creating application id
// Valid only with Channel.Consume
ConsumerTag string
// Valid only with Channel.Get
MessageCount uint32
DeliveryTag uint64
Redelivered bool
Exchange string // basic.publish exhange
RoutingKey string // basic.publish routing key
Body []byte
}
func newDelivery(channel *Channel, msg messageWithContent) *Delivery {
props, body := msg.getContent()
delivery := Delivery{
channel: channel,
Headers: props.Headers,
ContentType: props.ContentType,
ContentEncoding: props.ContentEncoding,
DeliveryMode: props.DeliveryMode,
Priority: props.Priority,
CorrelationId: props.CorrelationId,
ReplyTo: props.ReplyTo,
Expiration: props.Expiration,
MessageId: props.MessageId,
Timestamp: props.Timestamp,
Type: props.Type,
UserId: props.UserId,
AppId: props.AppId,
Body: body,
}
// Properties for the delivery types
switch m := msg.(type) {
case *basicDeliver:
delivery.ConsumerTag = m.ConsumerTag
delivery.DeliveryTag = m.DeliveryTag
delivery.Redelivered = m.Redelivered
delivery.Exchange = m.Exchange
delivery.RoutingKey = m.RoutingKey
case *basicGetOk:
delivery.MessageCount = m.MessageCount
delivery.DeliveryTag = m.DeliveryTag
delivery.Redelivered = m.Redelivered
delivery.Exchange = m.Exchange
delivery.RoutingKey = m.RoutingKey
}
return &delivery
}
/*
All deliveries in AMQP must be acknowledged. If you called Channel.Consume
with autoAck true then the server will be automatically ack each message and
this method should not be called. Otherwise, you must call Delivery.Ack after
you have successfully processed this delivery.
When multiple is true, this delivery and all prior unacknowledged deliveries
on the same channel will be acknowledged. This is useful for batch processing
of deliveries.
An error will indicate that the acknowledge could not be delivered to the
channel it was sent from.
Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (me Delivery) Ack(multiple bool) error {
return me.channel.send(me.channel, &basicAck{
DeliveryTag: me.DeliveryTag,
Multiple: multiple,
})
}
/*
Negatively acknowledge processing of this message.
When requeue is true, queue this message to be delivered to a consumer on a
different channel. When requeue is false or the server is unable to queue this
message, it will be dropped.
If you are batch processing deliveries, and your server supports it, prefer
Delivery.Nack.
Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (me Delivery) Reject(requeue bool) error {
return me.channel.send(me.channel, &basicReject{
DeliveryTag: me.DeliveryTag,
Requeue: requeue,
})
}
/*
Uses the consumer identity in this delivery to cancel the consumer delegating
to Channel.Cancel. An error indicates the channel is closed.
*/
func (me Delivery) Cancel(noWait bool) error {
return me.channel.Cancel(me.ConsumerTag, noWait)
}
/*
RabbitMQ extension - Negatively acknowledge the delivery of message(s)
identified by the deliveryTag.
When multiple is true, nack messages up to and including delivered messages up
until the deliveryTag delivered on the same channel.
When requeue is true, request the server to deliver this message to a different
consumer. If it is not possible or requeue is false, the message will be
dropped or delivered to a server configured dead-letter queue.
This method must not be used to select or requeue messages the client wishes
not to handle, rather it is to inform the server that the client is incapable
of handling this message at this time.
Either Delivery.Ack, Delivery.Reject or Delivery.Nack must be called for every
delivery that is not automatically acknowledged.
*/
func (me Delivery) Nack(multiple, requeue bool) error {
return me.channel.send(me.channel, &basicNack{
DeliveryTag: me.DeliveryTag,
Multiple: multiple,
Requeue: requeue,
})
}
Jump to Line
Something went wrong with that request. Please try again.