Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Issue 550: add readLastAddConfirmedAndEntry in ReadHandle for long poll read #729

Closed
wants to merge 6 commits into from
Closed
Show file tree
Hide file tree
Changes from 3 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -51,14 +51,13 @@
import org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback;
import org.apache.bookkeeper.client.BookKeeper.DigestType;
import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmed;
import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadLastConfirmedAndEntry;
import org.apache.bookkeeper.client.SyncCallbackUtils.FutureReadResult;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncAddCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncCloseCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadCallback;
import org.apache.bookkeeper.client.SyncCallbackUtils.SyncReadLastConfirmedCallback;
import org.apache.bookkeeper.client.api.WriteAdvHandle;
import org.apache.bookkeeper.client.api.WriteHandle;
import org.apache.bookkeeper.common.concurrent.FutureUtils;
import org.apache.bookkeeper.net.BookieSocketAddress;
import org.apache.bookkeeper.proto.BookieProtocol;
import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks;
Expand Down Expand Up @@ -965,7 +964,7 @@ public void readLastConfirmedDataComplete(int rc, DigestManager.RecoveryData dat
/**
* Obtains asynchronously the last confirmed write from a quorum of bookies.
* It is similar as
* {@link #asyncTryReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback, Object)},
* {@link #asyncReadLastConfirmed(org.apache.bookkeeper.client.AsyncCallback.ReadLastConfirmedCallback, Object)},
* but it doesn't wait all the responses from the quorum. It would callback
* immediately if it received a LAC which is larger than current LAC.
*
Expand Down Expand Up @@ -1027,6 +1026,18 @@ public CompletableFuture<Long> readLastAddConfirmed() {
return result;
}

/**
* @{@inheritDoc }
*/
@Override
public CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntry(long entryId,
long timeOutInMillis,
boolean parallel) {
FutureReadLastConfirmedAndEntry result = new FutureReadLastConfirmedAndEntry();
asyncReadLastConfirmedAndEntry(entryId, timeOutInMillis, parallel, result, null);
return result;
}

/**
* Asynchronous read next entry and the latest last add confirmed.
* If the next entryId is less than known last add confirmed, the call will read next entry directly.
Expand Down Expand Up @@ -1083,7 +1094,8 @@ public void readComplete(int rc, LedgerHandle lh, Enumeration<LedgerEntry> seq,
return;
}
// wait for entry <i>entryId</i>
ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback innercb = new ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback() {
ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback innercb =
new ReadLastConfirmedAndEntryOp.LastConfirmedAndEntryCallback() {
AtomicBoolean completed = new AtomicBoolean(false);
@Override
public void readLastConfirmedAndEntryComplete(int rc, long lastAddConfirmed, LedgerEntry entry) {
Expand All @@ -1098,7 +1110,13 @@ public void readLastConfirmedAndEntryComplete(int rc, long lastAddConfirmed, Led
}
}
};
new ReadLastConfirmedAndEntryOp(this, innercb, entryId - 1, timeOutInMillis, bk.getScheduler()).parallelRead(parallel).initiate();
new ReadLastConfirmedAndEntryOp(this,
innercb,
entryId - 1,
timeOutInMillis,
bk.getScheduler())
.parallelRead(parallel)
.initiate();
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,12 +17,12 @@
*/
package org.apache.bookkeeper.client;

import static org.apache.bookkeeper.client.LedgerHandle.LOG;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it is not needed.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. It is originally there, this change adjust the order of importing, will handle it.


import com.google.common.collect.Iterators;
import java.util.Enumeration;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;
import static org.apache.bookkeeper.client.LedgerHandle.LOG;
import org.apache.bookkeeper.client.api.Handle;
import org.apache.bookkeeper.client.api.ReadHandle;

/**
Expand Down Expand Up @@ -266,7 +266,8 @@ public void addComplete(int rc, LedgerHandle lh, long entry, Object ctx) {
}
}

static class FutureReadLastConfirmed extends CompletableFuture<Long> implements AsyncCallback.ReadLastConfirmedCallback {
static class FutureReadLastConfirmed extends CompletableFuture<Long>
implements AsyncCallback.ReadLastConfirmedCallback {

@Override
public void readLastConfirmedComplete(int rc, long lastConfirmed, Object ctx) {
Expand Down Expand Up @@ -312,4 +313,14 @@ public void closeComplete(int rc, LedgerHandle lh, Object ctx) {
}
}

static class FutureReadLastConfirmedAndEntry
extends CompletableFuture<ReadHandle.LastConfirmedAndEntry> implements AsyncCallback.ReadLastConfirmedAndEntryCallback {

@Override
public void readLastConfirmedAndEntryComplete(int rc, long lastConfirmed, LedgerEntry entry, Object ctx) {
ReadHandle.LastConfirmedAndEntry result = new ReadHandle.LastConfirmedAndEntry(lastConfirmed, entry);
finish(rc, result, this);
}
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@
package org.apache.bookkeeper.client.api;

import java.util.concurrent.CompletableFuture;
import lombok.Data;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you don't this import

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

right. will remove it.


/**
* Provide read access to a ledger.
Expand Down Expand Up @@ -111,4 +112,32 @@ public interface ReadHandle extends Handle {
*/
long getLength();

/**
* The type contains LAC and a LedgerEntry want to read.
* It is used for readLastAddConfirmedAndEntry.
*/
@Data
class LastConfirmedAndEntry {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I think it is better that this class is not an inner class but a top level class

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

move it out of ReadHandle.java, make it an interface

interface LastConfirmedAndEntry extends AutoCloseable {

     long getLastAddConfirmed();

     boolean hasEntry();

     LedgerEntry getEntry();

}

create a class LastConfirmedAndEntryImpl to implement the interface.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks @eolivelli @sijie , will change it.

private final Long lac;
private final LedgerEntry entry;
}

/**
* Asynchronous read specific entry and the latest last add confirmed.
* If the next entryId is less than known last add confirmed, the call will read next entry directly.
* If the next entryId is ahead of known last add confirmed, the call will issue a long poll read
* to wait for the next entry <i>entryId</i>.
*
* @param entryId
* next entry id to read
* @param timeOutInMillis
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

in case of THIS timeout the CompletableFuture will return null ? or is will be completed exceptionally ?

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

it will return null entry and latest lac on timeout.

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes. In LedgerHandle, most of the similar situation will return null.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a line which explains? That was my real request, sorry

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we add a line which explains? That was my real request, sorry

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Thanks. sure will add it.

* timeout period to wait for the entry id to be available (for long poll only)
* @param parallel
* whether to issue the long poll reads in parallel
* @return an handle to the result of the operation
*/
CompletableFuture<LastConfirmedAndEntry> readLastAddConfirmedAndEntry(long entryId,
long timeOutInMillis,
boolean parallel);

}
Original file line number Diff line number Diff line change
Expand Up @@ -203,6 +203,12 @@ public void testOpenLedgerRead() throws Exception {
assertEquals(2, result(reader.tryReadLastAddConfirmed()).intValue());
checkEntries(result(reader.read(0, reader.getLastAddConfirmed())), data);
checkEntries(result(reader.readUnconfirmed(0, reader.getLastAddConfirmed())), data);

// test readLastAddConfirmedAndEntry
ReadHandle.LastConfirmedAndEntry lastConfirmedAndEntry =
result(reader.readLastAddConfirmedAndEntry(0, 999, false));
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We have a 999 timeout less then 1 second. This test could be flaky.
Maybe we can do a loop until the condition is verified

Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

@eolivelli Thanks.
This is only for test the interface works, There are other tests for the functionality of asyncReadLastConfirmedAndEntry().
Here it read an old entry, which has already been there, so ideally it need no wait time.

assertEquals(2, lastConfirmedAndEntry.getLac().intValue());
assertArrayEquals(data, lastConfirmedAndEntry.getEntry().getEntry());
}
}

Expand Down