diff --git a/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java b/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java index eabc41333f..1042dd11b3 100644 --- a/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java +++ b/streampark-common/src/main/java/org/apache/streampark/common/enums/ExecutionMode.java @@ -118,4 +118,8 @@ public static boolean isRemoteMode(Integer value) { public static boolean isRemoteMode(ExecutionMode mode) { return REMOTE.equals(mode); } + + public static boolean isLocalMode(ExecutionMode mode) { + return LOCAL.equals(mode); + } } diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java index ab41691e81..dbeb82ff41 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/entity/Application.java @@ -560,6 +560,7 @@ public static StorageType getStorageType(Integer execMode) { case KUBERNETES_NATIVE_SESSION: case KUBERNETES_NATIVE_APPLICATION: case REMOTE: + case LOCAL: return StorageType.LFS; default: throw new UnsupportedOperationException("Unsupported ".concat(executionMode.getName())); diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java index 7a978e1b03..b6babb499b 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java +++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/AppBuildPipeServiceImpl.java @@ -346,6 +346,7 @@ private BuildPipeline createPipelineInstance(@Nonnull Application app) { case YARN_PER_JOB: case YARN_SESSION: case REMOTE: + case LOCAL: FlinkRemotePerJobBuildRequest buildRequest = new FlinkRemotePerJobBuildRequest( app.getJobName(), diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java index 3a363c7c8c..9012833e8c 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ApplicationServiceImpl.java @@ -1112,6 +1112,9 @@ public Application getApp(Application appParam) { && application.getStartTime().getTime() > 0) { application.setDuration(now - application.getStartTime().getTime()); } + // add flink web url info for local-mode + } else if (ExecutionMode.isLocalMode(application.getExecutionModeEnum())) { + application.setFlinkRestUrl(application.getJobManagerUrl()); } setYarnQueue(application); @@ -1226,6 +1229,11 @@ public void cancel(Application appParam) throws Exception { URI activeAddress = cluster.getRemoteURI(); properties.put(RestOptions.ADDRESS.key(), activeAddress.getHost()); properties.put(RestOptions.PORT.key(), activeAddress.getPort()); + } else if (ExecutionMode.isLocalMode(application.getExecutionModeEnum())) { + String jobManagerUrl = application.getJobManagerUrl(); + URI uri = new URI(jobManagerUrl); + properties.put(RestOptions.ADDRESS.key(), uri.getHost()); + properties.put(RestOptions.PORT.key(), uri.getPort()); } CancelRequest cancelRequest = diff --git 
a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java index f36eeb2a11..89f26bcb3d 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java +++ b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/SavePointServiceImpl.java @@ -67,6 +67,7 @@ import javax.annotation.Nullable; import java.net.URI; +import java.net.URISyntaxException; import java.util.Date; import java.util.HashMap; import java.util.Map; @@ -298,6 +299,19 @@ public void trigger(Long appId, @Nullable String savepointPath) { Map properties = this.tryGetRestProps(application, cluster); + if (ExecutionMode.isLocalMode(application.getExecutionModeEnum())) { + String jobManagerUrl = application.getJobManagerUrl(); + URI uri = null; + try { + uri = new URI(jobManagerUrl); + properties.put(RestOptions.ADDRESS.key(), uri.getHost()); + properties.put(RestOptions.PORT.key(), uri.getPort()); + } catch (URISyntaxException e) { + log.error("Trigger savepoint for flink job failed.", e); + throw new ApiAlertException(e); + } + } + TriggerSavepointRequest request = new TriggerSavepointRequest( flinkEnv.getFlinkVersion(), diff --git a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java index c3d16acb5c..ba546818c4 100644 --- a/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java +++ 
b/streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/task/FlinkRESTAPIWatcher.java @@ -656,6 +656,10 @@ private FlinkCluster getFlinkCluster(Application application) { FLINK_CLUSTER_MAP.put(application.getFlinkClusterId(), flinkCluster); } return flinkCluster; + } else if (ExecutionMode.isLocalMode(application.getExecutionModeEnum())) { + FlinkCluster flinkCluster = new FlinkCluster(); + flinkCluster.setAddress(application.getJobManagerUrl()); + return flinkCluster; } return null; } @@ -701,7 +705,8 @@ private JobsOverview httpJobsOverview(Application application, FlinkCluster flin } return yarnRestRequest(reqURL, JobsOverview.class); } else if (ExecutionMode.REMOTE.equals(execMode) - || ExecutionMode.YARN_SESSION.equals(execMode)) { + || ExecutionMode.YARN_SESSION.equals(execMode) + || ExecutionMode.LOCAL.equals(execMode)) { if (application.getJobId() != null) { String remoteUrl = flinkCluster.getAddress() + "/" + flinkUrl; JobsOverview jobsOverview = httpRestRequest(remoteUrl, JobsOverview.class); @@ -734,7 +739,8 @@ private CheckPoints httpCheckpoints(Application application, FlinkCluster flinkC } return yarnRestRequest(reqURL, CheckPoints.class); } else if (ExecutionMode.REMOTE.equals(execMode) - || ExecutionMode.YARN_SESSION.equals(execMode)) { + || ExecutionMode.YARN_SESSION.equals(execMode) + || ExecutionMode.LOCAL.equals(execMode)) { if (application.getJobId() != null) { String remoteUrl = flinkCluster.getAddress() + "/" + String.format(flinkUrl, application.getJobId()); diff --git a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts index 6a0093b942..d8f6c6e4bc 100644 --- a/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts +++ b/streampark-console/streampark-console-webapp/src/enums/flinkEnum.ts @@ -29,6 +29,8 @@ export enum BuildStateEnum { } /* ExecutionMode */ export enum ExecModeEnum { + /** LOCAL */ + 
LOCAL = 0, /** remote (standalone) */ REMOTE = 1, /** yarn per-job (deprecated, please use yarn-application mode) */ diff --git a/streampark-console/streampark-console-webapp/src/views/flink/app/data/index.ts b/streampark-console/streampark-console-webapp/src/views/flink/app/data/index.ts index e8f0b71421..6fc19e8300 100644 --- a/streampark-console/streampark-console-webapp/src/views/flink/app/data/index.ts +++ b/streampark-console/streampark-console-webapp/src/views/flink/app/data/index.ts @@ -110,6 +110,7 @@ export const k8sRestExposedType = [ ]; export const executionModes = [ + { label: 'local (for testing only, not recommended for production environment)', value: ExecModeEnum.LOCAL, disabled: false }, { label: 'remote', value: ExecModeEnum.REMOTE, disabled: false }, { label: 'yarn application', value: ExecModeEnum.YARN_APPLICATION, disabled: false }, { label: 'yarn session', value: ExecModeEnum.YARN_SESSION, disabled: false }, diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala index 93c0d90890..733886a973 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/LocalClient.scala @@ -17,18 +17,22 @@ package org.apache.streampark.flink.client.impl -import java.lang.{Integer => JavaInt} +import org.apache.flink.api.common.JobID +import java.lang.{Integer => JavaInt} import org.apache.flink.client.deployment.executors.RemoteExecutor import org.apache.flink.client.program.{ClusterClient, MiniClusterClient, PackagedProgram} import org.apache.flink.client.program.MiniClusterClient.MiniClusterId 
import org.apache.flink.configuration._ +import org.apache.flink.runtime.jobmaster.JobResult import org.apache.flink.runtime.minicluster.{MiniCluster, MiniClusterConfiguration} - +import org.apache.streampark.common.enums.ExecutionMode import org.apache.streampark.common.util.Utils import org.apache.streampark.flink.client.`trait`.FlinkClientTrait import org.apache.streampark.flink.client.bean._ +import java.util.function.Consumer + object LocalClient extends FlinkClientTrait { override def setConfig(submitRequest: SubmitRequest, flinkConfig: Configuration): Unit = { @@ -44,14 +48,15 @@ object LocalClient extends FlinkClientTrait { override def doSubmit(submitRequest: SubmitRequest, flinkConfig: Configuration): SubmitResponse = { var packageProgram: PackagedProgram = null var client: ClusterClient[MiniClusterId] = null + var jobId: JobID = null try { // build JobGraph val packageProgramJobGraph = super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile) packageProgram = packageProgramJobGraph._1 val jobGraph = packageProgramJobGraph._2 client = createLocalCluster(flinkConfig) - val jobId = client.submitJob(jobGraph).get().toString - SubmitResponse(jobId, flinkConfig.toMap, jobId, client.getWebInterfaceURL) + jobId = client.submitJob(jobGraph).get() + SubmitResponse(jobId.toString, flinkConfig.toMap, jobId.toString, client.getWebInterfaceURL) } catch { case e: Exception => logError(s"submit flink job fail in ${submitRequest.executionMode} mode") @@ -61,16 +66,44 @@ object LocalClient extends FlinkClientTrait { if (submitRequest.safePackageProgram) { Utils.close(packageProgram) } - Utils.close(client) + client.requestJobResult(jobId).thenAccept( + new Consumer[JobResult] { + override def accept(t: JobResult): Unit = { + client.shutDownCluster() + Utils.close(client) + } + } + ); } } override def doTriggerSavepoint(request: TriggerSavepointRequest, flinkConfig: Configuration): SavepointResponse = { - RemoteClient.doTriggerSavepoint(request, flinkConfig) 
+ // This is a workaround: we use the ClusterClient of a standalone cluster instead of MiniClusterClient, because there is no good way to + retrieve the MiniClusterClient for a specific local job; managing multiple local jobs would require maintaining a mapping between each local + job and its MiniClusterClient. Using the standalone ClusterClient lets us avoid that bookkeeping. + val requestAdapter = TriggerSavepointRequest( + request.flinkVersion, + ExecutionMode.REMOTE, + request.properties, + request.clusterId, + request.jobId, + request.savepointPath, + request.kubernetesNamespace) + RemoteClient.doTriggerSavepoint(requestAdapter, flinkConfig) } override def doCancel(cancelRequest: CancelRequest, flinkConfig: Configuration): CancelResponse = { - RemoteClient.doCancel(cancelRequest, flinkConfig) + val requestAdapter = CancelRequest( + cancelRequest.flinkVersion, + ExecutionMode.REMOTE, + cancelRequest.properties, + cancelRequest.clusterId, + cancelRequest.jobId, + cancelRequest.withSavepoint, + cancelRequest.withDrain, + cancelRequest.savepointPath, + cancelRequest.kubernetesNamespace) + RemoteClient.doCancel(requestAdapter, flinkConfig) } private[this] def createLocalCluster(flinkConfig: Configuration) = {