From e2b436ba5f7f3c4ff2d3035ce30ce77faf0f316c Mon Sep 17 00:00:00 2001 From: Venkateswara Date: Sat, 19 Sep 2015 15:27:35 -0700 Subject: [PATCH] BOOKKEEPER-873: CreateLedgerAPI to accept ledgerId Add ledgerCreateAdv with ledgerId interface to Bookkeeper and corresponding Junit tests. Reviewed-by: Tony Menges Reviewed-by: Charan Reddy Guttapalem Signed-off-by: Venkateswararao Jujjuri (JV) Signed-off-by: Charan Reddy Guttapalem --- .../apache/bookkeeper/client/BookKeeper.java | 109 +++++++++++++++++- .../bookkeeper/client/LedgerCreateOp.java | 26 +++-- .../bookkeeper/meta/FlatLedgerManager.java | 2 +- .../apache/bookkeeper/util/StringUtils.java | 10 ++ .../client/BookieWriteLedgerTest.java | 44 +++++++ 5 files changed, 180 insertions(+), 11 deletions(-) diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java index 08c24b0f029..2f8a0b8d59a 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/BookKeeper.java @@ -738,7 +738,114 @@ public void asyncCreateLedgerAdv(final int ensSize, final int writeQuorumSize, f return; } new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize, - ackQuorumSize, digestType, passwd, cb, ctx, customMetadata).initiateAdv(); + ackQuorumSize, digestType, passwd, cb, ctx, customMetadata).initiateAdv((long)(-1)); + } finally { + closeLock.readLock().unlock(); + } + } + + /** + * Synchronously creates a new ledger using the interface which accepts a ledgerId as input. + * This method returns {@link LedgerHandleAdv} which can accept entryId. + * Parameters must match those of + * {@link #asyncCreateLedgerAdvWithLedgerId(byte[], long, int, int, int, DigestType, byte[], + * AsyncCallback.CreateCallback, Object)} + * @param ledgerId + * @param ensSize + * @param writeQuorumSize + * @param ackQuorumSize + * @param digestType + * @param passwd + * @param customMetadata + * @return a handle to the newly created ledger + * @throws InterruptedException + * @throws BKException + */ + public LedgerHandle createLedgerAdv(final long ledgerId, + int ensSize, + int writeQuorumSize, + int ackQuorumSize, + DigestType digestType, + byte passwd[], + final Map customMetadata) throws InterruptedException, BKException{ + CompletableFuture counter = new CompletableFuture<>(); + + /* + * Calls asynchronous version + */ + asyncCreateLedgerAdv(ledgerId, ensSize, writeQuorumSize, ackQuorumSize, digestType, passwd, + new SyncCreateCallback(), counter, customMetadata); + + LedgerHandle lh = SynchCallbackUtils.waitForResult(counter); + if (lh == null) { + LOG.error("Unexpected condition : no ledger handle returned for a success ledger creation"); + throw BKException.create(BKException.Code.UnexpectedConditionException); + } else if (ledgerId != lh.getId()) { + LOG.error("Unexpected condition : Expected ledgerId: {} but got: {}", ledgerId, lh.getId()); + throw BKException.create(BKException.Code.UnexpectedConditionException); + } + + LOG.info("Ensemble: {} for ledger: {}", lh.getLedgerMetadata().getEnsemble(0L), + lh.getId()); + + return lh; + } + + /** + * Asynchronously creates a new ledger using the interface which accepts a ledgerId as input. + * This method returns {@link LedgerHandleAdv} which can accept entryId. + * Ledgers created with this call have ability to accept + * a separate write quorum and ack quorum size. The write quorum must be larger than + * the ack quorum. + * + * Separating the write and the ack quorum allows the BookKeeper client to continue + * writing when a bookie has failed but the failure has not yet been detected. Detecting + * a bookie has failed can take a number of seconds, as configured by the read timeout + * {@link ClientConfiguration#getReadTimeout()}. Once the bookie failure is detected, + * that bookie will be removed from the ensemble. + * + * The other parameters match those of {@link #asyncCreateLedger(long, int, int, DigestType, byte[], + * AsyncCallback.CreateCallback, Object)} + * + * @param ledgerId + * ledger Id to use for the newly created ledger + * @param ensSize + * number of bookies over which to stripe entries + * @param writeQuorumSize + * number of bookies each entry will be written to + * @param ackQuorumSize + * number of bookies which must acknowledge an entry before the call is completed + * @param digestType + * digest type, either MAC or CRC32 + * @param passwd + * password + * @param cb + * createCallback implementation + * @param ctx + * optional control object + * @param customMetadata + * optional customMetadata that holds user specified metadata + */ + public void asyncCreateLedgerAdv(final long ledgerId, + final int ensSize, + final int writeQuorumSize, + final int ackQuorumSize, + final DigestType digestType, + final byte[] passwd, + final CreateCallback cb, + final Object ctx, + final Map customMetadata) { + if (writeQuorumSize < ackQuorumSize) { + throw new IllegalArgumentException("Write quorum must be larger than ack quorum"); + } + closeLock.readLock().lock(); + try { + if (closed) { + cb.createComplete(BKException.Code.ClientClosedException, null, ctx); + return; + } + new LedgerCreateOp(BookKeeper.this, ensSize, writeQuorumSize, + ackQuorumSize, digestType, passwd, cb, ctx, customMetadata).initiateAdv(ledgerId); } finally { closeLock.readLock().unlock(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java index 52a5cb6255d..376d716bb4d 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/client/LedgerCreateOp.java @@ -48,7 +48,7 @@ class LedgerCreateOp implements GenericCallback { CreateCallback cb; LedgerMetadata metadata; LedgerHandle lh; - Long ledgerId; + Long ledgerId = -1L; Object ctx; byte[] passwd; BookKeeper bk; @@ -56,6 +56,7 @@ class LedgerCreateOp implements GenericCallback { long startTime; OpStatsLogger createOpLogger; boolean adv = false; + boolean generateLedgerId = true; /** * Constructor @@ -119,12 +120,16 @@ public void initiate() { * Add ensemble to the configuration */ metadata.addEnsemble(0L, ensemble); - - createLedger(); + if (this.generateLedgerId) { + generateLedgerIdAndCreateLedger(); + } else { + // Create ledger with supplied ledgerId + bk.getLedgerManager().createLedgerMetadata(ledgerId, metadata, LedgerCreateOp.this); + } } - void createLedger() { - // generate a ledger id and then create the ledger with metadata + void generateLedgerIdAndCreateLedger() { + // generate a ledgerId final LedgerIdGenerator ledgerIdGenerator = bk.getLedgerIdGenerator(); ledgerIdGenerator.generateLedgerId(new GenericCallback() { @Override @@ -133,7 +138,6 @@ public void operationComplete(int rc, Long ledgerId) { createComplete(rc, null); return; } - LedgerCreateOp.this.ledgerId = ledgerId; // create a ledger with metadata bk.getLedgerManager().createLedgerMetadata(ledgerId, metadata, LedgerCreateOp.this); @@ -144,8 +148,12 @@ public void operationComplete(int rc, Long ledgerId) { /** * Initiates the operation to return LedgerHandleAdv. */ - public void initiateAdv() { + public void initiateAdv(final long ledgerId) { this.adv = true; + this.ledgerId = ledgerId; + if (this.ledgerId != -1L) { + this.generateLedgerId = false; + } initiate(); } @@ -154,9 +162,9 @@ public void initiateAdv() { */ @Override public void operationComplete(int rc, Void result) { - if (BKException.Code.LedgerExistException == rc) { + if (this.generateLedgerId && (BKException.Code.LedgerExistException == rc)) { // retry to generate a new ledger id - createLedger(); + generateLedgerIdAndCreateLedger(); return; } else if (BKException.Code.OK != rc) { createComplete(rc, null); diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java index 3172247e60b..6c7caba628b 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/meta/FlatLedgerManager.java @@ -64,7 +64,7 @@ public FlatLedgerManager(AbstractConfiguration conf, ZooKeeper zk) { public String getLedgerPath(long ledgerId) { StringBuilder sb = new StringBuilder(); sb.append(ledgerPrefix) - .append(StringUtils.getZKStringId(ledgerId)); + .append(StringUtils.getZKStringIdForFlat(ledgerId)); return sb.toString(); } diff --git a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java index bea03721dae..205cc6bbbdb 100644 --- a/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java +++ b/bookkeeper-server/src/main/java/org/apache/bookkeeper/util/StringUtils.java @@ -40,6 +40,16 @@ public static String getZKStringId(long id) { return String.format("%010d", id); } + /** + * Formats ledger ID according to ZooKeeper rules + * + * @param id + * znode id + */ + public static String getZKStringIdForFlat(long id) { + return String.format("%019d", id); + } + /** * Get the hierarchical ledger path according to the ledger id * diff --git a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java index a5fbe24ecac..c740cbf1807 100644 --- a/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java +++ b/bookkeeper-server/src/test/java/org/apache/bookkeeper/client/BookieWriteLedgerTest.java @@ -176,6 +176,50 @@ public void testLedgerCreateAdv() throws Exception { lh.close(); } + /** + * Verify the functionality of Advanced Ledger which accepts ledgerId as input and returns + * LedgerHandleAdv. LedgerHandleAdv takes entryId for addEntry, and let + * user manage entryId allocation. + * + * @throws Exception + */ + @Test(timeout = 60000) + public void testLedgerCreateAdvWithLedgerId() throws Exception { + // Create a ledger + long ledgerId = 0xABCDEF; + lh = bkc.createLedgerAdv(ledgerId, 5, 3, 2, digestType, ledgerPassword, null); + for (int i = 0; i < numEntriesToWrite; i++) { + ByteBuffer entry = ByteBuffer.allocate(4); + entry.putInt(rng.nextInt(maxInt)); + entry.position(0); + + entries1.add(entry.array()); + lh.addEntry(i, entry.array()); + } + // Start one more bookies + startNewBookie(); + + // Shutdown one bookie in the last ensemble and continue writing + ArrayList ensemble = lh.getLedgerMetadata().getEnsembles().entrySet().iterator().next() + .getValue(); + killBookie(ensemble.get(0)); + + int i = numEntriesToWrite; + numEntriesToWrite = numEntriesToWrite + 50; + for (; i < numEntriesToWrite; i++) { + ByteBuffer entry = ByteBuffer.allocate(4); + entry.putInt(rng.nextInt(maxInt)); + entry.position(0); + + entries1.add(entry.array()); + lh.addEntry(i, entry.array()); + } + + readEntries(lh, entries1); + lh.close(); + bkc.deleteLedger(ledgerId); + } + /** * Verify the functionality of Ledger create which accepts customMetadata as input. * Also verifies that the data written is read back properly.