Skip to content

Commit

Permalink
Fixed various issues.
Browse files Browse the repository at this point in the history
  • Loading branch information
amccurry committed Mar 12, 2012
1 parent 0aa6a01 commit 12b59e5
Show file tree
Hide file tree
Showing 9 changed files with 90 additions and 158 deletions.
Expand Up @@ -17,21 +17,15 @@
package com.nearinfinity.blur.manager.indexserver.utils;

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

import com.nearinfinity.blur.log.Log;
import com.nearinfinity.blur.log.LogFactory;
import com.nearinfinity.blur.manager.clusterstatus.ZookeeperPathConstants;
import com.nearinfinity.blur.utils.BlurConstants;

public class DisableTable {

private final static Log LOG = LogFactory.getLog(DisableTable.class);
// private final static Log LOG = LogFactory.getLog(DisableTable.class);

public static void disableTable(ZooKeeper zookeeper, String cluster, String table) throws IOException, InterruptedException, KeeperException {
if (zookeeper.exists(ZookeeperPathConstants.getTablePath(cluster, table) , false) == null) {
Expand All @@ -42,30 +36,30 @@ public static void disableTable(ZooKeeper zookeeper, String cluster, String tabl
throw new IOException("Table [" + table + "] already disabled.");
}
zookeeper.delete(blurTableEnabledPath, -1);
waitForWriteLocksToClear(zookeeper, cluster, table);
// waitForWriteLocksToClear(zookeeper, cluster, table);
}

private static void waitForWriteLocksToClear(ZooKeeper zookeeper, String cluster, String table) throws KeeperException, InterruptedException {
final Object object = new Object();
String path = ZookeeperPathConstants.getLockPath(cluster, table);
while (true) {
synchronized (object) {
List<String> list = zookeeper.getChildren(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
synchronized (object) {
object.notifyAll();
}
}
});
if (list.isEmpty()) {
LOG.info("All [{0}] locks for table [{1}]", list.size(), table);
return;
} else {
LOG.info("Waiting for locks to be released [{0}] total [{1}]", list.size(), list);
object.wait(BlurConstants.ZK_WAIT_TIME);
}
}
}
}
// private static void waitForWriteLocksToClear(ZooKeeper zookeeper, String cluster, String table) throws KeeperException, InterruptedException {
// final Object object = new Object();
// String path = ZookeeperPathConstants.getLockPath(cluster, table);
// while (true) {
// synchronized (object) {
// List<String> list = zookeeper.getChildren(path, new Watcher() {
// @Override
// public void process(WatchedEvent event) {
// synchronized (object) {
// object.notifyAll();
// }
// }
// });
// if (list.isEmpty()) {
// LOG.info("All [{0}] locks for table [{1}]", list.size(), table);
// return;
// } else {
// LOG.info("Waiting for locks to be released [{0}] total [{1}]", list.size(), list);
// object.wait(BlurConstants.ZK_WAIT_TIME);
// }
// }
// }
// }
}
Expand Up @@ -17,25 +17,16 @@
package com.nearinfinity.blur.manager.indexserver.utils;

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.ZooDefs.Ids;
import org.apache.zookeeper.data.Stat;
import org.apache.zookeeper.ZooKeeper;

import com.nearinfinity.blur.log.Log;
import com.nearinfinity.blur.log.LogFactory;
import com.nearinfinity.blur.manager.clusterstatus.ZookeeperPathConstants;
import com.nearinfinity.blur.utils.BlurConstants;

public class EnableTable {

private final static Log LOG = LogFactory.getLog(EnableTable.class);

public static void enableTable(ZooKeeper zookeeper, String cluster, String table) throws IOException, KeeperException, InterruptedException {
if (zookeeper.exists(ZookeeperPathConstants.getTablePath(cluster, table) , false) == null) {
throw new IOException("Table [" + table + "] does not exist.");
Expand All @@ -45,45 +36,6 @@ public static void enableTable(ZooKeeper zookeeper, String cluster, String table
throw new IOException("Table [" + table + "] already enabled.");
}
zookeeper.create(blurTableEnabledPath, null, Ids.OPEN_ACL_UNSAFE, CreateMode.PERSISTENT);
int shardCount = getShardCount(zookeeper, cluster, table);
waitForWriteLocksToEngage(zookeeper, cluster, table, shardCount);
}

private static int getShardCount(ZooKeeper zookeeper, String cluster, String table) throws KeeperException, InterruptedException {
String path = ZookeeperPathConstants.getTableShardCountPath(cluster, table);
Stat stat = zookeeper.exists(path, false);
if (stat == null) {
throw new RuntimeException("Shard count missing for table [" + table + "]");
}
byte[] data = zookeeper.getData(path, false, stat);
if (data == null) {
throw new RuntimeException("Shard count missing for table [" + table + "]");
}
return Integer.parseInt(new String(data));
}

private static void waitForWriteLocksToEngage(ZooKeeper zookeeper, String cluster, String table, int shardCount) throws KeeperException, InterruptedException {
final Object object = new Object();
String path = ZookeeperPathConstants.getLockPath(cluster, table);
while (true) {
synchronized (object) {
List<String> list = zookeeper.getChildren(path, new Watcher() {
@Override
public void process(WatchedEvent event) {
synchronized (object) {
object.notifyAll();
}
}
});
if (list.size() == shardCount) {
LOG.info("All [{0}] locks for table [{1}] has been engaged.",list.size(), table);
return;
} else {
LOG.info("Waiting for locks to engage [{0}] out of [{1}]", list.size(), shardCount);
object.wait(BlurConstants.ZK_WAIT_TIME);
}
}
}
}

}
@@ -1,6 +1,7 @@
package com.nearinfinity.blur.thrift;

import java.util.List;
import java.util.Map;

import org.apache.thrift.TException;
import org.apache.zookeeper.ZooKeeper;
Expand Down Expand Up @@ -73,12 +74,32 @@ public final void enableTable(String table) throws BlurException, TException {
throw new BlurException("Table [" + table + "] not found.", null);
}
EnableTable.enableTable(_zookeeper, cluster, table);
waitForTheTableToEnable(cluster, table);
} catch (Exception e) {
LOG.error("Unknown error during enable of [table={0}]", e, table);
throw new BException(e.getMessage(), e);
}
}

private void waitForTheTableToEnable(String cluster, String table) throws BlurException, TException {
TableDescriptor describe = describe(table);
int shardCount = describe.shardCount;
LOG.info("Waiting for shards to enabled on table [" + table + "]");
while (true) {
Map<String, String> shardServerLayout = shardServerLayout(table);
LOG.info("Shards [" + shardServerLayout.size() + "/" + shardCount + "] of table [" + table + "] enabled");
if (shardServerLayout.size() == shardCount) {
return;
}
try {
Thread.sleep(3000);
} catch (InterruptedException e) {
LOG.error("Unknown error while enabling table [" + table + "]", e);
throw new BException("Unknown error while enabling table [" + table + "]", e);
}
}
}

@Override
public final void removeTable(String table, boolean deleteIndexFiles) throws BlurException, TException {
try {
Expand Down
Expand Up @@ -35,7 +35,6 @@
import static com.nearinfinity.blur.utils.BlurConstants.BLUR_CONTROLLER_SERVER_THRIFT_THREAD_COUNT;
import static com.nearinfinity.blur.utils.BlurConstants.BLUR_ZOOKEEPER_CONNECTION;
import static com.nearinfinity.blur.utils.BlurConstants.BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE;
import static com.nearinfinity.blur.utils.BlurConstants.CRAZY;
import static com.nearinfinity.blur.utils.BlurUtil.quietClose;

import java.io.IOException;
Expand All @@ -58,8 +57,8 @@
import com.nearinfinity.blur.metrics.BlurMetrics;
import com.nearinfinity.blur.thrift.client.BlurClient;
import com.nearinfinity.blur.thrift.client.BlurClientRemote;
import com.nearinfinity.blur.thrift.generated.BlurException;
import com.nearinfinity.blur.thrift.generated.Blur.Iface;
import com.nearinfinity.blur.thrift.generated.BlurException;
import com.nearinfinity.blur.utils.BlurUtil;
import com.nearinfinity.blur.zookeeper.ZkUtils;

Expand All @@ -73,13 +72,13 @@ public static void main(String[] args) throws TTransportException, IOException,
Thread.setDefaultUncaughtExceptionHandler(new SimpleUncaughtExceptionHandler());

BlurConfiguration configuration = new BlurConfiguration();

String bindAddress = configuration.get(BLUR_CONTROLLER_BIND_ADDRESS);
int bindPort = configuration.getInt(BLUR_CONTROLLER_BIND_PORT,-1);
int bindPort = configuration.getInt(BLUR_CONTROLLER_BIND_PORT, -1);
bindPort += serverIndex;
LOG.info("Shard Server using index [{0}] bind address [{1}]",serverIndex,bindAddress + ":" + bindPort);

LOG.info("Shard Server using index [{0}] bind address [{1}]", serverIndex, bindAddress + ":" + bindPort);

Configuration config = new Configuration();
BlurMetrics blurMetrics = new BlurMetrics(config);

Expand All @@ -89,18 +88,13 @@ public static void main(String[] args) throws TTransportException, IOException,

BlurQueryChecker queryChecker = new BlurQueryChecker(configuration);

boolean crazyMode = false;
if (args.length == 1 && args[1].equals(CRAZY)) {
crazyMode = true;
}

final ZooKeeper zooKeeper = ZkUtils.newZooKeeper(zkConnectionStr);
ZookeeperSystemTime.checkSystemTime(zooKeeper, configuration.getLong(BLUR_ZOOKEEPER_SYSTEM_TIME_TOLERANCE, 3000));

final ZookeeperClusterStatus clusterStatus = new ZookeeperClusterStatus(zooKeeper);

BlurClient client = new BlurClientRemote();

final BlurControllerServer controllerServer = new BlurControllerServer();
controllerServer.setClient(client);
controllerServer.setClusterStatus(clusterStatus);
Expand All @@ -120,9 +114,9 @@ public static void main(String[] args) throws TTransportException, IOException,
controllerServer.setMaxFetchDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_FETCH_DELAY, 2000));
controllerServer.setMaxMutateDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_MUTATE_DELAY, 2000));
controllerServer.setMaxDefaultDelay(configuration.getInt(BLUR_CONTROLLER_RETRY_MAX_DEFAULT_DELAY, 2000));

controllerServer.init();

Iface iface = BlurUtil.recordMethodCallsAndAverageTimes(blurMetrics, controllerServer, Iface.class);

int threadCount = configuration.getInt(BLUR_CONTROLLER_SERVER_THRIFT_THREAD_COUNT, 32);
Expand All @@ -133,12 +127,7 @@ public static void main(String[] args) throws TTransportException, IOException,
server.setBindAddress(bindAddress);
server.setBindPort(bindPort);
server.setThreadCount(threadCount);
if (crazyMode) {
System.err.println("Crazy mode!!!!!");
server.setIface(ThriftBlurShardServer.crazyMode(iface));
} else {
server.setIface(iface);
}
server.setIface(iface);

// This will shutdown the server when the correct path is set in zk
new BlurServerShutDown().register(new BlurShutdown() {
Expand Down

0 comments on commit 12b59e5

Please sign in to comment.