Skip to content

Commit

Permalink
HBASE-19399 Purge curator dependency from hbase-client
Browse files Browse the repository at this point in the history
  • Loading branch information
Apache9 committed Dec 3, 2017
1 parent 8354a56 commit 7a5b078
Show file tree
Hide file tree
Showing 14 changed files with 687 additions and 134 deletions.
@@ -0,0 +1,21 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.testclassification;

public interface ZKTests {
}
8 changes: 0 additions & 8 deletions hbase-client/pom.xml
Expand Up @@ -195,14 +195,6 @@
<artifactId>mockito-core</artifactId>
<scope>test</scope>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-framework</artifactId>
</dependency>
<dependency>
<groupId>org.apache.curator</groupId>
<artifactId>curator-client</artifactId>
</dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-crypto</artifactId>
Expand Down
Expand Up @@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.client;

import static org.apache.hadoop.hbase.HConstants.DEFAULT_ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.HConstants.ZK_SESSION_TIMEOUT;
import static org.apache.hadoop.hbase.client.RegionInfo.DEFAULT_REPLICA_ID;
import static org.apache.hadoop.hbase.client.RegionInfoBuilder.FIRST_META_REGIONINFO;
import static org.apache.hadoop.hbase.client.RegionReplicaUtil.getRegionInfoForDefaultReplica;
Expand All @@ -28,33 +26,25 @@

import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.TimeUnit;

import org.apache.commons.lang3.mutable.MutableInt;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.curator.framework.CuratorFramework;
import org.apache.curator.framework.CuratorFrameworkFactory;
import org.apache.curator.framework.api.BackgroundPathable;
import org.apache.curator.framework.api.CuratorEvent;
import org.apache.curator.retry.RetryNTimes;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.ClusterId;
import org.apache.hadoop.hbase.HRegionLocation;
import org.apache.hadoop.hbase.RegionLocations;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.exceptions.DeserializationException;
import org.apache.hadoop.hbase.master.RegionState;
import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;
import org.apache.hadoop.hbase.util.Pair;
import org.apache.hadoop.hbase.util.Threads;
import org.apache.hadoop.hbase.zookeeper.ZKConfig;
import org.apache.hadoop.hbase.zookeeper.ReadOnlyZKClient;
import org.apache.hadoop.hbase.zookeeper.ZNodePaths;
import org.apache.yetus.audience.InterfaceAudience;
import org.apache.zookeeper.data.Stat;

import org.apache.hadoop.hbase.shaded.com.google.common.annotations.VisibleForTesting;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ZooKeeperProtos;

/**
* Fetch the registry data from zookeeper.
Expand All @@ -64,53 +54,36 @@ class ZKAsyncRegistry implements AsyncRegistry {

private static final Log LOG = LogFactory.getLog(ZKAsyncRegistry.class);

private final CuratorFramework zk;
private final ReadOnlyZKClient zk;

private final ZNodePaths znodePaths;

ZKAsyncRegistry(Configuration conf) {
this.znodePaths = new ZNodePaths(conf);
int zkSessionTimeout = conf.getInt(ZK_SESSION_TIMEOUT, DEFAULT_ZK_SESSION_TIMEOUT);
int zkRetry = conf.getInt("zookeeper.recovery.retry", 30);
int zkRetryIntervalMs = conf.getInt("zookeeper.recovery.retry.intervalmill", 1000);
this.zk = CuratorFrameworkFactory.builder()
.connectString(ZKConfig.getZKQuorumServersString(conf)).sessionTimeoutMs(zkSessionTimeout)
.retryPolicy(new RetryNTimes(zkRetry, zkRetryIntervalMs))
.threadFactory(
Threads.newDaemonThreadFactory(String.format("ZKClusterRegistry-0x%08x", hashCode())))
.build();
this.zk.start();
// TODO: temporary workaround for HBASE-19312, must be removed before 2.0.0 release!
try {
this.zk.blockUntilConnected(2, TimeUnit.SECONDS);
} catch (InterruptedException e) {
return;
}
this.zk = new ReadOnlyZKClient(conf);
}

private interface CuratorEventProcessor<T> {
T process(CuratorEvent event) throws Exception;
private interface Converter<T> {
T convert(byte[] data) throws Exception;
}

private static <T> CompletableFuture<T> exec(BackgroundPathable<?> opBuilder, String path,
CuratorEventProcessor<T> processor) {
private <T> CompletableFuture<T> getAndConvert(String path, Converter<T> converter) {
CompletableFuture<T> future = new CompletableFuture<>();
try {
opBuilder.inBackground((client, event) -> {
try {
future.complete(processor.process(event));
} catch (Exception e) {
future.completeExceptionally(e);
}
}).withUnhandledErrorListener((msg, e) -> future.completeExceptionally(e)).forPath(path);
} catch (Exception e) {
future.completeExceptionally(e);
}
zk.get(path).whenComplete((data, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
try {
future.complete(converter.convert(data));
} catch (Exception e) {
future.completeExceptionally(e);
}
});
return future;
}

private static String getClusterId(CuratorEvent event) throws DeserializationException {
byte[] data = event.getData();
private static String getClusterId(byte[] data) throws DeserializationException {
if (data == null || data.length == 0) {
return null;
}
Expand All @@ -120,17 +93,15 @@ private static String getClusterId(CuratorEvent event) throws DeserializationExc

@Override
public CompletableFuture<String> getClusterId() {
return exec(zk.getData(), znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId);
return getAndConvert(znodePaths.clusterIdZNode, ZKAsyncRegistry::getClusterId);
}

@VisibleForTesting
CuratorFramework getCuratorFramework() {
ReadOnlyZKClient getZKClient() {
return zk;
}

private static ZooKeeperProtos.MetaRegionServer getMetaProto(CuratorEvent event)
throws IOException {
byte[] data = event.getData();
private static ZooKeeperProtos.MetaRegionServer getMetaProto(byte[] data) throws IOException {
if (data == null || data.length == 0) {
return null;
}
Expand Down Expand Up @@ -169,7 +140,7 @@ public CompletableFuture<RegionLocations> getMetaRegionLocation() {
MutableInt remaining = new MutableInt(locs.length);
znodePaths.metaReplicaZNodes.forEach((replicaId, path) -> {
if (replicaId == DEFAULT_REPLICA_ID) {
exec(zk.getData(), path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
Expand All @@ -184,13 +155,13 @@ public CompletableFuture<RegionLocations> getMetaRegionLocation() {
new IOException("Meta region is in state " + stateAndServerName.getFirst()));
return;
}
locs[DEFAULT_REPLICA_ID] = new HRegionLocation(
getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO),
stateAndServerName.getSecond());
locs[DEFAULT_REPLICA_ID] =
new HRegionLocation(getRegionInfoForDefaultReplica(FIRST_META_REGIONINFO),
stateAndServerName.getSecond());
tryComplete(remaining, locs, future);
});
} else {
exec(zk.getData(), path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
getAndConvert(path, ZKAsyncRegistry::getMetaProto).whenComplete((proto, error) -> {
if (future.isDone()) {
return;
}
Expand All @@ -203,13 +174,13 @@ public CompletableFuture<RegionLocations> getMetaRegionLocation() {
} else {
Pair<RegionState.State, ServerName> stateAndServerName = getStateAndServerName(proto);
if (stateAndServerName.getFirst() != RegionState.State.OPEN) {
LOG.warn("Meta region for replica " + replicaId + " is in state "
+ stateAndServerName.getFirst());
LOG.warn("Meta region for replica " + replicaId + " is in state " +
stateAndServerName.getFirst());
locs[replicaId] = null;
} else {
locs[replicaId] = new HRegionLocation(
getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
stateAndServerName.getSecond());
locs[replicaId] =
new HRegionLocation(getRegionInfoForReplica(FIRST_META_REGIONINFO, replicaId),
stateAndServerName.getSecond());
}
}
tryComplete(remaining, locs, future);
Expand All @@ -219,18 +190,12 @@ public CompletableFuture<RegionLocations> getMetaRegionLocation() {
return future;
}

private static int getCurrentNrHRS(CuratorEvent event) {
Stat stat = event.getStat();
return stat != null ? stat.getNumChildren() : 0;
}

@Override
public CompletableFuture<Integer> getCurrentNrHRS() {
return exec(zk.checkExists(), znodePaths.rsZNode, ZKAsyncRegistry::getCurrentNrHRS);
return zk.exists(znodePaths.rsZNode).thenApply(s -> s != null ? s.getNumChildren() : 0);
}

private static ZooKeeperProtos.Master getMasterProto(CuratorEvent event) throws IOException {
byte[] data = event.getData();
private static ZooKeeperProtos.Master getMasterProto(byte[] data) throws IOException {
if (data == null || data.length == 0) {
return null;
}
Expand All @@ -241,7 +206,7 @@ private static ZooKeeperProtos.Master getMasterProto(CuratorEvent event) throws

@Override
public CompletableFuture<ServerName> getMasterAddress() {
return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
.thenApply(proto -> {
if (proto == null) {
return null;
Expand All @@ -254,7 +219,7 @@ public CompletableFuture<ServerName> getMasterAddress() {

@Override
public CompletableFuture<Integer> getMasterInfoPort() {
return exec(zk.getData(), znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
return getAndConvert(znodePaths.masterAddressZNode, ZKAsyncRegistry::getMasterProto)
.thenApply(proto -> proto != null ? proto.getInfoPort() : 0);
}

Expand Down

0 comments on commit 7a5b078

Please sign in to comment.