Skip to content

Commit

Permalink
speed up hbase truncate with disableTableAsync() (#868)
Browse files Browse the repository at this point in the history
Optimized hbase clear() cost time from 19s to 6s

improve: #770
Change-Id: Ie4da8b77a4ce63d388322c838415870e7225ae21
  • Loading branch information
javeme committed Feb 27, 2020
1 parent 4f46a85 commit 9b0290c
Show file tree
Hide file tree
Showing 2 changed files with 90 additions and 30 deletions.
Expand Up @@ -42,6 +42,7 @@
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.Size;
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.TableNotDisabledException;
import org.apache.hadoop.hbase.TableNotEnabledException;
import org.apache.hadoop.hbase.client.Admin;
import org.apache.hadoop.hbase.client.ColumnFamilyDescriptorBuilder;
Expand Down Expand Up @@ -74,6 +75,7 @@
import com.baidu.hugegraph.util.E;
import com.baidu.hugegraph.util.StringEncoding;
import com.baidu.hugegraph.util.VersionUtil;
import com.google.common.util.concurrent.Futures;

public class HbaseSessions extends BackendSessionPool {

Expand Down Expand Up @@ -147,7 +149,7 @@ protected synchronized void doClose() {
}

public boolean existsNamespace() throws IOException {
try(Admin admin = this.hbase.getAdmin()) {
try (Admin admin = this.hbase.getAdmin()) {
for (NamespaceDescriptor ns : admin.listNamespaceDescriptors()) {
if (this.namespace.equals(ns.getName())) {
return true;
Expand All @@ -160,20 +162,20 @@ public boolean existsNamespace() throws IOException {
public void createNamespace() throws IOException {
NamespaceDescriptor ns = NamespaceDescriptor.create(this.namespace)
.build();
try(Admin admin = this.hbase.getAdmin()) {
try (Admin admin = this.hbase.getAdmin()) {
admin.createNamespace(ns);
}
}

public void dropNamespace() throws IOException {
try(Admin admin = this.hbase.getAdmin()) {
try (Admin admin = this.hbase.getAdmin()) {
admin.deleteNamespace(this.namespace);
}
}

public boolean existsTable(String table) throws IOException {
TableName tableName = TableName.valueOf(this.namespace, table);
try(Admin admin = this.hbase.getAdmin()) {
try (Admin admin = this.hbase.getAdmin()) {
return admin.tableExists(tableName);
}
}
Expand All @@ -185,14 +187,14 @@ public void createTable(String table, List<byte[]> cfs) throws IOException {
tb.setColumnFamily(ColumnFamilyDescriptorBuilder.newBuilder(cf)
.build());
}
try(Admin admin = this.hbase.getAdmin()) {
try (Admin admin = this.hbase.getAdmin()) {
admin.createTable(tb.build());
}
}

public void dropTable(String table) throws IOException {
TableName tableName = TableName.valueOf(this.namespace, table);
try(Admin admin = this.hbase.getAdmin()) {
try (Admin admin = this.hbase.getAdmin()) {
try {
admin.disableTable(tableName);
} catch (TableNotEnabledException ignored) {
Expand All @@ -202,22 +204,45 @@ public void dropTable(String table) throws IOException {
}
}

public Future<Void> truncateTable(String table) throws IOException {
public void enableTable(String table) throws IOException {
assert this.existsTable(table);
TableName tableName = TableName.valueOf(this.namespace, table);
try(Admin admin = this.hbase.getAdmin()) {
try (Admin admin = this.hbase.getAdmin()) {
if (admin.isTableEnabled(tableName)) {
return;
}
try {
admin.disableTable(tableName);
} catch (TableNotEnabledException ignored) {
admin.enableTable(tableName);
} catch (TableNotDisabledException ignored) {
// pass
}
}
}

public Future<Void> disableTableAsync(String table) throws IOException {
assert this.existsTable(table);
TableName tableName = TableName.valueOf(this.namespace, table);
try (Admin admin = this.hbase.getAdmin()) {
try {
return admin.disableTableAsync(tableName);
} catch (TableNotEnabledException ignored) {
// Ignore if it's disabled
return Futures.immediateFuture(null);
}
}
}

public Future<Void> truncateTableAsync(String table) throws IOException {
assert this.existsTable(table);
TableName tableName = TableName.valueOf(this.namespace, table);
try (Admin admin = this.hbase.getAdmin()) {
return admin.truncateTableAsync(tableName, true);
}
}

public long storeSize(String table) throws IOException {
long total = 0;
try(Admin admin = this.hbase.getAdmin()) {
try (Admin admin = this.hbase.getAdmin()) {
for (ServerName rs : admin.getRegionServers()) {
// NOTE: we can use getLoad() before hbase 2.0
//ServerLoad load = admin.getClusterStatus().getLoad(rs);
Expand Down
Expand Up @@ -24,16 +24,14 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;
import java.util.function.BiFunction;
import java.util.stream.Collectors;

import org.apache.hadoop.hbase.NamespaceExistException;
import org.apache.hadoop.hbase.TableExistsException;
import org.apache.hadoop.hbase.TableNotFoundException;
import org.apache.hadoop.hbase.client.Connection;
import org.slf4j.Logger;

import com.baidu.hugegraph.backend.BackendException;
Expand Down Expand Up @@ -237,7 +235,7 @@ public void init() {
// Ignore due to both schema & graph store would create namespace
} catch (IOException e) {
throw new BackendException(
"Failed to create namespace '%s' for '%s'",
"Failed to create namespace '%s' for '%s' store",
e, this.namespace, this.store);
}

Expand All @@ -249,7 +247,7 @@ public void init() {
continue;
} catch (IOException e) {
throw new BackendException(
"Failed to create table '%s' for '%s'",
"Failed to create table '%s' for '%s' store",
e, table, this.store);
}
}
Expand Down Expand Up @@ -278,11 +276,11 @@ public void clear(boolean clearSpace) {
try {
this.sessions.dropTable(table);
} catch (TableNotFoundException e) {
LOG.warn("The table '{}' for '{}' does not exist " +
LOG.warn("The table '{}' of '{}' store does not exist " +
"when trying to drop", table, this.store);
} catch (IOException e) {
throw new BackendException(
"Failed to drop table '%s' for '%s'",
"Failed to drop table '%s' of '%s' store",
e, table, this.store);
}
}
Expand All @@ -297,7 +295,7 @@ public void clear(boolean clearSpace) {
this.namespace, e);
} else {
throw new BackendException(
"Failed to drop namespace '%s' for '%s'",
"Failed to drop namespace '%s' of '%s' store",
e, this.namespace, this.store);
}
}
Expand Down Expand Up @@ -329,31 +327,68 @@ public boolean initialized() {
public void truncate() {
this.checkOpened();

// Total time may cost 3 * TRUNCATE_TIMEOUT, due to there are 3 stores
long timeout = this.sessions.config().get(HbaseOptions.TRUNCATE_TIMEOUT);
long start = System.currentTimeMillis();

BiFunction<String, Future<Void>, Void> wait = (table, future) -> {
long elapsed = System.currentTimeMillis() - start;
long remainingTime = timeout - elapsed / 1000L;
try {
return future.get(remainingTime, TimeUnit.SECONDS);
} catch (Exception e) {
throw new BackendException(
"Error when truncating table '%s' of '%s' store: %s",
table, this.store, e.toString());
}
};

// Truncate tables
List<String> tables = this.tableNames();
Map<String, Future<Void>> futures = new HashMap<>(tables.size());
String currentTable = null;

try {
// Disable tables async
for (String table : tables) {
futures.put(table, this.sessions.disableTableAsync(table));
}
for (Map.Entry<String, Future<Void>> entry : futures.entrySet()) {
wait.apply(entry.getKey(), entry.getValue());
}
} catch (Exception e) {
this.enableTables();
throw new BackendException(
"Failed to disable table for '%s' store", e, this.store);
}

try {
// Truncate tables async
for (String table : tables) {
currentTable = table;
futures.put(table, this.sessions.truncateTable(table));
futures.put(table, this.sessions.truncateTableAsync(table));
}
long timeout = this.sessions.config()
.get(HbaseOptions.TRUNCATE_TIMEOUT);
for (Map.Entry<String, Future<Void>> entry : futures.entrySet()) {
currentTable = entry.getKey();
entry.getValue().get(timeout, TimeUnit.SECONDS);
wait.apply(entry.getKey(), entry.getValue());
}
} catch (IOException | InterruptedException |
ExecutionException | TimeoutException e) {
} catch (Exception e) {
this.enableTables();
throw new BackendException(
"Failed to truncate table '%s' for '%s'",
e, currentTable, this.store);
"Failed to truncate table for '%s' store", e, this.store);
}

LOG.debug("Store truncated: {}", this.store);
}

private void enableTables() {
for (String table : this.tableNames()) {
try {
this.sessions.enableTable(table);
} catch (Exception e) {
LOG.warn("Failed to enable table '{}' of '{}' store",
table, this.store, e);
}
}
}

@Override
public void beginTx() {
// pass
Expand Down

0 comments on commit 9b0290c

Please sign in to comment.