[HUDI-5145] Avoid starting HDFS in hudi-utilities tests #7171

Merged: 1 commit, merged Nov 10, 2022
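For orientation: the hunks below make two coordinated changes. UtilitiesTestBase.initTestServices gains a leading boolean so each test class can opt out of starting a MiniDFSCluster, and the shared handles are renamed from dfs/dfsBasePath to the file-system-agnostic fs/basePath. Below is a minimal sketch of what the reworked bootstrap could look like, assuming a flag order of (needsHdfs, needsHive, needsZookeeper) and inferring everything else from the call sites in this diff; none of the names or method bodies are taken from the PR itself.

```java
import java.nio.file.Files;
import java.util.UUID;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService; // import path assumed

// Illustrative sketch only; not the actual UtilitiesTestBase from this PR.
public abstract class UtilitiesTestBaseSketch {
  protected static FileSystem fs;
  protected static String basePath;

  // Flag order (needsHdfs, needsHive, needsZookeeper) is an assumption inferred from
  // call sites such as initTestServices(true, false, false) and initTestServices(false, true, true).
  public static void initTestServices(boolean needsHdfs, boolean needsHive, boolean needsZookeeper) throws Exception {
    Configuration hadoopConf = new Configuration();
    if (needsHdfs) {
      // Only tests that genuinely exercise HDFS pay for a MiniDFSCluster.
      HdfsTestService hdfsTestService = new HdfsTestService(hadoopConf);
      // start(true) is assumed to format and return the MiniDFSCluster.
      fs = hdfsTestService.start(true).getFileSystem();
      basePath = fs.getWorkingDirectory() + "/" + UUID.randomUUID();
    } else {
      // Everything else runs against the local file system, which is far cheaper to start.
      fs = FileSystem.getLocal(hadoopConf);
      basePath = Files.createTempDirectory("hudi-utilities-test").toUri().toString();
    }
    fs.mkdirs(new Path(basePath));
    // Hive and ZooKeeper test services would be started here when requested (omitted).
  }
}
```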
@@ -53,8 +53,12 @@ public class HdfsTestService {
private MiniDFSCluster miniDfsCluster;

public HdfsTestService() throws IOException {
- hadoopConf = new Configuration();
- workDir = Files.createTempDirectory("temp").toAbsolutePath().toString();
+ this(new Configuration());
+ }
+
+ public HdfsTestService(Configuration hadoopConf) throws IOException {
+ this.hadoopConf = hadoopConf;
+ this.workDir = Files.createTempDirectory("temp").toAbsolutePath().toString();
}

public Configuration getHadoopConf() {
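The no-arg constructor now delegates to a new constructor that accepts a caller-supplied Configuration, so a test harness can share one Hadoop configuration with the mini cluster. A hedged usage sketch follows; the import path and the specific config key are assumptions for illustration.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hudi.common.testutils.minicluster.HdfsTestService; // package assumed

public class HdfsTestServiceUsageSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new Configuration();
    conf.setBoolean("dfs.permissions.enabled", false); // illustrative setting only
    // New in this PR: the caller-supplied Configuration is reused rather than replaced.
    HdfsTestService service = new HdfsTestService(conf);
    // getHadoopConf() hands back the very instance that was passed in.
    System.out.println(service.getHadoopConf() == conf); // prints: true
  }
}
```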
@@ -65,7 +65,7 @@ public class TestDFSHoodieTestSuiteWriterAdapter extends UtilitiesTestBase {

@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(false, false);
+ UtilitiesTestBase.initTestServices(true, false, false);
}

@AfterAll
@@ -131,15 +131,15 @@ public void testDFSTwoFilesWriteWithRollover() throws IOException {
// TODO(HUDI-3668): Fix this test
public void testDFSWorkloadSinkWithMultipleFilesFunctional() throws IOException {
DeltaConfig dfsSinkConfig = new DFSDeltaConfig(DeltaOutputMode.DFS, DeltaInputType.AVRO,
- new SerializableConfiguration(jsc.hadoopConfiguration()), dfsBasePath, dfsBasePath,
+ new SerializableConfiguration(jsc.hadoopConfiguration()), basePath, basePath,
schemaProvider.getSourceSchema().toString(), 10240L, jsc.defaultParallelism(), false, false);
DeltaWriterAdapter<GenericRecord> dfsDeltaWriterAdapter = DeltaWriterFactory
.getDeltaWriterAdapter(dfsSinkConfig, 1);
FlexibleSchemaRecordGenerationIterator itr = new FlexibleSchemaRecordGenerationIterator(1000,
schemaProvider.getSourceSchema().toString());
dfsDeltaWriterAdapter.write(itr);
- FileSystem fs = FSUtils.getFs(dfsBasePath, jsc.hadoopConfiguration());
- FileStatus[] fileStatuses = fs.listStatus(new Path(dfsBasePath));
+ FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
+ FileStatus[] fileStatuses = fs.listStatus(new Path(basePath));
// Since maxFileSize was 10240L and we produced 1K records each close to 1K size, we should produce more than
// 1 file
assertTrue(fileStatuses.length > 0);
@@ -58,7 +58,7 @@ public class TestFileDeltaInputWriter extends UtilitiesTestBase {

@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(false, false);
+ UtilitiesTestBase.initTestServices(true, false, false);
}

@AfterAll
@@ -82,7 +82,7 @@ public void teardown() throws Exception {
public void testAvroFileSinkWriter() throws IOException {
// 1. Create an Avro File Sink Writer
DeltaInputWriter<GenericRecord> fileSinkWriter =
new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), dfsBasePath + "/input", schemaProvider.getSourceSchema()
new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), basePath + "/input", schemaProvider.getSourceSchema()
.toString(), 1024 * 1024L);
GenericRecordFullPayloadGenerator payloadGenerator =
new GenericRecordFullPayloadGenerator(schemaProvider.getSourceSchema());
@@ -96,15 +96,15 @@ public void testAvroFileSinkWriter() throws IOException {
});
fileSinkWriter.close();
DeltaWriteStats deltaWriteStats = fileSinkWriter.getDeltaWriteStats();
- FileSystem fs = FSUtils.getFs(dfsBasePath, jsc.hadoopConfiguration());
+ FileSystem fs = FSUtils.getFs(basePath, jsc.hadoopConfiguration());
FileStatus[] fileStatuses = fs.listStatus(new Path(deltaWriteStats.getFilePath()));
// At least 1 file was written
assertEquals(1, fileStatuses.length);
// File length should be greater than 0
assertTrue(fileStatuses[0].getLen() > 0);
// File length should be the same as the number of bytes written
assertTrue(deltaWriteStats.getBytesWritten() > 0);
- List<String> paths = Arrays.asList(fs.globStatus(new Path(dfsBasePath + "/*/*.avro")))
+ List<String> paths = Arrays.asList(fs.globStatus(new Path(basePath + "/*/*.avro")))
.stream().map(f -> f.getPath().toString()).collect(Collectors.toList());
JavaRDD<GenericRecord> writtenRecords =
SparkBasedReader.readAvro(sparkSession, schemaProvider.getSourceSchema().toString(), paths, Option.empty(),
@@ -119,7 +119,7 @@ public void testAvroFileSinkWriter() throws IOException {
public void testAvroFileSinkCreateNewWriter() throws IOException {
// 1. Create an Avro File Sink Writer
DeltaInputWriter<GenericRecord> fileSinkWriter =
- new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), dfsBasePath,
+ new AvroFileDeltaInputWriter(jsc.hadoopConfiguration(), basePath,
schemaProvider.getSourceSchema().toString(),
1024 * 1024L);
GenericRecordFullPayloadGenerator payloadGenerator =
@@ -94,30 +94,30 @@ public static Stream<Arguments> configParams() {

@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(true, true);
+ UtilitiesTestBase.initTestServices(true, true, true);
// prepare the configs.
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
- + BASE_PROPERTIES_DOCKER_DEMO_RELATIVE_PATH, dfs, dfsBasePath + "/base.properties");
+ + BASE_PROPERTIES_DOCKER_DEMO_RELATIVE_PATH, fs, basePath + "/base.properties");
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
- + SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, dfs, dfsBasePath + "/source.avsc");
+ + SOURCE_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, fs, basePath + "/source.avsc");
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
- + TARGET_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, dfs, dfsBasePath + "/target.avsc");
+ + TARGET_SCHEMA_DOCKER_DEMO_RELATIVE_PATH, fs, basePath + "/target.avsc");

UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
- + COW_DAG_SOURCE_PATH, dfs, dfsBasePath + "/" + COW_DAG_FILE_NAME);
+ + COW_DAG_SOURCE_PATH, fs, basePath + "/" + COW_DAG_FILE_NAME);
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
- + MOR_DAG_SOURCE_PATH, dfs, dfsBasePath + "/" + MOR_DAG_FILE_NAME);
+ + MOR_DAG_SOURCE_PATH, fs, basePath + "/" + MOR_DAG_FILE_NAME);

TypedProperties props = getProperties();
- UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/test-source"
+ UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/test-source"
+ ".properties");

UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
- + COW_DAG_SPARK_DATASOURCE_NODES_RELATIVE_PATH, dfs, dfsBasePath + "/" + COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES);
- UtilitiesTestBase.Helpers.savePropsToDFS(getProperties(), dfs, dfsBasePath + "/test-source"
+ + COW_DAG_SPARK_DATASOURCE_NODES_RELATIVE_PATH, fs, basePath + "/" + COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES);
+ UtilitiesTestBase.Helpers.savePropsToDFS(getProperties(), fs, basePath + "/test-source"
+ ".properties");
UtilitiesTestBase.Helpers.copyToDFSFromAbsolutePath(System.getProperty("user.dir") + "/.."
- + SPARK_SQL_DAG_SOURCE_PATH, dfs, dfsBasePath + "/" + SPARK_SQL_DAG_FILE_NAME);
+ + SPARK_SQL_DAG_SOURCE_PATH, fs, basePath + "/" + SPARK_SQL_DAG_FILE_NAME);

// Properties used for the delta-streamer which incrementally pulls from upstream DFS Avro source and
// writes to downstream hudi table
@@ -127,10 +127,10 @@ public static void initClass() throws Exception {
downstreamProps.setProperty("hoodie.datasource.write.partitionpath.field", "timestamp");

// Source schema is the target schema of upstream table
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/source.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, dfs,
dfsBasePath + "/test-downstream-source.properties");
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source.avsc");
downstreamProps.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/source.avsc");
UtilitiesTestBase.Helpers.savePropsToDFS(downstreamProps, fs,
basePath + "/test-downstream-source.properties");
// these tests cause a lot of log verbosity from spark, turning it down
Logger.getLogger("org.apache.spark").setLevel(Level.WARN);
}
@@ -151,8 +151,8 @@ public void teardown() throws Exception {
}

private void cleanDFSDirs() throws Exception {
dfs.delete(new Path(dfsBasePath + "/input"), true);
dfs.delete(new Path(dfsBasePath + "/result"), true);
fs.delete(new Path(basePath + "/input"), true);
fs.delete(new Path(basePath + "/result"), true);
}

private static TypedProperties getProperties() {
@@ -161,9 +161,9 @@ private static TypedProperties getProperties() {
props.setProperty("hoodie.datasource.write.partitionpath.field", "timestamp");
props.setProperty("hoodie.deltastreamer.keygen.timebased.timestamp.type", "UNIX_TIMESTAMP");
props.setProperty("hoodie.deltastreamer.keygen.timebased.output.dateformat", "yyyy/MM/dd");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", dfsBasePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", dfsBasePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/input");
props.setProperty("hoodie.deltastreamer.schemaprovider.source.schema.file", basePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.schemaprovider.target.schema.file", basePath + "/source.avsc");
props.setProperty("hoodie.deltastreamer.source.dfs.root", basePath + "/input");
props.setProperty("hoodie.datasource.hive_sync.assume_date_partitioning", "true");
props.setProperty("hoodie.datasource.hive_sync.skip_ro_suffix", "true");
props.setProperty("hoodie.datasource.write.keytranslator.class", "org.apache.hudi"
@@ -205,8 +205,8 @@ private static TypedProperties getProperties() {
@MethodSource("configParams")
public void testDagWithInsertUpsertAndValidate(boolean useDeltaStreamer, String tableType) throws Exception {
this.cleanDFSDirs();
String inputBasePath = dfsBasePath + "/input/" + UUID.randomUUID().toString();
String outputBasePath = dfsBasePath + "/result/" + UUID.randomUUID().toString();
String inputBasePath = basePath + "/input/" + UUID.randomUUID().toString();
String outputBasePath = basePath + "/result/" + UUID.randomUUID().toString();
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, useDeltaStreamer, tableType);
cfg.workloadDagGenerator = ComplexDagGenerator.class.getName();
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
@@ -220,8 +220,8 @@ public void testHiveSync() throws Exception {
boolean useDeltaStreamer = false;
String tableType = "COPY_ON_WRITE";
this.cleanDFSDirs();
String inputBasePath = dfsBasePath + "/input";
String outputBasePath = dfsBasePath + "/result";
String inputBasePath = basePath + "/input";
String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, useDeltaStreamer, tableType);
if (tableType == HoodieTableType.COPY_ON_WRITE.name()) {
cfg.workloadDagGenerator = HiveSyncDagGenerator.class.getName();
@@ -238,11 +238,11 @@ public void testHiveSync() throws Exception {
public void testCOWFullDagFromYaml() throws Exception {
boolean useDeltaStreamer = false;
this.cleanDFSDirs();
String inputBasePath = dfsBasePath + "/input";
String outputBasePath = dfsBasePath + "/result";
String inputBasePath = basePath + "/input";
String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, useDeltaStreamer, HoodieTableType
.COPY_ON_WRITE.name());
cfg.workloadYamlPath = dfsBasePath + "/" + COW_DAG_FILE_NAME;
cfg.workloadYamlPath = basePath + "/" + COW_DAG_FILE_NAME;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(cfg.targetBasePath).build();
@@ -253,11 +253,11 @@ public void testCOWFullDagFromYaml() throws Exception {
public void testMORFullDagFromYaml() throws Exception {
boolean useDeltaStreamer = false;
this.cleanDFSDirs();
String inputBasePath = dfsBasePath + "/input";
String outputBasePath = dfsBasePath + "/result";
String inputBasePath = basePath + "/input";
String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, useDeltaStreamer, HoodieTableType
.MERGE_ON_READ.name());
cfg.workloadYamlPath = dfsBasePath + "/" + MOR_DAG_FILE_NAME;
cfg.workloadYamlPath = basePath + "/" + MOR_DAG_FILE_NAME;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(cfg.targetBasePath).build();
@@ -272,13 +272,13 @@ public void testSparkDataSourceNodesDagWithLock() throws Exception {
TypedProperties props = getProperties();
props.setProperty("hoodie.write.concurrency.mode", "optimistic_concurrency_control");
props.setProperty("hoodie.failed.writes.cleaner.policy", "LAZY");
- UtilitiesTestBase.Helpers.savePropsToDFS(props, dfs, dfsBasePath + "/test-source"
+ UtilitiesTestBase.Helpers.savePropsToDFS(props, fs, basePath + "/test-source"
+ ".properties");
String inputBasePath = dfsBasePath + "/input";
String outputBasePath = dfsBasePath + "/result";
String inputBasePath = basePath + "/input";
String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, useDeltaStreamer, HoodieTableType
.COPY_ON_WRITE.name());
cfg.workloadYamlPath = dfsBasePath + "/" + COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES;
cfg.workloadYamlPath = basePath + "/" + COW_DAG_FILE_NAME_SPARK_DATASOURCE_NODES;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
HoodieTableMetaClient metaClient = HoodieTableMetaClient.builder().setConf(new Configuration()).setBasePath(cfg.targetBasePath).build();
@@ -289,11 +289,11 @@ public void testSparkDataSourceNodesDagWithLock() throws Exception {
public void testSparkSqlDag() throws Exception {
boolean useDeltaStreamer = false;
this.cleanDFSDirs();
String inputBasePath = dfsBasePath + "/input";
String outputBasePath = dfsBasePath + "/result";
String inputBasePath = basePath + "/input";
String outputBasePath = basePath + "/result";
HoodieTestSuiteConfig cfg = makeConfig(inputBasePath, outputBasePath, useDeltaStreamer, HoodieTableType
.COPY_ON_WRITE.name());
cfg.workloadYamlPath = dfsBasePath + "/" + SPARK_SQL_DAG_FILE_NAME;
cfg.workloadYamlPath = basePath + "/" + SPARK_SQL_DAG_FILE_NAME;
HoodieTestSuiteJob hoodieTestSuiteJob = new HoodieTestSuiteJob(cfg, jsc);
hoodieTestSuiteJob.runTestSuite();
}
@@ -307,7 +307,7 @@ protected HoodieTestSuiteConfig makeConfig(String inputBasePath, String outputBa
cfg.tableType = tableType;
cfg.sourceClassName = AvroDFSSource.class.getName();
cfg.sourceOrderingField = SchemaUtils.SOURCE_ORDERING_FIELD;
- cfg.propsFilePath = dfsBasePath + "/test-source.properties";
+ cfg.propsFilePath = basePath + "/test-source.properties";
cfg.outputTypeName = DeltaOutputMode.DFS.name();
cfg.inputFormatName = DeltaInputType.AVRO.name();
cfg.limitFileSize = 1024 * 1024L;
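Throughout this test class the rename from dfs/dfsBasePath to fs/basePath is mechanical, because FSUtils.getFs (used in the hunks above) resolves the FileSystem from the path it is given. A small sketch of that assumption; the FSUtils import path is a guess, and only the call already visible in the diff is used.

```java
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hudi.common.fs.FSUtils; // package assumed for illustration

public class FsResolutionSketch {
  public static void main(String[] args) {
    Configuration conf = new Configuration();
    // basePath can be a plain local path or URI; the FileSystem is resolved from it.
    FileSystem localFs = FSUtils.getFs("file:///tmp/hudi-test", conf);
    System.out.println(localFs.getUri()); // file:/// -- no MiniDFSCluster involved
    // With an hdfs:// base path (as the remaining MiniDFS-backed tests still use),
    // the same call would return a DistributedFileSystem; the test code is unchanged.
  }
}
```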
@@ -43,7 +43,7 @@ public class TestDFSAvroDeltaInputReader extends UtilitiesTestBase {

@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(false, false);
+ UtilitiesTestBase.initTestServices(true, false, false);
}

@AfterAll
@@ -59,12 +59,12 @@ public void setup() throws Exception {
@Test
@Disabled
public void testDFSSinkReader() throws IOException {
- FileSystem fs = FSUtils.getFs(dfsBasePath, new Configuration());
+ FileSystem fs = FSUtils.getFs(basePath, new Configuration());
// Create 10 avro files with 10 records each
- TestUtils.createAvroFiles(jsc, sparkSession, dfsBasePath, 10, 10);
- FileStatus[] statuses = fs.globStatus(new Path(dfsBasePath + "/*/*.avro"));
+ TestUtils.createAvroFiles(jsc, sparkSession, basePath, 10, 10);
+ FileStatus[] statuses = fs.globStatus(new Path(basePath + "/*/*.avro"));
DFSAvroDeltaInputReader reader =
- new DFSAvroDeltaInputReader(sparkSession, TestUtils.getSchema().toString(), dfsBasePath, Option.empty(),
+ new DFSAvroDeltaInputReader(sparkSession, TestUtils.getSchema().toString(), basePath, Option.empty(),
Option.empty());
assertEquals(reader.analyzeSingleFile(statuses[0].getPath().toString()), 5);
assertEquals(reader.read(100).count(), 100);
@@ -51,7 +51,7 @@ public class TestDFSHoodieDatasetInputReader extends UtilitiesTestBase {

@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(false, false);
+ UtilitiesTestBase.initTestServices(true, false, false);
}

@AfterAll
@@ -62,7 +62,7 @@ public static void cleanupClass() {
@BeforeEach
public void setup() throws Exception {
super.setup();
- HoodieTestUtils.init(jsc.hadoopConfiguration(), dfsBasePath);
+ HoodieTestUtils.init(jsc.hadoopConfiguration(), basePath);
}

@AfterEach
@@ -117,7 +117,7 @@ private HoodieWriteConfig makeHoodieClientConfig() throws Exception {

private HoodieWriteConfig.Builder makeHoodieClientConfigBuilder() throws Exception {
// Prepare the AvroParquetIO
- return HoodieWriteConfig.newBuilder().withPath(dfsBasePath)
+ return HoodieWriteConfig.newBuilder().withPath(basePath)
.withParallelism(2, 2)
.withDeleteParallelism(2)
.withSchema(HoodieTestDataGenerator
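The write-config builder in this hunk only needs a base path string, so pointing it at the generic basePath instead of an HDFS path requires no other changes. A short sketch completing the builder chain that the hunk truncates; the schema constant and build() call are illustrative completions, and the import paths are assumptions.

```java
import org.apache.hudi.common.testutils.HoodieTestDataGenerator; // package assumed
import org.apache.hudi.config.HoodieWriteConfig;                 // package assumed

public class WriteConfigSketch {
  public static HoodieWriteConfig makeConfig(String basePath) {
    // Builder calls up to withSchema(...) mirror the ones visible in the hunk above.
    return HoodieWriteConfig.newBuilder()
        .withPath(basePath)
        .withParallelism(2, 2)
        .withDeleteParallelism(2)
        .withSchema(HoodieTestDataGenerator.TRIP_EXAMPLE_SCHEMA)
        .build();
  }
}
```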
@@ -104,14 +104,14 @@ public class HoodieDeltaStreamerTestBase extends UtilitiesTestBase {

@BeforeAll
public static void initClass() throws Exception {
- UtilitiesTestBase.initTestServices(true, true);
- PARQUET_SOURCE_ROOT = dfsBasePath + "/parquetFiles";
- ORC_SOURCE_ROOT = dfsBasePath + "/orcFiles";
- JSON_KAFKA_SOURCE_ROOT = dfsBasePath + "/jsonKafkaFiles";
+ UtilitiesTestBase.initTestServices(false, true, true);
+ PARQUET_SOURCE_ROOT = basePath + "/parquetFiles";
+ ORC_SOURCE_ROOT = basePath + "/orcFiles";
+ JSON_KAFKA_SOURCE_ROOT = basePath + "/jsonKafkaFiles";
testUtils = new KafkaTestUtils();
testUtils.setup();
topicName = "topic" + testNum;
- prepareInitialConfigs(dfs, dfsBasePath, testUtils.brokerAddress());
+ prepareInitialConfigs(fs, basePath, testUtils.brokerAddress());

prepareParquetDFSFiles(PARQUET_NUM_RECORDS, PARQUET_SOURCE_ROOT);
prepareORCDFSFiles(ORC_NUM_RECORDS, ORC_SOURCE_ROOT);
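HoodieDeltaStreamerTestBase is the main beneficiary: it now passes false for the leading flag, so its Parquet, ORC, and JSON-Kafka source roots live under a local basePath while Kafka itself still comes from KafkaTestUtils. A sketch of the assumption behind that switch; the class and variable names here are illustrative, not from the PR.

```java
import java.nio.file.Files;
import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.fs.FileSystem;
import org.apache.hadoop.fs.Path;
import org.apache.hudi.common.fs.FSUtils; // package assumed for illustration

public class LocalSourceRootsSketch {
  public static void main(String[] args) throws Exception {
    // With HDFS disabled, basePath is just a local temp directory...
    String basePath = Files.createTempDirectory("hudi-deltastreamer-test").toUri().toString();
    String parquetSourceRoot = basePath + "/parquetFiles";
    String jsonKafkaSourceRoot = basePath + "/jsonKafkaFiles";

    // ...so the "DFS" source roots become ordinary local folders the tests can read and write.
    FileSystem fs = FSUtils.getFs(basePath, new Configuration());
    fs.mkdirs(new Path(parquetSourceRoot));
    fs.mkdirs(new Path(jsonKafkaSourceRoot));
    System.out.println(fs.exists(new Path(parquetSourceRoot))); // true
  }
}
```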