Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

DL-124: Use Java8 Future rather than twitter Future #133

Closed
wants to merge 15 commits into from

Conversation

sijie
Copy link
Member

@sijie sijie commented Jun 2, 2017

Switch to use Java8 CompletableFuture, to reduce dependencies introduced by twitter future and make it more friendly to users (users don't think of using which version of scala).

This change is based on #132 . Gitsha ce0686e is the change to review.

The changes:

  • Change Future to CompletableFuture
  • Map to thenApply
  • flatMap to thenCompose
  • Added a FutureEventListener, and switch addEvenListener to whenComplete (or whenCompleteAsync)
  • setValue to complete
  • setException to completeExceptionally
  • add rescue, ignore, ensure to FutureUtils as util functions.

@sijie
Copy link
Member Author

sijie commented Jun 2, 2017

tag @fcuny @mgodave @leighst @jiazhai for reviews

@asfbot
Copy link

asfbot commented Jun 3, 2017

Build finished.
--none--

lastTruncationAttempt = writeHandler.purgeLogSegmentsOlderThanTimestamp(minTimestampToKeep);
}
}
}

private Future<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler,
private CompletableFuture<BKLogSegmentWriter> asyncStartNewLogSegment(final BKLogWriteHandler writeHandler,
final long startTxId,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here.

return readInternal(numEntries, 0, TimeUnit.MILLISECONDS);
}

@Override
public synchronized Future<List<LogRecordWithDLSN>> readBulk(int numEntries,
public synchronized CompletableFuture<List<LogRecordWithDLSN>> readBulk(int numEntries,
long waitTime,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here.

@@ -404,7 +392,7 @@ public String getStreamName() {
* num entries to read
* @return A promise that satisfied with a non-empty list of log records with their DLSN.
*/
private synchronized Future<List<LogRecordWithDLSN>> readInternal(int numEntries,
private synchronized CompletableFuture<List<LogRecordWithDLSN>> readInternal(int numEntries,
long deadlineTime,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here.

@@ -206,7 +197,7 @@ private BKLogSegmentWriter getCachedLogSegmentWriter() throws WriteException {
}
}

private Future<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid,
private CompletableFuture<BKLogSegmentWriter> getLogSegmentWriter(long firstTxid,
boolean bestEffort,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change in this file, at line 201, 212, 242, 272,

}

private Future<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
private CompletableFuture<DLSN> getDLSNNotLessThanTxId(long fromTxnId,
final List<LogSegmentMetadata> segments) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change in this file, also at line 565, 707, 838.

return asyncReadLastRecord(l, false, false, false);
}

public Future<LogRecordWithDLSN> asyncReadLastRecord(final LogSegmentMetadata l,
public CompletableFuture<LogRecordWithDLSN> asyncReadLastRecord(final LogSegmentMetadata l,
final boolean fence,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here.

}

protected Future<BKLogSegmentWriter> asyncStartLogSegment(final long txId,
protected CompletableFuture<BKLogSegmentWriter> asyncStartLogSegment(final long txId,
final boolean bestEffort,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here.

}
}
}, null);
return promise;
}

public Future<Void> deleteLedger(long lid,
public CompletableFuture<Void> deleteLedger(long lid,
final boolean ignoreNonExistentLedger) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here.


// Ledger configuration
private final QuorumConfigProvider quorumConfigProvider;

static Future<Versioned<byte[]>> getAndCreateAllocationData(final String allocatePath,
static CompletableFuture<Versioned<byte[]>> getAndCreateAllocationData(final String allocatePath,
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here, and also at line 128, 159.

KeeperException.Code.get(rc)));
}
}

@Override
public Future<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
public CompletableFuture<Versioned<List<String>>> getLogSegmentNames(String logSegmentsPath,
LogSegmentNamesListener listener) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here.

@@ -186,13 +185,13 @@ LedgerAllocator createLedgerAllocator(LogMetadataForWriter logMetadata,
//

@Override
public Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
public CompletableFuture<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
long startEntryId) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here, also at line 244.

@@ -237,15 +227,13 @@ public DistributedLock createWriteLock(LogMetadataForWriter metadata) {
// Create Read Lock
//

private Future<Void> ensureReadLockPathExist(final LogMetadata logMetadata,
private CompletableFuture<Void> ensureReadLockPathExist(final LogMetadata logMetadata,
final String readLockPath) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here, and line 269, 312, 521, 572.

@@ -1088,9 +1063,9 @@ public String getActionName() {
});
}

private Future<String> checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher,
private CompletableFuture<String> checkLockOwnerAndWaitIfPossible(final LockWatcher lockWatcher,
final boolean wait) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here.

@@ -58,7 +57,7 @@
* @param startEntryId the start entry id
* @return future represent the opened reader
*/
Future<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
CompletableFuture<LogSegmentEntryReader> openReader(LogSegmentMetadata segment,
long startEntryId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here, and line 71

@@ -73,7 +72,7 @@ private String formatLogSegmentSequenceNumber(long logSegmentSeqNo) {
}

@Override
public Future<LogSegmentMetadata> changeSequenceNumber(LogSegmentMetadata segment,
public CompletableFuture<LogSegmentMetadata> changeSequenceNumber(LogSegmentMetadata segment,
long logSegmentSeqNo) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here.

@@ -59,7 +59,7 @@
* @param readerId the reader id used for lock
* @return the read lock
*/
Future<DistributedLock> createReadLock(LogMetadataForReader metadata,
CompletableFuture<DistributedLock> createReadLock(LogMetadataForReader metadata,
Optional<String> readerId);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here, and line 83.

@@ -56,7 +56,7 @@
* ledger sequence number to change.
* @return new log segment
*/
Future<LogSegmentMetadata> changeSequenceNumber(LogSegmentMetadata segment,
CompletableFuture<LogSegmentMetadata> changeSequenceNumber(LogSegmentMetadata segment,
long logSegmentSeqNo);
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here, and line 102.

}
}
}, null);
return promise;
}

public static Future<Void> asyncClose(@Nullable AsyncCloseable closeable,
public static CompletableFuture<Void> asyncClose(@Nullable AsyncCloseable closeable,
boolean swallowIOException) {
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please also do the code alignment change here, and line 573, 586.

Copy link
Member

@jiazhai jiazhai left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

+1, LGTM.
some small comments related to code alignment.
This changed so many places, thanks for the time.

@leighst
Copy link
Contributor

leighst commented Jun 9, 2017

Is there some way we can make migration easier? We're going to have to do a lot of translation to twitter futures when we integrate. Should we be thinking about some kind of adapter layer?

@@ -27,12 +27,12 @@
<dependencies>
<dependency>
<groupId>org.apache.distributedlog</groupId>
<artifactId>distributedlog-client</artifactId>
<artifactId>distributedlog-proxy-client</artifactId>
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

this change was based on #132. github doesn't allow pull request based on the other branch.

FutureUtils.cancel(acquireFuture);
return null;
final CompletableFuture<Void> threadAcquirePromise = new CompletableFuture<Void>();
threadAcquirePromise.whenComplete((value, cause) -> {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

do i understand correctly java8 future doesn't really have the concept of interrupt?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

correctly, java8 future doesn't have the concept of interrupt. I try to simulate the similar thing.

+ BKException.getMessage(transmitResult), transmitResult);
}

public static <T> T ioResult(CompletableFuture<T> result) throws IOException {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

why not have a java8 version of FutureUtils?
seems weird to throw all of this into one file

@sijie
Copy link
Member Author

sijie commented Jun 9, 2017

@leighst : let's discuss this tomorrow face to face. I am thinking of a new api with java8 future that works for both core-library and proxy. that means the way to distinguish core and library is just the uri. we can have the old twitter-future based api wrapper over the new java8 future api. so it will be backward compatible.

@sijie
Copy link
Member Author

sijie commented Jun 13, 2017

@leighst - I pushed a new change:

  • moved the interfaces to org.apache.distributedlog.api, which they are using Java8 interface.
  • moved the old interfaces to distributedlog-core-twitter module, they are are backward compatible to old application.

So the old application can use distributedlog-core-twitter and keep the binary compatible.

@sijie
Copy link
Member Author

sijie commented Jun 13, 2017

Let me know if this approach works for you.

@asfbot
Copy link

asfbot commented Jun 13, 2017

Build finished.
--none--

1 similar comment
@asfbot
Copy link

asfbot commented Jun 13, 2017

Build finished.
--none--

@asfbot
Copy link

asfbot commented Jun 13, 2017

Build finished.
--none--

log4j.logger.org.apache.bookkeeper.util.SafeRunnable=INFO, Executors
log4j.additivity.org.apache.distributedlog.util.MonitoredFuturePool=false
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

i think this will result in duplicate messages?

Copy link
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

MonitoredFuturePool and MonitoredScheduledThreadPoolExecutor are already removed in this patch. because in Java8, you can configure a thread where the callbacks will be executed, there is no need for additional future pool, we just use the ordered scheduler.

@asfgit asfgit closed this in 53fca4a Jun 21, 2017
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment
Labels
None yet
Projects
None yet
Development

Successfully merging this pull request may close these issues.

None yet

4 participants