From 1875b16b569eca1df0cb33818110d0b91763f183 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Thu, 16 Jan 2020 23:00:06 -0800 Subject: [PATCH 1/5] [MINOR] Abstract a test case class for DFS Source to make it extensible --- .../hudi/utilities/sources/TestDFSSource.java | 256 ++++++++++-------- 1 file changed, 148 insertions(+), 108 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java index f7ac61f28b2a6..3128ca924cab6 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java @@ -20,6 +20,7 @@ import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.common.HoodieTestDataGenerator; +import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.Option; import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.utilities.UtilitiesTestBase; @@ -34,7 +35,7 @@ import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; -import org.apache.spark.sql.SQLContext; +import org.apache.spark.sql.SparkSession; import org.junit.After; import org.junit.AfterClass; import org.junit.Before; @@ -77,118 +78,157 @@ public void teardown() throws Exception { @Test public void testJsonDFSSource() throws IOException { - dfs.mkdirs(new Path(dfsBasePath + "/jsonFiles")); - HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); - - TypedProperties props = new TypedProperties(); - props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/jsonFiles"); - JsonDFSSource jsonDFSSource = new JsonDFSSource(props, jsc, sparkSession, schemaProvider); - SourceFormatAdapter jsonSource = new SourceFormatAdapter(jsonDFSSource); - - // 1. Extract without any checkpoint => get all the data, respecting sourceLimit - assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch()); - UtilitiesTestBase.Helpers.saveStringsToDFS(Helpers.jsonifyRecords(dataGenerator.generateInserts("000", 100)), dfs, - dfsBasePath + "/jsonFiles/1.json"); - // Test respecting sourceLimit - int sourceLimit = 10; - RemoteIterator files = dfs.listFiles(new Path(dfsBasePath + "/jsonFiles/1.json"), true); - FileStatus file1Status = files.next(); - assertTrue(file1Status.getLen() > sourceLimit); - assertEquals(Option.empty(), jsonSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch()); - // Test json -> Avro - InputBatch> fetch1 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE); - assertEquals(100, fetch1.getBatch().get().count()); - // Test json -> Row format - InputBatch> fetch1AsRows = jsonSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE); - assertEquals(100, fetch1AsRows.getBatch().get().count()); - // Test Avro -> Row format - Dataset fetch1Rows = AvroConversionUtils.createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()), - schemaProvider.getSourceSchema().toString(), jsonDFSSource.getSparkSession()); - assertEquals(100, fetch1Rows.count()); - - // 2. Produce new data, extract new data - UtilitiesTestBase.Helpers.saveStringsToDFS(Helpers.jsonifyRecords(dataGenerator.generateInserts("001", 10000)), dfs, - dfsBasePath + "/jsonFiles/2.json"); - InputBatch> fetch2 = - jsonSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE); - assertEquals(10000, fetch2.getBatch().get().count()); - - // 3. Extract with previous checkpoint => gives same data back (idempotent) - InputBatch> fetch3 = - jsonSource.fetchNewDataInRowFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE); - assertEquals(10000, fetch3.getBatch().get().count()); - assertEquals(fetch2.getCheckpointForNextBatch(), fetch3.getCheckpointForNextBatch()); - fetch3.getBatch().get().registerTempTable("test_dfs_table"); - Dataset rowDataset = new SQLContext(jsc.sc()).sql("select * from test_dfs_table"); - assertEquals(10000, rowDataset.count()); - - // 4. Extract with latest checkpoint => no new data returned - InputBatch> fetch4 = - jsonSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE); - assertEquals(Option.empty(), fetch4.getBatch()); - - // 5. Extract from the beginning - InputBatch> fetch5 = jsonSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE); - assertEquals(10100, fetch5.getBatch().get().count()); + new DFSSourceTestCase(dfsBasePath + "/jsonFiles", ".json") { + @Override + Source prepareDFSSource() { + TypedProperties props = new TypedProperties(); + props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot); + return new JsonDFSSource(props, jsc, sparkSession, schemaProvider); + } + + @Override + void writeNewDataToFile(List records, Path path) throws IOException { + UtilitiesTestBase.Helpers.saveStringsToDFS( + Helpers.jsonifyRecords(records), dfs, path.toString()); + } + }.run(); } @Test public void testParquetDFSSource() throws IOException { - dfs.mkdirs(new Path(dfsBasePath + "/parquetFiles")); + new DFSSourceTestCase(dfsBasePath + "/parquetFiles", ".parquet") { + @Override + Source prepareDFSSource() { + TypedProperties props = new TypedProperties(); + props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot); + return new ParquetDFSSource(props, jsc, sparkSession, schemaProvider); + } + + @Override + void writeNewDataToFile(List records, Path path) throws IOException { + Helpers.saveParquetToDFS(Helpers.toGenericRecords(records, dataGenerator), path); + } + }.run(); + } + + /** + * An abstract test case for {@link Source} using DFS as the file system. + */ + abstract class DFSSourceTestCase { + + String dfsRoot; + String fileSuffix; + int fileCount = 1; HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); - TypedProperties props = new TypedProperties(); - props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsBasePath + "/parquetFiles"); - ParquetDFSSource parquetDFSSource = new ParquetDFSSource(props, jsc, sparkSession, schemaProvider); - SourceFormatAdapter parquetSource = new SourceFormatAdapter(parquetDFSSource); - - // 1. Extract without any checkpoint => get all the data, respecting sourceLimit - assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch()); - List batch1 = Helpers.toGenericRecords(dataGenerator.generateInserts("000", 100), dataGenerator); - Path file1 = new Path(dfsBasePath + "/parquetFiles", "1.parquet"); - Helpers.saveParquetToDFS(batch1, file1); - // Test respecting sourceLimit - int sourceLimit = 10; - RemoteIterator files = dfs.listFiles(file1, true); - FileStatus file1Status = files.next(); - assertTrue(file1Status.getLen() > sourceLimit); - assertEquals(Option.empty(), parquetSource.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch()); - // Test parquet -> Avro - InputBatch> fetch1 = parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE); - assertEquals(100, fetch1.getBatch().get().count()); - // Test parquet -> Row - InputBatch> fetch1AsRows = parquetSource.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE); - assertEquals(100, fetch1AsRows.getBatch().get().count()); - - // 2. Produce new data, extract new data - List batch2 = Helpers.toGenericRecords(dataGenerator.generateInserts("001", 10000), dataGenerator); - Path file2 = new Path(dfsBasePath + "/parquetFiles", "2.parquet"); - Helpers.saveParquetToDFS(batch2, file2); - // Test parquet -> Avro - InputBatch> fetch2 = - parquetSource.fetchNewDataInAvroFormat(Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE); - assertEquals(10000, fetch2.getBatch().get().count()); - // Test parquet -> Row - InputBatch> fetch2AsRows = - parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE); - assertEquals(10000, fetch2AsRows.getBatch().get().count()); - - // 3. Extract with previous checkpoint => gives same data back (idempotent) - InputBatch> fetch3AsRows = - parquetSource.fetchNewDataInRowFormat(Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE); - assertEquals(10000, fetch3AsRows.getBatch().get().count()); - assertEquals(fetch2AsRows.getCheckpointForNextBatch(), fetch3AsRows.getCheckpointForNextBatch()); - fetch3AsRows.getBatch().get().registerTempTable("test_dfs_table"); - Dataset rowDataset = new SQLContext(jsc.sc()).sql("select * from test_dfs_table"); - assertEquals(10000, rowDataset.count()); - - // 4. Extract with latest checkpoint => no new data returned - InputBatch> fetch4 = - parquetSource.fetchNewDataInAvroFormat(Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE); - assertEquals(Option.empty(), fetch4.getBatch()); - - // 5. Extract from the beginning - InputBatch> fetch5 = parquetSource.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE); - assertEquals(10100, fetch5.getBatch().get().count()); + DFSSourceTestCase(String dfsRoot, String fileSuffix) { + this.dfsRoot = dfsRoot; + this.fileSuffix = fileSuffix; + } + + /** + * Prepares the specific {@link Source} to test, by passing in necessary configurations. + * + * @return A {@link Source} using DFS as the file system. + */ + abstract Source prepareDFSSource(); + + /** + * Writes test data, i.e., a {@link List} of {@link HoodieRecord}, to a file on DFS. + * + * @param records Test data. + * @param path The path in {@link Path} of the file to write. + * @throws IOException + */ + abstract void writeNewDataToFile(List records, Path path) throws IOException; + + /** + * Generates a batch of test data and writes the data to a file. This can be called multiple + * times to generate multiple files. + * + * @return The {@link Path} of the file. + * @throws IOException + */ + Path generateOneFile() throws IOException { + Path path = new Path(dfsRoot, fileCount + fileSuffix); + switch (fileCount) { + case 1: + writeNewDataToFile(dataGenerator.generateInserts("000", 100), path); + fileCount++; + return path; + case 2: + writeNewDataToFile(dataGenerator.generateInserts("001", 10000), path); + fileCount++; + return path; + default: + return null; + } + } + + /** + * Runs the test scenario. + * + * @throws IOException + */ + void run() throws IOException { + dfs.mkdirs(new Path(dfsRoot)); + SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(prepareDFSSource()); + + // 1. Extract without any checkpoint => get all the data, respecting sourceLimit + assertEquals(Option.empty(), + sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch()); + // Test respecting sourceLimit + int sourceLimit = 10; + RemoteIterator files = dfs.listFiles(generateOneFile(), true); + FileStatus file1Status = files.next(); + assertTrue(file1Status.getLen() > sourceLimit); + assertEquals(Option.empty(), + sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch()); + // Test fetching Avro format + InputBatch> fetch1 = + sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE); + assertEquals(100, fetch1.getBatch().get().count()); + // Test fetching Row format + InputBatch> fetch1AsRows = + sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE); + assertEquals(100, fetch1AsRows.getBatch().get().count()); + // Test Avro to Row format + Dataset fetch1Rows = AvroConversionUtils + .createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()), + schemaProvider.getSourceSchema().toString(), sparkSession); + assertEquals(100, fetch1Rows.count()); + + // 2. Produce new data, extract new data + generateOneFile(); + // Test fetching Avro format + InputBatch> fetch2 = sourceFormatAdapter.fetchNewDataInAvroFormat( + Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE); + assertEquals(10000, fetch2.getBatch().get().count()); + // Test fetching Row format + InputBatch> fetch2AsRows = sourceFormatAdapter.fetchNewDataInRowFormat( + Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE); + assertEquals(10000, fetch2AsRows.getBatch().get().count()); + + // 3. Extract with previous checkpoint => gives same data back (idempotent) + InputBatch> fetch3AsRows = sourceFormatAdapter.fetchNewDataInRowFormat( + Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE); + assertEquals(10000, fetch3AsRows.getBatch().get().count()); + assertEquals(fetch2AsRows.getCheckpointForNextBatch(), + fetch3AsRows.getCheckpointForNextBatch()); + fetch3AsRows.getBatch().get().createOrReplaceTempView("test_dfs_table"); + Dataset rowDataset = SparkSession.builder().sparkContext(jsc.sc()).getOrCreate() + .sql("select * from test_dfs_table"); + assertEquals(10000, rowDataset.count()); + + // 4. Extract with latest checkpoint => no new data returned + InputBatch> fetch4 = sourceFormatAdapter.fetchNewDataInAvroFormat( + Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE); + assertEquals(Option.empty(), fetch4.getBatch()); + + // 5. Extract from the beginning + InputBatch> fetch5 = sourceFormatAdapter.fetchNewDataInAvroFormat( + Option.empty(), Long.MAX_VALUE); + assertEquals(10100, fetch5.getBatch().get().count()); + } } } From 4e2b8ead8d1a1fc024040cc75fcd842c9ef268e6 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 17 Jan 2020 22:35:56 -0800 Subject: [PATCH 2/5] Rearrange the test classes of DFS Source. --- .../sources/AbstractDFSSourceTestBase.java | 185 ++++++++++++++ .../hudi/utilities/sources/TestDFSSource.java | 234 ------------------ .../utilities/sources/TestJsonDFSSource.java | 53 ++++ .../sources/TestParquetDFSSource.java | 51 ++++ 4 files changed, 289 insertions(+), 234 deletions(-) create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java delete mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java create mode 100644 hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java new file mode 100644 index 0000000000000..841373b239a39 --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java @@ -0,0 +1,185 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + +import java.io.IOException; +import java.util.List; +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; +import org.apache.hudi.AvroConversionUtils; +import org.apache.hudi.common.HoodieTestDataGenerator; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.Option; +import org.apache.hudi.utilities.UtilitiesTestBase; +import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter; +import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; +import org.apache.spark.api.java.JavaRDD; +import org.apache.spark.sql.Dataset; +import org.apache.spark.sql.Row; +import org.apache.spark.sql.SparkSession; +import org.junit.After; +import org.junit.AfterClass; +import org.junit.Before; +import org.junit.BeforeClass; +import org.junit.Test; + +/** + * An abstract test base for {@link Source} using DFS as the file system. + */ +public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase { + + FilebasedSchemaProvider schemaProvider; + String dfsRoot; + String fileSuffix; + int fileCount = 1; + HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); + + @BeforeClass + public static void initClass() throws Exception { + UtilitiesTestBase.initClass(); + } + + @AfterClass + public static void cleanupClass() throws Exception { + UtilitiesTestBase.cleanupClass(); + } + + @Before + public void setup() throws Exception { + super.setup(); + schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc); + } + + @After + public void teardown() throws Exception { + super.teardown(); + } + + /** + * Prepares the specific {@link Source} to test, by passing in necessary configurations. + * + * @return A {@link Source} using DFS as the file system. + */ + abstract Source prepareDFSSource(); + + /** + * Writes test data, i.e., a {@link List} of {@link HoodieRecord}, to a file on DFS. + * + * @param records Test data. + * @param path The path in {@link Path} of the file to write. + * @throws IOException + */ + abstract void writeNewDataToFile(List records, Path path) throws IOException; + + /** + * Generates a batch of test data and writes the data to a file. This can be called multiple + * times to generate multiple files. + * + * @return The {@link Path} of the file. + * @throws IOException + */ + Path generateOneFile() throws IOException { + Path path = new Path(dfsRoot, fileCount + fileSuffix); + switch (fileCount) { + case 1: + writeNewDataToFile(dataGenerator.generateInserts("000", 100), path); + fileCount++; + return path; + case 2: + writeNewDataToFile(dataGenerator.generateInserts("001", 10000), path); + fileCount++; + return path; + default: + return null; + } + } + + /** + * Runs the test scenario of reading data from the source. + * + * @throws IOException + */ + @Test + public void testReadingFromSource() throws IOException { + dfs.mkdirs(new Path(dfsRoot)); + SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(prepareDFSSource()); + + // 1. Extract without any checkpoint => get all the data, respecting sourceLimit + assertEquals(Option.empty(), + sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch()); + // Test respecting sourceLimit + int sourceLimit = 10; + RemoteIterator files = dfs.listFiles(generateOneFile(), true); + FileStatus file1Status = files.next(); + assertTrue(file1Status.getLen() > sourceLimit); + assertEquals(Option.empty(), + sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch()); + // Test fetching Avro format + InputBatch> fetch1 = + sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE); + assertEquals(100, fetch1.getBatch().get().count()); + // Test fetching Row format + InputBatch> fetch1AsRows = + sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE); + assertEquals(100, fetch1AsRows.getBatch().get().count()); + // Test Avro to Row format + Dataset fetch1Rows = AvroConversionUtils + .createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()), + schemaProvider.getSourceSchema().toString(), sparkSession); + assertEquals(100, fetch1Rows.count()); + + // 2. Produce new data, extract new data + generateOneFile(); + // Test fetching Avro format + InputBatch> fetch2 = sourceFormatAdapter.fetchNewDataInAvroFormat( + Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE); + assertEquals(10000, fetch2.getBatch().get().count()); + // Test fetching Row format + InputBatch> fetch2AsRows = sourceFormatAdapter.fetchNewDataInRowFormat( + Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE); + assertEquals(10000, fetch2AsRows.getBatch().get().count()); + + // 3. Extract with previous checkpoint => gives same data back (idempotent) + InputBatch> fetch3AsRows = sourceFormatAdapter.fetchNewDataInRowFormat( + Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE); + assertEquals(10000, fetch3AsRows.getBatch().get().count()); + assertEquals(fetch2AsRows.getCheckpointForNextBatch(), + fetch3AsRows.getCheckpointForNextBatch()); + fetch3AsRows.getBatch().get().createOrReplaceTempView("test_dfs_table"); + Dataset rowDataset = SparkSession.builder().sparkContext(jsc.sc()).getOrCreate() + .sql("select * from test_dfs_table"); + assertEquals(10000, rowDataset.count()); + + // 4. Extract with latest checkpoint => no new data returned + InputBatch> fetch4 = sourceFormatAdapter.fetchNewDataInAvroFormat( + Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE); + assertEquals(Option.empty(), fetch4.getBatch()); + + // 5. Extract from the beginning + InputBatch> fetch5 = sourceFormatAdapter.fetchNewDataInAvroFormat( + Option.empty(), Long.MAX_VALUE); + assertEquals(10100, fetch5.getBatch().get().count()); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java deleted file mode 100644 index 3128ca924cab6..0000000000000 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestDFSSource.java +++ /dev/null @@ -1,234 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.hudi.utilities.sources; - -import org.apache.hudi.AvroConversionUtils; -import org.apache.hudi.common.HoodieTestDataGenerator; -import org.apache.hudi.common.model.HoodieRecord; -import org.apache.hudi.common.util.Option; -import org.apache.hudi.common.util.TypedProperties; -import org.apache.hudi.utilities.UtilitiesTestBase; -import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter; -import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; - -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; -import org.apache.spark.api.java.JavaRDD; -import org.apache.spark.sql.Dataset; -import org.apache.spark.sql.Row; -import org.apache.spark.sql.SparkSession; -import org.junit.After; -import org.junit.AfterClass; -import org.junit.Before; -import org.junit.BeforeClass; -import org.junit.Test; - -import java.io.IOException; -import java.util.List; - -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -/** - * Basic tests against all subclasses of {@link JsonDFSSource} and {@link ParquetDFSSource}. - */ -public class TestDFSSource extends UtilitiesTestBase { - - private FilebasedSchemaProvider schemaProvider; - - @BeforeClass - public static void initClass() throws Exception { - UtilitiesTestBase.initClass(); - } - - @AfterClass - public static void cleanupClass() throws Exception { - UtilitiesTestBase.cleanupClass(); - } - - @Before - public void setup() throws Exception { - super.setup(); - schemaProvider = new FilebasedSchemaProvider(Helpers.setupSchemaOnDFS(), jsc); - } - - @After - public void teardown() throws Exception { - super.teardown(); - } - - @Test - public void testJsonDFSSource() throws IOException { - new DFSSourceTestCase(dfsBasePath + "/jsonFiles", ".json") { - @Override - Source prepareDFSSource() { - TypedProperties props = new TypedProperties(); - props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot); - return new JsonDFSSource(props, jsc, sparkSession, schemaProvider); - } - - @Override - void writeNewDataToFile(List records, Path path) throws IOException { - UtilitiesTestBase.Helpers.saveStringsToDFS( - Helpers.jsonifyRecords(records), dfs, path.toString()); - } - }.run(); - } - - @Test - public void testParquetDFSSource() throws IOException { - new DFSSourceTestCase(dfsBasePath + "/parquetFiles", ".parquet") { - @Override - Source prepareDFSSource() { - TypedProperties props = new TypedProperties(); - props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot); - return new ParquetDFSSource(props, jsc, sparkSession, schemaProvider); - } - - @Override - void writeNewDataToFile(List records, Path path) throws IOException { - Helpers.saveParquetToDFS(Helpers.toGenericRecords(records, dataGenerator), path); - } - }.run(); - } - - /** - * An abstract test case for {@link Source} using DFS as the file system. - */ - abstract class DFSSourceTestCase { - - String dfsRoot; - String fileSuffix; - int fileCount = 1; - HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); - - DFSSourceTestCase(String dfsRoot, String fileSuffix) { - this.dfsRoot = dfsRoot; - this.fileSuffix = fileSuffix; - } - - /** - * Prepares the specific {@link Source} to test, by passing in necessary configurations. - * - * @return A {@link Source} using DFS as the file system. - */ - abstract Source prepareDFSSource(); - - /** - * Writes test data, i.e., a {@link List} of {@link HoodieRecord}, to a file on DFS. - * - * @param records Test data. - * @param path The path in {@link Path} of the file to write. - * @throws IOException - */ - abstract void writeNewDataToFile(List records, Path path) throws IOException; - - /** - * Generates a batch of test data and writes the data to a file. This can be called multiple - * times to generate multiple files. - * - * @return The {@link Path} of the file. - * @throws IOException - */ - Path generateOneFile() throws IOException { - Path path = new Path(dfsRoot, fileCount + fileSuffix); - switch (fileCount) { - case 1: - writeNewDataToFile(dataGenerator.generateInserts("000", 100), path); - fileCount++; - return path; - case 2: - writeNewDataToFile(dataGenerator.generateInserts("001", 10000), path); - fileCount++; - return path; - default: - return null; - } - } - - /** - * Runs the test scenario. - * - * @throws IOException - */ - void run() throws IOException { - dfs.mkdirs(new Path(dfsRoot)); - SourceFormatAdapter sourceFormatAdapter = new SourceFormatAdapter(prepareDFSSource()); - - // 1. Extract without any checkpoint => get all the data, respecting sourceLimit - assertEquals(Option.empty(), - sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch()); - // Test respecting sourceLimit - int sourceLimit = 10; - RemoteIterator files = dfs.listFiles(generateOneFile(), true); - FileStatus file1Status = files.next(); - assertTrue(file1Status.getLen() > sourceLimit); - assertEquals(Option.empty(), - sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), sourceLimit).getBatch()); - // Test fetching Avro format - InputBatch> fetch1 = - sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE); - assertEquals(100, fetch1.getBatch().get().count()); - // Test fetching Row format - InputBatch> fetch1AsRows = - sourceFormatAdapter.fetchNewDataInRowFormat(Option.empty(), Long.MAX_VALUE); - assertEquals(100, fetch1AsRows.getBatch().get().count()); - // Test Avro to Row format - Dataset fetch1Rows = AvroConversionUtils - .createDataFrame(JavaRDD.toRDD(fetch1.getBatch().get()), - schemaProvider.getSourceSchema().toString(), sparkSession); - assertEquals(100, fetch1Rows.count()); - - // 2. Produce new data, extract new data - generateOneFile(); - // Test fetching Avro format - InputBatch> fetch2 = sourceFormatAdapter.fetchNewDataInAvroFormat( - Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE); - assertEquals(10000, fetch2.getBatch().get().count()); - // Test fetching Row format - InputBatch> fetch2AsRows = sourceFormatAdapter.fetchNewDataInRowFormat( - Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE); - assertEquals(10000, fetch2AsRows.getBatch().get().count()); - - // 3. Extract with previous checkpoint => gives same data back (idempotent) - InputBatch> fetch3AsRows = sourceFormatAdapter.fetchNewDataInRowFormat( - Option.of(fetch1AsRows.getCheckpointForNextBatch()), Long.MAX_VALUE); - assertEquals(10000, fetch3AsRows.getBatch().get().count()); - assertEquals(fetch2AsRows.getCheckpointForNextBatch(), - fetch3AsRows.getCheckpointForNextBatch()); - fetch3AsRows.getBatch().get().createOrReplaceTempView("test_dfs_table"); - Dataset rowDataset = SparkSession.builder().sparkContext(jsc.sc()).getOrCreate() - .sql("select * from test_dfs_table"); - assertEquals(10000, rowDataset.count()); - - // 4. Extract with latest checkpoint => no new data returned - InputBatch> fetch4 = sourceFormatAdapter.fetchNewDataInAvroFormat( - Option.of(fetch2.getCheckpointForNextBatch()), Long.MAX_VALUE); - assertEquals(Option.empty(), fetch4.getBatch()); - - // 5. Extract from the beginning - InputBatch> fetch5 = sourceFormatAdapter.fetchNewDataInAvroFormat( - Option.empty(), Long.MAX_VALUE); - assertEquals(10100, fetch5.getBatch().get().count()); - } - } -} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java new file mode 100644 index 0000000000000..2a682e45e167a --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java @@ -0,0 +1,53 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.TypedProperties; +import org.apache.hudi.utilities.UtilitiesTestBase; +import org.junit.Before; + +/** + * Basic tests for {@link JsonDFSSource}. + */ +public class TestJsonDFSSource extends AbstractDFSSourceTestBase { + + @Before + public void setup() throws Exception { + super.setup(); + this.dfsRoot = dfsBasePath + "/jsonFiles"; + this.fileSuffix = ".json"; + } + + @Override + Source prepareDFSSource() { + TypedProperties props = new TypedProperties(); + props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot); + return new JsonDFSSource(props, jsc, sparkSession, schemaProvider); + } + + @Override + void writeNewDataToFile(List records, Path path) throws IOException { + UtilitiesTestBase.Helpers.saveStringsToDFS( + Helpers.jsonifyRecords(records), dfs, path.toString()); + } +} diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java new file mode 100644 index 0000000000000..c76f2bcf73dfc --- /dev/null +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java @@ -0,0 +1,51 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.hudi.utilities.sources; + +import java.io.IOException; +import java.util.List; +import org.apache.hadoop.fs.Path; +import org.apache.hudi.common.model.HoodieRecord; +import org.apache.hudi.common.util.TypedProperties; +import org.junit.Before; + +/** + * Basic tests for {@link ParquetDFSSource}. + */ +public class TestParquetDFSSource extends AbstractDFSSourceTestBase { + + @Before + public void setup() throws Exception { + super.setup(); + this.dfsRoot = dfsBasePath + "/parquetFiles"; + this.fileSuffix = ".parquet"; + } + + @Override + Source prepareDFSSource() { + TypedProperties props = new TypedProperties(); + props.setProperty("hoodie.deltastreamer.source.dfs.root", dfsRoot); + return new ParquetDFSSource(props, jsc, sparkSession, schemaProvider); + } + + @Override + void writeNewDataToFile(List records, Path path) throws IOException { + Helpers.saveParquetToDFS(Helpers.toGenericRecords(records, dataGenerator), path); + } +} From 54032670a865e1498fa266648e3397f8f6908694 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Fri, 17 Jan 2020 23:09:29 -0800 Subject: [PATCH 3/5] Fix import ordering. --- .../sources/AbstractDFSSourceTestBase.java | 25 ++++++++++--------- .../utilities/sources/TestJsonDFSSource.java | 8 +++--- .../sources/TestParquetDFSSource.java | 8 +++--- 3 files changed, 23 insertions(+), 18 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java index 841373b239a39..127abe1d59abc 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java @@ -18,16 +18,6 @@ package org.apache.hudi.utilities.sources; -import static org.junit.Assert.assertEquals; -import static org.junit.Assert.assertTrue; - -import java.io.IOException; -import java.util.List; -import org.apache.avro.generic.GenericRecord; -import org.apache.hadoop.fs.FileStatus; -import org.apache.hadoop.fs.LocatedFileStatus; -import org.apache.hadoop.fs.Path; -import org.apache.hadoop.fs.RemoteIterator; import org.apache.hudi.AvroConversionUtils; import org.apache.hudi.common.HoodieTestDataGenerator; import org.apache.hudi.common.model.HoodieRecord; @@ -35,6 +25,12 @@ import org.apache.hudi.utilities.UtilitiesTestBase; import org.apache.hudi.utilities.deltastreamer.SourceFormatAdapter; import org.apache.hudi.utilities.schema.FilebasedSchemaProvider; + +import org.apache.avro.generic.GenericRecord; +import org.apache.hadoop.fs.FileStatus; +import org.apache.hadoop.fs.LocatedFileStatus; +import org.apache.hadoop.fs.Path; +import org.apache.hadoop.fs.RemoteIterator; import org.apache.spark.api.java.JavaRDD; import org.apache.spark.sql.Dataset; import org.apache.spark.sql.Row; @@ -45,6 +41,12 @@ import org.junit.BeforeClass; import org.junit.Test; +import java.io.IOException; +import java.util.List; + +import static org.junit.Assert.assertEquals; +import static org.junit.Assert.assertTrue; + /** * An abstract test base for {@link Source} using DFS as the file system. */ @@ -94,8 +96,7 @@ public void teardown() throws Exception { abstract void writeNewDataToFile(List records, Path path) throws IOException; /** - * Generates a batch of test data and writes the data to a file. This can be called multiple - * times to generate multiple files. + * Generates a batch of test data and writes the data to a file. This can be called multiple times to generate multiple files. * * @return The {@link Path} of the file. * @throws IOException diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java index 2a682e45e167a..3cdae7aa78940 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestJsonDFSSource.java @@ -18,14 +18,16 @@ package org.apache.hudi.utilities.sources; -import java.io.IOException; -import java.util.List; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.TypedProperties; import org.apache.hudi.utilities.UtilitiesTestBase; + +import org.apache.hadoop.fs.Path; import org.junit.Before; +import java.io.IOException; +import java.util.List; + /** * Basic tests for {@link JsonDFSSource}. */ diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java index c76f2bcf73dfc..6d8d7001a1f54 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/TestParquetDFSSource.java @@ -18,13 +18,15 @@ package org.apache.hudi.utilities.sources; -import java.io.IOException; -import java.util.List; -import org.apache.hadoop.fs.Path; import org.apache.hudi.common.model.HoodieRecord; import org.apache.hudi.common.util.TypedProperties; + +import org.apache.hadoop.fs.Path; import org.junit.Before; +import java.io.IOException; +import java.util.List; + /** * Basic tests for {@link ParquetDFSSource}. */ From 4a589e984a37b49406b8372a31e08f80b8c981cc Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Sat, 18 Jan 2020 21:50:14 -0800 Subject: [PATCH 4/5] Simplify the generateOneFile() method. --- .../sources/AbstractDFSSourceTestBase.java | 32 ++++++++----------- 1 file changed, 13 insertions(+), 19 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java index 127abe1d59abc..6568ea908c299 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java @@ -55,7 +55,6 @@ public abstract class AbstractDFSSourceTestBase extends UtilitiesTestBase { FilebasedSchemaProvider schemaProvider; String dfsRoot; String fileSuffix; - int fileCount = 1; HoodieTestDataGenerator dataGenerator = new HoodieTestDataGenerator(); @BeforeClass @@ -95,26 +94,21 @@ public void teardown() throws Exception { */ abstract void writeNewDataToFile(List records, Path path) throws IOException; + // Generates a batch of test data and writes the data to a file. + /** - * Generates a batch of test data and writes the data to a file. This can be called multiple times to generate multiple files. + * Generates a batch of test data and writes the data to a file. * - * @return The {@link Path} of the file. + * @param filename The name of the file. + * @param commitTime The commit time. + * @param n The number of records to generate. + * @return The file path. * @throws IOException */ - Path generateOneFile() throws IOException { - Path path = new Path(dfsRoot, fileCount + fileSuffix); - switch (fileCount) { - case 1: - writeNewDataToFile(dataGenerator.generateInserts("000", 100), path); - fileCount++; - return path; - case 2: - writeNewDataToFile(dataGenerator.generateInserts("001", 10000), path); - fileCount++; - return path; - default: - return null; - } + Path generateOneFile(String filename, String commitTime, int n) throws IOException { + Path path = new Path(dfsRoot, filename + fileSuffix); + writeNewDataToFile(dataGenerator.generateInserts(commitTime, n), path); + return path; } /** @@ -132,7 +126,7 @@ public void testReadingFromSource() throws IOException { sourceFormatAdapter.fetchNewDataInAvroFormat(Option.empty(), Long.MAX_VALUE).getBatch()); // Test respecting sourceLimit int sourceLimit = 10; - RemoteIterator files = dfs.listFiles(generateOneFile(), true); + RemoteIterator files = dfs.listFiles(generateOneFile("1", "000", 100), true); FileStatus file1Status = files.next(); assertTrue(file1Status.getLen() > sourceLimit); assertEquals(Option.empty(), @@ -152,7 +146,7 @@ public void testReadingFromSource() throws IOException { assertEquals(100, fetch1Rows.count()); // 2. Produce new data, extract new data - generateOneFile(); + generateOneFile("2", "001", 10000); // Test fetching Avro format InputBatch> fetch2 = sourceFormatAdapter.fetchNewDataInAvroFormat( Option.of(fetch1.getCheckpointForNextBatch()), Long.MAX_VALUE); From 33a19a68c014682884e06f0348348747db790ed4 Mon Sep 17 00:00:00 2001 From: Y Ethan Guo Date: Sat, 18 Jan 2020 21:58:04 -0800 Subject: [PATCH 5/5] Remove redundant comments. --- .../hudi/utilities/sources/AbstractDFSSourceTestBase.java | 2 -- 1 file changed, 2 deletions(-) diff --git a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java index 6568ea908c299..5815317d9281b 100644 --- a/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java +++ b/hudi-utilities/src/test/java/org/apache/hudi/utilities/sources/AbstractDFSSourceTestBase.java @@ -94,8 +94,6 @@ public void teardown() throws Exception { */ abstract void writeNewDataToFile(List records, Path path) throws IOException; - // Generates a batch of test data and writes the data to a file. - /** * Generates a batch of test data and writes the data to a file. *