From 2f25d9fead78d982920fe72720b5470a5cb949f7 Mon Sep 17 00:00:00 2001 From: Sijie Guo Date: Wed, 19 Jul 2017 16:33:46 -0700 Subject: [PATCH 1/9] Bump BK version to 4.5.0-SNAPSHOT - registerSuccessEvent and registerFailureEvent needs TimeUnit - change HashedWheelTimer to netty 4 - change channelFactory to eventLoopGroup --- .../common/stats/BroadCastStatsLogger.java | 26 +++++++++--- .../common/stats/OpStatsListener.java | 4 +- .../common/concurrent/TestFutureUtils.java | 6 ++- .../bookkeeper/client/LedgerReader.java | 31 +++++++------- .../distributedlog/BKAsyncLogReader.java | 33 ++++++++++----- .../apache/distributedlog/BKLogHandler.java | 17 ++++---- .../distributedlog/BKLogSegmentWriter.java | 20 ++++++--- .../distributedlog/BKLogWriteHandler.java | 22 +++++++--- .../distributedlog/BookKeeperClient.java | 30 +++++++------- .../BookKeeperClientBuilder.java | 27 +++++++----- .../impl/BKNamespaceDriver.java | 41 ++++++++++++------- .../limiter/ChainedRequestLimiter.java | 2 +- .../lock/ZKDistributedLock.java | 16 ++++++-- .../distributedlog/lock/ZKSessionLock.java | 8 +++- .../util/SimplePermitLimiter.java | 2 +- .../io/LZ4CompressionCodec.java | 6 +-- pom.xml | 10 +---- 17 files changed, 189 insertions(+), 112 deletions(-) diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java index 61a20f190..6ae269b41 100644 --- a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/BroadCastStatsLogger.java @@ -19,6 +19,7 @@ import static com.google.common.base.Preconditions.checkNotNull; +import java.util.concurrent.TimeUnit; import org.apache.bookkeeper.stats.CachingStatsLogger; import org.apache.bookkeeper.stats.Counter; import org.apache.bookkeeper.stats.Gauge; @@ -63,16 +64,29 @@ public OpStatsLogger getOpStatsLogger(final String statName) { final OpStatsLogger firstLogger = first.getOpStatsLogger(statName); final OpStatsLogger secondLogger = second.getOpStatsLogger(statName); return new OpStatsLogger() { + + @Override + public void registerFailedEvent(long l, TimeUnit timeUnit) { + firstLogger.registerFailedEvent(l, timeUnit); + secondLogger.registerFailedEvent(l, timeUnit); + } + + @Override + public void registerSuccessfulEvent(long l, TimeUnit timeUnit) { + firstLogger.registerSuccessfulEvent(l, timeUnit); + secondLogger.registerSuccessfulEvent(l, timeUnit); + } + @Override - public void registerFailedEvent(long l) { - firstLogger.registerFailedEvent(l); - secondLogger.registerFailedEvent(l); + public void registerSuccessfulValue(long l) { + firstLogger.registerSuccessfulValue(l); + secondLogger.registerSuccessfulValue(l); } @Override - public void registerSuccessfulEvent(long l) { - firstLogger.registerSuccessfulEvent(l); - secondLogger.registerSuccessfulEvent(l); + public void registerFailedValue(long l) { + firstLogger.registerFailedValue(l); + secondLogger.registerFailedValue(l); } @Override diff --git a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java index e71a79929..4145b39ad 100644 --- a/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java +++ b/distributedlog-common/src/main/java/org/apache/distributedlog/common/stats/OpStatsListener.java @@ -44,11 +44,11 @@ public OpStatsListener(OpStatsLogger opStatsLogger) { @Override public void onSuccess(T value) { - opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } @Override public void onFailure(Throwable cause) { - opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } } diff --git a/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java b/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java index ddfb7aed8..a887c59db 100644 --- a/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java +++ b/distributedlog-common/src/test/java/org/apache/distributedlog/common/concurrent/TestFutureUtils.java @@ -312,7 +312,8 @@ public void testStatsSuccess() throws Exception { Stopwatch.createStarted()); underlyFuture.complete(1234L); FutureUtils.result(statsFuture); - verify(statsLogger, times(1)).registerSuccessfulEvent(anyLong()); + verify(statsLogger, times(1)) + .registerSuccessfulEvent(anyLong(), eq(TimeUnit.MICROSECONDS)); } @Test @@ -325,7 +326,8 @@ public void testStatsFailure() throws Exception { Stopwatch.createStarted()); underlyFuture.completeExceptionally(new TestException()); FutureUtils.result(FutureUtils.ignore(statsFuture)); - verify(statsLogger, times(1)).registerFailedEvent(anyLong()); + verify(statsLogger, times(1)) + .registerFailedEvent(anyLong(), eq(TimeUnit.MICROSECONDS)); } @Test diff --git a/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java b/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java index 39a0f775f..dc9d09def 100644 --- a/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java +++ b/distributedlog-core/src/main/java/org/apache/bookkeeper/client/LedgerReader.java @@ -17,16 +17,9 @@ */ package org.apache.bookkeeper.client; -import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.proto.BookieClient; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; -import org.apache.bookkeeper.proto.BookkeeperProtocol; -import org.jboss.netty.buffer.ChannelBuffer; -import org.jboss.netty.buffer.ChannelBufferInputStream; -import org.slf4j.Logger; -import org.slf4j.LoggerFactory; - +import io.netty.buffer.ByteBuf; +import io.netty.buffer.ByteBufInputStream; +import io.netty.buffer.Unpooled; import java.io.InputStream; import java.net.InetSocketAddress; import java.util.ArrayList; @@ -36,6 +29,12 @@ import java.util.Set; import java.util.SortedMap; import java.util.concurrent.atomic.AtomicInteger; +import org.apache.bookkeeper.net.BookieSocketAddress; +import org.apache.bookkeeper.proto.BookieClient; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.GenericCallback; +import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks.ReadEntryCallback; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; /** * Reader used for DL tools to read entries @@ -93,17 +92,21 @@ public void readEntriesFromAllBookies(final LedgerHandle lh, long eid, final Set> readResults = new HashSet>(); ReadEntryCallback readEntryCallback = new ReadEntryCallback() { @Override - public void readEntryComplete(int rc, long lid, long eid, ChannelBuffer buffer, Object ctx) { + public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object ctx) { BookieSocketAddress bookieAddress = (BookieSocketAddress) ctx; ReadResult rr; if (BKException.Code.OK != rc) { rr = new ReadResult(eid, rc, null, bookieAddress.getSocketAddress()); } else { + ByteBuf content; try { - ChannelBufferInputStream is = lh.macManager.verifyDigestAndReturnData(eid, buffer); - rr = new ReadResult(eid, BKException.Code.OK, is, bookieAddress.getSocketAddress()); + content = lh.macManager.verifyDigestAndReturnData(eid, buffer); + ByteBuf toRet = Unpooled.copiedBuffer(content); + rr = new ReadResult(eid, BKException.Code.OK, new ByteBufInputStream(toRet), bookieAddress.getSocketAddress()); } catch (BKException.BKDigestMatchException e) { rr = new ReadResult(eid, BKException.Code.DigestMatchException, null, bookieAddress.getSocketAddress()); + } finally { + buffer.release(); } } readResults.add(rr); @@ -184,7 +187,7 @@ public void readLacs(final LedgerHandle lh, long eid, final Set> readResults = new HashSet>(); ReadEntryCallback readEntryCallback = new ReadEntryCallback() { @Override - public void readEntryComplete(int rc, long lid, long eid, ChannelBuffer buffer, Object ctx) { + public void readEntryComplete(int rc, long lid, long eid, ByteBuf buffer, Object ctx) { InetSocketAddress bookieAddress = (InetSocketAddress) ctx; ReadResult rr; if (BKException.Code.OK != rc) { diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java index 26a4a763a..5be9e192f 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKAsyncLogReader.java @@ -163,8 +163,10 @@ long elapsedSinceEnqueue(TimeUnit timeUnit) { void completeExceptionally(Throwable throwable) { Stopwatch stopwatch = Stopwatch.createStarted(); if (promise.completeExceptionally(throwable)) { - futureSetLatency.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - delayUntilPromiseSatisfied.registerFailedEvent(enqueueTime.elapsed(TimeUnit.MICROSECONDS)); + futureSetLatency.registerFailedEvent( + stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); + delayUntilPromiseSatisfied.registerFailedEvent( + enqueueTime.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } } @@ -191,10 +193,12 @@ void complete() { if (LOG.isTraceEnabled()) { LOG.trace("{} : Satisfied promise with {} records", readHandler.getFullyQualifiedName(), records.size()); } - delayUntilPromiseSatisfied.registerSuccessfulEvent(enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS)); + delayUntilPromiseSatisfied.registerSuccessfulEvent( + enqueueTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); Stopwatch stopwatch = Stopwatch.createStarted(); promise.complete(records); - futureSetLatency.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + futureSetLatency.registerSuccessfulEvent( + stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } } @@ -396,7 +400,8 @@ public synchronized CompletableFuture> readBulk(int numE private synchronized CompletableFuture> readInternal(int numEntries, long deadlineTime, TimeUnit deadlineTimeUnit) { - timeBetweenReadNexts.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS)); + timeBetweenReadNexts.registerSuccessfulEvent( + readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); readNextDelayStopwatch.reset().start(); final PendingReadRequest readRequest = new PendingReadRequest(numEntries, deadlineTime, deadlineTimeUnit); @@ -443,7 +448,8 @@ public void onFailure(Throwable cause) { } } - readNextExecTime.registerSuccessfulEvent(readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS)); + readNextExecTime.registerSuccessfulEvent( + readNextDelayStopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); readNextDelayStopwatch.reset().start(); return readRequest.getPromise(); @@ -553,7 +559,8 @@ private synchronized LogRecordWithDLSN readNextRecord() throws IOException { public void run() { synchronized(scheduleLock) { if (scheduleDelayStopwatch.isRunning()) { - scheduleLatency.registerSuccessfulEvent(scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + scheduleLatency.registerSuccessfulEvent( + scheduleDelayStopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } Stopwatch runTime = Stopwatch.createStarted(); @@ -573,7 +580,8 @@ public void run() { if (null == nextRequest) { LOG.trace("{}: Queue Empty waiting for Input", readHandler.getFullyQualifiedName()); scheduleCount.set(0); - backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); + backgroundReaderRunTime.registerSuccessfulEvent( + runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); return; } @@ -599,7 +607,8 @@ public void run() { if (!(lastException.get().getCause() instanceof LogNotFoundException)) { LOG.warn("{}: Exception", readHandler.getFullyQualifiedName(), lastException.get()); } - backgroundReaderRunTime.registerFailedEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); + backgroundReaderRunTime.registerFailedEvent( + runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); return; } @@ -646,7 +655,8 @@ record = readNextRecord(); if (nextRequest.hasReadRecords()) { long remainingWaitTime = nextRequest.getRemainingWaitTime(); if (remainingWaitTime > 0 && !nextRequest.hasReadEnoughRecords()) { - backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); + backgroundReaderRunTime.registerSuccessfulEvent( + runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); scheduleDelayStopwatch.reset().start(); scheduleCount.set(0); // the request could still wait for more records @@ -681,7 +691,8 @@ record = readNextRecord(); } else { if (0 == scheduleCountLocal) { LOG.trace("Schedule count dropping to zero", lastException.get()); - backgroundReaderRunTime.registerSuccessfulEvent(runTime.stop().elapsed(TimeUnit.MICROSECONDS)); + backgroundReaderRunTime.registerSuccessfulEvent( + runTime.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); return; } scheduleCountLocal = scheduleCount.decrementAndGet(); diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java index a7d0d25f7..9f2e75028 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogHandler.java @@ -426,13 +426,16 @@ public CompletableFuture asyncReadLastRecord(final LogSegment ).whenComplete(new FutureEventListener() { @Override public void onSuccess(LogRecordWithDLSN value) { - recoverLastEntryStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); - recoverScannedEntriesStats.registerSuccessfulEvent(numRecordsScanned.get()); + recoverLastEntryStats.registerSuccessfulEvent( + stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); + recoverScannedEntriesStats.registerSuccessfulValue(numRecordsScanned.get()); } @Override public void onFailure(Throwable cause) { - recoverLastEntryStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + recoverLastEntryStats.registerFailedEvent( + stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), + TimeUnit.MICROSECONDS); } }); } @@ -508,9 +511,9 @@ protected void addLogSegmentToCache(String name, LogSegmentMetadata metadata) { LOG.warn("{} received inprogress log segment in {} millis: {}", new Object[] { getFullyQualifiedName(), elapsedMillis, metadata }); } - getInprogressSegmentStat.registerSuccessfulEvent(elapsedMicroSec); + getInprogressSegmentStat.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS); } else { - negativeGetInprogressSegmentStat.registerSuccessfulEvent(-elapsedMicroSec); + negativeGetInprogressSegmentStat.registerSuccessfulEvent(-elapsedMicroSec, TimeUnit.MICROSECONDS); } } else { long elapsedMillis = ts - metadata.getCompletionTime(); @@ -520,9 +523,9 @@ protected void addLogSegmentToCache(String name, LogSegmentMetadata metadata) { LOG.warn("{} received completed log segment in {} millis : {}", new Object[] { getFullyQualifiedName(), elapsedMillis, metadata }); } - getCompletedSegmentStat.registerSuccessfulEvent(elapsedMicroSec); + getCompletedSegmentStat.registerSuccessfulEvent(elapsedMicroSec, TimeUnit.MICROSECONDS); } else { - negativeGetCompletedSegmentStat.registerSuccessfulEvent(-elapsedMicroSec); + negativeGetCompletedSegmentStat.registerSuccessfulEvent(-elapsedMicroSec, TimeUnit.MICROSECONDS); } } } diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java index a4016c84f..6f201a64b 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogSegmentWriter.java @@ -1131,8 +1131,10 @@ public void addComplete(final int rc, LedgerHandle handle, final BKTransmitPacket transmitPacket = (BKTransmitPacket) ctx; // Time from transmit until receipt of addComplete callback - addCompleteTime.registerSuccessfulEvent(TimeUnit.MICROSECONDS.convert( - System.nanoTime() - transmitPacket.getTransmitTime(), TimeUnit.NANOSECONDS)); + addCompleteTime.registerSuccessfulEvent( + TimeUnit.MICROSECONDS.convert( + System.nanoTime() - transmitPacket.getTransmitTime(), TimeUnit.NANOSECONDS), + TimeUnit.MICROSECONDS); if (BKException.Code.OK == rc) { EntryBuffer recordSet = transmitPacket.getRecordSet(); @@ -1149,9 +1151,13 @@ public void addComplete(final int rc, LedgerHandle handle, @Override public Void call() { final Stopwatch deferredTime = Stopwatch.createStarted(); - addCompleteQueuedTime.registerSuccessfulEvent(queuedTime.elapsed(TimeUnit.MICROSECONDS)); + addCompleteQueuedTime.registerSuccessfulEvent( + queuedTime.elapsed(TimeUnit.MICROSECONDS), + TimeUnit.MICROSECONDS); addCompleteDeferredProcessing(transmitPacket, entryId, effectiveRC.get()); - addCompleteDeferredTime.registerSuccessfulEvent(deferredTime.elapsed(TimeUnit.MICROSECONDS)); + addCompleteDeferredTime.registerSuccessfulEvent( + deferredTime.elapsed(TimeUnit.MICROSECONDS), + TimeUnit.MILLISECONDS); return null; } @Override @@ -1199,14 +1205,16 @@ private void addCompleteDeferredProcessing(final BKTransmitPacket transmitPacket if (transmitResult.get() != BKException.Code.OK) { if (recordSet.hasUserRecords()) { - transmitDataPacketSize.registerFailedEvent(recordSet.getNumBytes()); + transmitDataPacketSize.registerFailedEvent( + recordSet.getNumBytes(), TimeUnit.MICROSECONDS); } } else { // If we had data that we flushed then we need it to make sure that // background flush in the next pass will make the previous writes // visible by advancing the lastAck if (recordSet.hasUserRecords()) { - transmitDataPacketSize.registerSuccessfulEvent(recordSet.getNumBytes()); + transmitDataPacketSize.registerSuccessfulEvent( + recordSet.getNumBytes(), TimeUnit.MICROSECONDS); controlFlushNeeded = true; if (immediateFlushEnabled) { if (0 == minDelayBetweenImmediateFlushMs) { diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java index 1293d00bf..a310ea57e 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BKLogWriteHandler.java @@ -430,9 +430,11 @@ public BKLogSegmentWriter startLogSegment(long txId, boolean bestEffort, boolean return writer; } finally { if (success) { - openOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + openOpStats.registerSuccessfulEvent( + stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } else { - openOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + openOpStats.registerFailedEvent( + stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } } } @@ -743,9 +745,13 @@ LogSegmentMetadata completeAndCloseLogSegment(String inprogressZnodeName, long l return completedLogSegment; } finally { if (success) { - closeOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + closeOpStats.registerSuccessfulEvent( + stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), + TimeUnit.MICROSECONDS); } else { - closeOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + closeOpStats.registerFailedEvent( + stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), + TimeUnit.MICROSECONDS); } } } @@ -1196,12 +1202,16 @@ private CompletableFuture deleteLogSegment( promise.whenComplete(new FutureEventListener() { @Override public void onSuccess(LogSegmentMetadata segment) { - deleteOpStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + deleteOpStats.registerSuccessfulEvent( + stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), + TimeUnit.MICROSECONDS); } @Override public void onFailure(Throwable cause) { - deleteOpStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + deleteOpStats.registerFailedEvent( + stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), + TimeUnit.MICROSECONDS); } }); entryStore.deleteLogSegment(ledgerMetadata) diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java index 2ea3b5d35..a50c391e5 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClient.java @@ -18,6 +18,9 @@ package org.apache.distributedlog; import com.google.common.base.Optional; +import io.netty.channel.EventLoopGroup; +import io.netty.util.HashedWheelTimer; +import java.util.Collections; import java.util.concurrent.CompletableFuture; import org.apache.bookkeeper.client.AsyncCallback; import org.apache.bookkeeper.client.BKException; @@ -40,8 +43,6 @@ import org.apache.distributedlog.util.ConfUtils; import org.apache.distributedlog.common.concurrent.FutureUtils; import org.apache.zookeeper.KeeperException; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.util.HashedWheelTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -66,7 +67,7 @@ public class BookKeeperClient { private final String zkServers; private final String ledgersPath; private final byte[] passwd; - private final ClientSocketChannelFactory channelFactory; + private final EventLoopGroup eventLoopGroup; private final HashedWheelTimer requestTimer; private final StatsLogger statsLogger; @@ -80,8 +81,10 @@ public class BookKeeperClient { @SuppressWarnings("deprecation") private synchronized void commonInitialization( - DistributedLogConfiguration conf, String ledgersPath, - ClientSocketChannelFactory channelFactory, StatsLogger statsLogger, HashedWheelTimer requestTimer) + DistributedLogConfiguration conf, + String ledgersPath, + EventLoopGroup eventLoopGroup, + StatsLogger statsLogger, HashedWheelTimer requestTimer) throws IOException, InterruptedException, KeeperException { ClientConfiguration bkConfig = new ClientConfiguration(); bkConfig.setAddEntryTimeout(conf.getBKClientWriteTimeout()); @@ -106,11 +109,10 @@ private synchronized void commonInitialization( final DNSToSwitchMapping dnsResolver = NetUtils.getDNSResolver(dnsResolverCls, conf.getBkDNSResolverOverrides()); - this.bkc = BookKeeper.newBuilder() - .config(bkConfig) - .zk(zkc.get()) - .channelFactory(channelFactory) - .statsLogger(statsLogger) + this.bkc = BookKeeper.forConfig(bkConfig) + .setZookeeper(zkc.get()) + .setEventLoopGroup(eventLoopGroup) + .setStatsLogger(statsLogger) .dnsResolver(dnsResolver) .requestTimer(requestTimer) .featureProvider(featureProvider.orNull()) @@ -122,7 +124,7 @@ private synchronized void commonInitialization( String zkServers, ZooKeeperClient zkc, String ledgersPath, - ClientSocketChannelFactory channelFactory, + EventLoopGroup eventLoopGroup, HashedWheelTimer requestTimer, StatsLogger statsLogger, Optional featureProvider) { @@ -131,7 +133,7 @@ private synchronized void commonInitialization( this.zkServers = zkServers; this.ledgersPath = ledgersPath; this.passwd = conf.getBKDigestPW().getBytes(UTF_8); - this.channelFactory = channelFactory; + this.eventLoopGroup = eventLoopGroup; this.requestTimer = requestTimer; this.statsLogger = statsLogger; this.featureProvider = featureProvider; @@ -162,7 +164,7 @@ private synchronized void initialize() throws IOException { } try { - commonInitialization(conf, ledgersPath, channelFactory, statsLogger, requestTimer); + commonInitialization(conf, ledgersPath, eventLoopGroup, statsLogger, requestTimer); } catch (InterruptedException e) { throw new DLInterruptedException("Interrupted on creating bookkeeper client " + name + " : ", e); } catch (KeeperException e) { @@ -216,7 +218,7 @@ public void createComplete(int rc, LedgerHandle lh, Object ctx) { promise.completeExceptionally(BKException.create(rc)); } } - }, null); + }, null, Collections.emptyMap()); return promise; } diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java index a356f9f62..1149ad510 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/BookKeeperClientBuilder.java @@ -19,14 +19,12 @@ import com.google.common.base.Optional; import com.google.common.base.Preconditions; +import io.netty.channel.EventLoopGroup; +import io.netty.util.HashedWheelTimer; import org.apache.bookkeeper.stats.NullStatsLogger; import org.apache.bookkeeper.stats.StatsLogger; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.util.HashedWheelTimer; import org.apache.bookkeeper.feature.FeatureProvider; -import org.apache.bookkeeper.feature.Feature; - /** * Builder to build bookkeeper client. */ @@ -55,7 +53,7 @@ public static BookKeeperClientBuilder newBuilder() { // statsLogger private StatsLogger statsLogger = NullStatsLogger.INSTANCE; // client channel factory - private ClientSocketChannelFactory channelFactory = null; + private EventLoopGroup eventLoopGroup = null; // request timer private HashedWheelTimer requestTimer = null; // feature provider @@ -150,12 +148,12 @@ public synchronized BookKeeperClientBuilder bkc(BookKeeperClient bkc) { /** * Build BookKeeper client using existing channelFactory. * - * @param channelFactory - * Channel Factory used to build bookkeeper client. + * @param eventLoopGroup + * event loop group used to build bookkeeper client. * @return bookkeeper client builder. */ - public synchronized BookKeeperClientBuilder channelFactory(ClientSocketChannelFactory channelFactory) { - this.channelFactory = channelFactory; + public synchronized BookKeeperClientBuilder eventLoopGroup(EventLoopGroup eventLoopGroup) { + this.eventLoopGroup = eventLoopGroup; return this; } @@ -204,6 +202,15 @@ public synchronized BookKeeperClient build() { private BookKeeperClient buildClient() { validateParameters(); - return new BookKeeperClient(dlConfig, name, zkServers, zkc, ledgersPath, channelFactory, requestTimer, statsLogger, featureProvider); + return new BookKeeperClient( + dlConfig, + name, + zkServers, + zkc, + ledgersPath, + eventLoopGroup, + requestTimer, + statsLogger, + featureProvider); } } diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java index 21fe2279a..93a218fec 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java @@ -20,6 +20,12 @@ import com.google.common.annotations.VisibleForTesting; import com.google.common.base.Optional; import com.google.common.util.concurrent.ThreadFactoryBuilder; +import io.netty.channel.EventLoopGroup; +import io.netty.channel.epoll.EpollEventLoopGroup; +import io.netty.channel.nio.NioEventLoopGroup; +import io.netty.util.HashedWheelTimer; +import java.util.concurrent.ThreadFactory; +import org.apache.commons.lang.SystemUtils; import org.apache.distributedlog.BookKeeperClient; import org.apache.distributedlog.BookKeeperClientBuilder; import org.apache.distributedlog.DistributedLogConfiguration; @@ -58,9 +64,6 @@ import org.apache.zookeeper.ZooKeeper; import org.apache.zookeeper.common.PathUtils; import org.apache.zookeeper.data.Stat; -import org.jboss.netty.channel.socket.ClientSocketChannelFactory; -import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory; -import org.jboss.netty.util.HashedWheelTimer; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -69,7 +72,6 @@ import java.util.HashMap; import java.util.List; import java.util.Map; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.concurrent.atomic.AtomicBoolean; @@ -130,7 +132,7 @@ public static String getZKServersFromDLUri(URI uri) { // {@link org.apache.distributedlog.BookKeeperClient#get()}. So it is safe to // keep builders and their client wrappers here, as they will be used when // instantiating readers or writers. - private ClientSocketChannelFactory channelFactory; + private EventLoopGroup eventLoopGroup; private HashedWheelTimer requestTimer; private BookKeeperClientBuilder sharedWriterBKCBuilder; private BookKeeperClient writerBKC; @@ -250,11 +252,22 @@ private synchronized BKDLConfig getBkdlConfig() { return bkdlConfig; } + static EventLoopGroup getDefaultEventLoopGroup(int numThreads) { + ThreadFactory threadFactory = new ThreadFactoryBuilder().setNameFormat("DL-io-%s").build(); + if (SystemUtils.IS_OS_LINUX) { + try { + return new EpollEventLoopGroup(numThreads, threadFactory); + } catch (Throwable t) { + LOG.warn("Could not use Netty Epoll event loop for bookie server: {}", t.getMessage()); + return new NioEventLoopGroup(numThreads, threadFactory); + } + } else { + return new NioEventLoopGroup(numThreads, threadFactory); + } + } + private void initializeBookKeeperClients() throws IOException { - this.channelFactory = new NioClientSocketChannelFactory( - Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-boss-%d").build()), - Executors.newCachedThreadPool(new ThreadFactoryBuilder().setNameFormat("DL-netty-worker-%d").build()), - conf.getBKClientNumberIOThreads()); + this.eventLoopGroup = getDefaultEventLoopGroup(conf.getBKClientNumberIOThreads()); this.requestTimer = new HashedWheelTimer( new ThreadFactoryBuilder().setNameFormat("DLFactoryTimer-%d").build(), conf.getTimeoutTimerTickDurationMs(), TimeUnit.MILLISECONDS, @@ -265,7 +278,7 @@ private void initializeBookKeeperClients() throws IOException { conf, bkdlConfig.getBkZkServersForWriter(), bkdlConfig.getBkLedgersPath(), - channelFactory, + eventLoopGroup, requestTimer, Optional.of(featureProvider.scope("bkc")), statsLogger); @@ -280,7 +293,7 @@ private void initializeBookKeeperClients() throws IOException { conf, bkdlConfig.getBkZkServersForReader(), bkdlConfig.getBkLedgersPath(), - channelFactory, + eventLoopGroup, requestTimer, Optional.absent(), statsLogger); @@ -393,7 +406,7 @@ private void doClose() { writerZKC.close(); readerZKC.close(); // release bookkeeper resources - channelFactory.releaseExternalResources(); + eventLoopGroup.shutdownGracefully(); LOG.info("Release external resources used by channel factory."); requestTimer.stop(); LOG.info("Stopped request timer"); @@ -582,7 +595,7 @@ private BookKeeperClientBuilder createBKCBuilder(String bkcName, DistributedLogConfiguration conf, String zkServers, String ledgersPath, - ClientSocketChannelFactory channelFactory, + EventLoopGroup eventLoopGroup, HashedWheelTimer requestTimer, Optional featureProviderOptional, StatsLogger statsLogger) { @@ -591,7 +604,7 @@ private BookKeeperClientBuilder createBKCBuilder(String bkcName, .dlConfig(conf) .zkServers(zkServers) .ledgersPath(ledgersPath) - .channelFactory(channelFactory) + .eventLoopGroup(eventLoopGroup) .requestTimer(requestTimer) .featureProvider(featureProviderOptional) .statsLogger(statsLogger); diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ChainedRequestLimiter.java b/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ChainedRequestLimiter.java index fdad69bec..bbb4f7eb2 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ChainedRequestLimiter.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/limiter/ChainedRequestLimiter.java @@ -70,7 +70,7 @@ public void apply(Request request) throws OverCapacityException { limiter.apply(request); } } finally { - applyTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + applyTime.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } } } diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java index 77151df2e..664d0bc03 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKDistributedLock.java @@ -147,11 +147,15 @@ public synchronized CompletableFuture asyncAcquire() { promise.whenComplete(new FutureEventListener() { @Override public void onSuccess(ZKDistributedLock lock) { - acquireStats.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + acquireStats.registerSuccessfulEvent( + stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), + TimeUnit.MICROSECONDS); } @Override public void onFailure(Throwable cause) { - acquireStats.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + acquireStats.registerFailedEvent( + stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), + TimeUnit.MICROSECONDS); // release the lock if fail to acquire asyncClose(); } @@ -496,7 +500,9 @@ public void onSuccess(ZKDistributedLock lock) { synchronized (ZKDistributedLock.this) { lockReacquireFuture = null; } - reacquireStats.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + reacquireStats.registerSuccessfulEvent( + stopwatch.elapsed(TimeUnit.MICROSECONDS), + TimeUnit.MICROSECONDS); } @Override @@ -509,7 +515,9 @@ public void onFailure(Throwable cause) { "Exception on re-acquiring lock", cause); } } - reacquireStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + reacquireStats.registerFailedEvent( + stopwatch.elapsed(TimeUnit.MICROSECONDS), + TimeUnit.MICROSECONDS); } }); } diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java index 9fdcbf121..132575142 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/lock/ZKSessionLock.java @@ -838,9 +838,13 @@ synchronized LockWaiter waitForTry(Stopwatch stopwatch, CompletableFutureUTF-8 UTF-8 - 4.3.7-TWTTR-OSS + 4.5.0-SNAPSHOT 3.0.1 1.1 1.6 @@ -261,14 +261,6 @@ - - bookkeeper-twitter-mvn-repo - https://raw.github.com/twitter/bookkeeper/mvn-repo/${bookkeeper.version} - - true - always - - twitter-repo Twitter Maven Repo From ccd74487da38d0fde64569005b777fea658a0adb Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Wed, 26 Jul 2017 18:07:52 -0700 Subject: [PATCH 2/9] DL-2: DistributedLog should work with the official apache bookkeeper version --- .../benchmark/DLWriterWorker.java | 6 +- .../benchmark/ReaderWorker.java | 14 +- .../benchmark/WriterWorker.java | 6 +- .../stream/AsyncReaderBenchmark.java | 7 +- .../benchmark/stream/SyncReaderBenchmark.java | 9 +- distributedlog-core-twitter/pom.xml | 7 + .../distributedlog/stats/OpStatsListener.java | 6 +- .../tools/DistributedLogTool.java | 4 +- .../distributedlog/TestDLMTestUtil.java | 6 + .../TestDistributedLogBase.java | 6 + distributedlog-proxy-client/pom.xml | 7 + distributedlog-proxy-protocol/pom.xml | 7 + distributedlog-proxy-server/pom.xml | 7 + .../distributedlog/service/StatsFilter.java | 6 +- .../placement/LeastLoadPlacementPolicy.java | 7 +- .../service/stream/AbstractStreamOp.java | 6 +- .../service/stream/BulkWriteOp.java | 9 +- .../service/stream/StreamImpl.java | 6 +- .../service/stream/WriteOp.java | 9 +- .../service/stream/admin/StreamAdminOp.java | 4 +- .../service/DistributedLogServerTestCase.java | 6 + .../distributedlog-mapreduce/README.md | 1 - .../distributedlog-mapreduce/pom.xml | 56 ------- .../mapreduce/DistributedLogInputFormat.java | 129 --------------- .../mapreduce/LogSegmentReader.java | 148 ------------------ .../mapreduce/LogSegmentSplit.java | 95 ----------- distributedlog-tutorials/pom.xml | 1 - pom.xml | 9 +- 28 files changed, 110 insertions(+), 474 deletions(-) delete mode 100644 distributedlog-tutorials/distributedlog-mapreduce/README.md delete mode 100644 distributedlog-tutorials/distributedlog-mapreduce/pom.xml delete mode 100644 distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java delete mode 100644 distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentReader.java delete mode 100644 distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentSplit.java diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java index 5c9b2a9cf..015cf4682 100644 --- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/DLWriterWorker.java @@ -228,12 +228,14 @@ public void run() { writer.write(new LogRecord(requestMillis, data)).whenComplete(new FutureEventListener() { @Override public void onSuccess(DLSN value) { - requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis); + requestStat.registerSuccessfulEvent( + System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS); } @Override public void onFailure(Throwable cause) { - requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis); + requestStat.registerFailedEvent( + System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS); LOG.error("Failed to publish, rescue it : ", cause); scheduleRescue(streamIdx, writer, 0); } diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java index ad95a59fc..b81dad41f 100644 --- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/ReaderWorker.java @@ -156,14 +156,14 @@ public void processRecord(final LogRecordWithDLSN record) { long e2eLatency = curTimeMillis - msg.getPublishTime(); long deliveryLatency = curTimeMillis - record.getTransactionId(); if (e2eLatency >= 0) { - e2eStat.registerSuccessfulEvent(e2eLatency); + e2eStat.registerSuccessfulEvent(e2eLatency, TimeUnit.MILLISECONDS); } else { - negativeE2EStat.registerSuccessfulEvent(-e2eLatency); + negativeE2EStat.registerSuccessfulEvent(-e2eLatency, TimeUnit.MILLISECONDS); } if (deliveryLatency >= 0) { - deliveryStat.registerSuccessfulEvent(deliveryLatency); + deliveryStat.registerSuccessfulEvent(deliveryLatency, TimeUnit.MILLISECONDS); } else { - negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency); + negativeDeliveryStat.registerSuccessfulEvent(-deliveryLatency, TimeUnit.MILLISECONDS); } prevDLSN = record.getDlsn(); @@ -200,12 +200,14 @@ public void run() { new FutureEventListener() { @Override public void onSuccess(Boolean value) { - truncationStat.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); + truncationStat.registerSuccessfulEvent( + stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); } @Override public void onFailure(Throwable cause) { - truncationStat.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MILLISECONDS)); + truncationStat.registerFailedEvent( + stopwatch.stop().elapsed(TimeUnit.MILLISECONDS), TimeUnit.MILLISECONDS); LOG.error("Failed to truncate stream {} to {} : ", new Object[]{streamName, dlsnToTruncate, cause}); } diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java index 46f9dfc50..9879e7100 100644 --- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/WriterWorker.java @@ -272,10 +272,12 @@ public void onFailure(Throwable cause) { @Override public void run() { if (null != dlsn) { - requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis); + requestStat.registerSuccessfulEvent( + System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS); } else { LOG.error("Failed to publish to {} : ", streamName, cause); - requestStat.registerFailedEvent(System.currentTimeMillis() - requestMillis); + requestStat.registerFailedEvent( + System.currentTimeMillis() - requestMillis, TimeUnit.MILLISECONDS); exceptionsLogger.getCounter(cause.getClass().getName()).inc(); if (cause instanceof DLException) { DLException dle = (DLException) cause; diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java index 4c8e372ff..a2dbdd7c0 100644 --- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/AsyncReaderBenchmark.java @@ -109,11 +109,12 @@ protected void benchmark(Namespace namespace, String logName, StatsLogger statsL reader = FutureUtils.result(dlm.openAsyncLogReader(lastDLSN)); } long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS); - openReaderStats.registerSuccessfulEvent(elapsedMs); + openReaderStats.registerSuccessfulEvent(elapsedMs, TimeUnit.MICROSECONDS); logger.info("It took {} ms to position the reader to transaction id = {}, dlsn = {}", lastTxId, lastDLSN); } catch (Exception ioe) { - openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + openReaderStats.registerFailedEvent( + stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); logger.warn("Failed to create reader for stream {} reading from tx id = {}, dlsn = {}.", new Object[] { streamName, lastTxId, lastDLSN }); } @@ -133,7 +134,7 @@ protected void benchmark(Namespace namespace, String logName, StatsLogger statsL stopwatch.start(); records = FutureUtils.result(reader.readBulk(batchSize)); long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS); - blockingReadStats.registerSuccessfulEvent(elapsedMicros); + blockingReadStats.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS); if (!records.isEmpty()) { readCounter.add(records.size()); LogRecordWithDLSN lastRecord = records.get(records.size() - 1); diff --git a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java index cbd7f674f..3e8c8bb38 100644 --- a/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java +++ b/distributedlog-benchmark/src/main/java/org/apache/distributedlog/benchmark/stream/SyncReaderBenchmark.java @@ -98,10 +98,11 @@ protected void benchmark(Namespace namespace, String streamName, StatsLogger sta try { reader = dlm.getInputStream(lastTxId); long elapsedMs = stopwatch.elapsed(TimeUnit.MICROSECONDS); - openReaderStats.registerSuccessfulEvent(elapsedMs); + openReaderStats.registerSuccessfulEvent(elapsedMs, TimeUnit.MICROSECONDS); logger.info("It took {} ms to position the reader to transaction id {}", lastTxId); } catch (IOException ioe) { - openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + openReaderStats.registerFailedEvent( + stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); logger.warn("Failed to create reader for stream {} reading from {}.", streamName, lastTxId); } if (null == reader) { @@ -129,11 +130,11 @@ record = reader.readNext(nonBlocking); if (null != record) { long elapsedMicros = stopwatch.stop().elapsed(TimeUnit.MICROSECONDS); if (nonBlocking) { - nonBlockingReadStats.registerSuccessfulEvent(elapsedMicros); + nonBlockingReadStats.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS); } else { numCatchupBytes += record.getPayload().length; ++numCatchupReads; - blockingReadStats.registerSuccessfulEvent(elapsedMicros); + blockingReadStats.registerSuccessfulEvent(elapsedMicros, TimeUnit.MICROSECONDS); } lastTxId = record.getTransactionId(); } else { diff --git a/distributedlog-core-twitter/pom.xml b/distributedlog-core-twitter/pom.xml index 41116b96c..7cec0b7ed 100644 --- a/distributedlog-core-twitter/pom.xml +++ b/distributedlog-core-twitter/pom.xml @@ -138,4 +138,11 @@ + + + twitter-repo + Twitter Maven Repo + http://maven.twttr.com + + diff --git a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java index a9c7b212a..67911f7ec 100644 --- a/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java +++ b/distributedlog-core-twitter/src/main/java/org/apache/distributedlog/stats/OpStatsListener.java @@ -45,11 +45,13 @@ public OpStatsListener(OpStatsLogger opStatsLogger) { @Override public void onSuccess(T value) { - opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + opStatsLogger.registerSuccessfulEvent( + stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } @Override public void onFailure(Throwable cause) { - opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + opStatsLogger.registerFailedEvent( + stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } } diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java index 64229d1ee..eea49e669 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java @@ -1727,6 +1727,7 @@ protected int runBKCommand(BKCommandRunner runner) throws Exception { abstract protected int runBKCmd(ZooKeeperClient zkc, BookKeeperClient bkc) throws Exception; } + /** static class RecoverCommand extends PerBKCommand { final List ledgers = new ArrayList(); @@ -2035,6 +2036,7 @@ protected String getUsage() { return "recover [options] "; } } + **/ /** * Per Ledger Command, which parse common options for per ledger. e.g. ledger id. @@ -2854,7 +2856,7 @@ public DistributedLogTool() { addCommand(new ListCommand()); addCommand(new ReadLastConfirmedCommand()); addCommand(new ReadEntriesCommand()); - addCommand(new RecoverCommand()); + // addCommand(new RecoverCommand()); addCommand(new RecoverLedgerCommand()); addCommand(new ShowCommand()); addCommand(new TruncateCommand()); diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDLMTestUtil.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDLMTestUtil.java index a4db3adbe..e47d020a3 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDLMTestUtil.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDLMTestUtil.java @@ -32,6 +32,12 @@ public class TestDLMTestUtil { static final Logger LOG = LoggerFactory.getLogger(TestDLMTestUtil.class); + static { + // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words + // are disabled by default due to security reasons + System.setProperty("zookeeper.4lw.commands.whitelist", "*"); + } + @Rule public TestName testNames = new TestName(); diff --git a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java index 5e4ba07b7..5b55cd013 100644 --- a/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java +++ b/distributedlog-core/src/test/java/org/apache/distributedlog/TestDistributedLogBase.java @@ -67,6 +67,12 @@ public class TestDistributedLogBase { static final Logger LOG = LoggerFactory.getLogger(TestDistributedLogBase.class); + static { + // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words + // are disabled by default due to security reasons + System.setProperty("zookeeper.4lw.commands.whitelist", "*"); + } + // Num worker threads should be one, since the exec service is used for the ordered // future pool in test cases, and setting to > 1 will therefore result in unordered // write ops. diff --git a/distributedlog-proxy-client/pom.xml b/distributedlog-proxy-client/pom.xml index 25ad732a7..d8bd613c0 100644 --- a/distributedlog-proxy-client/pom.xml +++ b/distributedlog-proxy-client/pom.xml @@ -169,4 +169,11 @@ + + + twitter-repo + Twitter Maven Repo + http://maven.twttr.com + + diff --git a/distributedlog-proxy-protocol/pom.xml b/distributedlog-proxy-protocol/pom.xml index 0f6a85fd7..4c010a473 100644 --- a/distributedlog-proxy-protocol/pom.xml +++ b/distributedlog-proxy-protocol/pom.xml @@ -127,4 +127,11 @@ + + + twitter-repo + Twitter Maven Repo + http://maven.twttr.com + + diff --git a/distributedlog-proxy-server/pom.xml b/distributedlog-proxy-server/pom.xml index 83b2beffb..10d0a8e62 100644 --- a/distributedlog-proxy-server/pom.xml +++ b/distributedlog-proxy-server/pom.xml @@ -272,4 +272,11 @@ + + + twitter-repo + Twitter Maven Repo + http://maven.twttr.com + + diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java index ee645807b..872fecbd2 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/StatsFilter.java @@ -42,11 +42,13 @@ public Future apply(Req req, Service service) { final Stopwatch stopwatch = Stopwatch.createStarted(); try { result = service.apply(req); - serviceExec.registerSuccessfulEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + serviceExec.registerSuccessfulEvent( + stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } finally { outstandingAsync.dec(); if (null == result) { - serviceExec.registerFailedEvent(stopwatch.stop().elapsed(TimeUnit.MICROSECONDS)); + serviceExec.registerFailedEvent( + stopwatch.stop().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } } return result; diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java index 1336dddb8..2ecaf3629 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/placement/LeastLoadPlacementPolicy.java @@ -17,6 +17,7 @@ */ package org.apache.distributedlog.service.placement; +import java.util.concurrent.TimeUnit; import org.apache.distributedlog.client.routing.RoutingService; import org.apache.distributedlog.api.namespace.Namespace; import com.twitter.util.Duration; @@ -175,14 +176,16 @@ public TreeSet apply(List streamLoads) { }).onSuccess(new Function, BoxedUnit>() { @Override public BoxedUnit apply(TreeSet serverLoads) { - placementCalcStats.registerSuccessfulEvent(System.currentTimeMillis() - startTime); + placementCalcStats.registerSuccessfulEvent( + System.currentTimeMillis() - startTime, TimeUnit.MICROSECONDS); return BoxedUnit.UNIT; } }).onFailure(new Function() { @Override public BoxedUnit apply(Throwable t) { logger.error("Failure calculating loads", t); - placementCalcStats.registerFailedEvent(System.currentTimeMillis() - startTime); + placementCalcStats.registerFailedEvent( + System.currentTimeMillis() - startTime, TimeUnit.MICROSECONDS); return BoxedUnit.UNIT; } }); diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java index 770018432..12c4783cb 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/AbstractStreamOp.java @@ -96,7 +96,8 @@ public Future execute(AsyncLogWriter writer, Sequencer sequencer, Object t .addEventListener(new FutureEventListener() { @Override public void onSuccess(Response response) { - opStatsLogger.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + opStatsLogger.registerSuccessfulEvent( + stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); setResponse(response); } @Override @@ -119,7 +120,8 @@ public void fail(Throwable cause) { OwnershipAcquireFailedException oafe = (OwnershipAcquireFailedException) cause; fail(ResponseUtils.ownerToHeader(oafe.getCurrentOwner())); } else { - opStatsLogger.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + opStatsLogger.registerFailedEvent( + stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); fail(ResponseUtils.exceptionToHeader(cause)); } } diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java index 372703aa2..93dedb634 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/BulkWriteOp.java @@ -122,16 +122,19 @@ public BulkWriteOp(String stream, @Override public void onSuccess(BulkWriteResponse response) { if (response.getHeader().getCode() == StatusCode.SUCCESS) { - latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); + latencyStat.registerSuccessfulEvent( + stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); bytes.add(size); bulkWriteBytes.add(size); } else { - latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); + latencyStat.registerFailedEvent( + stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } } @Override public void onFailure(Throwable cause) { - latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); + latencyStat.registerFailedEvent( + stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } }); } diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java index df3d64ff3..71a9531f7 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/StreamImpl.java @@ -654,9 +654,11 @@ void processPendingRequestsAfterAcquire(boolean success, Stopwatch stopwatch, Promise acquirePromise) { if (success) { - streamAcquireStat.registerSuccessfulEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + streamAcquireStat.registerSuccessfulEvent( + stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } else { - streamAcquireStat.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS)); + streamAcquireStat.registerFailedEvent( + stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } for (StreamOp op : oldPendingOps) { executeOp(op, success); diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java index 0a8a2da1f..e3aa7039d 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/WriteOp.java @@ -102,16 +102,19 @@ public WriteOp(String stream, @Override public void onSuccess(WriteResponse response) { if (response.getHeader().getCode() == StatusCode.SUCCESS) { - latencyStat.registerSuccessfulEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); + latencyStat.registerSuccessfulEvent( + stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); bytes.add(size); writeBytes.add(size); } else { - latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); + latencyStat.registerFailedEvent( + stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } } @Override public void onFailure(Throwable cause) { - latencyStat.registerFailedEvent(stopwatch().elapsed(TimeUnit.MICROSECONDS)); + latencyStat.registerFailedEvent( + stopwatch().elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); } }); } diff --git a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java index c3c5d8109..d790016f8 100644 --- a/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java +++ b/distributedlog-proxy-server/src/main/java/org/apache/distributedlog/service/stream/admin/StreamAdminOp.java @@ -84,14 +84,14 @@ public Future execute() { @Override public WriteResponse map(WriteResponse response) { opStatsLogger.registerSuccessfulEvent( - stopwatch.elapsed(TimeUnit.MICROSECONDS)); + stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); return response; } @Override public WriteResponse handle(Throwable cause) { opStatsLogger.registerFailedEvent( - stopwatch.elapsed(TimeUnit.MICROSECONDS)); + stopwatch.elapsed(TimeUnit.MICROSECONDS), TimeUnit.MICROSECONDS); return ResponseUtils.write(ResponseUtils.exceptionToHeader(cause)); } diff --git a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java index 58b5b2a47..a8ed1b299 100644 --- a/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java +++ b/distributedlog-proxy-server/src/test/java/org/apache/distributedlog/service/DistributedLogServerTestCase.java @@ -49,6 +49,12 @@ */ public abstract class DistributedLogServerTestCase { + static { + // org.apache.zookeeper.test.ClientBase uses FourLetterWordMain, from 3.5.3 four letter words + // are disabled by default due to security reasons + System.setProperty("zookeeper.4lw.commands.whitelist", "*"); + } + protected static DistributedLogConfiguration conf = new DistributedLogConfiguration().setLockTimeout(10) .setOutputBufferSize(0).setPeriodicFlushFrequencyMilliSeconds(10); diff --git a/distributedlog-tutorials/distributedlog-mapreduce/README.md b/distributedlog-tutorials/distributedlog-mapreduce/README.md deleted file mode 100644 index ab50d286e..000000000 --- a/distributedlog-tutorials/distributedlog-mapreduce/README.md +++ /dev/null @@ -1 +0,0 @@ -### DistributedLog meets MapReduce A distributedlog log stream is consists of log segments. Each log segment is distributed among multiple bookies node. This nature of data distribution allows distributedlog easily integrated with any analytics processing systems like *MapReduce* and *Spark*. This tutorial shows how you could use *MapReduce* to process log streams' data in batch and how *MapReduce* can leverage the data locality of log segments. #### InputFormat **InputFormat** is one of the fundamental class in Hadoop MapReduce framework, that is used for accessing data from different sources. The class is responsible for defining two main things: - Data Splits - Record Reader *Data Split* is a fundamental concept in Hadoop MapReduce framework which defines both the size of individual Map tasks and its potential execution server. The *Record Reader* is responsible for actual reading records from the *data split* and submitting them (as key/value pairs) to the mapper. Using distributedlog log streams as the sources for a MapReduce job, the *log segments* are the *data splits*, while the *log segment reader* for a log segment is the *record reader* for a *data split*. #### Log Segment vs Data Split Any split implementation extends the Apache base abstract class - **InputSplit**, defining a split length and locations. A distributedlog log segment has *record count*, which could be used to define the length of the split, and its metadata contains the storage nodes that are used to store its log records, which could be used to define the locations of the split. So we could create a **LogSegmentSplit** wrapping over a *LogSegment* (LogSegmentMetadata and LedgerMetadata). public class LogSegmentSplit extends InputSplit { private LogSegmentMetadata logSegmentMetadata; private LedgerMetadata ledgerMetadata; public LogSegmentSplit() {} public LogSegmentSplit(LogSegmentMetadata logSegmentMetadata, LedgerMetadata ledgerMetadata) { this.logSegmentMetadata = logSegmentMetadata; this.ledgerMetadata = ledgerMetadata; } } The length of the log segment split is the *number of records in the log segment*. @Override public long getLength() throws IOException, InterruptedException { return logSegmentMetadata.getRecordCount(); } The locations of the log segment split are the bookies' addresses in the ensembles of the log segment. @Override public String[] getLocations() throws IOException, InterruptedException { Set locations = Sets.newHashSet(); for (ArrayList ensemble : ledgerMetadata.getEnsembles().values()) { for (BookieSocketAddress host : ensemble) { locations.add(host.getHostName()); } } return locations.toArray(new String[locations.size()]); } At this point, we will have a basic **LogSegmentSplit** wrapping *LogSegmentMetadata* and *LedgerMetadata*. Then we could retrieve the list of log segments of a log stream and construct corresponding *data splits* in distributedlog inputformat. public class DistributedLogInputFormat extends InputFormat implements Configurable { @Override public List getSplits(JobContext jobContext) throws IOException, InterruptedException { List segments = dlm.getLogSegments(); List inputSplits = Lists.newArrayListWithCapacity(segments.size()); BookKeeper bk = namespace.getReaderBKC().get(); LedgerManager lm = BookKeeperAccessor.getLedgerManager(bk); final AtomicInteger rcHolder = new AtomicInteger(0); final AtomicReference metadataHolder = new AtomicReference(null); for (LogSegmentMetadata segment : segments) { final CountDownLatch latch = new CountDownLatch(1); lm.readLedgerMetadata(segment.getLedgerId(), new BookkeeperInternalCallbacks.GenericCallback() { @Override public void operationComplete(int rc, LedgerMetadata ledgerMetadata) { metadataHolder.set(ledgerMetadata); rcHolder.set(rc); latch.countDown(); } }); latch.await(); if (BKException.Code.OK != rcHolder.get()) { throw new IOException("Faild to get log segment metadata for " + segment + " : " + BKException.getMessage(rcHolder.get())); } inputSplits.add(new LogSegmentSplit(segment, metadataHolder.get())); } return inputSplits; } } #### Log Segment Record Reader At this point, we know how to break the log streams into *data splits*. Then we need to be able to create a **RecordReader** for individual *data split*. Since each *data split* is effectively a *log segment* in distributedlog, it is straight to implement it using distributedlog's log segment reader. For simplicity, this example uses the raw bk api to access entries, which it doesn't leverage features like **ReadAhead** provided in distributedlog. It could be changed to use log segment reader for better performance. From the *data split*, we know which log segment and its corresponding bookkeeper ledger. Then we could open the ledger handle when initializing the record reader. LogSegmentReader(String streamName, DistributedLogConfiguration conf, BookKeeper bk, LogSegmentSplit split) throws IOException { this.streamName = streamName; this.bk = bk; this.metadata = split.getMetadata(); try { this.lh = bk.openLedgerNoRecovery( split.getLedgerId(), BookKeeper.DigestType.CRC32, conf.getBKDigestPW().getBytes(UTF_8)); } catch (BKException e) { throw new IOException(e); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new IOException(e); } } Reading records from the *data split* is effectively reading records from the distributedlog log segment. try { Enumeration entries = lh.readEntries(entryId, entryId); if (entries.hasMoreElements()) { LedgerEntry entry = entries.nextElement(); Entry.newBuilder() .setLogSegmentInfo(metadata.getLogSegmentSequenceNumber(), metadata.getStartSequenceId()) .setEntryId(entry.getEntryId()) .setEnvelopeEntry( LogSegmentMetadata.supportsEnvelopedEntries(metadata.getVersion())) .deserializeRecordSet(true) .setInputStream(entry.getEntryInputStream()) .buildReader(); } return nextKeyValue(); } catch (BKException e) { throw new IOException(e); } We could calculate the progress by comparing the position with the record count of this log segment. @Override public float getProgress() throws IOException, InterruptedException { if (metadata.getRecordCount() > 0) { return ((float) (readPos + 1)) / metadata.getRecordCount(); } return 1; } Once we have *LogSegmentSplit* and the *LogSegmentReader* over a split. We could hook them up to implement distributedlog's InputFormat. Please check out the code for more details. diff --git a/distributedlog-tutorials/distributedlog-mapreduce/pom.xml b/distributedlog-tutorials/distributedlog-mapreduce/pom.xml deleted file mode 100644 index 7a6b67aa0..000000000 --- a/distributedlog-tutorials/distributedlog-mapreduce/pom.xml +++ /dev/null @@ -1,56 +0,0 @@ - - - - 4.0.0 - - distributedlog - org.apache.distributedlog - 0.5.0-incubating-SNAPSHOT - ../.. - - org.apache.distributedlog - distributedlog-mapreduce - Apache DistributedLog :: Tutorials :: MapReduce Tutorial - http://maven.apache.org - - UTF-8 - ${basedir}/lib - - - - org.apache.distributedlog - distributedlog-core - ${project.parent.version} - - - org.apache.distributedlog - distributedlog-proxy-client - ${project.parent.version} - - - org.apache.hadoop - hadoop-common - 2.7.2 - - - org.apache.hadoop - hadoop-mapreduce-client-jobclient - 2.7.2 - - - diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java deleted file mode 100644 index 6fd017cdd..000000000 --- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/DistributedLogInputFormat.java +++ /dev/null @@ -1,129 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.mapreduce; - -import com.google.common.collect.Lists; -import org.apache.distributedlog.DistributedLogConfiguration; -import org.apache.distributedlog.api.DistributedLogManager; -import org.apache.distributedlog.DLSN; -import org.apache.distributedlog.LogRecordWithDLSN; -import org.apache.distributedlog.LogSegmentMetadata; -import org.apache.distributedlog.api.namespace.Namespace; -import org.apache.distributedlog.impl.BKNamespaceDriver; -import org.apache.distributedlog.api.namespace.NamespaceBuilder; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.BookKeeperAccessor; -import org.apache.bookkeeper.client.LedgerMetadata; -import org.apache.bookkeeper.meta.LedgerManager; -import org.apache.bookkeeper.proto.BookkeeperInternalCallbacks; -import org.apache.hadoop.conf.Configurable; -import org.apache.hadoop.conf.Configuration; -import org.apache.hadoop.mapreduce.InputFormat; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.JobContext; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; -import java.net.URI; -import java.util.List; -import java.util.concurrent.CountDownLatch; -import java.util.concurrent.atomic.AtomicInteger; -import java.util.concurrent.atomic.AtomicReference; - -/** - * InputFormat to read data from a distributedlog stream. - */ -public class DistributedLogInputFormat - extends InputFormat implements Configurable { - - private static final String DL_URI = "distributedlog.uri"; - private static final String DL_STREAM = "distributedlog.stream"; - - protected Configuration conf; - protected DistributedLogConfiguration dlConf; - protected URI dlUri; - protected Namespace namespace; - protected String streamName; - protected DistributedLogManager dlm; - - /** {@inheritDoc} */ - @Override - public void setConf(Configuration configuration) { - this.conf = configuration; - dlConf = new DistributedLogConfiguration(); - dlUri = URI.create(configuration.get(DL_URI, "")); - streamName = configuration.get(DL_STREAM, ""); - try { - namespace = NamespaceBuilder.newBuilder() - .conf(dlConf) - .uri(dlUri) - .build(); - dlm = namespace.openLog(streamName); - } catch (IOException e) { - throw new RuntimeException(e); - } - } - - @Override - public Configuration getConf() { - return conf; - } - - @Override - public List getSplits(JobContext jobContext) - throws IOException, InterruptedException { - List segments = dlm.getLogSegments(); - List inputSplits = Lists.newArrayListWithCapacity(segments.size()); - BookKeeper bk = ((BKNamespaceDriver) namespace.getNamespaceDriver()).getReaderBKC().get(); - LedgerManager lm = BookKeeperAccessor.getLedgerManager(bk); - final AtomicInteger rcHolder = new AtomicInteger(0); - final AtomicReference metadataHolder = new AtomicReference(null); - for (LogSegmentMetadata segment : segments) { - final CountDownLatch latch = new CountDownLatch(1); - lm.readLedgerMetadata(segment.getLogSegmentId(), - new BookkeeperInternalCallbacks.GenericCallback() { - @Override - public void operationComplete(int rc, LedgerMetadata ledgerMetadata) { - metadataHolder.set(ledgerMetadata); - rcHolder.set(rc); - latch.countDown(); - } - }); - latch.await(); - if (BKException.Code.OK != rcHolder.get()) { - throw new IOException("Faild to get log segment metadata for " + segment + " : " - + BKException.getMessage(rcHolder.get())); - } - inputSplits.add(new LogSegmentSplit(segment, metadataHolder.get())); - } - return inputSplits; - } - - @Override - public RecordReader createRecordReader( - InputSplit inputSplit, TaskAttemptContext taskAttemptContext) - throws IOException, InterruptedException { - return new LogSegmentReader( - streamName, - dlConf, - ((BKNamespaceDriver) namespace.getNamespaceDriver()).getReaderBKC().get(), - (LogSegmentSplit) inputSplit); - } -} diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentReader.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentReader.java deleted file mode 100644 index 541db3b81..000000000 --- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentReader.java +++ /dev/null @@ -1,148 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.mapreduce; - -import org.apache.distributedlog.*; -import org.apache.bookkeeper.client.BKException; -import org.apache.bookkeeper.client.BookKeeper; -import org.apache.bookkeeper.client.LedgerEntry; -import org.apache.bookkeeper.client.LedgerHandle; -import org.apache.hadoop.mapreduce.InputSplit; -import org.apache.hadoop.mapreduce.RecordReader; -import org.apache.hadoop.mapreduce.TaskAttemptContext; - -import java.io.IOException; -import java.util.Enumeration; - -import static com.google.common.base.Charsets.UTF_8; - -/** - * Record Reader to read from a log segment split - */ -class LogSegmentReader extends RecordReader { - - final String streamName; - final BookKeeper bk; - final LedgerHandle lh; - final LogSegmentMetadata metadata; - - long entryId = -1L; - Entry.Reader reader = null; - LogRecordWithDLSN currentRecord = null; - int readPos = 0; - - LogSegmentReader(String streamName, - DistributedLogConfiguration conf, - BookKeeper bk, - LogSegmentSplit split) - throws IOException { - this.streamName = streamName; - this.bk = bk; - this.metadata = split.getMetadata(); - try { - this.lh = bk.openLedgerNoRecovery( - split.getLogSegmentId(), - BookKeeper.DigestType.CRC32, - conf.getBKDigestPW().getBytes(UTF_8)); - } catch (BKException e) { - throw new IOException(e); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } - } - - @Override - public void initialize(InputSplit split, TaskAttemptContext context) - throws IOException, InterruptedException { - // do nothing - } - - @Override - public boolean nextKeyValue() - throws IOException, InterruptedException { - LogRecordWithDLSN record; - currentRecord = null; - if (null != reader) { - record = reader.nextRecord(); - if (null != record) { - currentRecord = record; - readPos = record.getPositionWithinLogSegment(); - return true; - } else { - return false; - } - } - ++entryId; - if (entryId > lh.getLastAddConfirmed()) { - return false; - } - try { - Enumeration entries = - lh.readEntries(entryId, entryId); - if (entries.hasMoreElements()) { - LedgerEntry entry = entries.nextElement(); - reader = Entry.newBuilder() - .setLogSegmentInfo(metadata.getLogSegmentSequenceNumber(), - metadata.getStartSequenceId()) - .setEntryId(entry.getEntryId()) - .setEnvelopeEntry( - LogSegmentMetadata.supportsEnvelopedEntries(metadata.getVersion())) - .deserializeRecordSet(true) - .setInputStream(entry.getEntryInputStream()) - .buildReader(); - } - return nextKeyValue(); - } catch (BKException e) { - throw new IOException(e); - } - } - - @Override - public DLSN getCurrentKey() - throws IOException, InterruptedException { - return currentRecord.getDlsn(); - } - - @Override - public LogRecordWithDLSN getCurrentValue() - throws IOException, InterruptedException { - return currentRecord; - } - - @Override - public float getProgress() - throws IOException, InterruptedException { - if (metadata.getRecordCount() > 0) { - return ((float) (readPos + 1)) / metadata.getRecordCount(); - } - return 1; - } - - @Override - public void close() throws IOException { - try { - lh.close(); - } catch (InterruptedException e) { - Thread.currentThread().interrupt(); - throw new IOException(e); - } catch (BKException e) { - throw new IOException(e); - } - } -} diff --git a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentSplit.java b/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentSplit.java deleted file mode 100644 index 132e24d3b..000000000 --- a/distributedlog-tutorials/distributedlog-mapreduce/src/main/java/org/apache/distributedlog/mapreduce/LogSegmentSplit.java +++ /dev/null @@ -1,95 +0,0 @@ -/** - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ -package org.apache.distributedlog.mapreduce; - -import com.google.common.collect.Sets; -import org.apache.distributedlog.LogSegmentMetadata; -import org.apache.bookkeeper.client.LedgerMetadata; -import org.apache.bookkeeper.net.BookieSocketAddress; -import org.apache.bookkeeper.versioning.Version; -import org.apache.hadoop.io.Writable; -import org.apache.hadoop.mapreduce.InputSplit; - -import java.io.DataInput; -import java.io.DataOutput; -import java.io.IOException; -import java.util.ArrayList; -import java.util.Set; - -import static com.google.common.base.Charsets.UTF_8; - -/** - * A input split that reads from a log segment. - */ -public class LogSegmentSplit extends InputSplit implements Writable { - - private LogSegmentMetadata logSegmentMetadata; - private LedgerMetadata ledgerMetadata; - - public LogSegmentSplit() {} - - public LogSegmentSplit(LogSegmentMetadata logSegmentMetadata, - LedgerMetadata ledgerMetadata) { - this.logSegmentMetadata = logSegmentMetadata; - this.ledgerMetadata = ledgerMetadata; - } - - public LogSegmentMetadata getMetadata() { - return logSegmentMetadata; - } - - public long getLogSegmentId() { - return logSegmentMetadata.getLogSegmentId(); - } - - @Override - public long getLength() - throws IOException, InterruptedException { - return logSegmentMetadata.getRecordCount(); - } - - @Override - public String[] getLocations() - throws IOException, InterruptedException { - Set locations = Sets.newHashSet(); - for (ArrayList ensemble : ledgerMetadata.getEnsembles().values()) { - for (BookieSocketAddress host : ensemble) { - locations.add(host.getHostName()); - } - } - return locations.toArray(new String[locations.size()]); - } - - @Override - public void write(DataOutput dataOutput) throws IOException { - String lsMetadataStr = logSegmentMetadata.getFinalisedData(); - dataOutput.writeUTF(lsMetadataStr); - String lhMetadataStr = new String(ledgerMetadata.serialize(), UTF_8); - dataOutput.writeUTF(lhMetadataStr); - } - - @Override - public void readFields(DataInput dataInput) throws IOException { - String lsMetadataStr = dataInput.readUTF(); - logSegmentMetadata = LogSegmentMetadata.parseData("", - lsMetadataStr.getBytes(UTF_8)); - String lhMetadataStr = dataInput.readUTF(); - ledgerMetadata = LedgerMetadata.parseConfig(lhMetadataStr.getBytes(UTF_8), - Version.ANY); - } -} diff --git a/distributedlog-tutorials/pom.xml b/distributedlog-tutorials/pom.xml index 56957b166..2cf912749 100644 --- a/distributedlog-tutorials/pom.xml +++ b/distributedlog-tutorials/pom.xml @@ -29,7 +29,6 @@ distributedlog-basic distributedlog-messaging distributedlog-kafka - distributedlog-mapreduce UTF-8 diff --git a/pom.xml b/pom.xml index bb81cddde..77a3de554 100644 --- a/pom.xml +++ b/pom.xml @@ -117,7 +117,7 @@ 4.6.0 1.6.4 0.0.58 - 3.5.1-alpha + 3.5.3-beta 0.7 2.7 @@ -260,11 +260,4 @@ - - - twitter-repo - Twitter Maven Repo - http://maven.twttr.com - - From 6957e46ae6f74495aaaf0d4bd706f907abd43057 Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Sun, 30 Jul 2017 21:52:45 +0800 Subject: [PATCH 3/9] pass compile --- .../tools/DistributedLogTool.java | 2 +- pom.xml | 17 ++++++++++++----- 2 files changed, 13 insertions(+), 6 deletions(-) diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java index eea49e669..2d3c95144 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java @@ -1665,7 +1665,7 @@ protected void repairLogSegment(BookKeeperAdmin bkAdmin, System.out.println("Skip inprogress log segment " + segment); return; } - LedgerHandle lh = bkAdmin.openLedger(segment.getLogSegmentId(), true); + LedgerHandle lh = bkAdmin.openLedger(segment.getLogSegmentId()); long lac = lh.getLastAddConfirmed(); Enumeration entries = lh.readEntries(lac, lac); if (!entries.hasMoreElements()) { diff --git a/pom.xml b/pom.xml index 77a3de554..79503157e 100644 --- a/pom.xml +++ b/pom.xml @@ -171,7 +171,7 @@ maven-assembly-plugin ${maven-assembly-plugin.version} - gnu + gnu src/assemble/src.xml @@ -230,12 +230,12 @@ ChangeLog **/README.md **/apidocs/* - GROUPS - OWNERS - CONFIG.ini + GROUPS + OWNERS + CONFIG.ini **/**.md scripts/dev/reviewers - src/main/resources/DISCLAIMER.bin.txt + src/main/resources/DISCLAIMER.bin.txt **/dependency-reduced-pom.xml **/org/apache/distributedlog/thrift/* @@ -260,4 +260,11 @@ + + + twitter-repo + Twitter Maven Repo + http://maven.twttr.com + + From a047cf66cbf9ff21dbb3943a6f106713537683d6 Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Sun, 30 Jul 2017 23:16:27 +0800 Subject: [PATCH 4/9] move twttr repo --- distributedlog-tutorials/distributedlog-messaging/pom.xml | 7 +++++++ pom.xml | 7 ------- 2 files changed, 7 insertions(+), 7 deletions(-) diff --git a/distributedlog-tutorials/distributedlog-messaging/pom.xml b/distributedlog-tutorials/distributedlog-messaging/pom.xml index 97af77baa..c94d14826 100644 --- a/distributedlog-tutorials/distributedlog-messaging/pom.xml +++ b/distributedlog-tutorials/distributedlog-messaging/pom.xml @@ -114,4 +114,11 @@ + + + twitter-repo + Twitter Maven Repo + http://maven.twttr.com + + diff --git a/pom.xml b/pom.xml index 79503157e..3f94e6aad 100644 --- a/pom.xml +++ b/pom.xml @@ -260,11 +260,4 @@ - - - twitter-repo - Twitter Maven Repo - http://maven.twttr.com - - From 67014df76842d41cee25a7c393414fa88e1e532f Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Mon, 31 Jul 2017 10:13:11 +0800 Subject: [PATCH 5/9] bump curator to 4.0 --- pom.xml | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pom.xml b/pom.xml index 3f94e6aad..783b27f53 100644 --- a/pom.xml +++ b/pom.xml @@ -103,7 +103,7 @@ 1.6 2.6 3.3.2 - 3.2.1 + 4.0.0 6.34.0 1.12.3 20.0 From 892ebe23753aee0de3513c94e17759c8c97bd6ca Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Wed, 9 Aug 2017 10:11:07 +0800 Subject: [PATCH 6/9] exclude .repo --- distributedlog-tutorials/distributedlog-messaging/pom.xml | 2 +- pom.xml | 1 + 2 files changed, 2 insertions(+), 1 deletion(-) diff --git a/distributedlog-tutorials/distributedlog-messaging/pom.xml b/distributedlog-tutorials/distributedlog-messaging/pom.xml index c94d14826..5801b7290 100644 --- a/distributedlog-tutorials/distributedlog-messaging/pom.xml +++ b/distributedlog-tutorials/distributedlog-messaging/pom.xml @@ -61,7 +61,7 @@ org.apache.thrift libthrift - 0.5.0-1 + ${libthrift.version} com.twitter diff --git a/pom.xml b/pom.xml index 783b27f53..61867e214 100644 --- a/pom.xml +++ b/pom.xml @@ -238,6 +238,7 @@ src/main/resources/DISCLAIMER.bin.txt **/dependency-reduced-pom.xml **/org/apache/distributedlog/thrift/* + .repository/** From df71a83286f95ebd18bbbcfbf4ca86d6a7195f15 Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Tue, 15 Aug 2017 23:27:21 -0700 Subject: [PATCH 7/9] reduce the number of threads used for tests --- .../distributedlog/LocalDLMEmulator.java | 33 ------------------- .../src/test/resources/bk_server.conf | 10 +++--- 2 files changed, 5 insertions(+), 38 deletions(-) diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java index f2c510ddf..bf3a4910f 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/LocalDLMEmulator.java @@ -21,7 +21,6 @@ import org.apache.distributedlog.impl.metadata.BKDLConfig; import org.apache.distributedlog.metadata.DLMetadata; import org.apache.bookkeeper.conf.ServerConfiguration; -import org.apache.bookkeeper.proto.BookieServer; import org.apache.bookkeeper.shims.zk.ZooKeeperServerShim; import org.apache.bookkeeper.util.IOUtils; import org.apache.bookkeeper.util.LocalBookKeeper; @@ -33,14 +32,12 @@ import org.apache.zookeeper.ZooKeeper; import org.slf4j.Logger; import org.slf4j.LoggerFactory; - import java.io.File; import java.io.IOException; import java.net.BindException; import java.net.URI; import java.util.ArrayList; import java.util.List; -import java.util.UUID; import java.util.concurrent.CountDownLatch; import java.util.concurrent.TimeUnit; @@ -180,36 +177,6 @@ public URI getUri() { return uri; } - public BookieServer newBookie() throws Exception { - ServerConfiguration bookieConf = new ServerConfiguration(); - bookieConf.setZkTimeout(zkTimeoutSec * 1000); - bookieConf.setBookiePort(0); - bookieConf.setAllowLoopback(true); - File tmpdir = File.createTempFile("bookie" + UUID.randomUUID() + "_", - "test"); - if (!tmpdir.delete()) { - LOG.debug("Fail to delete tmpdir " + tmpdir); - } - if (!tmpdir.mkdir()) { - throw new IOException("Fail to create tmpdir " + tmpdir); - } - tmpDirs.add(tmpdir); - - bookieConf.setZkServers(zkEnsemble); - bookieConf.setJournalDirName(tmpdir.getPath()); - bookieConf.setLedgerDirNames(new String[]{tmpdir.getPath()}); - - BookieServer b = new BookieServer(bookieConf); - b.start(); - for (int i = 0; i < 10 && !b.isRunning(); i++) { - Thread.sleep(10000); - } - if (!b.isRunning()) { - throw new IOException("Bookie would not start"); - } - return b; - } - /** * Check that a number of bookies are available * diff --git a/distributedlog-core/src/test/resources/bk_server.conf b/distributedlog-core/src/test/resources/bk_server.conf index bd5ae936e..0a746d680 100644 --- a/distributedlog-core/src/test/resources/bk_server.conf +++ b/distributedlog-core/src/test/resources/bk_server.conf @@ -122,7 +122,7 @@ writeBufferSizeBytes=524288 journalFlushWhenQueueEmpty=false journalRemoveFromPageCache=true journalAdaptiveGroupWrites=true -journalMaxGroupWaitMSec=15 +journalMaxGroupWaitMSec=2 journalBufferedEntriesThreshold=180 journalBufferedWritesThreshold=262144 journalMaxGroupedEntriesToCommit=200 @@ -138,8 +138,8 @@ fileInfoCacheInitialCapacity=10000 fileInfoMaxIdleTime=3600 # Bookie Threads Settings -numAddWorkerThreads=24 -numJournalCallbackThreads=48 -numReadWorkerThreads=72 -numLongPollWorkerThreads=72 +numAddWorkerThreads=1 +numJournalCallbackThreads=1 +numReadWorkerThreads=4 +numLongPollWorkerThreads=4 From cb59cbb0d514dc2bd3df51be82692db0cedcdf6c Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Thu, 17 Aug 2017 09:01:49 +0800 Subject: [PATCH 8/9] change according comments --- .../java/org/apache/distributedlog/impl/BKNamespaceDriver.java | 2 +- pom.xml | 2 +- 2 files changed, 2 insertions(+), 2 deletions(-) diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java index 93a218fec..36ff43723 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/impl/BKNamespaceDriver.java @@ -258,7 +258,7 @@ static EventLoopGroup getDefaultEventLoopGroup(int numThreads) { try { return new EpollEventLoopGroup(numThreads, threadFactory); } catch (Throwable t) { - LOG.warn("Could not use Netty Epoll event loop for bookie server: {}", t.getMessage()); + LOG.warn("Could not use Netty Epoll event loop for bookie server:", t); return new NioEventLoopGroup(numThreads, threadFactory); } } else { diff --git a/pom.xml b/pom.xml index 1921bf35a..2712b92ea 100644 --- a/pom.xml +++ b/pom.xml @@ -97,7 +97,7 @@ UTF-8 UTF-8 - 4.5.0-SNAPSHOT + 4.5.0 3.0.1 1.1 1.6 From 875bc8e7135508b57107b508309f215115add968 Mon Sep 17 00:00:00 2001 From: Jia Zhai Date: Thu, 17 Aug 2017 14:43:28 +0800 Subject: [PATCH 9/9] add comments to track RecoverCommand --- .../java/org/apache/distributedlog/tools/DistributedLogTool.java | 1 + 1 file changed, 1 insertion(+) diff --git a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java index 2d3c95144..88c5d0fc5 100644 --- a/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java +++ b/distributedlog-core/src/main/java/org/apache/distributedlog/tools/DistributedLogTool.java @@ -2856,6 +2856,7 @@ public DistributedLogTool() { addCommand(new ListCommand()); addCommand(new ReadLastConfirmedCommand()); addCommand(new ReadEntriesCommand()); + // TODO: Fix it later, tracking by https://github.com/apache/distributedlog/issues/150 // addCommand(new RecoverCommand()); addCommand(new RecoverLedgerCommand()); addCommand(new ShowCommand());