finite_sessions.go
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package tests

import (
	"context"
	"flag"
	"fmt"
	"log"
	"strings"
	"sync"
	"time"

	"github.com/Azure/azure-sdk-for-go/sdk/azcore/to"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
	"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/stress/shared"
)

type finiteSessionsArgs struct {
	numSessions int
	rounds      int
}

func FiniteSessions(remainingArgs []string) {
	// NOTE: these values aren't particularly special, but they do try to create a reasonable default
	// test just to make sure everything is working.
	//
	// Look in ../templates/deploy-job.yaml for some of the other parameter variations we use in stress/longevity
	// testing.
	fs := flag.NewFlagSet("FiniteSessions", flag.ContinueOnError)

	params := finiteSessionsArgs{}
	fs.IntVar(&params.numSessions, "sessions", 2000, "Number of sessions to test")
	fs.IntVar(&params.rounds, "rounds", 300, "Number of rounds to run with these parameters. -1 means math.MaxInt64")

	// parse the remaining command line arguments so -sessions and -rounds actually take effect.
	if err := fs.Parse(remainingArgs); err != nil {
		fs.PrintDefaults()
		log.Fatalf("Failed to parse args: %s", err)
	}

	sc := shared.MustCreateStressContext("FiniteSessions", nil)
	defer sc.End()

	topicName := strings.ToLower(fmt.Sprintf("topic-%X", time.Now().UnixNano()))

	sc.Start(topicName, map[string]string{
		"NumSessions": fmt.Sprintf("%d", params.numSessions),
		"Rounds":      fmt.Sprintf("%d", params.rounds),
	})

	log.Printf("Creating topic %s", topicName)
	cleanup := shared.MustCreateSubscriptions(sc, topicName, []string{"sub1"}, &shared.MustCreateSubscriptionsOptions{
		Subscription: &admin.CreateSubscriptionOptions{
			Properties: &admin.SubscriptionProperties{
				RequiresSession: to.Ptr(true),
			},
		},
	})
	defer cleanup()

	client, err := azservicebus.NewClientFromConnectionString(sc.ConnectionString, nil)
	sc.NoError(err)

	sender, err := shared.NewTrackingSender(sc.TC, client, topicName, nil)
	sc.NoError(err)
	defer sender.Close(sc.Context)
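
	// each round: send one message per session ID, accept the next available session, receive and
	// complete its message on a goroutine, and then close every receiver once all the goroutines finish.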
	for round := 0; round < int(params.rounds); round++ {
		var sessionReceivers []*azservicebus.SessionReceiver
		wg := sync.WaitGroup{}

		for i := 0; i < params.numSessions; i++ {
			sessionID := fmt.Sprintf("%d:%d", round, i)

			err = sender.SendMessage(sc.Context, &azservicebus.Message{
				SessionID: &sessionID,
			}, nil)
			sc.NoError(err)

			shared.TrackMetric(sc.Context, sc.TC, shared.MetricMessagesSent, 1, map[string]string{
				"SessionID": sessionID,
			})
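
			// earlier sessions in this round are still locked by their open receivers, so this should
			// end up accepting the session we just sent to (asserted inside the goroutine below).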
			sessionReceiver, err := client.AcceptNextSessionForSubscription(sc.Context, topicName, "sub1", nil)
			sc.NoError(err)

			shared.TrackMetric(sc.Context, sc.TC, shared.MetricSessionAccept, 1, nil)

			// one of the things mentioned in the customer issue - they keep the session receivers
			// alive for a long time.
			sessionReceivers = append(sessionReceivers, sessionReceiver)

			wg.Add(1)
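
			// receive and settle the session's single message on a goroutine; asking for up to 2
			// messages means an unexpected extra delivery would fail the length assertion.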
			go func() {
				defer wg.Done()

				ctx, cancel := context.WithTimeout(sc.Context, time.Minute)
				messages, err := sessionReceiver.ReceiveMessages(ctx, 2, nil)
				cancel()

				sc.NoError(err)
				sc.Equal(1, len(messages))
				sc.Equal(sessionID, *messages[0].SessionID)

				shared.TrackMetric(ctx, sc.TC, shared.MetricMessageReceived, float64(len(messages)), nil)

				trackDuration := shared.TrackDuration(ctx, sc.TC, shared.MetricSettlementRequestDuration)
				sc.NoError(sessionReceiver.CompleteMessage(sc.Context, messages[0], nil))
				trackDuration(nil)
			}()
		}
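
		// wait for every receive goroutine to finish before closing this round's receivers.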
		wg.Wait()

		for _, receiver := range sessionReceivers {
			err = receiver.Close(sc.Context)
			sc.NoErrorf(err, "No errors when session receiver is closed")
		}
	}
}