Skip to content

Commit

Permalink
feat: added client input
Browse files Browse the repository at this point in the history
  • Loading branch information
aarontravass committed Aug 29, 2023
1 parent c418a85 commit 63c870d
Show file tree
Hide file tree
Showing 7 changed files with 92 additions and 53 deletions.
2 changes: 1 addition & 1 deletion .husky/pre-commit
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
#!/bin/sh
. "$(dirname "$0")/_/husky.sh"

pnpm format:check && pnpm lint:check && pnpm test:coverage
pnpm format:check && pnpm lint:check && pnpm test
14 changes: 8 additions & 6 deletions package.json
Original file line number Diff line number Diff line change
Expand Up @@ -18,16 +18,18 @@
"access": "public"
},
"scripts": {
"lint:check": "eslint src/** --ext=ts",
"lint:fix": "eslint src/** --fix --ext=ts",
"build": "parcel build",
"format:check": "prettier --check src/**",
"format:fix": "prettier --write src/**",
"lint:check": "eslint src/** --ext=ts",
"lint:fix": "eslint src/** --fix --ext=ts",
"prepare": "husky install",
"watch": "parcel watch",
"build": "parcel build",
"test:dev": "vitest",
"prepublish": "pnpm build",
"test:coverage": "vitest run --coverage",
"prepublish": "pnpm build"
"test:dev": "vitest",
"test": "pnpm test:coverage",
"typecheck": "vitest typecheck --run",
"watch": "parcel watch"
},
"keywords": [
"azure",
Expand Down
21 changes: 16 additions & 5 deletions src/index.ts
Original file line number Diff line number Diff line change
Expand Up @@ -2,22 +2,21 @@ import { DequeuedMessageItem, QueueClient, QueueServiceClient } from '@azure/sto
import { HandlerFunction, QueueError, QueueOptions } from './utils'
import { QueueEventEmitter } from './events'

type QueueConnection = string | QueueServiceClient

export class AzureQueueConsumer extends QueueEventEmitter {
#options: QueueOptions
#handler: HandlerFunction
#queueClient: QueueClient
#pollingTime: number
#shouldShutdown = false
constructor(queueName: string, connectionString: string, handler: HandlerFunction, options?: QueueOptions) {
constructor(queueName: string, connection: QueueConnection, handler: HandlerFunction, options?: QueueOptions) {
super()
this.#handler = handler
this.#options = options ?? { pollingTime: 10 }
this.#options.maxTries = this.#options.maxTries ?? 4
this.#options.numberOfMessages = this.#options.numberOfMessages ?? 1
const queueServiceClient = QueueServiceClient.fromConnectionString(connectionString, {
retryOptions: { maxTries: this.#options.maxTries }
})
this.#queueClient = queueServiceClient.getQueueClient(queueName)
this.#queueClient = this.#getQueueClient(connection, queueName)
this.#pollingTime = this.#options.pollingTime
this.#createQueueAsync()
}
Expand Down Expand Up @@ -77,4 +76,16 @@ export class AzureQueueConsumer extends QueueEventEmitter {
this.#shouldShutdown = true
this.emit('queue::shutdown')
}

#getQueueClient = (connection: QueueConnection, queueName: string) => {
let queueServiceClient: QueueServiceClient
if (typeof connection == 'string') {
queueServiceClient = QueueServiceClient.fromConnectionString(connection, {
retryOptions: { maxTries: this.#options.maxTries }
})
} else {
queueServiceClient = connection
}
return queueServiceClient.getQueueClient(queueName)
}
}
35 changes: 21 additions & 14 deletions tests/consumer.test.ts
Original file line number Diff line number Diff line change
@@ -1,15 +1,16 @@
import { afterAll, afterEach, describe, expect, it, vi } from 'vitest'
import { QUEUE_MESSAGE } from './fixtures/Message'
import { AzureQueueConsumer } from '../src'
import { DequeuedMessageItem, QueueServiceClient } from '@azure/storage-queue'
import { flushPromises } from './helper/helper'
import { QueueServiceClient } from '@azure/storage-queue'
import { flushPromises, QueueService } from './helper/helper'

const queueService = new QueueService()

vi.mock('@azure/storage-queue', () => {
const QueueServiceClient = vi.fn()
QueueServiceClient.fromConnectionString = vi.fn((...args) => {
QueueServiceClient['fromConnectionString'] = vi.fn((...args) => {
const getQueueClient = vi.fn(() => {
const createIfNotExists = vi.fn(() => new Promise((resolve) => resolve('this')))
const receiveMessages = vi.fn(() => new Promise((resolve) => resolve(QUEUE_MESSAGE)))
const receiveMessages = vi.fn(() => new Promise((resolve) => resolve(queueService.fetchMessages(2))))
const deleteMessage = vi.fn(
(messageId: string, popReceipt: string) => new Promise((resolve) => resolve(messageId))
)
Expand Down Expand Up @@ -44,7 +45,7 @@ describe('azure queue tests', () => {
const errorToThrow = { code: 'code', message: 'message' }

try {
vi.spyOn(QueueServiceClient, 'fromConnectionString').mockImplementation((...args) => {
vi.spyOn(QueueServiceClient, 'fromConnectionString').mockImplementation((...args: string[]) => {
const getQueueClient = vi.fn(() => {
const createIfNotExists = vi.fn(() => new Promise((resolve, reject) => reject(errorToThrow)))

Expand All @@ -68,7 +69,7 @@ describe('azure queue tests', () => {
listener.listen()
await flushPromises()
expect(handler).toHaveBeenCalledTimes(1)
expect(handler).toHaveBeenCalledWith(QUEUE_MESSAGE.receivedMessageItems)
expect(handler).toHaveBeenCalledWith(queueService.fetchMessages(2).receivedMessageItems)
})
describe('listener event tests', () => {
it('should emit event when message is received', async () => {
Expand All @@ -80,7 +81,7 @@ describe('azure queue tests', () => {

await flushPromises()
expect(handler).toBeCalledTimes(1)
expect(handler).toHaveBeenCalledWith(QUEUE_MESSAGE)
expect(handler).toHaveBeenCalledWith(queueService.fetchMessages(2))
})
it('should emit event when handler completes execution', async () => {
expect.assertions(2)
Expand Down Expand Up @@ -120,13 +121,13 @@ describe('azure queue tests', () => {

expect(handler).toHaveBeenNthCalledWith(
1,
QUEUE_MESSAGE.receivedMessageItems[0].messageId,
QUEUE_MESSAGE.receivedMessageItems[0].popReceipt
queueService.fetchMessages(2).receivedMessageItems[0].messageId,
queueService.fetchMessages(2).receivedMessageItems[0].popReceipt
)
expect(handler).toHaveBeenNthCalledWith(
2,
QUEUE_MESSAGE.receivedMessageItems[1].messageId,
QUEUE_MESSAGE.receivedMessageItems[1].popReceipt
queueService.fetchMessages(2).receivedMessageItems[1].messageId,
queueService.fetchMessages(2).receivedMessageItems[1].popReceipt
)
expect(handler).toHaveBeenCalledTimes(2)
})
Expand All @@ -137,8 +138,14 @@ describe('azure queue tests', () => {
listener.on('message::afterDelete', handler)
listener.listen()
await flushPromises()
expect(handler).toHaveBeenNthCalledWith(1, QUEUE_MESSAGE.receivedMessageItems[0].messageId)
expect(handler).toHaveBeenNthCalledWith(2, QUEUE_MESSAGE.receivedMessageItems[1].messageId)
expect(handler).toHaveBeenNthCalledWith(
1,
queueService.fetchMessages(2).receivedMessageItems[0].messageId
)
expect(handler).toHaveBeenNthCalledWith(
2,
queueService.fetchMessages(2).receivedMessageItems[1].messageId
)
expect(handler).toHaveBeenCalledTimes(2)
})
})
Expand Down
26 changes: 0 additions & 26 deletions tests/fixtures/Message.ts

This file was deleted.

45 changes: 45 additions & 0 deletions tests/helper/helper.ts
Original file line number Diff line number Diff line change
@@ -1 +1,46 @@
import { DequeuedMessageItem, QueueCreateIfNotExistsResponse, QueueReceiveMessageResponse } from '@azure/storage-queue'

export class QueueService {
readonly messages: DequeuedMessageItem[] = [
{
messageId: 'abcderhbub',
insertedOn: new Date(),
expiresOn: new Date(),
popReceipt: '#6788888',
nextVisibleOn: new Date(),
dequeueCount: 0,
messageText: '{"hello":"world","a":5}'
},
{
messageId: 'gjhcvf',
insertedOn: new Date(),
expiresOn: new Date(),
popReceipt: '#1234',
nextVisibleOn: new Date(),
dequeueCount: 0,
messageText: '{"world":"hello","a":5}'
}
]
queueMessage: DequeuedMessageItem[] = []
constructor() {
this.queueMessage = this.messages
}

deleteMessage = (messageId: string) => {
this.queueMessage = this.queueMessage.filter((message) => message.messageId != messageId)
}

restoreMessages = () => {
this.queueMessage = this.messages
}

fetchMessages = (count: number): QueueReceiveMessageResponse => ({
receivedMessageItems: this.queueMessage.slice(0, Math.min(count, this.queueMessage.length)),
_response: undefined,
errorCode: undefined
})

createQueue = (): QueueCreateIfNotExistsResponse => ({ succeeded: true, _response: undefined })
}

export const flushPromises = async () => await new Promise(process.nextTick)
2 changes: 1 addition & 1 deletion vite.config.ts
Original file line number Diff line number Diff line change
Expand Up @@ -12,4 +12,4 @@ export default defineConfig({
exclude: ['dist/**', 'tests/**']
}
}
})
})

0 comments on commit 63c870d

Please sign in to comment.