HBASE-24055 Make AsyncFSWAL can run on EC cluster #1437

Merged 2 commits on Apr 7, 2020.

Changes from all commits
@@ -17,8 +17,6 @@
*/
package org.apache.hadoop.hbase.io.asyncfs;

-import static org.apache.hadoop.fs.CreateFlag.CREATE;
-import static org.apache.hadoop.fs.CreateFlag.OVERWRITE;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
import static org.apache.hadoop.hdfs.client.HdfsClientConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
@@ -177,6 +175,11 @@ Object createObject(ClientProtocol instance, String src, FsPermission masked, St

private static final FileCreator FILE_CREATOR;

+  // CreateFlag.SHOULD_REPLICATE makes an OutputStream on an EC directory support hflush/hsync,
+  // but EC was only introduced in Hadoop 3.x, so the enum does not exist on 2.x. That is why we
+  // have to reference it indirectly through reflection.
+  private static final CreateFlag SHOULD_REPLICATE_FLAG;

private static DFSClientAdaptor createDFSClientAdaptor() throws NoSuchMethodException {
Method isClientRunningMethod = DFSClient.class.getDeclaredMethod("isClientRunning");
isClientRunningMethod.setAccessible(true);
@@ -272,6 +275,15 @@ private static FileCreator createFileCreator() throws NoSuchMethodException {
return createFileCreator2();
}

+  private static CreateFlag loadShouldReplicateFlag() {
+    try {
+      return CreateFlag.valueOf("SHOULD_REPLICATE");
+    } catch (IllegalArgumentException e) {
+      LOG.debug("can not find SHOULD_REPLICATE flag, should be hadoop 2.x", e);

Member: Should this be logged at warn instead?

Contributor Author: I originally made all the adaptor code log at WARN, but that proved annoying for most users, so we changed it all to debug...

Contributor: This happens every time we open a file in hadoop2? Or just once on class loading? (Looks like the latter.)

Contributor: This is how we test whether the SHOULD_REPLICATE flag is available? It is not available on hadoop2? If so, can we have a comment on the method to this effect? The log is a little confusing. Suggest: "SHOULD_REPLICATE is not available; this is a problem if we are running on hadoop3 (it's expected if hadoop2)".

Contributor Author: Only once, when loading.

Contributor Author: All the other log messages are like this, so I don't want to change only this one. Let me add some comments on the field.

+      return null;
+    }
+  }
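
For anyone unfamiliar with the idiom: Enum.valueOf throws IllegalArgumentException when the named constant does not exist, which is what lets this probe distinguish Hadoop 2 from Hadoop 3 classpaths without a compile-time dependency. A self-contained sketch, with java.util.concurrent.TimeUnit and the made-up constant name "EONS" standing in for CreateFlag and SHOULD_REPLICATE:

```java
import java.util.concurrent.TimeUnit;

public class EnumProbeDemo {
  // Same shape as loadShouldReplicateFlag(): return the constant if this
  // runtime has it, otherwise null.
  static TimeUnit probe(String name) {
    try {
      return TimeUnit.valueOf(name);
    } catch (IllegalArgumentException e) {
      return null; // the named constant does not exist in this version
    }
  }

  public static void main(String[] args) {
    System.out.println(probe("DAYS")); // DAYS (exists on every JDK)
    System.out.println(probe("EONS")); // null (stands in for a newer-only constant)
  }
}
```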

// cancel the processing if DFSClient is already closed.
static final class CancelOnClose implements CancelableProgressable {

@@ -292,6 +304,7 @@ public boolean progress() {
LEASE_MANAGER = createLeaseManager();
DFS_CLIENT_ADAPTOR = createDFSClientAdaptor();
FILE_CREATOR = createFileCreator();
+    SHOULD_REPLICATE_FLAG = loadShouldReplicateFlag();
} catch (Exception e) {
String msg = "Couldn't properly initialize access to HDFS internals. Please " +
"update your WAL Provider to not make use of the 'asyncfs' provider. See " +
@@ -486,6 +499,18 @@ public NameNodeException(Throwable cause) {
}
}

+  private static EnumSetWritable<CreateFlag> getCreateFlags(boolean overwrite) {
+    List<CreateFlag> flags = new ArrayList<>();
+    flags.add(CreateFlag.CREATE);
+    if (overwrite) {
+      flags.add(CreateFlag.OVERWRITE);
+    }
+    if (SHOULD_REPLICATE_FLAG != null) {
+      flags.add(SHOULD_REPLICATE_FLAG);
+    }
+    return new EnumSetWritable<>(EnumSet.copyOf(flags));
+  }
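
The new flag is the heart of the fix: without SHOULD_REPLICATE, a stream created under an erasure-coded directory cannot hflush/hsync, which the WAL depends on for durability. The difference is observable through the StreamCapabilities API, sketched below (a sketch assuming a Hadoop 3.x classpath; the path and default configuration are hypothetical):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;

public class HflushProbe {
  public static void main(String[] args) throws Exception {
    FileSystem fs = FileSystem.get(new Configuration());
    // Under an erasure-coded directory this prints false unless the stream
    // was created with SHOULD_REPLICATE; the WAL requires it to be true.
    try (FSDataOutputStream out = fs.create(new Path("/tmp/wal-probe"))) {
      System.out.println(out.hasCapability(StreamCapabilities.HFLUSH));
    }
  }
}
```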

private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem dfs, String src,
boolean overwrite, boolean createParent, short replication, long blockSize,
EventLoopGroup eventLoopGroup, Class<? extends Channel> channelClass) throws IOException {
@@ -502,8 +527,8 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
try {
stat = FILE_CREATOR.create(namenode, src,
FsPermission.getFileDefault().applyUMask(FsPermission.getUMask(conf)), clientName,
-        new EnumSetWritable<>(overwrite ? EnumSet.of(CREATE, OVERWRITE) : EnumSet.of(CREATE)),
-        createParent, replication, blockSize, CryptoProtocolVersion.supported());
+        getCreateFlags(overwrite), createParent, replication, blockSize,

Member: Nice cleanup.

+        CryptoProtocolVersion.supported());
} catch (Exception e) {
if (e instanceof RemoteException) {
throw (RemoteException) e;
@@ -19,9 +19,9 @@

import static org.apache.hadoop.hdfs.DFSConfigKeys.DFS_CLIENT_SOCKET_TIMEOUT_KEY;
import static org.hamcrest.CoreMatchers.instanceOf;
+import static org.hamcrest.MatcherAssert.assertThat;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertThat;
import static org.junit.Assert.fail;

import java.io.FileNotFoundException;
@@ -20,10 +20,11 @@

import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertFalse;
-import static org.junit.Assert.assertTrue;

import java.io.IOException;
import java.lang.reflect.Method;
+import java.util.Arrays;
+import java.util.List;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.Path;
import org.apache.hadoop.fs.StreamCapabilities;
@@ -34,40 +35,49 @@
import org.apache.hadoop.hbase.client.Put;
import org.apache.hadoop.hbase.client.Table;
import org.apache.hadoop.hbase.testclassification.LargeTests;
+import org.apache.hadoop.hbase.testclassification.RegionServerTests;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.hadoop.hbase.util.CommonFSUtils;
+import org.apache.hadoop.hbase.wal.WALFactory;
import org.apache.hadoop.hdfs.DFSClient;
import org.apache.hadoop.hdfs.DFSTestUtil;
import org.apache.hadoop.hdfs.DistributedFileSystem;
import org.apache.hadoop.hdfs.MiniDFSCluster;
-import org.junit.AfterClass;
+import org.junit.After;
import org.junit.Assume;
+import org.junit.Before;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.experimental.categories.Category;
+import org.junit.runner.RunWith;
+import org.junit.runners.Parameterized;
+import org.junit.runners.Parameterized.Parameter;
+import org.junit.runners.Parameterized.Parameters;

-@Category(LargeTests.class)
+@RunWith(Parameterized.class)
+@Category({ RegionServerTests.class, LargeTests.class })
public class TestHBaseWalOnEC {

@ClassRule
public static final HBaseClassTestRule CLASS_RULE =
    HBaseClassTestRule.forClass(TestHBaseWalOnEC.class);

-  private static final HBaseTestingUtility util = new HBaseTestingUtility();
+  private static final HBaseTestingUtility UTIL = new HBaseTestingUtility();

@BeforeClass
-  public static void setup() throws Exception {
+  public static void setUpBeforeClass() throws Exception {
try {
-      MiniDFSCluster cluster = util.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
+      MiniDFSCluster cluster = UTIL.startMiniDFSCluster(3); // Need 3 DNs for RS-3-2 policy
DistributedFileSystem fs = cluster.getFileSystem();

-      Method enableAllECPolicies = DFSTestUtil.class.getMethod("enableAllECPolicies",
-        DistributedFileSystem.class);
+      Method enableAllECPolicies =
+        DFSTestUtil.class.getMethod("enableAllECPolicies", DistributedFileSystem.class);
enableAllECPolicies.invoke(null, fs);

DFSClient client = fs.getClient();
-      Method setErasureCodingPolicy = DFSClient.class.getMethod("setErasureCodingPolicy",
-        String.class, String.class);
+      Method setErasureCodingPolicy =
+        DFSClient.class.getMethod("setErasureCodingPolicy", String.class, String.class);
setErasureCodingPolicy.invoke(client, "/", "RS-3-2-1024k"); // try a built-in policy

try (FSDataOutputStream out = fs.create(new Path("/canary"))) {
@@ -80,38 +90,43 @@ public static void setup() throws Exception {
Assume.assumeNoException("Using an older version of hadoop; EC not available.", e);
}

-    util.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);
-    util.startMiniCluster();
+    UTIL.getConfiguration().setBoolean(CommonFSUtils.UNSAFE_STREAM_CAPABILITY_ENFORCE, true);

}
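
On a Hadoop 3 classpath, the reflective calls in setUpBeforeClass are equivalent to invoking the test utilities directly, as in the fragment below; reflection is only there so the class still compiles against Hadoop 2, where these methods (and EC itself) are absent.

```java
// Hadoop 3-only equivalent of the reflective setup (a sketch; this fragment
// would not compile on a Hadoop 2 classpath).
DFSTestUtil.enableAllECPolicies(fs);                        // enable all built-in EC policies
fs.getClient().setErasureCodingPolicy("/", "RS-3-2-1024k"); // apply RS-3-2 to the root directory
```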

-  @AfterClass
-  public static void tearDown() throws Exception {
-    util.shutdownMiniCluster();
+  @Parameter
+  public String walProvider;

+  @Parameters
+  public static List<Object[]> params() {
+    return Arrays.asList(new Object[] { "asyncfs" }, new Object[] { "filesystem" });
+  }

-  @Test
-  public void testStreamCreate() throws IOException {
-    try (FSDataOutputStream out = CommonFSUtils.createForWal(util.getDFSCluster().getFileSystem(),
-      new Path("/testStreamCreate"), true)) {
-      assertTrue(out.hasCapability(StreamCapabilities.HFLUSH));
-    }
+  @Before
+  public void setUp() throws Exception {
+    UTIL.getConfiguration().set(WALFactory.WAL_PROVIDER, walProvider);
+    UTIL.startMiniCluster(3);
+  }

+  @After
+  public void tearDown() throws Exception {
+    UTIL.shutdownMiniCluster();
+  }

@Test
-  public void testFlush() throws IOException {
+  public void testReadWrite() throws IOException {
byte[] row = Bytes.toBytes("row");
byte[] cf = Bytes.toBytes("cf");
byte[] cq = Bytes.toBytes("cq");
byte[] value = Bytes.toBytes("value");

TableName name = TableName.valueOf(getClass().getSimpleName());

-    Table t = util.createTable(name, cf);
+    Table t = UTIL.createTable(name, cf);
t.put(new Put(row).addColumn(cf, cq, value));

-    util.getAdmin().flush(name);
+    UTIL.getAdmin().flush(name);

assertArrayEquals(value, t.get(new Get(row)).getValue(cf, cq));
}
}
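
Outside of tests, the provider exercised here is selected through the hbase.wal.provider setting (the WALFactory.WAL_PROVIDER key used in setUp above); a minimal sketch of choosing it programmatically:

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.hbase.HBaseConfiguration;

public class WalProviderConfig {
  public static void main(String[] args) {
    Configuration conf = HBaseConfiguration.create();
    // "asyncfs" (AsyncFSWAL) and "filesystem" (FSHLog) are the two values
    // this test runs against.
    conf.set("hbase.wal.provider", "asyncfs");
    System.out.println(conf.get("hbase.wal.provider"));
  }
}
```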