Skip to content
This repository has been archived by the owner on Feb 8, 2019. It is now read-only.

Commit

Permalink
S4-120 Prefer @singleton for app singleton
Browse files Browse the repository at this point in the history
  • Loading branch information
matthieumorel committed Mar 7, 2013
1 parent 3e4c9e4 commit 1eb5c87
Show file tree
Hide file tree
Showing 6 changed files with 20 additions and 21 deletions.
Expand Up @@ -16,7 +16,6 @@

import com.google.inject.AbstractModule;
import com.google.inject.Binder;
import com.google.inject.Scopes;
import com.google.inject.name.Names;

public class BaseModule extends AbstractModule {
Expand Down Expand Up @@ -47,8 +46,8 @@ protected void configure() {
bind(ArchiveFetcher.class).to(RemoteFileFetcher.class);
bind(S4Bootstrap.class);

// share the Zookeeper connection
bind(ZkClient.class).toProvider(ZkClientProvider.class).in(Scopes.SINGLETON);
// ZkClientProvider singleton shares the Zookeeper connection
bind(ZkClient.class).toProvider(ZkClientProvider.class);

}

Expand Down
Expand Up @@ -47,7 +47,6 @@
import com.google.inject.AbstractModule;
import com.google.inject.Binder;
import com.google.inject.Provides;
import com.google.inject.Scopes;
import com.google.inject.name.Named;
import com.google.inject.name.Names;

Expand Down Expand Up @@ -84,22 +83,22 @@ protected void configure() {
/* The hashing function to map keys top partitions. */
bind(Hasher.class).to(DefaultHasher.class);

bind(DeploymentManager.class).to(DistributedDeploymentManager.class).in(Scopes.SINGLETON);
bind(DeploymentManager.class).to(DistributedDeploymentManager.class);

bind(S4RLoaderFactory.class);

// For enabling checkpointing, one needs to use a custom module, such as
// org.apache.s4.core.ft.FileSytemBasedCheckpointingModule
bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class).in(Scopes.SINGLETON);
bind(CheckpointingFramework.class).to(NoOpCheckpointingFramework.class);

// shed load in local sender only by default
bind(SenderExecutorServiceFactory.class).to(ThrottlingSenderExecutorServiceFactory.class);
bind(RemoteSendersExecutorServiceFactory.class).to(BlockingRemoteSendersExecutorServiceFactory.class);

bind(StreamExecutorServiceFactory.class).to(BlockingStreamExecutorServiceFactory.class);

bind(RemoteStreams.class).to(ZkRemoteStreams.class).in(Scopes.SINGLETON);
bind(RemoteSenders.class).to(DefaultRemoteSenders.class).in(Scopes.SINGLETON);
bind(RemoteStreams.class).to(ZkRemoteStreams.class);
bind(RemoteSenders.class).to(DefaultRemoteSenders.class);

}

Expand Down
Expand Up @@ -36,7 +36,9 @@
import org.slf4j.LoggerFactory;

import com.google.inject.Inject;
import com.google.inject.Singleton;

@Singleton
public class DefaultRemoteSenders implements RemoteSenders {

Logger logger = LoggerFactory.getLogger(DefaultRemoteSenders.class);
Expand Down
Expand Up @@ -13,12 +13,10 @@
import java.util.concurrent.atomic.AtomicBoolean;

import org.I0Itec.zkclient.IZkDataListener;
import org.I0Itec.zkclient.serialize.ZkSerializer;
import org.apache.s4.base.util.ModulesLoader;
import org.apache.s4.comm.DefaultCommModule;
import org.apache.s4.comm.ModulesLoaderFactory;
import org.apache.s4.comm.topology.ZNRecord;
import org.apache.s4.comm.topology.ZNRecordSerializer;
import org.apache.s4.comm.topology.ZkClient;
import org.apache.s4.comm.util.ArchiveFetchException;
import org.apache.s4.comm.util.ArchiveFetcher;
Expand Down Expand Up @@ -72,15 +70,10 @@ public class S4Bootstrap {
CountDownLatch signalOneAppLoaded = new CountDownLatch(1);

@Inject
public S4Bootstrap(@Named("s4.cluster.name") String clusterName,
@Named("s4.cluster.zk_address") String zookeeperAddress,
@Named("s4.cluster.zk_session_timeout") int sessionTimeout,
@Named("s4.cluster.zk_connection_timeout") int connectionTimeout, ArchiveFetcher fetcher) {
public S4Bootstrap(@Named("s4.cluster.name") String clusterName, ZkClient zkClient, ArchiveFetcher fetcher) {

this.fetcher = fetcher;
zkClient = new ZkClient(zookeeperAddress, sessionTimeout, connectionTimeout);
ZkSerializer serializer = new ZNRecordSerializer();
zkClient.setZkSerializer(serializer);
this.zkClient = zkClient;

String appDir = "/s4/clusters/" + clusterName + "/app";
if (!zkClient.exists(appDir)) {
Expand Down
Expand Up @@ -6,17 +6,18 @@

import com.google.inject.Inject;
import com.google.inject.Provider;
import com.google.inject.Singleton;
import com.google.inject.name.Named;

/**
*
* Provides a connection to ZooKeeper through the {@link ZkClient} class.
* <p>
* This connection can easily be shared by specifying singleton scope at binding time (i.e. when binding the ZkClient
* class, see {@link BaseModule}).
*
* As an application singleton, it provides a single shared connection for an S4 node.
*
*/
@Singleton
// injected only once per node
public class ZkClientProvider implements Provider<ZkClient> {

private final ZkClient zkClient;
Expand All @@ -32,6 +33,8 @@ public ZkClientProvider(@Named("s4.cluster.zk_address") String zookeeperAddress,

@Override
public ZkClient get() {
// reuses initialized instance
return zkClient;

}
}
Expand Up @@ -20,10 +20,13 @@

import org.apache.s4.core.ProcessingElement;

import com.google.inject.Singleton;

/**
* Implementation of {@link CheckpointingFramework} that does NO checkpointing.
*
*
*/
@Singleton
public final class NoOpCheckpointingFramework implements CheckpointingFramework {

@Override
Expand Down

0 comments on commit 1eb5c87

Please sign in to comment.