Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Add simple custom routing for task messages on AMQP broker #112

Closed
wants to merge 1 commit into from
Closed

Add simple custom routing for task messages on AMQP broker #112

wants to merge 1 commit into from

Conversation

crouguer
Copy link

@crouguer crouguer commented Jun 6, 2019

The only non-backwards-compatible behaviour change here is that the default queue name and default exchange name are set to "celery" which I believe matches with the celery's defaults (see https://celery.readthedocs.io/en/latest/userguide/configuration.html#std:setting-task_default_queue).

The routing mechanism just looks for an exact match on the specified task name. Pattern matching like the Celery does in its routes could be added to GetRoute

@bleader
Copy link

bleader commented Jun 6, 2019

Hello, I created #113 a bit after you, we actually don't aim for the same exact feature, and as I mentionned in my PR I hope we can work together to achieve a more complet set of feature. I like your approach, but it only is implemented for AMQP, and I guess it would be nice to have a DelRoute too (in my use case we actually want to target only one queue). If you have comments feel free to tag me here or in #113 and see what is @sickyoon opinion on these :)

Copy link
Member

@yoonsio yoonsio left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

looks okay but I have to test it.
can you add some tests and examples specific to your change?

amqp_broker.go Outdated
rate: 4,
routes: map[string]*Route{"default": &Route{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

maybe it would be good idea to create defaultRoutes as separate top level variable just like http package DefaultClient

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I wanted to make sure that each broker has its own default that can be modified independently, in case there are multiple brokers (see new commit with SetDefaultRoute). If you like it better with a DefaultRoute variable at the module level we'd just need to make a deep copy for each instantiated broker

amqp_broker.go Outdated Show resolved Hide resolved
@crouguer
Copy link
Author

crouguer commented Jun 6, 2019

@bleader check out the SetDefaultRoute function I added - does that handle your use case?

@bleader
Copy link

bleader commented Jun 7, 2019

The use case is indeed to have a single queue that isn't always the default, so being able to tweak the default would work, beside the fact that we're working with redis and your patches only affect the AMQP broker.

I guess what we could do is having these as part of the CeleryBroker interface and implement the methods for both broker, even though, as far as I can tell, the redis implementation doesn't use the exchange on itself. I'm unsure about how this is used in the CeleryMessage when interacting with a python celery for example.

If that's fine with you, I suggest we see with @sickyoon if what you currently have is ok, after that, I'll add the interface and the redis implementation after that.

We do need to add tests and some examples_*_test.go using these too.

@crouguer
Copy link
Author

crouguer commented Jun 7, 2019

I agree - adding AddRoute and SetDefaultRoute to the CeleryBroker interface would be good. I don't work with Redis as a broker but I'm pretty sure you are right, that there no concept of exchange or routing keys. I believe that for routes with Redis broker the exchange and routing key are just ignored.

In order to do a complete end-to-end test a possibly breaking change to the broker is required to allow it to consume from a custom queue. In #113 you added a queueName parameter to NewAMQPCeleryBrokerByConnAndChannel which would do the job, but maybe it would be better to make it a Route so that the exchange, queue and binding are all initialized? However this still a breaking change. We could use the variadic trick there to make it non-breaking.

If we don't make any change to the broker the only test would be to just check and see that the message was delivered to the correct queue without actually consuming it with something like this:

        celeryMessage, err := makeCeleryMessage()
	test := "test amqp broker custom route"

	if err != nil || celeryMessage == nil {
		t.Errorf("test '%s': failed to construct celery message: %v", test, err)
	}

	broker := amqpBroker
	broker.AddRoute("routedTask", "test", "test", "test")
	broker.SendCeleryMessage(celeryMessage)
	// wait arbitrary time for message to propagate
	time.Sleep(1 * time.Second)

	// Since the broker can't yet consume from a custom queue, just
	// directly check to make sure the message landed in the correct queue
	_, amqpchannel := NewAMQPConnection("amqp://")
	channel, err := amqpchannel.Consume("test", "", false, false, false, false, nil)
	if err != nil {
		t.Errorf("failed to start consumer on custom queue")
	}

	var taskMessage TaskMessage

	select {
	case delivery := <-channel:
		deliveryAck(delivery)
		if err := json.Unmarshal(delivery.Body, &taskMessage); err != nil {
			t.Errorf("test '%s': failed to get celery message from amqp: %v", test, err)
		}
		originalMessage := celeryMessage.GetTaskMessage()
		if !reflect.DeepEqual(taskMessage, originalMessage) {
			t.Errorf("test '%s': received message %v different from original message %v", tc.name, message, originalMessage)
		}
		releaseCeleryMessage(celeryMessage)
	default:
		t.Errorf("failed to retrieve celery message from custom queue: consuming channel empty")
		releaseCeleryMessage(celeryMessage)
	}

@bleader
Copy link

bleader commented Jun 10, 2019

Yes, a route would be better than the queue. @sickyoon isn't fond of the variadic trick (I can understand that). Note that the consumption will actually use the broker configured in the client, so it should be able to consume from it too. In the redis implementation there was a test on the queue that I actually removed in #113 because it didn't make sense with a single queue, but in the case of mulitple route as you implement it here, it could make sense to keep the test in the implementation, and to have it in the TestBrokerRedisSend() to test we do send and receive on the right route and queue.

amqp_broker.go Outdated Show resolved Hide resolved
amqp_broker.go Outdated Show resolved Hide resolved
broker_test.go Outdated Show resolved Hide resolved
@crouguer
Copy link
Author

I rounded the changes out a lot with that recent batch of commits. I hope you don't mind @bleader that I went ahead and implemented your suggested change to add the route helpers to the CeleryBroker interface and implement them on the Redis side too.

For the broker consuming side, I decided to make the SetDefaultRoute function change both the default message delivery queue and also the queue being consumed by the broker for both AMQP and Redis. So now when you SetDefaultRoute it should change the queue that is being consumed. This allows the user to change the queue being consumed and also it enables an end-to-end test on an arbitrary queue. Note that a user can still send celery messages to any arbitrary queue successfully (tested on AMQP, no test yet for Redis), but you can only consume from the one specified in the default route.

I'm not sure why the tests are timing out on travis though...

Copy link

@bleader bleader left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No problem from me that you implemented the interface, I suggested to do it simply to put less charge on your side (trying to load balance in open source projects 😛). See other comments on diff, but otherwise seems like a good starting point.

Have you tried to restart the CI later ? There was absolutely no message after starting go test which seems odd... An error could have been understandable, but nothing like that is unsettling.

redis_broker.go Outdated Show resolved Hide resolved
redis_broker.go Show resolved Hide resolved
@crouguer crouguer closed this Jun 14, 2019
@crouguer
Copy link
Author

crouguer commented Jun 14, 2019

Close and re-open (totally not by accident :)) oughtta trigger CI to re-build

@crouguer crouguer reopened this Jun 14, 2019
@m3nu
Copy link

m3nu commented Jul 27, 2019

Can I help you guys out with anything here? Looking to see this finished for selfish reasons.

Currently I'm using the less comprehensive PR from here: #44.

@crouguer
Copy link
Author

crouguer commented Aug 3, 2019

Hi @m3nu, thanks for chiming in! All the code in this PR is working fine for my purposes - I've been using it successfully since it was written. The only thing left to do before it can be merged is to get the tests working. I wrote a couple tests but one is timing out on GitHub for some reason, even though it works on the test machine I used locally. If you want to have a look at those tests and see if you can tweak them or rewrite them in a way that the runner doesn't choke, that would be great. @sickyoon may also want some further tests beyond what I added. I'll try to get back at it in a week or two if you have no luck

@yoonsio
Copy link
Member

yoonsio commented Aug 7, 2019

@crouguer It looks like tests unexpectedly hang when you try to include sub-packages even when there is none. Furthermore, vendor directory is automatically excluded starting from Go 1.9. For now, try following for .travis.yml and Makefile files.
go test -v -cover ./... -> go test -v -cover .

@yoonsio
Copy link
Member

yoonsio commented Aug 7, 2019

@crouguer can you also squash the commits to 1 - 2 commits and clean them up?

@yoonsio
Copy link
Member

yoonsio commented Aug 14, 2019

@crouguer you can simply rebase to master now.

@yoonsio yoonsio added this to In progress in v1 release with 3.x legacy support via automation Aug 14, 2019
amqp_broker.go Outdated Show resolved Hide resolved
broker_test.go Outdated Show resolved Hide resolved
broker_test.go Outdated Show resolved Hide resolved
@crouguer
Copy link
Author

crouguer commented Sep 5, 2019

@sickyoon does this look good to you?

@kamigerami
Copy link

any updates on this @sickyoon @crouguer etc...

@yoonsio
Copy link
Member

yoonsio commented Oct 21, 2019

sorry, I am swamped. will take a look in next couple of days.

@aaronn
Copy link

aaronn commented Jan 14, 2020

Any updates on this?

@crouguer crouguer closed this by deleting the head repository Jan 6, 2023
v1 release with 3.x legacy support automation moved this from In progress to Done Jan 6, 2023
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Development

Successfully merging this pull request may close these issues.

None yet

7 participants