diff --git a/distribution/server/src/assemble/LICENSE.bin.txt b/distribution/server/src/assemble/LICENSE.bin.txt
index 24c601b184afa..cfbe991a8edd8 100644
--- a/distribution/server/src/assemble/LICENSE.bin.txt
+++ b/distribution/server/src/assemble/LICENSE.bin.txt
@@ -513,7 +513,7 @@ The Apache Software License, Version 2.0
* RxJava
- io.reactivex.rxjava3-rxjava-3.0.1.jar
* RoaringBitmap
- - org.roaringbitmap-RoaringBitmap-1.1.0.jar
+ - org.roaringbitmap-RoaringBitmap-1.0.6.jar
* OpenTelemetry
- io.opentelemetry-opentelemetry-api-1.38.0.jar
- io.opentelemetry-opentelemetry-api-incubator-1.38.0-alpha.jar
diff --git a/distribution/shell/src/assemble/LICENSE.bin.txt b/distribution/shell/src/assemble/LICENSE.bin.txt
index 2971147c2c8df..0da56c6afa8fc 100644
--- a/distribution/shell/src/assemble/LICENSE.bin.txt
+++ b/distribution/shell/src/assemble/LICENSE.bin.txt
@@ -382,8 +382,6 @@ The Apache Software License, Version 2.0
- simpleclient_tracer_common-0.16.0.jar
- simpleclient_tracer_otel-0.16.0.jar
- simpleclient_tracer_otel_agent-0.16.0.jar
- * RoaringBitmap
- - RoaringBitmap-1.1.0.jar
* Log4J
- log4j-api-2.23.1.jar
- log4j-core-2.23.1.jar
diff --git a/pom.xml b/pom.xml
index 1e200d04d68fd..7c556fa127786 100644
--- a/pom.xml
+++ b/pom.xml
@@ -317,7 +317,7 @@ flexible messaging model and an intuitive client API.
1.3
0.4
9.1.0
- 1.1.0
+ 1.0.6
1.6.1
6.4.0
3.33.0
diff --git a/pulsar-common/pom.xml b/pulsar-common/pom.xml
index 3f73a43698ea4..aa7e4998e5c3e 100644
--- a/pulsar-common/pom.xml
+++ b/pulsar-common/pom.xml
@@ -252,11 +252,6 @@
awaitility
test
-
-
- org.roaringbitmap
- RoaringBitmap
-
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
index b5ad89d1695d4..72215d7296cc3 100644
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
+++ b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentOpenLongPairRangeSet.java
@@ -29,7 +29,6 @@
import java.util.concurrent.ConcurrentSkipListMap;
import java.util.concurrent.atomic.AtomicBoolean;
import org.apache.commons.lang.mutable.MutableInt;
-import org.roaringbitmap.RoaringBitSet;
/**
* A Concurrent set comprising zero or more ranges of type {@link LongPair}. This can be alternative of
@@ -45,7 +44,7 @@
public class ConcurrentOpenLongPairRangeSet> implements LongPairRangeSet {
protected final NavigableMap rangeBitSetMap = new ConcurrentSkipListMap<>();
- private final boolean threadSafe;
+ private boolean threadSafe = true;
private final int bitSetSize;
private final LongPairConsumer consumer;
@@ -96,7 +95,9 @@ public void addOpenClosed(long lowerKey, long lowerValueOpen, long upperKey, lon
// (2) set 0th-index to upper-index in upperRange.getKey()
if (isValid(upperKey, upperValue)) {
BitSet rangeBitSet = rangeBitSetMap.computeIfAbsent(upperKey, (key) -> createNewBitSet());
- rangeBitSet.set(0, (int) upperValue + 1);
+ if (rangeBitSet != null) {
+ rangeBitSet.set(0, (int) upperValue + 1);
+ }
}
// No-op if values are not valid eg: if lower == LongPair.earliest or upper == LongPair.latest then nothing
// to set
@@ -413,6 +414,7 @@ private int getSafeEntry(long value) {
}
private BitSet createNewBitSet() {
- return this.threadSafe ? new ConcurrentRoaringBitSet() : new RoaringBitSet();
+ return this.threadSafe ? new ConcurrentBitSet(bitSetSize) : new BitSet(bitSetSize);
}
-}
\ No newline at end of file
+
+}
diff --git a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java b/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java
deleted file mode 100644
index 814e58400993b..0000000000000
--- a/pulsar-common/src/main/java/org/apache/pulsar/common/util/collections/ConcurrentRoaringBitSet.java
+++ /dev/null
@@ -1,439 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.pulsar.common.util.collections;
-
-import java.util.BitSet;
-import java.util.concurrent.locks.StampedLock;
-import java.util.stream.IntStream;
-import org.roaringbitmap.RoaringBitSet;
-
-public class ConcurrentRoaringBitSet extends RoaringBitSet {
- private final StampedLock rwLock = new StampedLock();
-
- public ConcurrentRoaringBitSet() {
- super();
- }
-
- @Override
- public boolean get(int bitIndex) {
- long stamp = rwLock.tryOptimisticRead();
- boolean isSet = super.get(bitIndex);
- if (!rwLock.validate(stamp)) {
- stamp = rwLock.readLock();
- try {
- isSet = super.get(bitIndex);
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return isSet;
- }
-
- @Override
- public void set(int bitIndex) {
- long stamp = rwLock.writeLock();
- try {
- super.set(bitIndex);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void clear(int bitIndex) {
- long stamp = rwLock.writeLock();
- try {
- super.clear(bitIndex);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void set(int fromIndex, int toIndex) {
- long stamp = rwLock.writeLock();
- try {
- super.set(fromIndex, toIndex);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void clear(int fromIndex, int toIndex) {
- long stamp = rwLock.writeLock();
- try {
- super.clear(fromIndex, toIndex);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void clear() {
- long stamp = rwLock.writeLock();
- try {
- super.clear();
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public int nextSetBit(int fromIndex) {
- long stamp = rwLock.tryOptimisticRead();
- int nextSetBit = super.nextSetBit(fromIndex);
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- nextSetBit = super.nextSetBit(fromIndex);
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return nextSetBit;
- }
-
- @Override
- public int nextClearBit(int fromIndex) {
- long stamp = rwLock.tryOptimisticRead();
- int nextClearBit = super.nextClearBit(fromIndex);
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- nextClearBit = super.nextClearBit(fromIndex);
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return nextClearBit;
- }
-
- @Override
- public int previousSetBit(int fromIndex) {
- long stamp = rwLock.tryOptimisticRead();
- int previousSetBit = super.previousSetBit(fromIndex);
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- previousSetBit = super.previousSetBit(fromIndex);
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return previousSetBit;
- }
-
- @Override
- public int previousClearBit(int fromIndex) {
- long stamp = rwLock.tryOptimisticRead();
- int previousClearBit = super.previousClearBit(fromIndex);
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- previousClearBit = super.previousClearBit(fromIndex);
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return previousClearBit;
- }
-
- @Override
- public int length() {
- long stamp = rwLock.tryOptimisticRead();
- int length = super.length();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- length = super.length();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return length;
- }
-
- @Override
- public boolean isEmpty() {
- long stamp = rwLock.tryOptimisticRead();
- boolean isEmpty = super.isEmpty();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- isEmpty = super.isEmpty();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return isEmpty;
- }
-
- @Override
- public int cardinality() {
- long stamp = rwLock.tryOptimisticRead();
- int cardinality = super.cardinality();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- cardinality = super.cardinality();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return cardinality;
- }
-
- @Override
- public int size() {
- long stamp = rwLock.tryOptimisticRead();
- int size = super.size();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- size = super.size();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return size;
- }
-
- @Override
- public byte[] toByteArray() {
- long stamp = rwLock.tryOptimisticRead();
- byte[] byteArray = super.toByteArray();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- byteArray = super.toByteArray();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return byteArray;
- }
-
- @Override
- public long[] toLongArray() {
- long stamp = rwLock.tryOptimisticRead();
- long[] longArray = super.toLongArray();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- longArray = super.toLongArray();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return longArray;
- }
-
- @Override
- public void flip(int bitIndex) {
- long stamp = rwLock.writeLock();
- try {
- super.flip(bitIndex);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void flip(int fromIndex, int toIndex) {
- long stamp = rwLock.writeLock();
- try {
- super.flip(fromIndex, toIndex);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void set(int bitIndex, boolean value) {
- long stamp = rwLock.writeLock();
- try {
- super.set(bitIndex, value);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void set(int fromIndex, int toIndex, boolean value) {
- long stamp = rwLock.writeLock();
- try {
- super.set(fromIndex, toIndex, value);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public BitSet get(int fromIndex, int toIndex) {
- long stamp = rwLock.tryOptimisticRead();
- BitSet bitSet = super.get(fromIndex, toIndex);
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- bitSet = super.get(fromIndex, toIndex);
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return bitSet;
- }
-
- @Override
- public boolean intersects(BitSet set) {
- long stamp = rwLock.writeLock();
- try {
- return super.intersects(set);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void and(BitSet set) {
- long stamp = rwLock.writeLock();
- try {
- super.and(set);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void or(BitSet set) {
- long stamp = rwLock.writeLock();
- try {
- super.or(set);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void xor(BitSet set) {
- long stamp = rwLock.writeLock();
- try {
- super.xor(set);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- @Override
- public void andNot(BitSet set) {
- long stamp = rwLock.writeLock();
- try {
- super.andNot(set);
- } finally {
- rwLock.unlockWrite(stamp);
- }
- }
-
- /**
- * Returns the clone of the internal wrapped {@code BitSet}.
- * This won't be a clone of the {@code ConcurrentBitSet} object.
- *
- * @return a clone of the internal wrapped {@code BitSet}
- */
- @Override
- public Object clone() {
- long stamp = rwLock.tryOptimisticRead();
- RoaringBitSet clone = (RoaringBitSet) super.clone();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- clone = (RoaringBitSet) super.clone();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return clone;
- }
-
- @Override
- public String toString() {
- long stamp = rwLock.tryOptimisticRead();
- String str = super.toString();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- str = super.toString();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return str;
- }
-
- /**
- * This operation is not supported on {@code ConcurrentBitSet}.
- */
- @Override
- public IntStream stream() {
- throw new UnsupportedOperationException("stream is not supported");
- }
-
- public boolean equals(final Object o) {
- long stamp = rwLock.tryOptimisticRead();
- boolean isEqual = super.equals(o);
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- isEqual = super.equals(o);
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return isEqual;
- }
-
- public int hashCode() {
- long stamp = rwLock.tryOptimisticRead();
- int hashCode = super.hashCode();
- if (!rwLock.validate(stamp)) {
- // Fallback to read lock
- stamp = rwLock.readLock();
- try {
- hashCode = super.hashCode();
- } finally {
- rwLock.unlockRead(stamp);
- }
- }
- return hashCode;
- }
-}