From d7b3a3b2f2f1d3fb839da0d8063ce30cb18de9e6 Mon Sep 17 00:00:00 2001 From: Maximilian Michels Date: Fri, 28 Aug 2015 17:01:41 +0200 Subject: [PATCH] [FLINK-2594][client] implement a method to retrieve the accumulators of a job - move SerializedValue from runtime to core - unified code to deserialize accumulators --- .../apache/flink/client/program/Client.java | 75 +++++++++++++++++++ .../accumulators/AccumulatorHelper.java | 36 +++++++++ .../apache/flink}/util/SerializedValue.java | 4 +- .../accumulators/AccumulatorSnapshot.java | 2 +- .../runtime/checkpoint/PendingCheckpoint.java | 2 +- .../runtime/checkpoint/StateForTask.java | 2 +- .../client/SerializedJobExecutionResult.java | 18 +---- .../deployment/TaskDeploymentDescriptor.java | 2 +- .../runtime/executiongraph/Execution.java | 2 +- .../executiongraph/ExecutionGraph.java | 2 +- .../executiongraph/ExecutionVertex.java | 2 +- .../checkpoint/AcknowledgeCheckpoint.java | 2 +- .../taskmanager/RuntimeEnvironment.java | 2 +- .../flink/runtime/taskmanager/Task.java | 2 +- .../flink/runtime/jobmanager/JobManager.scala | 8 +- .../accumulators/AccumulatorMessages.scala | 2 +- .../CheckpointStateRestoreTest.java | 2 +- .../SerializedJobExecutionResultTest.java | 4 +- .../messages/CheckpointMessagesTest.java | 2 +- .../runtime/util/SerializedValueTest.java | 1 + .../flink/tachyon/FileStateHandleTest.java | 2 +- 21 files changed, 137 insertions(+), 37 deletions(-) rename {flink-runtime/src/main/java/org/apache/flink/runtime => flink-core/src/main/java/org/apache/flink}/util/SerializedValue.java (96%) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java index 78c82f6dc835c..06156fa3d4f49 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/Client.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/Client.java @@ -25,11 +25,14 @@ import java.net.InetAddress; import java.net.InetSocketAddress; import java.net.UnknownHostException; +import java.util.Collections; import java.util.List; +import java.util.Map; import java.util.concurrent.TimeUnit; import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.JobSubmissionResult; +import org.apache.flink.api.common.accumulators.AccumulatorHelper; import org.apache.flink.configuration.IllegalConfigurationException; import org.apache.flink.runtime.akka.AkkaUtils; import org.apache.flink.api.common.JobExecutionResult; @@ -55,6 +58,10 @@ import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobmanager.JobManager; import org.apache.flink.runtime.messages.JobManagerMessages; +import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsErroneous; +import org.apache.flink.runtime.messages.accumulators.AccumulatorResultsFound; +import org.apache.flink.runtime.messages.accumulators.RequestAccumulatorResults; +import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; @@ -492,6 +499,74 @@ public void cancel(JobID jobId) throws Exception { } + /** + * Requests and returns the accumulators for the given job identifier. Accumulators can be + * requested while a is running or after it has finished. The default class loader is used + * to deserialize the incoming accumulator results. + * @param jobID The job identifier of a job. + * @return A Map containing the accumulator's name and its value. + */ + public Map getAccumulators(JobID jobID) throws Exception { + return getAccumulators(jobID, ClassLoader.getSystemClassLoader()); + } + + /** + * Requests and returns the accumulators for the given job identifier. Accumulators can be + * requested while a is running or after it has finished. + * @param jobID The job identifier of a job. + * @param loader The class loader for deserializing the accumulator results. + * @return A Map containing the accumulator's name and its value. + */ + public Map getAccumulators(JobID jobID, ClassLoader loader) throws Exception { + + final FiniteDuration timeout = AkkaUtils.getTimeout(configuration); + + ActorSystem actorSystem; + try { + actorSystem = JobClient.startJobClientActorSystem(configuration); + } catch (Exception e) { + throw new Exception("Could start client actor system.", e); + } + + ActorRef jobManager; + try { + jobManager = JobManager.getJobManagerRemoteReference(jobManagerAddress, actorSystem, timeout); + } catch (Exception e) { + throw new Exception("Error getting the remote actor reference for the job manager.", e); + } + + Future response; + try { + ActorGateway jobManagerGateway = JobManager.getJobManagerGateway(jobManager, timeout); + response = jobManagerGateway.ask(new RequestAccumulatorResults(jobID), timeout); + } catch (Exception e) { + throw new Exception("Failed to query the job manager gateway for accumulators.", e); + } + + try { + + Object result = Await.result(response, timeout); + + if (result instanceof AccumulatorResultsFound) { + Map> serializedAccumulators = + ((AccumulatorResultsFound) result).result(); + + return AccumulatorHelper.deserializeAccumulators(serializedAccumulators, loader); + + } else if (result instanceof AccumulatorResultsErroneous) { + throw ((AccumulatorResultsErroneous) result).cause(); + } else { + LOG.warn("Failed to fetch accumulators for job {}.", jobID); + } + + } catch (Exception e) { + LOG.error("Error occurred while fetching accumulators for {}", jobID, e); + } + + return Collections.emptyMap(); + } + + // -------------------------------------------------------------------------------------------- public static final class OptimizerPlanEnvironment extends ExecutionEnvironment { diff --git a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java index 4fa173cf43adf..bb48bddab2857 100644 --- a/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java +++ b/flink-core/src/main/java/org/apache/flink/api/common/accumulators/AccumulatorHelper.java @@ -18,7 +18,11 @@ package org.apache.flink.api.common.accumulators; +import org.apache.flink.util.SerializedValue; + +import java.io.IOException; import java.io.Serializable; +import java.util.Collections; import java.util.HashMap; import java.util.Map; @@ -134,4 +138,36 @@ public static void resetAndClearAccumulators(Map> accu return result; } + /** + * Takes the serialized accumulator results and tries to deserialize them using the provided + * class loader. + * @param serializedAccumulators The serialized accumulator results. + * @param loader The class loader to use. + * @return The deserialized accumulator results. + * @throws IOException + * @throws ClassNotFoundException + */ + public static Map deserializeAccumulators( + Map> serializedAccumulators, ClassLoader loader) + throws IOException, ClassNotFoundException { + + if (serializedAccumulators == null || serializedAccumulators.isEmpty()) { + return Collections.emptyMap(); + } + + Map accumulators = new HashMap<>(serializedAccumulators.size()); + + for (Map.Entry> entry : serializedAccumulators.entrySet()) { + + Object value = null; + if (entry.getValue() != null) { + value = entry.getValue().deserializeValue(loader); + } + + accumulators.put(entry.getKey(), value); + } + + return accumulators; + } + } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java similarity index 96% rename from flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java rename to flink-core/src/main/java/org/apache/flink/util/SerializedValue.java index 6a5468a5355ed..5731fc1d7abda 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/util/SerializedValue.java +++ b/flink-core/src/main/java/org/apache/flink/util/SerializedValue.java @@ -16,9 +16,7 @@ * limitations under the License. */ -package org.apache.flink.runtime.util; - -import org.apache.flink.util.InstantiationUtil; +package org.apache.flink.util; import java.io.IOException; import java.util.Arrays; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java index 0f1911db2f137..b813153c2b5c5 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/accumulators/AccumulatorSnapshot.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.api.common.accumulators.Accumulator; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.util.SerializedValue; import java.io.IOException; import java.io.Serializable; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java index 9ea3b6fdca9a9..370ae50fd0a2e 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/PendingCheckpoint.java @@ -26,7 +26,7 @@ import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.executiongraph.ExecutionVertex; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.util.SerializedValue; /** * A pending checkpoint is a checkpoint that has been started, but has not been diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java index 73deeed7b556e..120c503e7bd06 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/checkpoint/StateForTask.java @@ -20,7 +20,7 @@ import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java b/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java index 029bc3876f738..ec2312f36b878 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/client/SerializedJobExecutionResult.java @@ -20,11 +20,10 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.api.common.accumulators.AccumulatorHelper; +import org.apache.flink.util.SerializedValue; import java.io.IOException; -import java.util.Collections; -import java.util.HashMap; import java.util.Map; import java.util.concurrent.TimeUnit; @@ -80,17 +79,8 @@ public Map> getSerializedAccumulatorResults() { } public JobExecutionResult toJobExecutionResult(ClassLoader loader) throws IOException, ClassNotFoundException { - Map accumulators = null; - if (accumulatorResults != null) { - accumulators = accumulatorResults.isEmpty() ? - Collections.emptyMap() : - new HashMap(this.accumulatorResults.size()); - - for (Map.Entry> entry : this.accumulatorResults.entrySet()) { - Object o = entry.getValue() == null ? null : entry.getValue().deserializeValue(loader); - accumulators.put(entry.getKey(), o); - } - } + Map accumulators = + AccumulatorHelper.deserializeAccumulators(accumulatorResults, loader); return new JobExecutionResult(jobId, netRuntime, accumulators); } diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java index 0a1268d989733..c4065d227784a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/deployment/TaskDeploymentDescriptor.java @@ -24,7 +24,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.jobgraph.JobVertexID; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.util.SerializedValue; import java.io.Serializable; import java.util.Collection; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java index 64c4d470f540d..c7191fa78fa43 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/Execution.java @@ -46,7 +46,7 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskOperationResult; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.taskmanager.Task; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.ExceptionUtils; import org.slf4j.Logger; import scala.concurrent.ExecutionContext; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java index b52a4e814229d..169971d43c1fb 100755 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionGraph.java @@ -42,7 +42,7 @@ import org.apache.flink.runtime.messages.ExecutionGraphMessages; import org.apache.flink.runtime.taskmanager.TaskExecutionState; import org.apache.flink.runtime.util.SerializableObject; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.util.SerializedValue; import org.apache.flink.util.ExceptionUtils; import org.apache.flink.util.InstantiationUtil; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java index dcf64c0f2b786..78e980490ce7a 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/executiongraph/ExecutionVertex.java @@ -43,7 +43,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.Scheduler; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import scala.concurrent.duration.FiniteDuration; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java index db12e0a8c7f38..381b8d3dfaa63 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/messages/checkpoint/AcknowledgeCheckpoint.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.executiongraph.ExecutionAttemptID; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.util.SerializedValue; /** * This message is sent from the {@link org.apache.flink.runtime.taskmanager.TaskManager} to the diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java index 8cfc1c367f7a3..07cea33f51b95 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/RuntimeEnvironment.java @@ -34,7 +34,7 @@ import org.apache.flink.runtime.memorymanager.MemoryManager; import org.apache.flink.runtime.messages.checkpoint.AcknowledgeCheckpoint; import org.apache.flink.runtime.state.StateHandle; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.util.SerializedValue; import java.util.Map; import java.util.concurrent.Future; diff --git a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java index 1b43139985251..c7abce0bc575f 100644 --- a/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java +++ b/flink-runtime/src/main/java/org/apache/flink/runtime/taskmanager/Task.java @@ -55,7 +55,7 @@ import org.apache.flink.runtime.messages.TaskMessages.TaskInFinalState; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateUtils; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.util.SerializedValue; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import scala.concurrent.duration.FiniteDuration; diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala index d001d5a680a6e..839fdb4f55432 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/jobmanager/JobManager.scala @@ -48,7 +48,7 @@ import org.apache.flink.runtime.security.SecurityUtils import org.apache.flink.runtime.security.SecurityUtils.FlinkSecuredRunner import org.apache.flink.runtime.taskmanager.TaskManager import org.apache.flink.runtime.util.ZooKeeperUtil -import org.apache.flink.runtime.util.{SerializedValue, EnvironmentInformation} +import org.apache.flink.runtime.util.EnvironmentInformation import org.apache.flink.runtime.webmonitor.WebMonitor import org.apache.flink.runtime.{FlinkActor, StreamingMode, LeaderSessionMessages} import org.apache.flink.runtime.{LogMessages} @@ -60,7 +60,7 @@ import org.apache.flink.runtime.jobmanager.scheduler.{Scheduler => FlinkSchedule import org.apache.flink.runtime.messages.JobManagerMessages._ import org.apache.flink.runtime.messages.RegistrationMessages._ import org.apache.flink.runtime.messages.TaskManagerMessages.{SendStackTrace, Heartbeat} -import org.apache.flink.util.{ExceptionUtils, InstantiationUtil} +import org.apache.flink.util.{SerializedValue, ExceptionUtils, InstantiationUtil} import _root_.akka.pattern.ask import scala.concurrent._ @@ -769,7 +769,7 @@ class JobManager( currentJobs.get(jobID) match { case Some((graph, jobInfo)) => val accumulatorValues = graph.getAccumulatorsSerialized() - sender() ! decorateMessage(AccumulatorResultsFound(jobID, accumulatorValues)) + sender() ! decorateMessage(AccumulatorResultsFound(jobID, accumulatorValues)) case None => archive.forward(message) } @@ -1271,7 +1271,7 @@ object JobManager { /** * Create the job manager components as (instanceManager, scheduler, libraryCacheManager, - * archiverProps, accumulatorManager, defaultExecutionRetries, + * archiverProps, defaultExecutionRetries, * delayBetweenRetries, timeout) * * @param configuration The configuration from which to parse the config values. diff --git a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala index 015c96e26b84b..107ba825864c6 100644 --- a/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala +++ b/flink-runtime/src/main/scala/org/apache/flink/runtime/messages/accumulators/AccumulatorMessages.scala @@ -20,7 +20,7 @@ package org.apache.flink.runtime.messages.accumulators import org.apache.flink.api.common.JobID import org.apache.flink.runtime.accumulators.StringifiedAccumulatorResult -import org.apache.flink.runtime.util.SerializedValue +import org.apache.flink.util.SerializedValue /** * Base trait of all accumulator messages diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java index 902eb4be3020d..08cb0a3b54d35 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/checkpoint/CheckpointStateRestoreTest.java @@ -29,7 +29,7 @@ import org.apache.flink.runtime.state.LocalStateHandle; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.util.SerializableObject; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.util.SerializedValue; import org.junit.Test; import org.mockito.Mockito; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java index 5c9ffa7c6747f..a22ed13a0e540 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/client/SerializedJobExecutionResultTest.java @@ -21,7 +21,7 @@ import org.apache.flink.api.common.JobExecutionResult; import org.apache.flink.api.common.JobID; import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.util.SerializedValue; import org.junit.Test; import java.util.HashMap; @@ -93,7 +93,7 @@ public void testSerializationWithNullValues() { JobExecutionResult jResult = result.toJobExecutionResult(getClass().getClassLoader()); assertNull(jResult.getJobID()); - assertNull(jResult.getAllAccumulatorResults()); + assertTrue(jResult.getAllAccumulatorResults().isEmpty()); } catch (Exception e) { e.printStackTrace(); diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java index 211d5e38ce461..597249acba6e1 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/messages/CheckpointMessagesTest.java @@ -27,7 +27,7 @@ import org.apache.flink.runtime.messages.checkpoint.TriggerCheckpoint; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.testutils.CommonTestUtils; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.util.SerializedValue; import org.junit.Test; import java.io.IOException; diff --git a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java index 0154334493392..0d19613c3fa2d 100644 --- a/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java +++ b/flink-runtime/src/test/java/org/apache/flink/runtime/util/SerializedValueTest.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.util; import org.apache.flink.runtime.testutils.CommonTestUtils; +import org.apache.flink.util.SerializedValue; import org.junit.Test; import static org.junit.Assert.assertEquals; diff --git a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java index 2873c789cf92d..ec414c020c5d5 100644 --- a/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java +++ b/flink-staging/flink-fs-tests/src/test/java/org/apache/flink/tachyon/FileStateHandleTest.java @@ -30,7 +30,7 @@ import org.apache.flink.runtime.state.FileStateHandle; import org.apache.flink.runtime.state.StateHandle; import org.apache.flink.runtime.state.StateHandleProvider; -import org.apache.flink.runtime.util.SerializedValue; +import org.apache.flink.util.SerializedValue; import org.apache.hadoop.conf.Configuration; import org.apache.hadoop.fs.FileUtil; import org.apache.hadoop.hdfs.MiniDFSCluster;