Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ARTEMIS-196 Implement Consumer Priority #2490

Closed
wants to merge 1 commit into from

Conversation

michaelandrepearce
Copy link
Contributor

@franz1981 an alternative so we don't have to have a copy of CopyOnWriteArrayList, it does mean on add or remove consumer we have to invoke toArray which causes a copy, but this is not on hot path, so i think we should be good, and avoids us having to clone a jvm class.

@franz1981
Copy link
Contributor

@michaelandrepearce Nice! Will take a look today or max tomorrow 👍

@michaelandrepearce
Copy link
Contributor Author

michaelandrepearce commented Jan 7, 2019

@franz1981 just ignore class comments, theyre the originals still, ill need to change, but wanted to get to you quickly so you have chance to look over. If you think this is better ill make final tidyup bits, such as class comments and replace the real PR's branch.

--update--
just did a bit of a go at the class comments also.

@michaelandrepearce
Copy link
Contributor Author

@franz1981 did you get a chance to look, do you think this is better than original solution?

Am keen to get this feature into the next release cut.

@franz1981
Copy link
Contributor

franz1981 commented Jan 7, 2019

@michaelandrepearce sadly haven't had much time today to look into it :(
Tomorrow I have already scheduled some time in the morning to take a look into this

@michaelandrepearce
Copy link
Contributor Author

@franz1981 great thanks a million :)

Copy link
Member

@gemmellr gemmellr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Left some small comments and questions around parts of the diff (e.g I didn't look at the stuff around the queue itself, will leave to folks more familiar).

@michaelandrepearce
Copy link
Contributor Author

michaelandrepearce commented Jan 8, 2019

@gemmellr thanks for review, have updated this pr, if you could recheck the AMQPSessionCallback for me, to make sure i understood you basically we can expect any number to be representative of the priority. My understanding is an integral is a whole number, that essentially in java means any Number, but we still expect the whole number within the integer range aka -2^31 to 2^31-1. Also i added some additional tests to test for different number type representations of the integral.

As for the Openwire test case, this was a simple port over of the existing activemq5 test case as untouched as possible, i agree we could reduce the time but id rather (at least for this release 2.7.0) keep it the same, so we can be sure feature works for openwire same as activemq5.

Copy link
Contributor

@franz1981 franz1981 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm happy with this changes if compared with the previous PR, so kudos to @michaelandrepearce 👍
I think that given the new defined abstractions it is time to create some unit tests around the new collections/iterators etc etc to be sure in the future that if changed we will know how was the original contract that ATM (in the previous impl too) is only in the code.
I'm blaming myself for a recent PR that I will change after this review for this same reason I swear :P

Copy link
Member

@gemmellr gemmellr left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yep, changing to Number to cover the integral types is good.

I think there are now a few more type-system test derivatives than need be, much faster unit tests would generally be better for covering things like that. Just hitting a few key points seems fine, e.g byte, int, an unsigned. I don't think it need test floating point types, I'd consider that an impl side effect more than anything.

@michaelandrepearce
Copy link
Contributor Author

@gemmellr ok will reduce back down the extra tests i added. To those you suggest.

Also re activemq5. Fair enough, ill reduce / alter this then.

@michaelandrepearce
Copy link
Contributor Author

@franz1981 good point around adding further tests to the extracted out bits. Agree it will make everything more robust

@michaelandrepearce
Copy link
Contributor Author

@gemmellr pushed changes based on latest comments, thanks.

@michaelandrepearce
Copy link
Contributor Author

michaelandrepearce commented Jan 9, 2019

@franz1981 pushed following changes based on your comments that agree with, for others i have left comment for us to discuss.

Changes:

  1. Ensured total size of the priority collection can never exceed Integer.MAX_VALUE, by ensuring this on add, thus the edge case you were worried about of the calcSize being greater than int cannot occur. Note if someone has that many consumers, we probably want to have some discussions with them as they would be some power user!!! ;)

  2. Avoid double volatile read of changedIterator in reset method.

  3. Removed need for a cast in MultiResettableIterator

  4. made index in MultiIteratorBase private

clebertsuconic added a commit to clebertsuconic/activemq-artemis that referenced this pull request Jan 16, 2019
@clebertsuconic
Copy link
Contributor

one test failure at least: JMSMessageGroupsTest

The testsuite didn't complete.. but that one is failing for sure.

@clebertsuconic
Copy link
Contributor

I will spend some time understanding the failure and propose changes. I will communicate through here.. and if you find anything please let me know here as well.

@michaelandrepearce
Copy link
Contributor Author

michaelandrepearce commented Jan 16, 2019

Ill have a look, i know that one was working as i had run those and exlusive test when i first pr'd i assume i broke something during changes during reviews

@michaelandrepearce
Copy link
Contributor Author

michaelandrepearce commented Jan 16, 2019

@clebertsuconic ok so i know whats going on, another feature i have implemented for 2.7.0 reset of message groups, the test i had created is a little over strict and tested exact current behaviour, rather than what is just needed, that when resetting a message group, the selection on the next consumer for that group would have been the next consumer that joined. Where as actually when you reset the group in reality you just open the oppurtunity for another consumer to take the group.
Because i have reworked this PR to make things more tidy and to abstract bits, during dispatch we cycle through the consumers round robin, even if it was a consumer group, as such on group reset it is the next current positioned consumer.

Im thinking i could add a repeat method to the iteration, so when a consumer it will repeat, so return the same consumer, this would give same as before.

@michaelandrepearce
Copy link
Contributor Author

Sorted locally just extracting repeatble iterator logic as franz is keen on this

@michaelandrepearce
Copy link
Contributor Author

michaelandrepearce commented Jan 16, 2019

@clebertsuconic sorted,

just added ability in QueueConsumers to repeat (new repeatable iterator), and then if group, we tell it to repeat, that provides the original behavior, and test is passing locally.

@clebertsuconic
Copy link
Contributor

Current list of failures:

Test Name Duration Age
org.apache.activemq.artemis.tests.integration.client.AutogroupIdTest.testGroupIdAutomaticallySetMultipleProducers 37 ms 1
org.apache.activemq.artemis.tests.integration.client.MessageGroupingTest.testMultipleGroupingXACommit 47 ms 1
org.apache.activemq.artemis.tests.integration.client.MessageGroupingTest.testLoadBalanceGroups 21 ms 1
org.apache.activemq.artemis.tests.integration.client.MessageGroupingTest.testMultipleGroupingTXCommit 0.25 sec 1
org.apache.activemq.artemis.tests.integration.client.MessageGroupingTest.testMultipleGroupingConsumeHalf 0.79 sec 1
org.apache.activemq.artemis.tests.integration.client.MessageGroupingTest.testMultipleGroupingXARollback 55 ms 1
org.apache.activemq.artemis.tests.integration.client.MessageGroupingTest.testMultipleGrouping 14 ms 1
org.apache.activemq.artemis.tests.integration.client.MessageGroupingTest.testMultipleGroupingTXRollback 0.28 sec 1
org.apache.activemq.artemis.tests.integration.cluster.failover.ReplicatedMultipleServerFailoverExtraBackupsTest.testStartLiveFirst 1 min 8 sec 1
org.apache.activemq.artemis.tests.integration.jms.client.GroupingTest.testManyGroups 6 sec 1
org.apache.activemq.artemis.tests.integration.jms.client.GroupingTest.testManyGroups 6 sec 1
org.apache.activemq.artemis.tests.integration.jmx.JmxConnectionTest.testJmxConnection 1.1 sec 1
org.apache.activemq.artemis.tests.integration.journal.AIOJournalImplTest.testDoubleDelete 53 ms 1
org.apache.activemq.artemis.tests.integration.journal.AIOUnbuferedJournalImplTest.testDoubleDelete 39 ms 1
org.apache.activemq.artemis.tests.integration.journal.MappedJournalImplTest.testDoubleDelete 39 ms 1
org.apache.activemq.artemis.tests.integration.journal.MappedUnbuferedJournalImplTest.testDoubleDelete 43 ms 1
org.apache.activemq.artemis.tests.integration.journal.NIOJournalImplTest.testDoubleDelete 36 ms 1
org.apache.activemq.artemis.tests.integration.journal.NIONoBufferJournalImplTest.testDoubleDelete 18 ms 1
org.apache.activemq.artemis.tests.integration.mqtt.MqttAcknowledgementTest.testAcknowledgementQOS1 5.6 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTFQQNTest.testSendAndReceiveMQTTSpecial1 11 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTOpenwireTest.testWildcards 3.2 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTSecurityCRLTest.crlNotRevokedTest 5.2 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testSendAndReceiveAtLeastOnce 5.3 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testManagementQueueMessagesAreAckd 5.2 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testMQTTRetainQoS 7.3 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testSubscribeMultipleTopics 5 min 0 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testSendAndReceiveExactlyOnceWithInterceptors 6 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testSendAtLeastOnceReceiveAtMostOnce 5.4 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testNoMessageReceivedAfterUnsubscribeMQTT 5.4 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testMQTTPathPatterns 5.3 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testUniqueMessageIds 5.3 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testSendAndReceiveRetainedMessages 5.4 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testRetainedMessage 5.9 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testResendMessageId 5.3 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testClientConnectionFailure 11 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testSendAtMostOnceReceiveExactlyOnce 5.5 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testAnycastAddressWorksWithMQTT 3.5 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testAnycastPrefixWorksWithMQTT 2.4 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testReceiveMessageSentWhileOffline 5.4 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testAmbiguousRoutingWithMQTT 3.2 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testLinkRouteAmqpReceiveMQTT 5.6 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testPacketIdGeneratorNonCleanSession 2.1 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testWillMessageIsRetained 11 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testSendAndReceiveAtMostOnce 5.7 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testSendAtLeastOnceReceiveExactlyOnce 7.9 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testDuplicateSubscriptions 7.6 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testSendJMSReceiveMQTT 11 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testCleanSessionForSubscriptions 11 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testRetainedMessagesAreCorrectlyFormedAfterRestart 7.7 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testUnsubscribeMQTT 23 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testSendAndReceiveLargeMessages 6.4 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testClientConnectionFailureSendsWillMessage 15 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MQTTTest.testSendAndReceiveMQTT 12 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MqttClusterRemoteSubscribeTest.unsubscribeRemoteQueue 10 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MqttClusterRemoteSubscribeTest.unsubscribeExistingQueue 10 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MqttClusterRemoteSubscribeTest.unsubscribeRemoteQueueMultipleSubscriptions 8.7 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MqttClusterRemoteSubscribeTest.unsubscribeRemoteQueueWildCard 10 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MqttClusterWildcardTest.loadBalanceRequests 36 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.MqttClusterWildcardTest.wildcardsWithBroker1Disconnected 18 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.PahoMQTTTest.testLotsOfClients 39 sec 1
org.apache.activemq.artemis.tests.integration.mqtt.imported.PahoMQTTTest.testSendAndReceiveMQTT 1 min 2 sec 1
org.apache.activemq.artemis.tests.integration.openwire.cluster.TemporaryQueueClusterTest.testClusteredQueue 7.7 sec 1
org.apache.activemq.artemis.tests.integration.openwire.cluster.TemporaryQueueClusterTest.testTemporaryQueue 3.9 sec 1
org.apache.activemq.artemis.tests.integration.paging.GlobalPagingTest.testDeletePhysicalPages[storeType=FILE, mapped=true] 2.9 sec 1
org.apache.activemq.artemis.tests.integration.paging.GlobalPagingTest.testReceiveImmediate[storeType=FILE, mapped=true] 2.1 sec 1
org.apache.activemq.artemis.tests.integration.paging.PageCountSyncOnNonTXTest.testSendNoTx 57 sec 1
org.apache.activemq.artemis.tests.integration.paging.PagingTest.testPurge[storeType=FILE, mapped=false] 34 sec 1
org.apache.activemq.artemis.tests.integration.plugin.MqttPluginTest.testSendAndReceiveMQTT 15 sec 1
org.apache.activemq.artemis.tests.integration.server.ExpiryRunnerTest.testExpireConsumeHalf 2.5 sec 1
org.apache.activemq.artemis.tests.integration.stomp.StompConnectionCleanupTest.sendSTOMPReceiveMQTT[ws+v10.stomp] 11 sec 1
org.apache.activemq.artemis.tests.integration.stomp.StompConnectionCleanupTest.sendSTOMPReceiveMQTT[tcp+v10.stomp] 11 sec 1
org.apache.activemq.artemis.tests.integration.stomp.StompConnectionCleanupTest.testMultiProtocolConsumers[tcp+v10.stomp] 3.6 sec 1
org.apache.activemq.artemis.tests.integration.stomp.StompTest.sendSTOMPReceiveMQTT[ws+v10.stomp] 10 sec 1
org.apache.activemq.artemis.tests.integration.stomp.StompTest.sendSTOMPReceiveMQTT[tcp+v10.stomp] 10 sec 1
org.apache.activemq.artemis.tests.integration.stomp.StompWebSocketMaxFrameTest.testStompSendReceiveWithMaxFramePayloadLength[ws+v10.stomp] 14 sec 1
org.apache.activemq.artemis.tests.integration.stomp.StompWebSocketMaxFrameTest.testStompSendReceiveWithMaxFramePayloadLength[ws+v11.stomp] 13 sec 1
org.apache.activemq.artemis.tests.integration.stomp.StompWebSocketMaxFrameTest.testStompSendReceiveWithMaxFramePayloadLength[ws+v12.stomp] 15 sec 1

@michaelandrepearce
Copy link
Contributor Author

@clebertsuconic can you recheck this against master, i know things like testDoubleDelete are failing currently on master without this PR.

@michaelandrepearce
Copy link
Contributor Author

michaelandrepearce commented Jan 18, 2019

@clebertsuconic the message group test ones are are genuine, and i have fixed so now should be fixed up in last push.

I dont believe StompWebSocketMaxFrameTest is related, when i run this its erroring because
"io.netty.handler.codec.CorruptedFrameException: Max frame length of 65536 has been exceeded."

@michaelandrepearce
Copy link
Contributor Author

@clebertsuconic could you look at this b3f0a87#r31982567, all builds are failing since that was merged to master.....

@michaelandrepearce
Copy link
Contributor Author

michaelandrepearce commented Jan 18, 2019

@clebertsuconic once master is fixed ill rebase and if i could ask if you then could re run this?

@michaelandrepearce
Copy link
Contributor Author

@clebertsuconic fully rebased (including over the FQQN for producers stuff)

@michaelandrepearce
Copy link
Contributor Author

@clebertsuconic so as noted, the message groups ones is sorted, repeat needs to occur only if groupconsumer exist.

It does seem MQTTTest and StompWebSocketMaxFrameTest is directly broken due to this, but i cant think to why, any ideas welcome, ill continue to look into

Add consumer priority support
Includes refactor of consumer iterating in QueueImpl to its own logical class, to be able to implement.
Add OpenWire JMS Test - taken from ActiveMQ5
Add Core JMS Test
Add AMQP Test
Add Docs
@michaelandrepearce
Copy link
Contributor Author

@clebertsuconic ive found the pesky thing, in ServerSessionImpl where i had added a new parameter to the createConsumer method, the old method signature where we pass though but default the priority in the new, i had missed forwarding on the credits value, thus the issue.

Typical one liner causing lots of headache ;)

@michaelandrepearce
Copy link
Contributor Author

@clebertsuconic hows this looking now, we good to merge???

@clebertsuconic
Copy link
Contributor

Let me run one final round of testsuite. If all good will merge it tomorrow.

@clebertsuconic
Copy link
Contributor

It’s important to keep the testsuite clean. As we should release soon.

@clebertsuconic
Copy link
Contributor

@michaelandrepearce I'm pretty sure I will be able to merge this tomorrow. Testsuite had some unrelated failures. I just want to double check.. if there's any issues I'm sure i will be able to figure out.

will let you know tomorrow. I got the results pretty late after I finished. leave it with me for tomorrow first thing please?

@asfgit asfgit closed this in d7c940d Jan 23, 2019
@clebertsuconic
Copy link
Contributor

I added one change on top of yours during merge... to remove unused variables..

merged!

you are the man!

@michaelandrepearce
Copy link
Contributor Author

@clebertsuconic some of those variables are used but accesed by atomic updaters. Have commented on your commit

@clebertsuconic
Copy link
Contributor

@michaelandrepearce duh.. thanks

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
4 participants