Skip to content
Browse files

S4 nodes will pickup the first avaialbe instance name, no need to spe…

…cify it
  • Loading branch information...
1 parent 88b0acf commit 59bcb94cc727cb1600074e6da81330953aeaa566 Daniel Gómez Ferro committed
View
46 subprojects/s4-comm/src/main/java/org/apache/s4/comm/topology/AssignmentFromHelix.java
@@ -11,14 +11,12 @@
import java.util.concurrent.locks.ReentrantLock;
import org.apache.helix.BaseDataAccessor;
-import org.apache.helix.HelixAdmin;
import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.ZNRecord;
-import org.apache.helix.manager.zk.ZKHelixAdmin;
import org.apache.helix.manager.zk.ZKHelixDataAccessor;
import org.apache.helix.manager.zk.ZNRecordSerializer;
import org.apache.helix.manager.zk.ZkBaseDataAccessor;
@@ -26,6 +24,7 @@
import org.apache.helix.model.InstanceConfig;
import org.apache.helix.participant.statemachine.StateModel;
import org.apache.helix.participant.statemachine.StateModelFactory;
+import org.apache.s4.comm.helix.S4HelixConstants;
import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -55,7 +54,7 @@
@Inject
public AssignmentFromHelix(@Named("s4.cluster.name") String clusterName,
- @Named("s4.instance.name") String instanceName, @Named("s4.cluster.zk_address") String zookeeperAddress)
+ @Named("s4.cluster.zk_address") String zookeeperAddress)
throws Exception {
this.taskStateModelFactory = new TaskStateModelFactory();
// this.appStateModelFactory = appStateModelFactory;
@@ -67,7 +66,7 @@ public AssignmentFromHelix(@Named("s4.cluster.name") String clusterName,
zkClient.setZkSerializer(new ZNRecordSerializer());
zkClient.waitUntilConnected(60, TimeUnit.SECONDS);
BaseDataAccessor<ZNRecord> baseDataAccessor = new ZkBaseDataAccessor<ZNRecord>(zkClient);
- helixDataAccessor = new ZKHelixDataAccessor(clusterName, baseDataAccessor);
+ helixDataAccessor = new ZKHelixDataAccessor(S4HelixConstants.HELIX_CLUSTER_NAME, baseDataAccessor);
clusterNodeRef = new AtomicReference<ClusterNode>();
taskAcquired = lock.newCondition();
currentlyOwningTask = new AtomicBoolean(true);
@@ -77,14 +76,11 @@ public AssignmentFromHelix(@Named("s4.cluster.name") String clusterName,
logger.warn("Unable to get hostname", e);
machineId = "UNKNOWN";
}
- ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceName.split("_")[1]), machineId, instanceName);
- clusterNodeRef.set(node);
- currentlyOwningTask.set(true);
}
@Inject
public void init() {
- // joinCluster();
+ joinCluster();
}
@Override
@@ -103,7 +99,7 @@ public ClusterNode assignClusterNode() {
return clusterNodeRef.get();
}
- public void joinClusterOld() {
+ public void joinCluster() {
lock.lock();
try {
Builder keyBuilder = helixDataAccessor.keyBuilder();
@@ -112,25 +108,33 @@ public void joinClusterOld() {
List<String> liveInstances = helixDataAccessor.getChildNames(keyBuilder.liveInstances());
for (InstanceConfig instanceConfig : instances) {
String instanceName = instanceConfig.getInstanceName();
- if (!liveInstances.contains(instanceName)) {
- zkHelixManager = HelixManagerFactory.getZKHelixManager(clusterName, instanceName,
+ if (liveInstances.contains(instanceName)) {
+ continue;
+ }
+ String nodeGroup = instanceConfig.getRecord().getSimpleField("GROUP");
+ if (!nodeGroup.equals(clusterName)) {
+ continue;
+ }
+
+ try {
+ zkHelixManager = HelixManagerFactory.getZKHelixManager(S4HelixConstants.HELIX_CLUSTER_NAME, instanceName,
InstanceType.PARTICIPANT, zookeeperAddress);
zkHelixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby",
taskStateModelFactory);
-
zkHelixManager.connect();
- ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceConfig.getPort()), machineId,
- instanceName);
+
+ ClusterNode node = new ClusterNode(-1, Integer.parseInt(instanceName.split("_")[1]), machineId, instanceName);
clusterNodeRef.set(node);
currentlyOwningTask.set(true);
taskAcquired.signalAll();
- break;
+ return;
+ } catch (Exception e) {
+ logger.error("Unexpected exception while trying to register with Helix, retrying", e);
}
}
- if (instances.size() == liveInstances.size()) {
- System.out.println("No more nodes can join the cluster. Will wait for some node to die.");
- Thread.sleep(100000);
- }
+ // TODO wait for notification from Helix instead
+ logger.warn("No more nodes can join the cluster. Will wait for some node to die.");
+ Thread.sleep(1000);
} while (!currentlyOwningTask.get());
System.out.println("Joined the cluster:" + clusterName + " as " + clusterNodeRef.get().getTaskId());
} catch (Exception e) {
@@ -140,4 +144,8 @@ public void joinClusterOld() {
}
}
+ public HelixManager getHelixManager() {
+ return zkHelixManager;
+ }
+
}
View
18 subprojects/s4-core/src/main/java/org/apache/s4/core/BaseModule.java
@@ -6,6 +6,7 @@
import org.apache.commons.configuration.ConfigurationConverter;
import org.apache.commons.configuration.ConfigurationException;
import org.apache.commons.configuration.PropertiesConfiguration;
+import org.apache.helix.HelixManager;
import org.apache.s4.comm.helix.TaskStateModelFactory;
import org.apache.s4.comm.topology.Assignment;
import org.apache.s4.comm.topology.AssignmentFromHelix;
@@ -35,14 +36,12 @@
private PropertiesConfiguration config;
InputStream baseConfigInputStream;
String clusterName;
- private final String instanceName;
boolean useHelix = false;
- public BaseModule(InputStream baseConfigInputStream, String clusterName, String instanceName, boolean useHelix) {
+ public BaseModule(InputStream baseConfigInputStream, String clusterName, boolean useHelix) {
super();
this.baseConfigInputStream = baseConfigInputStream;
this.clusterName = clusterName;
- this.instanceName = instanceName;
this.useHelix = useHelix;
}
@@ -102,19 +101,6 @@ private void loadProperties(Binder binder) {
});
}
}
- if (instanceName != null) {
- if (config.containsKey("s4.instance.name")) {
- logger.warn(
- "instanceName [{}] passed as a parameter will not be used because an existing s4.instance.name parameter of value [{}] was found in the configuration file and will be used",
- instanceName, config.getProperty("s4.instance.name"));
- } else {
- Names.bindProperties(binder, new HashMap<String, String>() {
- {
- put("s4.instance.name", instanceName);
- }
- });
- }
- }
} catch (ConfigurationException e) {
binder.addError(e);
View
27 subprojects/s4-core/src/main/java/org/apache/s4/core/S4HelixBootstrap.java
@@ -2,19 +2,26 @@
import java.net.Inet4Address;
import java.net.UnknownHostException;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
import java.util.UUID;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
+import org.apache.helix.HelixDataAccessor;
import org.apache.helix.HelixManager;
import org.apache.helix.HelixManagerFactory;
import org.apache.helix.InstanceType;
+import org.apache.helix.PropertyKey.Builder;
import org.apache.helix.controller.HelixControllerMain;
+import org.apache.helix.model.InstanceConfig;
import org.apache.helix.spectator.RoutingTableProvider;
import org.apache.s4.comm.helix.S4HelixConstants;
import org.apache.s4.comm.helix.TaskStateModelFactory;
+import org.apache.s4.comm.topology.AssignmentFromHelix;
import org.apache.s4.comm.topology.Cluster;
import org.apache.s4.comm.util.ArchiveFetchException;
import org.apache.s4.comm.util.ArchiveFetcher;
@@ -57,33 +64,30 @@
private final String clusterName;
- private final String instanceName;
+ private String instanceName;
private final String zookeeperAddress;
- private final TaskStateModelFactory taskStateModelFactory;
private final AppStateModelFactory appStateModelFactory;
+ private final AssignmentFromHelix assignmentFromHelix;
private final Cluster cluster;
- private final Lock startingNode = new ReentrantLock();
-
public static Injector rootInjector;
@Inject
public S4HelixBootstrap(@Named("s4.cluster.name") String clusterName,
- @Named("s4.instance.name") String instanceName, @Named("s4.cluster.zk_address") String zookeeperAddress,
+ @Named("s4.cluster.zk_address") String zookeeperAddress,
@Named("s4.cluster.zk_session_timeout") int sessionTimeout,
@Named("s4.cluster.zk_connection_timeout") int connectionTimeout,
- AppStateModelFactory appStateModelFactory, TaskStateModelFactory taskStateModelFactory,
+ AppStateModelFactory appStateModelFactory, AssignmentFromHelix assignmentFromHelix,
ArchiveFetcher fetcher, Cluster cluster) {
this.clusterName = clusterName;
- this.instanceName = instanceName;
this.zookeeperAddress = zookeeperAddress;
- this.taskStateModelFactory = taskStateModelFactory;
this.appStateModelFactory = appStateModelFactory;
this.fetcher = fetcher;
this.cluster = cluster;
+ this.assignmentFromHelix = assignmentFromHelix;
}
@Override
@@ -96,19 +100,16 @@ public void start(Injector parentInjector) throws InterruptedException, ArchiveF
HelixControllerMain.STANDALONE);
// this.parentInjector = parentInjector;
S4HelixBootstrap.rootInjector = parentInjector;
+
registerWithHelix();
signalOneAppLoaded.await();
}
private void registerWithHelix() {
- HelixManager helixManager;
+ HelixManager helixManager = assignmentFromHelix.getHelixManager();
try {
- helixManager = HelixManagerFactory.getZKHelixManager(S4HelixConstants.HELIX_CLUSTER_NAME, instanceName,
- InstanceType.PARTICIPANT, zookeeperAddress);
- helixManager.getStateMachineEngine().registerStateModelFactory("LeaderStandby", taskStateModelFactory);
helixManager.getStateMachineEngine().registerStateModelFactory("OnlineOffline", appStateModelFactory);
- helixManager.connect();
helixManager.addExternalViewChangeListener((RoutingTableProvider) cluster);
} catch (Exception e) {
// TODO Auto-generated catch block
View
5 subprojects/s4-core/src/main/java/org/apache/s4/core/S4Node.java
@@ -50,7 +50,7 @@ public void uncaughtException(Thread t, Throwable e) {
Injector injector = Guice
.createInjector(new Module[] { new BaseModule(Resources.getResource("default.s4.base.properties")
- .openStream(), nodeArgs.clusterName, nodeArgs.instanceName, nodeArgs.useHelix) });
+ .openStream(), nodeArgs.clusterName, nodeArgs.useHelix) });
Bootstrap bootstrap = injector.getInstance(Bootstrap.class);
try {
bootstrap.start(injector);
@@ -75,9 +75,6 @@ public void uncaughtException(Thread t, Throwable e) {
@Parameter(names = "-zk", description = "Zookeeper connection string", required = false)
String zkConnectionString;
- @Parameter(names = { "-id", "-nodeId" }, description = "Node/Instance id that uniquely identifies a node", required = false)
- String instanceName = null;
-
@Parameter(names = "-helix", description = "Required flag when using a Helix based cluster manager", required = false, arity = 0)
boolean useHelix = false;
}
View
2 subprojects/s4-core/src/test/java/org/apache/s4/core/moduleloader/ModuleLoaderTestUtils.java
@@ -69,7 +69,7 @@ public static Process testModuleLoader(boolean fork) throws Exception {
}
Injector injector = Guice.createInjector(new BaseModule(Resources.getResource("default.s4.base.properties")
- .openStream(), "cluster1", null, false),
+ .openStream(), "cluster1", false),
new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()));
Emitter emitter = injector.getInstance(TCPEmitter.class);
View
3 subprojects/s4-core/src/test/java/org/apache/s4/fixtures/CoreTestUtils.java
@@ -115,8 +115,7 @@ public static void callGradleTask(File buildFile, String taskName, String[] para
public static Injector createInjectorWithNonFailFastZKClients() throws IOException {
return Guice.createInjector(Modules.override(
- new BaseModule(Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null,
- false),
+ new BaseModule(Resources.getResource("default.s4.base.properties").openStream(), "cluster1", false),
new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream())).with(
new NonFailFastZookeeperClientsModule()));
View
2 subprojects/s4-core/src/test/java/org/apache/s4/wordcount/WordCountTest.java
@@ -60,7 +60,7 @@
public void createEmitter() throws IOException {
injector = Guice.createInjector(new BaseModule(
- Resources.getResource("default.s4.base.properties").openStream(), "cluster1", null, false),
+ Resources.getResource("default.s4.base.properties").openStream(), "cluster1", false),
new DefaultCommModule(Resources.getResource("default.s4.comm.properties").openStream()),
new DefaultCoreModule(Resources.getResource("default.s4.core.properties").openStream()));

0 comments on commit 59bcb94

Please sign in to comment.
Something went wrong with that request. Please try again.