Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[FLINK-32837][JUnit5 Migration] Migrate the client and clusterframework packages of flink-runtime module to junit5 #23241

Merged
Merged
Show file tree
Hide file tree
Changes from 1 commit
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Jump to
Jump to file
Failed to load files.
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -28,47 +28,49 @@
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

import static org.junit.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link ClientUtils}. */
public class ClientUtilsTest extends TestLogger {
public class ClientUtilsTest {

@ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@TempDir private static java.nio.file.Path temporaryFolder;

private static BlobServer blobServer = null;

@BeforeClass
public static void setup() throws IOException {
@BeforeAll
static void setup() throws IOException {
Configuration config = new Configuration();
blobServer = new BlobServer(config, temporaryFolder.newFolder(), new VoidBlobStore());
blobServer =
new BlobServer(
config, TempDirUtils.newFolder(temporaryFolder), new VoidBlobStore());
blobServer.start();
}

@AfterClass
public static void teardown() throws IOException {
@AfterAll
static void teardown() throws IOException {
if (blobServer != null) {
blobServer.close();
}
}

@Test
public void uploadAndSetUserJars() throws Exception {
java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
void uploadAndSetUserJars() throws Exception {
java.nio.file.Path tmpDir = TempDirUtils.newFolder(temporaryFolder).toPath();
JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();

Collection<Path> jars =
Expand All @@ -78,8 +80,8 @@ public void uploadAndSetUserJars() throws Exception {

jars.forEach(jobGraph::addJar);

assertEquals(jars.size(), jobGraph.getUserJars().size());
assertEquals(0, jobGraph.getUserJarBlobKeys().size());
assertThat(jobGraph.getUserJars()).hasSameSizeAs(jars);
assertThat(jobGraph.getUserJarBlobKeys()).isEmpty();

ClientUtils.extractAndUploadJobGraphFiles(
jobGraph,
Expand All @@ -88,18 +90,18 @@ public void uploadAndSetUserJars() throws Exception {
new InetSocketAddress("localhost", blobServer.getPort()),
new Configuration()));

assertEquals(jars.size(), jobGraph.getUserJars().size());
assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().stream().distinct().count());
assertThat(jobGraph.getUserJars()).hasSameSizeAs(jars);
assertThat(jobGraph.getUserJarBlobKeys()).hasSameSizeAs(jars);
assertThat(jobGraph.getUserJarBlobKeys().stream().distinct()).hasSameSizeAs(jars);

for (PermanentBlobKey blobKey : jobGraph.getUserJarBlobKeys()) {
blobServer.getFile(jobGraph.getJobID(), blobKey);
}
}

@Test
public void uploadAndSetUserArtifacts() throws Exception {
java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
void uploadAndSetUserArtifacts() throws Exception {
java.nio.file.Path tmpDir = TempDirUtils.newFolder(temporaryFolder).toPath();
JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();

Collection<DistributedCache.DistributedCacheEntry> localArtifacts =
Expand All @@ -114,7 +116,7 @@ public void uploadAndSetUserArtifacts() throws Exception {
Files.createFile(tmpDir.resolve("art4")).toString(), true, false));

Collection<DistributedCache.DistributedCacheEntry> distributedArtifacts =
Arrays.asList(
Collections.singletonList(
new DistributedCache.DistributedCacheEntry(
"hdfs://localhost:1234/test", true, false));

Expand All @@ -127,12 +129,11 @@ public void uploadAndSetUserArtifacts() throws Exception {

final int totalNumArtifacts = localArtifacts.size() + distributedArtifacts.size();

assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
assertEquals(
0,
jobGraph.getUserArtifacts().values().stream()
.filter(entry -> entry.blobKey != null)
.count());
assertThat(jobGraph.getUserArtifacts()).hasSize(totalNumArtifacts);
assertThat(
jobGraph.getUserArtifacts().values().stream()
.filter(entry -> entry.blobKey != null))
.isEmpty();

ClientUtils.extractAndUploadJobGraphFiles(
jobGraph,
Expand All @@ -141,24 +142,21 @@ public void uploadAndSetUserArtifacts() throws Exception {
new InetSocketAddress("localhost", blobServer.getPort()),
new Configuration()));

assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
assertEquals(
localArtifacts.size(),
jobGraph.getUserArtifacts().values().stream()
.filter(entry -> entry.blobKey != null)
.count());
assertEquals(
distributedArtifacts.size(),
jobGraph.getUserArtifacts().values().stream()
.filter(entry -> entry.blobKey == null)
.count());
assertThat(jobGraph.getUserArtifacts()).hasSize(totalNumArtifacts);
assertThat(
jobGraph.getUserArtifacts().values().stream()
.filter(entry -> entry.blobKey != null))
.hasSameSizeAs(localArtifacts);
assertThat(
jobGraph.getUserArtifacts().values().stream()
.filter(entry -> entry.blobKey == null))
.hasSameSizeAs(distributedArtifacts);
// 1 unique key for each local artifact, and null for distributed artifacts
assertEquals(
localArtifacts.size() + 1,
jobGraph.getUserArtifacts().values().stream()
.map(entry -> entry.blobKey)
.distinct()
.count());
assertThat(
jobGraph.getUserArtifacts().values().stream()
.map(entry -> entry.blobKey)
.distinct())
.hasSize(localArtifacts.size() + 1);
for (DistributedCache.DistributedCacheEntry original : localArtifacts) {
assertState(
original,
Expand All @@ -181,10 +179,10 @@ private static void assertState(
boolean isBlobKeyNull,
JobID jobId)
throws Exception {
assertEquals(original.isZipped, actual.isZipped);
assertEquals(original.isExecutable, actual.isExecutable);
assertEquals(original.filePath, actual.filePath);
assertEquals(isBlobKeyNull, actual.blobKey == null);
assertThat(actual.isZipped).isEqualTo(original.isZipped);
assertThat(actual.isExecutable).isEqualTo(original.isExecutable);
assertThat(actual.filePath).isEqualTo(original.filePath);
assertThat(actual.blobKey == null).isEqualTo(isBlobKeyNull);
if (!isBlobKeyNull) {
blobServer.getFile(
jobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,24 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for the SerializedJobExecutionResult */
public class SerializedJobExecutionResultTest extends TestLogger {
class SerializedJobExecutionResultTest {

@Test
public void testSerialization() throws Exception {
void testSerialization() throws Exception {
final ClassLoader classloader = getClass().getClassLoader();

JobID origJobId = new JobID();
Expand All @@ -62,61 +58,53 @@ public void testSerialization() throws Exception {
// serialize and deserialize the object
SerializedJobExecutionResult cloned = CommonTestUtils.createCopySerializable(result);

assertEquals(origJobId, cloned.getJobId());
assertEquals(origTime, cloned.getNetRuntime());
assertEquals(origTime, cloned.getNetRuntime(TimeUnit.MILLISECONDS));
assertEquals(origMap, cloned.getSerializedAccumulatorResults());
assertThat(cloned.getJobId()).isEqualTo(origJobId);
assertThat(cloned.getNetRuntime()).isEqualTo(origTime);
assertThat(cloned.getNetRuntime(TimeUnit.MILLISECONDS)).isEqualTo(origTime);
assertThat(cloned.getSerializedAccumulatorResults()).isEqualTo(origMap);

// convert to deserialized result
JobExecutionResult jResult = result.toJobExecutionResult(classloader);
JobExecutionResult jResultCopied = result.toJobExecutionResult(classloader);

assertEquals(origJobId, jResult.getJobID());
assertEquals(origJobId, jResultCopied.getJobID());
assertEquals(origTime, jResult.getNetRuntime());
assertEquals(origTime, jResult.getNetRuntime(TimeUnit.MILLISECONDS));
assertEquals(origTime, jResultCopied.getNetRuntime());
assertEquals(origTime, jResultCopied.getNetRuntime(TimeUnit.MILLISECONDS));
assertThat(jResult.getJobID()).isEqualTo(origJobId);
assertThat(jResultCopied.getJobID()).isEqualTo(origJobId);
assertThat(jResult.getNetRuntime()).isEqualTo(origTime);
assertThat(jResult.getNetRuntime(TimeUnit.MILLISECONDS)).isEqualTo(origTime);
assertThat(jResultCopied.getNetRuntime()).isEqualTo(origTime);
assertThat(jResultCopied.getNetRuntime(TimeUnit.MILLISECONDS)).isEqualTo(origTime);

for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> entry :
origMap.entrySet()) {
String name = entry.getKey();
OptionalFailure<Object> value = entry.getValue().deserializeValue(classloader);
if (value.isFailure()) {
try {
jResult.getAccumulatorResult(name);
fail("expected failure");
} catch (FlinkRuntimeException ex) {
assertTrue(
ExceptionUtils.findThrowable(ex, ExpectedTestException.class)
.isPresent());
}
try {
jResultCopied.getAccumulatorResult(name);
fail("expected failure");
} catch (FlinkRuntimeException ex) {
assertTrue(
ExceptionUtils.findThrowable(ex, ExpectedTestException.class)
.isPresent());
}
assertThatThrownBy(() -> jResult.getAccumulatorResult(name))
.isInstanceOf(FlinkRuntimeException.class)
.hasCauseInstanceOf(ExpectedTestException.class);

assertThatThrownBy(() -> jResultCopied.getAccumulatorResult(name))
.isInstanceOf(FlinkRuntimeException.class)
.hasCauseInstanceOf(ExpectedTestException.class);
} else {
assertEquals(value.get(), jResult.getAccumulatorResult(name));
assertEquals(value.get(), jResultCopied.getAccumulatorResult(name));
assertThat((Object) jResult.getAccumulatorResult(name)).isEqualTo(value.get());
assertThat((Object) jResultCopied.getAccumulatorResult(name))
.isEqualTo(value.get());
}
}
}

@Test
public void testSerializationWithNullValues() throws Exception {
void testSerializationWithNullValues() throws Exception {
SerializedJobExecutionResult result = new SerializedJobExecutionResult(null, 0L, null);
SerializedJobExecutionResult cloned = CommonTestUtils.createCopySerializable(result);

assertNull(cloned.getJobId());
assertEquals(0L, cloned.getNetRuntime());
assertNull(cloned.getSerializedAccumulatorResults());
assertThat(cloned.getJobId()).isNull();
assertThat(cloned.getNetRuntime()).isZero();
assertThat(cloned.getSerializedAccumulatorResults()).isNull();

JobExecutionResult jResult = result.toJobExecutionResult(getClass().getClassLoader());
assertNull(jResult.getJobID());
assertTrue(jResult.getAllAccumulatorResults().isEmpty());
assertThat(jResult.getJobID()).isNull();
assertThat(jResult.getAllAccumulatorResults()).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package org.apache.flink.runtime.clusterframework;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.util.TestLoggerExtension;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

Expand All @@ -34,64 +32,63 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for the {@link ApplicationStatus}. */
@ExtendWith(TestLoggerExtension.class)
public class ApplicationStatusTest {
class ApplicationStatusTest {

private static final int SUCCESS_EXIT_CODE = 0;

@Test
public void succeededStatusMapsToSuccessExitCode() {
void succeededStatusMapsToSuccessExitCode() {
int exitCode = ApplicationStatus.SUCCEEDED.processExitCode();
assertThat(exitCode).isEqualTo(SUCCESS_EXIT_CODE);
}

@Test
public void cancelledStatusMapsToSuccessExitCode() {
void cancelledStatusMapsToSuccessExitCode() {
int exitCode = ApplicationStatus.CANCELED.processExitCode();
assertThat(exitCode).isEqualTo(SUCCESS_EXIT_CODE);
}

@Test
public void notSucceededNorCancelledStatusMapsToNonSuccessExitCode() {
void notSucceededNorCancelledStatusMapsToNonSuccessExitCode() {
Iterable<Integer> exitCodes = exitCodes(notSucceededNorCancelledStatus());
assertThat(exitCodes).doesNotContain(SUCCESS_EXIT_CODE);
}

@Test
public void testJobStatusFromSuccessApplicationStatus() {
void testJobStatusFromSuccessApplicationStatus() {
assertThat(ApplicationStatus.SUCCEEDED.deriveJobStatus()).isEqualTo(JobStatus.FINISHED);
}

@Test
public void testJobStatusFromFailedApplicationStatus() {
void testJobStatusFromFailedApplicationStatus() {
assertThat(ApplicationStatus.FAILED.deriveJobStatus()).isEqualTo(JobStatus.FAILED);
}

@Test
public void testJobStatusFromCancelledApplicationStatus() {
void testJobStatusFromCancelledApplicationStatus() {
assertThat(ApplicationStatus.CANCELED.deriveJobStatus()).isEqualTo(JobStatus.CANCELED);
}

@Test
public void testJobStatusFailsFromUnknownApplicationStatuses() {
void testJobStatusFailsFromUnknownApplicationStatuses() {
assertThatThrownBy(ApplicationStatus.UNKNOWN::deriveJobStatus)
.isInstanceOf(UnsupportedOperationException.class);
}

@Test
public void testSuccessApplicationStatusFromJobStatus() {
void testSuccessApplicationStatusFromJobStatus() {
assertThat(ApplicationStatus.fromJobStatus(JobStatus.FINISHED))
.isEqualTo(ApplicationStatus.SUCCEEDED);
}

@Test
public void testFailedApplicationStatusFromJobStatus() {
void testFailedApplicationStatusFromJobStatus() {
assertThat(ApplicationStatus.fromJobStatus(JobStatus.FAILED))
.isEqualTo(ApplicationStatus.FAILED);
}

@Test
public void testCancelledApplicationStatusFromJobStatus() {
void testCancelledApplicationStatusFromJobStatus() {
assertThat(ApplicationStatus.fromJobStatus(JobStatus.CANCELED))
.isEqualTo(ApplicationStatus.CANCELED);
}
Expand All @@ -114,7 +111,7 @@ public void testUnknownApplicationStatusFromJobStatus(JobStatus jobStatus) {
}

@Test
public void testUnknownApplicationStatusForMissingJobStatus() {
void testUnknownApplicationStatusForMissingJobStatus() {
assertThat(ApplicationStatus.fromJobStatus(null)).isEqualTo(ApplicationStatus.UNKNOWN);
}

Expand Down