Skip to content

Commit

Permalink
HBASE-28064:Implement truncate_region command (#5480)
Browse files Browse the repository at this point in the history
Signed-off-by: Wellington Chevreuil <wchevreuil@apache.org>
  • Loading branch information
vaijosh committed Oct 31, 2023
1 parent 4f3e63e commit a208ba2
Show file tree
Hide file tree
Showing 29 changed files with 1,030 additions and 3 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -3303,4 +3303,18 @@ List<LogEntry> getLogEntries(Set<ServerName> serverNames, String logType, Server
* Flush master local region
*/
void flushMasterStore() throws IOException;

/**
* Truncate an individual region.
* @param regionName region to truncate
* @throws IOException if a remote or network exception occurs
*/
void truncateRegion(byte[] regionName) throws IOException;

/**
* Truncate an individual region. Asynchronous operation.
* @param regionName region to truncate
* @throws IOException if a remote or network exception occurs
*/
Future<Void> truncateRegionAsync(byte[] regionName) throws IOException;
}
Original file line number Diff line number Diff line change
Expand Up @@ -625,6 +625,12 @@ default CompletableFuture<Void> mergeRegions(byte[] nameOfRegionA, byte[] nameOf
*/
CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint);

/**
* Truncate an individual region.
* @param regionName region to truncate
*/
CompletableFuture<Void> truncateRegion(byte[] regionName);

/**
* Assign an individual region.
* @param regionName Encoded or full name of region to assign.
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -386,6 +386,11 @@ public CompletableFuture<Void> splitRegion(byte[] regionName, byte[] splitPoint)
return wrap(rawAdmin.splitRegion(regionName, splitPoint));
}

@Override
public CompletableFuture<Void> truncateRegion(byte[] regionName) {
return wrap(rawAdmin.truncateRegion(regionName));
}

@Override
public CompletableFuture<Void> assign(byte[] regionName) {
return wrap(rawAdmin.assign(regionName));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1559,6 +1559,12 @@ public MasterProtos.SplitTableRegionResponse splitRegion(RpcController controlle
return stub.splitRegion(controller, request);
}

@Override
public MasterProtos.TruncateRegionResponse truncateRegion(RpcController controller,
MasterProtos.TruncateRegionRequest request) throws ServiceException {
return stub.truncateRegion(controller, request);
}

@Override
public MasterProtos.DeleteTableResponse deleteTable(RpcController controller,
MasterProtos.DeleteTableRequest request) throws ServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2057,6 +2057,65 @@ public void splitRegion(final byte[] regionName, final byte[] splitPoint) throws
splitRegionAsync(regionServerPair.getFirst(), splitPoint);
}

@Override
public void truncateRegion(byte[] regionName) throws IOException {
get(truncateRegionAsync(regionName), syncWaitTimeout, TimeUnit.MILLISECONDS);
}

@Override
public Future<Void> truncateRegionAsync(byte[] regionName) throws IOException {
Pair<RegionInfo, ServerName> pair = getRegion(regionName);
RegionInfo hri;

if (pair != null) {
hri = pair.getFirst();
if (hri != null && hri.getReplicaId() != HRegionInfo.DEFAULT_REPLICA_ID) {
throw new IllegalArgumentException(
"Can't truncate replicas directly.Replicas are auto-truncated "
+ "when their primary is truncated.");
}
} else {
throw new IllegalArgumentException("Invalid region: " + Bytes.toStringBinary(regionName));
}

TableName tableName = hri.getTable();

MasterProtos.TruncateRegionResponse response =
executeCallable(getTruncateRegionCallable(tableName, hri));

return new TruncateRegionFuture(this, tableName, response);
}

private MasterCallable<MasterProtos.TruncateRegionResponse>
getTruncateRegionCallable(TableName tableName, RegionInfo hri) {
return new MasterCallable<MasterProtos.TruncateRegionResponse>(getConnection(),
getRpcControllerFactory()) {
Long nonceGroup = ng.getNonceGroup();
Long nonce = ng.newNonce();

@Override
protected MasterProtos.TruncateRegionResponse rpcCall() throws Exception {
setPriority(tableName);
MasterProtos.TruncateRegionRequest request =
RequestConverter.buildTruncateRegionRequest(hri, nonceGroup, nonce);
return master.truncateRegion(getRpcController(), request);
}
};
}

private static class TruncateRegionFuture extends TableFuture<Void> {
public TruncateRegionFuture(final HBaseAdmin admin, final TableName tableName,
final MasterProtos.TruncateRegionResponse response) {
super(admin, tableName,
(response != null && response.hasProcId()) ? response.getProcId() : null);
}

@Override
public String getOperationType() {
return "TRUNCATE_REGION";
}
}

private static class ModifyTableFuture extends TableFuture<Void> {
public ModifyTableFuture(final HBaseAdmin admin, final TableName tableName,
final ModifyTableResponse response) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1603,6 +1603,60 @@ private CompletableFuture<Void> split(final RegionInfo hri, byte[] splitPoint) {
return future;
}

@Override
public CompletableFuture<Void> truncateRegion(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>();
addListener(getRegionLocation(regionName), (location, err) -> {
if (err != null) {
future.completeExceptionally(err);
return;
}
RegionInfo regionInfo = location.getRegion();
if (regionInfo.getReplicaId() != RegionInfo.DEFAULT_REPLICA_ID) {
future.completeExceptionally(new IllegalArgumentException(
"Can't truncate replicas directly.Replicas are auto-truncated "
+ "when their primary is truncated."));
return;
}
ServerName serverName = location.getServerName();
if (serverName == null) {
future
.completeExceptionally(new NoServerForRegionException(Bytes.toStringBinary(regionName)));
return;
}
addListener(truncateRegion(regionInfo), (ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret);
}
});
});
return future;
}

private CompletableFuture<Void> truncateRegion(final RegionInfo hri) {
CompletableFuture<Void> future = new CompletableFuture<>();
TableName tableName = hri.getTable();
final MasterProtos.TruncateRegionRequest request;
try {
request = RequestConverter.buildTruncateRegionRequest(hri, ng.getNonceGroup(), ng.newNonce());
} catch (DeserializationException e) {
future.completeExceptionally(e);
return future;
}
addListener(this.procedureCall(tableName, request, MasterService.Interface::truncateRegion,
MasterProtos.TruncateRegionResponse::getProcId,
new TruncateRegionProcedureBiConsumer(tableName)), (ret, err2) -> {
if (err2 != null) {
future.completeExceptionally(err2);
} else {
future.complete(ret);
}
});
return future;
}

@Override
public CompletableFuture<Void> assign(byte[] regionName) {
CompletableFuture<Void> future = new CompletableFuture<>();
Expand Down Expand Up @@ -2850,6 +2904,18 @@ String getOperationType() {
}
}

private static class TruncateRegionProcedureBiConsumer extends TableProcedureBiConsumer {

TruncateRegionProcedureBiConsumer(TableName tableName) {
super(tableName);
}

@Override
String getOperationType() {
return "TRUNCATE_REGION";
}
}

private static class SnapshotProcedureBiConsumer extends TableProcedureBiConsumer {
SnapshotProcedureBiConsumer(TableName tableName) {
super(tableName);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -730,6 +730,12 @@ public SplitTableRegionResponse splitRegion(RpcController controller,
return stub.splitRegion(controller, request);
}

@Override
public MasterProtos.TruncateRegionResponse truncateRegion(RpcController controller,
MasterProtos.TruncateRegionRequest request) throws ServiceException {
return stub.truncateRegion(controller, request);
}

@Override
public SwitchRpcThrottleResponse switchRpcThrottle(RpcController controller,
SwitchRpcThrottleRequest request) throws ServiceException {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1187,6 +1187,17 @@ public static SplitTableRegionRequest buildSplitTableRegionRequest(final RegionI
return builder.build();
}

public static MasterProtos.TruncateRegionRequest
buildTruncateRegionRequest(final RegionInfo regionInfo, final long nonceGroup, final long nonce)
throws DeserializationException {
MasterProtos.TruncateRegionRequest.Builder builder =
MasterProtos.TruncateRegionRequest.newBuilder();
builder.setRegionInfo(ProtobufUtil.toRegionInfo(regionInfo));
builder.setNonceGroup(nonceGroup);
builder.setNonce(nonce);
return builder.build();
}

/**
* Create a protocol buffer AssignRegionRequest
* @return an AssignRegionRequest
Expand Down
16 changes: 16 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/Master.proto
Original file line number Diff line number Diff line change
Expand Up @@ -136,6 +136,16 @@ message SplitTableRegionResponse {
optional uint64 proc_id = 1;
}

message TruncateRegionRequest {
required RegionInfo region_info = 1;
optional uint64 nonce_group = 2 [default = 0];
optional uint64 nonce = 3 [default = 0];
}

message TruncateRegionResponse {
optional uint64 proc_id = 1;
}

message CreateTableRequest {
required TableSchema table_schema = 1;
repeated bytes split_keys = 2;
Expand Down Expand Up @@ -863,6 +873,12 @@ service MasterService {
rpc SplitRegion(SplitTableRegionRequest)
returns(SplitTableRegionResponse);

/**
* Truncate region
*/
rpc TruncateRegion(TruncateRegionRequest)
returns(TruncateRegionResponse);

/** Deletes a table */
rpc DeleteTable(DeleteTableRequest)
returns(DeleteTableResponse);
Expand Down
8 changes: 8 additions & 0 deletions hbase-protocol-shaded/src/main/protobuf/MasterProcedure.proto
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,14 @@ message TruncateTableStateData {
repeated RegionInfo region_info = 5;
}

enum TruncateRegionState {
TRUNCATE_REGION_PRE_OPERATION = 1;
TRUNCATE_REGION_MAKE_OFFLINE = 2;
TRUNCATE_REGION_REMOVE = 3;
TRUNCATE_REGION_MAKE_ONLINE = 4;
TRUNCATE_REGION_POST_OPERATION = 5;
}

enum DeleteTableState {
DELETE_TABLE_PRE_OPERATION = 1;
DELETE_TABLE_REMOVE_FROM_META = 2;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -665,6 +665,46 @@ default void preSplitRegionAction(final ObserverContext<MasterCoprocessorEnviron
final TableName tableName, final byte[] splitRow) throws IOException {
}

/**
* Called before the region is truncated.
* @param c The environment to interact with the framework and master
* @param regionInfo The Region being truncated
*/
@SuppressWarnings("unused")
default void preTruncateRegionAction(final ObserverContext<MasterCoprocessorEnvironment> c,
final RegionInfo regionInfo) {
}

/**
* Called before the truncate region procedure is called.
* @param c The environment to interact with the framework and master
* @param regionInfo The Region being truncated
*/
@SuppressWarnings("unused")
default void preTruncateRegion(final ObserverContext<MasterCoprocessorEnvironment> c,
RegionInfo regionInfo) {
}

/**
* Called after the truncate region procedure is called.
* @param c The environment to interact with the framework and master
* @param regionInfo The Region being truncated
*/
@SuppressWarnings("unused")
default void postTruncateRegion(final ObserverContext<MasterCoprocessorEnvironment> c,
RegionInfo regionInfo) {
}

/**
* Called post the region is truncated.
* @param c The environment to interact with the framework and master
* @param regionInfo The Region To be truncated
*/
@SuppressWarnings("unused")
default void postTruncateRegionAction(final ObserverContext<MasterCoprocessorEnvironment> c,
final RegionInfo regionInfo) {
}

/**
* Called after the region is split.
* @param c the environment to interact with the framework and master
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -163,6 +163,7 @@
import org.apache.hadoop.hbase.master.procedure.ProcedureSyncWait;
import org.apache.hadoop.hbase.master.procedure.ReopenTableRegionsProcedure;
import org.apache.hadoop.hbase.master.procedure.ServerCrashProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateRegionProcedure;
import org.apache.hadoop.hbase.master.procedure.TruncateTableProcedure;
import org.apache.hadoop.hbase.master.region.MasterRegion;
import org.apache.hadoop.hbase.master.region.MasterRegionFactory;
Expand Down Expand Up @@ -2493,6 +2494,36 @@ protected String getDescription() {
});
}

@Override
public long truncateRegion(final RegionInfo regionInfo, final long nonceGroup, final long nonce)
throws IOException {
checkInitialized();

return MasterProcedureUtil
.submitProcedure(new MasterProcedureUtil.NonceProcedureRunnable(this, nonceGroup, nonce) {
@Override
protected void run() throws IOException {
getMaster().getMasterCoprocessorHost().preTruncateRegion(regionInfo);

LOG.info(
getClientIdAuditPrefix() + " truncate region " + regionInfo.getRegionNameAsString());

// Execute the operation asynchronously
ProcedurePrepareLatch latch = ProcedurePrepareLatch.createLatch(2, 0);
submitProcedure(
new TruncateRegionProcedure(procedureExecutor.getEnvironment(), regionInfo, latch));
latch.await();

getMaster().getMasterCoprocessorHost().postTruncateRegion(regionInfo);
}

@Override
protected String getDescription() {
return "TruncateRegionProcedure";
}
});
}

@Override
public long addColumn(final TableName tableName, final ColumnFamilyDescriptor column,
final long nonceGroup, final long nonce) throws IOException {
Expand Down

0 comments on commit a208ba2

Please sign in to comment.