Skip to content

Commit

Permalink
Merge e6bc01a into 74a3302
Browse files Browse the repository at this point in the history
  • Loading branch information
xiliuant committed Dec 22, 2016
2 parents 74a3302 + e6bc01a commit bfa12bf
Show file tree
Hide file tree
Showing 54 changed files with 804 additions and 415 deletions.
33 changes: 33 additions & 0 deletions distributedlog-client/pom.xml
Expand Up @@ -129,6 +129,39 @@
<forkedProcessTimeoutInSeconds>1800</forkedProcessTimeoutInSeconds>
</configuration>
</plugin>
<plugin>
<groupId>org.apache.maven.plugins</groupId>
<artifactId>maven-checkstyle-plugin</artifactId>
<version>2.17</version>
<dependencies>
<dependency>
<groupId>com.puppycrawl.tools</groupId>
<artifactId>checkstyle</artifactId>
<version>6.19</version>
</dependency>
<dependency>
<groupId>com.twitter</groupId>
<artifactId>distributedlog-build-tools</artifactId>
<version>${project.version}</version>
</dependency>
</dependencies>
<configuration>
<configLocation>distributedlog/checkstyle.xml</configLocation>
<suppressionsLocation>distributedlog/suppressions.xml</suppressionsLocation>
<consoleOutput>true</consoleOutput>
<failOnViolation>true</failOnViolation>
<includeResources>false</includeResources>
<includeTestSourceDirectory>true</includeTestSourceDirectory>
</configuration>
<executions>
<execution>
<phase>test-compile</phase>
<goals>
<goal>check</goal>
</goals>
</execution>
</executions>
</plugin>
</plugins>
</build>
</project>
Expand Up @@ -17,12 +17,12 @@
*/
package com.twitter.distributedlog.client;

import com.google.common.base.Preconditions;
import static com.google.common.base.Preconditions.checkNotNull;

import java.util.concurrent.TimeUnit;

/**
* Client Config
* Client Config.
*/
public class ClientConfig {
int redirectBackoffStartMs = 25;
Expand Down Expand Up @@ -95,7 +95,7 @@ public boolean getStreamFailfast() {
}

public ClientConfig setStreamNameRegex(String nameRegex) {
Preconditions.checkNotNull(nameRegex);
checkNotNull(nameRegex);
this.streamNameRegex = nameRegex;
return this;
}
Expand Down
Expand Up @@ -24,13 +24,12 @@
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.twitter.distributedlog.DLSN;
import com.twitter.distributedlog.LogRecordSetBuffer;
import com.twitter.distributedlog.util.ProtocolUtils;
import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
import com.twitter.distributedlog.client.ownership.OwnershipCache;
import com.twitter.distributedlog.client.proxy.HostProvider;
import com.twitter.distributedlog.client.proxy.ProxyClient;
import com.twitter.distributedlog.client.proxy.ProxyClientManager;
import com.twitter.distributedlog.client.proxy.ProxyListener;
import com.twitter.distributedlog.client.monitor.MonitorServiceClient;
import com.twitter.distributedlog.client.ownership.OwnershipCache;
import com.twitter.distributedlog.client.resolver.RegionResolver;
import com.twitter.distributedlog.client.routing.RoutingService;
import com.twitter.distributedlog.client.routing.RoutingService.RoutingContext;
Expand All @@ -49,6 +48,7 @@
import com.twitter.distributedlog.thrift.service.StatusCode;
import com.twitter.distributedlog.thrift.service.WriteContext;
import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.util.ProtocolUtils;
import com.twitter.finagle.CancelledRequestException;
import com.twitter.finagle.ConnectionFailedException;
import com.twitter.finagle.Failure;
Expand All @@ -66,16 +66,6 @@
import com.twitter.util.Future;
import com.twitter.util.FutureEventListener;
import com.twitter.util.Promise;
import org.apache.thrift.TApplicationException;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Seq;
import scala.runtime.AbstractFunction1;

import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
Expand All @@ -91,14 +81,24 @@
import java.util.concurrent.locks.ReentrantReadWriteLock;
import java.util.regex.Matcher;
import java.util.regex.Pattern;
import org.apache.thrift.TApplicationException;
import org.jboss.netty.channel.ChannelException;
import org.jboss.netty.util.HashedWheelTimer;
import org.jboss.netty.util.Timeout;
import org.jboss.netty.util.TimerTask;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import scala.collection.Seq;
import scala.runtime.AbstractFunction1;


/**
* Implementation of distributedlog client
* Implementation of distributedlog client.
*/
public class DistributedLogClientImpl implements DistributedLogClient, MonitorServiceClient,
RoutingService.RoutingListener, ProxyListener, HostProvider {

static final Logger logger = LoggerFactory.getLogger(DistributedLogClientImpl.class);
private static final Logger logger = LoggerFactory.getLogger(DistributedLogClientImpl.class);

private final String clientName;
private final ClientId clientId;
Expand Down Expand Up @@ -142,13 +142,13 @@ abstract class StreamOp implements TimerTask {

void send(SocketAddress address) {
long elapsedMs = stopwatch.elapsed(TimeUnit.MILLISECONDS);
if (clientConfig.getMaxRedirects() > 0 &&
tries.get() >= clientConfig.getMaxRedirects()) {
if (clientConfig.getMaxRedirects() > 0
&& tries.get() >= clientConfig.getMaxRedirects()) {
fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs),
"Exhausted max redirects in " + elapsedMs + " ms"));
return;
} else if (clientConfig.getRequestTimeoutMs() > 0 &&
elapsedMs >= clientConfig.getRequestTimeoutMs()) {
} else if (clientConfig.getRequestTimeoutMs() > 0
&& elapsedMs >= clientConfig.getRequestTimeoutMs()) {
fail(address, new RequestTimeoutException(Duration.fromMilliseconds(elapsedMs),
"Exhausted max request timeout " + clientConfig.getRequestTimeoutMs()
+ " in " + elapsedMs + " ms"));
Expand Down Expand Up @@ -203,7 +203,7 @@ Long computeChecksum() {
}

@Override
synchronized public void run(Timeout timeout) throws Exception {
public synchronized void run(Timeout timeout) throws Exception {
if (!timeout.isCancelled() && null != nextAddressToSend) {
doSend(nextAddressToSend);
} else {
Expand Down Expand Up @@ -231,18 +231,20 @@ class BulkWriteOp extends StreamOp {

@Override
Future<ResponseHeader> sendRequest(final ProxyClient sc) {
return sc.getService().writeBulkWithContext(stream, data, ctx).addEventListener(new FutureEventListener<BulkWriteResponse>() {
return sc.getService().writeBulkWithContext(stream, data, ctx)
.addEventListener(new FutureEventListener<BulkWriteResponse>() {
@Override
public void onSuccess(BulkWriteResponse response) {
// For non-success case, the ResponseHeader handler (the caller) will handle it.
// Note success in this case means no finagle errors have occurred (such as finagle connection issues).
// In general code != SUCCESS means there's some error reported by dlog service. The caller will handle such
// errors.
// Note success in this case means no finagle errors have occurred
// (such as finagle connection issues). In general code != SUCCESS means there's some error
// reported by dlog service. The caller will handle such errors.
if (response.getHeader().getCode() == StatusCode.SUCCESS) {
beforeComplete(sc, response.getHeader());
BulkWriteOp.this.complete(sc.getAddress(), response);
if (response.getWriteResponses().size() == 0 && data.size() > 0) {
logger.error("non-empty bulk write got back empty response without failure for stream {}", stream);
logger.error("non-empty bulk write got back empty response without failure for stream {}",
stream);
}
}
}
Expand Down Expand Up @@ -277,7 +279,8 @@ void complete(SocketAddress address, BulkWriteResponse bulkWriteResponse) {

// Should never happen, but just in case so there's some record.
if (bulkWriteResponse.getWriteResponses().size() != data.size()) {
logger.error("wrong number of results, response = {} records = ", bulkWriteResponse.getWriteResponses().size(), data.size());
logger.error("wrong number of results, response = {} records = {}",
bulkWriteResponse.getWriteResponses().size(), data.size());
}
}

Expand Down Expand Up @@ -597,8 +600,15 @@ public Object apply() {
}
});

logger.info("Build distributedlog client : name = {}, client_id = {}, routing_service = {}, stats_receiver = {}, thriftmux = {}",
new Object[] { name, clientId, routingService.getClass(), statsReceiver.getClass(), clientConfig.getThriftMux() });
logger.info("Build distributedlog client : name = {}, client_id = {}, routing_service = {},"
+ " stats_receiver = {}, thriftmux = {}",
new Object[] {
name,
clientId,
routingService.getClass(),
statsReceiver.getClass(),
clientConfig.getThriftMux()
});
}

@Override
Expand All @@ -612,9 +622,9 @@ public Set<SocketAddress> getHosts() {

@Override
public void onHandshakeSuccess(SocketAddress address, ProxyClient client, ServerInfo serverInfo) {
if (null != serverInfo &&
serverInfo.isSetServerStatus() &&
ServerStatus.DOWN == serverInfo.getServerStatus()) {
if (null != serverInfo
&& serverInfo.isSetServerStatus()
&& ServerStatus.DOWN == serverInfo.getServerStatus()) {
logger.info("{} is detected as DOWN during handshaking", address);
// server is shutting down
handleServiceUnavailable(address, client, Optional.<StreamOp>absent());
Expand Down

0 comments on commit bfa12bf

Please sign in to comment.