[HUDI-5145] Avoid starting HDFS in hudi-utilities tests
xushiyan committed Nov 9, 2022
1 parent f19f6e7 commit d7ce19b
Showing 21 changed files with 314 additions and 292 deletions.
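The core of the change: UtilitiesTestBase.initTestServices gains a leading flag that controls whether a MiniDFS cluster is started at all, and the shared test fields dfs/dfsBasePath become the storage-agnostic fs/basePath, presumably pointing at MiniDFS when the flag is set and at a local directory otherwise. A minimal sketch of the new call pattern; the parameter names are inferred from the call-site changes below, not copied from the source:

    // Assumed signature: initTestServices(boolean needsHdfs, boolean needsHive, boolean needsZookeeper)
    // Test-suite job tests still bring up HDFS, Hive and ZooKeeper:
    UtilitiesTestBase.initTestServices(true, true, true);
    // Delta-streamer tests now skip MiniDFS and read/write under `basePath`
    // through the shared `fs` handle:
    UtilitiesTestBase.initTestServices(false, true, true);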
@@ -53,8 +53,12 @@ public class HdfsTestService {
private MiniDFSCluster miniDfsCluster;

public HdfsTestService() throws IOException {
hadoopConf = new Configuration();
workDir = Files.createTempDirectory("temp").toAbsolutePath().toString();
this(new Configuration());
}

public HdfsTestService(Configuration hadoopConf) throws IOException {
this.hadoopConf = hadoopConf;
this.workDir = Files.createTempDirectory("temp").toAbsolutePath().toString();
}

public Configuration getHadoopConf() {
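HdfsTestService can now be handed an existing Hadoop Configuration instead of always building its own, which lets callers share settings with the MiniDFS cluster. A hedged usage sketch; the start(boolean) call is assumed from the service's existing API and is illustrative only:

    // Reuse the caller's Hadoop Configuration so the MiniDFSCluster picks up its settings.
    Configuration conf = jsc.hadoopConfiguration();               // assumes a JavaSparkContext `jsc` in scope
    HdfsTestService hdfsTestService = new HdfsTestService(conf);
    MiniDFSCluster miniDfsCluster = hdfsTestService.start(true);  // assumed: start(format) boots the cluster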
@@ -65,7 +65,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends UtilitiesTestBase {

@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices(false, false);
UtilitiesTestBase.initTestServices(true, false, false);
}

@AfterAll
@@ -131,15 +131,15 @@ public void testDFSTwoFilesWriteWithRollover() throws IOException {
// TODO(HUDI-3668): Fix this test
public void testDFSWorkloadSinkWithMultipleFilesFunctional() throws IOException {
DeltaConfig dfsSinkConfig = new DFSDeltaConfig(DeltaOutputMode.DFS, DeltaInputType.AVRO,
new SerializableConfiguration(jsc.hadoopConfiguration()), dfsBasePath, dfsBasePath,
new SerializableConfiguration(jsc.hadoopConfiguration()), basePath, basePath,
schemaProvider.getSourceSchema().toString(), 10240L, jsc.defaultParallelism(), false, false);
DeltaWriterAdapter<GenericRecord> dfsDeltaWriterAdapter = DeltaWriterFactory
.getDeltaWriterAdapter(dfsSinkConfig, 1);
FlexibleSchemaRecordGenerationIterator itr = new FlexibleSchemaRecordGenerationIterator(1000,
schemaProvider.getSourceSchema().toString());
dfsDeltaWriterAdapter.write(itr);
FileSystem fs = FSUtils.getFs(dfsBasePath, jsc.hadoopConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(dfsBasePath));
FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(basePath));
// Since maxFileSize was 10240L and we produced 1K records each close to 1K size, we should produce more than
// 1 file
assertTrue(fileStatuses.length > 0);
@@ -58,7 +58,7 @@ public class TestFileDeltaInputWriter extends UtilitiesTestBase {

@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices(false, false);
UtilitiesTestBase.initTestServices(true, false, false);
}

@AfterAll
Expand All @@ -82,7 +82,7 @@ public void teardown() throws Exception {
public void testAvroFileSinkWriter() throws IOException {
// 1. Create a Avro File Sink Writer
DeltaInputWriter<GenericRecord> fileSinkWriter =
new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), dfsBasePath + "/input", schemaProvider.getSourceSchema()
new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), basePath + "/input", schemaProvider.getSourceSchema()
.toString(), 1024 * 1024L);
GenericRecordFullPayloadGenerator payloadGenerator =
new GenericRecordFullPayloadGenerator(schemaProvider.getSourceSchema());
@@ -96,15 +96,15 @@ public void testAvroFileSinkWriter() throws IOException {
});
fileSinkWriter.close();
DeltaWriteStats deltaWriteStats = fileSinkWriter.getDeltaWriteStats();
FileSystem fs = FSUtils.getFs(dfsBasePath, jsc.hadoopConfiguration());
FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(deltaWriteStats.getFilePath()));
// Atleast 1 file was written
assertEquals(1, fileStatuses.length);
// File length should be greater than 0
assertTrue(fileStatuses[0].getLen() > 0);
// File length should be the same as the number of bytes written
assertTrue(deltaWriteStats.getBytesWritten() > 0);
List<String> paths = Arrays.asList(fs.globStatus(new Path(dfsBasePath + "/*/*.avro")))
List<String> paths = Arrays.asList(fs.globStatus(new Path(basePath + "/*/*.avro")))
.stream().map(f -> f.getPath().toString()).collect(Collectors.toList());
JavaRDD<GenericRecord> writtenRecords =
SparkBasedReader.readAvro(sparkSession, schemaProvider.getSourceSchema().toString(), paths, Option.empty(),
@@ -119,7 +119,7 @@ public void testAvroFileSinkWriter() throws IOException {
public void testAvroFileSinkCreateNewWriter() throws IOException {
// 1. Create a Avro File Sink Writer
DeltaInputWriter<GenericRecord> fileSinkWriter =
new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), dfsBasePath,
new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), basePath,
schemaProvider.getSourceSchema().toString(),
1024 * 1024L);
GenericRecordFullPayloadGenerator payloadGenerator =
@@ -94,30 +94,30 @@ public static Stream<Arguments> configParams() {

@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices(true, true);
UtilitiesTestBase.initTestServices(true, true, true);
// prepare the configs.
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
+ BASE_PROPERTIES_DOCKER_DEMO_RELATIVE_PATH, dfs, dfsBasePath + "/base.properties");
+ BASE_PROPERTIES_DOCKER_DEMO_RELATIVE_PATH, fs, basePath + "/base.properties");
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
+ SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, dfs, dfsBasePath + "/source.avsc");
+ SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, fs, basePath + "/source.avsc");
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
+ TARGET_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, dfs, dfsBasePath + "/target.avsc");
+ TARGET_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, fs, basePath + "/target.avsc");

UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
+ COW_DAG_SOURCE_PATH, dfs, dfsBasePath + "/" + COW_DAG_FILE_NAME);
+ COW_DAG_SOURCE_PATH, fs, basePath + "/" + COW_DAG_FILE_NAME);
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
+ MOR_DAG_SOURCE_PATH, dfs, dfsBasePath + "/" + MOR_DAG_FILE_NAME);
+ MOR_DAG_SOURCE_PATH, fs, basePath + "/" + MOR_DAG_FILE_NAME);

TypedProperties props = getProperties();
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/test-source"
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/test-source"
+ ".properties");

UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
+ COW_DAG_SPARK_DATASOURCE_NODES_RELATIVE_PATH, dfs, dfsBasePath + "/" + COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES);
UtilitiesTestBase.Helpers.savePropsToDFS(getProperties(), dfs, dfsBasePath + "/test-source"
+ COW_DAG_SPARK_DATASOURCE_NODES_RELATIVE_PATH, fs, basePath + "/" + COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES);
UtilitiesTestBase.Helpers.savePropsToDFS(getProperties(), fs, basePath + "/test-source"
+ ".properties");
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
+ SPARK_SQL_DAG_SOURCE_PATH, dfs, dfsBasePath + "/" + SPARK_SQL_DAG_FILE_NAME);
+ SPARK_SQL_DAG_SOURCE_PATH, fs, basePath + "/" + SPARK_SQL_DAG_FILE_NAME);

// Properties used for the delta-streamer which incrementally pulls from upstream DFS Avro source and
// writes to downstream hudi table
@@ -127,10 +127,10 @@ public static void initClass() throws Exception {
downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", "timestamp");

// Source schema is the target schema of upstream table
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/source.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs,
dfsBasePath + "/test-downstream-source.properties");
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source.avsc");
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/source.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, fs,
basePath + "/test-downstream-source.properties");
// these tests cause a lot of log verbosity from spark, turning it down
Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
}
@@ -151,8 +151,8 @@ public void teardown() throws Exception {
}

private void cleanDFSDirs() throws Exception {
dfs.delete(new Path(dfsBasePath + "/input"), true);
dfs.delete(new Path(dfsBasePath + "/result"), true);
fs.delete(new Path(basePath + "/input"), true);
fs.delete(new Path(basePath + "/result"), true);
}

private static TypedProperties getProperties() {
@@ -161,9 +161,9 @@ private static TypedProperties getProperties() {
props.setProperty("hoodie.datasource.write.partitionpath.field", "timestamp");
props.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.type", "UNIX_TIMESTAMP");
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/input");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.source.dfs.root", basePath + "/input");
props.setProperty("hoodie.datasource.hive_sync.assume_date_partitioning", "true");
props.setProperty("hoodie.datasource.hive_sync.skip_ro_suffix", "true");
props.setProperty("hoodie.datasource.write.keytranslator.class", "org.apache.hudi"
@@ -205,8 +205,8 @@ private static TypedProperties getProperties() {
@MethodSource("configParams")
public void testDagWithInsertUpsertAndValidate(boolean useDeltaStreamer, String tableType) throws Exception {
this.cleanDFSDirs();
String inputBasePath = dfsBasePath + "/input/" + UUID.randomUUID().toString();
String outputBasePath = dfsBasePath + "/result/" + UUID.randomUUID().toString();
String inputBasePath = basePath + "/input/" + UUID.randomUUID().toString();
String outputBasePath = basePath + "/result/" + UUID.randomUUID().toString();
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, useDeltaStreamer, tableType);
cfg.workloadDagGenerator = ComplexDagGenerator.class.getName();
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
@@ -220,8 +220,8 @@ public void testHiveSync() throws Exception {
boolean useDeltaStreamer = false;
String tableType = "COPY_ON_WRITE";
this.cleanDFSDirs();
String inputBasePath = dfsBasePath + "/input";
String outputBasePath = dfsBasePath + "/result";
String inputBasePath = basePath + "/input";
String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, useDeltaStreamer, tableType);
if (tableType == HoodieTableType.COPY_ON_WRITE.name()) {
cfg.workloadDagGenerator = HiveSyncDagGenerator.class.getName();
@@ -238,11 +238,11 @@ public void testHiveSync() throws Exception {
public void testCOWFullDagFromYaml() throws Exception {
boolean useDeltaStreamer = false;
this.cleanDFSDirs();
String inputBasePath = dfsBasePath + "/input";
String outputBasePath = dfsBasePath + "/result";
String inputBasePath = basePath + "/input";
String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, useDeltaStreamer, HoodieTableType
.COPY_ON_WRITE.name());
cfg.workloadYamlPath = dfsBasePath + "/" + COW_DAG_FILE_NAME;
cfg.workloadYamlPath = basePath + "/" + COW_DAG_FILE_NAME;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(cfg.targetBasePath).build();
@@ -253,11 +253,11 @@ public void testCOWFullDagFromYaml() throws Exception {
public void testMORFullDagFromYaml() throws Exception {
boolean useDeltaStreamer = false;
this.cleanDFSDirs();
String inputBasePath = dfsBasePath + "/input";
String outputBasePath = dfsBasePath + "/result";
String inputBasePath = basePath + "/input";
String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, useDeltaStreamer, HoodieTableType
.MERGE_ON_READ.name());
cfg.workloadYamlPath = dfsBasePath + "/" + MOR_DAG_FILE_NAME;
cfg.workloadYamlPath = basePath + "/" + MOR_DAG_FILE_NAME;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(cfg.targetBasePath).build();
@@ -272,13 +272,13 @@ public void testSparkDataSourceNodesDagWithLock() throws Exception {
TypedProperties props = getProperties();
props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
props.setProperty("hoodie.failed.writes.cleaner.policy", "LAZY");
UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/test-source"
UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/test-source"
+ ".properties");
String inputBasePath = dfsBasePath + "/input";
String outputBasePath = dfsBasePath + "/result";
String inputBasePath = basePath + "/input";
String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, useDeltaStreamer, HoodieTableType
.COPY_ON_WRITE.name());
cfg.workloadYamlPath = dfsBasePath + "/" + COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES;
cfg.workloadYamlPath = basePath + "/" + COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(cfg.targetBasePath).build();
@@ -289,11 +289,11 @@ public void testSparkDataSourceNodesDagWithLock() throws Exception {
public void testSparkSqlDag() throws Exception {
boolean useDeltaStreamer = false;
this.cleanDFSDirs();
String inputBasePath = dfsBasePath + "/input";
String outputBasePath = dfsBasePath + "/result";
String inputBasePath = basePath + "/input";
String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, useDeltaStreamer, HoodieTableType
.COPY_ON_WRITE.name());
cfg.workloadYamlPath = dfsBasePath + "/" + SPARK_SQL_DAG_FILE_NAME;
cfg.workloadYamlPath = basePath + "/" + SPARK_SQL_DAG_FILE_NAME;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
}
@@ -307,7 +307,7 @@ protected HoodieTestSuiteConfig makeConfig(String inputBasePath, String outputBa
cfg.tableType = tableType;
cfg.sourceClassName = AvroDFSSource.class.getName();
cfg.sourceOrderingField = SchemaUtils.SOURCE_ORDERING_FIELD;
cfg.propsFilePath = dfsBasePath + "/test-source.properties";
cfg.propsFilePath = basePath + "/test-source.properties";
cfg.outputTypeName = DeltaOutputMode.DFS.name();
cfg.inputFormatName = DeltaInputType.AVRO.name();
cfg.limitFileSize = 1024 * 1024L;
@@ -43,7 +43,7 @@ public class TestDFSAvroDeltaInputReader extends UtilitiesTestBase {

@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices(false, false);
UtilitiesTestBase.initTestServices(true, false, false);
}

@AfterAll
@@ -59,12 +59,12 @@ public void setup() throws Exception {
@Test
@Disabled
public void testDFSSinkReader() throws IOException {
FileSystem fs = FSUtils.getFs(dfsBasePath, new Configuration());
FileSystem fs = FSUtils.getFs(basePath, new Configuration());
// Create 10 avro files with 10 records each
TestUtils.createAvroFiles(jsc, sparkSession, dfsBasePath, 10, 10);
FileStatus[] statuses = fs.globStatus(new Path(dfsBasePath + "/*/*.avro"));
TestUtils.createAvroFiles(jsc, sparkSession, basePath, 10, 10);
FileStatus[] statuses = fs.globStatus(new Path(basePath + "/*/*.avro"));
DFSAvroDeltaInputReader reader =
new DFSAvroDeltaInputReader(sparkSession, TestUtils.getSchema().toString(), dfsBasePath, Option.empty(),
new DFSAvroDeltaInputReader(sparkSession, TestUtils.getSchema().toString(), basePath, Option.empty(),
Option.empty());
assertEquals(reader.analyzeSingleFile(statuses[0].getPath().toString()), 5);
assertEquals(reader.read(100).count(), 100);
@@ -51,7 +51,7 @@ public class TestDFSHoodieDatasetInputReader extends UtilitiesTestBase {

@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices(false, false);
UtilitiesTestBase.initTestServices(true, false, false);
}

@AfterAll
@@ -62,7 +62,7 @@ public static void cleanupClass() {
@BeforeEach
public void setup() throws Exception {
super.setup();
HoodieTestUtils.init(jsc.hadoopConfiguration(), dfsBasePath);
HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
}

@AfterEach
@@ -117,7 +117,7 @@ private HoodieWriteConfig makeHoodieClientConfig() throws Exception {

private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() throws Exception {
// Prepare the AvroParquetIO
return HoodieWriteConfig.newBuilder().withPath(dfsBasePath)
return HoodieWriteConfig.newBuilder().withPath(basePath)
.withParallelism(2, 2)
.withDeleteParallelism(2)
.withSchema(HoodieTestDataGenerator
@@ -104,14 +104,14 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {

@BeforeAll
public static void initClass() throws Exception {
UtilitiesTestBase.initTestServices(true, true);
PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
ORC_SOURCE_ROOT = dfsBasePath + "/orcFiles";
JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
UtilitiesTestBase.initTestServices(false, true, true);
PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
ORC_SOURCE_ROOT = basePath + "/orcFiles";
JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";
testUtils = new KafkaTestUtils();
testUtils.setup();
topicName = "topic" + testNum;
prepareInitialConfigs(dfs, dfsBasePath, testUtils.brokerAddress());
prepareInitialConfigs(fs, basePath, testUtils.brokerAddress());

prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
