4545import org .apache .bookkeeper .mledger .offload .jcloud .BackedInputStream ;
4646import org .apache .bookkeeper .mledger .offload .jcloud .OffloadIndexBlock ;
4747import org .apache .bookkeeper .mledger .offload .jcloud .OffloadIndexBlockBuilder ;
48+ import org .apache .bookkeeper .mledger .offload .jcloud .OffloadIndexEntry ;
4849import org .apache .bookkeeper .mledger .offload .jcloud .impl .DataBlockUtils .VersionCheck ;
4950import org .apache .pulsar .common .allocator .PulsarByteBufAllocator ;
5051import org .apache .pulsar .common .naming .TopicName ;
@@ -79,7 +80,8 @@ enum State {
7980
8081 private long lastAccessTimestamp = System .currentTimeMillis ();
8182
82- private BlobStoreBackedReadHandleImpl (long ledgerId , OffloadIndexBlock index ,
83+ @ VisibleForTesting
84+ BlobStoreBackedReadHandleImpl (long ledgerId , OffloadIndexBlock index ,
8385 BackedInputStream inputStream , ExecutorService executor ,
8486 OffsetsCache entryOffsetsCache ) {
8587 this .ledgerId = ledgerId ;
@@ -121,108 +123,212 @@ public CompletableFuture<Void> closeAsync() {
121123 return promise ;
122124 }
123125
124- @ Override
125- public CompletableFuture <LedgerEntries > readAsync (long firstEntry , long lastEntry ) {
126- if (log .isDebugEnabled ()) {
127- log .debug ("Ledger {}: reading {} - {} ({} entries}" ,
128- getId (), firstEntry , lastEntry , (1 + lastEntry - firstEntry ));
126+ private class ReadTask implements Runnable {
127+ private final long firstEntry ;
128+ private final long lastEntry ;
129+ private final CompletableFuture <LedgerEntries > promise ;
130+ private int seekedAndTryTimes = 0 ;
131+
132+ public ReadTask (long firstEntry , long lastEntry , CompletableFuture <LedgerEntries > promise ) {
133+ this .firstEntry = firstEntry ;
134+ this .lastEntry = lastEntry ;
135+ this .promise = promise ;
129136 }
130- CompletableFuture <LedgerEntries > promise = new CompletableFuture <>();
131137
132- // Ledger handles will be only marked idle when "pendingRead" is "0", it is not needed to update
133- // "lastAccessTimestamp" if "pendingRead" is larger than "0".
134- // Rather than update "lastAccessTimestamp" when starts a reading, updating it when a reading task is finished
135- // is better.
136- PENDING_READ_UPDATER .incrementAndGet (this );
137- promise .whenComplete ((__ , ex ) -> {
138- lastAccessTimestamp = System .currentTimeMillis ();
139- PENDING_READ_UPDATER .decrementAndGet (BlobStoreBackedReadHandleImpl .this );
140- });
141- executor .execute (() -> {
138+ @ Override
139+ public void run () {
142140 if (state == State .Closed ) {
143141 log .warn ("Reading a closed read handler. Ledger ID: {}, Read range: {}-{}" ,
144142 ledgerId , firstEntry , lastEntry );
145143 promise .completeExceptionally (new ManagedLedgerException .OffloadReadHandleClosedException ());
146144 return ;
147145 }
148146
149- List <LedgerEntry > entries = new ArrayList <LedgerEntry >();
150- boolean seeked = false ;
147+ List <LedgerEntry > entryCollector = new ArrayList <LedgerEntry >();
151148 try {
152149 if (firstEntry > lastEntry
153- || firstEntry < 0
154- || lastEntry > getLastAddConfirmed ()) {
150+ || firstEntry < 0
151+ || lastEntry > getLastAddConfirmed ()) {
155152 promise .completeExceptionally (new BKException .BKIncorrectParameterException ());
156153 return ;
157154 }
158155 long entriesToRead = (lastEntry - firstEntry ) + 1 ;
159- long nextExpectedId = firstEntry ;
160-
161- // checking the data stream has enough data to read to avoid throw EOF exception when reading data.
162- // 12 bytes represent the stream have the length and entryID to read.
163- if (dataStream .available () < 12 ) {
164- log .warn ("There hasn't enough data to read, current available data has {} bytes,"
165- + " seek to the first entry {} to avoid EOF exception" , inputStream .available (), firstEntry );
166- seekToEntry (firstEntry );
167- }
156+ long expectedEntryId = firstEntry ;
157+ seekToEntryOffset (firstEntry );
158+ seekedAndTryTimes ++;
168159
169160 while (entriesToRead > 0 ) {
170161 long currentPosition = inputStream .getCurrentPosition ();
171162 int length = dataStream .readInt ();
172163 if (length < 0 ) { // hit padding or new block
173- seekToEntry ( nextExpectedId );
164+ seekToEntryOffset ( expectedEntryId );
174165 continue ;
175166 }
176167 long entryId = dataStream .readLong ();
177-
178- if (entryId == nextExpectedId ) {
168+ if (entryId == expectedEntryId ) {
179169 entryOffsetsCache .put (ledgerId , entryId , currentPosition );
180170 ByteBuf buf = PulsarByteBufAllocator .DEFAULT .buffer (length , length );
181- entries .add (LedgerEntryImpl .create (ledgerId , entryId , length , buf ));
171+ entryCollector .add (LedgerEntryImpl .create (ledgerId , entryId , length , buf ));
182172 int toWrite = length ;
183173 while (toWrite > 0 ) {
184174 toWrite -= buf .writeBytes (dataStream , toWrite );
185175 }
186176 entriesToRead --;
187- nextExpectedId ++;
188- } else if (entryId > nextExpectedId && entryId < lastEntry ) {
189- log .warn ("The read entry {} is not the expected entry {} but in the range of {} - {},"
190- + " seeking to the right position" , entryId , nextExpectedId , nextExpectedId , lastEntry );
191- seekToEntry (nextExpectedId );
192- } else if (entryId < nextExpectedId
193- && !index .getIndexEntryForEntry (nextExpectedId ).equals (index .getIndexEntryForEntry (entryId ))) {
194- log .warn ("Read an unexpected entry id {} which is smaller than the next expected entry id {}"
195- + ", seeking to the right position" , entryId , nextExpectedId );
196- seekToEntry (nextExpectedId );
197- } else if (entryId > lastEntry ) {
198- // in the normal case, the entry id should increment in order. But if there has random access in
199- // the read method, we should allow to seek to the right position and the entry id should
200- // never over to the last entry again.
201- if (!seeked ) {
202- seekToEntry (nextExpectedId );
203- seeked = true ;
204- continue ;
205- }
206- log .info ("Expected to read {}, but read {}, which is greater than last entry {}" ,
207- nextExpectedId , entryId , lastEntry );
208- throw new BKException .BKUnexpectedConditionException ();
177+ expectedEntryId ++;
209178 } else {
210- long ignore = inputStream . skip ( length );
179+ handleUnexpectedEntryId ( expectedEntryId , entryId );
211180 }
212181 }
213-
214- promise .complete (LedgerEntriesImpl .create (entries ));
182+ promise .complete (LedgerEntriesImpl .create (entryCollector ));
215183 } catch (Throwable t ) {
216- log .error ("Failed to read entries {} - {} from the offloader in ledger {}" ,
217- firstEntry , lastEntry , ledgerId , t );
184+ log .error ("Failed to read entries {} - {} from the offloader in ledger {}, current position of input"
185+ + " stream is {}" , firstEntry , lastEntry , ledgerId , inputStream . getCurrentPosition () , t );
218186 if (t instanceof KeyNotFoundException ) {
219187 promise .completeExceptionally (new BKException .BKNoSuchLedgerExistsException ());
220188 } else {
221189 promise .completeExceptionally (t );
222190 }
223- entries .forEach (LedgerEntry ::close );
191+ entryCollector .forEach (LedgerEntry ::close );
192+ }
193+ }
194+
195+ // in the normal case, the entry id should increment in order. But if there has random access in
196+ // the read method, we should allow to seek to the right position and the entry id should
197+ // never over to the last entry again.
198+ private void handleUnexpectedEntryId (long expectedId , long actEntryId ) throws Exception {
199+ LedgerMetadata ledgerMetadata = getLedgerMetadata ();
200+ OffloadIndexEntry offsetOfExpectedId = index .getIndexEntryForEntry (expectedId );
201+ OffloadIndexEntry offsetOfActId = actEntryId <= getLedgerMetadata ().getLastEntryId () && actEntryId >= 0
202+ ? index .getIndexEntryForEntry (actEntryId ) : null ;
203+ String logLine = String .format ("Failed to read [ %s ~ %s ] of the ledger %s."
204+ + " Because got a incorrect entry id %s, the offset is %s."
205+ + " The expected entry id is %s, the offset is %s."
206+ + " Have seeked and retry read times: %s. LAC is %s." ,
207+ firstEntry , lastEntry , ledgerId ,
208+ actEntryId , offsetOfActId == null ? "null because it does not exist"
209+ : String .valueOf (offsetOfActId ),
210+ expectedId , String .valueOf (offsetOfExpectedId ),
211+ seekedAndTryTimes , ledgerMetadata != null ? ledgerMetadata .getLastEntryId () : "unknown" );
212+ // If it still fails after tried entries count times, throw the exception.
213+ long maxTryTimes = Math .max (3 , (lastEntry - firstEntry + 1 ) >> 2 );
214+ if (seekedAndTryTimes > maxTryTimes ) {
215+ log .error (logLine );
216+ throw new BKException .BKUnexpectedConditionException ();
217+ } else {
218+ log .warn (logLine );
219+ }
220+ seekToEntryOffset (expectedId );
221+ seekedAndTryTimes ++;
222+ }
223+
224+ private void skipPreviousEntry (long startEntryId , long expectedEntryId ) throws IOException , BKException {
225+ long nextExpectedEntryId = startEntryId ;
226+ while (nextExpectedEntryId < expectedEntryId ) {
227+ long offset = inputStream .getCurrentPosition ();
228+ int len = dataStream .readInt ();
229+ if (len < 0 ) {
230+ LedgerMetadata ledgerMetadata = getLedgerMetadata ();
231+ OffloadIndexEntry offsetOfExpectedId = index .getIndexEntryForEntry (expectedEntryId );
232+ log .error ("Failed to read [ {} ~ {} ] of the ledger {}."
233+ + " Because failed to skip a previous entry {}, len: {}, got a negative len."
234+ + " The expected entry id is {}, the offset is {}."
235+ + " Have seeked and retry read times: {}. LAC is {}." ,
236+ firstEntry , lastEntry , ledgerId ,
237+ nextExpectedEntryId , len ,
238+ expectedEntryId , String .valueOf (offsetOfExpectedId ),
239+ seekedAndTryTimes , ledgerMetadata != null ? ledgerMetadata .getLastEntryId () : "unknown" );
240+ throw new BKException .BKUnexpectedConditionException ();
241+ }
242+ long entryId = dataStream .readLong ();
243+ if (entryId == nextExpectedEntryId ) {
244+ long skipped = inputStream .skip (len );
245+ if (skipped != len ) {
246+ LedgerMetadata ledgerMetadata = getLedgerMetadata ();
247+ OffloadIndexEntry offsetOfExpectedId = index .getIndexEntryForEntry (expectedEntryId );
248+ log .error ("Failed to read [ {} ~ {} ] of the ledger {}."
249+ + " Because failed to skip a previous entry {}, offset: {}, len: {}, there is no more data."
250+ + " The expected entry id is {}, the offset is {}."
251+ + " Have seeked and retry read times: {}. LAC is {}." ,
252+ firstEntry , lastEntry , ledgerId ,
253+ entryId , offset , len ,
254+ expectedEntryId , String .valueOf (offsetOfExpectedId ),
255+ seekedAndTryTimes , ledgerMetadata != null ? ledgerMetadata .getLastEntryId () : "unknown" );
256+ throw new BKException .BKUnexpectedConditionException ();
257+ }
258+ nextExpectedEntryId ++;
259+ } else {
260+ LedgerMetadata ledgerMetadata = getLedgerMetadata ();
261+ OffloadIndexEntry offsetOfExpectedId = index .getIndexEntryForEntry (expectedEntryId );
262+ log .error ("Failed to read [ {} ~ {} ] of the ledger {}."
263+ + " Because got a incorrect entry id {},."
264+ + " The expected entry id is {}, the offset is {}."
265+ + " Have seeked and retry read times: {}. LAC is {}." ,
266+ firstEntry , lastEntry , ledgerId ,
267+ entryId , expectedEntryId , String .valueOf (offsetOfExpectedId ),
268+ seekedAndTryTimes , ledgerMetadata != null ? ledgerMetadata .getLastEntryId () : "unknown" );
269+ throw new BKException .BKUnexpectedConditionException ();
270+ }
271+ }
272+ }
273+
274+ private void seekToEntryOffset (long expectedEntryId ) throws IOException , BKException {
275+ // 1. Try to find the precise index.
276+ // 1-1. Precise cached indexes.
277+ Long cachedPreciseIndex = entryOffsetsCache .getIfPresent (ledgerId , expectedEntryId );
278+ if (cachedPreciseIndex != null ) {
279+ inputStream .seek (cachedPreciseIndex );
280+ return ;
281+ }
282+ // 1-2. Precise persistent indexes.
283+ OffloadIndexEntry indexOfNearestEntry = index .getIndexEntryForEntry (expectedEntryId );
284+ if (indexOfNearestEntry .getEntryId () == expectedEntryId ) {
285+ inputStream .seek (indexOfNearestEntry .getDataOffset ());
286+ return ;
224287 }
288+ // 2. Try to use the previous index. Since the entry-0 must have a precise index, we can skip to check
289+ // whether "expectedEntryId" is larger than 0;
290+ Long cachedPreviousKnownOffset = entryOffsetsCache .getIfPresent (ledgerId , expectedEntryId - 1 );
291+ if (cachedPreviousKnownOffset != null ) {
292+ inputStream .seek (cachedPreviousKnownOffset );
293+ skipPreviousEntry (expectedEntryId - 1 , expectedEntryId );
294+ return ;
295+ }
296+ // 3. Use the persistent index of the nearest entry that is smaller than "expectedEntryId".
297+ // Because it is a sparse index, some entries need to be skipped.
298+ if (indexOfNearestEntry .getEntryId () < expectedEntryId ) {
299+ inputStream .seek (indexOfNearestEntry .getDataOffset ());
300+ skipPreviousEntry (indexOfNearestEntry .getEntryId (), expectedEntryId );
301+ } else {
302+ LedgerMetadata ledgerMetadata = getLedgerMetadata ();
303+ log .error ("Failed to read [ {} ~ {} ] of the ledger {}."
304+ + " Because got a incorrect index {} of the entry {}, which is greater than expected."
305+ + " Have seeked and retry read times: {}. LAC is {}." ,
306+ firstEntry , lastEntry , ledgerId ,
307+ String .valueOf (indexOfNearestEntry ), expectedEntryId ,
308+ seekedAndTryTimes , ledgerMetadata != null ? ledgerMetadata .getLastEntryId () : "unknown" );
309+ throw new BKException .BKUnexpectedConditionException ();
310+ }
311+ }
312+ }
313+
314+ @ Override
315+ public CompletableFuture <LedgerEntries > readAsync (long firstEntry , long lastEntry ) {
316+ if (log .isDebugEnabled ()) {
317+ log .debug ("Ledger {}: reading {} - {} ({} entries}" ,
318+ getId (), firstEntry , lastEntry , (1 + lastEntry - firstEntry ));
319+ }
320+ CompletableFuture <LedgerEntries > promise = new CompletableFuture <>();
321+
322+ // Ledger handles will be only marked idle when "pendingRead" is "0", it is not needed to update
323+ // "lastAccessTimestamp" if "pendingRead" is larger than "0".
324+ // Rather than update "lastAccessTimestamp" when starts a reading, updating it when a reading task is finished
325+ // is better.
326+ PENDING_READ_UPDATER .incrementAndGet (this );
327+ promise .whenComplete ((__ , ex ) -> {
328+ lastAccessTimestamp = System .currentTimeMillis ();
329+ PENDING_READ_UPDATER .decrementAndGet (BlobStoreBackedReadHandleImpl .this );
225330 });
331+ executor .execute (new ReadTask (firstEntry , lastEntry , promise ));
226332 return promise ;
227333 }
228334
@@ -238,6 +344,11 @@ private void seekToEntry(long nextExpectedId) throws IOException {
238344 }
239345 }
240346
347+ private void seekToEntry (OffloadIndexEntry offloadIndexEntry ) throws IOException {
348+ long dataOffset = offloadIndexEntry .getDataOffset ();
349+ inputStream .seek (dataOffset );
350+ }
351+
241352 @ Override
242353 public CompletableFuture <LedgerEntries > readUnconfirmedAsync (long firstEntry , long lastEntry ) {
243354 return readAsync (firstEntry , lastEntry );
0 commit comments