Skip to content

Commit

Permalink
[FLINK-13758][client] Support to register DFS files as distributed cache
Browse files Browse the repository at this point in the history
All Flink Standalone, Yarn, Mesos, and Kubernetes session clusters use RestClusterClient#submitJob to submit a job to an existing session. Before this commit, the Flink client would hang when trying to register DFS artifacts as distributed cache for a session cluster.
  • Loading branch information
wangyang0918 authored and kl0u committed Jan 21, 2020
1 parent 90ad2e6 commit 0ea00ea
Show file tree
Hide file tree
Showing 3 changed files with 45 additions and 6 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -343,8 +343,17 @@ public CompletableFuture<JobSubmissionResult> submitJob(@Nonnull JobGraph jobGra
}

for (Map.Entry<String, DistributedCache.DistributedCacheEntry> artifacts : jobGraph.getUserArtifacts().entrySet()) {
artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), new Path(artifacts.getValue().filePath).getName()));
filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
final Path artifactFilePath = new Path(artifacts.getValue().filePath);
try {
// Only local artifacts need to be uploaded.
if (!artifactFilePath.getFileSystem().isDistributedFS()) {
artifactFileNames.add(new JobSubmitRequestBody.DistributedCacheFile(artifacts.getKey(), artifactFilePath.getName()));
filesToUpload.add(new FileUpload(Paths.get(artifacts.getValue().filePath), RestConstants.CONTENT_TYPE_BINARY));
}
} catch (IOException e) {
throw new CompletionException(
new FlinkException("Failed to get the FileSystem of artifact " + artifactFilePath + ".", e));
}
}

final JobSubmitRequestBody requestBody = new JobSubmitRequestBody(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,9 +19,12 @@
package org.apache.flink.hdfstests;

import org.apache.flink.api.common.functions.RichMapFunction;
import org.apache.flink.client.program.rest.RestClusterClient;
import org.apache.flink.core.fs.FileStatus;
import org.apache.flink.core.fs.FileSystem;
import org.apache.flink.core.fs.Path;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobmaster.JobResult;
import org.apache.flink.runtime.testutils.MiniClusterResourceConfiguration;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.sink.DiscardingSink;
Expand Down Expand Up @@ -120,8 +123,36 @@ public static void teardown() {
}

@Test
public void testDistributeFileViaDFS() throws Exception {
public void testDistributedFileViaDFS() throws Exception {
createJobWithRegisteredCachedFiles().execute("Distributed Cache Via Blob Test Program");
}

/**
 * Verifies that a job with registered cached files can be submitted through
 * {@link RestClusterClient#submitJob(JobGraph)} and finishes successfully.
 *
 * <p>All the Flink Standalone, Yarn, Mesos and Kubernetes sessions use
 * {@link RestClusterClient#submitJob(JobGraph)} to submit a job to an existing session, so this
 * test covers that case.
 *
 * @throws Exception if creating the client, submitting the job, or waiting for its result fails
 */
@Test(timeout = 30000)
public void testSubmittingJobViaRestClusterClient() throws Exception {
	// try-with-resources closes the client even when submission or the assertion fails.
	try (RestClusterClient<String> restClusterClient = new RestClusterClient<>(
			MINI_CLUSTER_RESOURCE.getClientConfiguration(),
			"testSubmittingJobViaRestClusterClient")) {

		final JobGraph jobGraph = createJobWithRegisteredCachedFiles()
			.getStreamGraph()
			.getJobGraph();

		final JobResult jobResult = restClusterClient
			.submitJob(jobGraph)
			.thenCompose(restClusterClient::requestJobResult)
			.get();

		// Prefer the job's own stack trace as the assertion message when one was reported.
		final String messageInCaseOfFailure = jobResult.getSerializedThrowable()
			.map(throwable -> throwable.getFullStringifiedStackTrace())
			.orElse("Job failed.");
		assertTrue(messageInCaseOfFailure, jobResult.isSuccess());
	}
}

private StreamExecutionEnvironment createJobWithRegisteredCachedFiles() {
final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setParallelism(1);

Expand All @@ -131,8 +162,7 @@ public void testDistributeFileViaDFS() throws Exception {
env.fromElements(1)
.map(new TestMapFunction())
.addSink(new DiscardingSink<>());

env.execute("Distributed Cache Via Blob Test Program");
return env;
}

private static class TestMapFunction extends RichMapFunction<Integer, String> {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ public static void uploadJobGraphFiles(
throw new FlinkException("Could not upload job files.", ioe);
}
}
jobGraph.writeUserArtifactEntriesToConfiguration();
}

/**
Expand Down Expand Up @@ -137,6 +138,5 @@ private static void setUserArtifactBlobKeys(JobGraph jobGraph, Collection<Tuple2
for (Tuple2<String, PermanentBlobKey> blobKey : blobKeys) {
jobGraph.setUserArtifactBlobKey(blobKey.f0, blobKey.f1);
}
jobGraph.writeUserArtifactEntriesToConfiguration();
}
}

0 comments on commit 0ea00ea

Please sign in to comment.