Skip to content

Commit

Permalink
feat: send ack message when notify is actually completed
Browse files Browse the repository at this point in the history
  • Loading branch information
yasserf committed Jun 28, 2019
1 parent 01105c5 commit 46b027c
Showing 1 changed file with 20 additions and 6 deletions.
26 changes: 20 additions & 6 deletions src/handlers/record/record-handler.ts
Original file line number Diff line number Diff line change
Expand Up @@ -150,16 +150,17 @@ export default class RecordHandler implements Handler<RecordMessage> {
}

if (message.action === RA.NOTIFY) {
this.services.clusterNode.send(message)
this.recordUpdatedWithoutDeepstream(message, socketWrapper)
this.services.clusterNode.send(message)
return
}

this.services.logger.error(PARSER_ACTIONS[PARSER_ACTIONS.UNKNOWN_ACTION], RA[action], this.metaData)
}

private recordUpdatedWithoutDeepstream (message: RecordMessage, socketWrapper: SocketWrapper | null = null) {
message.names!.forEach((recordName) => {
let completed = 0
message.names!.forEach((recordName, index, names) => {
if (this.subscriptionRegistry.hasLocalSubscribers(recordName)) {
this.recordRequest(recordName, null, (name: string, version: number, data: JSONObject) => {
if (version === -1) {
Expand All @@ -177,12 +178,25 @@ export default class RecordHandler implements Handler<RecordMessage> {
parsedData: data
}, true, null)
}
}, onRequestError, message)

completed++
if (completed === names.length && socketWrapper) {
socketWrapper.sendAckMessage(message)
}
}, (event: RA, errorMessage: string, name: string, socket: SocketWrapper, msg: Message) => {
completed++
if (completed === names.length && socketWrapper) {
socketWrapper.sendAckMessage(message)
}
onRequestError(event, errorMessage, recordName, socket, msg)
}, message)
} else {
completed++
if (completed === names.length && socketWrapper) {
socketWrapper.sendAckMessage(message)
}
}
})
if (socketWrapper) {
socketWrapper.sendAckMessage(message)
}
}

/**
Expand Down

0 comments on commit 46b027c

Please sign in to comment.