Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -56,21 +56,21 @@ public class MiniZooKeeperCluster {
private static final int TICK_TIME = 2000;
private static final int DEFAULT_CONNECTION_TIMEOUT = 30000;
private static final byte[] STATIC_BYTES = Bytes.toBytes("stat");
private int connectionTimeout;
private final int connectionTimeout;

private boolean started;

/** The default port. If zero, we use a random port. */
private int defaultClientPort = 0;

private List<NIOServerCnxnFactory> standaloneServerFactoryList;
private List<ZooKeeperServer> zooKeeperServers;
private List<Integer> clientPortList;
private final List<NIOServerCnxnFactory> standaloneServerFactoryList;
private final List<ZooKeeperServer> zooKeeperServers;
private final List<Integer> clientPortList;

private int activeZKServerIndex;
private int tickTime = 0;

private Configuration configuration;
private final Configuration configuration;

public MiniZooKeeperCluster() {
this(new Configuration());
Expand Down Expand Up @@ -98,6 +98,7 @@ public void addClientPort(int clientPort) {

/**
* Get the list of client ports.
*
* @return clientPortList the client port list
*/
@VisibleForTesting
Expand Down Expand Up @@ -126,7 +127,7 @@ public void setDefaultClientPort(int clientPort) {
* Selects a ZK client port.
*
* @param seedPort the seed port to start with; -1 means first time.
* @Returns a valid and unused client port
* @return a valid and unused client port
*/
private int selectClientPort(int seedPort) {
int i;
Expand All @@ -143,18 +144,16 @@ private int selectClientPort(int seedPort) {
}
}
// Make sure that the port is unused.
while (true) {
// break when an unused port is found
do {
for (i = 0; i < clientPortList.size(); i++) {
if (returnClientPort == clientPortList.get(i)) {
// Already used. Update the port and retry.
returnClientPort++;
break;
}
}
if (i == clientPortList.size()) {
break; // found a unused port, exit
}
}
} while (i != clientPortList.size());
return returnClientPort;
}

Expand All @@ -163,7 +162,7 @@ public void setTickTime(int tickTime) {
}

public int getBackupZooKeeperServerNum() {
return zooKeeperServers.size()-1;
return zooKeeperServers.size() - 1;
}

public int getZooKeeperServerNum() {
Expand All @@ -179,7 +178,7 @@ private static void setupTestEnv() {
System.setProperty("zookeeper.preAllocSize", "100");
FileTxnLog.setPreallocSize(100 * 1024);
// allow all 4 letter words
System.setProperty("zookeeper.4lw.commands.whitelist","*");
System.setProperty("zookeeper.4lw.commands.whitelist", "*");
}

public int startup(File baseDir) throws IOException, InterruptedException {
Expand Down Expand Up @@ -212,7 +211,7 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException,

// running all the ZK servers
for (int i = 0; i < numZooKeeperServers; i++) {
File dir = new File(baseDir, "zookeeper_"+i).getAbsoluteFile();
File dir = new File(baseDir, "zookeeper_" + i).getAbsoluteFile();
createDir(dir);
int tickTimeToUse;
if (this.tickTime > 0) {
Expand Down Expand Up @@ -268,8 +267,7 @@ public int startup(File baseDir, int numZooKeeperServers) throws IOException,
// We have selected a port as a client port. Update clientPortList if necessary.
if (clientPortList.size() <= i) { // it is not in the list, add the port
clientPortList.add(currentClientPort);
}
else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
} else if (clientPortList.get(i) <= 0) { // the list has invalid port, update with valid port
clientPortList.remove(i);
clientPortList.add(i, currentClientPort);
}
Expand Down Expand Up @@ -405,13 +403,10 @@ private static boolean waitForServerDown(int port, long timeout) throws IOExcept
long start = System.currentTimeMillis();
while (true) {
try {
Socket sock = new Socket("localhost", port);
try {
try (Socket sock = new Socket("localhost", port)) {
OutputStream outstream = sock.getOutputStream();
outstream.write(STATIC_BYTES);
outstream.flush();
} finally {
sock.close();
}
} catch (IOException e) {
return true;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -78,10 +78,10 @@ public class RecoverableZooKeeper {
// An identifier of this process in the cluster
private final String identifier;
private final byte[] id;
private Watcher watcher;
private int sessionTimeout;
private String quorumServers;
private int maxMultiSize;
private final Watcher watcher;
private final int sessionTimeout;
private final String quorumServers;
private final int maxMultiSize;

@edu.umd.cs.findbugs.annotations.SuppressWarnings(value="DE_MIGHT_IGNORE",
justification="None. Its always been this way.")
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,13 +67,10 @@ private static void resetAcls(final ZKWatcher zkw, final String znode,

private static void resetAcls(final Configuration conf, boolean eraseAcls)
throws Exception {
ZKWatcher zkw = new ZKWatcher(conf, "ZKAclReset", null);
try {
try (ZKWatcher zkw = new ZKWatcher(conf, "ZKAclReset", null)) {
LOG.info((eraseAcls ? "Erase" : "Set") + " HBase ACLs for " +
zkw.getQuorum() + " " + zkw.getZNodePaths().baseZNode);
zkw.getQuorum() + " " + zkw.getZNodePaths().baseZNode);
resetAcls(zkw, zkw.getZNodePaths().baseZNode, eraseAcls);
} finally {
zkw.close();
}
}

Expand All @@ -96,13 +93,20 @@ private void printUsageAndExit() {
public int run(String[] args) throws Exception {
boolean eraseAcls = true;

for (int i = 0; i < args.length; ++i) {
if (args[i].equals("-help")) {
printUsageAndExit();
} else if (args[i].equals("-set-acls")) {
eraseAcls = false;
} else {
printUsageAndExit();
for (String arg : args) {
switch (arg) {
case "-help": {
printUsageAndExit();
break;
}
case "-set-acls": {
eraseAcls = false;
break;
}
default: {
printUsageAndExit();
break;
}
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
*/
@InterfaceAudience.Private
public class ZKClusterId {
private ZKWatcher watcher;
private Abortable abortable;
private final ZKWatcher watcher;
private final Abortable abortable;
private String id;

public ZKClusterId(ZKWatcher watcher, Abortable abortable) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -45,9 +45,9 @@ public class ZKLeaderManager extends ZKListener {

private final Object lock = new Object();
private final AtomicBoolean leaderExists = new AtomicBoolean();
private String leaderZNode;
private byte[] nodeId;
private Stoppable candidate;
private final String leaderZNode;
private final byte[] nodeId;
private final Stoppable candidate;

public ZKLeaderManager(ZKWatcher watcher, String leaderZNode,
byte[] identifier, Stoppable candidate) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -246,7 +246,7 @@ private static class JaasConfiguration extends javax.security.auth.login.Configu
private static final Map<String, String> BASIC_JAAS_OPTIONS = new HashMap<>();
static {
String jaasEnvVar = System.getenv("HBASE_JAAS_DEBUG");
if (jaasEnvVar != null && "true".equalsIgnoreCase(jaasEnvVar)) {
if ("true".equalsIgnoreCase(jaasEnvVar)) {
BASIC_JAAS_OPTIONS.put("debug", "true");
}
}
Expand Down Expand Up @@ -353,7 +353,7 @@ public static boolean watchAndCheckExists(ZKWatcher zkw, String znode)
throws KeeperException {
try {
Stat s = zkw.getRecoverableZooKeeper().exists(znode, zkw);
boolean exists = s != null ? true : false;
boolean exists = s != null;
if (exists) {
LOG.debug(zkw.prefix("Set watcher on existing znode=" + znode));
} else {
Expand Down Expand Up @@ -443,8 +443,7 @@ public static List<String> listChildrenAndWatchForNewChildren(
ZKWatcher zkw, String znode)
throws KeeperException {
try {
List<String> children = zkw.getRecoverableZooKeeper().getChildren(znode, zkw);
return children;
return zkw.getRecoverableZooKeeper().getChildren(znode, zkw);
} catch(KeeperException.NoNodeException ke) {
LOG.debug(zkw.prefix("Unable to list children of znode " + znode + " " +
"because node does not exist (not an error)"));
Expand Down Expand Up @@ -1400,9 +1399,16 @@ public static void deleteNodeRecursivelyMultiOrSequential(ZKWatcher zkw,
* Chunks the provided {@code ops} when their approximate size exceeds the the configured limit.
* Take caution that this can ONLY be used for operations where atomicity is not important,
* e.g. deletions. It must not be used when atomicity of the operations is critical.
*
* @param zkw reference to the {@link ZKWatcher} which contains configuration and constants
* @param runSequentialOnMultiFailure if true when we get a ZooKeeper exception that could
* retry the operations one-by-one (sequentially)
* @param ops list of ZKUtilOp {@link ZKUtilOp} to partition while submitting batched multi
* or sequential
* @throws KeeperException unexpected ZooKeeper Exception / Zookeeper unreachable
*/
static void submitBatchedMultiOrSequential(ZKWatcher zkw, boolean runSequentialOnMultiFailure,
List<ZKUtilOp> ops) throws KeeperException {
private static void submitBatchedMultiOrSequential(ZKWatcher zkw,
boolean runSequentialOnMultiFailure, List<ZKUtilOp> ops) throws KeeperException {
// at least one element should exist
if (ops.isEmpty()) {
return;
Expand Down Expand Up @@ -1794,9 +1800,12 @@ public static String dump(ZKWatcher zkw) {
sb.append("<<FAILED LOOKUP: " + e.getMessage() + ">>");
}
sb.append("\nBackup master addresses:");
for (String child : listChildrenNoWatch(zkw,
zkw.getZNodePaths().backupMasterAddressesZNode)) {
sb.append("\n ").append(child);
final List<String> backupMasterChildrenNoWatchList = listChildrenNoWatch(zkw,
zkw.getZNodePaths().backupMasterAddressesZNode);
if (backupMasterChildrenNoWatchList != null) {
for (String child : backupMasterChildrenNoWatchList) {
sb.append("\n ").append(child);
}
}
sb.append("\nRegion server holding hbase:meta: "
+ MetaTableLocator.getMetaRegionLocation(zkw));
Expand All @@ -1808,8 +1817,12 @@ public static String dump(ZKWatcher zkw) {
+ MetaTableLocator.getMetaRegionLocation(zkw, i));
}
sb.append("\nRegion servers:");
for (String child : listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode)) {
sb.append("\n ").append(child);
final List<String> rsChildrenNoWatchList =
listChildrenNoWatch(zkw, zkw.getZNodePaths().rsZNode);
if (rsChildrenNoWatchList != null) {
for (String child : rsChildrenNoWatchList) {
sb.append("\n ").append(child);
}
}
try {
getReplicationZnodesDump(zkw, sb);
Expand Down Expand Up @@ -1860,31 +1873,33 @@ private static void getReplicationZnodesDump(ZKWatcher zkw, StringBuilder sb)
// do a ls -r on this znode
sb.append("\n").append(replicationZnode).append(": ");
List<String> children = ZKUtil.listChildrenNoWatch(zkw, replicationZnode);
Collections.sort(children);
for (String child : children) {
String znode = ZNodePaths.joinZNode(replicationZnode, child);
if (znode.equals(zkw.getZNodePaths().peersZNode)) {
appendPeersZnodes(zkw, znode, sb);
} else if (znode.equals(zkw.getZNodePaths().queuesZNode)) {
appendRSZnodes(zkw, znode, sb);
} else if (znode.equals(zkw.getZNodePaths().hfileRefsZNode)) {
appendHFileRefsZnodes(zkw, znode, sb);
if (children != null) {
Collections.sort(children);
for (String child : children) {
String zNode = ZNodePaths.joinZNode(replicationZnode, child);
if (zNode.equals(zkw.getZNodePaths().peersZNode)) {
appendPeersZnodes(zkw, zNode, sb);
} else if (zNode.equals(zkw.getZNodePaths().queuesZNode)) {
appendRSZnodes(zkw, zNode, sb);
} else if (zNode.equals(zkw.getZNodePaths().hfileRefsZNode)) {
appendHFileRefsZNodes(zkw, zNode, sb);
}
}
}
}

private static void appendHFileRefsZnodes(ZKWatcher zkw, String hfileRefsZnode,
private static void appendHFileRefsZNodes(ZKWatcher zkw, String hFileRefsZNode,
StringBuilder sb) throws KeeperException {
sb.append("\n").append(hfileRefsZnode).append(": ");
for (String peerIdZnode : ZKUtil.listChildrenNoWatch(zkw, hfileRefsZnode)) {
String znodeToProcess = ZNodePaths.joinZNode(hfileRefsZnode, peerIdZnode);
sb.append("\n").append(znodeToProcess).append(": ");
List<String> peerHFileRefsZnodes = ZKUtil.listChildrenNoWatch(zkw, znodeToProcess);
int size = peerHFileRefsZnodes.size();
for (int i = 0; i < size; i++) {
sb.append(peerHFileRefsZnodes.get(i));
if (i != size - 1) {
sb.append(", ");
sb.append("\n").append(hFileRefsZNode).append(": ");
final List<String> hFileRefChildrenNoWatchList =
ZKUtil.listChildrenNoWatch(zkw, hFileRefsZNode);
if (hFileRefChildrenNoWatchList != null) {
for (String peerIdZNode : hFileRefChildrenNoWatchList) {
String zNodeToProcess = ZNodePaths.joinZNode(hFileRefsZNode, peerIdZNode);
sb.append("\n").append(zNodeToProcess).append(": ");
List<String> peerHFileRefsZNodes = ZKUtil.listChildrenNoWatch(zkw, zNodeToProcess);
if (peerHFileRefsZNodes != null) {
sb.append(String.join(", ", peerHFileRefsZNodes));
}
}
}
Expand Down Expand Up @@ -1996,10 +2011,10 @@ private static void appendPeerState(ZKWatcher zkw, String znodeToProcess, String
* @return The array of response strings.
* @throws IOException When the socket communication fails.
*/
public static String[] getServerStats(String server, int timeout)
private static String[] getServerStats(String server, int timeout)
throws IOException {
String[] sp = server.split(":");
if (sp == null || sp.length == 0) {
if (sp.length == 0) {
return null;
}

Expand Down Expand Up @@ -2135,7 +2150,7 @@ public static void logZKTree(ZKWatcher zkw, String root) {
* @see #logZKTree(ZKWatcher, String)
* @throws KeeperException if an unexpected exception occurs
*/
protected static void logZKTree(ZKWatcher zkw, String root, String prefix)
private static void logZKTree(ZKWatcher zkw, String root, String prefix)
throws KeeperException {
List<String> children = ZKUtil.listChildrenNoWatch(zkw, root);

Expand Down