[azservicebus, azeventhubs] Updating to take in the new AMQP version to fix issue with sessions taking 26s to close (#22509)

Pulling in a new AMQP version with my fix for the "body size < frame size" issue, which would cause a hang of 26 seconds while closing a session on a link that was idle.

Fixes #21684
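
The hang described above can be pictured with a small sketch. This is an illustration only, not go-amqp's actual code: it assumes the faulty reader, after consuming the 8-byte AMQP frame header, waited for at least another header's worth of bytes instead of waiting for the body length declared in the header. The names `bodySize`, `readyBuggy`, and `readyFixed` are hypothetical.

```go
package main

import (
	"encoding/binary"
	"fmt"
)

// headerSize is the fixed length of an AMQP 1.0 frame header in bytes.
const headerSize = 8

// bodySize reads the 4-byte big-endian total frame size from the header
// and subtracts the header itself, leaving the number of body bytes.
func bodySize(header []byte) int {
	return int(binary.BigEndian.Uint32(header[:4])) - headerSize
}

// readyBuggy is an assumed reconstruction of the faulty check: it waits
// for headerSize buffered bytes no matter how small the body is.
func readyBuggy(buffered, body int) bool {
	return buffered >= headerSize
}

// readyFixed waits only for the number of body bytes the header declared.
func readyFixed(buffered, body int) bool {
	return buffered >= body
}

func main() {
	// Header for a 12-byte frame: size=12, DOFF=2, type=0, channel=0.
	header := []byte{0, 0, 0, 12, 2, 0, 0, 0}
	body := bodySize(header)
	buffered := 4 // the entire 4-byte body has already arrived

	// prints "4 false true"
	fmt.Println(body, readyBuggy(buffered, body), readyFixed(buffered, body))
}
```

With the assumed faulty check, a complete small frame sits in the buffer until unrelated traffic arrives, which on an idle link could plausibly take tens of seconds.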
richardpark-msft committed Mar 4, 2024
1 parent d5d6195 commit c498179
Showing 12 changed files with 153 additions and 24 deletions.
8 changes: 2 additions & 6 deletions sdk/messaging/azeventhubs/CHANGELOG.md
@@ -1,14 +1,10 @@
# Release History

-## 1.0.4 (Unreleased)
-
-### Features Added
-
-### Breaking Changes
+## 1.0.4 (2024-03-05)

### Bugs Fixed

-### Other Changes
+- Fixed case where closing a Receiver/Sender after an idle period would take > 20 seconds. (PR#22509)

## 1.0.3 (2024-01-16)

2 changes: 1 addition & 1 deletion sdk/messaging/azeventhubs/go.mod
@@ -8,7 +8,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.0
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0
-github.com/Azure/go-amqp v1.0.4
+github.com/Azure/go-amqp v1.0.5
github.com/golang/mock v1.6.0
github.com/joho/godotenv v1.5.1
github.com/stretchr/testify v1.8.4
4 changes: 2 additions & 2 deletions sdk/messaging/azeventhubs/go.sum
@@ -12,8 +12,8 @@ github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/eventhub/armeventhub v1.0.
github.com/Azure/azure-sdk-for-go/sdk/resourcemanager/storage/armstorage v1.5.0 h1:AifHbc4mg0x9zW52WOpKbsHaDKuRhlI7TVl47thgQ70=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0 h1:IfFdxTUDiV58iZqPKgyWiz4X4fCxZeQ1pTQPImLYXpY=
github.com/Azure/azure-sdk-for-go/sdk/storage/azblob v1.3.0/go.mod h1:SUZc9YRRHfx2+FAQKNDGrssXehqLpxmwRv2mC/5ntj4=
-github.com/Azure/go-amqp v1.0.4 h1:GX5OFOs706UjuFRD5PDKm3aOuLQ92F7DMbua+DKAYCc=
-github.com/Azure/go-amqp v1.0.4/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
+github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU=
+github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1 h1:DzHpqpoJVaCgOUdVHxE8QB52S6NiVdDQvGlny1qvPqA=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.1/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
6 changes: 3 additions & 3 deletions sdk/messaging/azeventhubs/internal/eh/stress/Chart.lock
@@ -1,6 +1,6 @@
dependencies:
- name: stress-test-addons
repository: https://stresstestcharts.blob.core.windows.net/helm/
-version: 0.3.0
-digest: sha256:3e21a7fdf5d6b37e871a6dd9f755888166fbb24802aa517f51d1d9223b47656e
-generated: "2023-09-26T11:39:54.587519919-07:00"
+version: 0.3.1
+digest: sha256:28e374f8db5c46447b2a1491d4361ceb126536c425cbe54be49017120fe7b27d
+generated: "2024-03-01T17:51:06.962215142-08:00"
9 changes: 3 additions & 6 deletions sdk/messaging/azservicebus/CHANGELOG.md
@@ -1,14 +1,11 @@
# Release History

-## 1.6.1 (Unreleased)
-
-### Features Added
-
-### Breaking Changes
+## 1.6.1 (2024-03-05)

### Bugs Fixed

-### Other Changes
+- Fixed case where closing a Receiver/Sender after an idle period would take > 20 seconds. (PR#22509)
+- Fixed a potential memory leak when receiving a message on one receiver and attempting to settle with another. (PR#22431)

## 1.6.0 (2024-01-17)

2 changes: 1 addition & 1 deletion sdk/messaging/azservicebus/go.mod
@@ -8,7 +8,7 @@ require (
github.com/Azure/azure-sdk-for-go/sdk/azcore v1.9.2
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2
-github.com/Azure/go-amqp v1.0.4
+github.com/Azure/go-amqp v1.0.5
)

require (
4 changes: 2 additions & 2 deletions sdk/messaging/azservicebus/go.sum
@@ -7,8 +7,8 @@ github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1 h1:sO0/P7g68FrryJzljemN+
github.com/Azure/azure-sdk-for-go/sdk/azidentity v1.5.1/go.mod h1:h8hyGFDsU5HMivxiS2iYFZsgDbU9OnnJ163x5UGVKYo=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2 h1:LqbJ/WzJUwBf8UiaSzgX7aMclParm9/5Vgp+TY51uBQ=
github.com/Azure/azure-sdk-for-go/sdk/internal v1.5.2/go.mod h1:yInRyqWXAuaPrgI7p70+lDDgh3mlBohis29jGMISnmc=
-github.com/Azure/go-amqp v1.0.4 h1:GX5OFOs706UjuFRD5PDKm3aOuLQ92F7DMbua+DKAYCc=
-github.com/Azure/go-amqp v1.0.4/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
+github.com/Azure/go-amqp v1.0.5 h1:po5+ljlcNSU8xtapHTe8gIc8yHxCzC03E8afH2g1ftU=
+github.com/Azure/go-amqp v1.0.5/go.mod h1:vZAogwdrkbyK3Mla8m/CxSc/aKdnTZ4IbPxl51Y5WZE=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2 h1:XHOnouVk1mxXfQidrMEnLlPk9UMeRtyBTnEFtxkV0kU=
github.com/AzureAD/microsoft-authentication-library-for-go v1.2.2/go.mod h1:wP83P5OoQ5p6ip3ScPr0BAq0BvuPAvacpEuSzyouqAI=
github.com/davecgh/go-spew v1.1.1 h1:vj9j/u1bqnvCEfJOwUhtlOARqs3+rkHYY13jYWTU97c=
@@ -50,6 +50,10 @@ matrix:
mostlyIdleReceiver:
testTarget: mostlyIdleReceiver
memory: "0.5Gi"
openCloseMeasurements:
testTarget: openCloseMeasurements
memory: "0.5Gi"
args: -rounds 100
rapidOpenClose:
testTarget: rapidOpenClose
memory: "0.5Gi"
@@ -81,6 +81,13 @@ type StressContextOptions struct {
// Duration is the amount of time the stress test should run before
// the StressContext.Context expires.
Duration time.Duration

// CommonBaggage will be added as part of the telemetry client, and will be included in each
// metric/event/error that's reported.
CommonBaggage map[string]string

// EmitStartEvent enables the automatic sending of the "Start" event for our test to telemetry.
EmitStartEvent bool
}

func MustCreateStressContext(testName string, options *StressContextOptions) *StressContext {
@@ -107,6 +114,12 @@ func MustCreateStressContext(testName string, options *StressContextOptions) *St
"TestRunId": testRunID,
}

if options != nil && options.CommonBaggage != nil {
for k, v := range options.CommonBaggage {
telemetryClient.Context().CommonProperties[k] = v
}
}

log.Printf("Common properties\n:%#v", telemetryClient.Context().CommonProperties)

ctx, cancel := NewCtrlCContext()
@@ -144,7 +157,7 @@ func MustCreateStressContext(testName string, options *StressContextOptions) *St
// return nil
// })

-return &StressContext{
+sc := &StressContext{
TestRunID: testRunID,
Nano: testRunID, // the same for now
ConnectionString: cs,
@@ -156,6 +169,12 @@ func MustCreateStressContext(testName string, options *StressContextOptions) *St
Context: ctx,
cancel: cancel,
}

if options != nil && options.EmitStartEvent {
sc.Start(testName, nil)
}

return sc
}

func (sc *StressContext) Start(entityName string, attributes map[string]string) {
@@ -173,7 +192,7 @@ func (sc *StressContext) Start(entityName string, attributes map[string]string)
}

func (sc *StressContext) End() {
-log.Printf("Stopping and flushing telemetry")
+log.Printf("Stopping and flushing telemetry: %#v", sc.TC.Context().CommonProperties)

sc.cancel()

@@ -18,7 +18,7 @@ spec:
set -ex;
mkdir -p "$DEBUG_SHARE";
{{ if ne .Stress.benchmark true }}
-/app/stress tests "{{ .Stress.testTarget }}" 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
+/app/stress tests "{{ .Stress.testTarget }}" {{ default "" .Stress.args }} 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
{{ else }}
/app/benchmarks.test -test.timeout 24h -test.bench {{ .Stress.testTarget }} 2>&1 | tee -a "${DEBUG_SHARE}/{{ .Stress.Scenario }}-`date +%s`.log";
{{ end }}
@@ -0,0 +1,112 @@
// Copyright (c) Microsoft Corporation. All rights reserved.
// Licensed under the MIT License.

package tests

import (
"context"
"flag"
"fmt"
"log"
"strconv"
"sync"
"time"

"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/admin"
"github.com/Azure/azure-sdk-for-go/sdk/messaging/azservicebus/internal/stress/shared"
)

// OpenCloseMeasurements tests that we are able to consistently open and close our links and connections
// in a timely way. This test doesn't immediately fail; its primary purpose is just to provide historical
// and measurable data on our performance.
//
// The origin of this test was a bug we found in go-amqp where, if the frame was too small, it wouldn't parse
// and return it until we received more data (any data, just so long as it caused our total local buffer to exceed
// 8 bytes) [PR#320]
//
// PR#320: https://github.com/Azure/go-amqp/pull/320
func OpenCloseMeasurements(remainingArgs []string) {
type testArgs struct {
SleepDuration time.Duration
MessageCount int
BodySize int
}

fs := flag.NewFlagSet("args", flag.PanicOnError)
numRounds := fs.Int("rounds", 10, "The number of rounds of sends and closes to run")
_ = fs.Parse(remainingArgs)

fn := func(args testArgs) {
sc := shared.MustCreateStressContext("OpenCloseMeasurements", &shared.StressContextOptions{
CommonBaggage: map[string]string{
"SleepDuration": args.SleepDuration.String(),
"MessageCount": strconv.FormatInt(int64(args.MessageCount), 10),
"BodySize": strconv.FormatInt(int64(args.BodySize), 10),
},
EmitStartEvent: true,
})

defer sc.End()

queueName := fmt.Sprintf("OpenCloseMeasurements-%s", sc.Nano)
_ = shared.MustCreateAutoDeletingQueue(sc, queueName, &admin.QueueProperties{})

client, err := azservicebus.NewClientFromConnectionString(sc.ConnectionString, nil)
sc.PanicOnError("failed to create client", err)

trackingSender, err := shared.NewTrackingSender(sc.TC, client, queueName, nil)
sc.PanicOnError("failed to create sender", err)

log.Printf("Sending message to warm up connection and links.")

body := make([]byte, args.BodySize)

for i := 0; i < args.MessageCount; i++ {
err = trackingSender.SendMessage(context.Background(), &azservicebus.Message{
Body: body,
}, nil)
sc.NoErrorf(err, "failed to send message %d", i)
}

log.Printf("Sleeping for %s, done at %s...", args.SleepDuration, time.Now().Add(args.SleepDuration))
time.Sleep(args.SleepDuration)

log.Printf("Done sleeping, now attempting to close link")
// the error is reported for now to metrics - not going to kill this as we have a bug where
// the "detach because idle" error comes back from Close() right now.

start := time.Now()
max := 10 * time.Second
_ = trackingSender.Close(context.Background())

if time.Since(start) > max {
sc.PanicOnError("Slow close", fmt.Errorf("Took longer than %s", max))
}
}

// some simple cases
testCases := []testArgs{
{1 * time.Minute, 1, 10},
{5 * time.Minute, 100, 100},
{5 * time.Minute, 100, 10000},
{5 * time.Minute, 1, 10},
{10*time.Minute + 30*time.Second, 1, 10},
{11 * time.Minute, 1, 10},
}

for i := 0; i < *numRounds; i++ {
wg := sync.WaitGroup{}

for _, args := range testCases {
wg.Add(1)

go func(args testArgs) {
defer wg.Done()
fn(args)
}(args)
}

wg.Wait()
}
}
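
The `-rounds` handling in `OpenCloseMeasurements` can be tried in isolation. This is a minimal sketch built around a hypothetical helper, `parseRounds`. Unlike the test, it uses `flag.ContinueOnError` so that bad input comes back as an error instead of a panic. The `args: -rounds 100` entry in the stress matrix presumably reaches the test as these remaining arguments, since the job template appends `.Stress.args` to the command line.

```go
package main

import (
	"flag"
	"fmt"
)

// parseRounds is a hypothetical helper that mirrors the flag setup used by
// the stress test: a private FlagSet with a single "rounds" flag that
// defaults to 10.
func parseRounds(remainingArgs []string) (int, error) {
	fs := flag.NewFlagSet("args", flag.ContinueOnError)
	numRounds := fs.Int("rounds", 10, "The number of rounds of sends and closes to run")

	if err := fs.Parse(remainingArgs); err != nil {
		return 0, err
	}

	return *numRounds, nil
}

func main() {
	// Arguments as they would arrive from "args: -rounds 100".
	n, err := parseRounds([]string{"-rounds", "100"})
	fmt.Println(n, err) // prints "100 <nil>"

	// With no arguments the default applies.
	n, err = parseRounds(nil)
	fmt.Println(n, err) // prints "10 <nil>"
}
```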
1 change: 1 addition & 0 deletions sdk/messaging/azservicebus/internal/stress/tests/tests.go
@@ -42,6 +42,7 @@ func Run(remainingArgs []string) {
"infiniteSendAndReceive": InfiniteSendAndReceiveRun,
"longRunningRenewLock": LongRunningRenewLockTest,
"mostlyIdleReceiver": MostlyIdleReceiver,
"openCloseMeasurements": OpenCloseMeasurements,
"rapidOpenClose": RapidOpenCloseTest,
"receiveCancellation": ReceiveCancellation,
"sendAndReceiveDrain": SendAndReceiveDrain,