Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 34 additions & 8 deletions src/connection.ts
Original file line number Diff line number Diff line change
@@ -1,31 +1,57 @@
import { ConnectionEvents, Connection as RheaConnection } from "rhea"
import { ConnectionEvents, create_container, Connection as RheaConnection } from "rhea"
import { AmqpManagement, Management } from "./management.js"
import { EnvironmentParams } from "./environment.js"

export interface Connection {
close(): Promise<boolean>
isOpen(): boolean
management(): Management
}

export class AmqpConnection implements Connection {
private readonly rheaConnection: RheaConnection
static async create(params: EnvironmentParams) {
const connection = await AmqpConnection.open(params)
const topologyManagement = await AmqpManagement.create(connection)
return new AmqpConnection(connection, topologyManagement)
}

private static async open(params: EnvironmentParams): Promise<RheaConnection> {
return new Promise((res, rej) => {
const container = create_container()
container.once(ConnectionEvents.connectionOpen, (context) => {
return res(context.connection)
})
container.once(ConnectionEvents.error, (context) => {
return rej(context.error ?? new Error("Connection error occurred"))
})

constructor(connection: RheaConnection) {
this.rheaConnection = connection
container.connect(params)
})
}

constructor(
private readonly connection: RheaConnection,
private readonly topologyManagement: Management
) {}

async close(): Promise<boolean> {
return new Promise((res, rej) => {
this.rheaConnection.once(ConnectionEvents.connectionClose, () => {
this.connection.once(ConnectionEvents.connectionClose, () => {
return res(true)
})
this.rheaConnection.once(ConnectionEvents.connectionError, (context) => {
this.connection.once(ConnectionEvents.connectionError, (context) => {
return rej(new Error("Connection error: " + context.connection.error))
})

this.rheaConnection.close()
this.connection.close()
})
}

management(): Management {
return this.topologyManagement
}

public isOpen(): boolean {
return this.rheaConnection.is_open()
return this.connection ? this.connection.is_open() : false
}
}
37 changes: 5 additions & 32 deletions src/environment.ts
Original file line number Diff line number Diff line change
@@ -1,6 +1,4 @@
import { ConnectionEvents, Container, create_container } from "rhea"
import { AmqpConnection, Connection } from "./connection.js"
import { Connection as RheaConnection } from "rhea"

export interface Environment {
createConnection(): Promise<Connection>
Expand All @@ -15,42 +13,17 @@ export type EnvironmentParams = {
}

export class AmqpEnvironment implements Environment {
private readonly host: string
private readonly port: number
private readonly username: string
private readonly password: string
private readonly container: Container
private readonly connections: Connection[] = []

constructor({ host, port, username, password }: EnvironmentParams) {
this.host = host
this.port = port
this.username = username
this.password = password
this.container = create_container()
}
constructor(
private readonly params: EnvironmentParams,
private readonly connections: Connection[] = []
) {}

async createConnection(): Promise<Connection> {
const rheaConnection = await this.openConnection()
const connection = new AmqpConnection(rheaConnection)
const connection = await AmqpConnection.create(this.params)
this.connections.push(connection)

return connection
}

private async openConnection(): Promise<RheaConnection> {
return new Promise((res, rej) => {
this.container.once(ConnectionEvents.connectionOpen, (context) => {
return res(context.connection)
})
this.container.once(ConnectionEvents.error, (context) => {
return rej(context.error ?? new Error("Connection error occurred"))
})

this.container.connect({ host: this.host, port: this.port, username: this.username, password: this.password })
})
}

async close(): Promise<void> {
await this.closeConnections()
this.connections.length = 0
Expand Down
157 changes: 151 additions & 6 deletions src/management.ts
Original file line number Diff line number Diff line change
@@ -1,16 +1,161 @@
import { AmqpQueueSpec, QueueSpec } from "./queue.js"
import { AmqpQueue, Queue, QueueOptions, QueueType } from "./queue.js"
import {
EventContext,
Receiver,
ReceiverEvents,
ReceiverOptions,
Connection as RheaConnection,
Sender,
SenderEvents,
SenderOptions,
} from "rhea"
import { AmqpEndpoints, AmqpMethods, MessageBuilder, ME } from "./message_builder.js"
import { CreateQueueResponseDecoder, DeleteQueueResponseDecoder } from "./response_decoder.js"

type LinkOpenEvents = SenderEvents.senderOpen | ReceiverEvents.receiverOpen
type LinkErrorEvents = SenderEvents.senderError | ReceiverEvents.receiverError
type OpenLinkMethods =
| ((options?: SenderOptions | string) => Sender)
| ((options?: ReceiverOptions | string) => Receiver)

const MANAGEMENT_NODE_CONFIGURATION: SenderOptions | ReceiverOptions = {
snd_settle_mode: 1,
rcv_settle_mode: 0,
name: "management-link-pair",
target: { address: "/management", expiry_policy: "LINK_DETACH", timeout: 0, dynamic: false },
source: { address: "/management", expiry_policy: "LINK_DETACH", timeout: 0, dynamic: false, durable: 0 },
properties: { paired: true },
}

export interface Management {
queue: (queueName: string) => QueueSpec
declareQueue: (queueName: string, options?: Partial<QueueOptions>) => Promise<Queue>
deleteQueue: (queueName: string) => Promise<boolean>
close: () => void
}

export class AmqpManagement implements Management {
constructor() {}
static async create(connection: RheaConnection): Promise<AmqpManagement> {
const senderLink = await AmqpManagement.openSender(connection)
const receiverLink = await AmqpManagement.openReceiver(connection)
return new AmqpManagement(connection, senderLink, receiverLink)
}

constructor(
private readonly connection: RheaConnection,
private senderLink: Sender,
private receiverLink: Receiver
) {
console.log(this.receiverLink.is_open())
}

private static async openReceiver(connection: RheaConnection): Promise<Receiver> {
return AmqpManagement.openLink<Receiver>(
connection,
ReceiverEvents.receiverOpen,
ReceiverEvents.receiverError,
connection.open_receiver.bind(connection),
MANAGEMENT_NODE_CONFIGURATION
)
}

private static async openSender(connection: RheaConnection): Promise<Sender> {
return AmqpManagement.openLink<Sender>(
connection,
SenderEvents.senderOpen,
SenderEvents.senderError,
connection.open_sender.bind(connection),
MANAGEMENT_NODE_CONFIGURATION
)
}

private static async openLink<T extends Sender | Receiver>(
connection: RheaConnection,
successEvent: LinkOpenEvents,
errorEvent: LinkErrorEvents,
openMethod: OpenLinkMethods,
config?: SenderOptions | ReceiverOptions | string
): Promise<T> {
return new Promise((res, rej) => {
connection.once(successEvent, (context) => {
return res(context.receiver || context.sender)
})
connection.once(errorEvent, (context) => {
return rej(context.connection.error)
})
openMethod(config)
})
}

close(): void {
if (this.connection.is_closed()) return

this.closeSender()
this.closeReceiver()
}

private closeSender(): void {
this.senderLink.close()
}

private closeReceiver(): void {
this.senderLink.close()
}

async declareQueue(queueName: string, options: Partial<QueueOptions> = {}): Promise<Queue> {
return new Promise((res, rej) => {
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
if (!context.message) {
return rej(new Error("Receiver has not received any message"))
}

const response = new CreateQueueResponseDecoder().decodeFrom(context.message, String(message.message_id))
if (response.status === "error") {
return rej(response.error)
}

close() {}
return res(new AmqpQueue(response.body))
})

queue(queueName: string): QueueSpec {
return new AmqpQueueSpec().name(queueName)
const message = new MessageBuilder()
.sendTo(`/${AmqpEndpoints.Queues}/${encodeURIComponent(queueName)}`)
.setReplyTo(ME)
.setAmqpMethod(AmqpMethods.PUT)
.setBody({
exclusive: options.exclusive ?? false,
durable: options.durable ?? false,
auto_delete: options.autoDelete ?? false,
arguments: buildArgumentsFrom(options.type, options.arguments),
})
.build()
this.senderLink.send(message)
})
}

async deleteQueue(queueName: string): Promise<boolean> {
return new Promise((res, rej) => {
this.receiverLink.once(ReceiverEvents.message, (context: EventContext) => {
if (!context.message) {
return rej(new Error("Receiver has not received any message"))
}

const response = new DeleteQueueResponseDecoder().decodeFrom(context.message, String(message.message_id))
if (response.status === "error") {
return rej(response.error)
}

return res(true)
})

const message = new MessageBuilder()
.sendTo(`/${AmqpEndpoints.Queues}/${encodeURIComponent(queueName)}`)
.setReplyTo(ME)
.setAmqpMethod(AmqpMethods.DELETE)
.build()
this.senderLink.send(message)
})
}
}

function buildArgumentsFrom(queueType?: QueueType, queueOptions?: Record<string, string>) {
return { ...(queueOptions ?? {}), ...(queueType ? { "x-queue-type": queueType } : {}) }
}
58 changes: 58 additions & 0 deletions src/message_builder.ts
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
import { generate_uuid, Message } from "rhea"

export enum AmqpMethods {
PUT = "PUT",
DELETE = "DELETE",
GET = "GET",
}

export enum AmqpEndpoints {
Queues = "queues",
}

export const ME = "$me"

export class MessageBuilder {
private messageId: string = generate_uuid()
private to: string = ""
private replyTo: string = ME
private method: AmqpMethods = AmqpMethods.GET
private body: unknown

constructor() {}

setMessageId(id: string) {
this.messageId = id
return this
}

sendTo(to: string) {
this.to = to
return this
}

setReplyTo(replyTo: string) {
this.replyTo = replyTo
return this
}

setAmqpMethod(method: AmqpMethods) {
this.method = method
return this
}

setBody(body: unknown) {
this.body = body
return this
}

build(): Message {
return {
message_id: this.messageId,
to: this.to,
reply_to: this.replyTo,
subject: this.method,
body: this.body,
}
}
}
Loading