Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add methods to wait for flushing #7

Merged
merged 6 commits into from
May 9, 2019
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
39 changes: 39 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -47,3 +47,42 @@ myServer.run({
})
})
```

## Docs :

### `const server = new SQSServer()`

Create a fake SQS server

### `server.run(opts, cb)`

- `opts.port` ; defaults to 0
- `opts.host` ; defaults to `localhost`

Starts the server. `cb` get's called once listening on a port.

### `server.getQueue()`

Returns the current array of items queued in SQS. These are shaped
like aws SQS objects.

### `server.waitForMessages(count, listener)`

Get notified once N messages have in total been sent to this fake SQS.

`listener` is called once.

### `server.waitForFlush(listener)`

Get notified when the number of pending messages in the SQS
queue is zero.
juliangruber marked this conversation as resolved.
Show resolved Hide resolved

This can be used with `waitForMessages()` to first wait for N
messages to be send and then wait for them to have been received
and deleted from the queue.

`listener` is called once.

### `server.close(cb)`

Closes the underlying http server.
207 changes: 165 additions & 42 deletions lib/__tests__/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -136,60 +136,114 @@ describe('web server', () => {
})
})

function TestServer () {
this.server = new SimplyImitatedSQSHttpServer()
this.sqs = new AWS.SQS({
region: 'us-east-1',
sslEnabled: false,
accessKeyId: '123',
secretAccessKey: 'abc',
apiVersion: '2012-11-05'
})

this.queueUrl = null
}

TestServer.prototype.bootstrap = function bootstrap (cb) {
this.server.run({
port: 0
}, (err) => {
if (err) return cb(err)

this.queueUrl = `http://${this.server.hostPort}`
cb(null)
})
}

TestServer.prototype.sendMessage = function sendMessage (body, cb) {
this.sqs.sendMessage({
QueueUrl: this.queueUrl,
MessageBody: body,
MessageAttributes: {
ProcessEnv: {
DataType: 'String',
StringValue: 'development'
},
ProcessApp: {
DataType: 'String',
StringValue: 'my-app'
}
}
}, cb)
}

TestServer.prototype.receive = function receive (cb) {
this.sqs.receiveMessage({
QueueUrl: this.queueUrl,
MaxNumberOfMessages: 1,
MessageAttributeNames: [
'ProcessEnv', 'ProcessApp'
]
}, cb)
}

TestServer.prototype.receiveAndDelete = function receiveAndDelete (handle) {
this.sqs.receiveMessage({
QueueUrl: this.queueUrl,
MaxNumberOfMessages: 1,
MessageAttributeNames: [
'ProcessEnv', 'ProcessApp'
]
}, (err, messages) => {
if (err) return handle(err)

this.deleteMsg(messages.Messages[0], (err2) => {
if (err2) return handle(err2)

handle(null, messages)
})
})
}

TestServer.prototype.deleteMsg = function deleteMsg (message, cb) {
this.sqs.deleteMessage({
QueueUrl: this.queueUrl,
ReceiptHandle: message.ReceiptHandle
}, cb)
}

TestServer.prototype.close = function close (cb) {
this.server.close(cb)
}

TestServer.prototype.waitForFlush = function waitForFlush (l) {
this.server.waitForFlush(l)
}

TestServer.prototype.waitForMessages = function waitForMessages (n, l) {
this.server.waitForMessages(n, l)
}

describe('run() with attributes', () => {
test('calling out to fake SQS with aws', (done) => {
const myServer = new SimplyImitatedSQSHttpServer()
const testServer = new TestServer()

const sqs = new AWS.SQS({
region: 'us-east-1',
sslEnabled: false,
accessKeyId: '123',
secretAccessKey: 'abc',
apiVersion: '2012-11-05'
})
myServer.run({
port: 0
}, onServerStart)
testServer.bootstrap(onServerStart)

function onServerStart (err) {
if (err) throw err
expect(!err).toBe(true)

const queueUrl = `http://` + myServer.hostPort
sqs.sendMessage({
QueueUrl: queueUrl,
MessageBody: 'my message',
MessageAttributes: {
ProcessEnv: {
DataType: 'String',
StringValue: 'development'
},
ProcessApp: {
DataType: 'String',
StringValue: 'my-app'
}
}
}, onMessagePublished)
testServer.sendMessage('my message', onMessagePublished)
}

function onMessagePublished (err) {
if (err) {
console.log('err', err)
}
expect(!err).toBe(true)

const queueUrl = `http://` + myServer.hostPort
sqs.receiveMessage({
QueueUrl: queueUrl,
MaxNumberOfMessages: 1,
MessageAttributeNames: [
'ProcessEnv', 'ProcessApp'
]
}, onMessageReceive)
testServer.receive(onMessageReceive)
}

function onMessageReceive (err, response) {
if (err) {
console.error('err', err)
}
expect(!err).toBe(true)

const messages = response.Messages
expect(messages.length).toBe(1)
Expand All @@ -203,7 +257,76 @@ describe('web server', () => {
msg.MessageAttributes.ProcessApp.StringValue
).toBe('my-app')

myServer.close(done)
testServer.close(done)
}
})

test('publish and waitFor Messages', (done) => {
const testServer = new TestServer()
let pendingMessages = ['my message1', 'my message2', 'my message3']
let flushed = false

testServer.bootstrap(onServerStart)

function onServerStart (err) {
expect(!err).toBe(true)

testServer.sendMessage('my message1', noop)
testServer.sendMessage('my message2', noop)
testServer.sendMessage('my message3', noop)

testServer.waitForMessages(3, onMessages)
}

function onMessages () {
testServer.waitForFlush(() => {
flushed = true
})

testServer.receiveAndDelete(onMsg1)
}

function onMsg1 (err, messages) {
expect(!err).toBe(true)

expect(messages.Messages.length).toBe(1)

const index = pendingMessages.indexOf(messages.Messages[0].Body)
expect(index >= 0).toBe(true)
pendingMessages.splice(index, 1)

expect(flushed).toBe(false)
testServer.receiveAndDelete(onMsg2)
}

function onMsg2 (err, messages) {
expect(!err).toBe(true)

expect(messages.Messages.length).toBe(1)
const index = pendingMessages.indexOf(messages.Messages[0].Body)
expect(index >= 0).toBe(true)
pendingMessages.splice(index, 1)

expect(flushed).toBe(false)
testServer.receiveAndDelete(onMsg3)
}

function onMsg3 (err, messages) {
expect(!err).toBe(true)

expect(messages.Messages.length).toBe(1)
const index = pendingMessages.indexOf(messages.Messages[0].Body)
expect(index >= 0).toBe(true)
pendingMessages.splice(index, 1)

expect(pendingMessages.length).toBe(0)

expect(flushed).toBe(true)
testServer.close(done)
}

function noop (err) {
expect(!err).toBe(true)
}
})
})
Expand Down
52 changes: 41 additions & 11 deletions lib/server.js
Original file line number Diff line number Diff line change
Expand Up @@ -9,24 +9,19 @@ class SimplyImitatedSQSHttpServer {
this.server = null
this.hostPort = null
this.waiters = []
this.pendingItems = 0
this.sentItems = 0
this.sqs = new SimplyImitatedSQS()
}

// TODO: Other Action

_send (params) {
this.sqs.vsq.send(params)
this.sentItems++
var queue = this.getQueue()
var newWaiters = []
for (var i = 0; i < this.waiters.length; i++) {
var waiter = this.waiters[i]
if (queue.length >= waiter.count) {
process.nextTick(waiter.cb)
} else {
newWaiters.push(waiter)
}
}
this.waiters = newWaiters
this.pendingItems = queue.length
this.checkWaiters()
}

SendMessage (params) {
Expand Down Expand Up @@ -124,6 +119,8 @@ class SimplyImitatedSQSHttpServer {

DeleteMessage (params) {
this.sqs.vsq.delete(params.ReceiptHandle)
this.pendingItems = this.getQueue().length
this.checkWaiters()
return `<DeleteMessageResponse>
<ResponseMetadata>
<RequestId>SimplyImitatedSQS-RequestId</RequestId>
Expand Down Expand Up @@ -163,13 +160,46 @@ class SimplyImitatedSQSHttpServer {
return this.sqs.items()
}

checkWaiters () {
var newWaiters = []
for (var i = 0; i < this.waiters.length; i++) {
var waiter = this.waiters[i]
if (
waiter.type === 'send' &&
this.sentItems >= waiter.count
) {
process.nextTick(waiter.cb)
continue
}

if (
waiter.type === 'pending' &&
this.pendingItems === waiter.count
) {
process.nextTick(waiter.cb)
continue
}

newWaiters.push(waiter)
}
this.waiters = newWaiters
}

waitForMessages (count, cb) {
const q = this.getQueue()
if (q.length >= count) {
return process.nextTick(cb)
}

this.waiters.push({ count: count, cb: cb })
this.waiters.push({ type: 'send', count: count, cb: cb })
}

waitForFlush (cb) {
if (this.pendingItems === 0) {
return process.nextTick(cb)
}

this.waiters.push({ type: 'pending', count: 0, cb: cb })
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

What about instead of manually tracking the subscribers, we inherit from EventEmitter and reuse that logic?

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

because of managing the count logic using the event emitter doesn't match cleanly.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

so to get the count of waiters with type send, you'd do this.listenerCound('send')

Copy link
Owner Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

count is how many messages to wait for; not related to how many waiters or listeners there are.

Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

oooh gotcha!

Copy link
Collaborator

@juliangruber juliangruber May 9, 2019

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Would still be possible with event emitters but not really an improvement any more:

const onSend = () => cb()
onSend.count = count
this.on('send', onSend)

// after send
const listeners = this.listeners('send')
for (const listener of listeners) {
  listener.count--
}

}

close (cb) {
Expand Down