Skip to content

Commit aca4c1c

Browse files
lhotarisrinath-ctds
authored andcommitted
[fix][misc] Fix compareTo contract violation for NamespaceBundleStats, TimeAverageMessageData and ResourceUnitRanking (apache#24772)
(cherry picked from commit d6b00f9)
1 parent 514b6d0 commit aca4c1c

File tree

9 files changed

+304
-47
lines changed

9 files changed

+304
-47
lines changed

pulsar-broker/src/main/java/org/apache/pulsar/broker/loadbalance/impl/SimpleLoadManagerImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -813,7 +813,7 @@ private synchronized ResourceUnit findBrokerForPlacement(Multimap<Long, Resource
813813
minLoadPercentage = loadPercentage;
814814
} else {
815815
if ((unboundedRanks ? ranking.compareMessageRateTo(selectedRanking)
816-
: ranking.compareTo(selectedRanking)) < 0) {
816+
: ranking.compareToOtherRanking(selectedRanking)) < 0) {
817817
minLoadPercentage = loadPercentage;
818818
selectedRU = candidate;
819819
selectedRanking = ranking;

pulsar-broker/src/test/java/org/apache/pulsar/broker/loadbalance/extensions/models/TopKBundlesTest.java

Lines changed: 25 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -297,4 +297,29 @@ public void testPartitionSort() {
297297
}
298298
}
299299
}
300+
301+
// Issue https://github.com/apache/pulsar/issues/24754
302+
@Test
303+
public void testPartitionSortCompareToContractViolationIssue() {
304+
Random rnd = new Random(0);
305+
ArrayList<NamespaceBundleStats> stats = new ArrayList<>();
306+
for (int i = 0; i < 1000; ++i) {
307+
NamespaceBundleStats s = new NamespaceBundleStats();
308+
s.msgThroughputIn = 4 * 75000 * rnd.nextDouble(); // Just above threshold (1e5)
309+
s.msgThroughputOut = 75000000 - (4 * (75000 * rnd.nextDouble()));
310+
s.msgRateIn = 4 * 75 * rnd.nextDouble();
311+
s.msgRateOut = 75000 - (4 * 75 * rnd.nextDouble());
312+
s.topics = i;
313+
s.consumerCount = i;
314+
s.producerCount = 4 * rnd.nextInt(375);
315+
s.cacheSize = 75000000 - (rnd.nextInt(4 * 75000));
316+
stats.add(s);
317+
}
318+
List<Map.Entry<String, ? extends Comparable>> bundleEntries = new ArrayList<>();
319+
320+
for (NamespaceBundleStats s : stats) {
321+
bundleEntries.add(Map.entry("bundle-" + s.msgThroughputIn, s));
322+
}
323+
TopKBundles.partitionSort(bundleEntries, 100);
324+
}
300325
}

pulsar-client-admin-api/src/main/java/org/apache/pulsar/policies/data/loadbalancer/NamespaceBundleStats.java

Lines changed: 22 additions & 28 deletions
Original file line numberDiff line numberDiff line change
@@ -37,14 +37,14 @@ public class NamespaceBundleStats implements Comparable<NamespaceBundleStats>, S
3737
public long topics;
3838
public long cacheSize;
3939

40-
// Consider the throughput equal if difference is less than 100 KB/s
41-
private static final double throughputDifferenceThreshold = 1e5;
42-
// Consider the msgRate equal if the difference is less than 100
43-
private static final double msgRateDifferenceThreshold = 100;
44-
// Consider the total topics/producers/consumers equal if the difference is less than 500
45-
private static final long topicConnectionDifferenceThreshold = 500;
46-
// Consider the cache size equal if the difference is less than 100 kb
47-
private static final long cacheSizeDifferenceThreshold = 100000;
40+
// When comparing throughput, uses a resolution of 100 KB/s, effectively rounding values before comparison
41+
private static final double throughputComparisonResolution = 1e5;
42+
// When comparing message rate, uses a resolution of 100, effectively rounding values before comparison
43+
private static final double msgRateComparisonResolution = 100;
44+
// When comparing total topics/producers/consumers, uses a resolution/rounding of 500
45+
private static final long topicConnectionComparisonResolution = 500;
46+
// When comparing cache size, uses a resolution/rounding of 100kB
47+
private static final long cacheSizeComparisonResolution = 100000;
4848

4949
public NamespaceBundleStats() {
5050
reset();
@@ -89,39 +89,33 @@ public int compareTo(NamespaceBundleStats other) {
8989
public int compareByMsgRate(NamespaceBundleStats other) {
9090
double thisMsgRate = this.msgRateIn + this.msgRateOut;
9191
double otherMsgRate = other.msgRateIn + other.msgRateOut;
92-
if (Math.abs(thisMsgRate - otherMsgRate) > msgRateDifferenceThreshold) {
93-
return Double.compare(thisMsgRate, otherMsgRate);
94-
}
95-
return 0;
92+
return compareDoubleWithResolution(thisMsgRate, otherMsgRate, msgRateComparisonResolution);
93+
}
94+
95+
private static int compareDoubleWithResolution(double v1, double v2, double resolution) {
96+
return Long.compare(Math.round(v1 / resolution), Math.round(v2 / resolution));
97+
}
98+
99+
private static int compareLongWithResolution(long v1, long v2, long resolution) {
100+
return Long.compare(v1 / resolution, v2 / resolution);
96101
}
97102

98103
public int compareByTopicConnections(NamespaceBundleStats other) {
99104
long thisTopicsAndConnections = this.topics + this.consumerCount + this.producerCount;
100105
long otherTopicsAndConnections = other.topics + other.consumerCount + other.producerCount;
101-
if (Math.abs(thisTopicsAndConnections - otherTopicsAndConnections) > topicConnectionDifferenceThreshold) {
102-
return Long.compare(thisTopicsAndConnections, otherTopicsAndConnections);
103-
}
104-
return 0;
106+
return compareLongWithResolution(thisTopicsAndConnections, otherTopicsAndConnections,
107+
topicConnectionComparisonResolution);
105108
}
106109

107110
public int compareByCacheSize(NamespaceBundleStats other) {
108-
if (Math.abs(this.cacheSize - other.cacheSize) > cacheSizeDifferenceThreshold) {
109-
return Long.compare(this.cacheSize, other.cacheSize);
110-
}
111-
return 0;
111+
return compareLongWithResolution(cacheSize, other.cacheSize, cacheSizeComparisonResolution);
112112
}
113113

114114
public int compareByBandwidthIn(NamespaceBundleStats other) {
115-
if (Math.abs(this.msgThroughputIn - other.msgThroughputIn) > throughputDifferenceThreshold) {
116-
return Double.compare(this.msgThroughputIn, other.msgThroughputIn);
117-
}
118-
return 0;
115+
return compareDoubleWithResolution(msgThroughputIn, other.msgThroughputIn, throughputComparisonResolution);
119116
}
120117

121118
public int compareByBandwidthOut(NamespaceBundleStats other) {
122-
if (Math.abs(this.msgThroughputOut - other.msgThroughputOut) > throughputDifferenceThreshold) {
123-
return Double.compare(this.msgThroughputOut, other.msgThroughputOut);
124-
}
125-
return 0;
119+
return compareDoubleWithResolution(msgThroughputOut, other.msgThroughputOut, throughputComparisonResolution);
126120
}
127121
}
Lines changed: 48 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,48 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.policies.data.loadbalancer;
20+
21+
import java.util.ArrayList;
22+
import java.util.List;
23+
import java.util.Random;
24+
import org.testng.annotations.Test;
25+
26+
public class NamespaceBundleStatsTest {
27+
28+
@Test
29+
public void testCompareToContract() {
30+
Random rnd = new Random();
31+
List<NamespaceBundleStats> stats = new ArrayList<>();
32+
for (int i = 0; i < 1000; ++i) {
33+
NamespaceBundleStats s = new NamespaceBundleStats();
34+
s.msgThroughputIn = 4 * 75000 * rnd.nextDouble();
35+
s.msgThroughputOut = 75000000 - (4 * (75000 * rnd.nextDouble()));
36+
s.msgRateIn = 4 * 75 * rnd.nextDouble();
37+
s.msgRateOut = 75000 - (4 * 75 * rnd.nextDouble());
38+
s.topics = i;
39+
s.consumerCount = i;
40+
s.producerCount = 4 * rnd.nextInt(375);
41+
s.cacheSize = 75000000 - (rnd.nextInt(4 * 75000));
42+
stats.add(s);
43+
}
44+
// this would throw "java.lang.IllegalArgumentException: Comparison method violates its general contract!"
45+
// if compareTo() is not implemented correctly.
46+
stats.sort(NamespaceBundleStats::compareTo);
47+
}
48+
}
Lines changed: 61 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,61 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.common.util;
20+
21+
import lombok.experimental.UtilityClass;
22+
23+
/**
24+
* Utility class for comparing values.
25+
*/
26+
@UtilityClass
27+
public class CompareUtil {
28+
29+
/**
30+
* Compare two double values with a given resolution.
31+
* @param v1 first value to compare
32+
* @param v2 second value to compare
33+
* @param resolution resolution to compare with
34+
* @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2
35+
*/
36+
public static int compareDoubleWithResolution(double v1, double v2, double resolution) {
37+
return Long.compare((long) (v1 / resolution), (long) (v2 / resolution));
38+
}
39+
40+
/**
41+
* Compare two long values with a given resolution.
42+
* @param v1 first value to compare
43+
* @param v2 second value to compare
44+
* @param resolution resolution to compare with
45+
* @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2
46+
*/
47+
public static int compareLongWithResolution(long v1, long v2, long resolution) {
48+
return Long.compare(v1 / resolution, v2 / resolution);
49+
}
50+
51+
/**
52+
* Compare two int values with a given resolution.
53+
* @param v1 first value to compare
54+
* @param v2 second value to compare
55+
* @param resolution resolution to compare with
56+
* @return -1 if v1 < v2, 0 if v1 == v2, 1 if v1 > v2
57+
*/
58+
public static int compareIntegerWithResolution(int v1, int v2, int resolution) {
59+
return Integer.compare(v1 / resolution, v2 / resolution);
60+
}
61+
}

pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/ResourceUnitRanking.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -26,7 +26,7 @@
2626
* The class containing information about system resources, allocated quota, and loaded bundles.
2727
*/
2828
@EqualsAndHashCode
29-
public class ResourceUnitRanking implements Comparable<ResourceUnitRanking> {
29+
public class ResourceUnitRanking {
3030

3131
private static final long KBITS_TO_BYTES = 1024 / 8;
3232
private static final double PERCENTAGE_DIFFERENCE_THRESHOLD = 5.0;
@@ -129,7 +129,13 @@ private void estimateLoadPercentage() {
129129

130130
}
131131

132-
public int compareTo(ResourceUnitRanking other) {
132+
/**
133+
* Compares to another ranking. Please note that this cannot be used to sort the rankings since the results
134+
* of this method don't satify the contract of {@link Comparable#compareTo(Object)}
135+
* @param other other ranking to compare to
136+
* @return negative if this is less than other, 0 if they are equal, positive if this is greater than other
137+
*/
138+
public int compareToOtherRanking(ResourceUnitRanking other) {
133139
if (Math.abs(this.estimatedLoadPercentage - other.estimatedLoadPercentage) > PERCENTAGE_DIFFERENCE_THRESHOLD) {
134140
return Double.compare(this.estimatedLoadPercentage, other.estimatedLoadPercentage);
135141
}

pulsar-common/src/main/java/org/apache/pulsar/policies/data/loadbalancer/TimeAverageMessageData.java

Lines changed: 8 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
*/
1919
package org.apache.pulsar.policies.data.loadbalancer;
2020

21+
import static org.apache.pulsar.common.util.CompareUtil.compareDoubleWithResolution;
2122
import lombok.EqualsAndHashCode;
2223

2324
/**
@@ -44,10 +45,10 @@ public class TimeAverageMessageData implements Comparable<TimeAverageMessageData
4445
// The average message rate out per second.
4546
private double msgRateOut;
4647

47-
// Consider the throughput equal if difference is less than 100 KB/s
48-
private static final double throughputDifferenceThreshold = 1e5;
49-
// Consider the msgRate equal if the difference is less than 100
50-
private static final double msgRateDifferenceThreshold = 100;
48+
// When comparing throughput, uses a resolution of 100 KB/s, effectively rounding values before comparison
49+
private static final double throughputComparisonResolution = 1e5;
50+
// When comparing message rate, uses a resolution of 100, effectively rounding values before comparison
51+
private static final double msgRateComparisonResolution = 100;
5152

5253
// For JSON only.
5354
public TimeAverageMessageData() {
@@ -202,23 +203,14 @@ public int compareTo(TimeAverageMessageData other) {
202203
public int compareByMsgRate(TimeAverageMessageData other) {
203204
double thisMsgRate = this.msgRateIn + this.msgRateOut;
204205
double otherMsgRate = other.msgRateIn + other.msgRateOut;
205-
if (Math.abs(thisMsgRate - otherMsgRate) > msgRateDifferenceThreshold) {
206-
return Double.compare(thisMsgRate, otherMsgRate);
207-
}
208-
return 0;
206+
return compareDoubleWithResolution(thisMsgRate, otherMsgRate, msgRateComparisonResolution);
209207
}
210208

211209
public int compareByBandwidthIn(TimeAverageMessageData other) {
212-
if (Math.abs(this.msgThroughputIn - other.msgThroughputIn) > throughputDifferenceThreshold) {
213-
return Double.compare(this.msgThroughputIn, other.msgThroughputIn);
214-
}
215-
return 0;
210+
return compareDoubleWithResolution(msgThroughputIn, other.msgThroughputIn, throughputComparisonResolution);
216211
}
217212

218213
public int compareByBandwidthOut(TimeAverageMessageData other) {
219-
if (Math.abs(this.msgThroughputOut - other.msgThroughputOut) > throughputDifferenceThreshold) {
220-
return Double.compare(this.msgThroughputOut, other.msgThroughputOut);
221-
}
222-
return 0;
214+
return compareDoubleWithResolution(msgThroughputOut, other.msgThroughputOut, throughputComparisonResolution);
223215
}
224216
}
Lines changed: 87 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,87 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing,
13+
* software distributed under the License is distributed on an
14+
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
15+
* KIND, either express or implied. See the License for the
16+
* specific language governing permissions and limitations
17+
* under the License.
18+
*/
19+
package org.apache.pulsar.common.util;
20+
21+
import static org.testng.Assert.assertEquals;
22+
import org.testng.annotations.Test;
23+
24+
public class CompareUtilTest {
25+
@Test
26+
public void testCompareDoubleWithResolution_bucketedComparison() {
27+
// Same truncated bucket when dividing by resolution
28+
assertEquals(CompareUtil.compareDoubleWithResolution(0.01, 0.49, 0.5), 0);
29+
assertEquals(CompareUtil.compareDoubleWithResolution(0.51, 0.99, 0.5), 0);
30+
31+
// Different truncated buckets
32+
assertEquals(CompareUtil.compareDoubleWithResolution(0.51, 0.49, 0.5), 1);
33+
assertEquals(CompareUtil.compareDoubleWithResolution(0.49, 0.51, 0.5), -1);
34+
assertEquals(CompareUtil.compareDoubleWithResolution(1.01, 0.49, 0.5), 1);
35+
36+
// Larger numbers
37+
assertEquals(CompareUtil.compareDoubleWithResolution(19.99, 10.01, 10.0), 0);
38+
assertEquals(CompareUtil.compareDoubleWithResolution(19.99, 20.01, 10.0), -1);
39+
assertEquals(CompareUtil.compareDoubleWithResolution(10.00, 9.99, 10.0), 1);
40+
}
41+
42+
@Test
43+
public void testCompareLongWithResolution_exactComparison() {
44+
// resolution = 1 -> behave like Long.compare
45+
assertEquals(CompareUtil.compareLongWithResolution(1L, 2L, 1L), -1);
46+
assertEquals(CompareUtil.compareLongWithResolution(2L, 1L, 1L), 1);
47+
assertEquals(CompareUtil.compareLongWithResolution(2L, 2L, 1L), 0);
48+
}
49+
50+
@Test
51+
public void testCompareLongWithResolution_bucketedComparison() {
52+
// Same bucket when divided by resolution
53+
assertEquals(CompareUtil.compareLongWithResolution(8L, 9L, 10L), 0);
54+
assertEquals(CompareUtil.compareLongWithResolution(10L, 19L, 10L), 0);
55+
56+
// Different buckets
57+
assertEquals(CompareUtil.compareLongWithResolution(9L, 20L, 10L), -1);
58+
assertEquals(CompareUtil.compareLongWithResolution(21L, 10L, 10L), 1);
59+
60+
// Larger resolution
61+
assertEquals(CompareUtil.compareLongWithResolution(100L, 175L, 100L), 0);
62+
assertEquals(CompareUtil.compareLongWithResolution(199L, 201L, 100L), -1);
63+
}
64+
65+
@Test
66+
public void testCompareIntegerWithResolution_exactComparison() {
67+
// resolution = 1 -> behave like Integer.compare
68+
assertEquals(CompareUtil.compareIntegerWithResolution(1, 2, 1), -1);
69+
assertEquals(CompareUtil.compareIntegerWithResolution(2, 1, 1), 1);
70+
assertEquals(CompareUtil.compareIntegerWithResolution(2, 2, 1), 0);
71+
}
72+
73+
@Test
74+
public void testCompareIntegerWithResolution_bucketedComparison() {
75+
// Same bucket
76+
assertEquals(CompareUtil.compareIntegerWithResolution(3, 4, 5), 0);
77+
assertEquals(CompareUtil.compareIntegerWithResolution(5, 9, 5), 0);
78+
79+
// Different buckets
80+
assertEquals(CompareUtil.compareIntegerWithResolution(4, 10, 5), -1);
81+
assertEquals(CompareUtil.compareIntegerWithResolution(11, 5, 5), 1);
82+
83+
// Larger resolution
84+
assertEquals(CompareUtil.compareIntegerWithResolution(51, 75, 50), 0);
85+
assertEquals(CompareUtil.compareIntegerWithResolution(49, 101, 50), -1);
86+
}
87+
}

0 commit comments

Comments
 (0)