Skip to content

Commit

Permalink
[Feature] expose local mode in web-ui
Browse files Browse the repository at this point in the history
  • Loading branch information
zhouli16 committed Mar 15, 2023
1 parent ad44867 commit 3de1079
Show file tree
Hide file tree
Showing 9 changed files with 79 additions and 9 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -118,4 +118,8 @@ public static boolean isRemoteMode(Integer value) {
public static boolean isRemoteMode(ExecutionMode mode) {
return REMOTE.equals(mode);
}

public static boolean isLocalMode(ExecutionMode mode) {
return LOCAL.equals(mode);
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,7 @@ public static StorageType getStorageType(Integer execMode) {
case KUBERNETES_NATIVE_SESSION:
case KUBERNETES_NATIVE_APPLICATION:
case REMOTE:
case LOCAL:
return StorageType.LFS;
default:
throw new UnsupportedOperationException("Unsupported ".concat(executionMode.getName()));
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -346,6 +346,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
case YARN_PER_JOB:
case YARN_SESSION:
case REMOTE:
case LOCAL:
FlinkRemotePerJobBuildRequest buildRequest =
new FlinkRemotePerJobBuildRequest(
app.getJobName(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1112,6 +1112,9 @@ public Application getApp(Application appParam) {
&& application.getStartTime().getTime() > 0) {
application.setDuration(now - application.getStartTime().getTime());
}
// add flink web url info for local-mode
} else if (ExecutionMode.isLocalMode(application.getExecutionModeEnum())) {
application.setFlinkRestUrl(application.getJobManagerUrl());
}

setYarnQueue(application);
Expand Down Expand Up @@ -1226,6 +1229,11 @@ public void cancel(Application appParam) throws Exception {
URI activeAddress = cluster.getRemoteURI();
properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
properties.put(RestOptions.PORT.key(), activeAddress.getPort());
} else if (ExecutionMode.isLocalMode(application.getExecutionModeEnum())) {
String jobManagerUrl = application.getJobManagerUrl();
URI uri = new URI(jobManagerUrl);
properties.put(RestOptions.ADDRESS.key(), uri.getHost());
properties.put(RestOptions.PORT.key(), uri.getPort());
}

CancelRequest cancelRequest =
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -67,6 +67,7 @@
import javax.annotation.Nullable;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
Expand Down Expand Up @@ -298,6 +299,19 @@ public void trigger(Long appId, @Nullable String savepointPath) {

Map<String, Object> properties = this.tryGetRestProps(application, cluster);

if (ExecutionMode.isLocalMode(application.getExecutionModeEnum())) {
String jobManagerUrl = application.getJobManagerUrl();
URI uri = null;
try {
uri = new URI(jobManagerUrl);
properties.put(RestOptions.ADDRESS.key(), uri.getHost());
properties.put(RestOptions.PORT.key(), uri.getPort());
} catch (URISyntaxException e) {
log.error("Trigger savepoint for flink job failed.", e);
throw new ApiAlertException(e);
}
}

TriggerSavepointRequest request =
new TriggerSavepointRequest(
flinkEnv.getFlinkVersion(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,10 @@ private FlinkCluster getFlinkCluster(Application application) {
FLINK_CLUSTER_MAP.put(application.getFlinkClusterId(), flinkCluster);
}
return flinkCluster;
} else if (ExecutionMode.isLocalMode(application.getExecutionModeEnum())) {
FlinkCluster flinkCluster = new FlinkCluster();
flinkCluster.setAddress(application.getJobManagerUrl());
return flinkCluster;
}
return null;
}
Expand Down Expand Up @@ -701,7 +705,8 @@ private JobsOverview httpJobsOverview(Application application, FlinkCluster flin
}
return yarnRestRequest(reqURL, JobsOverview.class);
} else if (ExecutionMode.REMOTE.equals(execMode)
|| ExecutionMode.YARN_SESSION.equals(execMode)) {
|| ExecutionMode.YARN_SESSION.equals(execMode)
|| ExecutionMode.LOCAL.equals(execMode)) {
if (application.getJobId() != null) {
String remoteUrl = flinkCluster.getAddress() + "/" + flinkUrl;
JobsOverview jobsOverview = httpRestRequest(remoteUrl, JobsOverview.class);
Expand Down Expand Up @@ -734,7 +739,8 @@ private CheckPoints httpCheckpoints(Application application, FlinkCluster flinkC
}
return yarnRestRequest(reqURL, CheckPoints.class);
} else if (ExecutionMode.REMOTE.equals(execMode)
|| ExecutionMode.YARN_SESSION.equals(execMode)) {
|| ExecutionMode.YARN_SESSION.equals(execMode)
|| ExecutionMode.LOCAL.equals(execMode)) {
if (application.getJobId() != null) {
String remoteUrl =
flinkCluster.getAddress() + "/" + String.format(flinkUrl, application.getJobId());
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,8 @@ export enum BuildStateEnum {
}
/* ExecutionMode */
export enum ExecModeEnum {
/** LOCAL */
LOCAL = 0,
/** remote (standalone) */
REMOTE = 1,
/** yarn per-job (deprecated, please use yarn-application mode) */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,7 @@ export const k8sRestExposedType = [
];

export const executionModes = [
{ label: 'local (for testing only, not recommended for production environment)', value: ExecModeEnum.LOCAL, disabled: false },
{ label: 'remote', value: ExecModeEnum.REMOTE, disabled: false },
{ label: 'yarn application', value: ExecModeEnum.YARN_APPLICATION, disabled: false },
{ label: 'yarn session', value: ExecModeEnum.YARN_SESSION, disabled: false },
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,18 +17,22 @@

package org.apache.streampark.flink.client.impl

import java.lang.{Integer => JavaInt}
import org.apache.flink.api.common.JobID

import java.lang.{Integer => JavaInt}
import org.apache.flink.client.deployment.executors.RemoteExecutor
import org.apache.flink.client.program.{ClusterClient, MiniClusterClient, PackagedProgram}
import org.apache.flink.client.program.MiniClusterClient.MiniClusterId
import org.apache.flink.configuration._
import org.apache.flink.runtime.jobmaster.JobResult
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}

import org.apache.streampark.common.enums.ExecutionMode
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.`trait`.FlinkClientTrait
import org.apache.streampark.flink.client.bean._

import java.util.function.Consumer

object LocalClient extends FlinkClientTrait {

override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = {
Expand All @@ -44,14 +48,15 @@ object LocalClient extends FlinkClientTrait {
override def doSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
var packageProgram: PackagedProgram = null
var client: ClusterClient[MiniClusterId] = null
var jobId: JobID = null
try {
// build JobGraph
val packageProgramJobGraph = super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile)
packageProgram = packageProgramJobGraph._1
val jobGraph = packageProgramJobGraph._2
client = createLocalCluster(flinkConfig)
val jobId = client.submitJob(jobGraph).get().toString
SubmitResponse(jobId, flinkConfig.toMap, jobId, client.getWebInterfaceURL)
jobId = client.submitJob(jobGraph).get()
SubmitResponse(jobId.toString, flinkConfig.toMap, jobId.toString, client.getWebInterfaceURL)
} catch {
case e: Exception =>
logError(s"submit flink job fail in ${submitRequest.executionMode} mode")
Expand All @@ -61,16 +66,44 @@ object LocalClient extends FlinkClientTrait {
if (submitRequest.safePackageProgram) {
Utils.close(packageProgram)
}
Utils.close(client)
client.requestJobResult(jobId).thenAccept(
new Consumer[JobResult] {
override def accept(t: JobResult): Unit = {
client.shutDownCluster()
Utils.close(client)
}
}
);
}
}

override def doTriggerSavepoint(request: TriggerSavepointRequest, flinkConfig: Configuration): SavepointResponse = {
RemoteClient.doTriggerSavepoint(request, flinkConfig)
// This is a workaround, here we use ClusterClient of StandaloneCluster instead of MiniClusterClient. Because there is no good way to
// retrieve MiniClusterClient for specific local job, for multi local job management, then we have to maintain the mapping between local
// job and MiniClusterClient. With the aid of Standalone ClusterClient, we can get rid of this step.
val requestAdapter = TriggerSavepointRequest(
request.flinkVersion,
ExecutionMode.REMOTE,
request.properties,
request.clusterId,
request.jobId,
request.savepointPath,
request.kubernetesNamespace)
RemoteClient.doTriggerSavepoint(requestAdapter, flinkConfig)
}

override def doCancel(cancelRequest: CancelRequest, flinkConfig: Configuration): CancelResponse = {
RemoteClient.doCancel(cancelRequest, flinkConfig)
val requestAdapter = CancelRequest(
cancelRequest.flinkVersion,
ExecutionMode.REMOTE,
cancelRequest.properties,
cancelRequest.clusterId,
cancelRequest.jobId,
cancelRequest.withSavepoint,
cancelRequest.withDrain,
cancelRequest.savepointPath,
cancelRequest.kubernetesNamespace)
RemoteClient.doCancel(requestAdapter, flinkConfig)
}

private[this] def createLocalCluster(flinkConfig: Configuration) = {
Expand Down

0 comments on commit 3de1079

Please sign in to comment.