[apache#895] improvement: Rename Hdfs*.java to Hadoop*.java to support other Hadoop FS-compatible distributed filesystem (apache#898)

### What changes were proposed in this pull request?

In the server and storage modules, many classes are prefixed with Hdfs even though they only use the Hadoop FS API and are therefore implementation-agnostic: they do not depend on any HDFS-specific implementation. Renaming them to Hadoop* lets us support other Hadoop FS-compatible distributed filesystems by extending the existing classes, and it makes the code read more naturally.
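
To make "implementation-agnostic" concrete: code written against the Hadoop FS API resolves the actual filesystem from the Path's URI scheme at runtime. A minimal illustrative sketch (the demo class below is not part of this PR):

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative demo, not part of this PR: the same code runs against any
// Hadoop FS-compatible filesystem, because the concrete FileSystem is
// resolved from the Path's URI scheme (hdfs://, daos://, s3a://, ...).
public final class HadoopFsDemo {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    // e.g. hdfs://namenode:8020/tmp/demo or daos://pool/container/demo
    Path path = new Path(args[0]);
    FileSystem fs = path.getFileSystem(conf);
    try (FSDataOutputStream out = fs.create(path)) {
      out.writeBytes("works on any Hadoop FS-compatible filesystem");
    }
  }
}
```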

There may be slight differences among Hadoop FS implementations. For example, [hadoop-daos](https://github.com/daos-stack/daos/tree/master/src/client/java/hadoop-daos) does not have a dedicated thread for reading and writing data, unlike HDFS, so with it we don't need to close the output stream on each flush to the FS.
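
As a rough illustration of that flush strategy (the `StreamingWriter` class below is made up for this example and is not Uniffle code): a filesystem without a dedicated read/write thread can `hflush()` and keep one stream open, where an HDFS-oriented handler might instead close the stream after each flush and reopen it (e.g. via `fs.append(path)`) for the next write.

```java
import java.io.IOException;

import org.apache.hadoop.fs.FSDataOutputStream;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;

// Illustrative sketch, not Uniffle code: for a filesystem with no dedicated
// read/write thread (like hadoop-daos), one stream can stay open across
// flushes; hflush() persists the data without closing the stream.
final class StreamingWriter implements AutoCloseable {
  private final FSDataOutputStream out;

  StreamingWriter(FileSystem fs, Path path) throws IOException {
    this.out = fs.create(path);
  }

  void writeAndFlush(byte[] data) throws IOException {
    out.write(data);
    out.hflush(); // flush to the FS; no close/reopen cycle needed
  }

  @Override
  public void close() throws IOException {
    out.close();
  }
}
```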

### Why are the changes needed?

Fix: apache#895 

### Does this PR introduce _any_ user-facing change?

No.

### How was this patch tested?

CI passed.

Signed-off-by: jiafu zhang <jiafu.zhang@intel.com>
jiafuzha authored May 24, 2023
1 parent e6f8fcb commit 3e58805
Showing 66 changed files with 534 additions and 530 deletions.
@@ -39,13 +39,13 @@
import org.apache.uniffle.common.compression.Codec;
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.ChecksumUtils;
-import org.apache.uniffle.storage.HdfsTestBase;
+import org.apache.uniffle.storage.HadoopTestBase;
import org.apache.uniffle.storage.handler.api.ShuffleWriteHandler;

import static org.junit.jupiter.api.Assertions.assertEquals;


-public abstract class AbstractRssReaderTest extends HdfsTestBase {
+public abstract class AbstractRssReaderTest extends HadoopTestBase {

private AtomicInteger atomicInteger = new AtomicInteger(0);

@@ -46,7 +46,7 @@
import org.apache.uniffle.common.config.RssConf;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
-import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
+import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -69,8 +69,8 @@ public class RssShuffleDataIteratorTest extends AbstractRssReaderTest {
@Test
public void readTest1() throws Exception {
String basePath = HDFS_URI + "readTest1";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);

Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -130,10 +130,10 @@ public void readTest2() throws Exception {
@Test
public void readTest3() throws Exception {
String basePath = HDFS_URI + "readTest3";
-HdfsShuffleWriteHandler writeHandler1 =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
-HdfsShuffleWriteHandler writeHandler2 =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);
+HadoopShuffleWriteHandler writeHandler1 =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler2 =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);

Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -165,8 +165,8 @@ public void readTest3() throws Exception {
@Test
public void readTest4() throws Exception {
String basePath = HDFS_URI + "readTest4";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);

Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -199,8 +199,8 @@ public void readTest4() throws Exception {
@Test
public void readTest5() throws Exception {
String basePath = HDFS_URI + "readTest5";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);

Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -225,8 +225,8 @@ public void readTest5() throws Exception {
@Test
public void readTest7() throws Exception {
String basePath = HDFS_URI + "readTest7";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);

Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -269,10 +269,10 @@ public void readTestUncompressedShuffle() throws Exception {

private void readTestCompressOrNot(String path, boolean compress) throws Exception {
String basePath = HDFS_URI + path;
-HdfsShuffleWriteHandler writeHandler1 =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
-HdfsShuffleWriteHandler writeHandler2 =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);
+HadoopShuffleWriteHandler writeHandler1 =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler2 =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);

Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -38,7 +38,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
-import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
+import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
import org.apache.uniffle.storage.util.StorageType;

import static org.mockito.Mockito.doNothing;
@@ -54,8 +54,8 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
public void readTest() throws Exception {
ShuffleServerInfo ssi = new ShuffleServerInfo("127.0.0.1", 0);
String basePath = HDFS_URI + "readTest1";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi.getId(), conf);

Map<String, String> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -40,7 +40,7 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.config.RssClientConf;
import org.apache.uniffle.common.config.RssConf;
-import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
+import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
import org.apache.uniffle.storage.util.StorageType;

import static org.mockito.Mockito.doNothing;
@@ -56,10 +56,10 @@ public class RssShuffleReaderTest extends AbstractRssReaderTest {
public void readTest() throws Exception {
ShuffleServerInfo ssi = new ShuffleServerInfo("127.0.0.1", 0);
String basePath = HDFS_URI + "readTest1";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 0, 0, basePath, ssi.getId(), conf);
-final HdfsShuffleWriteHandler writeHandler1 =
-new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 0, 0, basePath, ssi.getId(), conf);
+final HadoopShuffleWriteHandler writeHandler1 =
+new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi.getId(), conf);

Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
final Roaring64NavigableMap taskIdBitmap = Roaring64NavigableMap.bitmapOf(0);
@@ -122,7 +122,7 @@ public static boolean waitUntilDoneOrFail(List<CompletableFuture<Boolean>> futur
public static void validateTestModeConf(boolean testMode, String storageType) {
if (!testMode && (StorageType.LOCALFILE.name().equals(storageType)
|| (StorageType.HDFS.name()).equals(storageType))) {
-throw new IllegalArgumentException("LOCALFILE or HDFS storage type should be used in test mode only, "
+throw new IllegalArgumentException("LOCALFILE or HADOOP storage type should be used in test mode only, "
+ "because of the poor performance of these two types.");
}
}
@@ -41,8 +41,8 @@
import org.apache.uniffle.common.ShuffleServerInfo;
import org.apache.uniffle.common.util.ChecksumUtils;
import org.apache.uniffle.common.util.Constants;
-import org.apache.uniffle.storage.HdfsTestBase;
-import org.apache.uniffle.storage.handler.impl.HdfsShuffleWriteHandler;
+import org.apache.uniffle.storage.HadoopTestBase;
+import org.apache.uniffle.storage.handler.impl.HadoopShuffleWriteHandler;
import org.apache.uniffle.storage.util.StorageType;

import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -51,7 +51,7 @@
import static org.junit.jupiter.api.Assertions.fail;
import static org.mockito.ArgumentMatchers.any;

-public class ShuffleReadClientImplTest extends HdfsTestBase {
+public class ShuffleReadClientImplTest extends HadoopTestBase {

private static final String EXPECTED_EXCEPTION_MESSAGE = "Exception should be thrown";
private static AtomicLong ATOMIC_LONG = new AtomicLong(0);
@@ -62,8 +62,8 @@ public class ShuffleReadClientImplTest extends HdfsTestBase {
@Test
public void readTest1() throws Exception {
String basePath = HDFS_URI + "clientReadTest1";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);

Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -99,10 +99,10 @@ public void readTest1() throws Exception {
@Test
public void readTest2() throws Exception {
String basePath = HDFS_URI + "clientReadTest2";
-HdfsShuffleWriteHandler writeHandler1 =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
-HdfsShuffleWriteHandler writeHandler2 =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);
+HadoopShuffleWriteHandler writeHandler1 =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler2 =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);

Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -123,10 +123,10 @@ public void readTest2() throws Exception {
@Test
public void readTest3() throws Exception {
String basePath = HDFS_URI + "clientReadTest3";
-HdfsShuffleWriteHandler writeHandler1 =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
-HdfsShuffleWriteHandler writeHandler2 =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);
+HadoopShuffleWriteHandler writeHandler1 =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler2 =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi2.getId(), conf);

Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -158,8 +158,8 @@ public void readTest3() throws Exception {
@Test
public void readTest4() throws Exception {
String basePath = HDFS_URI + "clientReadTest4";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);

Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -193,8 +193,8 @@ public void readTest4() throws Exception {
@Test
public void readTest5() throws Exception {
String basePath = HDFS_URI + "clientReadTest5";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);

Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -215,8 +215,8 @@ public void readTest5() throws Exception {
@Test
public void readTest7() throws Exception {
String basePath = HDFS_URI + "clientReadTest7";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);

Map<Long, byte[]> expectedData1 = Maps.newHashMap();
Map<Long, byte[]> expectedData2 = Maps.newHashMap();
@@ -247,8 +247,8 @@ public void readTest7() throws Exception {
@Test
public void readTest8() throws Exception {
String basePath = HDFS_URI + "clientReadTest8";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);

Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -296,8 +296,8 @@ public void readTest9() {
@Test
public void readTest10() throws Exception {
String basePath = HDFS_URI + "clientReadTest10";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 0, 1, basePath, ssi1.getId(), conf);

Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -325,8 +325,8 @@ public void readTest10() throws Exception {
@Test
public void readTest11() throws Exception {
String basePath = HDFS_URI + "clientReadTest11";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);

Map<Long, byte[]> expectedData = Maps.newHashMap();
Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -378,8 +378,8 @@ public void readTest11() throws Exception {
@Test
public void readTest12() throws Exception {
String basePath = HDFS_URI + "clientReadTest12";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);

Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -402,8 +402,8 @@ public void readTest12() throws Exception {
@Test
public void readTest13() throws Exception {
String basePath = HDFS_URI + "clientReadTest13";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);

Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -429,8 +429,8 @@ public void readTest13() throws Exception {
@Test
public void readTest14() throws Exception {
String basePath = HDFS_URI + "clientReadTest14";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);

Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -453,8 +453,8 @@ public void readTest14() throws Exception {
@Test
public void readTest15() throws Exception {
String basePath = HDFS_URI + "clientReadTest15";
-HdfsShuffleWriteHandler writeHandler =
-new HdfsShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);
+HadoopShuffleWriteHandler writeHandler =
+new HadoopShuffleWriteHandler("appId", 0, 1, 1, basePath, ssi1.getId(), conf);

Map<Long, byte[]> expectedData = Maps.newHashMap();
final Roaring64NavigableMap blockIdBitmap = Roaring64NavigableMap.bitmapOf();
@@ -477,7 +477,7 @@ public void readTest15() throws Exception {
}

private void writeTestData(
-HdfsShuffleWriteHandler writeHandler,
+HadoopShuffleWriteHandler writeHandler,
int num, int length, long taskAttemptId,
Map<Long, byte[]> expectedData,
Roaring64NavigableMap blockIdBitmap) throws Exception {
@@ -496,7 +496,7 @@
}

private void writeDuplicatedData(
-HdfsShuffleWriteHandler writeHandler,
+HadoopShuffleWriteHandler writeHandler,
int num, int length, long taskAttemptId,
Map<Long, byte[]> expectedData,
Roaring64NavigableMap blockIdBitmap) throws Exception {
@@ -70,8 +70,8 @@
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;

-public class KerberizedHdfs implements Serializable {
-private static final Logger LOGGER = LoggerFactory.getLogger(KerberizedHdfs.class);
+public class KerberizedHadoop implements Serializable {
+private static final Logger LOGGER = LoggerFactory.getLogger(KerberizedHadoop.class);

private MiniKdc kdc;
private File workDir;
@@ -80,7 +80,7 @@ public class KerberizedHdfs implements Serializable {

private MiniDFSCluster kerberizedDfsCluster;

-private Class testRunnerCls = KerberizedHdfs.class;
+private Class testRunnerCls = KerberizedHadoop.class;

// The superuser for accessing HDFS
private String hdfsKeytab;
@@ -237,7 +237,7 @@ public void tearDown() throws IOException {
if (kdc != null) {
kdc.stop();
}
-setTestRunner(KerberizedHdfs.class);
+setTestRunner(KerberizedHadoop.class);
UserGroupInformation.reset();
}
