Skip to content

Commit

Permalink
[Service Bus] Fix test failures (#23081)
Browse files Browse the repository at this point in the history
  • Loading branch information
HarshaNalluru committed Sep 7, 2022
1 parent f646db8 commit 9f2a8de
Show file tree
Hide file tree
Showing 2 changed files with 115 additions and 35 deletions.
1 change: 1 addition & 0 deletions sdk/servicebus/service-bus/karma.conf.js
Expand Up @@ -46,6 +46,7 @@ module.exports = function (config) {
// environment values MUST be exported or set with same console running "karma start"
// https://www.npmjs.com/package/karma-env-preprocessor
envPreprocessor: [
"SERVICEBUS_CONNECTION_STRING_PREMIUM",
"SERVICEBUS_CONNECTION_STRING",
"AZURE_CLIENT_ID",
"AZURE_CLIENT_SECRET",
Expand Down
149 changes: 114 additions & 35 deletions sdk/servicebus/service-bus/test/internal/sendBatch.spec.ts
Expand Up @@ -6,7 +6,12 @@ const should = chai.should();
const assert = chai.assert;
import chaiAsPromised from "chai-as-promised";
chai.use(chaiAsPromised);
import { OperationOptions, ServiceBusMessage } from "../../src";
import {
OperationOptions,
ServiceBusAdministrationClient,
ServiceBusClient,
ServiceBusMessage,
} from "../../src";
import { TestClientType } from "../public/utils/testUtils";
import {
EntityName,
Expand All @@ -16,6 +21,8 @@ import {
getRandomTestClientTypeWithNoSessions,
} from "../public/utils/testutils2";
import { ServiceBusSender, ServiceBusSenderImpl } from "../../src/sender";
import { getEnvVarValue } from "../public/utils/envVarUtils";
import { delay } from "@azure/core-util";

describe("Send Batch", () => {
let sender: ServiceBusSender;
Expand All @@ -34,9 +41,7 @@ describe("Send Batch", () => {
});

async function beforeEachTest(entityType: TestClientType): Promise<void> {
entityNames = await serviceBusClient.test.createTestEntities(entityType, {
maxMessageSizeInKilobytes: 102400,
});
entityNames = await serviceBusClient.test.createTestEntities(entityType);

sender = serviceBusClient.test.addToCleanup(
serviceBusClient.createSender(entityNames.queue ?? entityNames.topic!)
Expand Down Expand Up @@ -260,37 +265,6 @@ describe("Send Batch", () => {
});
});

describe("Send single message - size > 1 MB", function (): void {
afterEach(async () => {
await afterEachTest();
});

function prepareMessage(useSessions: boolean): ServiceBusMessage {
return {
body: Buffer.alloc(1024 * 1024),
sessionId: useSessions ? `s` : undefined,
};
}

async function testSend(): Promise<void> {
// Prepare messages to send
const messageToSend = prepareMessage(entityNames.usesSessions);
await sender.sendMessages(messageToSend);
// receive all the messages in receive and delete mode
await serviceBusClient.test.verifyAndDeleteAllSentMessages(entityNames, [messageToSend]);
}

it(`${noSessionTestClientType}: SendBatch`, async function (): Promise<void> {
await beforeEachTest(noSessionTestClientType);
await testSend();
});

it(`${withSessionTestClientType}: SendBatch`, async function (): Promise<void> {
await beforeEachTest(withSessionTestClientType);
await testSend();
});
});

describe("Send multiple heterogenous messages - size > max_batch_size_allowed", function (): void {
afterEach(async () => {
await afterEachTest();
Expand Down Expand Up @@ -546,3 +520,108 @@ describe("Send Batch", () => {
);
});
});

describe("Premium namespaces - Sending", () => {
const premiumConnectionString = getEnvVarValue("SERVICEBUS_CONNECTION_STRING_PREMIUM");
let atomClient: ServiceBusAdministrationClient;

before(function (this: Mocha.Context) {
if (!premiumConnectionString) {
this.skip();
}
atomClient = new ServiceBusAdministrationClient(premiumConnectionString);
});
let sender: ServiceBusSender;
let serviceBusClient: ServiceBusClient;
let queueName: string | undefined;
let topicName: string | undefined;
let subscriptionName: string | undefined;
let withSessions: boolean;

const noSessionTestClientType =
Math.random() > 0.5 ? TestClientType.UnpartitionedQueue : TestClientType.UnpartitionedTopic;
const withSessionTestClientType =
Math.random() > 0.5
? TestClientType.UnpartitionedQueueWithSessions
: TestClientType.UnpartitionedTopicWithSessions;

before(() => {
serviceBusClient = new ServiceBusClient(premiumConnectionString || "");
});

after(async () => {
await serviceBusClient.close();
});

async function beforeEachTest(entityType: TestClientType): Promise<void> {
atomClient = new ServiceBusAdministrationClient(premiumConnectionString || "");
withSessions = !entityType.includes("WithSessions");
const randomSeed = Math.ceil(Math.random() * 10000 + 1000);
const isQueue = entityType.includes("Queue");
if (isQueue) {
queueName = "queue-" + randomSeed;
await atomClient.createQueue(queueName, {
requiresSession: withSessions,
maxMessageSizeInKilobytes: 10240,
}); // 10 MB
sender = serviceBusClient.createSender(queueName);
} else {
topicName = "topic-" + randomSeed;
subscriptionName = "subscription-" + randomSeed;
await atomClient.createTopic(topicName, { maxMessageSizeInKilobytes: 10240 }); // 10 MB
await atomClient.createSubscription(topicName, subscriptionName, {
requiresSession: withSessions,
});
sender = serviceBusClient.createSender(topicName);
}
}

async function afterEachTest(): Promise<void> {
await sender.close();
if (queueName) {
await atomClient.deleteQueue(queueName);
queueName = undefined;
} else if (topicName) {
await atomClient.deleteTopic(topicName);
topicName = undefined;
}
}

describe("Send single message - size > 1 MB (Large messages)", function (): void {
afterEach(async () => {
await afterEachTest();
});

function prepareMessage(useSessions: boolean): ServiceBusMessage {
return {
body: Buffer.alloc(1024 * 1024),
sessionId: useSessions ? `s` : undefined,
};
}

async function testSend(): Promise<void> {
// Prepare messages to send
const messageToSend = prepareMessage(withSessions);
await sender.sendMessages(messageToSend);
await delay(1000);
should.equal(
queueName
? (await atomClient.getQueueRuntimeProperties(queueName)).totalMessageCount
: (await atomClient.getSubscriptionRuntimeProperties(topicName!, subscriptionName!))
.totalMessageCount,
1,
`Unexpected number of messages are present in the entity.`
);
}

it(`${noSessionTestClientType}: SendBatch`, async function (): Promise<void> {
await beforeEachTest(noSessionTestClientType);
await testSend();
});

it(`${withSessionTestClientType}: SendBatch`, async function (): Promise<void> {
await beforeEachTest(withSessionTestClientType);
await testSend();
});
});
});

0 comments on commit 9f2a8de

Please sign in to comment.