From dfcfdbe91f556b85e4c2825e6ef2732c84f128cb Mon Sep 17 00:00:00 2001
From: 1996fanrui <1996fanrui@gmail.com>
Date: Thu, 26 Oct 2023 15:35:15 +0800
Subject: [PATCH] [FLINK-33354][runtime] Cache TaskInformation and
JobInformation to avoid deserializing duplicate big objects
---
.../deployment/TaskDeploymentDescriptor.java | 105 ++++++++++----
.../executiongraph/JobInformation.java | 59 ++++++--
.../executiongraph/TaskInformation.java | 39 +++++
.../runtime/taskexecutor/TaskExecutor.java | 21 ++-
.../taskexecutor/TaskManagerServices.java | 23 +++
.../TaskDeploymentDescriptorTest.java | 136 ++++++++++++++----
.../runtime/dispatcher/JobMasterTester.java | 4 +-
.../DefaultExecutionGraphDeploymentTest.java | 12 +-
...oymentWithSmallBlobCacheSizeLimitTest.java | 6 +-
.../TaskManagerServicesBuilder.java | 2 +
10 files changed, 327 insertions(+), 80 deletions(-)
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
index 97596e1ac9fd32..c5f8f41705d7ba 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java
@@ -29,6 +29,7 @@
import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.util.GroupCache;
import org.apache.flink.util.FileUtils;
+import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
@@ -99,10 +100,22 @@ public Offloaded(PermanentBlobKey serializedValueKey) {
}
/** Serialized job information if non-offloaded or PermanentBlobKey if offloaded. */
- private MaybeOffloaded serializedJobInformation;
+ private final MaybeOffloaded serializedJobInformation;
/** Serialized task information if non-offloaded or PermanentBlobKey if offloaded. */
- private MaybeOffloaded serializedTaskInformation;
+ private final MaybeOffloaded serializedTaskInformation;
+
+ /**
+ * The job information. It is non-null only when serializedJobInformation is offloaded and
+ * {@link #loadBigData} has been called.
+ */
+ private transient JobInformation jobInformation;
+
+ /**
+ * The task information. It is non-null only when serializedTaskInformation is offloaded and
+ * {@link #loadBigData} has been called.
+ */
+ private transient TaskInformation taskInformation;
/**
* The ID referencing the job this task belongs to.
@@ -151,13 +164,6 @@ public TaskDeploymentDescriptor(
this.inputGates = Preconditions.checkNotNull(inputGateDeploymentDescriptors);
}
- /**
- * Return the sub task's serialized job information.
- *
- * @return serialized job information (may throw {@link IllegalStateException} if {@link
- * #loadBigData} is not called beforehand).
- * @throws IllegalStateException If job information is offloaded to BLOB store.
- */
public SerializedValue getSerializedJobInformation() {
if (serializedJobInformation instanceof NonOffloaded) {
NonOffloaded jobInformation =
@@ -170,12 +176,45 @@ public SerializedValue getSerializedJobInformation() {
}
/**
- * Return the sub task's serialized task information.
+ * Return the sub task's job information.
+ *
+ * @return job information (may throw {@link IllegalStateException} if {@link #loadBigData} is
+ * not called beforehand).
+ * @throws IllegalStateException If job information is offloaded to BLOB store.
+ */
+ public JobInformation getJobInformation() throws IOException, ClassNotFoundException {
+ if (jobInformation != null) {
+ return jobInformation;
+ }
+ if (serializedJobInformation instanceof NonOffloaded) {
+ NonOffloaded jobInformation =
+ (NonOffloaded) serializedJobInformation;
+ return jobInformation.serializedValue.deserializeValue(getClass().getClassLoader());
+ }
+ throw new IllegalStateException(
+ "Trying to work with offloaded serialized job information.");
+ }
+
+ /**
+ * Return the sub task's task information.
*
- * @return serialized task information (may throw {@link IllegalStateException} if {@link
- * #loadBigData} is not called beforehand)).
+ * @return task information (may throw {@link IllegalStateException} if {@link #loadBigData} is
+ * not called beforehand).
* @throws IllegalStateException If job information is offloaded to BLOB store.
*/
+ public TaskInformation getTaskInformation() throws IOException, ClassNotFoundException {
+ if (taskInformation != null) {
+ return taskInformation;
+ }
+ if (serializedTaskInformation instanceof NonOffloaded) {
+ NonOffloaded taskInformation =
+ (NonOffloaded) serializedTaskInformation;
+ return taskInformation.serializedValue.deserializeValue(getClass().getClassLoader());
+ }
+ throw new IllegalStateException(
+ "Trying to work with offloaded serialized task information.");
+ }
+
public SerializedValue getSerializedTaskInformation() {
if (serializedTaskInformation instanceof NonOffloaded) {
NonOffloaded taskInformation =
@@ -243,6 +282,8 @@ public AllocationID getAllocationId() {
*/
public void loadBigData(
@Nullable PermanentBlobService blobService,
+ GroupCache jobInformationCache,
+ GroupCache taskInformationCache,
GroupCache shuffleDescriptorsCache)
throws IOException, ClassNotFoundException {
@@ -254,13 +295,19 @@ public void loadBigData(
Preconditions.checkNotNull(blobService);
- final File dataFile = blobService.getFile(jobId, jobInfoKey);
- // NOTE: Do not delete the job info BLOB since it may be needed again during recovery.
- // (it is deleted automatically on the BLOB server and cache when the job
- // enters a terminal state)
- SerializedValue serializedValue =
- SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
- serializedJobInformation = new NonOffloaded<>(serializedValue);
+ JobInformation jobInformation = jobInformationCache.get(jobId, jobInfoKey);
+ if (jobInformation == null) {
+ final File dataFile = blobService.getFile(jobId, jobInfoKey);
+ // NOTE: Do not delete the job info BLOB since it may be needed again during
+ // recovery. (it is deleted automatically on the BLOB server and cache when the job
+ // enters a terminal state)
+ jobInformation =
+ InstantiationUtil.deserializeObject(
+ FileUtils.readAllBytes(dataFile.toPath()),
+ getClass().getClassLoader());
+ jobInformationCache.put(jobId, jobInfoKey, jobInformation);
+ }
+ this.jobInformation = jobInformation.deepCopy();
}
// re-integrate offloaded task info from blob
@@ -270,13 +317,19 @@ public void loadBigData(
Preconditions.checkNotNull(blobService);
- final File dataFile = blobService.getFile(jobId, taskInfoKey);
- // NOTE: Do not delete the task info BLOB since it may be needed again during recovery.
- // (it is deleted automatically on the BLOB server and cache when the job
- // enters a terminal state)
- SerializedValue serializedValue =
- SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath()));
- serializedTaskInformation = new NonOffloaded<>(serializedValue);
+ TaskInformation taskInformation = taskInformationCache.get(jobId, taskInfoKey);
+ if (taskInformation == null) {
+ final File dataFile = blobService.getFile(jobId, taskInfoKey);
+ // NOTE: Do not delete the task info BLOB since it may be needed again during
+ // recovery. (it is deleted automatically on the BLOB server and cache when the job
+ // enters a terminal state)
+ taskInformation =
+ InstantiationUtil.deserializeObject(
+ FileUtils.readAllBytes(dataFile.toPath()),
+ getClass().getClassLoader());
+ taskInformationCache.put(jobId, taskInfoKey, taskInformation);
+ }
+ this.taskInformation = taskInformation.deepCopy();
}
for (InputGateDeploymentDescriptor inputGate : inputGates) {
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
index f30bf6f32d2144..5792caa2711532 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java
@@ -21,13 +21,18 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
import org.apache.flink.configuration.Configuration;
+import org.apache.flink.configuration.UnmodifiableConfiguration;
import org.apache.flink.runtime.blob.PermanentBlobKey;
import org.apache.flink.util.Preconditions;
import org.apache.flink.util.SerializedValue;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableCollection;
+import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList;
+
import java.io.Serializable;
import java.net.URL;
import java.util.Collection;
+import java.util.Objects;
/** Container class for job information which is stored in the {@link ExecutionGraph}. */
public class JobInformation implements Serializable {
@@ -44,13 +49,13 @@ public class JobInformation implements Serializable {
private final SerializedValue serializedExecutionConfig;
/** Configuration of the job. */
- private final Configuration jobConfiguration;
+ private final UnmodifiableConfiguration jobConfiguration;
/** Blob keys for the required jar files. */
- private final Collection requiredJarFileBlobKeys;
+ private final ImmutableCollection requiredJarFileBlobKeys;
/** URLs specifying the classpath to add to the class loader. */
- private final Collection requiredClasspathURLs;
+ private final ImmutableCollection requiredClasspathURLs;
public JobInformation(
JobID jobId,
@@ -62,9 +67,12 @@ public JobInformation(
this.jobId = Preconditions.checkNotNull(jobId);
this.jobName = Preconditions.checkNotNull(jobName);
this.serializedExecutionConfig = Preconditions.checkNotNull(serializedExecutionConfig);
- this.jobConfiguration = Preconditions.checkNotNull(jobConfiguration);
- this.requiredJarFileBlobKeys = Preconditions.checkNotNull(requiredJarFileBlobKeys);
- this.requiredClasspathURLs = Preconditions.checkNotNull(requiredClasspathURLs);
+ this.jobConfiguration =
+ new UnmodifiableConfiguration(Preconditions.checkNotNull(jobConfiguration));
+ this.requiredJarFileBlobKeys =
+ ImmutableList.copyOf(Preconditions.checkNotNull(requiredJarFileBlobKeys));
+ this.requiredClasspathURLs =
+ ImmutableList.copyOf(Preconditions.checkNotNull(requiredClasspathURLs));
}
public JobID getJobId() {
@@ -79,18 +87,51 @@ public SerializedValue getSerializedExecutionConfig() {
return serializedExecutionConfig;
}
- public Configuration getJobConfiguration() {
+ public UnmodifiableConfiguration getJobConfiguration() {
return jobConfiguration;
}
- public Collection getRequiredJarFileBlobKeys() {
+ public ImmutableCollection getRequiredJarFileBlobKeys() {
return requiredJarFileBlobKeys;
}
- public Collection getRequiredClasspathURLs() {
+ public ImmutableCollection getRequiredClasspathURLs() {
return requiredClasspathURLs;
}
+ // All fields are immutable, so return this directly.
+ public JobInformation deepCopy() {
+ return this;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ JobInformation that = (JobInformation) o;
+ return Objects.equals(jobId, that.jobId)
+ && Objects.equals(jobName, that.jobName)
+ && Objects.equals(serializedExecutionConfig, that.serializedExecutionConfig)
+ && Objects.equals(jobConfiguration, that.jobConfiguration)
+ && Objects.equals(requiredJarFileBlobKeys, that.requiredJarFileBlobKeys)
+ && Objects.equals(requiredClasspathURLs, that.requiredClasspathURLs);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ jobId,
+ jobName,
+ serializedExecutionConfig,
+ jobConfiguration,
+ requiredJarFileBlobKeys,
+ requiredClasspathURLs);
+ }
+
// ------------------------------------------------------------------------
@Override
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
index 6a3b1a26e7f0b2..ed6bf973b755ab 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java
@@ -23,6 +23,7 @@
import org.apache.flink.util.Preconditions;
import java.io.Serializable;
+import java.util.Objects;
/**
* Container class for operator/task specific information which are stored at the {@link
@@ -88,4 +89,42 @@ public String getInvokableClassName() {
public Configuration getTaskConfiguration() {
return taskConfiguration;
}
+
+ public TaskInformation deepCopy() {
+ return new TaskInformation(
+ getJobVertexId(),
+ getTaskName(),
+ getNumberOfSubtasks(),
+ getMaxNumberOfSubtasks(),
+ getInvokableClassName(),
+ new Configuration(getTaskConfiguration()));
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+ TaskInformation that = (TaskInformation) o;
+ return numberOfSubtasks == that.numberOfSubtasks
+ && maxNumberOfSubtasks == that.maxNumberOfSubtasks
+ && Objects.equals(jobVertexId, that.jobVertexId)
+ && Objects.equals(taskName, that.taskName)
+ && Objects.equals(invokableClassName, that.invokableClassName)
+ && Objects.equals(taskConfiguration, that.taskConfiguration);
+ }
+
+ @Override
+ public int hashCode() {
+ return Objects.hash(
+ jobVertexId,
+ taskName,
+ numberOfSubtasks,
+ maxNumberOfSubtasks,
+ invokableClassName,
+ taskConfiguration);
+ }
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
index 7146c2befa952a..c3a705e0e0e68c 100644
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java
@@ -300,6 +300,8 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway {
private final ThreadInfoSampleService threadInfoSampleService;
+ private final GroupCache jobInformationCache;
+ private final GroupCache taskInformationCache;
private final GroupCache
shuffleDescriptorsCache;
@@ -378,6 +380,8 @@ public TaskExecutor(
taskExecutorServices.getSlotAllocationSnapshotPersistenceService();
this.sharedResources = taskExecutorServices.getSharedResources();
+ this.jobInformationCache = taskExecutorServices.getJobInformationCache();
+ this.taskInformationCache = taskExecutorServices.getTaskInformationCache();
this.shuffleDescriptorsCache = taskExecutorServices.getShuffleDescriptorCache();
}
@@ -508,6 +512,8 @@ public CompletableFuture onStop() {
changelogStoragesManager.shutdown();
channelStateExecutorFactoryManager.shutdown();
+ jobInformationCache.clear();
+ taskInformationCache.clear();
shuffleDescriptorsCache.clear();
Preconditions.checkState(jobTable.isEmpty());
@@ -672,7 +678,10 @@ public CompletableFuture submitTask(
// re-integrate offloaded data and deserialize shuffle descriptors
try {
tdd.loadBigData(
- taskExecutorBlobService.getPermanentBlobService(), shuffleDescriptorsCache);
+ taskExecutorBlobService.getPermanentBlobService(),
+ jobInformationCache,
+ taskInformationCache,
+ shuffleDescriptorsCache);
} catch (IOException | ClassNotFoundException e) {
throw new TaskSubmissionException(
"Could not re-integrate offloaded TaskDeploymentDescriptor data.", e);
@@ -682,12 +691,8 @@ public CompletableFuture submitTask(
final JobInformation jobInformation;
final TaskInformation taskInformation;
try {
- jobInformation =
- tdd.getSerializedJobInformation()
- .deserializeValue(getClass().getClassLoader());
- taskInformation =
- tdd.getSerializedTaskInformation()
- .deserializeValue(getClass().getClassLoader());
+ jobInformation = tdd.getJobInformation();
+ taskInformation = tdd.getTaskInformation();
} catch (IOException | ClassNotFoundException e) {
throw new TaskSubmissionException(
"Could not deserialize the job or task information.", e);
@@ -1907,6 +1912,8 @@ private void releaseJobResources(JobID jobId, Exception cause) {
changelogStoragesManager.releaseResourcesForJob(jobId);
currentSlotOfferPerJob.remove(jobId);
channelStateExecutorFactoryManager.releaseResourcesForJob(jobId);
+ jobInformationCache.clearCacheForGroup(jobId);
+ taskInformationCache.clearCacheForGroup(jobId);
shuffleDescriptorsCache.clearCacheForGroup(jobId);
fileMergingManager.releaseMergingSnapshotManagerForJob(jobId);
}
diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
index d988923a96ac4d..5302e54bcb52ed 100755
--- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
+++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java
@@ -29,6 +29,8 @@
import org.apache.flink.runtime.entrypoint.WorkingDirectory;
import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager;
import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager;
+import org.apache.flink.runtime.executiongraph.JobInformation;
+import org.apache.flink.runtime.executiongraph.TaskInformation;
import org.apache.flink.runtime.io.disk.iomanager.IOManager;
import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync;
import org.apache.flink.runtime.io.network.TaskEventDispatcher;
@@ -94,6 +96,8 @@ public class TaskManagerServices {
private final LibraryCacheManager libraryCacheManager;
private final SlotAllocationSnapshotPersistenceService slotAllocationSnapshotPersistenceService;
private final SharedResources sharedResources;
+ private final GroupCache jobInformationCache;
+ private final GroupCache taskInformationCache;
private final GroupCache
shuffleDescriptorsCache;
@@ -116,6 +120,8 @@ public class TaskManagerServices {
LibraryCacheManager libraryCacheManager,
SlotAllocationSnapshotPersistenceService slotAllocationSnapshotPersistenceService,
SharedResources sharedResources,
+ GroupCache jobInformationCache,
+ GroupCache taskInformationCache,
GroupCache shuffleDescriptorsCache) {
this.unresolvedTaskManagerLocation =
@@ -138,6 +144,8 @@ public class TaskManagerServices {
this.libraryCacheManager = Preconditions.checkNotNull(libraryCacheManager);
this.slotAllocationSnapshotPersistenceService = slotAllocationSnapshotPersistenceService;
this.sharedResources = Preconditions.checkNotNull(sharedResources);
+ this.jobInformationCache = jobInformationCache;
+ this.taskInformationCache = taskInformationCache;
this.shuffleDescriptorsCache = Preconditions.checkNotNull(shuffleDescriptorsCache);
}
@@ -213,6 +221,14 @@ public SharedResources getSharedResources() {
return sharedResources;
}
+ public GroupCache getJobInformationCache() {
+ return jobInformationCache;
+ }
+
+ public GroupCache getTaskInformationCache() {
+ return taskInformationCache;
+ }
+
public GroupCache getShuffleDescriptorCache() {
return shuffleDescriptorsCache;
}
@@ -411,6 +427,11 @@ public static TaskManagerServices fromConfiguration(
NoOpSlotAllocationSnapshotPersistenceService.INSTANCE;
}
+ final GroupCache jobInformationCache =
+ new DefaultGroupCache.Factory().create();
+ final GroupCache taskInformationCache =
+ new DefaultGroupCache.Factory().create();
+
final GroupCache shuffleDescriptorsCache =
new DefaultGroupCache.Factory()
.create();
@@ -434,6 +455,8 @@ public static TaskManagerServices fromConfiguration(
libraryCacheManager,
slotAllocationSnapshotPersistenceService,
new SharedResources(),
+ jobInformationCache,
+ taskInformationCache,
shuffleDescriptorsCache);
}
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
index 6191b19767d20a..178df288eaa4ce 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java
@@ -20,9 +20,13 @@
import org.apache.flink.api.common.ExecutionConfig;
import org.apache.flink.api.common.JobID;
+import org.apache.flink.configuration.BlobServerOptions;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.core.testutils.CommonTestUtils;
+import org.apache.flink.runtime.blob.BlobServer;
+import org.apache.flink.runtime.blob.BlobWriter;
import org.apache.flink.runtime.blob.PermanentBlobKey;
+import org.apache.flink.runtime.blob.VoidBlobStore;
import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore;
import org.apache.flink.runtime.checkpoint.TaskStateSnapshot;
import org.apache.flink.runtime.clusterframework.types.AllocationID;
@@ -32,14 +36,21 @@
import org.apache.flink.runtime.jobgraph.JobVertexID;
import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable;
import org.apache.flink.runtime.operators.BatchTask;
+import org.apache.flink.runtime.util.DefaultGroupCache;
+import org.apache.flink.runtime.util.GroupCache;
+import org.apache.flink.runtime.util.NoOpGroupCache;
+import org.apache.flink.testutils.junit.utils.TempDirUtils;
+import org.apache.flink.types.Either;
import org.apache.flink.util.SerializedValue;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.io.TempDir;
import javax.annotation.Nonnull;
import java.io.IOException;
import java.net.URL;
+import java.nio.file.Path;
import java.util.ArrayList;
import java.util.List;
@@ -50,6 +61,8 @@
/** Tests for the {@link TaskDeploymentDescriptor}. */
class TaskDeploymentDescriptorTest {
+ @TempDir Path temporaryFolder;
+
private static final JobID jobID = new JobID();
private static final JobVertexID vertexID = new JobVertexID();
private static final ExecutionAttemptID execId = createExecutionAttemptId(vertexID);
@@ -73,24 +86,28 @@ class TaskDeploymentDescriptorTest {
private final SerializedValue executionConfig =
new SerializedValue<>(new ExecutionConfig());
+
+ private final JobInformation jobInformation =
+ new JobInformation(
+ jobID,
+ jobName,
+ executionConfig,
+ jobConfiguration,
+ requiredJars,
+ requiredClasspaths);
private final SerializedValue serializedJobInformation =
- new SerializedValue<>(
- new JobInformation(
- jobID,
- jobName,
- executionConfig,
- jobConfiguration,
- requiredJars,
- requiredClasspaths));
+ new SerializedValue<>(jobInformation);
+
+ private final TaskInformation taskInformation =
+ new TaskInformation(
+ vertexID,
+ taskName,
+ currentNumberOfSubtasks,
+ numberOfKeyGroups,
+ invokableClass.getName(),
+ taskConfiguration);
private final SerializedValue serializedJobVertexInformation =
- new SerializedValue<>(
- new TaskInformation(
- vertexID,
- taskName,
- currentNumberOfSubtasks,
- numberOfKeyGroups,
- invokableClass.getName(),
- taskConfiguration));
+ new SerializedValue<>(taskInformation);
TaskDeploymentDescriptorTest() throws IOException {}
@@ -104,19 +121,15 @@ void testSerialization() throws Exception {
final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig);
- assertThat(orig.getSerializedJobInformation())
- .isNotSameAs(copy.getSerializedJobInformation());
- assertThat(orig.getSerializedTaskInformation())
- .isNotSameAs(copy.getSerializedTaskInformation());
+ assertThat(orig.getJobInformation()).isNotSameAs(copy.getJobInformation());
+ assertThat(orig.getTaskInformation()).isNotSameAs(copy.getTaskInformation());
assertThat(orig.getExecutionAttemptId()).isNotSameAs(copy.getExecutionAttemptId());
assertThat(orig.getTaskRestore()).isNotSameAs(copy.getTaskRestore());
assertThat(orig.getProducedPartitions()).isNotSameAs(copy.getProducedPartitions());
assertThat(orig.getInputGates()).isNotSameAs(copy.getInputGates());
- assertThat(orig.getSerializedJobInformation())
- .isEqualTo(copy.getSerializedJobInformation());
- assertThat(orig.getSerializedTaskInformation())
- .isEqualTo(copy.getSerializedTaskInformation());
+ assertThat(orig.getJobInformation()).isEqualTo(copy.getJobInformation());
+ assertThat(orig.getTaskInformation()).isEqualTo(copy.getTaskInformation());
assertThat(orig.getExecutionAttemptId()).isEqualTo(copy.getExecutionAttemptId());
assertThat(orig.getAllocationId()).isEqualTo(copy.getAllocationId());
assertThat(orig.getSubtaskIndex()).isEqualTo(copy.getSubtaskIndex());
@@ -130,20 +143,85 @@ void testSerialization() throws Exception {
}
@Test
- void testOffLoadedAndNonOffLoadedPayload() {
+ void testOffLoadedAndNonOffLoadedPayload() throws IOException, ClassNotFoundException {
final TaskDeploymentDescriptor taskDeploymentDescriptor =
createTaskDeploymentDescriptor(
new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
new TaskDeploymentDescriptor.Offloaded<>(new PermanentBlobKey()));
- SerializedValue actualSerializedJobInformation =
- taskDeploymentDescriptor.getSerializedJobInformation();
- assertThat(actualSerializedJobInformation).isSameAs(serializedJobInformation);
+ JobInformation actualJobInformation = taskDeploymentDescriptor.getJobInformation();
+ assertThat(actualJobInformation).isEqualTo(jobInformation);
- assertThatThrownBy(taskDeploymentDescriptor::getSerializedTaskInformation)
+ assertThatThrownBy(taskDeploymentDescriptor::getTaskInformation)
.isInstanceOf(IllegalStateException.class);
}
+ @Test
+ void testTaskInformationCache() throws IOException, ClassNotFoundException {
+ try (BlobServer blobServer = setupBlobServer()) {
+ // Serialize taskInformation to blobServer and get the permanentBlobKey
+ Either, PermanentBlobKey> taskInformationOrBlobKey =
+ BlobWriter.serializeAndTryOffload(taskInformation, jobID, blobServer);
+ assertThat(taskInformationOrBlobKey.isRight()).isTrue();
+ PermanentBlobKey permanentBlobKey = taskInformationOrBlobKey.right();
+
+ GroupCache taskInformationCache =
+ new DefaultGroupCache.Factory()
+ .create();
+ // Test for tdd1
+ final TaskDeploymentDescriptor tdd1 =
+ createTaskDeploymentDescriptor(
+ new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+ new TaskDeploymentDescriptor.Offloaded<>(permanentBlobKey));
+ assertThat(taskInformationCache.get(jobID, permanentBlobKey)).isNull();
+ tdd1.loadBigData(
+ blobServer,
+ new NoOpGroupCache<>(),
+ taskInformationCache,
+ new NoOpGroupCache<>());
+ TaskInformation taskInformation1 = tdd1.getTaskInformation();
+ // The TaskInformation is cached in taskInformationCache, and the cached taskInformation
+ // is equal to taskInformation1.
+ assertThat(taskInformationCache.get(jobID, permanentBlobKey))
+ .isNotNull()
+ .isEqualTo(taskInformation1);
+
+ // Test for tdd2
+ final TaskDeploymentDescriptor tdd2 =
+ createTaskDeploymentDescriptor(
+ new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation),
+ new TaskDeploymentDescriptor.Offloaded<>(permanentBlobKey));
+ tdd2.loadBigData(
+ blobServer,
+ new NoOpGroupCache<>(),
+ taskInformationCache,
+ new NoOpGroupCache<>());
+ TaskInformation taskInformation2 = tdd2.getTaskInformation();
+ // taskInformation2 is equal to taskInformation1, but they are not the same instance.
+ assertThat(taskInformation2)
+ .isNotNull()
+ .isEqualTo(taskInformation1)
+ .isNotSameAs(taskInformation1);
+ // The configuration may be changed by a subtask, so the instances must not be the same.
+ assertThat(taskInformation2.getTaskConfiguration())
+ .isNotNull()
+ .isEqualTo(taskInformation1.getTaskConfiguration())
+ .isNotSameAs(taskInformation1.getTaskConfiguration());
+ }
+ }
+
+ private BlobServer setupBlobServer() throws IOException {
+
+ Configuration config = new Configuration();
+ // always offload the serialized job and task information
+ config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0);
+ BlobServer blobServer =
+ new BlobServer(
+ config, TempDirUtils.newFolder(temporaryFolder), new VoidBlobStore());
+ blobServer.start();
+ return blobServer;
+ }
+
@Nonnull
private TaskDeploymentDescriptor createTaskDeploymentDescriptor(
TaskDeploymentDescriptor.MaybeOffloaded jobInformation,
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java
index a5508f9698ec7f..08783509a9fe15 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java
@@ -204,9 +204,7 @@ private CompletableFuture getTaskInformation(
"Task descriptor for %s not found.",
executionAttemptId)));
try {
- return descriptor
- .getSerializedTaskInformation()
- .deserializeValue(Thread.currentThread().getContextClassLoader());
+ return descriptor.getTaskInformation();
} catch (Exception e) {
throw new IllegalStateException(
String.format(
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
index 54d8fc30964020..d354b49fedde9e 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java
@@ -179,7 +179,11 @@ void testBuildDeploymentDescriptor() throws Exception {
taskManagerGateway.setSubmitConsumer(
FunctionUtils.uncheckedConsumer(
taskDeploymentDescriptor -> {
- taskDeploymentDescriptor.loadBigData(blobCache, new NoOpGroupCache<>());
+ taskDeploymentDescriptor.loadBigData(
+ blobCache,
+ new NoOpGroupCache<>(),
+ new NoOpGroupCache<>(),
+ new NoOpGroupCache<>());
tdd.complete(taskDeploymentDescriptor);
}));
@@ -202,10 +206,8 @@ void testBuildDeploymentDescriptor() throws Exception {
TaskDeploymentDescriptor descr = tdd.get();
assertThat(descr).isNotNull();
- JobInformation jobInformation =
- descr.getSerializedJobInformation().deserializeValue(getClass().getClassLoader());
- TaskInformation taskInformation =
- descr.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader());
+ JobInformation jobInformation = descr.getJobInformation();
+ TaskInformation taskInformation = descr.getTaskInformation();
assertThat(descr.getJobId()).isEqualTo(jobId);
assertThat(jobInformation.getJobId()).isEqualTo(jobId);
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
index 1848faa3959bbb..fae6a754646a4b 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java
@@ -120,7 +120,11 @@ void testDeployMultipleTasksWithSmallBlobCacheSizeLimit() throws Exception {
taskManagerGateway.setSubmitConsumer(
FunctionUtils.uncheckedConsumer(
taskDeploymentDescriptor -> {
- taskDeploymentDescriptor.loadBigData(blobCache, new NoOpGroupCache<>());
+ taskDeploymentDescriptor.loadBigData(
+ blobCache,
+ new NoOpGroupCache<>(),
+ new NoOpGroupCache<>(),
+ new NoOpGroupCache<>());
tdds.offer(taskDeploymentDescriptor);
}));
diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
index 193e56e20c3a99..68d9d5a275de73 100644
--- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
+++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java
@@ -201,6 +201,8 @@ public TaskManagerServices build() {
libraryCacheManager,
slotAllocationSnapshotPersistenceService,
sharedResources,
+ new NoOpGroupCache<>(),
+ new NoOpGroupCache<>(),
new NoOpGroupCache<>());
}
}