Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Fix the potential race condition in the BlobStore readhandler #12123

Merged
merged 9 commits into from
Oct 12, 2021

Conversation

zymap
Copy link
Member

@zymap zymap commented Sep 22, 2021


Motivation

We found the BlobStoreBackedReadHandler enter an infinite loop when reading a offload ledger.

We saw in the heap dump, there has two ledger 1 and 2, and we have
a consumer is reading ledger 1, but the heap shows the buffer it
reading is ledger 2. Then the read handler read a wrong entry id
and the entry id is out of range between the firstEntryId and the
lastEntryId, it will try to keep seeking to the right position, so
that it enters an infinite loop.

The buffer in the BlobStoreBackedInputStreamImpl is a wrong buffer,
so the read handler can not read a correct entry from it. After investigating,
the buffer used in the BlobStoreBackedInputStreamImpl is allocated from
PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize), by default,
we used a PooledByteBufAllocator.DEFAULT to allocate memory for that buffer,
a weird thing we found is the closed buffer still can read data if there has
new buffer allocated and write things.
This is the test code:

ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
buf.writeByte(1);
System.out.println(buf.readByte()); // print 1
buf.release();
//System.out.println(buf.readByte()); // will throw exception
ByteBuf newBuf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
newBuf.writeByte(2);
System.out.println(buf.readByte()); // print 2
newBuf.release();
//System.out.println(buf.readByte());

So we suspect there has a race condition between the read and close
operation. That cause there has a thread start reading after
the read handler closed.

Modifications

  • Add a state check before reading entries.
  • exit loop when the entryID is bigger than the lastEntryID

---

*Motivation*

We found the BlobStoreBackedReadHandler enter an infinite loop                                                                                          when reading a offload ledger.

We saw in the heap dump, there has two ledger 1 and 2, and we have
a consumer is reading ledger 1, but the heap shows the buffer it
reading is ledger 2. Then the read handler read a wrong entry id
and the entry id is out of range between the firstEntryId and the
lastEntryId, it will try to keep seeking to the right position, so
that it enter an infinite loop.

The buffer in the `BlobStoreBackedInputStreamImpl` is a wrong buffer,
so the read handler can not read a right entry from it. After investigating,
the buffer used in the `BlobStoreBackedInputStreamImpl` is allocated from
`PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize)`, by default,
we used a `PooledByteBufAllocator.DEFAULT` to allocate memory for that buffer,
a weird thing we found is the closed buffer still can read data if there has
new buffer allocated and write things.
This is the test code:
```
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
buf.writeByte(1);
System.out.println(buf.readByte()); // print 1
buf.release();
//System.out.println(buf.readByte()); // will throw exception
ByteBuf newBuf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
newBuf.writeByte(2);
System.out.println(buf.readByte()); // print 2
newBuf.release();
//System.out.println(buf.readByte());
```

So we suspect  there has a race condition between the read and close
operation. That cause there has a thread  start reading after
the read handler closed.

*Modifications*

- Add a state check before rading entries.
- exit loop when the entryID is bigger than the lastEntryID
@zymap zymap added type/bug The PR fixed a bug or issue reported a bug area/tieredstorage labels Sep 22, 2021
@zymap zymap self-assigned this Sep 22, 2021
@zymap
Copy link
Member Author

zymap commented Sep 22, 2021

I will try to add test later

@Anonymitaet
Copy link
Member

@zymap Thanks for your contribution. For this PR, do we need to update docs?

(The PR template contains info about doc, which helps others know more about the changes. Can you provide doc-related info in this and future PR descriptions? Thanks)

@zymap zymap added the doc-not-needed Your PR changes do not impact docs label Sep 22, 2021
Copy link
Member

@michaeljmarshall michaeljmarshall left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great catch! It'll be good to add the test, as you mentioned @zymap.

State state = STATE_UPDATER.get(this);
if (state == State.Closed) {
log.warn("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}", ledgerId, firstEntry, lastEntry);
throw new BKException.BKUnexpectedConditionException();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Should this complete the promise exceptionally?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The promise is completed in the catch section.

Comment on lines 64 to 66
private AtomicReferenceFieldUpdater<BlobStoreBackedReadHandleImpl, State> STATE_UPDATER = AtomicReferenceFieldUpdater
.newUpdater(BlobStoreBackedReadHandleImpl.class, State.class, "state");
private volatile State state = null;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm not sure that we need these concurrency controls. The executor in this class is a single thread. Therefore, a local state variable should be sufficient as long as we only update the variable from within the executor's thread.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. We did don't need the AtomicReferenceFieldUpdater to control it. But we need to make it as volatile, the read and close in the different thread, if the read thread into an infinite loop, then someone triggers the close, the read thread can not exit the infinite loop because the state is holding by the read thread. We need to add the volatile to make the state can be notified when it's changed.

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the read and close in the different thread, if the read thread into an infinite loop, then someone triggers the close, the read thread can not exit the infinite loop because the state is holding by the read thread.

The read and the write are not in separate threads. They are both scheduled to run as runnables on the executor in this class. That executor has a single thread. If the read is in an infinite loop, the close runnable will never run.

@hangc0276
Copy link
Contributor

@zymap Please take a look of the failed tests

Error:  Tests run: 27, Failures: 2, Errors: 0, Skipped: 4, Time elapsed: 661.15 s <<< FAILURE! - in TestSuite
Error:  testQueryTieredStorage1(org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage)  Time elapsed: 64.892 s  <<< FAILURE!
java.sql.SQLException: Query failed (#20210923_090130_00005_pyunu): org.apache.bookkeeper.mledger.ManagedLedgerException: Unexpected condition
	at io.prestosql.jdbc.PrestoResultSet.resultsException(PrestoResultSet.java:1894)
	at io.prestosql.jdbc.PrestoResultSet.getColumns(PrestoResultSet.java:1755)
	at io.prestosql.jdbc.PrestoResultSet.<init>(PrestoResultSet.java:125)
	at io.prestosql.jdbc.PrestoStatement.internalExecute(PrestoStatement.java:249)
	at io.prestosql.jdbc.PrestoStatement.execute(PrestoStatement.java:227)
	at io.prestosql.jdbc.PrestoStatement.executeQuery(PrestoStatement.java:76)
	at org.apache.pulsar.tests.integration.presto.TestPulsarSQLBase.validateData(TestPulsarSQLBase.java:220)
	at org.apache.pulsar.tests.integration.presto.TestPulsarSQLBase.pulsarSQLBasicTest(TestPulsarSQLBase.java:72)
	at org.apache.pulsar.tests.integration.presto.TestPrestoQueryTieredStorage.testQueryTieredStorage1(TestPrestoQueryTieredStorage.java:122)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
	at java.base/jdk.internal.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
	at java.base/jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
	at java.base/java.lang.reflect.Method.invoke(Method.java:566)
	at org.testng.internal.MethodInvocationHelper.invokeMethod(MethodInvocationHelper.java:132)
	at org.testng.internal.InvokeMethodRunnable.runOne(InvokeMethodRunnable.java:45)
	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:73)
	at org.testng.internal.InvokeMethodRunnable.call(InvokeMethodRunnable.java:11)
	at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
	at java.base/java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1128)
	at java.base/java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:628)
	at java.base/java.lang.Thread.run(Thread.java:829)

@zymap
Copy link
Member Author

zymap commented Sep 24, 2021

@michaeljmarshall @hangc0276 @codelipenghui All tests passed, PTAL, thanks

}
entriesToRead--;
nextExpectedId++;
} else if (entryId > nextExpectedId && entryId < lastEntry) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The two if check are lead to

inputStream.seek(index.getIndexEntryForEntry(nextExpectedId).getDataOffset());
continue;

can we merge them into one if check?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

@@ -94,56 +104,70 @@ public LedgerMetadata getLedgerMetadata() {
log.debug("Ledger {}: reading {} - {}", getId(), firstEntry, lastEntry);
CompletableFuture<LedgerEntries> promise = new CompletableFuture<>();
executor.submit(() -> {
List<LedgerEntry> entries = new ArrayList<LedgerEntry>();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We'd better put it behind

if (firstEntry > lastEntry
                    || firstEntry < 0
                    || lastEntry > getLastAddConfirmed()) {
    promise.completeExceptionally(new BKException.BKIncorrectParameterException());
    return;
}

we can save the list object apply when the (firstEntry, lastEntry) check failed.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The array list will not allocate space until there is an element added to it. And it is used in the catch, I think we can keep it.

@@ -115,7 +115,8 @@ public int read(byte[] b, int off, int len) throws IOException {
}

@Override
public void seek(long position) {
public void seek(long position) throws IOException {
refillBufferIfNeeded();
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Why add refillBufferIfNeed()?
We'd better check buffer.readerIndex() == position, if the check is true, just skip the following check and return.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we add a new line in the BlobStoreBackedHandleImpl:

                inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());

Before this, the readAsync will use readInt() to trigger the stream to fill buffer, then to seek. Currently, we will execute seek before any read operation. We need to fill the buffer to let it fetch the data.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The position is the position of the whole object, and the buffer is part of the object. I think we don't need to check buffer.readerIndex() == position, the seek operation is not a complicated work.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

It was added because my test case has some issues. Remove this and refactor the test case

nextExpectedId, entryId, lastEntry);
throw new BKException.BKUnexpectedConditionException();
} else {
long ignored = inputStream.skip(length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ignored has no reference, we can use inputStream.skip(length);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

There has a spotbugs check to let us handle the returned value. So we can't remove it.

nextExpectedId, entryId, lastEntry);
throw new BKException.BKUnexpectedConditionException();
} else {
long ignored = inputStream.skip(length);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The ignored has no reference, we can use inputStream.skip(length);

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

ok

// will read the entry id from the stream and that is not the correct entry id, so it will seek to the
// correct position then read the stream as normal. But the entry id may exceed the last entry id, that
// will cause we are hardly to know the edge of the request range.
inputStream.seek(index.getIndexEntryForEntry(firstEntry).getDataOffset());
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Because we have seek to the firstEntry before reading, we can simplify the following check logic. If the entryId read from dataStream not equal to nextExpectedId, we can throw exception.
@codelipenghui Please help check this logic.

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This seek operation is being used to seeking to the first entry id position, and then do the original logic. This operation is only to make sure the read position is as we wanted.
Before this, it reads the first data to get to know if the read position is the right position. If not, it will seek to the right position.

The following check logic is still needed to help us to seek to the right position if we met some unexpected data. Maybe we can set a limitation to avoid it into an infinite loop.

@hangc0276 hangc0276 added this to the 2.9.0 milestone Sep 24, 2021
@zymap
Copy link
Member Author

zymap commented Sep 28, 2021

@hangc0276 @michaeljmarshall PTAL

@michaeljmarshall
Copy link
Member

@zymap - If I understand this class correctly, the new state variable will never be able to interrupt an infinite while loop because both closeAsync and the while loop are run on the class's single threaded executor. If the while loop is in an infinite loop, the closeAsync runnable will never get run. I tried to address your comment here: #12123 (comment). Let me know if you think my understanding makes sense or is incorrect in some way. Thanks.

@zymap
Copy link
Member Author

zymap commented Sep 29, 2021

@michaeljmarshall Thanks, that makes sense to me. I will fix that

@zymap
Copy link
Member Author

zymap commented Oct 2, 2021

@michaeljmarshall I removed the volatile. PTAL if you have time. Thanks

@eolivelli eolivelli modified the milestones: 2.9.0, 2.10.0 Oct 6, 2021
@zymap
Copy link
Member Author

zymap commented Oct 8, 2021

ping @hangc0276 @michaeljmarshall Please take another look when you have time. Thanks

@hangc0276 hangc0276 modified the milestones: 2.10.0, 2.9.0 Oct 9, 2021
Copy link
Contributor

@hangc0276 hangc0276 left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Great Catch!

@codelipenghui
Copy link
Contributor

@michaeljmarshall Please help review the PR again

@zymap zymap merged commit 3d7c3f8 into apache:master Oct 12, 2021
@michaeljmarshall
Copy link
Member

@codelipenghui and @zymap - sorry for my delayed review. LGTM

zymap added a commit that referenced this pull request Oct 13, 2021
* Fix the potential race condition in the BlobStore readhandler
---

*Motivation*

We found the BlobStoreBackedReadHandler enter an infinite loop                                                                                          when reading a offload ledger.

We saw in the heap dump, there has two ledger 1 and 2, and we have
a consumer is reading ledger 1, but the heap shows the buffer it
reading is ledger 2. Then the read handler read a wrong entry id
and the entry id is out of range between the firstEntryId and the
lastEntryId, it will try to keep seeking to the right position, so
that it enter an infinite loop.

The buffer in the `BlobStoreBackedInputStreamImpl` is a wrong buffer,
so the read handler can not read a right entry from it. After investigating,
the buffer used in the `BlobStoreBackedInputStreamImpl` is allocated from
`PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize)`, by default,
we used a `PooledByteBufAllocator.DEFAULT` to allocate memory for that buffer,
a weird thing we found is the closed buffer still can read data if there has
new buffer allocated and write things.
This is the test code:
```
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
buf.writeByte(1);
System.out.println(buf.readByte()); // print 1
buf.release();
//System.out.println(buf.readByte()); // will throw exception
ByteBuf newBuf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
newBuf.writeByte(2);
System.out.println(buf.readByte()); // print 2
newBuf.release();
//System.out.println(buf.readByte());
```

So we suspect there has a race condition between the read and close
operation. That cause there has a thread start reading after
the read handler closed.

*Modifications*

- Add a state check before reading entries.
- exit loop when the entryID is bigger than the lastEntryID

(cherry picked from commit 3d7c3f8)
@zymap zymap added the cherry-picked/branch-2.8 Archived: 2.8 is end of life label Oct 13, 2021
zymap added a commit to zymap/pulsar that referenced this pull request Oct 21, 2021
---

*Motivation*

In the apache#12123, I add the seek operation at the readAsync method.
It makes sure the data stream always seek to the first entry position
to read and will not introduce EOF exception.
But in the offload index entry, it groups a set of entries into a range,
the seek operation will seek the posistion to the first entry in the range.
That will introduce a performance issue because every read opeartion will
read from the first entry in the range until it find the actual first read
entry.
But if we remove the seek operation, that will cause a EOF exception from
the readAsync method. This PR adds a limitation of the seek opeartion.

*Modifications*

Add available method in the backedInputStream to get know how many bytes
we can read from the stream.
zymap added a commit to zymap/pulsar that referenced this pull request Oct 21, 2021
---

*Motivation*

In the apache#12123, I add the seek operation at the readAsync method.
It makes sure the data stream always seek to the first entry position
to read and will not introduce EOF exception.
But in the offload index entry, it groups a set of entries into a range,
the seek operation will seek the posistion to the first entry in the range.
That will introduce a performance issue because every read opeartion will
read from the first entry in the range until it find the actual first read
entry.
But if we remove the seek operation, that will cause a EOF exception from
the readAsync method. This PR adds a limitation of the seek opeartion.

*Modifications*

Add available method in the backedInputStream to get know how many bytes
we can read from the stream.
codelipenghui pushed a commit that referenced this pull request Oct 22, 2021
---

*Motivation*

In the #12123, I add the seek operation at the readAsync method.
It makes sure the data stream always seek to the first entry position
to read and will not introduce EOF exception.
But in the offload index entry, it groups a set of entries into a range,
the seek operation will seek the posistion to the first entry in the range.
That will introduce a performance issue because every read opeartion will
read from the first entry in the range until it find the actual first read
entry.
But if we remove the seek operation, that will cause a EOF exception from
the readAsync method. This PR adds a limitation of the seek opeartion.

*Modifications*

Add available method in the backedInputStream to get know how many bytes
we can read from the stream.
zymap added a commit that referenced this pull request Oct 22, 2021
---

*Motivation*

In the #12123, I add the seek operation at the readAsync method.
It makes sure the data stream always seek to the first entry position
to read and will not introduce EOF exception.
But in the offload index entry, it groups a set of entries into a range,
the seek operation will seek the posistion to the first entry in the range.
That will introduce a performance issue because every read opeartion will
read from the first entry in the range until it find the actual first read
entry.
But if we remove the seek operation, that will cause a EOF exception from
the readAsync method. This PR adds a limitation of the seek opeartion.

*Modifications*

Add available method in the backedInputStream to get know how many bytes
we can read from the stream.

(cherry picked from commit b4d05ac)
eolivelli pushed a commit to eolivelli/pulsar that referenced this pull request Nov 29, 2021
---

*Motivation*

In the apache#12123, I add the seek operation at the readAsync method.
It makes sure the data stream always seek to the first entry position
to read and will not introduce EOF exception.
But in the offload index entry, it groups a set of entries into a range,
the seek operation will seek the posistion to the first entry in the range.
That will introduce a performance issue because every read opeartion will
read from the first entry in the range until it find the actual first read
entry.
But if we remove the seek operation, that will cause a EOF exception from
the readAsync method. This PR adds a limitation of the seek opeartion.

*Modifications*

Add available method in the backedInputStream to get know how many bytes
we can read from the stream.
codelipenghui pushed a commit that referenced this pull request Dec 11, 2021
* Fix the potential race condition in the BlobStore readhandler
---

*Motivation*

We found the BlobStoreBackedReadHandler enter an infinite loop                                                                                          when reading a offload ledger.

We saw in the heap dump, there has two ledger 1 and 2, and we have
a consumer is reading ledger 1, but the heap shows the buffer it
reading is ledger 2. Then the read handler read a wrong entry id
and the entry id is out of range between the firstEntryId and the
lastEntryId, it will try to keep seeking to the right position, so
that it enter an infinite loop.

The buffer in the `BlobStoreBackedInputStreamImpl` is a wrong buffer,
so the read handler can not read a right entry from it. After investigating,
the buffer used in the `BlobStoreBackedInputStreamImpl` is allocated from
`PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize)`, by default,
we used a `PooledByteBufAllocator.DEFAULT` to allocate memory for that buffer,
a weird thing we found is the closed buffer still can read data if there has
new buffer allocated and write things.
This is the test code:
```
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
buf.writeByte(1);
System.out.println(buf.readByte()); // print 1
buf.release();
//System.out.println(buf.readByte()); // will throw exception
ByteBuf newBuf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
newBuf.writeByte(2);
System.out.println(buf.readByte()); // print 2
newBuf.release();
//System.out.println(buf.readByte());
```

So we suspect there has a race condition between the read and close
operation. That cause there has a thread start reading after
the read handler closed.

*Modifications*

- Add a state check before reading entries.
- exit loop when the entryID is bigger than the lastEntryID

(cherry picked from commit 3d7c3f8)
@codelipenghui codelipenghui added the cherry-picked/branch-2.7 Archived: 2.7 is end of life label Dec 11, 2021
codelipenghui pushed a commit that referenced this pull request Dec 20, 2021
---

*Motivation*

In the #12123, I add the seek operation at the readAsync method.
It makes sure the data stream always seek to the first entry position
to read and will not introduce EOF exception.
But in the offload index entry, it groups a set of entries into a range,
the seek operation will seek the posistion to the first entry in the range.
That will introduce a performance issue because every read opeartion will
read from the first entry in the range until it find the actual first read
entry.
But if we remove the seek operation, that will cause a EOF exception from
the readAsync method. This PR adds a limitation of the seek opeartion.

*Modifications*

Add available method in the backedInputStream to get know how many bytes
we can read from the stream.

(cherry picked from commit b4d05ac)
bharanic-dev pushed a commit to bharanic-dev/pulsar that referenced this pull request Mar 18, 2022
…#12123)

* Fix the potential race condition in the BlobStore readhandler
---

*Motivation*

We found the BlobStoreBackedReadHandler enter an infinite loop                                                                                          when reading a offload ledger.

We saw in the heap dump, there has two ledger 1 and 2, and we have
a consumer is reading ledger 1, but the heap shows the buffer it
reading is ledger 2. Then the read handler read a wrong entry id
and the entry id is out of range between the firstEntryId and the
lastEntryId, it will try to keep seeking to the right position, so
that it enter an infinite loop.

The buffer in the `BlobStoreBackedInputStreamImpl` is a wrong buffer,
so the read handler can not read a right entry from it. After investigating,
the buffer used in the `BlobStoreBackedInputStreamImpl` is allocated from
`PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize)`, by default,
we used a `PooledByteBufAllocator.DEFAULT` to allocate memory for that buffer,
a weird thing we found is the closed buffer still can read data if there has
new buffer allocated and write things.
This is the test code:
```
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
buf.writeByte(1);
System.out.println(buf.readByte()); // print 1
buf.release();
//System.out.println(buf.readByte()); // will throw exception
ByteBuf newBuf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
newBuf.writeByte(2);
System.out.println(buf.readByte()); // print 2
newBuf.release();
//System.out.println(buf.readByte());
```

So we suspect there has a race condition between the read and close
operation. That cause there has a thread start reading after
the read handler closed.

*Modifications*

- Add a state check before reading entries.
- exit loop when the entryID is bigger than the lastEntryID
lhotari pushed a commit to datastax/pulsar that referenced this pull request Apr 11, 2022
…#12123)

* Fix the potential race condition in the BlobStore readhandler
---

*Motivation*

We found the BlobStoreBackedReadHandler enter an infinite loop                                                                                          when reading a offload ledger.

We saw in the heap dump, there has two ledger 1 and 2, and we have
a consumer is reading ledger 1, but the heap shows the buffer it
reading is ledger 2. Then the read handler read a wrong entry id
and the entry id is out of range between the firstEntryId and the
lastEntryId, it will try to keep seeking to the right position, so
that it enter an infinite loop.

The buffer in the `BlobStoreBackedInputStreamImpl` is a wrong buffer,
so the read handler can not read a right entry from it. After investigating,
the buffer used in the `BlobStoreBackedInputStreamImpl` is allocated from
`PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize)`, by default,
we used a `PooledByteBufAllocator.DEFAULT` to allocate memory for that buffer,
a weird thing we found is the closed buffer still can read data if there has
new buffer allocated and write things.
This is the test code:
```
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
buf.writeByte(1);
System.out.println(buf.readByte()); // print 1
buf.release();
//System.out.println(buf.readByte()); // will throw exception
ByteBuf newBuf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
newBuf.writeByte(2);
System.out.println(buf.readByte()); // print 2
newBuf.release();
//System.out.println(buf.readByte());
```

So we suspect there has a race condition between the read and close
operation. That cause there has a thread start reading after
the read handler closed.

*Modifications*

- Add a state check before reading entries.
- exit loop when the entryID is bigger than the lastEntryID

(cherry picked from commit 3d7c3f8)
lhotari pushed a commit to datastax/pulsar that referenced this pull request Apr 11, 2022
---

*Motivation*

In the apache#12123, I add the seek operation at the readAsync method.
It makes sure the data stream always seek to the first entry position
to read and will not introduce EOF exception.
But in the offload index entry, it groups a set of entries into a range,
the seek operation will seek the posistion to the first entry in the range.
That will introduce a performance issue because every read opeartion will
read from the first entry in the range until it find the actual first read
entry.
But if we remove the seek operation, that will cause a EOF exception from
the readAsync method. This PR adds a limitation of the seek opeartion.

*Modifications*

Add available method in the backedInputStream to get know how many bytes
we can read from the stream.

(cherry picked from commit b4d05ac)
lhotari pushed a commit to datastax/pulsar that referenced this pull request Apr 11, 2022
…#12123)

* Fix the potential race condition in the BlobStore readhandler
---

*Motivation*

We found the BlobStoreBackedReadHandler enter an infinite loop                                                                                          when reading a offload ledger.

We saw in the heap dump, there has two ledger 1 and 2, and we have
a consumer is reading ledger 1, but the heap shows the buffer it
reading is ledger 2. Then the read handler read a wrong entry id
and the entry id is out of range between the firstEntryId and the
lastEntryId, it will try to keep seeking to the right position, so
that it enter an infinite loop.

The buffer in the `BlobStoreBackedInputStreamImpl` is a wrong buffer,
so the read handler can not read a right entry from it. After investigating,
the buffer used in the `BlobStoreBackedInputStreamImpl` is allocated from
`PulsarByteBufAllocator.DEFAULT.buffer(bufferSize, bufferSize)`, by default,
we used a `PooledByteBufAllocator.DEFAULT` to allocate memory for that buffer,
a weird thing we found is the closed buffer still can read data if there has
new buffer allocated and write things.
This is the test code:
```
ByteBuf buf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
buf.writeByte(1);
System.out.println(buf.readByte()); // print 1
buf.release();
//System.out.println(buf.readByte()); // will throw exception
ByteBuf newBuf = PulsarByteBufAllocator.DEFAULT.buffer(1024, 1024);
newBuf.writeByte(2);
System.out.println(buf.readByte()); // print 2
newBuf.release();
//System.out.println(buf.readByte());
```

So we suspect there has a race condition between the read and close
operation. That cause there has a thread start reading after
the read handler closed.

*Modifications*

- Add a state check before reading entries.
- exit loop when the entryID is bigger than the lastEntryID

(cherry picked from commit 3d7c3f8)
lhotari pushed a commit to datastax/pulsar that referenced this pull request Apr 11, 2022
---

*Motivation*

In the apache#12123, I add the seek operation at the readAsync method.
It makes sure the data stream always seek to the first entry position
to read and will not introduce EOF exception.
But in the offload index entry, it groups a set of entries into a range,
the seek operation will seek the posistion to the first entry in the range.
That will introduce a performance issue because every read opeartion will
read from the first entry in the range until it find the actual first read
entry.
But if we remove the seek operation, that will cause a EOF exception from
the readAsync method. This PR adds a limitation of the seek opeartion.

*Modifications*

Add available method in the backedInputStream to get know how many bytes
we can read from the stream.

(cherry picked from commit b4d05ac)
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
area/tieredstorage cherry-picked/branch-2.7 Archived: 2.7 is end of life cherry-picked/branch-2.8 Archived: 2.8 is end of life doc-not-needed Your PR changes do not impact docs release/2.7.4 release/2.8.2 type/bug The PR fixed a bug or issue reported a bug
Projects
None yet
Development

Successfully merging this pull request may close these issues.

6 participants