Skip to content

Commit

Permalink
more
Browse files Browse the repository at this point in the history
  • Loading branch information
bbeaudreault committed Jan 14, 2024
1 parent 91be2cf commit 6fd0cd4
Show file tree
Hide file tree
Showing 3 changed files with 126 additions and 27 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.fs;

import java.io.IOException;
import java.lang.reflect.InvocationTargetException;
import java.util.Collection;
import java.util.Objects;
import java.util.stream.Collectors;
Expand All @@ -31,8 +32,8 @@
import org.apache.hadoop.hbase.TableName;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.util.CommonFSUtils;
import org.apache.hadoop.hbase.util.ReflectionUtils;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicyInfo;
import org.apache.yetus.audience.InterfaceAudience;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
Expand Down Expand Up @@ -85,26 +86,59 @@ private static Path createTempDir(FileSystem fs, Path tempDir) throws HBaseIOExc
return currentTempDir;
}

private static void checkAvailable(DistributedFileSystem dfs, String policy)
private static void checkAvailable(DistributedFileSystem dfs, String requestedPolicy)
throws HBaseIOException {
Collection<ErasureCodingPolicyInfo> policies;
Collection<Object> policies;

try {
policies = dfs.getAllErasureCodingPolicies();
policies = callDfsMethod(dfs, "getAllErasureCodingPolicies");
} catch (IOException e) {
throw new HBaseIOException("Failed to check for Erasure Coding policy: " + policy, e);
throw new HBaseIOException("Failed to check for Erasure Coding policy: " + requestedPolicy, e);
}
for (ErasureCodingPolicyInfo policyInfo : policies) {
if (policyInfo.getPolicy().getName().equals(policy)) {
if (!policyInfo.isEnabled()) {
throw new DoNotRetryIOException("Cannot set Erasure Coding policy: " + policy
+ ". The policy must be enabled, but has state " + policyInfo.getState());
}
for (Object policyInfo : policies) {
if (checkPolicyMatch(policyInfo, requestedPolicy)) {
return;
}
}
throw new DoNotRetryIOException(
"Cannot set Erasure Coding policy: " + policy + ". Policy not found. Available policies are: "
+ policies.stream().map(p -> p.getPolicy().getName()).collect(Collectors.joining(", ")));
"Cannot set Erasure Coding policy: " + requestedPolicy + ". Policy not found. Available policies are: "
+ getPolicyNames(policies));
}

/**
 * Returns true when {@code policyInfo} describes {@code requestedPolicy}.
 * @throws DoNotRetryIOException when the policy matches by name but is not enabled,
 *   or when reflective access to the policy info fails
 */
private static boolean checkPolicyMatch(Object policyInfo, String requestedPolicy)
  throws DoNotRetryIOException {
  try {
    String candidateName = getPolicyNameFromInfo(policyInfo);
    if (!requestedPolicy.equals(candidateName)) {
      return false;
    }
    // matched by name; the policy must also be enabled to be usable
    boolean enabled = callObjectMethod(policyInfo, "isEnabled");
    if (!enabled) {
      throw new DoNotRetryIOException("Cannot set Erasure Coding policy: " + requestedPolicy
        + ". The policy must be enabled, but has state " + callObjectMethod(policyInfo,
          "getState"));
    }
    return true;
  } catch (DoNotRetryIOException e) {
    // already the exception type we want callers to see
    throw e;
  } catch (IOException e) {
    throw new DoNotRetryIOException(
      "Unable to check for match of Erasure Coding Policy " + policyInfo, e);
  }
}

/** Reflectively extracts the policy name from an erasure coding policy info object. */
private static String getPolicyNameFromInfo(Object policyInfo) throws IOException {
  return callObjectMethod(callObjectMethod(policyInfo, "getPolicy"), "getName");
}

/**
 * Comma-joins the names of the given policy infos, substituting "unknown" for any
 * entry whose name cannot be extracted reflectively.
 */
private static String getPolicyNames(Collection<Object> policyInfos) {
  return policyInfos.stream().map(ErasureCodingUtils::policyNameOrUnknown)
    .collect(Collectors.joining(", "));
}

/** Best-effort name extraction; logs and returns "unknown" on reflective failure. */
private static String policyNameOrUnknown(Object policyInfo) {
  try {
    return getPolicyNameFromInfo(policyInfo);
  } catch (IOException e) {
    LOG.warn("Could not extract policy name from {}", policyInfo, e);
    return "unknown";
  }
}

/**
Expand Down Expand Up @@ -146,7 +180,7 @@ public static void setPolicy(FileSystem fs, Path rootDir, TableName tableName, S
* Sets the EC policy on the path
*/
public static void setPolicy(FileSystem fs, Path path, String policy) throws IOException {
  // reflective call keeps this class loadable on hadoop versions without EC support
  callDfsMethod(getDfs(fs), "setErasureCodingPolicy", path, policy);
}

/**
Expand All @@ -156,11 +190,15 @@ public static void unsetPolicy(FileSystem fs, Path rootDir, TableName tableName)
throws IOException {
DistributedFileSystem dfs = getDfs(fs);
Path path = CommonFSUtils.getTableDir(rootDir, tableName);
if (dfs.getErasureCodingPolicy(path) == null) {
if (getPolicyNameForPath(dfs, path) == null) {
LOG.warn("No EC policy set for path {}, nothing to unset", path);
return;
}
dfs.unsetErasureCodingPolicy(path);
callDfsMethod(dfs, "unsetErasureCodingPolicy", path);
}

/** Enables the named Erasure Coding policy on the cluster backing {@code fs}. */
public static void enablePolicy(FileSystem fs, String policy) throws IOException {
  DistributedFileSystem dfs = getDfs(fs);
  callDfsMethod(dfs, "enableErasureCodingPolicy", policy);
}

private static DistributedFileSystem getDfs(Configuration conf) throws HBaseIOException {
Expand All @@ -182,4 +220,43 @@ private static DistributedFileSystem getDfs(FileSystem fs) throws DoNotRetryIOEx
}
return (DistributedFileSystem) fs;
}

/**
 * Returns the name of the Erasure Coding policy in effect for {@code path},
 * or null when no policy is set.
 */
public static String getPolicyNameForPath(DistributedFileSystem dfs, Path path)
  throws IOException {
  Object policy = callDfsMethod(dfs, "getErasureCodingPolicy", path);
  return policy == null ? null : callObjectMethod(policy, "getName");
}

/** Supplier variant whose {@link #run()} may throw {@link IOException}. */
@FunctionalInterface
private interface ThrowingObjectSupplier {
  Object run() throws IOException;
}

/**
 * Reflectively invokes the named method on the given DistributedFileSystem.
 * Reflection avoids a compile-time dependency on EC APIs absent from older hadoop versions.
 */
private static <T> T callDfsMethod(DistributedFileSystem dfs, String name, Object... params)
throws IOException {
return callObjectMethod(dfs, name, params);
}

/**
 * Invokes {@code name} on {@code object} via ReflectionUtils, translating wrapped
 * reflective failures back into the underlying IOException/RuntimeException.
 */
private static <T> T callObjectMethod(Object object, String name, Object... params)
  throws IOException {
  ThrowingObjectSupplier invocation = () -> ReflectionUtils.invokeMethod(object, name, params);
  return unwrapInvocationException(invocation);
}

/**
 * Runs the supplier and unwraps the InvocationTargetException that reflective invocation
 * surfaces wrapped in an UnsupportedOperationException: the original IOException or
 * RuntimeException cause is rethrown directly so callers see the real failure.
 * Any other failure propagates unchanged.
 */
@SuppressWarnings("unchecked") // caller chooses T; reflection cannot prove the cast
private static <T> T unwrapInvocationException(ThrowingObjectSupplier runnable)
  throws IOException {
  try {
    return (T) runnable.run();
  } catch (UnsupportedOperationException e) {
    if (e.getCause() instanceof InvocationTargetException) {
      Throwable cause = e.getCause().getCause();
      if (cause instanceof IOException) {
        throw (IOException) cause;
      } else if (cause instanceof RuntimeException) {
        throw (RuntimeException) cause;
      }
    }
    throw e;
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -21,9 +21,9 @@
import static org.hamcrest.Matchers.containsString;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThan;
import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertThrows;
import static org.junit.Assume.*;

import java.io.IOException;
import java.util.function.Function;
Expand All @@ -40,6 +40,7 @@
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.client.TableDescriptor;
import org.apache.hadoop.hbase.client.TableDescriptorBuilder;
import org.apache.hadoop.hbase.fs.ErasureCodingUtils;
import org.apache.hadoop.hbase.io.hfile.HFile;
import org.apache.hadoop.hbase.regionserver.CompactedHFilesDischarger;
import org.apache.hadoop.hbase.regionserver.HRegion;
Expand All @@ -50,8 +51,8 @@
import org.apache.hadoop.hbase.util.JVMClusterUtil;
import org.apache.hadoop.hbase.util.TableDescriptorChecker;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.protocol.ErasureCodingPolicy;
import org.junit.AfterClass;
import org.junit.Assume;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
Expand All @@ -77,6 +78,7 @@ public class TestManageTableErasureCodingPolicy {
private static final TableDescriptor EC_TABLE_DESC =
TableDescriptorBuilder.newBuilder(EC_TABLE).setErasureCodingPolicy("XOR-2-1-1024k")
.setColumnFamily(ColumnFamilyDescriptorBuilder.of(FAMILY)).build();
private static boolean erasureCodingSupported;

@BeforeClass
public static void beforeClass() throws Exception {
Expand All @@ -85,13 +87,25 @@ public static void beforeClass() throws Exception {
UTIL.startMiniDFSCluster(3); // 3 necessary for XOR-2-1-1024k
UTIL.startMiniCluster(1);
DistributedFileSystem fs = (DistributedFileSystem) FileSystem.get(UTIL.getConfiguration());
fs.enableErasureCodingPolicy("XOR-2-1-1024k");
fs.enableErasureCodingPolicy("RS-6-3-1024k");

erasureCodingSupported = enableErasureCoding(fs);

Table table = UTIL.createTable(NON_EC_TABLE_DESC, null);
UTIL.loadTable(table, FAMILY);
UTIL.flush();
}

/**
 * Attempts to enable the EC policies these tests require; returns false when the
 * hadoop version in use has no erasure coding support.
 */
private static boolean enableErasureCoding(DistributedFileSystem fs) throws IOException {
  try {
    for (String policy : new String[] { "XOR-2-1-1024k", "RS-6-3-1024k" }) {
      ErasureCodingUtils.enablePolicy(fs, policy);
    }
    return true;
  } catch (UnsupportedOperationException e) {
    LOG.info("Current hadoop version does not support erasure coding, only validation tests will run.");
    return false;
  }
}

@AfterClass
public static void afterClass() throws Exception {
UTIL.shutdownMiniCluster();
Expand Down Expand Up @@ -154,6 +168,7 @@ private void runValidatePolicyNameTest(Function<Admin, TableDescriptor> descript

@Test
public void testCreateTableErasureCodingSync() throws IOException {
assumeTrue(erasureCodingSupported);
try (Admin admin = UTIL.getAdmin()) {
recreateTable(admin, EC_TABLE_DESC);
UTIL.flush(EC_TABLE);
Expand All @@ -175,7 +190,8 @@ private void recreateTable(Admin admin, TableDescriptor desc) throws IOException
}

@Test
public void testModifyTableErasureCodingSync() throws IOException, InterruptedException {
public void testModifyTableErasureCodingSync() throws IOException {
assumeTrue(erasureCodingSupported);
try (Admin admin = UTIL.getAdmin()) {
Path rootDir = CommonFSUtils.getRootDir(UTIL.getConfiguration());
DistributedFileSystem dfs = (DistributedFileSystem) FileSystem.get(UTIL.getConfiguration());
Expand Down Expand Up @@ -233,6 +249,7 @@ private void compactAwayOldFiles(TableName tableName) throws IOException {

@Test
public void testRestoreSnapshot() throws IOException {
assumeTrue(erasureCodingSupported);
String snapshotName = "testRestoreSnapshot_snap";
TableName tableName = TableName.valueOf("testRestoreSnapshot_tbl");
try (Admin admin = UTIL.getAdmin()) {
Expand Down Expand Up @@ -291,12 +308,11 @@ private void checkRegionDirAndFilePolicies(DistributedFileSystem dfs, Path rootD

/**
 * Asserts the EC policy name in effect for {@code path} equals {@code expectedPolicy}
 * (pass null to assert that no policy is set).
 */
private void checkPolicy(DistributedFileSystem dfs, Path path, String expectedPolicy)
  throws IOException {
  String policy = ErasureCodingUtils.getPolicyNameForPath(dfs, path);
  if (expectedPolicy == null) {
    assertThat("policy for " + path, policy, nullValue());
  } else {
    assertThat("policy for " + path, policy, equalTo(expectedPolicy));
  }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
import org.apache.hadoop.hbase.HBaseTestingUtility;
import org.apache.hadoop.hbase.HConstants;
import org.apache.hadoop.hbase.coprocessor.CoprocessorHost;
import org.apache.hadoop.hbase.fs.ErasureCodingUtils;
import org.apache.hadoop.hbase.security.access.SecureTestUtil;
import org.apache.hadoop.hbase.security.visibility.VisibilityTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
Expand All @@ -43,6 +44,8 @@ public abstract class AbstractTestShell {
protected final static HBaseTestingUtility TEST_UTIL = new HBaseTestingUtility();
protected final static ScriptingContainer jruby = new ScriptingContainer();

protected static boolean erasureCodingSupported = false;

protected static void setUpConfig() throws IOException {
Configuration conf = TEST_UTIL.getConfiguration();
conf.setInt("hbase.regionserver.msginterval", 100);
Expand Down Expand Up @@ -103,9 +106,12 @@ public static void setUpBeforeClass() throws Exception {
// Start mini cluster
// 3 datanodes needed for erasure coding checks
TEST_UTIL.startMiniCluster(3);
DistributedFileSystem dfs =
(DistributedFileSystem) FileSystem.get(TEST_UTIL.getConfiguration());
dfs.enableErasureCodingPolicy("XOR-2-1-1024k");
try {
ErasureCodingUtils.enablePolicy(FileSystem.get(TEST_UTIL.getConfiguration()), "XOR-2-1-1024k");
erasureCodingSupported = true;
} catch (UnsupportedOperationException e) {
LOG.info("Current hadoop version does not support erasure coding, only validation tests will run.");
}

setUpJRubyRuntime();
}
Expand Down

0 comments on commit 6fd0cd4

Please sign in to comment.