Skip to content

Commit

Permalink
[FLINK-27607][tests] Migrate module flink-connector-files to JUnit5
Browse files Browse the repository at this point in the history
  • Loading branch information
snuyanzin committed May 18, 2022
1 parent 457c3dc commit 29c72ce
Show file tree
Hide file tree
Showing 52 changed files with 1,106 additions and 1,145 deletions.
6 changes: 6 additions & 0 deletions flink-connectors/flink-connector-files/pom.xml
Expand Up @@ -132,6 +132,12 @@ under the License.
<goals>
<goal>test-jar</goal>
</goals>
<configuration>
<excludes>
<!-- test-jar is still used by JUnit4 modules -->
<exclude>META-INF/services/org.junit.jupiter.api.extension.Extension</exclude>
</excludes>
</configuration>
</execution>
</executions>
</plugin>
Expand Down
Expand Up @@ -29,21 +29,18 @@
import org.apache.flink.runtime.minicluster.RpcServiceSharing;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.datastream.DataStreamSink;
import org.apache.flink.test.util.MiniClusterWithClientResource;
import org.apache.flink.test.junit5.MiniClusterExtension;

import org.junit.Rule;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.extension.RegisterExtension;

/** Tests the compaction of the {@link FileSink} in BATCH mode. */
@RunWith(Parameterized.class)
public class BatchCompactingFileSinkITCase extends BatchExecutionFileSinkITCase {
class BatchCompactingFileSinkITCase extends BatchExecutionFileSinkITCase {

private static final int PARALLELISM = 4;

@Rule
public final MiniClusterWithClientResource miniClusterResource =
new MiniClusterWithClientResource(
@RegisterExtension
private static final MiniClusterExtension MINI_CLUSTER_RESOURCE =
new MiniClusterExtension(
new MiniClusterResourceConfiguration.Builder()
.setNumberTaskManagers(1)
.setNumberSlotsPerTaskManager(PARALLELISM)
Expand Down
Expand Up @@ -34,19 +34,15 @@
import org.apache.flink.streaming.api.graph.StreamGraph;
import org.apache.flink.streaming.api.operators.StreamSource;

import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;

/** Tests the functionality of the {@link FileSink} in BATCH mode. */
@RunWith(Parameterized.class)
public class BatchExecutionFileSinkITCase extends FileSinkITBase {
class BatchExecutionFileSinkITCase extends FileSinkITBase {

/**
* Creating the testing job graph in batch mode. The graph created is [Source] -> [Failover Map
* -> File Sink]. The Failover Map is introduced to ensure the failover would always restart the
* file writer so the data would be re-written.
*/
protected JobGraph createJobGraph(String path) {
protected JobGraph createJobGraph(boolean triggerFailover, String path) {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
Configuration config = new Configuration();
config.set(ExecutionOptions.RUNTIME_MODE, RuntimeExecutionMode.BATCH);
Expand Down
Expand Up @@ -21,59 +21,52 @@
import org.apache.flink.connector.file.sink.utils.FileSinkTestUtils;
import org.apache.flink.core.fs.Path;

import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.Test;

import java.io.IOException;

import static org.junit.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests the serialization and deserialization for {@link FileSinkCommittable}. */
public class FileCommittableSerializerTest {

@ClassRule public static final TemporaryFolder TEMPORARY_FOLDER = new TemporaryFolder();
class FileCommittableSerializerTest {

@Test
public void testCommittableWithPendingFile() throws IOException {
void testCommittableWithPendingFile() throws IOException {
FileSinkCommittable committable =
new FileSinkCommittable("0", new FileSinkTestUtils.TestPendingFileRecoverable());
FileSinkCommittable deserialized = serializeAndDeserialize(committable);
assertEquals(committable.getBucketId(), deserialized.getBucketId());
assertEquals(committable.getPendingFile(), deserialized.getPendingFile());
assertEquals(
committable.getInProgressFileToCleanup(),
deserialized.getInProgressFileToCleanup());
assertEquals(
committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup());
assertThat(committable.getBucketId()).isEqualTo(deserialized.getBucketId());
assertThat(committable.getPendingFile()).isEqualTo(deserialized.getPendingFile());
assertThat(committable.getInProgressFileToCleanup())
.isEqualTo(deserialized.getInProgressFileToCleanup());
assertThat(committable.getCompactedFileToCleanup())
.isEqualTo(deserialized.getCompactedFileToCleanup());
}

@Test
public void testCommittableWithInProgressFileToCleanup() throws IOException {
void testCommittableWithInProgressFileToCleanup() throws IOException {
FileSinkCommittable committable =
new FileSinkCommittable("0", new FileSinkTestUtils.TestInProgressFileRecoverable());
FileSinkCommittable deserialized = serializeAndDeserialize(committable);
assertEquals(committable.getBucketId(), deserialized.getBucketId());
assertEquals(committable.getPendingFile(), deserialized.getPendingFile());
assertEquals(
committable.getInProgressFileToCleanup(),
deserialized.getInProgressFileToCleanup());
assertEquals(
committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup());
assertThat(committable.getBucketId()).isEqualTo(deserialized.getBucketId());
assertThat(committable.getPendingFile()).isEqualTo(deserialized.getPendingFile());
assertThat(committable.getInProgressFileToCleanup())
.isEqualTo(deserialized.getInProgressFileToCleanup());
assertThat(committable.getCompactedFileToCleanup())
.isEqualTo(deserialized.getCompactedFileToCleanup());
}

@Test
public void testCommittableWithCompactedFileToCleanup() throws IOException {
void testCommittableWithCompactedFileToCleanup() throws IOException {
FileSinkCommittable committable =
new FileSinkCommittable("0", new Path("/tmp/mock_path_to_cleanup"));
FileSinkCommittable deserialized = serializeAndDeserialize(committable);
assertEquals(committable.getBucketId(), deserialized.getBucketId());
assertEquals(committable.getPendingFile(), deserialized.getPendingFile());
assertEquals(
committable.getInProgressFileToCleanup(),
deserialized.getInProgressFileToCleanup());
assertEquals(
committable.getCompactedFileToCleanup(), deserialized.getCompactedFileToCleanup());
assertThat(committable.getBucketId()).isEqualTo(deserialized.getBucketId());
assertThat(committable.getPendingFile()).isEqualTo(deserialized.getPendingFile());
assertThat(committable.getInProgressFileToCleanup())
.isEqualTo(deserialized.getInProgressFileToCleanup());
assertThat(committable.getCompactedFileToCleanup())
.isEqualTo(deserialized.getCompactedFileToCleanup());
}

private FileSinkCommittable serializeAndDeserialize(FileSinkCommittable committable)
Expand Down
Expand Up @@ -29,50 +29,41 @@
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.OutputStreamBasedInProgressFileRecoverable;
import org.apache.flink.streaming.api.functions.sink.filesystem.OutputStreamBasedPartFileWriter.OutputStreamBasedPendingFileRecoverable;
import org.apache.flink.streaming.api.functions.sink.filesystem.RowWiseBucketWriter;
import org.apache.flink.util.TestLogger;

import org.junit.Assert;
import org.junit.ClassRule;
import org.junit.Ignore;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.runner.RunWith;
import org.junit.runners.Parameterized;
import org.junit.jupiter.api.Disabled;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.MethodSource;

import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Paths;
import java.util.Collection;
import java.util.Collections;
import java.util.stream.Stream;

import static org.assertj.core.api.Assertions.assertThat;

/**
* Tests for the {@link FileSinkCommittableSerializer} that verify we can still read the recoverable
* serialized by the previous versions.
*/
@RunWith(Parameterized.class)
public class FileSinkCommittableSerializerMigrationTest extends TestLogger {
class FileSinkCommittableSerializerMigrationTest {

private static final int CURRENT_VERSION = 1;

@Parameterized.Parameters(name = "Previous Version = {0}")
public static Collection<Integer> previousVersions() {
return Collections.singletonList(1);
static Stream<Integer> previousVersions() {
return Stream.of(1);
}

@Parameterized.Parameter public Integer previousVersion;

private static final String IN_PROGRESS_CONTENT = "writing";
private static final String PENDING_CONTENT = "wrote";

private static final java.nio.file.Path BASE_PATH =
Paths.get("src/test/resources/").resolve("committable-serializer-migration");

@ClassRule public static TemporaryFolder tempFolder = new TemporaryFolder();

@Test
@Ignore
public void prepareDeserializationInProgressToCleanup() throws IOException {
@Disabled
void prepareDeserializationInProgressToCleanup() throws IOException {
String scenario = "in-progress";
java.nio.file.Path path = resolveVersionPath(CURRENT_VERSION, scenario);

Expand All @@ -96,8 +87,9 @@ public void prepareDeserializationInProgressToCleanup() throws IOException {
Files.write(path.resolve("committable"), bytes);
}

@Test
public void testSerializationInProgressToCleanup() throws IOException {
@ParameterizedTest(name = "Previous Version = {0}")
@MethodSource("previousVersions")
void testSerializationInProgressToCleanup(Integer previousVersion) throws IOException {
String scenario = "in-progress";
java.nio.file.Path path = resolveVersionPath(previousVersion, scenario);

Expand All @@ -111,13 +103,13 @@ public void testSerializationInProgressToCleanup() throws IOException {
serializer.deserialize(
previousVersion, Files.readAllBytes(path.resolve("committable")));

Assert.assertTrue(committable.hasInProgressFileToCleanup());
Assert.assertFalse(committable.hasPendingFile());
assertThat(committable.hasInProgressFileToCleanup()).isTrue();
assertThat(committable.hasPendingFile()).isFalse();
}

@Test
@Ignore
public void prepareDeserializationPending() throws IOException {
@Disabled
void prepareDeserializationPending() throws IOException {
String scenario = "pending";
java.nio.file.Path path = resolveVersionPath(CURRENT_VERSION, scenario);

Expand All @@ -141,8 +133,9 @@ public void prepareDeserializationPending() throws IOException {
Files.write(path.resolve("committable"), bytes);
}

@Test
public void testSerializationPending() throws IOException {
@ParameterizedTest(name = "Previous Version = {0}")
@MethodSource("previousVersions")
void testSerializationPending(Integer previousVersion) throws IOException {
String scenario = "pending";
java.nio.file.Path path = resolveVersionPath(previousVersion, scenario);

Expand All @@ -156,8 +149,8 @@ public void testSerializationPending() throws IOException {
serializer.deserialize(
previousVersion, Files.readAllBytes(path.resolve("committable")));

Assert.assertTrue(committable.hasPendingFile());
Assert.assertFalse(committable.hasInProgressFileToCleanup());
assertThat(committable.hasPendingFile()).isTrue();
assertThat(committable.hasInProgressFileToCleanup()).isFalse();
}

private java.nio.file.Path resolveVersionPath(long version, String scenario) {
Expand Down

0 comments on commit 29c72ce

Please sign in to comment.