Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We鈥檒l occasionally send you account related emails.

Already on GitHub? Sign in to your account

new: add ability for subscriber to respond back to publication AFTER processing #60

Open
wants to merge 14 commits into
base: master
from

Conversation

Projects
None yet
2 participants
@aaslamin
Copy link
Contributor

commented Jun 6, 2019

This PR should be reviewed/merged AFTER #59


Context

This builds on #59 by modifying the NATS Subscribe implementation to consume the new ResponseMode attribute serialized into the Publication message

High-level workflow

  • Subscriber subscribes to a subject they are interested in receiving Publications for

  • When Subscriber receives a Publication, it checks whether a response was even expected (_by examining the raw NATS message's Reply field, then it examines the ResponseMode attribute of the publication to determine the type of response the client is expecting:

    • ACK response: subscriber responds right away prior to sending the publication to the pubs channel
    • Publication response: a response channel is created and then a new goroutine is spawned waiting to read of this channel. The client code consuming Bahamut for subscribing would then use a new method Reply to respond back to the received publication. For example:
	pubsCh := make(chan *bahamut.Publication)
	errsCh := make(chan error)

	disco := pubsub.Subscribe(pubsCh, errsCh, "some topic of interest", bahamut.NATSOptSubscribeQueue("main"))

	for {
		select {
		case p := <-pubsCh:
			go func(pub *bahamut.Publication) {

				// do some processing...

				// whenever you are ready, respond to the publication
				response := bahamut.NewPublication("")
				response.Data = []byte("this is my response!")
				pub.Reply(response)

			}(p)
		case err := <-errsCh:
			fmt.Printf("received an error: %+v", err)
		}
	}

馃挜 Breaking change(s)

  • Removed the NATSOptSubscribeReplyer option as clients should now use the new request-reply workflow.

TODO:

  • get feedback
  • unit tests
Show resolved Hide resolved pubsub_nats.go
Show resolved Hide resolved pubsub_nats.go Outdated

@aaslamin aaslamin requested review from primalmotion and t00f Jun 6, 2019

@aaslamin

This comment has been minimized.

Copy link
Contributor Author

commented Jun 6, 2019

An idea for Publish - the client making the request should be able to relay to the subscriber how long it has to process the publication if it is using the RequestMode waitForPublication. This can be a new field in Publication, for example ResponseDeadline.

On the subscriber side, it can then use this value to setup cancellation signals to save resources if the processing of a publication has taken too long.

@aaslamin aaslamin force-pushed the nats-pub-sub-subscribe-respond branch from 8ff1336 to 47bd5fc Jun 6, 2019

Show resolved Hide resolved publication.go
Show resolved Hide resolved publication.go
Show resolved Hide resolved pubsub_nats.go Outdated
Show resolved Hide resolved pubsub_nats.go
Show resolved Hide resolved pubsub_nats.go Outdated

@aaslamin aaslamin force-pushed the nats-pub-sub-subscribe-respond branch from 47bd5fc to de3d8a2 Jun 11, 2019

publication.ResponseMode = ResponseModeNone
}

publication.ResponseMode = config.desiredResponse

This comment has been minimized.

Copy link
@aaslamin

aaslamin Jun 11, 2019

Author Contributor

wow, that was fail 馃う鈥嶁檪

aaslamin added some commits Jun 6, 2019

new: add ability for subscriber to respond back to publication AFTER 鈥
鈥t has processed the received publication

Signed-off-by: Amir Aslaminejad <amir.a@aporeto.com>
edit: provide constructor for setting up default natsSubscribeConfig 鈥
鈥alues

Signed-off-by: Amir Aslaminejad <amir.a@aporeto.com>
馃挜 - removed NATSOptSubscribeReplyer option as clients can now reply t鈥
鈥 a publication

Signed-off-by: Amir Aslaminejad <amir.a@aporeto.com>
simplify assignment :facepalm:
Signed-off-by: Amir Aslaminejad <amir.a@aporeto.com>

@aaslamin aaslamin force-pushed the nats-pub-sub-subscribe-respond branch from 3d89982 to 5b6d501 Jun 11, 2019

@aaslamin aaslamin changed the base branch from distinguish-between-ack-reply-requests to master Jun 11, 2019

@aaslamin aaslamin requested a review from primalmotion Jun 11, 2019

@aaslamin

This comment has been minimized.

Copy link
Contributor Author

commented Jun 11, 2019

/build - automatically fired by gogo with following PRs and commit SHAs v1.0.0

[
  {
    "project": "",
    "component": "bahamut",
    "pr-id": "60",
    "commit-sha": "5b6d5010bc72c7c55f0f871fa0e1a1ee3abf860a"
  }
]
@codecov

This comment has been minimized.

Copy link

commented Jun 11, 2019

Codecov Report

Merging #60 into master will decrease coverage by 0.11%.
The diff coverage is 89.47%.

Impacted file tree graph

@@            Coverage Diff            @@
##           master     #60      +/-   ##
=========================================
- Coverage   88.31%   88.2%   -0.12%     
=========================================
  Files          28      28              
  Lines        2156    2178      +22     
=========================================
+ Hits         1904    1921      +17     
- Misses        224     229       +5     
  Partials       28      28
Impacted Files Coverage 螖
publication.go 97.1% <100%> (+0.67%) 猬嗭笍
pubsub_nats_options.go 90.19% <60%> (-5.64%) 猬囷笍
pubsub_nats.go 56.91% <90%> (+1.35%) 猬嗭笍
rest_server.go 59.45% <0%> (-0.41%) 猬囷笍
rest_server_helpers.go 94.44% <0%> (-0.16%) 猬囷笍

Continue to review full report at Codecov.

Legend - Click here to learn more
螖 = absolute <relative> (impact), 酶 = not affected, ? = missing data
Powered by Codecov. Last update d214026...32af45f. Read the comment docs.

@aaslamin

This comment has been minimized.

Copy link
Contributor Author

commented Jun 11, 2019

@primalmotion could you take another looksy 馃檹

If the latest changes look A-OK to you, I will start adding test coverage

add stronger assertions to ensure we fail if an error was expected
Signed-off-by: Amir Aslaminejad <amir.a@aporeto.com>
Show resolved Hide resolved pubsub_nats.go Outdated
Show resolved Hide resolved pubsub_nats.go Outdated
Show resolved Hide resolved pubsub_nats.go Outdated
Show resolved Hide resolved pubsub_nats_options.go Outdated
Show resolved Hide resolved pubsub_nats_options.go Outdated

aaslamin added some commits Jun 12, 2019

ensure that the response is not nil
Signed-off-by: Amir Aslaminejad <amir.a@aporeto.com>
new: send error to error channel if responding with an ACK fails
Signed-off-by: Amir Aslaminejad <amir.a@aporeto.com>
edit: bump replyTimeout to 60 seconds
Signed-off-by: Amir Aslaminejad <amir.a@aporeto.com>

@aaslamin aaslamin requested a review from primalmotion Jun 12, 2019

aaslamin added some commits Jun 12, 2019

fix: update comment copy to remove outdated info
Signed-off-by: Amir Aslaminejad <amir.a@aporeto.com>
new: add test coverage for scenario: should NOT receive anything in p鈥
鈥blication channel if responding back with an ACK fails

Signed-off-by: Amir Aslaminejad <amir.a@aporeto.com>

@aaslamin aaslamin changed the title WIP - add ability for subscriber to respond back to publication AFTER processing new: add ability for subscriber to respond back to publication AFTER processing Jun 12, 2019

@aaslamin

This comment has been minimized.

Copy link
Contributor Author

commented Jun 13, 2019

Been plumbing some test coverage for the new behaviour, will need to cover a few more cases to be at ~100% coverage.

Feel free to take an early peek at the two that have been added so far.

new: add test coverage for scenario: should be able to respond back t鈥
鈥 a publication manually

Signed-off-by: Amir Aslaminejad <amir.a@aporeto.com>

@aaslamin aaslamin force-pushed the nats-pub-sub-subscribe-respond branch from 93c9976 to e196254 Jun 13, 2019

new: add test for scenario: should receive error in errors channel if鈥
鈥 publishing manual response fails for whatever reason

Signed-off-by: Amir Aslaminejad <amir.a@aporeto.com>

@aaslamin aaslamin marked this pull request as ready for review Jun 14, 2019

@aaslamin

This comment has been minimized.

Copy link
Contributor Author

commented Jun 14, 2019

Alright, this is basically ready for review.

I am missing one last test case scenario for covering the behaviour of the NATSOptSubscribeReplyTimeout option which I will add tomorrow.

@primalmotion

This comment has been minimized.

Copy link
Member

commented Jun 14, 2019

of course you will need a ft with backend using this pr

@aaslamin

This comment has been minimized.

Copy link
Contributor Author

commented Jun 14, 2019

/build - automatically fired by gogo with following PRs and commit SHAs v1.0.0

[
  {
    "project": "bahamut-pubsub-subscribe",
    "component": "bahamut",
    "pr-id": "60",
    "commit-sha": "31e47cd93b0ad66454ec8fa2c42d8bb116b62fe3"
  },
  {
    "project": "bahamut-pubsub-subscribe",
    "component": "backend",
    "pr-id": "432",
    "commit-sha": "780ac89083fa1c8a1505e0100fd1a9e7cdd095ec"
  }
]
@aaslamin

This comment has been minimized.

Copy link
Contributor Author

commented Jun 14, 2019

aaslamin added some commits Jun 15, 2019

new: add logic to cover case where client code calls Reply more than 鈥
鈥nce for a given publication - in this case we want to error as the message handler will no longer be reading off the channel

Signed-off-by: Amir Aslaminejad <amir.a@aporeto.com>
new: add test coverage for the Publication Reply method
Signed-off-by: Amir Aslaminejad <amir.a@aporeto.com>
@aaslamin

This comment has been minimized.

Copy link
Contributor Author

commented Jun 15, 2019

/build - automatically fired by gogo with following PRs and commit SHAs v1.0.0

[
  {
    "project": "",
    "component": "bahamut",
    "pr-id": "60",
    "commit-sha": "32af45fcfb1fe2d52e5579ac49da3362ed744784"
  }
]
@aaslamin

This comment has been minimized.

Copy link
Contributor Author

commented Jun 15, 2019

@primalmotion please check the last two commits - new logic to cover edge case where client code calls Reply more than once (definitely a bug on the client code!) on a publication that is expecting a response.

In this case, we want all subsequent calls to Reply on that publication to return an error because the goroutine that is reading off the reply channel will no longer be active as it would have read the first value sent to it on the first call. Without this logic, the next call to Reply will block as it will be attempting to send to an unbuffered channel with no receiver.

馃挴% test coverage on Reply has been added as well.

@aaslamin aaslamin requested a review from primalmotion Jun 15, 2019

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
You can鈥檛 perform that action at this time.