diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java index 50ed98ed1dcf7..333a91e0a7320 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/InputGateDeploymentDescriptor.java @@ -33,7 +33,7 @@ import org.apache.flink.runtime.jobgraph.DistributionPattern; import org.apache.flink.runtime.jobgraph.IntermediateDataSetID; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; -import org.apache.flink.runtime.taskexecutor.ShuffleDescriptorsCache; +import org.apache.flink.runtime.util.GroupCache; import org.apache.flink.util.CompressedSerializedValue; import org.apache.flink.util.Preconditions; @@ -167,7 +167,7 @@ public ShuffleDescriptor[] getShuffleDescriptors() { public void tryLoadAndDeserializeShuffleDescriptors( @Nullable PermanentBlobService blobService, JobID jobId, - ShuffleDescriptorsCache shuffleDescriptorsCache) + GroupCache shuffleDescriptorsCache) throws IOException { if (inputChannels != null) { return; @@ -190,13 +190,14 @@ private void tryLoadAndDeserializeShuffleDescriptorGroup( @Nullable PermanentBlobService blobService, JobID jobId, MaybeOffloaded serializedShuffleDescriptors, - ShuffleDescriptorsCache shuffleDescriptorsCache) + GroupCache shuffleDescriptorsCache) throws IOException, ClassNotFoundException { if (serializedShuffleDescriptors instanceof Offloaded) { PermanentBlobKey blobKey = ((Offloaded) serializedShuffleDescriptors) .serializedValueKey; - ShuffleDescriptorGroup shuffleDescriptorGroup = shuffleDescriptorsCache.get(blobKey); + ShuffleDescriptorGroup shuffleDescriptorGroup = + shuffleDescriptorsCache.get(jobId, blobKey); if (shuffleDescriptorGroup == null) { Preconditions.checkNotNull(blobService); // NOTE: Do not delete the ShuffleDescriptor BLOBs since it may be needed again diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index bd3b770142c01..5684066735f03 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -23,19 +23,22 @@ import org.apache.flink.runtime.blob.PermanentBlobService; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.JobInformation; import org.apache.flink.runtime.executiongraph.TaskInformation; -import org.apache.flink.runtime.taskexecutor.ShuffleDescriptorsCache; -import org.apache.flink.util.FileUtils; +import org.apache.flink.runtime.util.GroupCache; +import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; import javax.annotation.Nullable; +import java.io.BufferedInputStream; import java.io.File; import java.io.IOException; import java.io.Serializable; +import java.nio.file.Files; import java.util.List; /** @@ -97,11 +100,23 @@ public Offloaded(PermanentBlobKey serializedValueKey) { } } - /** Serialized job information or null if offloaded. */ - private MaybeOffloaded serializedJobInformation; + /** Serialized job information if non-offloaded or PermanentBlobKey if offloaded. */ + private final MaybeOffloaded serializedJobInformation; - /** Serialized task information or null if offloaded. */ - private MaybeOffloaded serializedTaskInformation; + /** Serialized task information if non-offloaded or PermanentBlobKey if offloaded. */ + private final MaybeOffloaded serializedTaskInformation; + + /** + * The job information, it isn't null when serializedJobInformation is offloaded and after + * {@link #loadBigData}. + */ + private transient JobInformation jobInformation; + + /** + * The task information, it isn't null when serializedTaskInformation is offloaded and after + * {@link #loadBigData}. + */ + private transient TaskInformation taskInformation; /** * The ID referencing the job this task belongs to. @@ -151,39 +166,43 @@ public TaskDeploymentDescriptor( } /** - * Return the sub task's serialized job information. + * Return the sub task's job information. * - * @return serialized job information (may throw {@link IllegalStateException} if {@link - * #loadBigData} is not called beforehand). + * @return job information (may throw {@link IllegalStateException} if {@link #loadBigData} is + * not called beforehand). * @throws IllegalStateException If job information is offloaded to BLOB store. */ - public SerializedValue getSerializedJobInformation() { + public JobInformation getJobInformation() throws IOException, ClassNotFoundException { + if (jobInformation != null) { + return jobInformation; + } if (serializedJobInformation instanceof NonOffloaded) { NonOffloaded jobInformation = (NonOffloaded) serializedJobInformation; - return jobInformation.serializedValue; - } else { - throw new IllegalStateException( - "Trying to work with offloaded serialized job information."); + return jobInformation.serializedValue.deserializeValue(getClass().getClassLoader()); } + throw new IllegalStateException( + "Trying to work with offloaded serialized job information."); } /** - * Return the sub task's serialized task information. + * Return the sub task's task information. * - * @return serialized task information (may throw {@link IllegalStateException} if {@link - * #loadBigData} is not called beforehand)). + * @return task information (may throw {@link IllegalStateException} if {@link #loadBigData} is + * not called beforehand)). * @throws IllegalStateException If job information is offloaded to BLOB store. */ - public SerializedValue getSerializedTaskInformation() { + public TaskInformation getTaskInformation() throws IOException, ClassNotFoundException { + if (taskInformation != null) { + return taskInformation; + } if (serializedTaskInformation instanceof NonOffloaded) { NonOffloaded taskInformation = (NonOffloaded) serializedTaskInformation; - return taskInformation.serializedValue; - } else { - throw new IllegalStateException( - "Trying to work with offloaded serialized job information."); + return taskInformation.serializedValue.deserializeValue(getClass().getClassLoader()); } + throw new IllegalStateException( + "Trying to work with offloaded serialized task information."); } /** @@ -242,7 +261,9 @@ public AllocationID getAllocationId() { */ public void loadBigData( @Nullable PermanentBlobService blobService, - ShuffleDescriptorsCache shuffleDescriptorsCache) + GroupCache jobInformationCache, + GroupCache taskInformationCache, + GroupCache shuffleDescriptorsCache) throws IOException, ClassNotFoundException { // re-integrate offloaded job info from blob @@ -253,13 +274,19 @@ public void loadBigData( Preconditions.checkNotNull(blobService); - final File dataFile = blobService.getFile(jobId, jobInfoKey); - // NOTE: Do not delete the job info BLOB since it may be needed again during recovery. - // (it is deleted automatically on the BLOB server and cache when the job - // enters a terminal state) - SerializedValue serializedValue = - SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath())); - serializedJobInformation = new NonOffloaded<>(serializedValue); + JobInformation jobInformation = jobInformationCache.get(jobId, jobInfoKey); + if (jobInformation == null) { + final File dataFile = blobService.getFile(jobId, jobInfoKey); + // NOTE: Do not delete the job info BLOB since it may be needed again during + // recovery. (it is deleted automatically on the BLOB server and cache when the job + // enters a terminal state) + jobInformation = + InstantiationUtil.deserializeObject( + new BufferedInputStream(Files.newInputStream(dataFile.toPath())), + getClass().getClassLoader()); + jobInformationCache.put(jobId, jobInfoKey, jobInformation); + } + this.jobInformation = jobInformation.deepCopy(); } // re-integrate offloaded task info from blob @@ -269,23 +296,25 @@ public void loadBigData( Preconditions.checkNotNull(blobService); - final File dataFile = blobService.getFile(jobId, taskInfoKey); - // NOTE: Do not delete the task info BLOB since it may be needed again during recovery. - // (it is deleted automatically on the BLOB server and cache when the job - // enters a terminal state) - SerializedValue serializedValue = - SerializedValue.fromBytes(FileUtils.readAllBytes(dataFile.toPath())); - serializedTaskInformation = new NonOffloaded<>(serializedValue); + TaskInformation taskInformation = taskInformationCache.get(jobId, taskInfoKey); + if (taskInformation == null) { + final File dataFile = blobService.getFile(jobId, taskInfoKey); + // NOTE: Do not delete the task info BLOB since it may be needed again during + // recovery. (it is deleted automatically on the BLOB server and cache when the job + // enters a terminal state) + taskInformation = + InstantiationUtil.deserializeObject( + new BufferedInputStream(Files.newInputStream(dataFile.toPath())), + getClass().getClassLoader()); + taskInformationCache.put(jobId, taskInfoKey, taskInformation); + } + this.taskInformation = taskInformation.deepCopy(); } for (InputGateDeploymentDescriptor inputGate : inputGates) { inputGate.tryLoadAndDeserializeShuffleDescriptors( blobService, jobId, shuffleDescriptorsCache); } - - // make sure that the serialized job and task information fields are filled - Preconditions.checkNotNull(serializedJobInformation); - Preconditions.checkNotNull(serializedTaskInformation); } @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java index f30bf6f32d214..5792caa271153 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/JobInformation.java @@ -21,13 +21,18 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.Configuration; +import org.apache.flink.configuration.UnmodifiableConfiguration; import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.util.Preconditions; import org.apache.flink.util.SerializedValue; +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableCollection; +import org.apache.flink.shaded.guava31.com.google.common.collect.ImmutableList; + import java.io.Serializable; import java.net.URL; import java.util.Collection; +import java.util.Objects; /** Container class for job information which is stored in the {@link ExecutionGraph}. */ public class JobInformation implements Serializable { @@ -44,13 +49,13 @@ public class JobInformation implements Serializable { private final SerializedValue serializedExecutionConfig; /** Configuration of the job. */ - private final Configuration jobConfiguration; + private final UnmodifiableConfiguration jobConfiguration; /** Blob keys for the required jar files. */ - private final Collection requiredJarFileBlobKeys; + private final ImmutableCollection requiredJarFileBlobKeys; /** URLs specifying the classpath to add to the class loader. */ - private final Collection requiredClasspathURLs; + private final ImmutableCollection requiredClasspathURLs; public JobInformation( JobID jobId, @@ -62,9 +67,12 @@ public JobInformation( this.jobId = Preconditions.checkNotNull(jobId); this.jobName = Preconditions.checkNotNull(jobName); this.serializedExecutionConfig = Preconditions.checkNotNull(serializedExecutionConfig); - this.jobConfiguration = Preconditions.checkNotNull(jobConfiguration); - this.requiredJarFileBlobKeys = Preconditions.checkNotNull(requiredJarFileBlobKeys); - this.requiredClasspathURLs = Preconditions.checkNotNull(requiredClasspathURLs); + this.jobConfiguration = + new UnmodifiableConfiguration(Preconditions.checkNotNull(jobConfiguration)); + this.requiredJarFileBlobKeys = + ImmutableList.copyOf(Preconditions.checkNotNull(requiredJarFileBlobKeys)); + this.requiredClasspathURLs = + ImmutableList.copyOf(Preconditions.checkNotNull(requiredClasspathURLs)); } public JobID getJobId() { @@ -79,18 +87,51 @@ public SerializedValue getSerializedExecutionConfig() { return serializedExecutionConfig; } - public Configuration getJobConfiguration() { + public UnmodifiableConfiguration getJobConfiguration() { return jobConfiguration; } - public Collection getRequiredJarFileBlobKeys() { + public ImmutableCollection getRequiredJarFileBlobKeys() { return requiredJarFileBlobKeys; } - public Collection getRequiredClasspathURLs() { + public ImmutableCollection getRequiredClasspathURLs() { return requiredClasspathURLs; } + // All fields are immutable, so return this directly. + public JobInformation deepCopy() { + return this; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + JobInformation that = (JobInformation) o; + return Objects.equals(jobId, that.jobId) + && Objects.equals(jobName, that.jobName) + && Objects.equals(serializedExecutionConfig, that.serializedExecutionConfig) + && Objects.equals(jobConfiguration, that.jobConfiguration) + && Objects.equals(requiredJarFileBlobKeys, that.requiredJarFileBlobKeys) + && Objects.equals(requiredClasspathURLs, that.requiredClasspathURLs); + } + + @Override + public int hashCode() { + return Objects.hash( + jobId, + jobName, + serializedExecutionConfig, + jobConfiguration, + requiredJarFileBlobKeys, + requiredClasspathURLs); + } + // ------------------------------------------------------------------------ @Override diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java index 6a3b1a26e7f0b..e1b59d4bfb12f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/TaskInformation.java @@ -23,6 +23,7 @@ import org.apache.flink.util.Preconditions; import java.io.Serializable; +import java.util.Objects; /** * Container class for operator/task specific information which are stored at the {@link @@ -88,4 +89,43 @@ public String getInvokableClassName() { public Configuration getTaskConfiguration() { return taskConfiguration; } + + public TaskInformation deepCopy() { + return new TaskInformation( + getJobVertexId(), + getTaskName(), + getNumberOfSubtasks(), + getMaxNumberOfSubtasks(), + getInvokableClassName(), + // Return the new Configuration to avoid shared conf being changed. + new Configuration(getTaskConfiguration())); + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + TaskInformation that = (TaskInformation) o; + return numberOfSubtasks == that.numberOfSubtasks + && maxNumberOfSubtasks == that.maxNumberOfSubtasks + && Objects.equals(jobVertexId, that.jobVertexId) + && Objects.equals(taskName, that.taskName) + && Objects.equals(invokableClassName, that.invokableClassName) + && Objects.equals(taskConfiguration, that.taskConfiguration); + } + + @Override + public int hashCode() { + return Objects.hash( + jobVertexId, + taskName, + numberOfSubtasks, + maxNumberOfSubtasks, + invokableClassName, + taskConfiguration); + } } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.java deleted file mode 100644 index 99a97f2370b27..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCache.java +++ /dev/null @@ -1,153 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskexecutor; - -import org.apache.flink.annotation.VisibleForTesting; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.blob.PermanentBlobKey; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup; - -import org.apache.flink.shaded.guava31.com.google.common.base.Ticker; -import org.apache.flink.shaded.guava31.com.google.common.cache.Cache; -import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder; -import org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification; - -import java.time.Duration; -import java.util.HashMap; -import java.util.HashSet; -import java.util.Map; -import java.util.Set; - -import static org.apache.flink.util.Preconditions.checkNotNull; - -/** Default implement of {@link ShuffleDescriptorsCache}. Entries will be expired after timeout. */ -public class DefaultShuffleDescriptorsCache implements ShuffleDescriptorsCache { - private final Cache shuffleDescriptorsCache; - private final Map> cachedBlobKeysPerJob; - - private DefaultShuffleDescriptorsCache( - Duration expireTimeout, int cacheSizeLimit, Ticker ticker) { - this.cachedBlobKeysPerJob = new HashMap<>(); - this.shuffleDescriptorsCache = - CacheBuilder.newBuilder() - .concurrencyLevel(1) - .maximumSize(cacheSizeLimit) - .expireAfterAccess(expireTimeout) - .ticker(ticker) - .removalListener(this::onCacheRemoval) - .build(); - } - - @Override - public void clear() { - cachedBlobKeysPerJob.clear(); - shuffleDescriptorsCache.cleanUp(); - } - - @Override - public ShuffleDescriptorGroup get(PermanentBlobKey blobKey) { - ShuffleDescriptorCacheEntry entry = shuffleDescriptorsCache.getIfPresent(blobKey); - return entry == null ? null : entry.getShuffleDescriptorGroup(); - } - - @Override - public void put( - JobID jobId, PermanentBlobKey blobKey, ShuffleDescriptorGroup shuffleDescriptorGroup) { - shuffleDescriptorsCache.put( - blobKey, new ShuffleDescriptorCacheEntry(shuffleDescriptorGroup, jobId)); - cachedBlobKeysPerJob.computeIfAbsent(jobId, ignore -> new HashSet<>()).add(blobKey); - } - - @Override - public void clearCacheForJob(JobID jobId) { - Set removed = cachedBlobKeysPerJob.remove(jobId); - if (removed != null) { - shuffleDescriptorsCache.invalidateAll(removed); - } - } - - /** - * Removal listener that remove the index of serializedShuffleDescriptorsPerJob . - * - * @param removalNotification of removed element. - */ - private void onCacheRemoval( - RemovalNotification - removalNotification) { - PermanentBlobKey blobKey = removalNotification.getKey(); - ShuffleDescriptorCacheEntry entry = removalNotification.getValue(); - if (blobKey != null && entry != null) { - cachedBlobKeysPerJob.computeIfPresent( - entry.getJobId(), - (jobID, permanentBlobKeys) -> { - permanentBlobKeys.remove(blobKey); - if (permanentBlobKeys.isEmpty()) { - return null; - } else { - return permanentBlobKeys; - } - }); - } - } - - private static class ShuffleDescriptorCacheEntry { - private final ShuffleDescriptorGroup shuffleDescriptorGroup; - private final JobID jobId; - - public ShuffleDescriptorCacheEntry( - ShuffleDescriptorGroup shuffleDescriptorGroup, JobID jobId) { - this.shuffleDescriptorGroup = checkNotNull(shuffleDescriptorGroup); - this.jobId = checkNotNull(jobId); - } - - public ShuffleDescriptorGroup getShuffleDescriptorGroup() { - return shuffleDescriptorGroup; - } - - public JobID getJobId() { - return jobId; - } - } - - /** The Factory of {@link DefaultShuffleDescriptorsCache}. */ - public static class Factory { - private static final Duration DEFAULT_CACHE_EXPIRE_TIMEOUT = Duration.ofSeconds(300); - private static final int DEFAULT_CACHE_SIZE_LIMIT = 100; - private static final Ticker DEFAULT_TICKER = Ticker.systemTicker(); - - private final Duration cacheExpireTimeout; - private final int cacheSizeLimit; - private final Ticker ticker; - - public Factory() { - this(DEFAULT_CACHE_EXPIRE_TIMEOUT, DEFAULT_CACHE_SIZE_LIMIT, DEFAULT_TICKER); - } - - @VisibleForTesting - public Factory(Duration cacheExpireTimeout, int cacheSizeLimit, Ticker ticker) { - this.cacheExpireTimeout = cacheExpireTimeout; - this.cacheSizeLimit = cacheSizeLimit; - this.ticker = ticker; - } - - public DefaultShuffleDescriptorsCache create() { - return new DefaultShuffleDescriptorsCache(cacheExpireTimeout, cacheSizeLimit, ticker); - } - } -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ShuffleDescriptorsCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ShuffleDescriptorsCache.java deleted file mode 100644 index a86e6a67722da..0000000000000 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/ShuffleDescriptorsCache.java +++ /dev/null @@ -1,54 +0,0 @@ -/* - * Licensed to the Apache Software Foundation (ASF) under one - * or more contributor license agreements. See the NOTICE file - * distributed with this work for additional information - * regarding copyright ownership. The ASF licenses this file - * to you under the Apache License, Version 2.0 (the - * "License"); you may not use this file except in compliance - * with the License. You may obtain a copy of the License at - * - * http://www.apache.org/licenses/LICENSE-2.0 - * - * Unless required by applicable law or agreed to in writing, software - * distributed under the License is distributed on an "AS IS" BASIS, - * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. - * See the License for the specific language governing permissions and - * limitations under the License. - */ - -package org.apache.flink.runtime.taskexecutor; - -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.blob.PermanentBlobKey; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup; - -/** Cache of shuffle descriptors in TaskExecutor. */ -public interface ShuffleDescriptorsCache { - - /** clear all cache. */ - void clear(); - - /** - * Get shuffle descriptor group in cache. - * - * @param blobKey identify the shuffle descriptor group - * @return shuffle descriptor group in cache if exists, otherwise null - */ - ShuffleDescriptorGroup get(PermanentBlobKey blobKey); - - /** - * Put shuffle descriptor group to cache. - * - * @param jobId of job - * @param blobKey identify the shuffle descriptor group - * @param shuffleDescriptorGroup shuffle descriptor group to cache - */ - void put(JobID jobId, PermanentBlobKey blobKey, ShuffleDescriptorGroup shuffleDescriptorGroup); - - /** - * Clear all cache for the Job. - * - * @param jobId of job - */ - void clearCacheForJob(JobID jobId); -} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java index 680514f09a6f1..c3a705e0e0e68 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskExecutor.java @@ -25,6 +25,7 @@ import org.apache.flink.management.jmx.JMXService; import org.apache.flink.runtime.accumulators.AccumulatorSnapshot; import org.apache.flink.runtime.blob.JobPermanentBlobService; +import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.blob.TaskExecutorBlobService; import org.apache.flink.runtime.blob.TransientBlobKey; import org.apache.flink.runtime.blob.TransientBlobService; @@ -39,6 +40,7 @@ import org.apache.flink.runtime.clusterframework.types.SlotID; import org.apache.flink.runtime.deployment.ResultPartitionDeploymentDescriptor; import org.apache.flink.runtime.deployment.TaskDeploymentDescriptor; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup; import org.apache.flink.runtime.entrypoint.ClusterInformation; import org.apache.flink.runtime.execution.ExecutionState; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; @@ -131,6 +133,7 @@ import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.taskmanager.TaskManagerActions; import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; +import org.apache.flink.runtime.util.GroupCache; import org.apache.flink.runtime.webmonitor.threadinfo.ThreadInfoSamplesRequest; import org.apache.flink.types.SerializableOptional; import org.apache.flink.util.CollectionUtil; @@ -297,7 +300,10 @@ public class TaskExecutor extends RpcEndpoint implements TaskExecutorGateway { private final ThreadInfoSampleService threadInfoSampleService; - private final ShuffleDescriptorsCache shuffleDescriptorsCache; + private final GroupCache jobInformationCache; + private final GroupCache taskInformationCache; + private final GroupCache + shuffleDescriptorsCache; public TaskExecutor( RpcService rpcService, @@ -374,6 +380,8 @@ public TaskExecutor( taskExecutorServices.getSlotAllocationSnapshotPersistenceService(); this.sharedResources = taskExecutorServices.getSharedResources(); + this.jobInformationCache = taskExecutorServices.getJobInformationCache(); + this.taskInformationCache = taskExecutorServices.getTaskInformationCache(); this.shuffleDescriptorsCache = taskExecutorServices.getShuffleDescriptorCache(); } @@ -504,6 +512,8 @@ public CompletableFuture onStop() { changelogStoragesManager.shutdown(); channelStateExecutorFactoryManager.shutdown(); + jobInformationCache.clear(); + taskInformationCache.clear(); shuffleDescriptorsCache.clear(); Preconditions.checkState(jobTable.isEmpty()); @@ -668,7 +678,10 @@ public CompletableFuture submitTask( // re-integrate offloaded data and deserialize shuffle descriptors try { tdd.loadBigData( - taskExecutorBlobService.getPermanentBlobService(), shuffleDescriptorsCache); + taskExecutorBlobService.getPermanentBlobService(), + jobInformationCache, + taskInformationCache, + shuffleDescriptorsCache); } catch (IOException | ClassNotFoundException e) { throw new TaskSubmissionException( "Could not re-integrate offloaded TaskDeploymentDescriptor data.", e); @@ -678,12 +691,8 @@ public CompletableFuture submitTask( final JobInformation jobInformation; final TaskInformation taskInformation; try { - jobInformation = - tdd.getSerializedJobInformation() - .deserializeValue(getClass().getClassLoader()); - taskInformation = - tdd.getSerializedTaskInformation() - .deserializeValue(getClass().getClassLoader()); + jobInformation = tdd.getJobInformation(); + taskInformation = tdd.getTaskInformation(); } catch (IOException | ClassNotFoundException e) { throw new TaskSubmissionException( "Could not deserialize the job or task information.", e); @@ -1903,7 +1912,9 @@ private void releaseJobResources(JobID jobId, Exception cause) { changelogStoragesManager.releaseResourcesForJob(jobId); currentSlotOfferPerJob.remove(jobId); channelStateExecutorFactoryManager.releaseResourcesForJob(jobId); - shuffleDescriptorsCache.clearCacheForJob(jobId); + jobInformationCache.clearCacheForGroup(jobId); + taskInformationCache.clearCacheForGroup(jobId); + shuffleDescriptorsCache.clearCacheForGroup(jobId); fileMergingManager.releaseMergingSnapshotManagerForJob(jobId); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java index da8a063618944..5302e54bcb52e 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskexecutor/TaskManagerServices.java @@ -18,14 +18,19 @@ package org.apache.flink.runtime.taskexecutor; +import org.apache.flink.api.common.JobID; import org.apache.flink.configuration.CoreOptions; import org.apache.flink.metrics.MetricGroup; +import org.apache.flink.runtime.blob.PermanentBlobKey; import org.apache.flink.runtime.blob.PermanentBlobService; import org.apache.flink.runtime.broadcast.BroadcastVariableManager; import org.apache.flink.runtime.clusterframework.types.AllocationID; +import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup; import org.apache.flink.runtime.entrypoint.WorkingDirectory; import org.apache.flink.runtime.execution.librarycache.BlobLibraryCacheManager; import org.apache.flink.runtime.execution.librarycache.LibraryCacheManager; +import org.apache.flink.runtime.executiongraph.JobInformation; +import org.apache.flink.runtime.executiongraph.TaskInformation; import org.apache.flink.runtime.io.disk.iomanager.IOManager; import org.apache.flink.runtime.io.disk.iomanager.IOManagerAsync; import org.apache.flink.runtime.io.network.TaskEventDispatcher; @@ -48,6 +53,8 @@ import org.apache.flink.runtime.taskexecutor.slot.TimerService; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; +import org.apache.flink.runtime.util.DefaultGroupCache; +import org.apache.flink.runtime.util.GroupCache; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.FlinkException; import org.apache.flink.util.Preconditions; @@ -89,7 +96,10 @@ public class TaskManagerServices { private final LibraryCacheManager libraryCacheManager; private final SlotAllocationSnapshotPersistenceService slotAllocationSnapshotPersistenceService; private final SharedResources sharedResources; - private final ShuffleDescriptorsCache shuffleDescriptorsCache; + private final GroupCache jobInformationCache; + private final GroupCache taskInformationCache; + private final GroupCache + shuffleDescriptorsCache; TaskManagerServices( UnresolvedTaskManagerLocation unresolvedTaskManagerLocation, @@ -110,7 +120,9 @@ public class TaskManagerServices { LibraryCacheManager libraryCacheManager, SlotAllocationSnapshotPersistenceService slotAllocationSnapshotPersistenceService, SharedResources sharedResources, - ShuffleDescriptorsCache shuffleDescriptorsCache) { + GroupCache jobInformationCache, + GroupCache taskInformationCache, + GroupCache shuffleDescriptorsCache) { this.unresolvedTaskManagerLocation = Preconditions.checkNotNull(unresolvedTaskManagerLocation); @@ -132,6 +144,8 @@ public class TaskManagerServices { this.libraryCacheManager = Preconditions.checkNotNull(libraryCacheManager); this.slotAllocationSnapshotPersistenceService = slotAllocationSnapshotPersistenceService; this.sharedResources = Preconditions.checkNotNull(sharedResources); + this.jobInformationCache = jobInformationCache; + this.taskInformationCache = taskInformationCache; this.shuffleDescriptorsCache = Preconditions.checkNotNull(shuffleDescriptorsCache); } @@ -207,7 +221,15 @@ public SharedResources getSharedResources() { return sharedResources; } - public ShuffleDescriptorsCache getShuffleDescriptorCache() { + public GroupCache getJobInformationCache() { + return jobInformationCache; + } + + public GroupCache getTaskInformationCache() { + return taskInformationCache; + } + + public GroupCache getShuffleDescriptorCache() { return shuffleDescriptorsCache; } @@ -405,8 +427,14 @@ public static TaskManagerServices fromConfiguration( NoOpSlotAllocationSnapshotPersistenceService.INSTANCE; } - final ShuffleDescriptorsCache shuffleDescriptorsCache = - new DefaultShuffleDescriptorsCache.Factory().create(); + final GroupCache jobInformationCache = + new DefaultGroupCache.Factory().create(); + final GroupCache taskInformationCache = + new DefaultGroupCache.Factory().create(); + + final GroupCache shuffleDescriptorsCache = + new DefaultGroupCache.Factory() + .create(); return new TaskManagerServices( unresolvedTaskManagerLocation, @@ -427,6 +455,8 @@ public static TaskManagerServices fromConfiguration( libraryCacheManager, slotAllocationSnapshotPersistenceService, new SharedResources(), + jobInformationCache, + taskInformationCache, shuffleDescriptorsCache); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/DefaultGroupCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DefaultGroupCache.java new file mode 100644 index 0000000000000..e424e0521b69e --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/DefaultGroupCache.java @@ -0,0 +1,163 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import org.apache.flink.annotation.VisibleForTesting; + +import org.apache.flink.shaded.guava31.com.google.common.base.Ticker; +import org.apache.flink.shaded.guava31.com.google.common.cache.Cache; +import org.apache.flink.shaded.guava31.com.google.common.cache.CacheBuilder; +import org.apache.flink.shaded.guava31.com.google.common.cache.RemovalNotification; + +import javax.annotation.concurrent.NotThreadSafe; + +import java.time.Duration; +import java.util.HashMap; +import java.util.HashSet; +import java.util.Map; +import java.util.Objects; +import java.util.Set; + +/** Default implement of {@link GroupCache}. Entries will be expired after timeout. */ +@NotThreadSafe +public class DefaultGroupCache implements GroupCache { + private final Cache, V> cache; + private final Map>> cachedBlobKeysPerJob; + + private DefaultGroupCache(Duration expireTimeout, int cacheSizeLimit, Ticker ticker) { + this.cachedBlobKeysPerJob = new HashMap<>(); + this.cache = + CacheBuilder.newBuilder() + .concurrencyLevel(1) + .maximumSize(cacheSizeLimit) + .expireAfterAccess(expireTimeout) + .ticker(ticker) + .removalListener(this::onCacheRemoval) + .build(); + } + + @Override + public void clear() { + cachedBlobKeysPerJob.clear(); + cache.cleanUp(); + } + + @Override + public V get(G group, K key) { + return cache.getIfPresent(new CacheKey<>(group, key)); + } + + @Override + public void put(G group, K key, V value) { + CacheKey cacheKey = new CacheKey<>(group, key); + cache.put(cacheKey, value); + cachedBlobKeysPerJob.computeIfAbsent(group, ignore -> new HashSet<>()).add(cacheKey); + } + + @Override + public void clearCacheForGroup(G group) { + Set> removed = cachedBlobKeysPerJob.remove(group); + if (removed != null) { + cache.invalidateAll(removed); + } + } + + /** + * Removal listener that remove the cache key of this group . + * + * @param removalNotification of removed element. + */ + private void onCacheRemoval(RemovalNotification, V> removalNotification) { + CacheKey cacheKey = removalNotification.getKey(); + V value = removalNotification.getValue(); + if (cacheKey != null && value != null) { + cachedBlobKeysPerJob.computeIfPresent( + cacheKey.getGroup(), + (group, keys) -> { + keys.remove(cacheKey); + if (keys.isEmpty()) { + return null; + } else { + return keys; + } + }); + } + } + + private static class CacheKey { + private final G group; + private final K key; + + public CacheKey(G group, K key) { + this.group = group; + this.key = key; + } + + public G getGroup() { + return group; + } + + public K getKey() { + return key; + } + + @Override + public boolean equals(Object o) { + if (this == o) { + return true; + } + if (o == null || getClass() != o.getClass()) { + return false; + } + CacheKey cacheKey = (CacheKey) o; + return Objects.equals(group, cacheKey.group) && Objects.equals(key, cacheKey.key); + } + + @Override + public int hashCode() { + return Objects.hash(group, key); + } + } + + /** The Factory of {@link DefaultGroupCache}. */ + public static class Factory { + private static final Duration DEFAULT_CACHE_EXPIRE_TIMEOUT = Duration.ofSeconds(300); + private static final int DEFAULT_CACHE_SIZE_LIMIT = 100; + private static final Ticker DEFAULT_TICKER = Ticker.systemTicker(); + + private final Duration cacheExpireTimeout; + private final int cacheSizeLimit; + private final Ticker ticker; + + public Factory() { + this(DEFAULT_CACHE_EXPIRE_TIMEOUT, DEFAULT_CACHE_SIZE_LIMIT, DEFAULT_TICKER); + } + + @VisibleForTesting + public Factory(Duration cacheExpireTimeout, int cacheSizeLimit, Ticker ticker) { + this.cacheExpireTimeout = cacheExpireTimeout; + this.cacheSizeLimit = cacheSizeLimit; + this.ticker = ticker; + } + + public DefaultGroupCache create() { + return new DefaultGroupCache<>(cacheExpireTimeout, cacheSizeLimit, ticker); + } + } +} diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/GroupCache.java b/flink-runtime/src/main/java/org/apache/flink/runtime/util/GroupCache.java new file mode 100644 index 0000000000000..25757837878be --- /dev/null +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/util/GroupCache.java @@ -0,0 +1,50 @@ +/* + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +package org.apache.flink.runtime.util; + +import javax.annotation.Nullable; + +/** + * This {@link GroupCache} can cache group, key and value. The group and key are cache key, each key + * belongs to a certain group. All corresponding keys and values will be cleared if a group is + * cleared. + * + * @param The group. + * @param The key. + * @param The value. + */ +public interface GroupCache { + + /** clear all cache. */ + void clear(); + + /** + * Get value in cache. + * + * @return value in cache if exists, otherwise null + */ + @Nullable + V get(G group, K key); + + /** Put group, key and value to cache. */ + void put(G group, K key, V value); + + /** Clear all caches of the corresponding group. */ + void clearCacheForGroup(G group); +} diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java index 6191b19767d20..ac003665584fe 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptorTest.java @@ -20,9 +20,13 @@ import org.apache.flink.api.common.ExecutionConfig; import org.apache.flink.api.common.JobID; +import org.apache.flink.configuration.BlobServerOptions; import org.apache.flink.configuration.Configuration; import org.apache.flink.core.testutils.CommonTestUtils; +import org.apache.flink.runtime.blob.BlobServer; +import org.apache.flink.runtime.blob.BlobWriter; import org.apache.flink.runtime.blob.PermanentBlobKey; +import org.apache.flink.runtime.blob.VoidBlobStore; import org.apache.flink.runtime.checkpoint.JobManagerTaskRestore; import org.apache.flink.runtime.checkpoint.TaskStateSnapshot; import org.apache.flink.runtime.clusterframework.types.AllocationID; @@ -32,14 +36,21 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.jobgraph.tasks.AbstractInvokable; import org.apache.flink.runtime.operators.BatchTask; +import org.apache.flink.runtime.util.DefaultGroupCache; +import org.apache.flink.runtime.util.GroupCache; +import org.apache.flink.runtime.util.NoOpGroupCache; +import org.apache.flink.testutils.junit.utils.TempDirUtils; +import org.apache.flink.types.Either; import org.apache.flink.util.SerializedValue; import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.io.TempDir; import javax.annotation.Nonnull; import java.io.IOException; import java.net.URL; +import java.nio.file.Path; import java.util.ArrayList; import java.util.List; @@ -50,6 +61,8 @@ /** Tests for the {@link TaskDeploymentDescriptor}. */ class TaskDeploymentDescriptorTest { + @TempDir Path temporaryFolder; + private static final JobID jobID = new JobID(); private static final JobVertexID vertexID = new JobVertexID(); private static final ExecutionAttemptID execId = createExecutionAttemptId(vertexID); @@ -73,24 +86,28 @@ class TaskDeploymentDescriptorTest { private final SerializedValue executionConfig = new SerializedValue<>(new ExecutionConfig()); + + private final JobInformation jobInformation = + new JobInformation( + jobID, + jobName, + executionConfig, + jobConfiguration, + requiredJars, + requiredClasspaths); private final SerializedValue serializedJobInformation = - new SerializedValue<>( - new JobInformation( - jobID, - jobName, - executionConfig, - jobConfiguration, - requiredJars, - requiredClasspaths)); + new SerializedValue<>(jobInformation); + + private final TaskInformation taskInformation = + new TaskInformation( + vertexID, + taskName, + currentNumberOfSubtasks, + numberOfKeyGroups, + invokableClass.getName(), + taskConfiguration); private final SerializedValue serializedJobVertexInformation = - new SerializedValue<>( - new TaskInformation( - vertexID, - taskName, - currentNumberOfSubtasks, - numberOfKeyGroups, - invokableClass.getName(), - taskConfiguration)); + new SerializedValue<>(taskInformation); TaskDeploymentDescriptorTest() throws IOException {} @@ -104,19 +121,15 @@ void testSerialization() throws Exception { final TaskDeploymentDescriptor copy = CommonTestUtils.createCopySerializable(orig); - assertThat(orig.getSerializedJobInformation()) - .isNotSameAs(copy.getSerializedJobInformation()); - assertThat(orig.getSerializedTaskInformation()) - .isNotSameAs(copy.getSerializedTaskInformation()); + assertThat(orig.getJobInformation()).isNotSameAs(copy.getJobInformation()); + assertThat(orig.getTaskInformation()).isNotSameAs(copy.getTaskInformation()); assertThat(orig.getExecutionAttemptId()).isNotSameAs(copy.getExecutionAttemptId()); assertThat(orig.getTaskRestore()).isNotSameAs(copy.getTaskRestore()); assertThat(orig.getProducedPartitions()).isNotSameAs(copy.getProducedPartitions()); assertThat(orig.getInputGates()).isNotSameAs(copy.getInputGates()); - assertThat(orig.getSerializedJobInformation()) - .isEqualTo(copy.getSerializedJobInformation()); - assertThat(orig.getSerializedTaskInformation()) - .isEqualTo(copy.getSerializedTaskInformation()); + assertThat(orig.getJobInformation()).isEqualTo(copy.getJobInformation()); + assertThat(orig.getTaskInformation()).isEqualTo(copy.getTaskInformation()); assertThat(orig.getExecutionAttemptId()).isEqualTo(copy.getExecutionAttemptId()); assertThat(orig.getAllocationId()).isEqualTo(copy.getAllocationId()); assertThat(orig.getSubtaskIndex()).isEqualTo(copy.getSubtaskIndex()); @@ -130,20 +143,88 @@ void testSerialization() throws Exception { } @Test - void testOffLoadedAndNonOffLoadedPayload() { + void testOffLoadedAndNonOffLoadedPayload() throws IOException, ClassNotFoundException { final TaskDeploymentDescriptor taskDeploymentDescriptor = createTaskDeploymentDescriptor( new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation), new TaskDeploymentDescriptor.Offloaded<>(new PermanentBlobKey())); - SerializedValue actualSerializedJobInformation = - taskDeploymentDescriptor.getSerializedJobInformation(); - assertThat(actualSerializedJobInformation).isSameAs(serializedJobInformation); + JobInformation actualJobInformation = taskDeploymentDescriptor.getJobInformation(); + assertThat(actualJobInformation).isEqualTo(jobInformation); - assertThatThrownBy(taskDeploymentDescriptor::getSerializedTaskInformation) + assertThatThrownBy(taskDeploymentDescriptor::getTaskInformation) .isInstanceOf(IllegalStateException.class); } + @Test + void testTaskInformationCache() throws IOException, ClassNotFoundException { + try (BlobServer blobServer = setupBlobServer()) { + // Serialize taskInformation to blobServer and get the permanentBlobKey + Either, PermanentBlobKey> taskInformationOrBlobKey = + BlobWriter.serializeAndTryOffload(taskInformation, jobID, blobServer); + assertThat(taskInformationOrBlobKey.isRight()).isTrue(); + PermanentBlobKey permanentBlobKey = taskInformationOrBlobKey.right(); + + GroupCache taskInformationCache = + new DefaultGroupCache.Factory() + .create(); + // Test for tdd1 + final TaskDeploymentDescriptor tdd1 = + createTaskDeploymentDescriptor( + new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation), + new TaskDeploymentDescriptor.Offloaded<>(permanentBlobKey)); + assertThat(taskInformationCache.get(jobID, permanentBlobKey)).isNull(); + tdd1.loadBigData( + blobServer, + new NoOpGroupCache<>(), + taskInformationCache, + new NoOpGroupCache<>()); + TaskInformation taskInformation1 = tdd1.getTaskInformation(); + assertThat(taskInformation1).isEqualTo(taskInformation); + // The TaskInformation is cached in taskInformationCache, and it's equals to + // taskInformation1. + assertThat(taskInformationCache.get(jobID, permanentBlobKey)) + .isNotNull() + .isEqualTo(taskInformation1); + + // Test for tdd2 + final TaskDeploymentDescriptor tdd2 = + createTaskDeploymentDescriptor( + new TaskDeploymentDescriptor.NonOffloaded<>(serializedJobInformation), + new TaskDeploymentDescriptor.Offloaded<>(permanentBlobKey)); + tdd2.loadBigData( + blobServer, + new NoOpGroupCache<>(), + taskInformationCache, + new NoOpGroupCache<>()); + TaskInformation taskInformation2 = tdd2.getTaskInformation(); + // The TaskInformation2 is equals to taskInformation1 and original taskInformation, but + // they are not same. + assertThat(taskInformation2) + .isNotNull() + .isEqualTo(taskInformation1) + .isNotSameAs(taskInformation1) + .isEqualTo(taskInformation); + // Configuration may be changed by subtask, so the configuration must be not same. + assertThat(taskInformation2.getTaskConfiguration()) + .isNotNull() + .isEqualTo(taskInformation1.getTaskConfiguration()) + .isNotSameAs(taskInformation1.getTaskConfiguration()); + } + } + + private BlobServer setupBlobServer() throws IOException { + + Configuration config = new Configuration(); + // always offload the serialized job and task information + config.setInteger(BlobServerOptions.OFFLOAD_MINSIZE, 0); + BlobServer blobServer = + new BlobServer( + config, TempDirUtils.newFolder(temporaryFolder), new VoidBlobStore()); + blobServer.start(); + return blobServer; + } + @Nonnull private TaskDeploymentDescriptor createTaskDeploymentDescriptor( TaskDeploymentDescriptor.MaybeOffloaded jobInformation, diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java index a5508f9698ec7..08783509a9fe1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/dispatcher/JobMasterTester.java @@ -204,9 +204,7 @@ private CompletableFuture getTaskInformation( "Task descriptor for %s not found.", executionAttemptId))); try { - return descriptor - .getSerializedTaskInformation() - .deserializeValue(Thread.currentThread().getContextClassLoader()); + return descriptor.getTaskInformation(); } catch (Exception e) { throw new IllegalStateException( String.format( diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java index fc2df07230376..d354b49fedde9 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentTest.java @@ -58,7 +58,6 @@ import org.apache.flink.runtime.scheduler.TestingPhysicalSlotProvider; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; import org.apache.flink.runtime.shuffle.ShuffleDescriptor; -import org.apache.flink.runtime.taskexecutor.NoOpShuffleDescriptorsCache; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGateway; import org.apache.flink.runtime.taskexecutor.TestingTaskExecutorGatewayBuilder; import org.apache.flink.runtime.taskmanager.LocalTaskManagerLocation; @@ -66,6 +65,7 @@ import org.apache.flink.runtime.taskmanager.TaskManagerLocation; import org.apache.flink.runtime.testtasks.NoOpInvokable; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.runtime.util.NoOpGroupCache; import org.apache.flink.testutils.TestingUtils; import org.apache.flink.testutils.executor.TestExecutorExtension; import org.apache.flink.util.function.FunctionUtils; @@ -180,7 +180,10 @@ void testBuildDeploymentDescriptor() throws Exception { FunctionUtils.uncheckedConsumer( taskDeploymentDescriptor -> { taskDeploymentDescriptor.loadBigData( - blobCache, NoOpShuffleDescriptorsCache.INSTANCE); + blobCache, + new NoOpGroupCache<>(), + new NoOpGroupCache<>(), + new NoOpGroupCache<>()); tdd.complete(taskDeploymentDescriptor); })); @@ -203,10 +206,8 @@ void testBuildDeploymentDescriptor() throws Exception { TaskDeploymentDescriptor descr = tdd.get(); assertThat(descr).isNotNull(); - JobInformation jobInformation = - descr.getSerializedJobInformation().deserializeValue(getClass().getClassLoader()); - TaskInformation taskInformation = - descr.getSerializedTaskInformation().deserializeValue(getClass().getClassLoader()); + JobInformation jobInformation = descr.getJobInformation(); + TaskInformation taskInformation = descr.getTaskInformation(); assertThat(descr.getJobId()).isEqualTo(jobId); assertThat(jobInformation.getJobId()).isEqualTo(jobId); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java index 6fdc9de9adeb5..fae6a754646a4 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/executiongraph/DefaultExecutionGraphDeploymentWithSmallBlobCacheSizeLimitTest.java @@ -42,8 +42,8 @@ import org.apache.flink.runtime.jobmaster.TestingLogicalSlotBuilder; import org.apache.flink.runtime.operators.BatchTask; import org.apache.flink.runtime.scheduler.strategy.ConsumedPartitionGroup; -import org.apache.flink.runtime.taskexecutor.NoOpShuffleDescriptorsCache; import org.apache.flink.runtime.testutils.DirectScheduledExecutorService; +import org.apache.flink.runtime.util.NoOpGroupCache; import org.apache.flink.testutils.junit.utils.TempDirUtils; import org.apache.flink.util.function.FunctionUtils; @@ -121,7 +121,10 @@ void testDeployMultipleTasksWithSmallBlobCacheSizeLimit() throws Exception { FunctionUtils.uncheckedConsumer( taskDeploymentDescriptor -> { taskDeploymentDescriptor.loadBigData( - blobCache, NoOpShuffleDescriptorsCache.INSTANCE); + blobCache, + new NoOpGroupCache<>(), + new NoOpGroupCache<>(), + new NoOpGroupCache<>()); tdds.offer(taskDeploymentDescriptor); })); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java index 2bd18f543a1d5..68d9d5a275de7 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/TaskManagerServicesBuilder.java @@ -39,6 +39,7 @@ import org.apache.flink.runtime.taskmanager.LocalUnresolvedTaskManagerLocation; import org.apache.flink.runtime.taskmanager.Task; import org.apache.flink.runtime.taskmanager.UnresolvedTaskManagerLocation; +import org.apache.flink.runtime.util.NoOpGroupCache; import java.util.concurrent.CompletableFuture; import java.util.concurrent.Executors; @@ -200,6 +201,8 @@ public TaskManagerServices build() { libraryCacheManager, slotAllocationSnapshotPersistenceService, sharedResources, - NoOpShuffleDescriptorsCache.INSTANCE); + new NoOpGroupCache<>(), + new NoOpGroupCache<>(), + new NoOpGroupCache<>()); } } diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCacheTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DefaultGroupCacheTest.java similarity index 76% rename from flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCacheTest.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/util/DefaultGroupCacheTest.java index 84c1acaa4e118..b5a3b717094fc 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/DefaultShuffleDescriptorsCacheTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/DefaultGroupCacheTest.java @@ -16,7 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.taskexecutor; +package org.apache.flink.runtime.util; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.blob.PermanentBlobKey; @@ -35,14 +35,14 @@ import static org.assertj.core.api.Assertions.assertThat; -/** Tests for {@link DefaultShuffleDescriptorsCache}. */ -class DefaultShuffleDescriptorsCacheTest { +/** Tests for {@link DefaultGroupCache}. */ +class DefaultGroupCacheTest { private final Duration expireTimeout = Duration.ofSeconds(10); @Test void testGetEntry() { - DefaultShuffleDescriptorsCache cache = - new DefaultShuffleDescriptorsCache.Factory( + DefaultGroupCache cache = + new DefaultGroupCache.Factory( expireTimeout, Integer.MAX_VALUE, Ticker.systemTicker()) .create(); @@ -56,16 +56,16 @@ void testGetEntry() { PermanentBlobKey blobKey = new PermanentBlobKey(); - assertThat(cache.get(blobKey)).isNull(); + assertThat(cache.get(jobId, blobKey)).isNull(); cache.put(jobId, blobKey, shuffleDescriptorGroup); - assertThat(cache.get(blobKey)).isEqualTo(shuffleDescriptorGroup); + assertThat(cache.get(jobId, blobKey)).isEqualTo(shuffleDescriptorGroup); } @Test void testClearCacheForJob() { - DefaultShuffleDescriptorsCache cache = - new DefaultShuffleDescriptorsCache.Factory( + DefaultGroupCache cache = + new DefaultGroupCache.Factory( expireTimeout, Integer.MAX_VALUE, Ticker.systemTicker()) .create(); @@ -78,19 +78,20 @@ void testClearCacheForJob() { }); PermanentBlobKey blobKey = new PermanentBlobKey(); - assertThat(cache.get(blobKey)).isNull(); + assertThat(cache.get(jobId, blobKey)).isNull(); cache.put(jobId, blobKey, shuffleDescriptorGroup); - assertThat(cache.get(blobKey)).isEqualTo(shuffleDescriptorGroup); + assertThat(cache.get(jobId, blobKey)).isEqualTo(shuffleDescriptorGroup); - cache.clearCacheForJob(jobId); - assertThat(cache.get(blobKey)).isNull(); + cache.clearCacheForGroup(jobId); + assertThat(cache.get(jobId, blobKey)).isNull(); } @Test void testPutWhenOverLimit() { - DefaultShuffleDescriptorsCache cache = - new DefaultShuffleDescriptorsCache.Factory(expireTimeout, 1, Ticker.systemTicker()) + DefaultGroupCache cache = + new DefaultGroupCache.Factory( + expireTimeout, 1, Ticker.systemTicker()) .create(); JobID jobId = new JobID(); @@ -104,7 +105,7 @@ void testPutWhenOverLimit() { PermanentBlobKey blobKey = new PermanentBlobKey(); cache.put(jobId, blobKey, shuffleDescriptorGroup); - assertThat(cache.get(blobKey)).isEqualTo(shuffleDescriptorGroup); + assertThat(cache.get(jobId, blobKey)).isEqualTo(shuffleDescriptorGroup); ShuffleDescriptorGroup otherShuffleDescriptorGroup = new ShuffleDescriptorGroup( @@ -115,15 +116,15 @@ void testPutWhenOverLimit() { PermanentBlobKey otherBlobKey = new PermanentBlobKey(); cache.put(jobId, otherBlobKey, otherShuffleDescriptorGroup); - assertThat(cache.get(blobKey)).isNull(); - assertThat(cache.get(otherBlobKey)).isEqualTo(otherShuffleDescriptorGroup); + assertThat(cache.get(jobId, blobKey)).isNull(); + assertThat(cache.get(jobId, otherBlobKey)).isEqualTo(otherShuffleDescriptorGroup); } @Test void testEntryExpired() { TestingTicker ticker = new TestingTicker(); - DefaultShuffleDescriptorsCache cache = - new DefaultShuffleDescriptorsCache.Factory( + DefaultGroupCache cache = + new DefaultGroupCache.Factory( Duration.ofSeconds(1), Integer.MAX_VALUE, ticker) .create(); @@ -138,10 +139,10 @@ void testEntryExpired() { PermanentBlobKey blobKey = new PermanentBlobKey(); cache.put(jobId, blobKey, shuffleDescriptorGroup); - assertThat(cache.get(blobKey)).isEqualTo(shuffleDescriptorGroup); + assertThat(cache.get(jobId, blobKey)).isEqualTo(shuffleDescriptorGroup); ticker.advance(Duration.ofSeconds(2)); - assertThat(cache.get(blobKey)).isNull(); + assertThat(cache.get(jobId, blobKey)).isNull(); } private static class TestingTicker extends Ticker { diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpShuffleDescriptorsCache.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NoOpGroupCache.java similarity index 56% rename from flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpShuffleDescriptorsCache.java rename to flink-runtime/src/test/java/org/apache/flink/runtime/util/NoOpGroupCache.java index 5c7ad4236e3ea..93a0012bdd3d8 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/taskexecutor/NoOpShuffleDescriptorsCache.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/NoOpGroupCache.java @@ -16,29 +16,22 @@ * limitations under the License. */ -package org.apache.flink.runtime.taskexecutor; +package org.apache.flink.runtime.util; -import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.blob.PermanentBlobKey; -import org.apache.flink.runtime.deployment.TaskDeploymentDescriptorFactory.ShuffleDescriptorGroup; - -/** Non op implement of {@link ShuffleDescriptorsCache}. */ -public class NoOpShuffleDescriptorsCache implements ShuffleDescriptorsCache { - - public static final NoOpShuffleDescriptorsCache INSTANCE = new NoOpShuffleDescriptorsCache(); +/** Non op implement of {@link GroupCache}. */ +public class NoOpGroupCache implements GroupCache { @Override public void clear() {} @Override - public ShuffleDescriptorGroup get(PermanentBlobKey blobKey) { + public V get(G group, K key) { return null; } @Override - public void put( - JobID jobId, PermanentBlobKey blobKey, ShuffleDescriptorGroup shuffleDescriptorGroup) {} + public void put(G group, K key, V value) {} @Override - public void clearCacheForJob(JobID jobId) {} + public void clearCacheForGroup(G group) {} }