Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -29,7 +29,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.HashMap;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
Expand All @@ -44,18 +43,21 @@ public class JobManager {
private ConcurrentHashMap<JobID, FlinkJobProgressPoller> jobProgressPollerMap =
new ConcurrentHashMap<>();
private FlinkZeppelinContext z;
private String flinkWebUI;
private String flinkWebUrl;
private String replacedFlinkWebUrl;

public JobManager(FlinkZeppelinContext z,
String flinkWebUI) {
String flinkWebUrl,
String replacedFlinkWebUrl) {
this.z = z;
this.flinkWebUI = flinkWebUI;
this.flinkWebUrl = flinkWebUrl;
this.replacedFlinkWebUrl = replacedFlinkWebUrl;
}

public void addJob(InterpreterContext context, JobClient jobClient) {
String paragraphId = context.getParagraphId();
JobClient previousJobClient = this.jobs.put(paragraphId, jobClient);
FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUI, jobClient.getJobID(), context);
FlinkJobProgressPoller thread = new FlinkJobProgressPoller(flinkWebUrl, jobClient.getJobID(), context);
thread.setName("JobProgressPoller-Thread-" + paragraphId);
thread.start();
this.jobProgressPollerMap.put(jobClient.getJobID(), thread);
Expand All @@ -82,7 +84,12 @@ public void removeJob(String paragraphId) {
public void sendFlinkJobUrl(InterpreterContext context) {
JobClient jobClient = jobs.get(context.getParagraphId());
if (jobClient != null) {
String jobUrl = flinkWebUI + "#/job/" + jobClient.getJobID();
String jobUrl = null;
if (replacedFlinkWebUrl != null) {
jobUrl = replacedFlinkWebUrl + "#/job/" + jobClient.getJobID();
} else {
jobUrl = flinkWebUrl + "#/job/" + jobClient.getJobID();
}
Map<String, String> infos = new HashMap<>();
infos.put("jobUrl", jobUrl);
infos.put("label", "FLINK JOB");
Expand Down Expand Up @@ -162,16 +169,16 @@ public void shutdown() {

class FlinkJobProgressPoller extends Thread {

private String flinkWebUI;
private String flinkWebUrl;
private JobID jobId;
private InterpreterContext context;
private boolean isStreamingInsertInto;
private int progress;
private AtomicBoolean running = new AtomicBoolean(true);
private boolean isFirstPoll = true;

FlinkJobProgressPoller(String flinkWebUI, JobID jobId, InterpreterContext context) {
this.flinkWebUI = flinkWebUI;
FlinkJobProgressPoller(String flinkWebUrl, JobID jobId, InterpreterContext context) {
this.flinkWebUrl = flinkWebUrl;
this.jobId = jobId;
this.context = context;
this.isStreamingInsertInto = context.getLocalProperties().containsKey("flink.streaming.insert_into");
Expand All @@ -186,7 +193,7 @@ public void run() {
synchronized (running) {
running.wait(1000);
}
rootNode = Unirest.get(flinkWebUI + "/jobs/" + jobId.toString())
rootNode = Unirest.get(flinkWebUrl + "/jobs/" + jobId.toString())
.asJson().getBody();
JSONArray vertices = rootNode.getObject().getJSONArray("vertices");
int totalTasks = 0;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -102,6 +102,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
private var flinkVersion: FlinkVersion = _
private var flinkShims: FlinkShims = _
private var jmWebUrl: String = _
private var replacedJMWebUrl: String = _
private var jobManager: JobManager = _
private var defaultParallelism = 1
private var defaultSqlParallelism = 1
Expand All @@ -120,7 +121,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
modifiers.add("@transient")
this.bind("z", z.getClass().getCanonicalName(), z, modifiers);

this.jobManager = new JobManager(this.z, jmWebUrl)
this.jobManager = new JobManager(this.z, jmWebUrl, replacedJMWebUrl)

// register JobListener
val jobListener = new FlinkJobListener()
Expand Down Expand Up @@ -268,7 +269,7 @@ class FlinkScalaInterpreter(val properties: Properties) {
// for some cloud vender, the yarn address may be mapped to some other address.
val yarnAddress = properties.getProperty("flink.webui.yarn.address")
if (!StringUtils.isBlank(yarnAddress)) {
this.jmWebUrl = replaceYarnAddress(this.jmWebUrl, yarnAddress)
this.replacedJMWebUrl = replaceYarnAddress(this.jmWebUrl, yarnAddress)
}
} else {
this.jmWebUrl = clusterClient.getWebInterfaceURL
Expand Down