From df0460e416191dab0bd7c2c84094230210b26e74 Mon Sep 17 00:00:00 2001 From: Jing Yu Date: Tue, 20 Jun 2023 17:54:22 -0400 Subject: [PATCH] HBASE-27902 Utility to invoke coproc on multiple servers using AsyncAdmin (#5295) Signed-off-by: Duo Zhang Signed-off-by: Viraj Jasani --- .../hbase/client/AsyncAdminClientUtils.java | 85 +++++++++ ...CoprocessorOnAllRegionServersEndpoint.java | 167 ++++++++++++++++++ 2 files changed, 252 insertions(+) create mode 100644 hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java create mode 100644 hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java diff --git a/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java new file mode 100644 index 000000000000..82269aabdb9d --- /dev/null +++ b/hbase-client/src/main/java/org/apache/hadoop/hbase/client/AsyncAdminClientUtils.java @@ -0,0 +1,85 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.client; + +import com.google.protobuf.RpcChannel; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.CompletableFuture; +import java.util.concurrent.ConcurrentHashMap; +import java.util.function.Function; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.util.FutureUtils; +import org.apache.yetus.audience.InterfaceAudience; + +/** + * Additional Asynchronous Admin capabilities for clients. + */ +@InterfaceAudience.Public +public final class AsyncAdminClientUtils { + + private AsyncAdminClientUtils() { + } + + /** + * Execute the given coprocessor call on all region servers. + *

+ * The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a + * one line lambda expression, like: + * + *

+   * channel -> xxxService.newStub(channel)
+   * 
+ * + * @param asyncAdmin the asynchronous administrative API for HBase. + * @param stubMaker a delegation to the actual {@code newStub} call. + * @param callable a delegation to the actual protobuf rpc call. See the comment of + * {@link ServiceCaller} for more details. + * @param the type of the asynchronous stub + * @param the type of the return value + * @return Map of each region server to its result of the protobuf rpc call, wrapped by a + * {@link CompletableFuture}. + * @see ServiceCaller + */ + public static CompletableFuture> + coprocessorServiceOnAllRegionServers(AsyncAdmin asyncAdmin, Function stubMaker, + ServiceCaller callable) { + CompletableFuture> future = new CompletableFuture<>(); + FutureUtils.addListener(asyncAdmin.getRegionServers(), (regionServers, error) -> { + if (error != null) { + future.completeExceptionally(error); + return; + } + Map resultMap = new ConcurrentHashMap<>(); + for (ServerName regionServer : regionServers) { + FutureUtils.addListener(asyncAdmin.coprocessorService(stubMaker, callable, regionServer), + (server, err) -> { + if (err != null) { + resultMap.put(regionServer, err); + } else { + resultMap.put(regionServer, server); + } + if (resultMap.size() == regionServers.size()) { + future.complete(Collections.unmodifiableMap(resultMap)); + } + }); + } + }); + return future; + } +} diff --git a/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java new file mode 100644 index 000000000000..018c67588029 --- /dev/null +++ b/hbase-endpoint/src/test/java/org/apache/hadoop/hbase/coprocessor/TestAsyncCoprocessorOnAllRegionServersEndpoint.java @@ -0,0 +1,167 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.apache.hadoop.hbase.coprocessor; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import com.google.protobuf.RpcCallback; +import com.google.protobuf.RpcController; +import com.google.protobuf.Service; +import java.io.FileNotFoundException; +import java.util.Collections; +import java.util.Map; +import java.util.concurrent.ExecutionException; +import java.util.concurrent.atomic.AtomicInteger; +import org.apache.hadoop.hbase.HBaseClassTestRule; +import org.apache.hadoop.hbase.HConstants; +import org.apache.hadoop.hbase.ServerName; +import org.apache.hadoop.hbase.client.AsyncAdminClientUtils; +import org.apache.hadoop.hbase.client.ConnectionFactory; +import org.apache.hadoop.hbase.client.RetriesExhaustedException; +import org.apache.hadoop.hbase.client.ServiceCaller; +import org.apache.hadoop.hbase.client.TestAsyncAdminBase; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse; +import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService; +import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils; +import org.apache.hadoop.hbase.testclassification.ClientTests; +import org.apache.hadoop.hbase.testclassification.MediumTests; +import org.junit.AfterClass; +import org.junit.BeforeClass; +import org.junit.ClassRule; +import org.junit.Test; +import org.junit.experimental.categories.Category; +import org.junit.runner.RunWith; +import org.junit.runners.Parameterized; + +@RunWith(Parameterized.class) +@Category({ ClientTests.class, MediumTests.class }) +public class TestAsyncCoprocessorOnAllRegionServersEndpoint extends TestAsyncAdminBase { + @ClassRule + public static final HBaseClassTestRule CLASS_RULE = + HBaseClassTestRule.forClass(TestAsyncCoprocessorOnAllRegionServersEndpoint.class); + + private static final String THROW_CLASS_NAME = "java.io.FileNotFoundException"; + private static final String DUMMY_VALUE = "val"; + private static final int NUM_SLAVES = 5; + private static final int NUM_SUCCESS_REGION_SERVERS = 3; + + @BeforeClass + public static void setUpBeforeClass() throws Exception { + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000); + TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2); + TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY, + ProtobufCoprocessorService.class.getName()); + TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY, + DummyRegionServerEndpoint.class.getName()); + TEST_UTIL.startMiniCluster(NUM_SLAVES); + ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get(); + } + + @AfterClass + public static void tearDownAfterClass() throws Exception { + TEST_UTIL.shutdownMiniCluster(); + } + + @Test + public void testRegionServersCoprocessorService() + throws ExecutionException, InterruptedException { + DummyRequest request = DummyRequest.getDefaultInstance(); + Map resultMap = AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin, + DummyService::newStub, (ServiceCaller) (stub, controller, + rpcCallback) -> stub.dummyCall(controller, request, rpcCallback)) + .get(); + resultMap.forEach((k, v) -> { + assertTrue(v instanceof DummyResponse); + DummyResponse resp = (DummyResponse) v; + assertEquals(DUMMY_VALUE, resp.getValue()); + }); + } + + @Test + public void testRegionServerCoprocessorsServiceAllFail() + throws ExecutionException, InterruptedException { + DummyRequest request = DummyRequest.getDefaultInstance(); + Map resultMap = AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin, + DummyService::newStub, (ServiceCaller) (stub, controller, + rpcCallback) -> stub.dummyThrow(controller, request, rpcCallback)) + .get(); + + resultMap.forEach((k, v) -> { + assertTrue(v instanceof RetriesExhaustedException); + Throwable e = (Throwable) v; + assertTrue(e.getMessage().contains(THROW_CLASS_NAME)); + }); + } + + @Test + public void testRegionServerCoprocessorsServicePartialFail() + throws ExecutionException, InterruptedException { + DummyRequest request = DummyRequest.getDefaultInstance(); + AtomicInteger callCount = new AtomicInteger(); + Map resultMap = + AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin, DummyService::newStub, + (ServiceCaller) (stub, controller, rpcCallback) -> { + callCount.addAndGet(1); + if (callCount.get() <= NUM_SUCCESS_REGION_SERVERS) { + stub.dummyCall(controller, request, rpcCallback); + } else { + stub.dummyThrow(controller, request, rpcCallback); + } + }).get(); + + AtomicInteger successCallCount = new AtomicInteger(); + resultMap.forEach((k, v) -> { + if (v instanceof DummyResponse) { + successCallCount.addAndGet(1); + DummyResponse resp = (DummyResponse) v; + assertEquals(DUMMY_VALUE, resp.getValue()); + } else { + assertTrue(v instanceof RetriesExhaustedException); + Throwable e = (Throwable) v; + assertTrue(e.getMessage().contains(THROW_CLASS_NAME)); + } + }); + assertEquals(NUM_SUCCESS_REGION_SERVERS, successCallCount.get()); + } + + public static class DummyRegionServerEndpoint extends DummyService + implements RegionServerCoprocessor { + @Override + public Iterable getServices() { + return Collections.singleton(this); + } + + @Override + public void dummyCall(RpcController controller, DummyRequest request, + RpcCallback callback) { + callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build()); + } + + @Override + public void dummyThrow(RpcController controller, DummyRequest request, + RpcCallback done) { + CoprocessorRpcUtils.setControllerException(controller, + new FileNotFoundException("/file.txt")); + } + } +}