Skip to content

Commit

Permalink
[FLINK-32837][JUnit5 Migration] Migrate the client and clusterframewo…
Browse files Browse the repository at this point in the history
…rk packages of flink-runtime module to junit5
  • Loading branch information
1996fanrui committed Aug 18, 2023
1 parent 9546f82 commit 7225095
Show file tree
Hide file tree
Showing 8 changed files with 892 additions and 953 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -28,47 +28,49 @@
import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.JobGraphTestUtils;
import org.apache.flink.testutils.junit.utils.TempDirUtils;
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.TestLogger;

import org.junit.AfterClass;
import org.junit.BeforeClass;
import org.junit.ClassRule;
import org.junit.Test;
import org.junit.rules.TemporaryFolder;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.io.TempDir;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.file.Files;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;

import static org.junit.Assert.assertEquals;
import static org.assertj.core.api.Assertions.assertThat;

/** Tests for {@link ClientUtils}. */
public class ClientUtilsTest extends TestLogger {
public class ClientUtilsTest {

@ClassRule public static TemporaryFolder temporaryFolder = new TemporaryFolder();
@TempDir private static java.nio.file.Path temporaryFolder;

private static BlobServer blobServer = null;

@BeforeClass
public static void setup() throws IOException {
@BeforeAll
static void setup() throws IOException {
Configuration config = new Configuration();
blobServer = new BlobServer(config, temporaryFolder.newFolder(), new VoidBlobStore());
blobServer =
new BlobServer(
config, TempDirUtils.newFolder(temporaryFolder), new VoidBlobStore());
blobServer.start();
}

@AfterClass
public static void teardown() throws IOException {
@AfterAll
static void teardown() throws IOException {
if (blobServer != null) {
blobServer.close();
}
}

@Test
public void uploadAndSetUserJars() throws Exception {
java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
void uploadAndSetUserJars() throws Exception {
java.nio.file.Path tmpDir = TempDirUtils.newFolder(temporaryFolder).toPath();
JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();

Collection<Path> jars =
Expand All @@ -78,8 +80,8 @@ public void uploadAndSetUserJars() throws Exception {

jars.forEach(jobGraph::addJar);

assertEquals(jars.size(), jobGraph.getUserJars().size());
assertEquals(0, jobGraph.getUserJarBlobKeys().size());
assertThat(jobGraph.getUserJars()).hasSameSizeAs(jars);
assertThat(jobGraph.getUserJarBlobKeys()).isEmpty();

ClientUtils.extractAndUploadJobGraphFiles(
jobGraph,
Expand All @@ -88,18 +90,18 @@ public void uploadAndSetUserJars() throws Exception {
new InetSocketAddress("localhost", blobServer.getPort()),
new Configuration()));

assertEquals(jars.size(), jobGraph.getUserJars().size());
assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().size());
assertEquals(jars.size(), jobGraph.getUserJarBlobKeys().stream().distinct().count());
assertThat(jobGraph.getUserJars()).hasSameSizeAs(jars);
assertThat(jobGraph.getUserJarBlobKeys()).hasSameSizeAs(jars);
assertThat(jobGraph.getUserJarBlobKeys().stream().distinct()).hasSameSizeAs(jars);

for (PermanentBlobKey blobKey : jobGraph.getUserJarBlobKeys()) {
blobServer.getFile(jobGraph.getJobID(), blobKey);
}
}

@Test
public void uploadAndSetUserArtifacts() throws Exception {
java.nio.file.Path tmpDir = temporaryFolder.newFolder().toPath();
void uploadAndSetUserArtifacts() throws Exception {
java.nio.file.Path tmpDir = TempDirUtils.newFolder(temporaryFolder).toPath();
JobGraph jobGraph = JobGraphTestUtils.emptyJobGraph();

Collection<DistributedCache.DistributedCacheEntry> localArtifacts =
Expand All @@ -114,7 +116,7 @@ public void uploadAndSetUserArtifacts() throws Exception {
Files.createFile(tmpDir.resolve("art4")).toString(), true, false));

Collection<DistributedCache.DistributedCacheEntry> distributedArtifacts =
Arrays.asList(
Collections.singletonList(
new DistributedCache.DistributedCacheEntry(
"hdfs://localhost:1234/test", true, false));

Expand All @@ -127,12 +129,11 @@ public void uploadAndSetUserArtifacts() throws Exception {

final int totalNumArtifacts = localArtifacts.size() + distributedArtifacts.size();

assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
assertEquals(
0,
jobGraph.getUserArtifacts().values().stream()
.filter(entry -> entry.blobKey != null)
.count());
assertThat(jobGraph.getUserArtifacts()).hasSize(totalNumArtifacts);
assertThat(
jobGraph.getUserArtifacts().values().stream()
.filter(entry -> entry.blobKey != null))
.isEmpty();

ClientUtils.extractAndUploadJobGraphFiles(
jobGraph,
Expand All @@ -141,24 +142,21 @@ public void uploadAndSetUserArtifacts() throws Exception {
new InetSocketAddress("localhost", blobServer.getPort()),
new Configuration()));

assertEquals(totalNumArtifacts, jobGraph.getUserArtifacts().size());
assertEquals(
localArtifacts.size(),
jobGraph.getUserArtifacts().values().stream()
.filter(entry -> entry.blobKey != null)
.count());
assertEquals(
distributedArtifacts.size(),
jobGraph.getUserArtifacts().values().stream()
.filter(entry -> entry.blobKey == null)
.count());
assertThat(jobGraph.getUserArtifacts()).hasSize(totalNumArtifacts);
assertThat(
jobGraph.getUserArtifacts().values().stream()
.filter(entry -> entry.blobKey != null))
.hasSameSizeAs(localArtifacts);
assertThat(
jobGraph.getUserArtifacts().values().stream()
.filter(entry -> entry.blobKey == null))
.hasSameSizeAs(distributedArtifacts);
// 1 unique key for each local artifact, and null for distributed artifacts
assertEquals(
localArtifacts.size() + 1,
jobGraph.getUserArtifacts().values().stream()
.map(entry -> entry.blobKey)
.distinct()
.count());
assertThat(
jobGraph.getUserArtifacts().values().stream()
.map(entry -> entry.blobKey)
.distinct())
.hasSize(localArtifacts.size() + 1);
for (DistributedCache.DistributedCacheEntry original : localArtifacts) {
assertState(
original,
Expand All @@ -181,10 +179,10 @@ private static void assertState(
boolean isBlobKeyNull,
JobID jobId)
throws Exception {
assertEquals(original.isZipped, actual.isZipped);
assertEquals(original.isExecutable, actual.isExecutable);
assertEquals(original.filePath, actual.filePath);
assertEquals(isBlobKeyNull, actual.blobKey == null);
assertThat(actual.isZipped).isEqualTo(original.isZipped);
assertThat(actual.isExecutable).isEqualTo(original.isExecutable);
assertThat(actual.filePath).isEqualTo(original.filePath);
assertThat(actual.blobKey == null).isEqualTo(isBlobKeyNull);
if (!isBlobKeyNull) {
blobServer.getFile(
jobId,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -22,28 +22,24 @@
import org.apache.flink.api.common.JobID;
import org.apache.flink.core.testutils.CommonTestUtils;
import org.apache.flink.runtime.operators.testutils.ExpectedTestException;
import org.apache.flink.util.ExceptionUtils;
import org.apache.flink.util.FlinkRuntimeException;
import org.apache.flink.util.OptionalFailure;
import org.apache.flink.util.SerializedValue;
import org.apache.flink.util.TestLogger;

import org.junit.Test;
import org.junit.jupiter.api.Test;

import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.TimeUnit;

import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import static org.assertj.core.api.Assertions.assertThat;
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for the SerializedJobExecutionResult */
public class SerializedJobExecutionResultTest extends TestLogger {
class SerializedJobExecutionResultTest {

@Test
public void testSerialization() throws Exception {
void testSerialization() throws Exception {
final ClassLoader classloader = getClass().getClassLoader();

JobID origJobId = new JobID();
Expand All @@ -62,61 +58,53 @@ public void testSerialization() throws Exception {
// serialize and deserialize the object
SerializedJobExecutionResult cloned = CommonTestUtils.createCopySerializable(result);

assertEquals(origJobId, cloned.getJobId());
assertEquals(origTime, cloned.getNetRuntime());
assertEquals(origTime, cloned.getNetRuntime(TimeUnit.MILLISECONDS));
assertEquals(origMap, cloned.getSerializedAccumulatorResults());
assertThat(cloned.getJobId()).isEqualTo(origJobId);
assertThat(cloned.getNetRuntime()).isEqualTo(origTime);
assertThat(cloned.getNetRuntime(TimeUnit.MILLISECONDS)).isEqualTo(origTime);
assertThat(cloned.getSerializedAccumulatorResults()).isEqualTo(origMap);

// convert to deserialized result
JobExecutionResult jResult = result.toJobExecutionResult(classloader);
JobExecutionResult jResultCopied = result.toJobExecutionResult(classloader);

assertEquals(origJobId, jResult.getJobID());
assertEquals(origJobId, jResultCopied.getJobID());
assertEquals(origTime, jResult.getNetRuntime());
assertEquals(origTime, jResult.getNetRuntime(TimeUnit.MILLISECONDS));
assertEquals(origTime, jResultCopied.getNetRuntime());
assertEquals(origTime, jResultCopied.getNetRuntime(TimeUnit.MILLISECONDS));
assertThat(jResult.getJobID()).isEqualTo(origJobId);
assertThat(jResultCopied.getJobID()).isEqualTo(origJobId);
assertThat(jResult.getNetRuntime()).isEqualTo(origTime);
assertThat(jResult.getNetRuntime(TimeUnit.MILLISECONDS)).isEqualTo(origTime);
assertThat(jResultCopied.getNetRuntime()).isEqualTo(origTime);
assertThat(jResultCopied.getNetRuntime(TimeUnit.MILLISECONDS)).isEqualTo(origTime);

for (Map.Entry<String, SerializedValue<OptionalFailure<Object>>> entry :
origMap.entrySet()) {
String name = entry.getKey();
OptionalFailure<Object> value = entry.getValue().deserializeValue(classloader);
if (value.isFailure()) {
try {
jResult.getAccumulatorResult(name);
fail("expected failure");
} catch (FlinkRuntimeException ex) {
assertTrue(
ExceptionUtils.findThrowable(ex, ExpectedTestException.class)
.isPresent());
}
try {
jResultCopied.getAccumulatorResult(name);
fail("expected failure");
} catch (FlinkRuntimeException ex) {
assertTrue(
ExceptionUtils.findThrowable(ex, ExpectedTestException.class)
.isPresent());
}
assertThatThrownBy(() -> jResult.getAccumulatorResult(name))
.isInstanceOf(FlinkRuntimeException.class)
.hasCauseInstanceOf(ExpectedTestException.class);

assertThatThrownBy(() -> jResultCopied.getAccumulatorResult(name))
.isInstanceOf(FlinkRuntimeException.class)
.hasCauseInstanceOf(ExpectedTestException.class);
} else {
assertEquals(value.get(), jResult.getAccumulatorResult(name));
assertEquals(value.get(), jResultCopied.getAccumulatorResult(name));
assertThat((Object) jResult.getAccumulatorResult(name)).isEqualTo(value.get());
assertThat((Object) jResultCopied.getAccumulatorResult(name))
.isEqualTo(value.get());
}
}
}

@Test
public void testSerializationWithNullValues() throws Exception {
void testSerializationWithNullValues() throws Exception {
SerializedJobExecutionResult result = new SerializedJobExecutionResult(null, 0L, null);
SerializedJobExecutionResult cloned = CommonTestUtils.createCopySerializable(result);

assertNull(cloned.getJobId());
assertEquals(0L, cloned.getNetRuntime());
assertNull(cloned.getSerializedAccumulatorResults());
assertThat(cloned.getJobId()).isNull();
assertThat(cloned.getNetRuntime()).isZero();
assertThat(cloned.getSerializedAccumulatorResults()).isNull();

JobExecutionResult jResult = result.toJobExecutionResult(getClass().getClassLoader());
assertNull(jResult.getJobID());
assertTrue(jResult.getAllAccumulatorResults().isEmpty());
assertThat(jResult.getJobID()).isNull();
assertThat(jResult.getAllAccumulatorResults()).isEmpty();
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -19,10 +19,8 @@
package org.apache.flink.runtime.clusterframework;

import org.apache.flink.api.common.JobStatus;
import org.apache.flink.util.TestLoggerExtension;

import org.junit.jupiter.api.Test;
import org.junit.jupiter.api.extension.ExtendWith;
import org.junit.jupiter.params.ParameterizedTest;
import org.junit.jupiter.params.provider.EnumSource;

Expand All @@ -34,64 +32,63 @@
import static org.assertj.core.api.Assertions.assertThatThrownBy;

/** Tests for the {@link ApplicationStatus}. */
@ExtendWith(TestLoggerExtension.class)
public class ApplicationStatusTest {
class ApplicationStatusTest {

private static final int SUCCESS_EXIT_CODE = 0;

@Test
public void succeededStatusMapsToSuccessExitCode() {
void succeededStatusMapsToSuccessExitCode() {
int exitCode = ApplicationStatus.SUCCEEDED.processExitCode();
assertThat(exitCode).isEqualTo(SUCCESS_EXIT_CODE);
}

@Test
public void cancelledStatusMapsToSuccessExitCode() {
void cancelledStatusMapsToSuccessExitCode() {
int exitCode = ApplicationStatus.CANCELED.processExitCode();
assertThat(exitCode).isEqualTo(SUCCESS_EXIT_CODE);
}

@Test
public void notSucceededNorCancelledStatusMapsToNonSuccessExitCode() {
void notSucceededNorCancelledStatusMapsToNonSuccessExitCode() {
Iterable<Integer> exitCodes = exitCodes(notSucceededNorCancelledStatus());
assertThat(exitCodes).doesNotContain(SUCCESS_EXIT_CODE);
}

@Test
public void testJobStatusFromSuccessApplicationStatus() {
void testJobStatusFromSuccessApplicationStatus() {
assertThat(ApplicationStatus.SUCCEEDED.deriveJobStatus()).isEqualTo(JobStatus.FINISHED);
}

@Test
public void testJobStatusFromFailedApplicationStatus() {
void testJobStatusFromFailedApplicationStatus() {
assertThat(ApplicationStatus.FAILED.deriveJobStatus()).isEqualTo(JobStatus.FAILED);
}

@Test
public void testJobStatusFromCancelledApplicationStatus() {
void testJobStatusFromCancelledApplicationStatus() {
assertThat(ApplicationStatus.CANCELED.deriveJobStatus()).isEqualTo(JobStatus.CANCELED);
}

@Test
public void testJobStatusFailsFromUnknownApplicationStatuses() {
void testJobStatusFailsFromUnknownApplicationStatuses() {
assertThatThrownBy(ApplicationStatus.UNKNOWN::deriveJobStatus)
.isInstanceOf(UnsupportedOperationException.class);
}

@Test
public void testSuccessApplicationStatusFromJobStatus() {
void testSuccessApplicationStatusFromJobStatus() {
assertThat(ApplicationStatus.fromJobStatus(JobStatus.FINISHED))
.isEqualTo(ApplicationStatus.SUCCEEDED);
}

@Test
public void testFailedApplicationStatusFromJobStatus() {
void testFailedApplicationStatusFromJobStatus() {
assertThat(ApplicationStatus.fromJobStatus(JobStatus.FAILED))
.isEqualTo(ApplicationStatus.FAILED);
}

@Test
public void testCancelledApplicationStatusFromJobStatus() {
void testCancelledApplicationStatusFromJobStatus() {
assertThat(ApplicationStatus.fromJobStatus(JobStatus.CANCELED))
.isEqualTo(ApplicationStatus.CANCELED);
}
Expand All @@ -114,7 +111,7 @@ public void testUnknownApplicationStatusFromJobStatus(JobStatus jobStatus) {
}

@Test
public void testUnknownApplicationStatusForMissingJobStatus() {
void testUnknownApplicationStatusForMissingJobStatus() {
assertThat(ApplicationStatus.fromJobStatus(null)).isEqualTo(ApplicationStatus.UNKNOWN);
}

Expand Down

0 comments on commit 7225095

Please sign in to comment.