Permalink
Browse files

S4-108 Share Zookeeper connection within a node

- build the connection through a provider, use singleton scope
- updated some javadoc for S4-114
  • Loading branch information...
1 parent eb85153 commit d6decd05f323d0e495958c23c5b856c8dc120766 @matthieumorel matthieumorel committed Jan 24, 2013
Showing with 125 additions and 80 deletions.
  1. +0 −1 subprojects/s4-comm/src/main/java/org/apache/s4/comm/DefaultCommModule.java
  2. +3 −5 subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/DefaultRemoteEmitters.java
  3. +4 −0 subprojects/s4-comm/src/main/java/org/apache/s4/comm/tcp/RemoteEmitters.java
  4. +5 −9 subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromZK.java
  5. +4 −9 subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClusterFromZK.java
  6. +6 −9 subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ClustersFromZK.java
  7. +13 −0 subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/RemoteStreams.java
  8. +15 −24 subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/ZkRemoteStreams.java
  9. +3 −1 subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/AssignmentsFromZKTest1.java
  10. +7 −3 subprojects/s4-comm/src/test/java/org/apache/s4/comm/topology/ClustersFromZKTest.java
  11. +6 −1 subprojects/s4-comm/src/test/java/org/apache/s4/fixtures/TestCommModule.java
  12. +5 −0 subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
  13. +3 −6 subprojects/s4-core/src/main/java/org/apache/s4/core/DefaultRemoteSenders.java
  14. +5 −0 subprojects/s4-core/src/main/java/org/apache/s4/core/RemoteSenders.java
  15. +37 −0 subprojects/s4-core/src/main/java/org/apache/s4/core/ZkClientProvider.java
  16. +3 −4 subprojects/s4-core/src/test/java/org/apache/s4/fixtures/AssignmentFromZKNoFailFast.java
  17. +3 −4 subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClusterFromZKNoFailFast.java
  18. +3 −4 subprojects/s4-core/src/test/java/org/apache/s4/fixtures/ClustersFromZKNoFailFast.java
@@ -115,7 +115,6 @@ protected void configure() {
}
}
- @SuppressWarnings("serial")
private void loadProperties(Binder binder) {
try {
config = new PropertiesConfiguration();
@@ -28,10 +28,6 @@
import com.google.inject.Inject;
import com.google.inject.Singleton;
-/**
- * Manages the {@link RemoteEmitter} instances for sending messages to remote subclusters.
- *
- */
@Singleton
public class DefaultRemoteEmitters implements RemoteEmitters {
@@ -40,7 +36,9 @@
@Inject
RemoteEmitterFactory emitterFactory;
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.s4.comm.tcp.RemoteEmitters#getEmitter(org.apache.s4.comm.topology.Cluster)
*/
@Override
@@ -3,6 +3,10 @@
import org.apache.s4.base.RemoteEmitter;
import org.apache.s4.comm.topology.Cluster;
+/**
+ * Manages the {@link RemoteEmitter} instances for sending messages to remote subclusters.
+ *
+ */
public interface RemoteEmitters {
public abstract RemoteEmitter getEmitter(Cluster topology);
@@ -44,7 +44,7 @@
/**
* Handles partition assignment through Zookeeper.
- *
+ *
*/
@Singleton
public class AssignmentFromZK implements Assignment, IZkChildListener, IZkStateListener, IZkDataListener {
@@ -85,15 +85,13 @@
* Holds the reference to ClusterNode which points to the current partition owned
*/
AtomicReference<ClusterNode> clusterNodeRef;
- private int connectionTimeout;
- private String clusterName;
+ private final int connectionTimeout;
+ private final String clusterName;
// TODO we currently have a single assignment per node (i.e. a node can only belong to 1 topology)
@Inject
public AssignmentFromZK(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
this.clusterName = clusterName;
this.connectionTimeout = connectionTimeout;
taskPath = "/s4/clusters/" + clusterName + "/tasks";
@@ -110,9 +108,7 @@ public AssignmentFromZK(@Named("s4.cluster.name") String clusterName,
machineId = "UNKNOWN";
}
- zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
- ZkSerializer serializer = new ZNRecordSerializer();
- zkClient.setZkSerializer(serializer);
+ this.zkClient = zkClient;
}
@Inject
@@ -30,7 +30,6 @@
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -41,7 +40,7 @@
/**
* Represents a logical cluster definition fetched from Zookeeper. Notifies listeners of runtime changes in the
* configuration.
- *
+ *
*/
public class ClusterFromZK implements Cluster, IZkChildListener, IZkDataListener, IZkStateListener {
@@ -53,23 +52,19 @@
private final String taskPath;
private final String processPath;
private final Lock lock;
- private String clusterName;
+ private final String clusterName;
/**
* only the local topology
*/
@Inject
public ClusterFromZK(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
this.clusterName = clusterName;
this.taskPath = "/s4/clusters/" + clusterName + "/tasks";
this.processPath = "/s4/clusters/" + clusterName + "/process";
lock = new ReentrantLock();
- zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
- ZkSerializer serializer = new ZNRecordSerializer();
- zkClient.setZkSerializer(serializer);
+ this.zkClient = zkClient;
zkClient.subscribeStateChanges(this);
if (!zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS)) {
throw new Exception("cannot connect to zookeeper");
@@ -46,21 +46,17 @@
private final ZkClient zkClient;
private final Lock lock;
private String machineId;
- private Map<String, ClusterFromZK> clusters = new HashMap<String, ClusterFromZK>();
- private int connectionTimeout;
- private String clusterName;
+ private final Map<String, ClusterFromZK> clusters = new HashMap<String, ClusterFromZK>();
+ private final int connectionTimeout;
+ private final String clusterName;
@Inject
public ClustersFromZK(@Named("s4.cluster.name") String clusterName,
- @Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ @Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient) throws Exception {
this.clusterName = clusterName;
this.connectionTimeout = connectionTimeout;
lock = new ReentrantLock();
- zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
- ZkSerializer serializer = new ZNRecordSerializer();
- zkClient.setZkSerializer(serializer);
+ this.zkClient = zkClient;
zkClient.subscribeStateChanges(this);
zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
try {
@@ -115,6 +111,7 @@ public void handleNewSession() throws Exception {
doProcess();
}
+ @Override
public Cluster getCluster(String clusterName) {
return clusters.get(clusterName);
}
@@ -2,6 +2,19 @@
import java.util.Set;
+/**
+ * <p>
+ * Monitors streams available in the S4 cluster.
+ * </p>
+ * <p>
+ * Maintains a data structure reflecting the currently published streams with their consumers and publishers.
+ * </p>
+ * <p>
+ * Provides methods to publish producers and consumers of streams
+ * </p>
+ *
+ */
+
public interface RemoteStreams {
public abstract Set<StreamConsumer> getConsumers(String streamName);
@@ -30,7 +30,6 @@
import org.I0Itec.zkclient.IZkChildListener;
import org.I0Itec.zkclient.IZkStateListener;
-import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.zookeeper.Watcher.Event.KeeperState;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -39,18 +38,6 @@
import com.google.inject.Singleton;
import com.google.inject.name.Named;
-/**
- * <p>
- * Monitors streams available in the S4 cluster.
- * </p>
- * <p>
- * Maintains a data structure reflecting the currently published streams with their consumers and publishers.
- * </p>
- * <p>
- * Provides methods to publish producers and consumers of streams
- * </p>
- *
- */
@Singleton
public class ZkRemoteStreams implements IZkStateListener, IZkChildListener, RemoteStreams {
@@ -60,7 +47,7 @@
private final Lock lock;
private final static String STREAMS_PATH = "/s4/streams";
// by stream name, then "producer"|"consumer" then
- private Map<String, Map<String, Set<StreamConsumer>>> streams = new HashMap<String, Map<String, Set<StreamConsumer>>>();
+ private final Map<String, Map<String, Set<StreamConsumer>>> streams = new HashMap<String, Map<String, Set<StreamConsumer>>>();
public enum StreamType {
PRODUCER, CONSUMER;
@@ -89,14 +76,11 @@ public String getCollectionName() {
}
@Inject
- public ZkRemoteStreams(@Named("s4.cluster.zk_address") String zookeeperAddress,
- @Named("s4.cluster.zk_session_timeout") int sessionTimeout,
- @Named("s4.cluster.zk_connection_timeout") int connectionTimeout) throws Exception {
+ public ZkRemoteStreams(@Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ZkClient zkClient)
+ throws Exception {
lock = new ReentrantLock();
- zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
- ZkSerializer serializer = new ZNRecordSerializer();
- zkClient.setZkSerializer(serializer);
+ this.zkClient = zkClient;
zkClient.subscribeStateChanges(this);
zkClient.waitUntilConnected(connectionTimeout, TimeUnit.MILLISECONDS);
// bug in zkClient, it does not invoke handleNewSession the first time
@@ -107,7 +91,9 @@ public ZkRemoteStreams(@Named("s4.cluster.zk_address") String zookeeperAddress,
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.s4.comm.topology.RemoteStreams#getConsumers(java.lang.String)
*/
@Override
@@ -186,8 +172,11 @@ private void update(String streamName, StreamType type) {
streams.get(streamName).put(type.getCollectionName(), Collections.unmodifiableSet(consumers));
}
- /* (non-Javadoc)
- * @see org.apache.s4.comm.topology.RemoteStreams#addOutputStream(java.lang.String, java.lang.String, java.lang.String)
+ /*
+ * (non-Javadoc)
+ *
+ * @see org.apache.s4.comm.topology.RemoteStreams#addOutputStream(java.lang.String, java.lang.String,
+ * java.lang.String)
*/
@Override
public void addOutputStream(String appId, String clusterName, String streamName) {
@@ -219,7 +208,9 @@ private void createStreamPaths(String streamName) {
zkClient.createPersistent(StreamType.CONSUMER.getPath(streamName), true);
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.s4.comm.topology.RemoteStreams#addInputStream(int, java.lang.String, java.lang.String)
*/
@Override
@@ -64,7 +64,9 @@ public void run() {
try {
for (String topologyName : names) {
- assignmentFromZK = new AssignmentFromZK(topologyName, CommTestUtils.ZK_STRING, 30000, 30000);
+ ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ assignmentFromZK = new AssignmentFromZK(topologyName, 30000, zkClient);
assignmentFromZK.init();
ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
latch.countDown();
@@ -56,11 +56,13 @@ private void testAssignment(TaskSetup taskSetup, final String clustersString) th
InterruptedException {
final Set<String> clusterNames = Sets.newHashSet(Splitter.onPattern("\\s*,\\s*").split(clustersString));
taskSetup.clean("s4");
+
for (String clusterName : clusterNames) {
taskSetup.setup(clusterName, 10, 1300);
}
-
- final ClustersFromZK clusterFromZK = new ClustersFromZK(null, CommTestUtils.ZK_STRING, 30000, 30000);
+ ZkClient zkClient1 = new ZkClient(CommTestUtils.ZK_STRING);
+ zkClient1.setZkSerializer(new ZNRecordSerializer());
+ final ClustersFromZK clusterFromZK = new ClustersFromZK(null, 30000, zkClient1);
final CountDownLatch signalAllClustersComplete = new CountDownLatch(clusterNames.size());
for (final String clusterName : clusterNames) {
@@ -87,7 +89,9 @@ public void run() {
AssignmentFromZK assignmentFromZK;
try {
for (String clusterName : clusterNames) {
- assignmentFromZK = new AssignmentFromZK(clusterName, CommTestUtils.ZK_STRING, 30000, 30000);
+ ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ assignmentFromZK = new AssignmentFromZK(clusterName, 30000, zkClient);
assignmentFromZK.init();
ClusterNode assignClusterNode = assignmentFromZK.assignClusterNode();
latch.countDown();
@@ -5,6 +5,8 @@
import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ZNRecordSerializer;
+import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.comm.util.ArchiveFetcher;
import org.apache.s4.comm.util.RemoteFileFetcher;
@@ -31,8 +33,11 @@ protected void configure() {
bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_session_timeout")).toInstance(10000);
bind(Integer.class).annotatedWith(Names.named("s4.cluster.zk_connection_timeout")).toInstance(10000);
bind(Assignment.class).to(AssignmentFromZK.class).asEagerSingleton();
- // bind(Cluster.class).to(ClusterFromZK.class);
bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
+
+ ZkClient zkClient = new ZkClient(CommTestUtils.ZK_STRING);
+ zkClient.setZkSerializer(new ZNRecordSerializer());
+ bind(ZkClient.class).toInstance(zkClient);
}
}
@@ -8,13 +8,15 @@
import org.apache.commons.configuration.PropertiesConfiguration;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromZK;
+import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.comm.util.ArchiveFetcher;
import org.apache.s4.comm.util.RemoteFileFetcher;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
+import com.google.inject.Scopes;
import com.google.inject.name.Names;
public class BaseModule extends AbstractModule {
@@ -45,6 +47,9 @@ protected void configure() {
bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
bind(S4Bootstrap.class);
+ // share the Zookeeper connection
+ bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
+
}
@SuppressWarnings("serial")
@@ -37,11 +37,6 @@
import com.google.inject.Inject;
-/**
- * Sends events to remote clusters. Target clusters are selected dynamically based on the stream name information from
- * the event.
- *
- */
public class DefaultRemoteSenders implements RemoteSenders {
Logger logger = LoggerFactory.getLogger(DefaultRemoteSenders.class);
@@ -73,7 +68,9 @@ public DefaultRemoteSenders(RemoteEmitters remoteEmitters, RemoteStreams remoteS
serDeser = serDeserFactory.createSerializerDeserializer(Thread.currentThread().getContextClassLoader());
}
- /* (non-Javadoc)
+ /*
+ * (non-Javadoc)
+ *
* @see org.apache.s4.core.RemoteSenders#send(java.lang.String, org.apache.s4.base.Event)
*/
@Override
@@ -2,6 +2,11 @@
import org.apache.s4.base.Event;
+/**
+ * Sends events to remote clusters. Target clusters are selected dynamically based on the stream name information from
+ * the event.
+ *
+ */
public interface RemoteSenders {
public abstract void send(String hashKey, Event event);
Oops, something went wrong.

0 comments on commit d6decd0

Please sign in to comment.