This repository has been archived by the owner on Oct 12, 2023. It is now read-only.
-
Notifications
You must be signed in to change notification settings - Fork 50
/
lockrenewal.go
75 lines (61 loc) · 1.57 KB
/
lockrenewal.go
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
package servicebus
import (
"context"
"fmt"
"sync"
"time"
"github.com/Azure/azure-amqp-common-go/log"
"github.com/Azure/azure-amqp-common-go/rpc"
"go.opencensus.io/trace"
"pack.ag/amqp"
)
type (
lockRenewer interface {
entityConnector
lockMutex() *sync.Mutex
}
)
func renewLocks(ctx context.Context, lr lockRenewer, messages ...*Message) error {
span, ctx := startConsumerSpanFromContext(ctx, "sb.RenewLocks")
defer span.Finish()
lockTokens := make([]amqp.UUID, 0, len(messages))
for _, m := range messages {
if m.LockToken == nil {
log.For(ctx).Error(fmt.Errorf("failed: message has nil lock token, cannot renew lock"), trace.StringAttribute("messageId", m.ID))
continue
}
amqpLockToken := amqp.UUID(*m.LockToken)
lockTokens = append(lockTokens, amqpLockToken)
}
if len(lockTokens) < 1 {
log.For(ctx).Info("no lock tokens present to renew")
return nil
}
lr.lockMutex().Lock()
defer lr.lockMutex().Unlock()
renewRequestMsg := &amqp.Message{
ApplicationProperties: map[string]interface{}{
operationFieldName: lockRenewalOperationName,
},
Value: map[string]interface{}{
lockTokensFieldName: lockTokens,
},
}
entityManagementAddress := lr.ManagementPath()
conn, err := lr.connection(ctx)
if err != nil {
return err
}
rpcLink, err := rpc.NewLink(conn, entityManagementAddress)
if err != nil {
return err
}
response, err := rpcLink.RetryableRPC(ctx, 3, 1*time.Second, renewRequestMsg)
if err != nil {
return err
}
if response.Code != 200 {
return fmt.Errorf("error renewing locks: %v", response.Description)
}
return nil
}