Skip to content

Commit

Permalink
HBASE-27902 Utility to invoke coproc on multiple servers using AsyncA…
Browse files Browse the repository at this point in the history
…dmin (#5295)

Signed-off-by: Duo Zhang <zhangduo@apache.org>
Signed-off-by: Viraj Jasani <vjasani@apache.org>
  • Loading branch information
jinggou authored and virajjasani committed Jun 20, 2023
1 parent c74fadd commit 8c50433
Show file tree
Hide file tree
Showing 2 changed files with 252 additions and 0 deletions.
Original file line number Diff line number Diff line change
@@ -0,0 +1,85 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import com.google.protobuf.RpcChannel;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.util.FutureUtils;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Additional Asynchronous Admin capabilities for clients.
*/
@InterfaceAudience.Public
public final class AsyncAdminClientUtils {

private AsyncAdminClientUtils() {
}

/**
* Execute the given coprocessor call on all region servers.
* <p>
* The {@code stubMaker} is just a delegation to the {@code newStub} call. Usually it is only a
* one line lambda expression, like:
*
* <pre>
* channel -&gt; xxxService.newStub(channel)
* </pre>
*
* @param asyncAdmin the asynchronous administrative API for HBase.
* @param stubMaker a delegation to the actual {@code newStub} call.
* @param callable a delegation to the actual protobuf rpc call. See the comment of
* {@link ServiceCaller} for more details.
* @param <S> the type of the asynchronous stub
* @param <R> the type of the return value
* @return Map of each region server to its result of the protobuf rpc call, wrapped by a
* {@link CompletableFuture}.
* @see ServiceCaller
*/
public static <S, R> CompletableFuture<Map<ServerName, Object>>
coprocessorServiceOnAllRegionServers(AsyncAdmin asyncAdmin, Function<RpcChannel, S> stubMaker,
ServiceCaller<S, R> callable) {
CompletableFuture<Map<ServerName, Object>> future = new CompletableFuture<>();
FutureUtils.addListener(asyncAdmin.getRegionServers(), (regionServers, error) -> {
if (error != null) {
future.completeExceptionally(error);
return;
}
Map<ServerName, Object> resultMap = new ConcurrentHashMap<>();
for (ServerName regionServer : regionServers) {
FutureUtils.addListener(asyncAdmin.coprocessorService(stubMaker, callable, regionServer),
(server, err) -> {
if (err != null) {
resultMap.put(regionServer, err);
} else {
resultMap.put(regionServer, server);
}
if (resultMap.size() == regionServers.size()) {
future.complete(Collections.unmodifiableMap(resultMap));
}
});
}
});
return future;
}
}
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.coprocessor;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertTrue;

import com.google.protobuf.RpcCallback;
import com.google.protobuf.RpcController;
import com.google.protobuf.Service;
import java.io.FileNotFoundException;
import java.util.Collections;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.atomic.AtomicInteger;
import org.apache.hadoop.hbase.HBaseClassTestRule;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.client.AsyncAdminClientUtils;
import org.apache.hadoop.hbase.client.ConnectionFactory;
import org.apache.hadoop.hbase.client.RetriesExhaustedException;
import org.apache.hadoop.hbase.client.ServiceCaller;
import org.apache.hadoop.hbase.client.TestAsyncAdminBase;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyRequest;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyResponse;
import org.apache.hadoop.hbase.coprocessor.protobuf.generated.DummyRegionServerEndpointProtos.DummyService;
import org.apache.hadoop.hbase.ipc.CoprocessorRpcUtils;
import org.apache.hadoop.hbase.testclassification.ClientTests;
import org.apache.hadoop.hbase.testclassification.MediumTests;
import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

@RunWith(Parameterized.class)
@Category({ ClientTests.class, MediumTests.class })
public class TestAsyncCoprocessorOnAllRegionServersEndpoint extends TestAsyncAdminBase {
@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
HBaseClassTestRule.forClass(TestAsyncCoprocessorOnAllRegionServersEndpoint.class);

private static final String THROW_CLASS_NAME = "java.io.FileNotFoundException";
private static final String DUMMY_VALUE = "val";
private static final int NUM_SLAVES = 5;
private static final int NUM_SUCCESS_REGION_SERVERS = 3;

@BeforeClass
public static void setUpBeforeClass() throws Exception {
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_RPC_TIMEOUT_KEY, 60000);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_OPERATION_TIMEOUT, 120000);
TEST_UTIL.getConfiguration().setInt(HConstants.HBASE_CLIENT_RETRIES_NUMBER, 2);
TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.MASTER_COPROCESSOR_CONF_KEY,
ProtobufCoprocessorService.class.getName());
TEST_UTIL.getConfiguration().setStrings(CoprocessorHost.REGIONSERVER_COPROCESSOR_CONF_KEY,
DummyRegionServerEndpoint.class.getName());
TEST_UTIL.startMiniCluster(NUM_SLAVES);
ASYNC_CONN = ConnectionFactory.createAsyncConnection(TEST_UTIL.getConfiguration()).get();
}

@AfterClass
public static void tearDownAfterClass() throws Exception {
TEST_UTIL.shutdownMiniCluster();
}

@Test
public void testRegionServersCoprocessorService()
throws ExecutionException, InterruptedException {
DummyRequest request = DummyRequest.getDefaultInstance();
Map<ServerName,
Object> resultMap = AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin,
DummyService::newStub, (ServiceCaller<DummyService.Stub, DummyResponse>) (stub, controller,
rpcCallback) -> stub.dummyCall(controller, request, rpcCallback))
.get();
resultMap.forEach((k, v) -> {
assertTrue(v instanceof DummyResponse);
DummyResponse resp = (DummyResponse) v;
assertEquals(DUMMY_VALUE, resp.getValue());
});
}

@Test
public void testRegionServerCoprocessorsServiceAllFail()
throws ExecutionException, InterruptedException {
DummyRequest request = DummyRequest.getDefaultInstance();
Map<ServerName,
Object> resultMap = AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin,
DummyService::newStub, (ServiceCaller<DummyService.Stub, DummyResponse>) (stub, controller,
rpcCallback) -> stub.dummyThrow(controller, request, rpcCallback))
.get();

resultMap.forEach((k, v) -> {
assertTrue(v instanceof RetriesExhaustedException);
Throwable e = (Throwable) v;
assertTrue(e.getMessage().contains(THROW_CLASS_NAME));
});
}

@Test
public void testRegionServerCoprocessorsServicePartialFail()
throws ExecutionException, InterruptedException {
DummyRequest request = DummyRequest.getDefaultInstance();
AtomicInteger callCount = new AtomicInteger();
Map<ServerName, Object> resultMap =
AsyncAdminClientUtils.coprocessorServiceOnAllRegionServers(admin, DummyService::newStub,
(ServiceCaller<DummyService.Stub, DummyResponse>) (stub, controller, rpcCallback) -> {
callCount.addAndGet(1);
if (callCount.get() <= NUM_SUCCESS_REGION_SERVERS) {
stub.dummyCall(controller, request, rpcCallback);
} else {
stub.dummyThrow(controller, request, rpcCallback);
}
}).get();

AtomicInteger successCallCount = new AtomicInteger();
resultMap.forEach((k, v) -> {
if (v instanceof DummyResponse) {
successCallCount.addAndGet(1);
DummyResponse resp = (DummyResponse) v;
assertEquals(DUMMY_VALUE, resp.getValue());
} else {
assertTrue(v instanceof RetriesExhaustedException);
Throwable e = (Throwable) v;
assertTrue(e.getMessage().contains(THROW_CLASS_NAME));
}
});
assertEquals(NUM_SUCCESS_REGION_SERVERS, successCallCount.get());
}

public static class DummyRegionServerEndpoint extends DummyService
implements RegionServerCoprocessor {
@Override
public Iterable<Service> getServices() {
return Collections.singleton(this);
}

@Override
public void dummyCall(RpcController controller, DummyRequest request,
RpcCallback<DummyResponse> callback) {
callback.run(DummyResponse.newBuilder().setValue(DUMMY_VALUE).build());
}

@Override
public void dummyThrow(RpcController controller, DummyRequest request,
RpcCallback<DummyResponse> done) {
CoprocessorRpcUtils.setControllerException(controller,
new FileNotFoundException("/file.txt"));
}
}
}

0 comments on commit 8c50433

Please sign in to comment.