diff --git a/src/connection.ts b/src/connection.ts index 973527b..f8c191d 100644 --- a/src/connection.ts +++ b/src/connection.ts @@ -1,31 +1,57 @@ -import { ConnectionEvents, Connection as RheaConnection } from "rhea" +import { ConnectionEvents, create_container, Connection as RheaConnection } from "rhea" +import { AmqpManagement, Management } from "./management.js" +import { EnvironmentParams } from "./environment.js" export interface Connection { close(): Promise isOpen(): boolean + management(): Management } export class AmqpConnection implements Connection { - private readonly rheaConnection: RheaConnection + static async create(params: EnvironmentParams) { + const connection = await AmqpConnection.open(params) + const topologyManagement = await AmqpManagement.create(connection) + return new AmqpConnection(connection, topologyManagement) + } + + private static async open(params: EnvironmentParams): Promise { + return new Promise((res, rej) => { + const container = create_container() + container.once(ConnectionEvents.connectionOpen, (context) => { + return res(context.connection) + }) + container.once(ConnectionEvents.error, (context) => { + return rej(context.error ?? new Error("Connection error occurred")) + }) - constructor(connection: RheaConnection) { - this.rheaConnection = connection + container.connect(params) + }) } + constructor( + private readonly connection: RheaConnection, + private readonly topologyManagement: Management + ) {} + async close(): Promise { return new Promise((res, rej) => { - this.rheaConnection.once(ConnectionEvents.connectionClose, () => { + this.connection.once(ConnectionEvents.connectionClose, () => { return res(true) }) - this.rheaConnection.once(ConnectionEvents.connectionError, (context) => { + this.connection.once(ConnectionEvents.connectionError, (context) => { return rej(new Error("Connection error: " + context.connection.error)) }) - this.rheaConnection.close() + this.connection.close() }) } + management(): Management { + return this.topologyManagement + } + public isOpen(): boolean { - return this.rheaConnection.is_open() + return this.connection ? this.connection.is_open() : false } } diff --git a/src/environment.ts b/src/environment.ts index b56a129..658fd1b 100644 --- a/src/environment.ts +++ b/src/environment.ts @@ -1,6 +1,4 @@ -import { ConnectionEvents, Container, create_container } from "rhea" import { AmqpConnection, Connection } from "./connection.js" -import { Connection as RheaConnection } from "rhea" export interface Environment { createConnection(): Promise @@ -15,42 +13,17 @@ export type EnvironmentParams = { } export class AmqpEnvironment implements Environment { - private readonly host: string - private readonly port: number - private readonly username: string - private readonly password: string - private readonly container: Container - private readonly connections: Connection[] = [] - - constructor({ host, port, username, password }: EnvironmentParams) { - this.host = host - this.port = port - this.username = username - this.password = password - this.container = create_container() - } + constructor( + private readonly params: EnvironmentParams, + private readonly connections: Connection[] = [] + ) {} async createConnection(): Promise { - const rheaConnection = await this.openConnection() - const connection = new AmqpConnection(rheaConnection) + const connection = await AmqpConnection.create(this.params) this.connections.push(connection) - return connection } - private async openConnection(): Promise { - return new Promise((res, rej) => { - this.container.once(ConnectionEvents.connectionOpen, (context) => { - return res(context.connection) - }) - this.container.once(ConnectionEvents.error, (context) => { - return rej(context.error ?? new Error("Connection error occurred")) - }) - - this.container.connect({ host: this.host, port: this.port, username: this.username, password: this.password }) - }) - } - async close(): Promise { await this.closeConnections() this.connections.length = 0 diff --git a/src/management.ts b/src/management.ts index 31db745..58e2dc1 100644 --- a/src/management.ts +++ b/src/management.ts @@ -1,16 +1,161 @@ -import { AmqpQueueSpec, QueueSpec } from "./queue.js" +import { AmqpQueue, Queue, QueueOptions, QueueType } from "./queue.js" +import { + EventContext, + Receiver, + ReceiverEvents, + ReceiverOptions, + Connection as RheaConnection, + Sender, + SenderEvents, + SenderOptions, +} from "rhea" +import { AmqpEndpoints, AmqpMethods, MessageBuilder, ME } from "./message_builder.js" +import { CreateQueueResponseDecoder, DeleteQueueResponseDecoder } from "./response_decoder.js" + +type LinkOpenEvents = SenderEvents.senderOpen | ReceiverEvents.receiverOpen +type LinkErrorEvents = SenderEvents.senderError | ReceiverEvents.receiverError +type OpenLinkMethods = + | ((options?: SenderOptions | string) => Sender) + | ((options?: ReceiverOptions | string) => Receiver) + +const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = { + snd_settle_mode: 1, + rcv_settle_mode: 0, + name: "management-link-pair", + target: { address: "/management", expiry_policy: "LINK_DETACH", timeout: 0, dynamic: false }, + source: { address: "/management", expiry_policy: "LINK_DETACH", timeout: 0, dynamic: false, durable: 0 }, + properties: { paired: true }, +} export interface Management { - queue: (queueName: string) => QueueSpec + declareQueue: (queueName: string, options?: Partial) => Promise + deleteQueue: (queueName: string) => Promise close: () => void } export class AmqpManagement implements Management { - constructor() {} + static async create(connection: RheaConnection): Promise { + const senderLink = await AmqpManagement.openSender(connection) + const receiverLink = await AmqpManagement.openReceiver(connection) + return new AmqpManagement(connection, senderLink, receiverLink) + } + + constructor( + private readonly connection: RheaConnection, + private senderLink: Sender, + private receiverLink: Receiver + ) { + console.log(this.receiverLink.is_open()) + } + + private static async openReceiver(connection: RheaConnection): Promise { + return AmqpManagement.openLink( + connection, + ReceiverEvents.receiverOpen, + ReceiverEvents.receiverError, + connection.open_receiver.bind(connection), + MANAGEMENT_NODE_CONFIGURATION + ) + } + + private static async openSender(connection: RheaConnection): Promise { + return AmqpManagement.openLink( + connection, + SenderEvents.senderOpen, + SenderEvents.senderError, + connection.open_sender.bind(connection), + MANAGEMENT_NODE_CONFIGURATION + ) + } + + private static async openLink( + connection: RheaConnection, + successEvent: LinkOpenEvents, + errorEvent: LinkErrorEvents, + openMethod: OpenLinkMethods, + config?: SenderOptions | ReceiverOptions | string + ): Promise { + return new Promise((res, rej) => { + connection.once(successEvent, (context) => { + return res(context.receiver || context.sender) + }) + connection.once(errorEvent, (context) => { + return rej(context.connection.error) + }) + openMethod(config) + }) + } + + close(): void { + if (this.connection.is_closed()) return + + this.closeSender() + this.closeReceiver() + } + + private closeSender(): void { + this.senderLink.close() + } + + private closeReceiver(): void { + this.senderLink.close() + } + + async declareQueue(queueName: string, options: Partial = {}): Promise { + return new Promise((res, rej) => { + this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => { + if (!context.message) { + return rej(new Error("Receiver has not received any message")) + } + + const response = new CreateQueueResponseDecoder().decodeFrom(context.message, String(message.message_id)) + if (response.status === "error") { + return rej(response.error) + } - close() {} + return res(new AmqpQueue(response.body)) + }) - queue(queueName: string): QueueSpec { - return new AmqpQueueSpec().name(queueName) + const message = new MessageBuilder() + .sendTo(`/${AmqpEndpoints.Queues}/${encodeURIComponent(queueName)}`) + .setReplyTo(ME) + .setAmqpMethod(AmqpMethods.PUT) + .setBody({ + exclusive: options.exclusive ?? false, + durable: options.durable ?? false, + auto_delete: options.autoDelete ?? false, + arguments: buildArgumentsFrom(options.type, options.arguments), + }) + .build() + this.senderLink.send(message) + }) } + + async deleteQueue(queueName: string): Promise { + return new Promise((res, rej) => { + this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => { + if (!context.message) { + return rej(new Error("Receiver has not received any message")) + } + + const response = new DeleteQueueResponseDecoder().decodeFrom(context.message, String(message.message_id)) + if (response.status === "error") { + return rej(response.error) + } + + return res(true) + }) + + const message = new MessageBuilder() + .sendTo(`/${AmqpEndpoints.Queues}/${encodeURIComponent(queueName)}`) + .setReplyTo(ME) + .setAmqpMethod(AmqpMethods.DELETE) + .build() + this.senderLink.send(message) + }) + } +} + +function buildArgumentsFrom(queueType?: QueueType, queueOptions?: Record) { + return { ...(queueOptions ?? {}), ...(queueType ? { "x-queue-type": queueType } : {}) } } diff --git a/src/message_builder.ts b/src/message_builder.ts new file mode 100644 index 0000000..928a78e --- /dev/null +++ b/src/message_builder.ts @@ -0,0 +1,58 @@ +import { generate_uuid, Message } from "rhea" + +export enum AmqpMethods { + PUT = "PUT", + DELETE = "DELETE", + GET = "GET", +} + +export enum AmqpEndpoints { + Queues = "queues", +} + +export const ME = "$me" + +export class MessageBuilder { + private messageId: string = generate_uuid() + private to: string = "" + private replyTo: string = ME + private method: AmqpMethods = AmqpMethods.GET + private body: unknown + + constructor() {} + + setMessageId(id: string) { + this.messageId = id + return this + } + + sendTo(to: string) { + this.to = to + return this + } + + setReplyTo(replyTo: string) { + this.replyTo = replyTo + return this + } + + setAmqpMethod(method: AmqpMethods) { + this.method = method + return this + } + + setBody(body: unknown) { + this.body = body + return this + } + + build(): Message { + return { + message_id: this.messageId, + to: this.to, + reply_to: this.replyTo, + subject: this.method, + body: this.body, + } + } +} diff --git a/src/queue.ts b/src/queue.ts index b071d85..e02ff37 100644 --- a/src/queue.ts +++ b/src/queue.ts @@ -1,53 +1,39 @@ -export interface QueueInfo { - name: string +export type QueueType = "classic" | "stream" | "quorum" + +export type QueueOptions = { + type: QueueType + exclusive: boolean + autoDelete: boolean + durable: boolean + arguments: Record } -export class AmqpQueueInfo implements QueueInfo { - private queueName: string - - constructor(params: { name: string; exclusive: boolean; autoDelete: boolean }) { - this.queueName = params.name - } - - public get name(): string { - return this.queueName - } +export type QueueInfo = { + name: string + durable: boolean + autoDelete: boolean + exclusive: boolean + type: QueueType + leader: string + replicas: string[] + messageCount: number + consumerCount: number + arguments: Record } -export interface QueueSpec { - name: (queueName: string) => QueueSpec - exclusive: (isExclusive: boolean) => QueueSpec - autoDelete: (isAutoDelete: boolean) => QueueSpec - declare: () => QueueInfo +export type DeletedQueueInfo = { + name: string + deleted: boolean } -export class AmqpQueueSpec implements QueueSpec { - private queueName: string - private isExclusive: boolean - private isAutoDelete: boolean - - constructor() { - this.queueName = "default-queue-name" - this.isExclusive = false - this.isAutoDelete = false - } - - name(queueName: string): QueueSpec { - this.queueName = queueName - return this - } - - exclusive(isExclusive: boolean): QueueSpec { - this.isExclusive = isExclusive - return this - } +export interface Queue { + getInfo: QueueInfo +} - autoDelete(isAutoDelete: boolean): QueueSpec { - this.isAutoDelete = isAutoDelete - return this - } +export class AmqpQueue implements Queue { + constructor(private readonly info: QueueInfo) {} - declare(): QueueInfo { - return new AmqpQueueInfo({ name: this.queueName, exclusive: this.isExclusive, autoDelete: this.isAutoDelete }) + public get getInfo(): QueueInfo { + return this.info } } diff --git a/src/response_decoder.ts b/src/response_decoder.ts new file mode 100644 index 0000000..3c2eafa --- /dev/null +++ b/src/response_decoder.ts @@ -0,0 +1,47 @@ +import { Message } from "rhea" +import { AUTO_DELETE, DURABLE, EXCLUSIVE, isError, queueTypeFromString, Result } from "./utils.js" +import { DeletedQueueInfo, QueueInfo } from "./queue.js" + +interface ResponseDecoder { + decodeFrom: (receivedMessage: Message, sentMessageId: string) => Result +} + +export class CreateQueueResponseDecoder implements ResponseDecoder { + decodeFrom(receivedMessage: Message, sentMessageId: string): Result { + if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) { + return { status: "error", error: new Error(`Message Error: ${receivedMessage.subject}`) } + } + + return { + status: "ok", + body: { + name: receivedMessage.body.name, + durable: receivedMessage.body.durable === DURABLE, + autoDelete: receivedMessage.body.auto_delete === AUTO_DELETE, + exclusive: receivedMessage.body.exclusive === EXCLUSIVE, + type: queueTypeFromString(receivedMessage.body.type), + arguments: receivedMessage.body.arguments ?? {}, + leader: receivedMessage.body.leader, + replicas: receivedMessage.body.replicas, + messageCount: receivedMessage.body.message_count, + consumerCount: receivedMessage.body.consumer_count, + }, + } + } +} + +export class DeleteQueueResponseDecoder implements ResponseDecoder { + decodeFrom(receivedMessage: Message, sentMessageId: string): Result { + if (isError(receivedMessage) || sentMessageId !== receivedMessage.correlation_id) { + return { status: "error", error: new Error(`Message Error: ${receivedMessage.subject}`) } + } + + return { + status: "ok", + body: { + name: receivedMessage.body.name, + deleted: true, + }, + } + } +} diff --git a/src/utils.ts b/src/utils.ts new file mode 100644 index 0000000..3dfd3e3 --- /dev/null +++ b/src/utils.ts @@ -0,0 +1,48 @@ +import { Message } from "rhea" +import { QueueType } from "./queue.js" + +export enum AmqpResponseCodes { + OK = "200", + CREATED = "201", + NO_CONTENT = "204", + BAD_REQUEST = "400", + NOT_FOUND = "404", + CONFLICT = "409", +} + +export const DURABLE = 1 +export const AUTO_DELETE = 1 +export const EXCLUSIVE = 1 + +export type Result = OkResult | ErrorResult + +type OkResult = { + status: "ok" + body: T +} + +type ErrorResult = { + status: "error" + error: K +} + +export function isError(message: Message): boolean { + return ( + message.subject === AmqpResponseCodes.BAD_REQUEST || + message.subject === AmqpResponseCodes.NOT_FOUND || + message.subject === AmqpResponseCodes.CONFLICT + ) +} + +export function queueTypeFromString(queueType: string): QueueType { + switch (queueType) { + case "classic": + return "classic" + case "quorum": + return "quorum" + case "stream": + return "stream" + default: + throw new Error(`Unsupported queue type: ${queueType}`) + } +} diff --git a/test/e2e/management.test.ts b/test/e2e/management.test.ts index a29b90f..e3b5a9d 100644 --- a/test/e2e/management.test.ts +++ b/test/e2e/management.test.ts @@ -1,21 +1,59 @@ +import { Management } from "../../src/index.js" import { afterEach, beforeEach, describe, expect, test } from "vitest" -import { AmqpManagement, Management } from "../../src/index.js" -import { existsQueue } from "../support/util.js" +import { createQueue, eventually, existsQueue, getQueueInfo, host, password, port, username } from "../support/util.js" +import { createEnvironment, Environment } from "../../src/environment.js" +import { Connection } from "../../src/connection.js" -describe.skip("Management", () => { +describe("Management", () => { + let environment: Environment + let connection: Connection let management: Management - beforeEach(() => { - management = new AmqpManagement() + beforeEach(async () => { + environment = createEnvironment({ + host, + port, + username, + password, + }) + connection = await environment.createConnection() + management = connection.management() }) - afterEach(() => { - management.close() + afterEach(async () => { + try { + await management.close() + await connection.close() + await environment.close() + } catch (error) { + console.error(error) + } }) test("create a queue through the management", async () => { - const queue = management.queue("test-coda").exclusive(true).autoDelete(true).declare() + const queue = await management.declareQueue("test-queue") - expect(await existsQueue(queue.name)).to.eql(true) + await eventually(async () => { + const queueInfo = await getQueueInfo(queue.getInfo.name) + expect(queueInfo.ok).to.eql(true) + expect(queue.getInfo.arguments).to.eql(queueInfo.body.arguments) + expect(queue.getInfo.autoDelete).to.eql(queueInfo.body.auto_delete) + expect(queue.getInfo.durable).to.eql(queueInfo.body.durable) + expect(queue.getInfo.exclusive).to.eql(queueInfo.body.exclusive) + expect(queue.getInfo.consumerCount).to.eql(queueInfo.body.consumers) + expect(queue.getInfo.messageCount).to.eql(queueInfo.body.messages) + expect(queue.getInfo.type).to.eql(queueInfo.body.type) + expect(queue.getInfo.leader).to.eql(queueInfo.body.node) + }) + }) + + test("delete a queue through the management", async () => { + await createQueue("test-queue") + + await management.deleteQueue("test-queue") + + await eventually(async () => { + expect(await existsQueue("test-queue")).to.eql(false) + }) }) }) diff --git a/test/support/rhea_utils.ts b/test/support/rhea_utils.ts new file mode 100644 index 0000000..631a222 --- /dev/null +++ b/test/support/rhea_utils.ts @@ -0,0 +1,100 @@ +import { + Connection, + ConnectionEvents, + ConnectionOptions, + Container, + Receiver, + ReceiverEvents, + ReceiverOptions, + Sender, + SenderEvents, + SenderOptions, +} from "rhea" + +export async function openConnection(container: Container, params: ConnectionOptions): Promise { + return new Promise((res, rej) => { + container.once(ConnectionEvents.connectionOpen, (context) => { + return res(context.connection) + }) + container.once(ConnectionEvents.error, (context) => { + return rej(context.connection.error) + }) + container.connect(params) + }) +} + +export async function closeConnection(connection: Connection): Promise { + return new Promise((res, rej) => { + connection.once(ConnectionEvents.connectionClose, () => { + res() + }) + connection.once(ConnectionEvents.connectionError, (context) => { + rej(new Error("Connection error: " + context.connection.error)) + }) + connection.close() + }) +} + +const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = { + snd_settle_mode: 1, + rcv_settle_mode: 0, + name: "management-link-pair", + target: { address: "/management", expiry_policy: "LINK_DETACH", timeout: 0, dynamic: false }, + source: { address: "/management", expiry_policy: "LINK_DETACH", timeout: 0, dynamic: false, durable: 0 }, + properties: { paired: true }, +} + +type LinkOpenEvents = SenderEvents.senderOpen | ReceiverEvents.receiverOpen +type LinkErrorEvents = SenderEvents.senderError | ReceiverEvents.receiverError +type OpenLinkMethods = + | ((options?: SenderOptions | string) => Sender) + | ((options?: ReceiverOptions | string) => Receiver) +export type RheaManagement = { + receiver: Receiver + sender: Sender +} + +export async function openManagement(connection: Connection): Promise { + const receiver = await openReceiver(connection) + const sender = await openSender(connection) + + return { receiver, sender } +} + +async function openReceiver(connection: Connection) { + return openLink( + connection, + ReceiverEvents.receiverOpen, + ReceiverEvents.receiverError, + connection.open_receiver.bind(connection), + MANAGEMENT_NODE_CONFIGURATION + ) +} + +async function openSender(connection: Connection) { + return openLink( + connection, + SenderEvents.senderOpen, + SenderEvents.senderError, + connection.open_sender.bind(connection), + MANAGEMENT_NODE_CONFIGURATION + ) +} + +async function openLink( + connection: Connection, + successEvent: LinkOpenEvents, + errorEvent: LinkErrorEvents, + openMethod: OpenLinkMethods, + config?: SenderOptions | ReceiverOptions | string +): Promise { + return new Promise((res, rej) => { + connection.once(successEvent, (context) => { + return res(context.receiver || context.sender) + }) + connection.once(errorEvent, (context) => { + return rej(context.connection.error) + }) + openMethod(config) + }) +} diff --git a/test/support/util.ts b/test/support/util.ts index 40f91f5..f75ec80 100644 --- a/test/support/util.ts +++ b/test/support/util.ts @@ -9,6 +9,14 @@ export type ConnectionInfoResponse = { export type QueueInfoResponse = { name: string + node: string + messages: number + consumers: number + arguments: Record + auto_delete: boolean + durable: boolean + exclusive: boolean + type: string } export const host = process.env.RABBITMQ_HOSTNAME ?? "localhost" @@ -40,7 +48,7 @@ export async function existsQueue(queueName: string): Promise { return response.ok } -async function getQueueInfo(queue: string): Promise> { +export async function getQueueInfo(queue: string): Promise> { const response = await got.get(`http://${host}:${managementPort}/api/queues/${vhost}/${queue}`, { headers: { Authorization: `Basic ${Buffer.from(`${username}:${password}`).toString("base64")}`, @@ -52,6 +60,20 @@ async function getQueueInfo(queue: string): Promise> return response } +export async function createQueue(queue: string): Promise { + const response = await got.put(`http://${host}:${managementPort}/api/queues/${vhost}/${queue}`, { + headers: { + Authorization: `Basic ${Buffer.from(`${username}:${password}`).toString("base64")}`, + }, + json: { + name: queue, + }, + throwHttpErrors: false, + }) + + return response.ok +} + export async function wait(ms: number) { return new Promise((res) => { setTimeout(() => res(true), ms) diff --git a/test/unit/rhea/connection.test.ts b/test/unit/rhea/connection.test.ts index b0fa72e..ccb23e7 100644 --- a/test/unit/rhea/connection.test.ts +++ b/test/unit/rhea/connection.test.ts @@ -1,20 +1,9 @@ import { afterEach, beforeEach, describe, expect, test } from "vitest" import { host, port, username, password, numberOfConnections, eventually } from "../../support/util.js" -import { - Connection, - ConnectionEvents, - ConnectionOptions, - Container, - create_container, - Receiver, - ReceiverEvents, - ReceiverOptions, - Sender, - SenderEvents, - SenderOptions, -} from "rhea" +import { Connection, Container, create_container } from "rhea" +import { closeConnection, openConnection, openManagement } from "../../support/rhea_utils.js" -describe("Rhea tests", () => { +describe("Rhea connections", () => { let container: Container let connection: Connection @@ -23,11 +12,11 @@ describe("Rhea tests", () => { }) afterEach(async () => { - await close(connection) + await closeConnection(connection) }) test("create a connection", async () => { - connection = await open(container, { + connection = await openConnection(container, { host, port, username, @@ -40,7 +29,7 @@ describe("Rhea tests", () => { }) test("connect to the management", async () => { - connection = await open(container, { + connection = await openConnection(container, { host, port, username, @@ -48,85 +37,7 @@ describe("Rhea tests", () => { }) await eventually(async () => { - await openSender(connection) - await openReceiver(connection) + await openManagement(connection) }, 4000) }) }) - -async function open(container: Container, params: ConnectionOptions): Promise { - return new Promise((res, rej) => { - container.once(ConnectionEvents.connectionOpen, (context) => { - return res(context.connection) - }) - container.once(ConnectionEvents.error, (context) => { - return rej(context.connection.error) - }) - container.connect(params) - }) -} - -async function close(connection: Connection): Promise { - return new Promise((res, rej) => { - connection.once(ConnectionEvents.connectionClose, () => { - res() - }) - connection.once(ConnectionEvents.connectionError, (context) => { - rej(new Error("Connection error: " + context.connection.error)) - }) - connection.close() - }) -} - -const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = { - snd_settle_mode: 1, - rcv_settle_mode: 0, - name: "management-link-pair", - target: { address: "/management", expiry_policy: "LINK_DETACH", timeout: 0, dynamic: false }, - source: { address: "/management", expiry_policy: "LINK_DETACH", timeout: 0, dynamic: false, durable: 0 }, - properties: { paired: true }, -} - -async function openReceiver(connection: Connection) { - return openLink( - connection, - ReceiverEvents.receiverOpen, - ReceiverEvents.receiverError, - connection.open_receiver.bind(connection), - MANAGEMENT_NODE_CONFIGURATION - ) -} - -async function openSender(connection: Connection) { - return openLink( - connection, - SenderEvents.senderOpen, - SenderEvents.senderError, - connection.open_sender.bind(connection), - MANAGEMENT_NODE_CONFIGURATION - ) -} - -type LinkOpenEvents = SenderEvents.senderOpen | ReceiverEvents.receiverOpen -type LinkErrorEvents = SenderEvents.senderError | ReceiverEvents.receiverError -type OpenLinkMethods = - | ((options?: SenderOptions | string) => Sender) - | ((options?: ReceiverOptions | string) => Receiver) - -async function openLink( - connection: Connection, - successEvent: LinkOpenEvents, - errorEvent: LinkErrorEvents, - openMethod: OpenLinkMethods, - config?: SenderOptions | ReceiverOptions | string -): Promise { - return new Promise((res, rej) => { - connection.once(successEvent, (context) => { - return res(context.receiver || context.sender) - }) - connection.once(errorEvent, (context) => { - return rej(context.connection.error) - }) - openMethod(config) - }) -} diff --git a/test/unit/rhea/queue.test.ts b/test/unit/rhea/queue.test.ts new file mode 100644 index 0000000..17ef976 --- /dev/null +++ b/test/unit/rhea/queue.test.ts @@ -0,0 +1,81 @@ +import { afterAll, beforeAll, describe, test } from "vitest" +import { host, port, username, password } from "../../support/util.js" +import { Connection, Container, create_container, Receiver, Sender, SenderEvents } from "rhea" +import { closeConnection, openConnection, openManagement, RheaManagement } from "../../support/rhea_utils.js" + +describe("Rhea queues", () => { + let container: Container + let connection: Connection + let management: RheaManagement + + beforeAll(async () => { + container = create_container() + connection = await openConnection(container, { + host, + port, + username, + password, + }) + management = await openManagement(connection) + }) + + afterAll(async () => { + await closeConnection(connection) + }) + + test("create a queue", async () => { + await sendCreationQueueMessage(connection, management.sender, management.receiver) + + console.log("AAAAAAAAAAAAAAAAA All ok") + }) +}) + +let sent = 0 + +async function sendCreationQueueMessage(connection: Connection, sender: Sender, receiver: Receiver) { + console.log("hello") + + return new Promise((res, rej) => { + connection.once(SenderEvents.sendable, function (context) { + sent++ + console.log("sent " + sent, context.sender) + return res(true) + }) + connection.once(SenderEvents.accepted, function (context) { + console.log("all messages confirmed", context.sender) + return res(true) + }) + receiver.once("message", function (context) { + console.log("AAAAAAAAAAAAA", context) + return res(true) + }) + connection.once(SenderEvents.rejected, function (context) { + console.log("AAAAAAAAAAAAAAAA Rejected") + return rej(context.sender.error) + }) + connection.once(SenderEvents.released, function (context) { + console.log("AAAAAAAAAAAAAAAA released") + return rej(context.sender.error) + }) + connection.once(SenderEvents.modified, function (context) { + console.log("AAAAAAAAAAAAAAAA modified") + return rej(context.sender.error) + }) + connection.once(SenderEvents.senderError, function (context) { + console.log("AAAAAAAAAAAAAAAA senderError") + return rej(context.sender.error) + }) + connection.once(SenderEvents.settled, function (context) { + console.log("AAAAAAAAAAAAAAAA settled") + return rej(context.sender.error) + }) + + sender.send({ + message_id: sent, + to: `/queues/${encodeURIComponent("test-coda")}`, + reply_to: "$me", + subject: "PUT", + body: { durable: true, exclusive: true, auto_delete: true, arguments: {} }, + }) + }) +}