Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/main.yml
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ jobs:

services:
rabbitmq:
image: rabbitmq:4.1.0-management
image: rabbitmq:4.0.9-management
options: --hostname test-node --name test-node
env:
RABBITMQ_HOSTNAME: "test-node"
Expand Down
2 changes: 1 addition & 1 deletion docker-compose.yml
Original file line number Diff line number Diff line change
@@ -1,6 +1,6 @@
services:
rabbitmq-rhea:
image: rabbitmq:4.1.0-management
image: rabbitmq:4.0.9-management
container_name: rabbitmq-rhea
restart: unless-stopped
hostname: "rabbitmq"
Expand Down
2 changes: 1 addition & 1 deletion package-lock.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@
"vitest": "^3.1.3"
},
"dependencies": {
"assertion-error": "^2.0.1",
"rhea": "^3.0.4"
}
}
31 changes: 31 additions & 0 deletions src/connection.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,31 @@
import { ConnectionEvents, Connection as RheaConnection } from "rhea"

export interface Connection {
close(): Promise<boolean>
isOpen(): boolean
}

export class AmqpConnection implements Connection {
private readonly rheaConnection: RheaConnection

constructor(connection: RheaConnection) {
this.rheaConnection = connection
}

async close(): Promise<boolean> {
return new Promise((res, rej) => {
this.rheaConnection.once(ConnectionEvents.connectionClose, () => {
return res(true)
})
this.rheaConnection.once(ConnectionEvents.connectionError, (context) => {
return rej(new Error("Connection error: " + context.connection.error))
})

this.rheaConnection.close()
})
}

public isOpen(): boolean {
return this.rheaConnection.is_open()
}
}
70 changes: 70 additions & 0 deletions src/environment.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,70 @@
import { ConnectionEvents, Container, create_container } from "rhea"
import { AmqpConnection, Connection } from "./connection.js"
import { Connection as RheaConnection } from "rhea"

export interface Environment {
createConnection(): Promise<Connection>
close(): Promise<void>
}

export type EnvironmentParams = {
host: string
port: number
username: string
password: string
}

export class AmqpEnvironment implements Environment {
private readonly host: string
private readonly port: number
private readonly username: string
private readonly password: string
private readonly container: Container
private connections: Connection[] = []

constructor({ host, port, username, password }: EnvironmentParams) {
this.host = host
this.port = port
this.username = username
this.password = password
this.container = create_container()
}

async createConnection(): Promise<Connection> {
const rheaConnection = await this.openConnection()
const connection = new AmqpConnection(rheaConnection)
this.connections.push(connection)

return connection
}

private async openConnection(): Promise<RheaConnection> {
return new Promise((res, rej) => {
this.container.once(ConnectionEvents.connectionOpen, (context) => {
return res(context.connection)
})
this.container.once(ConnectionEvents.error, (context) => {
return rej(context.error ?? new Error("Connection error occurred"))
})

this.container.connect({ host: this.host, port: this.port, username: this.username, password: this.password })
})
}

async close(): Promise<void> {
await this.closeConnections()
Copy link

Copilot AI May 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

[nitpick] close clears the tracked connections but does not remove any event listeners from this.container; consider disposing or removing listeners to prevent memory leaks in long-lived processes.

Suggested change
await this.closeConnections()
await this.closeConnections()
this.cleanupListeners.forEach((cleanup) => cleanup())
this.cleanupListeners.length = 0

Copilot uses AI. Check for mistakes.

this.connections = []
}

private async closeConnections(): Promise<void> {
await Promise.allSettled(
this.connections.map(async (c) => {
if (c.isOpen()) await c.close()
})
)
}
}

export function createEnvironment(params: EnvironmentParams): Environment {
return new AmqpEnvironment(params)
}
32 changes: 32 additions & 0 deletions test/e2e/environment.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
import { afterEach, beforeEach, describe, test } from "vitest"
import { use, expect } from "chai"
import chaiAsPromised from "chai-as-promised"
import { createEnvironment, Environment } from "../../src/environment.js"
import { host, port, username, password, numberOfConnections, eventually } from "../support/util.js"

use(chaiAsPromised)

describe("Environment", () => {
let environment: Environment

beforeEach(async () => {
environment = createEnvironment({
host,
port,
username,
password,
})
})

afterEach(async () => {
await environment.close()
})

test("create a connection through the environment", async () => {
await environment.createConnection()

await eventually(async () => {
expect(await numberOfConnections()).to.eql(1)
})
})
})
4 changes: 2 additions & 2 deletions test/e2e/management.test.ts
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ import chaiAsPromised from "chai-as-promised"

use(chaiAsPromised)

describe("Management", () => {
describe.skip("Management", () => {
Copy link

Copilot AI May 28, 2025

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The describe.skip call on the entire suite prevents all tests from running; if you intend to skip only specific tests, remove skip from the suite and apply it to individual test calls.

Suggested change
describe.skip("Management", () => {
describe("Management", () => {

Copilot uses AI. Check for mistakes.

let management: Management

beforeEach(() => {
Expand All @@ -17,7 +17,7 @@ describe("Management", () => {
management.close()
})

test.skip("create a queue through the management", async () => {
test("create a queue through the management", async () => {
const queue = management.queue("test-coda").exclusive(true).autoDelete(true).declare()

expect(await existsQueue(queue.name)).to.eql(true)
Expand Down
50 changes: 47 additions & 3 deletions test/support/util.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,33 @@
import { inspect } from "util"
import got, { Response } from "got"
import { expect } from "chai"
import { AssertionError } from "assertion-error"

export type ConnectionInfoResponse = {
name: string
}

export type QueueInfoResponse = {
name: string
}

const host = process.env.RABBITMQ_HOSTNAME ?? "localhost"
const managementPort = 15672
const vhost = encodeURIComponent("/")
export const host = process.env.RABBITMQ_HOSTNAME ?? "localhost"
export const port = parseInt(process.env.RABBITMQ_PORT ?? "5672")
export const managementPort = 15672
export const vhost = encodeURIComponent("/")
export const username = process.env.RABBITMQ_USER ?? "rabbit"
export const password = process.env.RABBITMQ_PASSWORD ?? "rabbit"

export async function numberOfConnections(): Promise<number> {
const response = await got.get<ConnectionInfoResponse[]>(`http://${host}:${managementPort}/api/connections`, {
username,
password,
responseType: "json",
})

return response.body.length
}

export async function existsQueue(queueName: string): Promise<boolean> {
const response = await getQueueInfo(queueName)

Expand All @@ -34,3 +51,30 @@ async function getQueueInfo(queue: string): Promise<Response<QueueInfoResponse>>

return response
}

export async function wait(ms: number) {
return new Promise((res) => {
setTimeout(() => res(true), ms)
})
}

export function elapsedFrom(from: number): number {
return Date.now() - from
}

// eslint-disable-next-line @typescript-eslint/no-unsafe-function-type
export async function eventually(fn: Function, timeout = 5000) {
const start = Date.now()
while (true) {
try {
await fn()
return
} catch (error) {
if (elapsedFrom(start) > timeout) {
if (error instanceof AssertionError) throw error
expect.fail(error instanceof Error ? error.message : String(error))
}
await wait(5)
}
}
}
136 changes: 136 additions & 0 deletions test/unit/rhea/connection.test.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
import { afterEach, beforeEach, describe, test } from "vitest"
import { use, expect } from "chai"
import chaiAsPromised from "chai-as-promised"
import { host, port, username, password, numberOfConnections, eventually } from "../../support/util.js"
import {
Connection,
ConnectionEvents,
ConnectionOptions,
Container,
create_container,
Receiver,
ReceiverEvents,
ReceiverOptions,
Sender,
SenderEvents,
SenderOptions,
} from "rhea"

use(chaiAsPromised)

describe("Rhea tests", () => {
let container: Container
let connection: Connection

beforeEach(async () => {
container = create_container()
})

afterEach(async () => {
await close(connection)
})

test("create a connection", async () => {
connection = await open(container, {
host,
port,
username,
password,
})

await eventually(async () => {
expect(await numberOfConnections()).to.eql(1)
})
})

test("connect to the management", async () => {
connection = await open(container, {
host,
port,
username,
password,
})

await eventually(async () => {
await openSender(connection)
await openReceiver(connection)
}, 4000)
})
})

async function open(container: Container, params: ConnectionOptions): Promise<Connection> {
return new Promise((res, rej) => {
container.once(ConnectionEvents.connectionOpen, (context) => {
return res(context.connection)
})
container.once(ConnectionEvents.error, (context) => {
return rej(context.connection.error)
})
container.connect(params)
})
}

async function close(connection: Connection): Promise<void> {
return new Promise((res, rej) => {
connection.once(ConnectionEvents.connectionClose, () => {
res()
})
connection.once(ConnectionEvents.connectionError, (context) => {
rej(new Error("Connection error: " + context.connection.error))
})
connection.close()
})
}

const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = {
snd_settle_mode: 1,
rcv_settle_mode: 0,
name: "management-link-pair",
target: { address: "/management", expiry_policy: "LINK_DETACH", timeout: 0, dynamic: false },
source: { address: "/management", expiry_policy: "LINK_DETACH", timeout: 0, dynamic: false, durable: 0 },
properties: { paired: true },
}

async function openReceiver(connection: Connection) {
return openLink(
connection,
ReceiverEvents.receiverOpen,
ReceiverEvents.receiverError,
connection.open_receiver.bind(connection),
MANAGEMENT_NODE_CONFIGURATION
)
}

async function openSender(connection: Connection) {
return openLink(
connection,
SenderEvents.senderOpen,
SenderEvents.senderError,
connection.open_sender.bind(connection),
MANAGEMENT_NODE_CONFIGURATION
)
}

type LinkOpenEvents = SenderEvents.senderOpen | ReceiverEvents.receiverOpen
type LinkErrorEvents = SenderEvents.senderError | ReceiverEvents.receiverError
type OpenLinkMethods =
| ((options?: SenderOptions | string) => Sender)
| ((options?: ReceiverOptions | string) => Receiver)

async function openLink(
connection: Connection,
successEvent: LinkOpenEvents,
errorEvent: LinkErrorEvents,
openMethod: OpenLinkMethods,
config?: SenderOptions | ReceiverOptions | string
): Promise<Sender | Receiver> {
return new Promise((res, rej) => {
connection.once(successEvent, (context) => {
return res(context.receiver || context.sender)
})
connection.once(errorEvent, (context) => {
return rej(context.connection.error)
})
openMethod(config)
})
}