Skip to content

Commit

Permalink
Merge branch 'master' into session-v2
Browse files Browse the repository at this point in the history
  • Loading branch information
Randgalt committed Apr 28, 2012
2 parents 340f000 + 8b94324 commit a02082e
Show file tree
Hide file tree
Showing 7 changed files with 87 additions and 36 deletions.
4 changes: 4 additions & 0 deletions CHANGES.txt
Original file line number Diff line number Diff line change
Expand Up @@ -3,6 +3,10 @@
* Based on suggestion in Issue 67: Added new concept of UriSpec to the ServiceInstance in the
Service Discovery Curator extension.

* User "Pierre-Luc Bertrand" pointed out a potential race condition that would cause a SysConnected
to get sent before an Expired. So, now I push the event to the parent watcher before resetting
the connection in ConnectionState.process(WatchedEvent)

1.1.8/1.0.9 - April 17, 2012
============================
* Added methods to compress data via create() and setData() and to decompress data via getData(). The
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,6 +151,14 @@ public void process(WatchedEvent event)
log.debug("ConnectState watcher: " + event);
}

Watcher localParentWatcher = parentWatcher.get();
if ( localParentWatcher != null )
{
TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
localParentWatcher.process(event);
timeTrace.commit();
}

boolean wasConnected = isConnected.get();
boolean newIsConnected = wasConnected;
if ( event.getType() == Watcher.Event.EventType.None )
Expand All @@ -163,14 +171,6 @@ public void process(WatchedEvent event)
isConnected.set(newIsConnected);
connectionStartMs = System.currentTimeMillis();
}

Watcher localParentWatcher = parentWatcher.get();
if ( localParentWatcher != null )
{
TimeTrace timeTrace = new TimeTrace("connection-state-parent-process", tracer.get());
localParentWatcher.process(event);
timeTrace.commit();
}
}

private boolean checkState(Event.KeeperState state, boolean wasConnected)
Expand Down
23 changes: 18 additions & 5 deletions curator-client/src/test/java/com/netflix/curator/BasicTests.java
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import com.netflix.curator.test.Timing;
import com.netflix.curator.utils.ZookeeperFactory;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
Expand All @@ -34,6 +35,7 @@
import java.io.File;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;

public class BasicTests extends BaseClassForTests
{
Expand Down Expand Up @@ -78,6 +80,7 @@ public void process(WatchedEvent event)
client.start();
try
{
final AtomicBoolean firstTime = new AtomicBoolean(true);
RetryLoop.callWithRetry
(
client,
Expand All @@ -86,11 +89,21 @@ public void process(WatchedEvent event)
@Override
public Object call() throws Exception
{
client.getZooKeeper().create("/foo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);

KillSession.kill(client.getZooKeeper(), server.getConnectString());

Assert.assertTrue(timing.awaitLatch(latch));
if ( firstTime.compareAndSet(true, false) )
{
try
{
client.getZooKeeper().create("/foo", new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
}
catch ( KeeperException.NodeExistsException ignore )
{
// ignore
}

KillSession.kill(client.getZooKeeper(), server.getConnectString());

Assert.assertTrue(timing.awaitLatch(latch));
}
ZooKeeper zooKeeper = client.getZooKeeper();
client.blockUntilConnectedOrTimedOut();
Assert.assertNotNull(zooKeeper.exists("/foo", false));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@
*/
package com.netflix.curator.framework.recipes.queue;

import com.google.common.annotations.VisibleForTesting;
import com.google.common.base.Function;
import com.google.common.base.Preconditions;
import com.netflix.curator.framework.CuratorFramework;
Expand Down Expand Up @@ -357,6 +358,13 @@ public Void apply(QueuePutListener<T> listener)
);
}
};
internalCreateNode(path, bytes, callback);
}


@VisibleForTesting
void internalCreateNode(String path, byte[] bytes, BackgroundCallback callback) throws Exception
{
client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).inBackground(callback).forPath(path, bytes);
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -281,7 +281,8 @@ public void testRotatingLeadership() throws Exception
final int LEADER_QTY = 5;
final int REPEAT_QTY = 3;

CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
final Timing timing = new Timing();
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
{
Expand All @@ -295,7 +296,7 @@ public void testRotatingLeadership() throws Exception
@Override
public void takeLeadership(CuratorFramework client) throws Exception
{
Thread.sleep(500);
timing.sleepABit();
leaderList.add(ourIndex);
}

Expand Down Expand Up @@ -328,7 +329,7 @@ public void stateChanged(CuratorFramework client, ConnectionState newState)
Assert.assertNotNull(polledIndex);
localLeaderList.add(polledIndex);
}
Thread.sleep(500);
timing.sleepABit();
}

for ( LeaderSelector leaderSelector : selectors )
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,16 +20,18 @@
import com.google.common.collect.Lists;
import com.google.common.collect.Sets;
import com.google.common.io.Closeables;
import com.google.common.util.concurrent.MoreExecutors;
import com.google.common.util.concurrent.ThreadFactoryBuilder;
import com.netflix.curator.framework.CuratorFramework;
import com.netflix.curator.framework.CuratorFrameworkFactory;
import com.netflix.curator.framework.api.CuratorEvent;
import com.netflix.curator.framework.api.CuratorListener;
import com.netflix.curator.framework.api.BackgroundCallback;
import com.netflix.curator.framework.recipes.BaseClassForTests;
import com.netflix.curator.framework.state.ConnectionState;
import com.netflix.curator.framework.state.ConnectionStateListener;
import com.netflix.curator.retry.ExponentialBackoffRetry;
import com.netflix.curator.retry.RetryOneTime;
import com.netflix.curator.test.Timing;
import org.apache.zookeeper.CreateMode;
import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.Test;
Expand All @@ -40,6 +42,7 @@
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicReference;

Expand Down Expand Up @@ -535,33 +538,49 @@ public void testMultiPutterSingleGetter() throws Exception
@Test
public void testFlush() throws Exception
{
final Timing timing = new Timing();
final CountDownLatch latch = new CountDownLatch(1);
DistributedQueue<TestQueueItem> queue = null;
CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), new RetryOneTime(1));
final CuratorFramework client = CuratorFrameworkFactory.newClient(server.getConnectString(), timing.session(), timing.connection(), new RetryOneTime(1));
client.start();
try
{
client.getCuratorListenable().addListener
(
new CuratorListener()
final AtomicBoolean firstTime = new AtomicBoolean(true);
queue = new DistributedQueue<TestQueueItem>(client, null, serializer, "/test", new ThreadFactoryBuilder().build(), MoreExecutors.sameThreadExecutor(), 10, true, null)
{
@Override
void internalCreateNode(final String path, final byte[] bytes, final BackgroundCallback callback) throws Exception
{
@Override
public void eventReceived(CuratorFramework client, CuratorEvent event) throws Exception
if ( firstTime.compareAndSet(true, false) )
{
Executors.newSingleThreadExecutor().submit
(
new Callable<Object>()
{
@Override
public Object call() throws Exception
{
latch.await();
timing.sleepABit();
client.create().withMode(CreateMode.PERSISTENT_SEQUENTIAL).inBackground(callback).forPath(path, bytes);
return null;
}
}
);
}
else
{
// this listener should get called before the queue's listener
latch.await();
super.internalCreateNode(path, bytes, callback);
}
}
);

queue = QueueBuilder.builder(client, null, serializer, "/test").buildQueue();
};
queue.start();

queue.put(new TestQueueItem("1"));
Assert.assertFalse(queue.flushPuts(3, TimeUnit.SECONDS));
Assert.assertFalse(queue.flushPuts(timing.forWaiting().seconds(), TimeUnit.SECONDS));
latch.countDown();

Assert.assertTrue(queue.flushPuts(10, TimeUnit.SECONDS));
Assert.assertTrue(queue.flushPuts(timing.forWaiting().seconds(), TimeUnit.SECONDS));
}
finally
{
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -86,7 +86,7 @@ public class UriSpec implements Iterable<UriSpec.Part>
public static class Part
{
private final String value;
private final boolean isVariable;
private final boolean variable;

/**
* @param value the token value
Expand All @@ -95,7 +95,13 @@ public static class Part
public Part(String value, boolean isVariable)
{
this.value = value;
this.isVariable = isVariable;
this.variable = isVariable;
}

public Part()
{
value = "";
variable = false;
}

public String getValue()
Expand All @@ -105,7 +111,7 @@ public String getValue()

public boolean isVariable()
{
return isVariable;
return variable;
}

@SuppressWarnings("RedundantIfStatement")
Expand All @@ -123,7 +129,7 @@ public boolean equals(Object o)

Part part = (Part)o;

if ( isVariable != part.isVariable )
if ( variable != part.variable )
{
return false;
}
Expand All @@ -139,7 +145,7 @@ public boolean equals(Object o)
public int hashCode()
{
int result = value.hashCode();
result = 31 * result + (isVariable ? 1 : 0);
result = 31 * result + (variable ? 1 : 0);
return result;
}
}
Expand Down

0 comments on commit a02082e

Please sign in to comment.