diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
index cbd756e8a02..df7aeb2b2af 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServices.java
@@ -181,7 +181,7 @@ public interface QueryServices extends SQLCloseable {
public static final String STATS_GUIDEPOST_WIDTH_BYTES_ATTRIB = "phoenix.stats.guidepost.width";
public static final String STATS_GUIDEPOST_PER_REGION_ATTRIB = "phoenix.stats.guidepost.per.region";
public static final String STATS_USE_CURRENT_TIME_ATTRIB = "phoenix.stats.useCurrentTime";
-
+
@Deprecated // use STATS_COLLECTION_ENABLED config instead
public static final String STATS_ENABLED_ATTRIB = "phoenix.stats.enabled";
@@ -271,6 +271,11 @@ public interface QueryServices extends SQLCloseable {
public static final String DEFAULT_COLUMN_ENCODED_BYTES_ATRRIB = "phoenix.default.column.encoded.bytes.attrib";
public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.immutable.storage.scheme";
public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME_ATTRIB = "phoenix.default.multitenant.immutable.storage.scheme";
+
+ public static final String PHOENIX_QUERY_SERVER_CLUSTER_BASE_PATH = "phoenix.queryserver.base.path";
+ public static final String PHOENIX_QUERY_SERVER_SERVICE_NAME = "phoenix.queryserver.service.name";
+ public static final String PHOENIX_QUERY_SERVER_ZK_ACL_USERNAME = "phoenix.queryserver.zookeeper.acl.username";
+ public static final String PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD = "phoenix.queryserver.zookeeper.acl.password";
public static final String STATS_COLLECTION_ENABLED = "phoenix.stats.collection.enabled";
public static final String USE_STATS_FOR_PARALLELIZATION = "phoenix.use.stats.parallelization";
diff --git a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
index ec5c95bbe9b..935a5fdf400 100644
--- a/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
+++ b/phoenix-core/src/main/java/org/apache/phoenix/query/QueryServicesOptions.java
@@ -80,6 +80,10 @@
import static org.apache.phoenix.query.QueryServices.THREAD_POOL_SIZE_ATTRIB;
import static org.apache.phoenix.query.QueryServices.THREAD_TIMEOUT_MS_ATTRIB;
import static org.apache.phoenix.query.QueryServices.TRACING_BATCH_SIZE;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_QUERY_SERVER_SERVICE_NAME;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_QUERY_SERVER_CLUSTER_BASE_PATH;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD;
+import static org.apache.phoenix.query.QueryServices.PHOENIX_QUERY_SERVER_ZK_ACL_USERNAME;
import static org.apache.phoenix.query.QueryServices.TRACING_ENABLED;
import static org.apache.phoenix.query.QueryServices.TRACING_STATS_TABLE_NAME_ATTRIB;
import static org.apache.phoenix.query.QueryServices.TRACING_THREAD_POOL_SIZE;
@@ -179,8 +183,8 @@ public class QueryServicesOptions {
public static final int DEFAULT_CLOCK_SKEW_INTERVAL = 2000;
public static final boolean DEFAULT_INDEX_FAILURE_HANDLING_REBUILD = true; // auto rebuild on
public static final boolean DEFAULT_INDEX_FAILURE_BLOCK_WRITE = false;
- public static final boolean DEFAULT_INDEX_FAILURE_DISABLE_INDEX = true;
- public static final boolean DEFAULT_INDEX_FAILURE_THROW_EXCEPTION = true;
+ public static final boolean DEFAULT_INDEX_FAILURE_DISABLE_INDEX = true;
+ public static final boolean DEFAULT_INDEX_FAILURE_THROW_EXCEPTION = true;
public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_INTERVAL = 60000; // 60 secs
public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_BACKWARD_TIME = 1; // 1 ms
public static final long DEFAULT_INDEX_FAILURE_HANDLING_REBUILD_OVERLAP_FORWARD_TIME = 60000 * 3; // 3 mins
@@ -295,6 +299,10 @@ public class QueryServicesOptions {
public static final int DEFAULT_COLUMN_ENCODED_BYTES = QualifierEncodingScheme.TWO_BYTE_QUALIFIERS.getSerializedMetadataValue();
public static final String DEFAULT_IMMUTABLE_STORAGE_SCHEME = ImmutableStorageScheme.SINGLE_CELL_ARRAY_WITH_OFFSETS.toString();
public static final String DEFAULT_MULTITENANT_IMMUTABLE_STORAGE_SCHEME = ImmutableStorageScheme.ONE_CELL_PER_COLUMN.toString();
+ public final static String DEFAULT_PHOENIX_QUERY_SERVER_CLUSTER_BASE_PATH = "/phoenix";
+ public final static String DEFAULT_PHOENIX_QUERY_SERVER_SERVICE_NAME = "queryserver";
+ public final static String DEFAULT_PHOENIX_QUERY_SERVER_ZK_ACL_USERNAME = "phoenix";
+ public final static String DEFAULT_PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD = "phoenix";
//by default, max connections from one client to one cluster is unlimited
public static final int DEFAULT_CLIENT_CONNECTION_MAX_ALLOWED_CONNECTIONS = 0;
@@ -388,6 +396,10 @@ public static QueryServicesOptions withDefaults() {
.setIfUnset(IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE, DEFAULT_IS_SYSTEM_TABLE_MAPPED_TO_NAMESPACE)
.setIfUnset(LOCAL_INDEX_CLIENT_UPGRADE_ATTRIB, DEFAULT_LOCAL_INDEX_CLIENT_UPGRADE)
.setIfUnset(AUTO_UPGRADE_ENABLED, DEFAULT_AUTO_UPGRADE_ENABLED)
+ .setIfUnset(PHOENIX_QUERY_SERVER_CLUSTER_BASE_PATH, DEFAULT_PHOENIX_QUERY_SERVER_CLUSTER_BASE_PATH)
+ .setIfUnset(PHOENIX_QUERY_SERVER_SERVICE_NAME, DEFAULT_PHOENIX_QUERY_SERVER_SERVICE_NAME)
+ .setIfUnset(PHOENIX_QUERY_SERVER_ZK_ACL_USERNAME, DEFAULT_PHOENIX_QUERY_SERVER_ZK_ACL_USERNAME)
+ .setIfUnset(PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD, DEFAULT_PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD)
.setIfUnset(UPLOAD_BINARY_DATA_TYPE_ENCODING, DEFAULT_UPLOAD_BINARY_DATA_TYPE_ENCODING)
.setIfUnset(TRACING_ENABLED, DEFAULT_TRACING_ENABLED)
.setIfUnset(TRACING_BATCH_SIZE, DEFAULT_TRACING_BATCH_SIZE)
diff --git a/phoenix-load-balancer/pom.xml b/phoenix-load-balancer/pom.xml
new file mode 100644
index 00000000000..8cdbd83ab18
--- /dev/null
+++ b/phoenix-load-balancer/pom.xml
@@ -0,0 +1,84 @@
+
+
+
+
+ 4.0.0
+
+ org.apache.phoenix
+ phoenix
+ 4.12.0-HBase-1.3-SNAPSHOT
+
+ phoenix-load-balancer
+ Phoenix Load Balancer
+ A Load balancer which routes calls to Phoenix Query Server
+
+
+
+ org.apache.hbase
+ hbase-common
+
+
+ org.apache.curator
+ curator-client
+
+
+ org.apache.phoenix
+ phoenix-queryserver
+
+
+
+ org.apache.curator
+ curator-test
+ test
+
+
+
+
+
+
+ maven-source-plugin
+
+
+ attach-sources
+ verify
+
+ jar-no-fork
+ test-jar-no-fork
+
+
+
+
+
+ org.apache.rat
+ apache-rat-plugin
+
+
+ src/main/resources/META-INF/services/org.apache.phoenix.loadbalancer.service.LoadBalanceZookeeperConf
+
+
+
+
+
+
+
diff --git a/phoenix-load-balancer/src/it/java/org/apache/phoenix/end2end/LoadBalancerEnd2EndIT.java b/phoenix-load-balancer/src/it/java/org/apache/phoenix/end2end/LoadBalancerEnd2EndIT.java
new file mode 100644
index 00000000000..a5e2c9b97d8
--- /dev/null
+++ b/phoenix-load-balancer/src/it/java/org/apache/phoenix/end2end/LoadBalancerEnd2EndIT.java
@@ -0,0 +1,144 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.end2end;
+
+import com.google.common.net.HostAndPort;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.CuratorZookeeperClient;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.TestingServer;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.phoenix.loadbalancer.service.LoadBalancer;
+import org.apache.phoenix.loadbalancer.service.LoadBalanceZookeeperConf;
+import org.apache.phoenix.loadbalancer.service.LoadBalanceZookeeperConfImpl;
+import org.apache.phoenix.queryserver.register.Registry;
+import org.apache.phoenix.queryserver.register.ZookeeperRegistry;
+import org.apache.zookeeper.KeeperException;
+import org.junit.*;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class LoadBalancerEnd2EndIT {
+ private static TestingServer testingServer;
+ private static CuratorFramework curatorFramework;
+ private static final Log LOG = LogFactory.getLog(LoadBalancerEnd2EndIT.class);
+ private static final LoadBalanceZookeeperConf LOAD_BALANCER_CONFIGURATION = new LoadBalanceZookeeperConfImpl();
+ private static String path;
+ private static LoadBalancer loadBalancer;
+ private static HostAndPort pqs1 = HostAndPort.fromParts("localhost",1000);
+ private static HostAndPort pqs2 = HostAndPort.fromParts("localhost",2000);
+ private static HostAndPort pqs3 = HostAndPort.fromParts("localhost",3000);
+ public static String zkConnectString;
+ public static Registry registry;
+
+ @BeforeClass
+ public static void setup() throws Exception{
+
+ registry = new ZookeeperRegistry();
+ zkConnectString = LOAD_BALANCER_CONFIGURATION.getZkConnectString();
+ int port = Integer.parseInt(zkConnectString.split(":")[1]);
+ testingServer = new TestingServer(port);
+ testingServer.start();
+
+ path = LOAD_BALANCER_CONFIGURATION.getParentPath();
+ curatorFramework = CuratorFrameworkFactory.newClient(zkConnectString,
+ new ExponentialBackoffRetry(1000, 3));
+ curatorFramework.start();
+ createNodeForTesting(Arrays.asList(pqs1,pqs2,pqs3));
+ curatorFramework.setACL().withACL(LOAD_BALANCER_CONFIGURATION.getAcls());
+ loadBalancer = LoadBalancer.getLoadBalancer();
+ }
+
+ @AfterClass
+ public static void tearDown() throws Exception {
+ CloseableUtils.closeQuietly(curatorFramework);
+ CloseableUtils.closeQuietly(testingServer);
+ }
+
+ private static void createNodeForTesting(List pqsNodes) throws Exception{
+ for(HostAndPort pqs:pqsNodes) {
+ registry.registerServer(LOAD_BALANCER_CONFIGURATION,pqs.getPort(),zkConnectString,pqs.getHostText());
+ }
+ curatorFramework.getChildren().forPath(LOAD_BALANCER_CONFIGURATION.getParentPath()).size();
+ }
+
+
+ @Test
+ public void testGetAllServiceLocation() throws Exception {
+ Assert.assertNotNull(loadBalancer);
+ List serviceLocations = loadBalancer.getAllServiceLocation();
+ Assert.assertTrue(" must contains 3 service location",serviceLocations.size() == 3);
+ }
+
+ @Test
+ public void testGetSingleServiceLocation() throws Exception {
+ Assert.assertNotNull(loadBalancer);
+ HostAndPort serviceLocation = loadBalancer.getSingleServiceLocation();
+ Assert.assertNotNull(serviceLocation);
+ }
+
+ @Test(expected=Exception.class)
+ public void testZookeeperDown() throws Exception{
+ testingServer.stop();
+ CuratorZookeeperClient zookeeperClient = curatorFramework.getZookeeperClient();
+ //check to see if zookeeper is really down.
+ while (zookeeperClient.isConnected()){
+ Thread.sleep(1000);
+ };
+ loadBalancer.getSingleServiceLocation();
+ }
+
+ @Test(expected = KeeperException.NoNodeException.class)
+ public void testNoPhoenixQueryServerNodeInZookeeper() throws Exception{
+ List hostAndPorts = Arrays.asList(pqs1, pqs2, pqs3);
+ for(HostAndPort pqs: hostAndPorts) {
+ String fullPathToNode = LOAD_BALANCER_CONFIGURATION.getFullPathToNode(pqs);
+ curatorFramework.delete().deletingChildrenIfNeeded().forPath(fullPathToNode);
+ while (curatorFramework.checkExists().forPath(fullPathToNode) != null){
+ //wait for the node to deleted
+ Thread.sleep(1000);
+ };
+ }
+ //delete the parent
+ curatorFramework.delete().forPath(path);
+ // should throw an exception as there is
+ // no node in the zookeeper
+ try {
+ loadBalancer.getSingleServiceLocation();
+ } catch(Exception e) {
+ throw e;
+ } finally {
+ // need to create node for other tests to run.
+ createNodeForTesting(hostAndPorts);
+ }
+ }
+
+ @Test
+ public void testSingletonPropertyForLoadBalancer(){
+ LoadBalancer anotherloadBalancerRef = LoadBalancer.getLoadBalancer();
+ Assert.assertTrue(" the load balancer is not singleton",loadBalancer == anotherloadBalancerRef );
+ }
+
+
+
+}
diff --git a/phoenix-load-balancer/src/main/java/org/apache/phoenix/loadbalancer/service/LoadBalanceZookeeperConfImpl.java b/phoenix-load-balancer/src/main/java/org/apache/phoenix/loadbalancer/service/LoadBalanceZookeeperConfImpl.java
new file mode 100644
index 00000000000..98e2682695a
--- /dev/null
+++ b/phoenix-load-balancer/src/main/java/org/apache/phoenix/loadbalancer/service/LoadBalanceZookeeperConfImpl.java
@@ -0,0 +1,103 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.phoenix.loadbalancer.service;
+
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.net.HostAndPort;
+import org.apache.hadoop.conf.Configuration;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+import org.apache.phoenix.query.QueryServices;
+import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.zookeeper.ZooDefs;
+import org.apache.zookeeper.data.ACL;
+import org.apache.zookeeper.data.Id;
+
+import java.util.Arrays;
+import java.util.List;
+
+
+
+public class LoadBalanceZookeeperConfImpl implements LoadBalanceZookeeperConf {
+
+ private Configuration configuration;
+
+ public LoadBalanceZookeeperConfImpl() {
+ this.configuration = HBaseConfiguration.create();
+ }
+
+ public LoadBalanceZookeeperConfImpl(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @VisibleForTesting
+ public void setConfiguration(Configuration configuration) {
+ this.configuration = configuration;
+ }
+
+ @Override
+ public String getQueryServerBasePath(){
+ return configuration.get(QueryServices.PHOENIX_QUERY_SERVER_CLUSTER_BASE_PATH,
+ QueryServicesOptions.DEFAULT_PHOENIX_QUERY_SERVER_CLUSTER_BASE_PATH);
+ }
+
+ @Override
+ public String getServiceName(){
+ return configuration.get(QueryServices.PHOENIX_QUERY_SERVER_SERVICE_NAME,
+ QueryServicesOptions.DEFAULT_PHOENIX_QUERY_SERVER_SERVICE_NAME);
+ }
+
+ @Override
+ public String getZkConnectString(){
+ return String.format("%s:%s",configuration.get(QueryServices.ZOOKEEPER_QUORUM_ATTRIB,
+ "localhost"),configuration.get(QueryServices.ZOOKEEPER_PORT_ATTRIB,"2181"));
+ }
+
+ private String getZkLbUserName(){
+ return configuration.get(QueryServices.PHOENIX_QUERY_SERVER_ZK_ACL_USERNAME,
+ QueryServicesOptions.DEFAULT_PHOENIX_QUERY_SERVER_ZK_ACL_USERNAME);
+ }
+
+ private String getZkLbPassword(){
+ return configuration.get(QueryServices.PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD,
+ QueryServicesOptions.DEFAULT_PHOENIX_QUERY_SERVER_ZK_ACL_PASSWORD);
+ }
+
+ @Override
+ public List getAcls() {
+ ACL acl = new ACL();
+ acl.setId(new Id("digest",getZkLbUserName()+":"+getZkLbPassword()));
+ acl.setPerms(ZooDefs.Perms.READ);
+ return Arrays.asList(acl);
+ }
+
+ @Override
+ public String getParentPath() {
+ String path = String.format("%s/%s",getQueryServerBasePath(),getServiceName());
+ return path;
+ }
+
+ @Override
+ public String getFullPathToNode(HostAndPort hostAndPort) {
+ String path = String.format("%s/%s",getParentPath()
+ ,hostAndPort.toString());
+ return path;
+ }
+}
+
diff --git a/phoenix-load-balancer/src/main/java/org/apache/phoenix/loadbalancer/service/LoadBalancer.java b/phoenix-load-balancer/src/main/java/org/apache/phoenix/loadbalancer/service/LoadBalancer.java
new file mode 100644
index 00000000000..23e90251fc7
--- /dev/null
+++ b/phoenix-load-balancer/src/main/java/org/apache/phoenix/loadbalancer/service/LoadBalancer.java
@@ -0,0 +1,178 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.loadbalancer.service;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Lists;
+import com.google.common.net.HostAndPort;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.framework.api.UnhandledErrorListener;
+import org.apache.curator.framework.recipes.cache.PathChildrenCache;
+import org.apache.curator.framework.state.ConnectionState;
+import org.apache.curator.framework.state.ConnectionStateListener;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.hadoop.hbase.HBaseConfiguration;
+
+import java.net.ConnectException;
+import java.util.ArrayList;
+import java.util.concurrent.ThreadLocalRandom;
+import java.io.Closeable;
+import java.util.List;
+
+/**
+ * LoadBalancer class is singleton , used by the client
+ * to find out about various location of PQS servers.
+ * The client needs to configure the HBase-site.xml with the needed
+ * properties i.e. location of zookeeper ( or service locator), username, password etc.
+ */
+public class LoadBalancer {
+
+ private static final LoadBalanceZookeeperConf CONFIG = new LoadBalanceZookeeperConfImpl(HBaseConfiguration.create());
+ private static CuratorFramework curaFramework = null;
+ protected static final Log LOG = LogFactory.getLog(LoadBalancer.class);
+ private static PathChildrenCache cache = null;
+ private static final LoadBalancer loadBalancer = new LoadBalancer();
+ private ConnectionStateListener connectionStateListener = null;
+ private UnhandledErrorListener unhandledErrorListener = null;
+ private List closeAbles = Lists.newArrayList();
+
+ private LoadBalancer() {
+ try {
+ start();
+ }catch(Exception ex){
+ LOG.error("Exception while creating a zookeeper clients and cache",ex);
+ if ((curaFramework != null) && (connectionStateListener != null)){
+ curaFramework.getConnectionStateListenable()
+ .removeListener(connectionStateListener);
+ }
+ if ((curaFramework != null) && (unhandledErrorListener != null)){
+ curaFramework.getUnhandledErrorListenable()
+ .removeListener(unhandledErrorListener);
+ }
+ for (Closeable closeable : closeAbles) {
+ CloseableUtils.closeQuietly(closeable);
+ }
+ }
+ }
+
+
+
+ /**
+ * Return Singleton Load Balancer every single time.
+ * @return LoadBalancer
+ */
+ public static LoadBalancer getLoadBalancer() {
+ return loadBalancer;
+ }
+
+ /**
+ * It returns the location of Phoenix Query Server
+ * in form of Guava HostAndPort
+ * from the cache. The client should catch Exception incase
+ * the method is unable to fetch PQS location due to network failure or
+ * in-correct configuration issues.
+ * @return - return Guava HostAndPort. See http://google.com
+ * @throws Exception
+ */
+ public HostAndPort getSingleServiceLocation() throws Exception{
+ List childNodes = conductSanityCheckAndReturn();
+ // get an random connect string
+ int i = ThreadLocalRandom.current().nextInt(0, childNodes.size());
+ return childNodes.get(i);
+ }
+
+ /**
+ * return locations of all Phoenix Query Servers
+ * in the form of a List of PQS servers HostAndPort
+ * @return - HostAndPort
+ * @throws Exception
+ */
+ public List getAllServiceLocation() throws Exception{
+ return conductSanityCheckAndReturn();
+ }
+
+ private List conductSanityCheckAndReturn() throws Exception{
+ Preconditions.checkNotNull(curaFramework
+ ," curator framework in not initialized ");
+ Preconditions.checkNotNull(cache," cache value is not initialized");
+ boolean connected = curaFramework.getZookeeperClient().isConnected();
+ if (!connected) {
+ String message = " Zookeeper seems to be down. The data is stale ";
+ ConnectException exception =
+ new ConnectException(message);
+ LOG.error(message, exception);
+ throw exception;
+ }
+ List currentNodes = curaFramework.getChildren().forPath(CONFIG.getParentPath());
+ List returnNodes = new ArrayList<>();
+ String nodeAsString = null;
+ for(String node:currentNodes) {
+ try {
+ returnNodes.add(HostAndPort.fromString(node));
+ } catch(Throwable ex) {
+ LOG.error(" something wrong with node string "+nodeAsString,ex);
+ }
+ }
+ return returnNodes;
+ }
+ private String getZkConnectString(){
+ return CONFIG.getZkConnectString();
+ }
+
+ private ConnectionStateListener getConnectionStateListener(){
+ return new ConnectionStateListener() {
+ @Override
+ public void stateChanged(CuratorFramework client, ConnectionState newState) {
+ if (!newState.isConnected()) {
+ LOG.error( " connection to zookeeper broken. It is in "+ newState.name()+" state.");
+ }
+ }
+ };
+ }
+
+ private UnhandledErrorListener getUnhandledErrorListener(){
+ return new UnhandledErrorListener() {
+ @Override
+ public void unhandledError(String message, Throwable e) {
+ LOG.error("unhandled exception: "+ message,e);
+ }
+ };
+ }
+
+ private void start() throws Exception{
+ curaFramework = CuratorFrameworkFactory.newClient(getZkConnectString(),
+ new ExponentialBackoffRetry(1000, 3));
+ curaFramework.start();
+ curaFramework.setACL().withACL(CONFIG.getAcls());
+ connectionStateListener = getConnectionStateListener();
+ curaFramework.getConnectionStateListenable()
+ .addListener(connectionStateListener);
+ unhandledErrorListener = getUnhandledErrorListener();
+ curaFramework.getUnhandledErrorListenable()
+ .addListener(unhandledErrorListener);
+ cache = new PathChildrenCache(curaFramework, CONFIG.getParentPath(), true);
+ cache.start(PathChildrenCache.StartMode.BUILD_INITIAL_CACHE);
+ closeAbles.add(cache);
+ closeAbles.add(curaFramework);
+ }
+}
diff --git a/phoenix-load-balancer/src/main/java/org/apache/phoenix/queryserver/register/ZookeeperRegistry.java b/phoenix-load-balancer/src/main/java/org/apache/phoenix/queryserver/register/ZookeeperRegistry.java
new file mode 100644
index 00000000000..8aee1779c63
--- /dev/null
+++ b/phoenix-load-balancer/src/main/java/org/apache/phoenix/queryserver/register/ZookeeperRegistry.java
@@ -0,0 +1,72 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.queryserver.register;
+
+
+import com.google.common.net.HostAndPort;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.curator.framework.CuratorFramework;
+import org.apache.curator.framework.CuratorFrameworkFactory;
+import org.apache.curator.retry.ExponentialBackoffRetry;
+import org.apache.curator.utils.CloseableUtils;
+import org.apache.phoenix.loadbalancer.service.LoadBalanceZookeeperConf;
+import org.apache.phoenix.queryserver.server.QueryServer;
+import org.apache.zookeeper.CreateMode;
+import org.apache.zookeeper.data.Stat;
+
+import java.nio.charset.StandardCharsets;
+
+
+public class ZookeeperRegistry implements Registry {
+
+ private static final Log LOG = LogFactory.getLog(ZookeeperRegistry.class);
+ private CuratorFramework client;
+
+ public ZookeeperRegistry(){}
+
+ @Override
+ public void registerServer(LoadBalanceZookeeperConf configuration, int pqsPort,
+ String zookeeperConnectString, String pqsHost)
+ throws Exception {
+
+ this.client = CuratorFrameworkFactory.newClient(zookeeperConnectString,
+ new ExponentialBackoffRetry(1000,10));
+ this.client.start();
+ HostAndPort hostAndPort = HostAndPort.fromParts(pqsHost,pqsPort);
+ String path = configuration.getFullPathToNode(hostAndPort);
+ String node = hostAndPort.toString();
+ this.client.create().creatingParentsIfNeeded().withMode(CreateMode.EPHEMERAL).forPath(path
+ ,node.getBytes(StandardCharsets.UTF_8));
+ Stat stat = this.client.setACL().withACL(configuration.getAcls()).forPath(path);
+ if (stat != null) {
+ LOG.info(" node created with right ACL");
+ }
+ else {
+ LOG.error("could not create node with right ACL. So, system would exit now.");
+ throw new RuntimeException(" Unable to connect to Zookeeper");
+ }
+
+ }
+
+ @Override
+ public void unRegisterServer() throws Exception {
+ CloseableUtils.closeQuietly(this.client);
+ }
+}
diff --git a/phoenix-load-balancer/src/main/resources/META-INF/services/org.apache.phoenix.loadbalancer.service.LoadBalanceZookeeperConf b/phoenix-load-balancer/src/main/resources/META-INF/services/org.apache.phoenix.loadbalancer.service.LoadBalanceZookeeperConf
new file mode 100644
index 00000000000..4cc6ea47135
--- /dev/null
+++ b/phoenix-load-balancer/src/main/resources/META-INF/services/org.apache.phoenix.loadbalancer.service.LoadBalanceZookeeperConf
@@ -0,0 +1 @@
+org.apache.phoenix.loadbalancer.service.LoadBalanceZookeeperConfImpl
\ No newline at end of file
diff --git a/phoenix-queryserver/src/main/java/org/apache/phoenix/loadbalancer/service/LoadBalanceZookeeperConf.java b/phoenix-queryserver/src/main/java/org/apache/phoenix/loadbalancer/service/LoadBalanceZookeeperConf.java
new file mode 100644
index 00000000000..afce5bef509
--- /dev/null
+++ b/phoenix-queryserver/src/main/java/org/apache/phoenix/loadbalancer/service/LoadBalanceZookeeperConf.java
@@ -0,0 +1,42 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.loadbalancer.service;
+
+import com.google.common.net.HostAndPort;
+import org.apache.zookeeper.data.ACL;
+
+import java.util.List;
+
+
+public interface LoadBalanceZookeeperConf {
+
+ String getQueryServerBasePath();
+
+ String getServiceName();
+
+ String getZkConnectString();
+
+ List getAcls();
+
+ String getParentPath();
+
+ String getFullPathToNode(HostAndPort hostAndPort);
+
+
+}
diff --git a/phoenix-queryserver/src/main/java/org/apache/phoenix/queryserver/register/Registry.java b/phoenix-queryserver/src/main/java/org/apache/phoenix/queryserver/register/Registry.java
new file mode 100644
index 00000000000..598fc5adf2f
--- /dev/null
+++ b/phoenix-queryserver/src/main/java/org/apache/phoenix/queryserver/register/Registry.java
@@ -0,0 +1,48 @@
+/**
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.phoenix.queryserver.register;
+
+
+import org.apache.phoenix.loadbalancer.service.LoadBalanceZookeeperConf;
+
+/**
+ * Registry interface for implementing registering
+ * and un-registering to service locator.
+ */
+public interface Registry {
+
+ /**
+ * Unreqister the server with zookeeper. All Cleanup
+ * is done in this method.
+ * @throws Exception
+ */
+ void unRegisterServer() throws Exception;
+
+ /**
+ * Registers the server with the service locator ( zookeeper).
+ * @param configuration - Hbase Configuration
+ * @param port - port for PQS server
+ * @param connectString - zookeeper connect string
+ * @param pqsHost - host for PQS server.
+ * @throws Exception
+ */
+ void registerServer(LoadBalanceZookeeperConf configuration, int port
+ , String connectString, String pqsHost) throws Exception ;
+
+}
diff --git a/phoenix-queryserver/src/main/java/org/apache/phoenix/queryserver/server/QueryServer.java b/phoenix-queryserver/src/main/java/org/apache/phoenix/queryserver/server/QueryServer.java
index 86aa686a7fa..234201c3652 100644
--- a/phoenix-queryserver/src/main/java/org/apache/phoenix/queryserver/server/QueryServer.java
+++ b/phoenix-queryserver/src/main/java/org/apache/phoenix/queryserver/server/QueryServer.java
@@ -18,6 +18,7 @@
package org.apache.phoenix.queryserver.server;
import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
import com.google.common.cache.CacheBuilder;
import com.google.common.cache.CacheLoader;
import com.google.common.cache.LoadingCache;
@@ -48,16 +49,21 @@
import org.apache.hadoop.util.ToolRunner;
import org.apache.phoenix.query.QueryServices;
import org.apache.phoenix.query.QueryServicesOptions;
+import org.apache.phoenix.loadbalancer.service.LoadBalanceZookeeperConf;
+import org.apache.phoenix.queryserver.register.Registry;
import java.io.File;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
+import java.net.InetAddress;
import java.security.PrivilegedExceptionAction;
import java.util.Arrays;
import java.util.HashSet;
import java.util.Map;
import java.util.Objects;
import java.util.Set;
+import java.util.ServiceConfigurationError;
+import java.util.ServiceLoader;
import java.util.concurrent.Callable;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutionException;
@@ -77,6 +83,7 @@ public final class QueryServer extends Configured implements Tool, Runnable {
private HttpServer server = null;
private int retCode = 0;
private Throwable t = null;
+ private Registry registry;
/**
* Log information about the currently running JVM.
@@ -182,12 +189,13 @@ public int run(String[] args) throws Exception {
QueryServices.QUERY_SERVER_HBASE_SECURITY_CONF_ATTRIB));
final boolean disableSpnego = getConf().getBoolean(QueryServices.QUERY_SERVER_SPNEGO_AUTH_DISABLED_ATTRIB,
QueryServicesOptions.DEFAULT_QUERY_SERVER_SPNEGO_AUTH_DISABLED);
+ String hostname;
final boolean disableLogin = getConf().getBoolean(QueryServices.QUERY_SERVER_DISABLE_KERBEROS_LOGIN,
QueryServicesOptions.DEFAULT_QUERY_SERVER_DISABLE_KERBEROS_LOGIN);
// handle secure cluster credentials
if (isKerberos && !disableSpnego && !disableLogin) {
- String hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
+ hostname = Strings.domainNamePointerToHostName(DNS.getDefaultHost(
getConf().get(QueryServices.QUERY_SERVER_DNS_INTERFACE_ATTRIB, "default"),
getConf().get(QueryServices.QUERY_SERVER_DNS_NAMESERVER_ATTRIB, "default")));
if (LOG.isDebugEnabled()) {
@@ -199,6 +207,9 @@ public int run(String[] args) throws Exception {
SecurityUtil.login(getConf(), QueryServices.QUERY_SERVER_KEYTAB_FILENAME_ATTRIB,
QueryServices.QUERY_SERVER_KERBEROS_PRINCIPAL_ATTRIB, hostname);
LOG.info("Login successful.");
+ } else {
+ hostname = InetAddress.getLocalHost().getHostName();
+ LOG.info(" Kerberos is off and hostname is : "+hostname);
}
Class extends PhoenixMetaFactory> factoryClass = getConf().getClass(
@@ -248,6 +259,7 @@ public int run(String[] args) throws Exception {
// Build and start the HttpServer
server = builder.build();
server.start();
+ registerToServiceProvider(hostname);
runningLatch.countDown();
server.join();
return 0;
@@ -255,6 +267,8 @@ public int run(String[] args) throws Exception {
LOG.fatal("Unrecoverable service error. Shutting down.", t);
this.t = t;
return -1;
+ } finally {
+ unRegister();
}
}
@@ -262,6 +276,62 @@ public synchronized void stop() {
server.stop();
}
+ public boolean registerToServiceProvider(String hostName) {
+
+ boolean success = true ;
+ try {
+ LoadBalanceZookeeperConf loadBalanceConfiguration = getLoadBalanceConfiguration();
+ Preconditions.checkNotNull(loadBalanceConfiguration);
+ this.registry = getRegistry();
+ Preconditions.checkNotNull(registry);
+ String zkConnectString = loadBalanceConfiguration.getZkConnectString();
+ this.registry.registerServer(loadBalanceConfiguration, getPort(), zkConnectString, hostName);
+ } catch(Throwable ex){
+ LOG.error("error while trying to register ",ex);
+ success = false;
+ } finally {
+ return success;
+ }
+ }
+
+
+ public LoadBalanceZookeeperConf getLoadBalanceConfiguration() {
+ ServiceLoader serviceLocator= ServiceLoader.load(LoadBalanceZookeeperConf.class);
+ LoadBalanceZookeeperConf zookeeperConfig = null;
+ try {
+ if (serviceLocator.iterator().hasNext())
+ zookeeperConfig = serviceLocator.iterator().next();
+ } catch(ServiceConfigurationError ex) {
+ LOG.error("Unable to locate the service provider for loadbalancer configuration",ex);
+ } finally {
+ return zookeeperConfig;
+ }
+ }
+
+ public Registry getRegistry() {
+ ServiceLoader serviceLocator= ServiceLoader.load(Registry.class);
+ Registry registry = null;
+ try {
+ if (serviceLocator.iterator().hasNext())
+ registry = serviceLocator.iterator().next();
+ } catch(ServiceConfigurationError ex) {
+ LOG.error("Unable to locate the zookeeper registry",ex);
+ } finally {
+ return registry;
+ }
+ }
+
+ public boolean unRegister() {
+ boolean success = true;
+ try {
+ registry.unRegisterServer();
+ }catch(Throwable ex) {
+ LOG.error("error while de-registering the query server",ex);
+ success = false;
+ } finally {
+ return success;
+ }
+ }
/**
* Parses the serialization method from the configuration.
*
diff --git a/pom.xml b/pom.xml
index b2009a4d7b8..9ff0a5043b6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -36,6 +36,7 @@
phoenix-server
phoenix-assembly
phoenix-tracing-webapp
+ phoenix-load-balancer
@@ -127,6 +128,8 @@
UTF-8
UTF-8
+ 2.12.0
+
@@ -570,6 +573,11 @@
phoenix-queryserver-client
${project.version}
+
+ org.apache.phoenix
+ phoenix-load-balancer
+ ${project.version}
+
@@ -952,6 +960,16 @@
joda-time
${jodatime.version}
+
+ org.apache.curator
+ curator-test
+ ${curator.version}
+
+
+ org.apache.curator
+ curator-client
+ ${curator.version}
+