Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -83,11 +83,11 @@ class AsyncConnectionImpl implements AsyncConnection {

final AsyncConnectionConfiguration connConf;

private final User user;
protected final User user;

final ConnectionRegistry registry;

private final int rpcTimeout;
protected final int rpcTimeout;

protected final RpcClient rpcClient;

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AuthenticationProtos.TokenIdentifier.Kind;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionServerStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.MasterProtos.MasterService;
Expand Down Expand Up @@ -54,6 +55,8 @@ public class SecurityInfo {
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(CompactionServerStatusProtos.CompactionServerStatusService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.MASTER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
infos.put(CompactionProtos.CompactionService.getDescriptor().getName(),
new SecurityInfo(SecurityConstants.COMPACTION_SERVER_KRB_PRINCIPAL, Kind.HBASE_AUTH_TOKEN));
// NOTE: IF ADDING A NEW SERVICE, BE SURE TO UPDATE HBasePolicyProvider ALSO ELSE
// new Service will not be found when all is Kerberized!!!!
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -172,6 +172,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.ScanRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClusterStatusProtos.RegionLoad;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ComparatorProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.FilterProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos;
Expand Down Expand Up @@ -3801,4 +3802,16 @@ public static HBaseProtos.LogRequest toBalancerDecisionRequest(int limit) {
.build();
}

public static String toString(CompactionProtos.CompactRequest request) {
ServerName rsServerName = ProtobufUtil.toServerName(request.getServer());
org.apache.hadoop.hbase.client.RegionInfo regionInfo =
ProtobufUtil.toRegionInfo(request.getRegionInfo());
ColumnFamilyDescriptor cfd = ProtobufUtil.toColumnFamilyDescriptor(request.getFamily());
boolean major = request.getMajor();
int priority = request.getPriority();
return new StringBuilder("RS: ").append(rsServerName).append(", region: ")
.append(regionInfo.getRegionNameAsString()).append(", CF: ").append(cfd.getNameAsString())
.append(", major:").append(major).append(", priority:").append(priority).toString();
}

}
62 changes: 62 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/server/Compaction.proto
Original file line number Diff line number Diff line change
@@ -0,0 +1,62 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
syntax = "proto2";

// This file contains protocol buffers that are used for CompactionManagerProtocol.
package hbase.pb;

option java_package = "org.apache.hadoop.hbase.shaded.protobuf.generated";
option java_outer_classname = "CompactionProtos";
option java_generic_services = true;
option java_generate_equals_and_hash = true;
option optimize_for = SPEED;

import "HBase.proto";
import "server/ClusterStatus.proto";
import "server/ErrorHandling.proto";

message CompactRequest {
required ServerName server = 1;
required RegionInfo region_info = 2;
required ColumnFamilySchema family = 3;
required bool major = 4;
required int32 priority = 5;
repeated ServerName favored_nodes = 6;
}

message CompactResponse {
}

message CompleteCompactionRequest {
required RegionInfo region_info = 1;
required ColumnFamilySchema family = 2;
repeated string selected_files = 3;
repeated string new_files = 4;
required bool new_force_major = 5;
}

message CompleteCompactionResponse {
required bool success = 1;
}

service CompactionService {
/** Called when a region server request compact a column of a region. */
rpc RequestCompaction(CompactRequest)
returns(CompactResponse);

}
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import "server/Quota.proto";
import "server/ClusterStatus.proto";
import "server/region/WAL.proto";
import "server/region/TooSlowLog.proto";
import "server/Compaction.proto";

message GetRegionInfoRequest {
required RegionSpecifier region = 1;
Expand Down Expand Up @@ -393,6 +394,9 @@ service AdminService {
rpc ExecuteProcedures(ExecuteProceduresRequest)
returns(ExecuteProceduresResponse);

rpc CompleteCompaction(CompleteCompactionRequest)
returns(CompleteCompactionResponse);

rpc ClearSlowLogsResponses(ClearSlowLogResponseRequest)
returns(ClearSlowLogResponses);

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ public interface AsyncClusterConnection extends AsyncConnection {
*/
AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName);

/**
* Get the compaction service for the given compaction server.
*/
AsyncCompactionServerService getCompactionServerService(ServerName serverName);

/**
* Get the nonce generator for this connection.
*/
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.CleanupBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.ClientProtos.PrepareBulkLoadResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier;
import org.apache.hadoop.hbase.shaded.protobuf.generated.HBaseProtos.RegionSpecifier.RegionSpecifierType;

Expand All @@ -49,12 +50,16 @@
*/
@InterfaceAudience.Private
class AsyncClusterConnectionImpl extends AsyncConnectionImpl implements AsyncClusterConnection {

public AsyncClusterConnectionImpl(Configuration conf, ConnectionRegistry registry,
String clusterId, SocketAddress localAddress, User user) {
super(conf, registry, clusterId, localAddress, user);
}

CompactionProtos.CompactionService.Interface createCompactionServerStub(ServerName serverName) {
return CompactionProtos.CompactionService
.newStub(rpcClient.createRpcChannel(serverName, user, rpcTimeout));
}

@Override
public NonceGenerator getNonceGenerator() {
return super.getNonceGenerator();
Expand All @@ -70,6 +75,11 @@ public AsyncRegionServerAdmin getRegionServerAdmin(ServerName serverName) {
return new AsyncRegionServerAdmin(serverName, this);
}

@Override
public AsyncCompactionServerService getCompactionServerService(ServerName serverName) {
return new AsyncCompactionServerService(serverName, this);
}

@Override
public CompletableFuture<FlushRegionResponse> flush(byte[] regionName,
boolean writeFlushWALMarker) {
Expand Down
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
/**
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.client;

import java.util.concurrent.CompletableFuture;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.ipc.HBaseRpcController;
import org.apache.yetus.audience.InterfaceAudience;

import org.apache.hbase.thirdparty.com.google.protobuf.RpcCallback;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;

import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;


/**
* A simple wrapper of the {@link CompactionService} for a compaction server, which returns a
* {@link CompletableFuture}. This is easier to use, as if you use the raw protobuf interface, you
* need to get the result from the {@link RpcCallback}, and if there is an exception, you need to
* get it from the {@link RpcController} passed in.
* <p/>
* Notice that there is no retry, and this is intentional. We have different retry for different
* usage for now, if later we want to unify them, we can move the retry logic into this class.
*/
@InterfaceAudience.Private
public class AsyncCompactionServerService {

private final ServerName server;

private final AsyncClusterConnectionImpl conn;

AsyncCompactionServerService(ServerName server, AsyncClusterConnectionImpl conn) {
this.server = server;
this.conn = conn;
}

@FunctionalInterface
private interface RpcCall<RESP> {
void call(CompactionService.Interface stub, HBaseRpcController controller,
RpcCallback<RESP> done);
}

// TODO: eliminate duplicate code in AsyncRegionServerAdmin and maybe we could also change the
// way on how to do regionServerReport
private <RESP> CompletableFuture<RESP> call(RpcCall<RESP> rpcCall) {
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

We do not have a common method in ConnectionUtils?

Copy link
Contributor Author

@nyl3532016 nyl3532016 Jun 16, 2021

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

yes, the call method in ConnectionUtils against ClientService.Interface

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Oh, the one in ConnectionUtils is not suitable here as it requires ClientService.Interface. But we do have another version in AsyncRegionServerAdmin which has almost the same code. Should find a way to share the code in the future.

Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

And this one is a call to master? If we want to follow the pattern here, maybe we could also change the way on how to do regionServerReport...

Copy link
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Recorded in TODO

CompletableFuture<RESP> future = new CompletableFuture<>();
HBaseRpcController controller = conn.rpcControllerFactory.newController();
try {
rpcCall.call(conn.createCompactionServerStub(server), controller, new RpcCallback<RESP>() {
@Override
public void run(RESP resp) {
if (controller.failed()) {
future.completeExceptionally(controller.getFailed());
} else {
future.complete(resp);
}
}
});
} catch (Exception e) {
future.completeExceptionally(e);
}
return future;
}

public CompletableFuture<CompactResponse> requestCompaction(CompactRequest request) {
return call((stub, controller, done) -> stub.requestCompaction(controller, request, done));
}

}
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,8 @@
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.UpdateFavoredNodesResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.AdminProtos.WarmupRegionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompleteCompactionResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsRequest;
import org.apache.hadoop.hbase.shaded.protobuf.generated.QuotaProtos.GetSpaceQuotaSnapshotsResponse;

Expand Down Expand Up @@ -133,6 +135,11 @@ public CompletableFuture<GetOnlineRegionResponse> getOnlineRegion(
return call((stub, controller, done) -> stub.getOnlineRegion(controller, request, done));
}

public CompletableFuture<CompleteCompactionResponse>
completeCompaction(CompleteCompactionRequest request) {
return call((stub, controller, done) -> stub.completeCompaction(controller, request, done));
}

public CompletableFuture<OpenRegionResponse> openRegion(OpenRegionRequest request) {
return call((stub, controller, done) -> stub.openRegion(controller, request, done));
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,7 @@
import java.io.IOException;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.atomic.LongAdder;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.AbstractRpcServices;
Expand All @@ -31,23 +32,35 @@
import org.slf4j.LoggerFactory;

import org.apache.hbase.thirdparty.com.google.common.collect.ImmutableList;
import org.apache.hbase.thirdparty.com.google.protobuf.RpcController;

import org.apache.hadoop.hbase.shaded.protobuf.ProtobufUtil;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactResponse;
import org.apache.hadoop.hbase.shaded.protobuf.generated.CompactionProtos.CompactionService;

@InterfaceAudience.Private
public class CSRpcServices extends AbstractRpcServices {
public class CSRpcServices extends AbstractRpcServices
implements CompactionService.BlockingInterface {
protected static final Logger LOG = LoggerFactory.getLogger(CSRpcServices.class);

protected final HCompactionServer compactionServer;
private final HCompactionServer compactionServer;

// Request counter.
final LongAdder requestCount = new LongAdder();
/** RPC scheduler to use for the compaction server. */
public static final String COMPACTION_SERVER_RPC_SCHEDULER_FACTORY_CLASS =
"hbase.compaction.server.rpc.scheduler.factory.class";

/**
* @return immutable list of blocking services and the security info classes that this server
* supports
*/
protected List<RpcServer.BlockingServiceAndInterface> getServices(final Configuration conf) {
// now return empty, compaction server do not receive rpc request
List<RpcServer.BlockingServiceAndInterface> bssi = new ArrayList<>();
bssi.add(new RpcServer.BlockingServiceAndInterface(
CompactionService.newReflectiveBlockingService(this),
CompactionService.BlockingInterface.class));
return new ImmutableList.Builder<RpcServer.BlockingServiceAndInterface>().addAll(bssi).build();
}

Expand All @@ -65,4 +78,19 @@ protected Class<?> getRpcSchedulerFactoryClass(Configuration conf) {
compactionServer = cs;
}


/**
* Request compaction on the compaction server.
* @param controller the RPC controller
* @param request the compaction request
*/
@Override
public CompactResponse requestCompaction(RpcController controller,
CompactionProtos.CompactRequest request) {
requestCount.increment();
LOG.info("Receive compaction request from {}", ProtobufUtil.toString(request));
compactionServer.compactionThreadManager.requestCompaction();
return CompactionProtos.CompactResponse.newBuilder().build();
}

}
Loading