Skip to content

Commit

Permalink
Merge ce0686e into 52c0eef
Browse files Browse the repository at this point in the history
  • Loading branch information
sijie committed Jun 2, 2017
2 parents 52c0eef + ce0686e commit 9c85042
Show file tree
Hide file tree
Showing 405 changed files with 7,175 additions and 5,255 deletions.
3 changes: 3 additions & 0 deletions .gitignore
Expand Up @@ -27,3 +27,6 @@ logs/

# vagrant
.vagrant

# shade plugins
*dependency-reduced-pom.xml
4 changes: 2 additions & 2 deletions distributedlog-benchmark/pom.xml
Expand Up @@ -27,12 +27,12 @@
<dependencies>
<dependency>
<groupId>org.apache.distributedlog</groupId>
<artifactId>distributedlog-client</artifactId>
<artifactId>distributedlog-proxy-client</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
<groupId>org.apache.distributedlog</groupId>
<artifactId>distributedlog-service</artifactId>
<artifactId>distributedlog-proxy-server</artifactId>
<version>${project.parent.version}</version>
</dependency>
<dependency>
Expand Down
Expand Up @@ -27,9 +27,9 @@
import org.apache.distributedlog.benchmark.utils.ShiftableRateLimiter;
import org.apache.distributedlog.namespace.DistributedLogNamespace;
import org.apache.distributedlog.namespace.DistributedLogNamespaceBuilder;
import org.apache.distributedlog.util.FutureEventListener;
import org.apache.distributedlog.util.FutureUtils;
import org.apache.distributedlog.util.SchedulerUtils;
import com.twitter.util.FutureEventListener;
import java.io.IOException;
import java.net.URI;
import java.util.ArrayList;
Expand Down Expand Up @@ -120,7 +120,7 @@ public void run() {
FutureUtils.result(writer.asyncClose());
}
latch.countDown();
} catch (IOException e) {
} catch (Exception e) {
LOG.error("Failed to intialize writer for stream : {}", streamName, e);
}

Expand Down Expand Up @@ -148,7 +148,7 @@ void rescueWriter(int idx, AsyncLogWriter writer) {
if (streamWriters.get(idx) == writer) {
try {
FutureUtils.result(writer.asyncClose());
} catch (IOException e) {
} catch (Exception e) {
LOG.error("Failed to close writer for stream {}.", idx);
}
AsyncLogWriter newWriter = null;
Expand Down Expand Up @@ -185,7 +185,7 @@ public void close() throws IOException {
SchedulerUtils.shutdownScheduler(this.executorService, 2, TimeUnit.MINUTES);
SchedulerUtils.shutdownScheduler(this.rescueService, 2, TimeUnit.MINUTES);
for (AsyncLogWriter writer : streamWriters) {
FutureUtils.result(writer.asyncClose());
org.apache.distributedlog.util.Utils.ioResult(writer.asyncClose());
}
for (DistributedLogManager dlm : dlms) {
dlm.close();
Expand Down Expand Up @@ -225,7 +225,7 @@ public void run() {
LOG.error("Error on generating message : ", e);
break;
}
writer.write(new LogRecord(requestMillis, data)).addEventListener(new FutureEventListener<DLSN>() {
writer.write(new LogRecord(requestMillis, data)).whenComplete(new FutureEventListener<DLSN>() {
@Override
public void onSuccess(DLSN value) {
requestStat.registerSuccessfulEvent(System.currentTimeMillis() - requestMillis);
Expand Down
Expand Up @@ -100,7 +100,8 @@ public class ReaderWorker implements Worker {
final Counter invalidRecordsCounter;
final Counter outOfOrderSequenceIdCounter;

class StreamReader implements FutureEventListener<List<LogRecordWithDLSN>>, Runnable, Gauge<Number> {
class StreamReader implements
org.apache.distributedlog.util.FutureEventListener<List<LogRecordWithDLSN>>, Runnable, Gauge<Number> {

final int streamIdx;
final String streamName;
Expand Down Expand Up @@ -184,7 +185,7 @@ void readLoop() {
if (!running) {
return;
}
logReaders[streamIdx].readBulk(10).addEventListener(this);
logReaders[streamIdx].readBulk(10).whenComplete(this);
}

@Override
Expand Down Expand Up @@ -369,7 +370,7 @@ private void reinitStream(int idx, Promise<Void> promise) {
if (logReaders[idx] != null) {
try {
FutureUtils.result(logReaders[idx].asyncClose());
} catch (IOException e) {
} catch (Exception e) {
LOG.warn("Failed on closing stream reader {} : ", streamName, e);
}
logReaders[idx] = null;
Expand Down Expand Up @@ -434,7 +435,7 @@ public void close() throws IOException {
this.running = false;
for (AsyncLogReader reader : logReaders) {
if (null != reader) {
FutureUtils.result(reader.asyncClose());
org.apache.distributedlog.util.Utils.ioResult(reader.asyncClose());
}
}
for (DistributedLogManager dlm : dlms) {
Expand Down
Expand Up @@ -28,6 +28,7 @@
import org.apache.distributedlog.io.CompressionCodec;
import org.apache.distributedlog.service.DistributedLogClient;
import org.apache.distributedlog.service.DistributedLogClientBuilder;
import org.apache.distributedlog.thrift.service.StatusCode;
import org.apache.distributedlog.util.SchedulerUtils;
import com.twitter.finagle.builder.ClientBuilder;
import com.twitter.finagle.stats.StatsReceiver;
Expand Down Expand Up @@ -278,7 +279,7 @@ public void run() {
exceptionsLogger.getCounter(cause.getClass().getName()).inc();
if (cause instanceof DLException) {
DLException dle = (DLException) cause;
dlErrorCodeLogger.getCounter(dle.getCode().toString()).inc();
dlErrorCodeLogger.getCounter(StatusCode.findByValue(dle.getCode()).toString()).inc();
}
}
}
Expand Down
Expand Up @@ -112,7 +112,7 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat
openReaderStats.registerSuccessfulEvent(elapsedMs);
logger.info("It took {} ms to position the reader to transaction id = {}, dlsn = {}",
lastTxId, lastDLSN);
} catch (IOException ioe) {
} catch (Exception ioe) {
openReaderStats.registerFailedEvent(stopwatch.elapsed(TimeUnit.MICROSECONDS));
logger.warn("Failed to create reader for stream {} reading from tx id = {}, dlsn = {}.",
new Object[] { streamName, lastTxId, lastDLSN });
Expand Down Expand Up @@ -141,7 +141,7 @@ protected void benchmark(DistributedLogNamespace namespace, String logName, Stat
lastDLSN = lastRecord.getDlsn();
}
stopwatch.reset();
} catch (IOException e) {
} catch (Exception e) {
logger.warn("Encountered reading record from stream {} : ", streamName, e);
reader = null;
break;
Expand Down
Expand Up @@ -240,7 +240,7 @@ page at http://checkstyle.sourceforge.net/config.html -->
<module name="MethodNameCheck">
<!-- Validates identifiers for method names. -->
<metadata name="altname" value="MethodName"/>
<property name="format" value="^[a-z][a-zA-Z0-9]*(_[a-zA-Z0-9]+)*$"/>
<property name="format" value="(^[a-z][a-zA-Z0-9]*(_[a-zA-Z0-9]+)*$|Void)"/>
<property name="severity" value="error"/>
</module>

Expand Down Expand Up @@ -271,12 +271,12 @@ page at http://checkstyle.sourceforge.net/config.html -->
</module>

<module name="MethodTypeParameterName">
<property name="format" value="^(((T|K|V|W|X)[0-9]*)|([A-Z][a-z][a-zA-Z]*T))$"/>
<property name="format" value="^(((T|K|V|W|X|R)[0-9]*)|([A-Z][a-z][a-zA-Z]*T))$"/>
<property name="severity" value="error"/>
</module>

<module name="InterfaceTypeParameterName">
<property name="format" value="^(((T|K|V|W|X)[0-9]*)|([A-Z][a-z][a-zA-Z]*T))$"/>
<property name="format" value="^(((T|K|V|W|X|R)[0-9]*)|([A-Z][a-z][a-zA-Z]*T))$"/>
<property name="severity" value="error"/>
</module>

Expand Down
79 changes: 42 additions & 37 deletions distributedlog-core/pom.xml
Expand Up @@ -64,20 +64,10 @@
<version>${slf4j.version}</version>
</dependency>
<dependency>
<groupId>com.twitter.common</groupId>
<artifactId>stats-util</artifactId>
<version>${stats-util.version}</version>
<exclusions>
<exclusion>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-jdk14</artifactId>
</exclusion>
</exclusions>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>util-core_2.11</artifactId>
<version>${finagle.version}</version>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>${slf4j.version}</version>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
Expand All @@ -87,12 +77,7 @@
<dependency>
<groupId>org.apache.thrift</groupId>
<artifactId>libthrift</artifactId>
<version>${libthrift.version}</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>scrooge-core_2.11</artifactId>
<version>${scrooge.version}</version>
<version>0.9.3</version>
</dependency>
<dependency>
<groupId>org.apache.bookkeeper</groupId>
Expand Down Expand Up @@ -133,6 +118,12 @@
<artifactId>lz4</artifactId>
<version>${lz4.version}</version>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>${lombok.version}</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.mockito</groupId>
<artifactId>mockito-core</artifactId>
Expand All @@ -149,23 +140,6 @@
</dependencies>
<build>
<plugins>
<plugin>
<groupId>com.twitter</groupId>
<artifactId>scrooge-maven-plugin</artifactId>
<version>${scrooge-maven-plugin.version}</version>
<configuration>
<language>java</language>
</configuration>
<executions>
<execution>
<id>thrift-sources</id>
<phase>generate-sources</phase>
<goals>
<goal>compile</goal>
</goals>
</execution>
</executions>
</plugin>
<plugin>
<artifactId>maven-compiler-plugin</artifactId>
<version>${maven-compiler-plugin.version}</version>
Expand Down Expand Up @@ -206,6 +180,37 @@
<excludeFilterFile>${basedir}/src/main/resources/findbugsExclude.xml</excludeFilterFile>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-shade-plugin</artifactId>
<version>${maven-shade-plugin.version}</version>
<configuration>
<!-- put your configurations here -->
</configuration>
<executions>
<execution>
<phase>package</phase>
<goals>
<goal>shade</goal>
</goals>
<configuration>
<createDependencyReducedPom>true</createDependencyReducedPom>
<artifactSet>
<includes>
<include>org.apache.thrift:libthrift</include>
</includes>
</artifactSet>
<minimizeJar>true</minimizeJar>
<relocations>
<relocation>
<pattern>org.apache.thrift</pattern>
<shadedPattern>dl-shade.org.apache.thrift</shadedPattern>
</relocation>
</relocations>
</configuration>
</execution>
</executions>
</plugin>
</plugins>
</build>
<profiles>
Expand Down
Expand Up @@ -18,10 +18,8 @@
package org.apache.distributedlog;

import com.google.common.base.Preconditions;

import java.io.IOException;
import java.io.InputStream;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand Down
Expand Up @@ -17,14 +17,12 @@
*/
package org.apache.distributedlog;

import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.util.FutureUtils;
import com.twitter.util.Await;
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import java.io.Closeable;
import java.io.IOException;

import java.util.concurrent.CompletableFuture;
import org.apache.distributedlog.exceptions.UnexpectedException;
import org.apache.distributedlog.util.FutureEventListener;
import org.apache.distributedlog.util.FutureUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

Expand All @@ -44,16 +42,16 @@ public AppendOnlyStreamWriter(BKAsyncLogWriter logWriter, long pos) {
this.requestPos = pos;
}

public Future<DLSN> write(byte[] data) {
public CompletableFuture<DLSN> write(byte[] data) {
requestPos += data.length;
Future<DLSN> writeResult = logWriter.write(new LogRecord(requestPos, data));
return writeResult.addEventListener(new WriteCompleteListener(requestPos));
CompletableFuture<DLSN> writeResult = logWriter.write(new LogRecord(requestPos, data));
return writeResult.whenComplete(new WriteCompleteListener(requestPos));
}

public void force(boolean metadata) throws IOException {
long pos = 0;
try {
pos = Await.result(logWriter.flushAndCommit());
pos = FutureUtils.result(logWriter.flushAndCommit());
} catch (IOException ioe) {
throw ioe;
} catch (Exception ex) {
Expand All @@ -78,7 +76,7 @@ public void close() throws IOException {

public void markEndOfStream() throws IOException {
try {
Await.result(logWriter.markEndOfStream());
FutureUtils.result(logWriter.markEndOfStream());
} catch (IOException ioe) {
throw ioe;
} catch (Exception ex) {
Expand Down
Expand Up @@ -17,11 +17,10 @@
*/
package org.apache.distributedlog;

import org.apache.distributedlog.io.AsyncCloseable;
import com.twitter.util.Future;

import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;
import org.apache.distributedlog.io.AsyncCloseable;

public interface AsyncLogReader extends AsyncCloseable {

Expand All @@ -37,7 +36,7 @@ public interface AsyncLogReader extends AsyncCloseable {
*
* @return A promise that when satisfied will contain the Log Record with its DLSN.
*/
public Future<LogRecordWithDLSN> readNext();
public CompletableFuture<LogRecordWithDLSN> readNext();

/**
* Read next <i>numEntries</i> entries. The future is only satisfied with non-empty list
Expand All @@ -48,7 +47,7 @@ public interface AsyncLogReader extends AsyncCloseable {
* num entries
* @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
*/
public Future<List<LogRecordWithDLSN>> readBulk(int numEntries);
public CompletableFuture<List<LogRecordWithDLSN>> readBulk(int numEntries);

/**
* Read next <i>numEntries</i> entries in a given <i>waitTime</i>.
Expand All @@ -65,5 +64,5 @@ public interface AsyncLogReader extends AsyncCloseable {
* wait time unit
* @return A promise that when satisfied will contain a non-empty list of records with their DLSN.
*/
public Future<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit);
public CompletableFuture<List<LogRecordWithDLSN>> readBulk(int numEntries, long waitTime, TimeUnit timeUnit);
}

0 comments on commit 9c85042

Please sign in to comment.