diff --git a/src/handlers/record/record-handler.ts b/src/handlers/record/record-handler.ts index e48d06ce6..34780a166 100644 --- a/src/handlers/record/record-handler.ts +++ b/src/handlers/record/record-handler.ts @@ -150,8 +150,8 @@ export default class RecordHandler implements Handler { } if (message.action === RA.NOTIFY) { - this.services.clusterNode.send(message) this.recordUpdatedWithoutDeepstream(message, socketWrapper) + this.services.clusterNode.send(message) return } @@ -159,7 +159,8 @@ export default class RecordHandler implements Handler { } private recordUpdatedWithoutDeepstream (message: RecordMessage, socketWrapper: SocketWrapper | null = null) { - message.names!.forEach((recordName) => { + let completed = 0 + message.names!.forEach((recordName, index, names) => { if (this.subscriptionRegistry.hasLocalSubscribers(recordName)) { this.recordRequest(recordName, null, (name: string, version: number, data: JSONObject) => { if (version === -1) { @@ -177,12 +178,25 @@ export default class RecordHandler implements Handler { parsedData: data }, true, null) } - }, onRequestError, message) + + completed++ + if (completed === names.length && socketWrapper) { + socketWrapper.sendAckMessage(message) + } + }, (event: RA, errorMessage: string, name: string, socket: SocketWrapper, msg: Message) => { + completed++ + if (completed === names.length && socketWrapper) { + socketWrapper.sendAckMessage(message) + } + onRequestError(event, errorMessage, recordName, socket, msg) + }, message) + } else { + completed++ + if (completed === names.length && socketWrapper) { + socketWrapper.sendAckMessage(message) + } } }) - if (socketWrapper) { - socketWrapper.sendAckMessage(message) - } } /**