Skip to content

Commit

Permalink
Fixes #912 - Create 'fluo status' command (#913)
Browse files Browse the repository at this point in the history
* Improved 'fluo list' command to print number of workers
* List & status commands now looked at number of works
  when determining if application is running.
  • Loading branch information
mikewalch committed Aug 18, 2017
1 parent dbdd1a5 commit 151c565
Show file tree
Hide file tree
Showing 8 changed files with 120 additions and 21 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ public void init(FluoConfiguration config, String propsPath, String[] args) {

try (FluoAdminImpl admin = new FluoAdminImpl(config)) {

if (admin.oracleExists()) {
if (admin.applicationRunning()) {
System.err.println("Error - The Fluo '" + config.getApplicationName() + "' application"
+ " is already running and must be stopped before running 'fluo init'. "
+ " Aborted initialization.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,11 @@

package org.apache.fluo.command;

import org.apache.curator.framework.CuratorFramework;
import org.apache.fluo.api.client.FluoAdmin;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.core.client.FluoAdminImpl;
import org.apache.fluo.core.util.CuratorUtil;

public class CommandUtil {

Expand All @@ -30,10 +33,12 @@ public static void verifyAppInitialized(FluoConfiguration config) {

public static void verifyAppRunning(FluoConfiguration config) {
verifyAppInitialized(config);
if (!FluoAdminImpl.oracleExists(config)) {
System.out.println("A Fluo '" + config.getApplicationName() + "' application is initialized "
+ "but is not running!");
System.exit(-1);
try (FluoAdminImpl admin = new FluoAdminImpl(config)) {
if (admin.applicationRunning()) {
System.out.println("A Fluo '" + config.getApplicationName()
+ "' application is initialized " + "but is not running!");
System.exit(-1);
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ public static void main(String[] args) {

try (FluoAdminImpl admin = new FluoAdminImpl(config)) {

if (admin.oracleExists()) {
if (admin.applicationRunning()) {
System.err.println("Error - The Fluo '" + config.getApplicationName() + "' application"
+ " is already running and must be stopped before running 'fluo init'. "
+ " Aborted initialization.");
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -60,16 +60,20 @@ public static void main(String[] args) throws Exception {

System.out.println("Fluo instance (" + config.getInstanceZookeepers() + ") contains "
+ children.size() + " application(s)\n");
System.out.println("Application Status");
System.out.println("----------- ------");
System.out.println("Application Status # Workers");
System.out.println("----------- ------ ---------");

for (String path : children) {
FluoConfiguration appConfig = new FluoConfiguration(config);
appConfig.setApplicationName(path);
String state = "STOPPED";
if (FluoAdminImpl.oracleExists(appConfig)) {
state = "RUNNING";
try (FluoAdminImpl admin = new FluoAdminImpl(appConfig)) {
String state = "STOPPED";
if (admin.applicationRunning()) {
state = "RUNNING";
}
int numWorkers = admin.numWorkers();
System.out.format("%-15s %-11s %4d\n", path, state, numWorkers);
}
System.out.format("%-15s %-11s\n", path, state);
}
}
}
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one or more contributor license
* agreements. See the NOTICE file distributed with this work for additional information regarding
* copyright ownership. The ASF licenses this file to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance with the License. You may obtain a
* copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software distributed under the License
* is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express
* or implied. See the License for the specific language governing permissions and limitations under
* the License.
*/

package org.apache.fluo.command;

import java.io.File;
import java.util.Collections;
import java.util.List;
import java.util.Objects;

import com.google.common.base.Preconditions;
import org.apache.curator.framework.CuratorFramework;
import org.apache.fluo.api.config.FluoConfiguration;
import org.apache.fluo.core.client.FluoAdminImpl;
import org.apache.fluo.core.util.CuratorUtil;

public class FluoStatus {

public static void main(String[] args) throws Exception {
if (args.length != 2) {
System.err.println("Usage: FluoStatus <connectionPropsPath> <applicationName>");
System.exit(-1);
}
String connectionPropsPath = args[0];
String applicationName = args[1];
Objects.requireNonNull(connectionPropsPath);
File connectionPropsFile = new File(connectionPropsPath);
Preconditions.checkArgument(connectionPropsFile.exists(), connectionPropsPath
+ " does not exist");

FluoConfiguration config = new FluoConfiguration(connectionPropsFile);
config.setApplicationName(applicationName);

try (FluoAdminImpl admin = new FluoAdminImpl(config)) {
if (!admin.zookeeperInitialized()) {
System.out.println("NOT_FOUND");
}
if (admin.applicationRunning()) {
System.out.println("RUNNING");
} else {
System.out.println("STOPPED");
}
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -54,6 +54,7 @@
import org.apache.fluo.core.util.AccumuloUtil;
import org.apache.fluo.core.util.ByteUtil;
import org.apache.fluo.core.util.CuratorUtil;
import org.apache.fluo.core.worker.finder.hash.PartitionManager;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
Expand Down Expand Up @@ -400,9 +401,8 @@ private String getJarsFromClasspath() {
return jars.toString();
}

public static boolean oracleExists(FluoConfiguration config) {
try (CuratorFramework curator = CuratorUtil.newAppCurator(config)) {
curator.start();
public static boolean oracleExists(CuratorFramework curator) {
try {
return curator.checkExists().forPath(ZookeeperPath.ORACLE_SERVER) != null
&& !curator.getChildren().forPath(ZookeeperPath.ORACLE_SERVER).isEmpty();
} catch (Exception e) {
Expand All @@ -411,13 +411,35 @@ public static boolean oracleExists(FluoConfiguration config) {
}

public boolean oracleExists() {
CuratorFramework curator = getAppCurator();
return oracleExists(getAppCurator());
}

public static int numWorkers(CuratorFramework curator) {
int numWorkers = 0;
try {
return curator.checkExists().forPath(ZookeeperPath.ORACLE_SERVER) != null
&& !curator.getChildren().forPath(ZookeeperPath.ORACLE_SERVER).isEmpty();
if (curator.checkExists().forPath(ZookeeperPath.FINDERS) != null) {
for (String path : curator.getChildren().forPath(ZookeeperPath.FINDERS)) {
if (path.startsWith(PartitionManager.ZK_FINDER_PREFIX)) {
numWorkers++;
}
}
}
} catch (Exception e) {
throw new RuntimeException(e);
}
return numWorkers;
}

public int numWorkers() {
return numWorkers(getAppCurator());
}

public static boolean applicationRunning(CuratorFramework curator) {
return oracleExists(curator) || (numWorkers(curator) > 0);
}

public boolean applicationRunning() {
return applicationRunning(getAppCurator());
}

public boolean zookeeperInitialized() {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,8 @@ public class PartitionManager {

private static final Logger log = LoggerFactory.getLogger(PartitionManager.class);

public static final String ZK_FINDER_PREFIX = "f-";

private final PathChildrenCache childrenCache;
private final PersistentEphemeralNode myESNode;
private final int groupSize;
Expand Down Expand Up @@ -282,7 +284,7 @@ public void run() {

myESNode =
new PersistentEphemeralNode(curator, Mode.EPHEMERAL_SEQUENTIAL, ZookeeperPath.FINDERS
+ "/f-", ("" + groupSize).getBytes(UTF_8));
+ "/" + ZK_FINDER_PREFIX, ("" + groupSize).getBytes(UTF_8));
myESNode.start();
myESNode.waitForInitialCreate(1, TimeUnit.MINUTES);

Expand Down
13 changes: 11 additions & 2 deletions modules/distribution/src/main/scripts/fluo
Original file line number Diff line number Diff line change
Expand Up @@ -68,6 +68,7 @@ function print_usage {
echo " get-jars <app> <dir> Copies <app> jars from DFS to local <dir>"
echo " list Lists all Fluo applications in Fluo instance"
echo " scan <app> Prints snapshot of data in Fluo <app>"
echo " status <app> Prints status of Fluo application for <app>"
echo " stop <app> Stops Fluo application processes on this machine for <app>"
echo " oracle <app> Starts Fluo Oracle process for <app>"
echo " worker <app> Starts Fluo Worker process for <app>"
Expand All @@ -80,7 +81,6 @@ function print_usage {
echo " start <app> (Deprecated) Starts Fluo application on cluster"
echo " init <app> (Deprecated) Initializes Fluo application using configuration in apps/<app>/conf/fluo.properties"
echo " kill <app> (Deprecated) Kills Fluo application on cluster"
echo " status <app> (Deprecated) Prints status of Fluo application"
echo " info <app> (Deprecated) Prints information about containers of Fluo application"
echo " "
exit 1
Expand Down Expand Up @@ -252,6 +252,15 @@ exec)
java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX" "$@"
fi
;;
status)
if [ -f "$FLUO_CONN_PROPS" ]; then
verify_app "$2"
java org.apache.fluo.command.FluoStatus "$FLUO_CONN_PROPS" "$2"
else
check_hadoop
java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX" "$@"
fi
;;
version)
echo "$FLUO_VERSION"
;;
Expand Down Expand Up @@ -291,7 +300,7 @@ start)
export CLASSPATH="$APP_LIB_DIR/*:$CLASSPATH"
java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX" "$@"
;;
kill|status|info)
kill|info)
deprecated_verify "$2"
check_hadoop
java org.apache.fluo.cluster.command.FluoCommand "$basedir" "$HADOOP_PREFIX" "$@"
Expand Down

0 comments on commit 151c565

Please sign in to comment.