Skip to content
This repository has been archived by the owner on Oct 12, 2023. It is now read-only.

Commit

Permalink
Adding the ability to cancel scheduled messges
Browse files Browse the repository at this point in the history
  • Loading branch information
marstr committed Nov 2, 2018
1 parent c03b0ce commit 3884a90
Show file tree
Hide file tree
Showing 3 changed files with 46 additions and 14 deletions.
1 change: 1 addition & 0 deletions operation_constants.go
Expand Up @@ -7,6 +7,7 @@ const (
lockRenewalOperationName = vendorPrefix + "renew-lock"
peekMessageOperationID = vendorPrefix + "peek-message"
scheduleMessageOperationID = vendorPrefix + "schedule-message"
cancelScheduledOperationID = vendorPrefix + "cancel-scheduled-message"
)

// Field Descriptions
Expand Down
40 changes: 39 additions & 1 deletion queue.go
Expand Up @@ -243,13 +243,51 @@ func (q *Queue) ScheduleAt(ctx context.Context, enqueueTime time.Time, messages
}
return retval, nil
}
return nil, newErrIncorrectType(sequenceFieldName, []interface{}{}, rawArr)
return nil, newErrIncorrectType(sequenceFieldName, []int64{}, rawArr)
}
return nil, ErrMissingField(sequenceFieldName)
}
return nil, newErrIncorrectType("value", map[string]interface{}{}, resp.Message.Value)
}

// CancelScheduled allows for removal of messages that have been handed to the Service Bus broker for later delivery,
// but have not yet ben enqueued.
func (q *Queue) CancelScheduled(ctx context.Context, seq ...int64) error {
msg := &amqp.Message{
ApplicationProperties: map[string]interface{}{
operationFieldName: cancelScheduledOperationID,
},
Value: map[string]interface{}{
"sequence-numbers": seq,
},
}

if deadline, ok := ctx.Deadline(); ok {
msg.ApplicationProperties[serverTimeoutFieldName] = uint(time.Until(deadline) / time.Millisecond)
}

err := q.ensureSender(ctx)
if err != nil {
return err
}

link, err := rpc.NewLink(q.sender.connection, q.ManagementPath())
if err != nil {
return err
}

resp, err := link.RetryableRPC(ctx, 5, 5*time.Second, msg)
if err != nil {
return err
}

if resp.Code != 200 {
return ErrAMQP(*resp)
}

return nil
}

// Peek fetches a list of Messages from the Service Bus broker without acquiring a lock or committing to a disposition.
// The messages are delivered as close to sequence order as possible.
//
Expand Down
19 changes: 6 additions & 13 deletions queue_examples_test.go
Expand Up @@ -104,7 +104,7 @@ func ExampleQueue_Receive() {
client.Receive(ctx, printMessage)
}

func ExampleQueue_ScheduleAt() {
func ExampleQueue_scheduleAndCancelMessages() {
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Minute+40*time.Second)
defer cancel()

Expand Down Expand Up @@ -137,28 +137,21 @@ func ExampleQueue_ScheduleAt() {
expectedTime := time.Now().Add(waitTime)
msg := servicebus.NewMessageFromString("to the future!!")

_, err = client.ScheduleAt(ctx, expectedTime, msg)
scheduled, err := client.ScheduleAt(ctx, expectedTime, msg)
if err != nil {
fmt.Println("FATAL: ", err)
return
}

err = client.ReceiveOne(ctx,
servicebus.HandlerFunc(func(ctx context.Context, msg *servicebus.Message) servicebus.DispositionAction {
received := time.Now()
if received.Before(expectedTime.Add(buffer)) && received.After(expectedTime.Add(-buffer)) {
fmt.Println("Received when expected!")
} else {
fmt.Println("Received outside the expected window.")
}
return msg.Complete()
}))
err = client.CancelScheduled(ctx, scheduled...)
if err != nil {
fmt.Println("FATAL: ", err)
return
}

// Output: Received when expected!
fmt.Println("All Messages Scheduled and Cancelled")

// Output: All Messages Scheduled and Cancelled
}

func ExampleQueue_sessionsRoundTrip() {
Expand Down

0 comments on commit 3884a90

Please sign in to comment.