HBASE-28061 HBaseTestingUtility failed to start MiniHbaseCluster in case of Hadoop3.3.1 (#5401)

Co-authored-by: Butao Zhang <butaozhang1@163.com>
Signed-off-by: Xin Sun <ddupgs@gmail.com>
Apache9 and zhangbutao committed Sep 15, 2023
1 parent 2892208 commit ff2c10c
Showing 9 changed files with 101 additions and 30 deletions.
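Context for the change below: the failure is a binary incompatibility, not a source one. Hadoop 3.3.1 changed the return type of LocatedBlock.getLocations() from DatanodeInfo[] to DatanodeInfoWithStorage[], so HBase bytecode compiled against Hadoop 3.2 references a method descriptor that no longer exists at runtime, the JVM cannot locate the method, and the mini cluster fails to start. A minimal sketch of why a reflective lookup sidesteps this (a hypothetical demo class, not part of the commit, assuming hadoop-hdfs-client is on the classpath):

import java.lang.reflect.Method;

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

// Hypothetical demo class, for illustration only.
public final class GetLocationsLinkageSketch {
  public static void main(String[] args) throws Exception {
    // Code compiled against Hadoop 3.2 records the call site as getLocations() with
    // descriptor ()[Lorg/apache/hadoop/hdfs/protocol/DatanodeInfo;
    // Hadoop 3.3.1 declares the method with a DatanodeInfoWithStorage[] return type instead,
    // so resolving the old descriptor fails with NoSuchMethodError.
    // Reflection matches by name and parameter types only, so the lookup works on both versions:
    Method getLocations = LocatedBlock.class.getMethod("getLocations");
    System.out.println("runtime return type: " + getLocations.getReturnType().getSimpleName());

    // DatanodeInfoWithStorage extends DatanodeInfo, so the returned array is assignable
    // to DatanodeInfo[] on either Hadoop line (a real LocatedBlock would come from HDFS).
    LocatedBlock block = fetchSomeBlock(); // hypothetical helper, returns null in this sketch
    if (block != null) {
      DatanodeInfo[] locations = (DatanodeInfo[]) getLocations.invoke(block);
      System.out.println("replica count: " + locations.length);
    }
  }

  private static LocatedBlock fetchSomeBlock() {
    return null; // placeholder; real code would obtain this from the HDFS client
  }
}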
----------------------------------------
@@ -22,6 +22,7 @@
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.completeFile;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.endFileLease;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputHelper.getStatus;
+ import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.consume;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWrite;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
@@ -364,7 +365,7 @@ private void setupReceiver(int timeoutMs) {
this.clientName = clientName;
this.src = src;
this.block = locatedBlock.getBlock();
- this.locations = locatedBlock.getLocations();
+ this.locations = getLocatedBlockLocations(locatedBlock);
this.encryptor = encryptor;
this.datanodeInfoMap = datanodeInfoMap;
this.summer = summer;
----------------------------------------
@@ -19,6 +19,7 @@

import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.createEncryptor;
import static org.apache.hadoop.hbase.io.asyncfs.FanOutOneBlockAsyncDFSOutputSaslHelper.trySaslNegotiate;
+ import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.addListener;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeClose;
import static org.apache.hadoop.hbase.util.NettyFutureUtils.safeWriteAndFlush;
@@ -383,7 +384,7 @@ private static List<Future<Channel>> connectToDataNodes(Configuration conf, DFSC
BlockConstructionStage stage, DataChecksum summer, EventLoopGroup eventLoopGroup,
Class<? extends Channel> channelClass) {
StorageType[] storageTypes = locatedBlock.getStorageTypes();
- DatanodeInfo[] datanodeInfos = locatedBlock.getLocations();
+ DatanodeInfo[] datanodeInfos = getLocatedBlockLocations(locatedBlock);
boolean connectToDnViaHostname =
conf.getBoolean(DFS_CLIENT_USE_DN_HOSTNAME, DFS_CLIENT_USE_DN_HOSTNAME_DEFAULT);
int timeoutMs = conf.getInt(DFS_CLIENT_SOCKET_TIMEOUT_KEY, READ_TIMEOUT);
@@ -495,7 +496,7 @@ private static FanOutOneBlockAsyncDFSOutput createOutput(DistributedFileSystem d
futureList = connectToDataNodes(conf, client, clientName, locatedBlock, 0L, 0L,
PIPELINE_SETUP_CREATE, summer, eventLoopGroup, channelClass);
for (int i = 0, n = futureList.size(); i < n; i++) {
- DatanodeInfo datanodeInfo = locatedBlock.getLocations()[i];
+ DatanodeInfo datanodeInfo = getLocatedBlockLocations(locatedBlock)[i];
try {
datanodes.put(futureList.get(i).syncUninterruptibly().getNow(), datanodeInfo);
} catch (Exception e) {
----------------------------------------
@@ -0,0 +1,57 @@
/*
* Licensed to the Apache Software Foundation (ASF) under one
* or more contributor license agreements. See the NOTICE file
* distributed with this work for additional information
* regarding copyright ownership. The ASF licenses this file
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.apache.hadoop.hbase.util;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;
import org.apache.yetus.audience.InterfaceAudience;

/**
* Hadoop 3.3.1 changed the return type of {@code LocatedBlock#getLocations()} from
* {@code DatanodeInfo[]} to {@code DatanodeInfoWithStorage[]}, so the JVM cannot locate the
* method if we are compiled against hadoop 3.2 and then linked with hadoop 3.3+. Here we use
* reflection to make it work for both hadoop versions; otherwise we would need to publish
* separate artifacts for different hadoop versions...
*/
@InterfaceAudience.Private
public final class LocatedBlockHelper {

private static final Method GET_LOCATED_BLOCK_LOCATIONS_METHOD;

static {
try {
GET_LOCATED_BLOCK_LOCATIONS_METHOD = LocatedBlock.class.getMethod("getLocations");
} catch (Exception e) {
throw new Error("Can not initialize access to HDFS LocatedBlock.getLocations method", e);
}
}

private LocatedBlockHelper() {
}

public static DatanodeInfo[] getLocatedBlockLocations(LocatedBlock block) {
try {
// DatanodeInfoWithStorage[] can be cast to DatanodeInfo[] directly
return (DatanodeInfo[]) GET_LOCATED_BLOCK_LOCATIONS_METHOD.invoke(block);
} catch (IllegalAccessException | InvocationTargetException e) {
throw new RuntimeException(e);
}
}
}
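For illustration, a hedged sketch of how call sites elsewhere in this commit migrate to the new helper (the wrapper class and method below are hypothetical and not part of the commit; the body mirrors the getHostsForLocations change shown further down):

import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;

import org.apache.hadoop.hdfs.protocol.DatanodeInfo;
import org.apache.hadoop.hdfs.protocol.LocatedBlock;

// Hypothetical example class, for illustration only.
final class LocatedBlockUsageSketch {

  private LocatedBlockUsageSketch() {
  }

  static String[] hostsOf(LocatedBlock block) {
    // Before: block.getLocations() binds the call site to a Hadoop-version-specific return
    // type and breaks at runtime on Hadoop 3.3.1 when compiled against Hadoop 3.2.
    // After: resolve through the reflective helper, which works on both Hadoop lines.
    DatanodeInfo[] locations = getLocatedBlockLocations(block);
    String[] hosts = new String[locations.length];
    for (int i = 0; i < locations.length; i++) {
      hosts[i] = locations[i].getHostName(); // same pattern as getHostsForLocations below
    }
    return hosts;
  }
}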
----------------------------------------
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.fs;

+ import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;

import edu.umd.cs.findbugs.annotations.Nullable;
import java.io.Closeable;
import java.io.IOException;
@@ -425,7 +427,7 @@ public void reorderBlocks(Configuration conf, LocatedBlocks lbs, String src)

// Just check for all blocks
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
- DatanodeInfo[] dnis = lb.getLocations();
+ DatanodeInfo[] dnis = getLocatedBlockLocations(lb);
if (dnis != null && dnis.length > 1) {
boolean found = false;
for (int i = 0; i < dnis.length - 1 && !found; i++) {
----------------------------------------
@@ -17,6 +17,7 @@
*/
package org.apache.hadoop.hbase.util;

+ import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.apache.hadoop.hdfs.protocol.HdfsConstants.SafeModeAction.SAFEMODE_GET;

import edu.umd.cs.findbugs.annotations.CheckForNull;
@@ -691,7 +692,7 @@ public static boolean metaRegionExists(FileSystem fs, Path rootDir) throws IOExc
}

private static String[] getHostsForLocations(LocatedBlock block) {
- DatanodeInfo[] locations = block.getLocations();
+ DatanodeInfo[] locations = getLocatedBlockLocations(block);
String[] hosts = new String[locations.length];
for (int i = 0; i < hosts.length; i++) {
hosts[i] = locations[i].getHostName();
----------------------------------------
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.fs;

+ import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;

import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Method;
import java.net.BindException;
@@ -160,8 +162,8 @@ public void testBlockLocationReorder() throws Exception {
@Override
public void reorderBlocks(Configuration c, LocatedBlocks lbs, String src) {
for (LocatedBlock lb : lbs.getLocatedBlocks()) {
- if (lb.getLocations().length > 1) {
- DatanodeInfo[] infos = lb.getLocations();
+ if (getLocatedBlockLocations(lb).length > 1) {
+ DatanodeInfo[] infos = getLocatedBlockLocations(lb);
if (infos[0].getHostName().equals(lookup)) {
LOG.info("HFileSystem bad host, inverting");
DatanodeInfo tmp = infos[0];
----------------------------------------
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.fs;

+ import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;

import java.lang.reflect.Field;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
@@ -117,21 +119,22 @@ public void testBlockLocation() throws Exception {

for (int i = 0; i < 10; i++) {
// The interceptor is not set in this test, so we get the raw list at this point
- LocatedBlocks l;
+ LocatedBlocks lbs;
final long max = EnvironmentEdgeManager.currentTime() + 10000;
do {
- l = getNamenode(dfs.getClient()).getBlockLocations(fileName, 0, 1);
- Assert.assertNotNull(l.getLocatedBlocks());
- Assert.assertEquals(1, l.getLocatedBlocks().size());
- Assert.assertTrue("Expecting " + repCount + " , got " + l.get(0).getLocations().length,
+ lbs = getNamenode(dfs.getClient()).getBlockLocations(fileName, 0, 1);
+ Assert.assertNotNull(lbs.getLocatedBlocks());
+ Assert.assertEquals(1, lbs.getLocatedBlocks().size());
+ Assert.assertTrue(
+ "Expecting " + repCount + " , got " + getLocatedBlockLocations(lbs.get(0)).length,
EnvironmentEdgeManager.currentTime() < max);
- } while (l.get(0).getLocations().length != repCount);
+ } while (getLocatedBlockLocations(lbs.get(0)).length != repCount);

// Should be filtered, the name is different => The order won't change
- Object originalList[] = l.getLocatedBlocks().toArray();
+ Object[] originalList = lbs.getLocatedBlocks().toArray();
HFileSystem.ReorderWALBlocks lrb = new HFileSystem.ReorderWALBlocks();
- lrb.reorderBlocks(conf, l, fileName);
- Assert.assertArrayEquals(originalList, l.getLocatedBlocks().toArray());
+ lrb.reorderBlocks(conf, lbs, fileName);
+ Assert.assertArrayEquals(originalList, lbs.getLocatedBlocks().toArray());

// Should be reordered, as we pretend to be a file name with a compliant stuff
Assert.assertNotNull(conf.get(HConstants.HBASE_DIR));
@@ -144,12 +147,12 @@ public void testBlockLocation() throws Exception {
AbstractFSWALProvider.getServerNameFromWALDirectoryName(dfs.getConf(), pseudoLogFile));

// And check we're doing the right reorder.
- lrb.reorderBlocks(conf, l, pseudoLogFile);
- Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
+ lrb.reorderBlocks(conf, lbs, pseudoLogFile);
+ Assert.assertEquals(host1, getLocatedBlockLocations(lbs.get(0))[2].getHostName());

// Check again, it should remain the same.
- lrb.reorderBlocks(conf, l, pseudoLogFile);
- Assert.assertEquals(host1, l.get(0).getLocations()[2].getHostName());
+ lrb.reorderBlocks(conf, lbs, pseudoLogFile);
+ Assert.assertEquals(host1, getLocatedBlockLocations(lbs.get(0))[2].getHostName());
}
}

----------------------------------------
@@ -17,6 +17,8 @@
*/
package org.apache.hadoop.hbase.fs;

+ import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;

import java.io.FileNotFoundException;
import java.io.IOException;
import java.lang.reflect.Field;
@@ -245,25 +247,26 @@ private void testFromDFS(DistributedFileSystem dfs, String src, int repCount, St
throws Exception {
// Multiple times as the order is random
for (int i = 0; i < 10; i++) {
- LocatedBlocks l;
+ LocatedBlocks lbs;
// The NN gets the block list asynchronously, so we may need multiple tries to get the list
final long max = EnvironmentEdgeManager.currentTime() + 10000;
boolean done;
do {
Assert.assertTrue("Can't get enouth replica", EnvironmentEdgeManager.currentTime() < max);
- l = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
- Assert.assertNotNull("Can't get block locations for " + src, l);
- Assert.assertNotNull(l.getLocatedBlocks());
- Assert.assertTrue(l.getLocatedBlocks().size() > 0);
+ lbs = getNamenode(dfs.getClient()).getBlockLocations(src, 0, 1);
+ Assert.assertNotNull("Can't get block locations for " + src, lbs);
+ Assert.assertNotNull(lbs.getLocatedBlocks());
+ Assert.assertTrue(lbs.getLocatedBlocks().size() > 0);

done = true;
- for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
- done = (l.get(y).getLocations().length == repCount);
+ for (int y = 0; y < lbs.getLocatedBlocks().size() && done; y++) {
+ done = getLocatedBlockLocations(lbs.get(y)).length == repCount;
}
} while (!done);

- for (int y = 0; y < l.getLocatedBlocks().size() && done; y++) {
- Assert.assertEquals(localhost, l.get(y).getLocations()[repCount - 1].getHostName());
+ for (int y = 0; y < lbs.getLocatedBlocks().size() && done; y++) {
+ Assert.assertEquals(localhost,
+ getLocatedBlockLocations(lbs.get(y))[repCount - 1].getHostName());
}
}
}
----------------------------------------
@@ -18,6 +18,7 @@
package org.apache.hadoop.hbase.tool;

import static org.apache.hadoop.hbase.HBaseTestingUtil.countRows;
+ import static org.apache.hadoop.hbase.util.LocatedBlockHelper.getLocatedBlockLocations;
import static org.hamcrest.Matchers.greaterThan;
import static org.junit.Assert.assertArrayEquals;
import static org.junit.Assert.assertEquals;
@@ -731,7 +732,7 @@ private void verifyHFileFavoriteNode(Path p, AsyncTableRegionLocator regionLocat
isFavoriteNode = false;
final LocatedBlock block = locatedBlocks.get(index);

- final DatanodeInfo[] locations = block.getLocations();
+ final DatanodeInfo[] locations = getLocatedBlockLocations(block);
for (DatanodeInfo location : locations) {

final String hostName = location.getHostName();