
ArrayIndexOutOfBoundsException on ConcurrentLongHashMap if autoshrink is true. #4316

Closed
dlg99 opened this issue Apr 26, 2024 · 5 comments

@dlg99
Contributor

dlg99 commented Apr 26, 2024

BUG REPORT

Describe the bug

Similar to #1606 but only happens if autoShrink is true.
This was introduced by #3074 or some subsequent change related to autoShrink.
@lordcheng10 fyi

Error in production:

ERROR org.apache.bookkeeper.proto.WriteEntryProcessor - Unexpected exception while writing 3901@24558 : Index 34 out of bounds for length 32
java.lang.ArrayIndexOutOfBoundsException: Index 34 out of bounds for length 32
        at org.apache.bookkeeper.util.collections.ConcurrentLongHashMap$Section.get(ConcurrentLongHashMap.java:357) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.util.collections.ConcurrentLongHashMap.get(ConcurrentLongHashMap.java:204) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.bookie.BookieImpl.addEntryInternal(BookieImpl.java:937) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.bookie.BookieImpl.addEntry(BookieImpl.java:1074) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.proto.WriteEntryProcessor.processPacket(WriteEntryProcessor.java:79) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.proto.PacketProcessorBase.run(PacketProcessorBase.java:202) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.proto.BookieRequestProcessor.processAddRequest(BookieRequestProcessor.java:655) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.proto.BookieRequestProcessor.processRequest(BookieRequestProcessor.java:377) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at org.apache.bookkeeper.proto.BookieRequestHandler.channelRead(BookieRequestHandler.java:90) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.ChannelInboundHandlerAdapter.channelRead(ChannelInboundHandlerAdapter.java:93) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at org.apache.bookkeeper.proto.AuthHandler$ServerSideHandler.channelRead(AuthHandler.java:89) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at org.apache.bookkeeper.proto.BookieProtoEncoding$RequestDecoder.channelRead(BookieProtoEncoding.java:477) ~[org.apache.bookkeeper-bookkeeper-server-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:346) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.fireChannelRead(ByteToMessageDecoder.java:333) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.callDecode(ByteToMessageDecoder.java:455) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.handler.codec.ByteToMessageDecoder.channelRead(ByteToMessageDecoder.java:290) ~[io.netty-netty-codec-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:444) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.handler.flush.FlushConsolidationHandler.channelRead(FlushConsolidationHandler.java:152) ~[io.netty-netty-handler-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:442) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.fireChannelRead(AbstractChannelHandlerContext.java:412) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.DefaultChannelPipeline$HeadContext.channelRead(DefaultChannelPipeline.java:1410) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:440) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.AbstractChannelHandlerContext.invokeChannelRead(AbstractChannelHandlerContext.java:420) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.DefaultChannelPipeline.fireChannelRead(DefaultChannelPipeline.java:919) ~[io.netty-netty-transport-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.epoll.AbstractEpollStreamChannel$EpollStreamUnsafe.epollInReady(AbstractEpollStreamChannel.java:801) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.epoll.EpollEventLoop.processReady(EpollEventLoop.java:509) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.channel.epoll.EpollEventLoop.run(EpollEventLoop.java:407) ~[io.netty-netty-transport-classes-epoll-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.util.concurrent.SingleThreadEventExecutor$4.run(SingleThreadEventExecutor.java:997) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.util.internal.ThreadExecutorMap$2.run(ThreadExecutorMap.java:74) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
        at org.apache.bookkeeper.stats.ThreadRegistry$RegisteredRunnable.run(ThreadRegistry.java:146) ~[org.apache.bookkeeper.stats-bookkeeper-stats-api-4.16.5-fx-f4db1a24ab.jar:4.16.5-fx-f4db1a24ab]
        at io.netty.util.concurrent.FastThreadLocalRunnable.run(FastThreadLocalRunnable.java:30) ~[io.netty-netty-common-4.1.108.Final.jar:4.1.108.Final]
        at java.lang.Thread.run(Thread.java:842) ~[?:?]


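To Reproduce

Apply the following patch to ConcurrentLongHashMapTest.java and run the new stressConcurrentInsertionsAndReads2 test: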

diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
index f1372b2894..f27d6f335c 100644
--- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
+++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/util/collections/ConcurrentLongHashMapTest.java
@@ -348,6 +348,7 @@ public class ConcurrentLongHashMapTest {
         assertEquals(map.size(), n);
     }
 
+
     @Test
     public void concurrentInsertions() throws Throwable {
         ConcurrentLongHashMap<String> map =
@@ -488,6 +489,171 @@ public class ConcurrentLongHashMapTest {
         executor.shutdown();
     }
 
+    @Test
+    public void stressConcurrentInsertionsAndReads2() throws Throwable {
+        ConcurrentLongHashMap<String> map =
+                ConcurrentLongHashMap.<String>newBuilder()
+                        .concurrencyLevel(4)
+                        .expectedItems(4)
+//                        .expandFactor(1.1f)
+//                        .shrinkFactor(1.1f)
+                        .autoShrink(true)
+                        .build();
+        ExecutorService executor = Executors.newCachedThreadPool();
+
+        final int writeThreads = 8;
+        final int readThreads = 8;
+        final int n = 1_000_000;
+        String[] values = new String[] {
+                "v",
+                "vv",
+                "vvv",
+                "vvvv",
+                "vvvvv",
+                "vvvvvv",
+                "vvvvvvv",
+                "vvvvvvvv",
+                "vvvvvvvvv",
+                "vvvvvvvvvv",
+        };
+        final int numValues = values.length;
+
+        CyclicBarrier barrier = new CyclicBarrier(writeThreads + readThreads);
+        List<Future<?>> futures = new ArrayList<>();
+
+        System.out.println("Starting writes");
+        for (int i = 0; i < writeThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    map.putIfAbsent(key, values[(int)Math.abs(key % numValues)]);
+                }
+            }));
+        }
+
+        System.out.println("Starting reads");
+        for (int i = 0; i < readThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    String value = map.get(key);
+                    if (value != null) {
+                        assertEquals(values[(int) Math.abs(key % numValues)], value);
+                    }
+                }
+            }));
+        }
+
+        System.out.println("Waiting for futures");
+        int count = 0;
+        for (Future<?> future : futures) {
+            future.get();
+            count++;
+            if (count % 1000 == 0) {
+                System.out.println("Completed " + count + " futures out of " + futures.size());
+            }
+        }
+
+        assertEquals(map.size(), n * writeThreads);
+
+        futures.clear();
+        barrier.reset();
+
+        System.out.println("Starting removes");
+        for (int i = 0; i < writeThreads; i++) {
+            final int threadIdx = i;
+
+            futures.add(executor.submit(() -> {
+                Random random = new Random(threadIdx);
+
+                try {
+                    barrier.await();
+                } catch (Exception e) {
+                    throw new RuntimeException(e);
+                }
+
+                for (int j = 0; j < n; j++) {
+                    long key = random.nextLong();
+                    // Ensure keys are unique
+                    key -= key % (threadIdx + 1);
+
+                    map.putIfAbsent(key, values[(int)Math.abs(key % numValues)]);
+                    map.remove(key);
+                    String value = map.get(key);
+                    assertNull(value);
+
+                }
+            }));
+        }
+
+        System.out.println("Starting reads 2");
+        for (int i = 0; i < readThreads; i++) {
+            final int threadIdx = i;
+
+            //for (int k = 0; k < 4; k++) {
+                futures.add(executor.submit(() -> {
+                    Random random = new Random(threadIdx);
+
+                    try {
+                        barrier.await();
+                    } catch (Exception e) {
+                        throw new RuntimeException(e);
+                    }
+
+                    for (int j = 0; j < n; j++) {
+                        long key = random.nextLong();
+                        // Ensure keys are unique
+                        key -= key % (threadIdx + 1);
+
+                        String value = map.get(key);
+                        if (value != null) {
+                            assertEquals(values[(int) Math.abs(key % numValues)], value);
+                        }
+                    }
+                }));
+            //}
+        }
+
+        System.out.println("Waiting for futures 2");
+        count = 0;
+        for (Future<?> future : futures) {
+            future.get();
+            count++;
+            if (count % 1000 == 0) {
+                System.out.println("Completed " + count + " futures out of " + futures.size());
+            }
+        }
+        futures.clear();
+
+        executor.shutdown();
+    }
+
     @Test
     public void testIteration() {
         ConcurrentLongHashMap<String> map =

dlg99 added the type/bug label on Apr 26, 2024
@dao-jun
Member

dao-jun commented Apr 26, 2024

It looks like this issue can be caused by the rehash method.

Executing rehash requires the write lock, but the optimistic read path in Section.get,

    int capacity = this.capacity;
    bucket = signSafeMod(bucket, capacity);

    // First try optimistic locking
    long storedKey = keys[bucket];
    V storedValue = values[bucket];

does not acquire a read lock, so it can run concurrently with rehash.
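A simplified model of the race shows why only autoShrink is affected. Assume, as a simplification, that the bucket is computed by masking the hash with capacity - 1 (capacity being a power of two). If a reader running the excerpt above reads capacity before a shrinking rehash replaces the arrays, and then indexes the new, smaller keys array, the index can land past its end. That is consistent with "Index 34 out of bounds for length 32" in the stack trace. When the map only grows, the stale capacity is smaller than the new array, so in this model the index always fits. StaleCapacityDemo, bucketFor and staleReadThrows are hypothetical names; this is not BookKeeper code.

```java
// Hypothetical, simplified model of the race window; not BookKeeper's implementation.
class StaleCapacityDemo {

    // Assumption: buckets are chosen by masking with (capacity - 1), capacity a power of two.
    static int bucketFor(long hash, int capacity) {
        return (int) hash & (capacity - 1);
    }

    // Simulates a reader that computed its bucket from a capacity read before rehash,
    // then indexed the keys array that rehash had already replaced.
    // Returns true if that access throws ArrayIndexOutOfBoundsException.
    static boolean staleReadThrows(int staleCapacity, int replacedLength, long hash) {
        long[] replacedKeys = new long[replacedLength];
        int bucket = bucketFor(hash, staleCapacity);
        try {
            long storedKey = replacedKeys[bucket]; // the bounds check is what matters here
            return false;
        } catch (ArrayIndexOutOfBoundsException e) {
            return true;
        }
    }
}
```

Under these assumptions, staleReadThrows(64, 32, 34) returns true because bucket 34 does not fit in a 32-slot array, while staleReadThrows(32, 64, 34) returns false because 34 & 31 = 2.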

@dao-jun
Member

dao-jun commented Apr 26, 2024

I pushed a PR that fixes the issue, and it passes your test.
#4317

@thetumbled
Contributor

This bug was found a long time ago, but the fix was merged only into the Pulsar repository, not into BookKeeper.
I think it is time to merge this PR to fix the problem, thanks.
#4066
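The thread does not include the patch, so the following is only an illustration of one way to keep optimistic reads in bounds while the table shrinks, not necessarily what #4066 or #4317 do. It is a hypothetical single-section map (SnapshotSection, Table, bucketFor, probe, insert and resize are invented names, and reserving Long.MIN_VALUE as the empty marker is an assumption). It stores keys and values in one holder published through a single volatile field, derives the bucket from the length of the arrays it actually probes, and falls back to the read lock when StampedLock.validate fails.

```java
import java.util.Arrays;
import java.util.concurrent.locks.StampedLock;

// Hypothetical single-section map; NOT BookKeeper's code and not necessarily the approach of #4066/#4317.
class SnapshotSection<V> {
    private static final long EMPTY = Long.MIN_VALUE; // assumption: this key value is reserved

    // Keys and values travel together, so a reader never mixes arrays from two table generations.
    private static final class Table {
        final long[] keys;
        final Object[] values;
        Table(int capacity) {
            keys = new long[capacity];
            Arrays.fill(keys, EMPTY);
            values = new Object[capacity];
        }
    }

    private final StampedLock lock = new StampedLock();
    private volatile Table table;
    private int size; // guarded by the write lock

    SnapshotSection(int capacity) { table = new Table(capacity); } // capacity must be a power of two

    private static int bucketFor(long key, int capacity) {
        return (int) (key ^ (key >>> 32)) & (capacity - 1);
    }

    V get(long key) {
        long stamp = lock.tryOptimisticRead();
        // One volatile read of 'table': the bucket is derived from the arrays actually probed.
        V result = probe(table, key);
        if (!lock.validate(stamp)) {
            stamp = lock.readLock(); // a writer ran concurrently; retry under the read lock
            try {
                result = probe(table, key);
            } finally {
                lock.unlockRead(stamp);
            }
        }
        return result;
    }

    @SuppressWarnings("unchecked")
    private V probe(Table t, long key) {
        int capacity = t.keys.length;
        int bucket = bucketFor(key, capacity);
        for (int i = 0; i < capacity; i++) { // bounded, so a racy read cannot spin forever
            long storedKey = t.keys[bucket];
            if (storedKey == key) return (V) t.values[bucket];
            if (storedKey == EMPTY) return null;
            bucket = (bucket + 1) & (capacity - 1);
        }
        return null;
    }

    void put(long key, V value) {
        if (key == EMPTY) throw new IllegalArgumentException("reserved key");
        long stamp = lock.writeLock();
        try {
            if ((size + 1) * 2 > table.keys.length) table = rehash(table, table.keys.length * 2);
            if (insert(table, key, value)) size++;
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    // Shrinking is the case that broke the real map; here rehash builds the new Table
    // completely before it is published through the volatile field.
    void resize(int newCapacity) {
        long stamp = lock.writeLock();
        try {
            if (Integer.bitCount(newCapacity) != 1 || newCapacity < size * 2) throw new IllegalArgumentException("bad capacity");
            table = rehash(table, newCapacity);
        } finally {
            lock.unlockWrite(stamp);
        }
    }

    private static Table rehash(Table old, int newCapacity) {
        Table t = new Table(newCapacity);
        for (int i = 0; i < old.keys.length; i++) {
            if (old.keys[i] != EMPTY) insert(t, old.keys[i], old.values[i]);
        }
        return t;
    }

    // Linear probing; returns true if the key was not already present.
    private static boolean insert(Table t, long key, Object value) {
        int bucket = bucketFor(key, t.keys.length);
        while (t.keys[bucket] != EMPTY && t.keys[bucket] != key) bucket = (bucket + 1) & (t.keys.length - 1);
        boolean added = t.keys[bucket] == EMPTY;
        t.keys[bucket] = key;
        t.values[bucket] = value;
        return added;
    }
}
```

Because rehash fully builds the new Table before it is assigned, an optimistic reader that still holds the old snapshot keeps indexing arrays whose length matches the bucket it computed, and validate discards the possibly stale answer. The trade-off is one extra object per rehash and one extra indirection per read.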

@dao-jun
Member

dao-jun commented Apr 27, 2024

Good! @thetumbled

@shoothzj
Member

Closed by #4066.
