[Feature] expose local mode in web-ui #2445

Closed · wants to merge 1 commit

@@ -118,4 +118,8 @@ public static boolean isRemoteMode(Integer value) {
public static boolean isRemoteMode(ExecutionMode mode) {
return REMOTE.equals(mode);
}

public static boolean isLocalMode(ExecutionMode mode) {
return LOCAL.equals(mode);
}
}
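
A small usage sketch of the new helper next to the existing isRemoteMode, branching on an application's execution mode (a hypothetical snippet, not part of the diff; the enum value is illustrative):

import org.apache.streampark.common.enums.ExecutionMode;

public class ModeDispatchSketch {
    public static void main(String[] args) {
        ExecutionMode mode = ExecutionMode.LOCAL; // illustrative value
        if (ExecutionMode.isLocalMode(mode)) {
            // local MiniCluster: the JobManager URL doubles as the REST address
            System.out.println("local");
        } else if (ExecutionMode.isRemoteMode(mode)) {
            System.out.println("remote");
        }
    }
}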
@@ -560,6 +560,7 @@ public static StorageType getStorageType(Integer execMode) {
case KUBERNETES_NATIVE_SESSION:
case KUBERNETES_NATIVE_APPLICATION:
case REMOTE:
case LOCAL:
return StorageType.LFS;
default:
throw new UnsupportedOperationException("Unsupported ".concat(executionMode.getName()));
@@ -346,6 +346,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) {
case YARN_PER_JOB:
case YARN_SESSION:
case REMOTE:
case LOCAL:
FlinkRemotePerJobBuildRequest buildRequest =
new FlinkRemotePerJobBuildRequest(
app.getJobName(),
@@ -1112,6 +1112,9 @@ public Application getApp(Application appParam) {
&& application.getStartTime().getTime() > 0) {
application.setDuration(now - application.getStartTime().getTime());
}
// add the Flink web URL for local-mode applications
} else if (ExecutionMode.isLocalMode(application.getExecutionModeEnum())) {
application.setFlinkRestUrl(application.getJobManagerUrl());
}

setYarnQueue(application);
@@ -1226,6 +1229,11 @@ public void cancel(Application appParam) throws Exception {
URI activeAddress = cluster.getRemoteURI();
properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost());
properties.put(RestOptions.PORT.key(), activeAddress.getPort());
} else if (ExecutionMode.isLocalMode(application.getExecutionModeEnum())) {
String jobManagerUrl = application.getJobManagerUrl();
URI uri = new URI(jobManagerUrl);
properties.put(RestOptions.ADDRESS.key(), uri.getHost());
properties.put(RestOptions.PORT.key(), uri.getPort());
}

CancelRequest cancelRequest =
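For context, the getApp and cancel changes above both treat the stored JobManager URL as the REST endpoint of the local MiniCluster. A minimal sketch of the host/port extraction, assuming a hypothetical URL (the real value comes from application.getJobManagerUrl()):

import java.net.URI;

public class JobManagerUriSketch {
    public static void main(String[] args) throws Exception {
        URI uri = new URI("http://localhost:8081"); // hypothetical JobManager URL
        System.out.println(uri.getHost()); // localhost
        System.out.println(uri.getPort()); // 8081 (-1 if the URL omits the port)
    }
}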
@@ -67,6 +67,7 @@
import javax.annotation.Nullable;

import java.net.URI;
import java.net.URISyntaxException;
import java.util.Date;
import java.util.HashMap;
import java.util.Map;
@@ -298,6 +299,19 @@ public void trigger(Long appId, @Nullable String savepointPath) {

Map<String, Object> properties = this.tryGetRestProps(application, cluster);

if (ExecutionMode.isLocalMode(application.getExecutionModeEnum())) {
String jobManagerUrl = application.getJobManagerUrl();
try {
URI uri = new URI(jobManagerUrl);
properties.put(RestOptions.ADDRESS.key(), uri.getHost());
properties.put(RestOptions.PORT.key(), uri.getPort());
} catch (URISyntaxException e) {
log.error("Failed to trigger savepoint for flink job.", e);
throw new ApiAlertException(e);
}
}

TriggerSavepointRequest request =
new TriggerSavepointRequest(
flinkEnv.getFlinkVersion(),
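RestOptions.ADDRESS.key() and RestOptions.PORT.key() resolve to Flink's standard REST option keys, so the savepoint trigger ends up passing plain string-keyed properties. A minimal sketch of what the local-mode branch contributes (host and port values are assumptions):

import org.apache.flink.configuration.RestOptions;

import java.util.HashMap;
import java.util.Map;

public class RestPropsSketch {
    public static void main(String[] args) {
        Map<String, Object> properties = new HashMap<>();
        properties.put(RestOptions.ADDRESS.key(), "localhost"); // key "rest.address"
        properties.put(RestOptions.PORT.key(), 8081);           // key "rest.port"
        System.out.println(properties);
    }
}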
@@ -656,6 +656,10 @@ private FlinkCluster getFlinkCluster(Application application) {
FLINK_CLUSTER_MAP.put(application.getFlinkClusterId(), flinkCluster);
}
return flinkCluster;
} else if (ExecutionMode.isLocalMode(application.getExecutionModeEnum())) {
FlinkCluster flinkCluster = new FlinkCluster();
flinkCluster.setAddress(application.getJobManagerUrl());
return flinkCluster;
}
return null;
}
@@ -701,7 +705,8 @@ private JobsOverview httpJobsOverview(Application application, FlinkCluster flin
}
return yarnRestRequest(reqURL, JobsOverview.class);
} else if (ExecutionMode.REMOTE.equals(execMode)
|| ExecutionMode.YARN_SESSION.equals(execMode)
|| ExecutionMode.LOCAL.equals(execMode)) {
if (application.getJobId() != null) {
String remoteUrl = flinkCluster.getAddress() + "/" + flinkUrl;
JobsOverview jobsOverview = httpRestRequest(remoteUrl, JobsOverview.class);
@@ -734,7 +739,8 @@ private CheckPoints httpCheckpoints(Application application, FlinkCluster flinkC
}
return yarnRestRequest(reqURL, CheckPoints.class);
} else if (ExecutionMode.REMOTE.equals(execMode)
|| ExecutionMode.YARN_SESSION.equals(execMode)
|| ExecutionMode.LOCAL.equals(execMode)) {
if (application.getJobId() != null) {
String remoteUrl =
flinkCluster.getAddress() + "/" + String.format(flinkUrl, application.getJobId());
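In session-style modes (remote, yarn-session, and now local) the watcher polls the cluster's REST API directly instead of going through YARN. A standalone sketch of such a poll against Flink's /jobs/overview endpoint, with a hypothetical address (the PR reads it from flinkCluster.getAddress()):

import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class JobsOverviewProbe {
    public static void main(String[] args) throws Exception {
        String address = "http://localhost:8081"; // hypothetical JobManager address
        HttpRequest request = HttpRequest.newBuilder()
            .uri(URI.create(address + "/jobs/overview"))
            .GET()
            .build();
        HttpResponse<String> response =
            HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofString());
        System.out.println(response.body()); // JSON overview of all jobs and their states
    }
}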
@@ -29,6 +29,8 @@ export enum BuildStateEnum {
}
/* ExecutionMode */
export enum ExecModeEnum {
/** local (for testing only) */
LOCAL = 0,
/** remote (standalone) */
REMOTE = 1,
/** yarn per-job (deprecated, please use yarn-application mode) */
@@ -110,6 +110,7 @@ export const k8sRestExposedType = [
];

export const executionModes = [
{ label: 'local (for testing only, not recommended for production environments)', value: ExecModeEnum.LOCAL, disabled: false },
{ label: 'remote', value: ExecModeEnum.REMOTE, disabled: false },
{ label: 'yarn application', value: ExecModeEnum.YARN_APPLICATION, disabled: false },
{ label: 'yarn session', value: ExecModeEnum.YARN_SESSION, disabled: false },
@@ -17,18 +17,22 @@

package org.apache.streampark.flink.client.impl

import org.apache.flink.api.common.JobID

import java.lang.{Integer => JavaInt}
import org.apache.flink.client.deployment.executors.RemoteExecutor
import org.apache.flink.client.program.{ClusterClient, MiniClusterClient, PackagedProgram}
import org.apache.flink.client.program.MiniClusterClient.MiniClusterId
import org.apache.flink.configuration._
import org.apache.flink.runtime.jobmaster.JobResult
import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration}

import org.apache.streampark.common.enums.ExecutionMode
import org.apache.streampark.common.util.Utils
import org.apache.streampark.flink.client.`trait`.FlinkClientTrait
import org.apache.streampark.flink.client.bean._

import java.util.function.Consumer

object LocalClient extends FlinkClientTrait {

override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = {
@@ -44,14 +48,15 @@ override def doSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
override def doSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = {
var packageProgram: PackagedProgram = null
var client: ClusterClient[MiniClusterId] = null
var jobId: JobID = null
try {
// build JobGraph
val packageProgramJobGraph = super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile)
packageProgram = packageProgramJobGraph._1
val jobGraph = packageProgramJobGraph._2
client = createLocalCluster(flinkConfig)
jobId = client.submitJob(jobGraph).get()
SubmitResponse(jobId.toString, flinkConfig.toMap, jobId.toString, client.getWebInterfaceURL)
} catch {
case e: Exception =>
logError(s"submit flink job failed in ${submitRequest.executionMode} mode")
@@ -61,16 +66,44 @@
if (submitRequest.safePackageProgram) {
Utils.close(packageProgram)
}
client.requestJobResult(jobId).thenAccept(
new Consumer[JobResult] {
override def accept(t: JobResult): Unit = {
client.shutDownCluster()
Utils.close(client)
}
})
}
}

override def doTriggerSavepoint(request: TriggerSavepointRequest, flinkConfig: Configuration): SavepointResponse = {
// This is a workaround: we use the ClusterClient of a standalone cluster instead of the
// MiniClusterClient, because there is no good way to retrieve the MiniClusterClient of a
// specific local job. Supporting multiple local jobs would otherwise require maintaining a
// mapping from each local job to its MiniClusterClient; delegating to the standalone
// ClusterClient avoids that bookkeeping.
val requestAdapter = TriggerSavepointRequest(
request.flinkVersion,
ExecutionMode.REMOTE,
request.properties,
request.clusterId,
request.jobId,
request.savepointPath,
request.kubernetesNamespace)
RemoteClient.doTriggerSavepoint(requestAdapter, flinkConfig)
}

override def doCancel(cancelRequest: CancelRequest, flinkConfig: Configuration): CancelResponse = {
val requestAdapter = CancelRequest(
cancelRequest.flinkVersion,
ExecutionMode.REMOTE,
cancelRequest.properties,
cancelRequest.clusterId,
cancelRequest.jobId,
cancelRequest.withSavepoint,
cancelRequest.withDrain,
cancelRequest.savepointPath,
cancelRequest.kubernetesNamespace)
RemoteClient.doCancel(requestAdapter, flinkConfig)
}

private[this] def createLocalCluster(flinkConfig: Configuration) = {
Expand Down