Skip to content

Commit

Permalink
Merge 7b46a9a into 3fceccc
Browse files Browse the repository at this point in the history
  • Loading branch information
sijie committed Dec 13, 2016
2 parents 3fceccc + 7b46a9a commit e0267ad
Show file tree
Hide file tree
Showing 11 changed files with 129 additions and 27 deletions.
Expand Up @@ -480,6 +480,31 @@ static int parseMemberID(String nodeName) {
return id;
}

static boolean areLockWaitersInSameSession(String node1, String node2) {
String[] parts1 = node1.split("_");
String[] parts2 = node2.split("_");
if (parts1.length != 4 || parts2.length != 4) {
return node1.equals(node2);
}
if (!parts1[2].startsWith("s") || !parts2[2].startsWith("s")) {
return node1.equals(node2);
}
long sessionOwner1 = Long.parseLong(parts1[2].substring(1));
long sessionOwner2 = Long.parseLong(parts2[2].substring(1));
if (sessionOwner1 != sessionOwner2) {
return false;
}
String clientId1, clientId2;
try {
clientId1 = URLDecoder.decode(parts1[1], UTF_8.name());
clientId2 = URLDecoder.decode(parts2[1], UTF_8.name());
return clientId1.equals(clientId2);
} catch (UnsupportedEncodingException e) {
// if failed to parse client id, we have to get client id by zookeeper#getData.
return node1.equals(node2);
}
}

/**
* Get client id and its ephemeral owner.
*
Expand Down Expand Up @@ -1209,17 +1234,19 @@ private void watchLockOwner(final LockWatcher lockWatcher,
@Override
public void execute() {
boolean shouldWatch;
final boolean shouldClaimOwnership;
if (lockContext.hasLockId(currentOwner) && siblingNode.equals(ownerNode)) {
// if the current owner is the znode left from previous session
// we should watch it and claim ownership
shouldWatch = true;
shouldClaimOwnership = true;
LOG.info("LockWatcher {} for {} found its previous session {} held lock, watch it to claim ownership.",
new Object[] { myNode, lockPath, currentOwner });
} else if (lockId.compareTo(currentOwner) == 0 && siblingNode.equals(ownerNode)) {
} else if (lockId.compareTo(currentOwner) == 0 && areLockWaitersInSameSession(siblingNode, ownerNode)) {
// I found that my sibling is the current owner with same lock id (client id & session id)
// It must be left by any race condition from same zookeeper client
// I would watch owner instead of sibling
shouldWatch = true;
shouldClaimOwnership = true;
LOG.info("LockWatcher {} for {} found itself {} already held lock at sibling node {}, watch it to claim ownership.",
new Object[]{myNode, lockPath, lockId, siblingNode});
} else {
Expand All @@ -1230,6 +1257,7 @@ public void execute() {
new Object[]{lockPath, myNode, siblingNode, System.currentTimeMillis()});
}
}
shouldClaimOwnership = false;
}

// watch sibling for lock ownership
Expand All @@ -1247,8 +1275,7 @@ public void execute() {
}

if (KeeperException.Code.OK.intValue() == rc) {
if (siblingNode.equals(ownerNode) &&
(lockId.compareTo(currentOwner) == 0 || lockContext.hasLockId(currentOwner))) {
if (shouldClaimOwnership) {
// watch owner successfully
LOG.info("LockWatcher {} claimed ownership for {} after set watcher on {}.",
new Object[]{ myNode, lockPath, ownerNode });
Expand Down
Expand Up @@ -179,6 +179,28 @@ public void testParseMemberID() throws Exception {
assertEquals(123, parseMemberID("member_000123"));
}

@Test(timeout = 60000)
public void testAreLockWaitersInSameSession() throws Exception {
ZooKeeper zk = zkc.get();

String lockPath = "/test-are-lock-waiters-in-same-session";
String clientId1 = "test-are-lock-waiters-in-same-session-1";
String clientId2 = "test-are-lock-waiters-in-same-session-2";

createLockPath(zk, lockPath);

String node1 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId1));
String node2 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId2));
String node3 = getLockIdFromPath(createLockNodeV3(zk, lockPath, clientId1));

assertEquals(node1 + " and " + node3 + " should be in same session.",
true, areLockWaitersInSameSession(node1, node3));
assertEquals(node1 + " and " + node2 + " should be not in same session.",
false, areLockWaitersInSameSession(node1, node2));
assertEquals(node3 + " and " + node2 + " should be not in same session.",
false, areLockWaitersInSameSession(node3, node2));
}

@Test(timeout = 60000)
public void testExecuteLockAction() throws Exception {
String lockPath = "/test-execute-lock-action";
Expand Down Expand Up @@ -921,6 +943,33 @@ public void run() {
lock1_1.unlock();
}

@Test(timeout = 60000)
public void testLockWithMultipleSiblingWaiters() throws Exception {
String lockPath = "/test-lock-with-multiple-sibling-waiters";
String clientId = "client-id";

createLockPath(zkc.get(), lockPath);

final ZKSessionLock lock0 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
final ZKSessionLock lock1 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);
final ZKSessionLock lock2 = new ZKSessionLock(zkc, lockPath, clientId, lockStateExecutor);

lock0.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
lock1.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);
lock2.tryLock(Long.MAX_VALUE, TimeUnit.MILLISECONDS);

List<String> children = awaitWaiters(3, zkc, lockPath);

assertEquals(3, children.size());
assertEquals(State.CLAIMED, lock0.getLockState());
assertEquals(State.CLAIMED, lock1.getLockState());
assertEquals(State.CLAIMED, lock2.getLockState());

lock0.unlock();
lock1.unlock();
lock2.unlock();
}

/**
* Immediate lock and unlock first lock
* @throws Exception
Expand Down
Expand Up @@ -132,6 +132,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
// Stats
private final StatsLogger statsLogger;
private final StatsLogger perStreamStatsLogger;
private final StreamPartitionConverter streamPartitionConverter;
private final StreamOpStats streamOpStats;
private final Counter bulkWritePendingStat;
private final Counter writePendingStat;
Expand All @@ -158,6 +159,7 @@ public class DistributedLogServiceImpl implements DistributedLogService.ServiceI
this.perStreamStatsLogger = perStreamStatsLogger;
this.dlsnVersion = serverConf.getDlsnVersion();
this.serverRegionId = serverConf.getRegionId();
this.streamPartitionConverter = converter;
int serverPort = serverConf.getServerPort();
int shard = serverConf.getServerShardId();
int numThreads = serverConf.getServerThreads();
Expand Down Expand Up @@ -396,8 +398,8 @@ public Future<WriteResponse> write(final String stream, ByteBuffer data) {
public Future<BulkWriteResponse> writeBulkWithContext(final String stream, List<ByteBuffer> data, WriteContext ctx) {
bulkWritePendingStat.inc();
receivedRecordCounter.add(data.size());
BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, getChecksum(ctx),
featureChecksumDisabled, accessControlManager);
BulkWriteOp op = new BulkWriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
getChecksum(ctx), featureChecksumDisabled, accessControlManager);
executeStreamOp(op);
return op.result().ensure(new Function0<BoxedUnit>() {
public BoxedUnit apply() {
Expand Down Expand Up @@ -675,8 +677,9 @@ WriteOp newWriteOp(String stream,
ByteBuffer data,
Long checksum,
boolean isRecordSet) {
return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, serverConfig, dlsnVersion,
checksum, isRecordSet, featureChecksumDisabled, accessControlManager);
return new WriteOp(stream, data, statsLogger, perStreamStatsLogger, streamPartitionConverter,
serverConfig, dlsnVersion, checksum, isRecordSet, featureChecksumDisabled,
accessControlManager);
}

@VisibleForTesting
Expand Down
Expand Up @@ -33,6 +33,8 @@
import com.twitter.distributedlog.exceptions.OwnershipAcquireFailedException;
import com.twitter.distributedlog.exceptions.RequestDeniedException;
import com.twitter.distributedlog.service.ResponseUtils;
import com.twitter.distributedlog.service.streamset.Partition;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
import com.twitter.distributedlog.thrift.service.BulkWriteResponse;
import com.twitter.distributedlog.thrift.service.ResponseHeader;
import com.twitter.distributedlog.thrift.service.StatusCode;
Expand Down Expand Up @@ -88,6 +90,7 @@ public BulkWriteOp(String stream,
List<ByteBuffer> buffers,
StatsLogger statsLogger,
StatsLogger perStreamStatsLogger,
StreamPartitionConverter streamPartitionConverter,
Long checksum,
Feature checksumDisabledFeature,
AccessControlManager accessControlManager) {
Expand All @@ -100,15 +103,16 @@ public BulkWriteOp(String stream,
}
this.payloadSize = total;

final Partition partition = streamPartitionConverter.convert(stream);
// Write record stats
StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
this.deniedBulkWriteCounter = streamOpStats.requestDeniedCounter("bulkWrite");
this.successRecordCounter = streamOpStats.recordsCounter("success");
this.failureRecordCounter = streamOpStats.recordsCounter("failure");
this.redirectRecordCounter = streamOpStats.recordsCounter("redirect");
this.bulkWriteBytes = streamOpStats.scopedRequestCounter("bulkWrite", "bytes");
this.latencyStat = streamOpStats.streamRequestLatencyStat(stream, "bulkWrite");
this.bytes = streamOpStats.streamRequestCounter(stream, "bulkWrite", "bytes");
this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "bulkWrite");
this.bytes = streamOpStats.streamRequestCounter(partition, "bulkWrite", "bytes");

this.accessControlManager = accessControlManager;

Expand Down
Expand Up @@ -194,12 +194,12 @@ static boolean isUnavailable(StreamStatus status) {
this.dynConf = streamConf;
StatsLogger limiterStatsLogger = BroadCastStatsLogger.two(
streamOpStats.baseScope("stream_limiter"),
streamOpStats.streamRequestScope(name, "limiter"));
streamOpStats.streamRequestScope(partition, "limiter"));
this.limiter = new StreamRequestLimiter(name, dynConf, limiterStatsLogger, featureRateLimitDisabled);
this.requestTimer = requestTimer;

// Stats
this.streamLogger = streamOpStats.streamRequestStatsLogger(name);
this.streamLogger = streamOpStats.streamRequestStatsLogger(partition);
this.limiterStatLogger = streamOpStats.baseScope("request_limiter");
this.streamExceptionStatLogger = streamLogger.scope("exceptions");
this.serviceTimeout = streamOpStats.baseCounter("serviceTimeout");
Expand Down
Expand Up @@ -17,6 +17,8 @@
*/
package com.twitter.distributedlog.service.stream;

import com.twitter.distributedlog.stats.BroadCastStatsLogger;
import com.twitter.distributedlog.service.streamset.Partition;
import org.apache.bookkeeper.stats.Counter;
import org.apache.bookkeeper.stats.OpStatsLogger;
import org.apache.bookkeeper.stats.StatsLogger;
Expand Down Expand Up @@ -81,19 +83,22 @@ public Counter recordsCounter(String counterName) {
return recordsStatsLogger.getCounter(counterName);
}

public StatsLogger streamRequestStatsLogger(String streamName) {
return streamStatsLogger.scope(streamName);
public StatsLogger streamRequestStatsLogger(Partition partition) {
return BroadCastStatsLogger.masterslave(
streamStatsLogger.scope(partition.getStream()).scope("partition")
.scope(partition.getPaddedId()), streamStatsLogger.scope(partition.getStream())
.scope("aggregate"));
}

public StatsLogger streamRequestScope(String streamName, String scopeName) {
return streamRequestStatsLogger(streamName).scope(scopeName);
public StatsLogger streamRequestScope(Partition partition, String scopeName) {
return streamRequestStatsLogger(partition).scope(scopeName);
}

public OpStatsLogger streamRequestLatencyStat(String streamName, String opName) {
return streamRequestStatsLogger(streamName).getOpStatsLogger(opName);
public OpStatsLogger streamRequestLatencyStat(Partition partition, String opName) {
return streamRequestStatsLogger(partition).getOpStatsLogger(opName);
}

public Counter streamRequestCounter(String streamName, String opName, String counterName) {
return streamRequestScope(streamName, opName).getCounter(counterName);
public Counter streamRequestCounter(Partition partition, String opName, String counterName) {
return streamRequestScope(partition, opName).getCounter(counterName);
}
}
Expand Up @@ -26,6 +26,8 @@
import com.twitter.distributedlog.exceptions.DLException;
import com.twitter.distributedlog.exceptions.RequestDeniedException;
import com.twitter.distributedlog.service.ResponseUtils;
import com.twitter.distributedlog.service.streamset.Partition;
import com.twitter.distributedlog.service.streamset.StreamPartitionConverter;
import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.thrift.service.ResponseHeader;
import com.twitter.distributedlog.thrift.service.StatusCode;
Expand Down Expand Up @@ -67,6 +69,7 @@ public WriteOp(String stream,
ByteBuffer data,
StatsLogger statsLogger,
StatsLogger perStreamStatsLogger,
StreamPartitionConverter streamPartitionConverter,
ServerConfiguration conf,
byte dlsnVersion,
Long checksum,
Expand All @@ -78,14 +81,15 @@ public WriteOp(String stream,
data.get(payload);
this.isRecordSet = isRecordSet;

final Partition partition = streamPartitionConverter.convert(stream);
StreamOpStats streamOpStats = new StreamOpStats(statsLogger, perStreamStatsLogger);
this.successRecordCounter = streamOpStats.recordsCounter("success");
this.failureRecordCounter = streamOpStats.recordsCounter("failure");
this.redirectRecordCounter = streamOpStats.recordsCounter("redirect");
this.deniedWriteCounter = streamOpStats.requestDeniedCounter("write");
this.writeBytes = streamOpStats.scopedRequestCounter("write", "bytes");
this.latencyStat = streamOpStats.streamRequestLatencyStat(stream, "write");
this.bytes = streamOpStats.streamRequestCounter(stream, "write", "bytes");
this.latencyStat = streamOpStats.streamRequestLatencyStat(partition, "write");
this.bytes = streamOpStats.streamRequestCounter(partition, "write", "bytes");

this.dlsnVersion = dlsnVersion;
this.accessControlManager = accessControlManager;
Expand Down
Expand Up @@ -57,6 +57,14 @@ public int getId() {
return id;
}

/**
* Get the 6 digit 0 padded id of this partition as a String.
* @return partition id
*/
public String getPaddedId() {
return String.format("%06d", getId());
}

@Override
public boolean equals(Object o) {
if (this == o) {
Expand Down
Expand Up @@ -106,9 +106,9 @@ public void testBasicWrite() throws Exception {
int numRead = 0;
LogRecord r = reader.readNext(false);
while (null != r) {
int i = Integer.parseInt(new String(r.getPayload()));
assertEquals(numRead + 1, i);
++numRead;
int i = Integer.parseInt(new String(r.getPayload()));
assertEquals(numRead, i);
r = reader.readNext(false);
}
assertEquals(10, numRead);
Expand All @@ -121,7 +121,7 @@ public void testBasicWrite() throws Exception {
*/
@Test(timeout = 60000)
public void testChecksumFlag() throws Exception {
String name = "dlserver-basic-write";
String name = "testChecksumFlag";
LocalRoutingService routingService = LocalRoutingService.newBuilder().build();
routingService.addHost(name, dlServer.getAddress());
DistributedLogClientBuilder dlClientBuilder = DistributedLogClientBuilder.newBuilder()
Expand All @@ -134,7 +134,7 @@ public void testChecksumFlag() throws Exception {
.connectionTimeout(Duration.fromSeconds(1))
.requestTimeout(Duration.fromSeconds(60)))
.checksum(false);
DistributedLogClient dlClient = (DistributedLogClientImpl) dlClientBuilder.build();
DistributedLogClient dlClient = dlClientBuilder.build();
Await.result(dlClient.write(name, ByteBuffer.wrap(("1").getBytes())));
dlClient.close();

Expand Down
Expand Up @@ -600,6 +600,7 @@ private WriteOp getWriteOp(String name, SettableFeature disabledFeature, Long ch
ByteBuffer.wrap("test".getBytes()),
new NullStatsLogger(),
new NullStatsLogger(),
new IdentityStreamPartitionConverter(),
new ServerConfiguration(),
(byte)0,
checksum,
Expand Down
Expand Up @@ -24,7 +24,7 @@
import com.twitter.distributedlog.exceptions.InternalServerException;
import com.twitter.distributedlog.service.ResponseUtils;
import com.twitter.distributedlog.service.config.ServerConfiguration;
import com.twitter.distributedlog.service.stream.WriteOp;
import com.twitter.distributedlog.service.streamset.IdentityStreamPartitionConverter;
import com.twitter.distributedlog.thrift.service.StatusCode;
import com.twitter.distributedlog.thrift.service.WriteResponse;
import com.twitter.distributedlog.util.Sequencer;
Expand Down Expand Up @@ -67,6 +67,7 @@ private WriteOp getWriteOp() {
ByteBuffer.wrap("test".getBytes()),
new NullStatsLogger(),
new NullStatsLogger(),
new IdentityStreamPartitionConverter(),
new ServerConfiguration(),
(byte)0,
null,
Expand Down

0 comments on commit e0267ad

Please sign in to comment.