From 44d2bbf4e618bb9681bd5118bdcfba5f01503616 Mon Sep 17 00:00:00 2001 From: ChengJie1053 <18033291053@163.com> Date: Sat, 4 Nov 2023 22:59:02 +0800 Subject: [PATCH] [Feature] Pyflink remote mode support maven dependency (#3311) * [Feature] Pyflink remote mode support maven dependency --- .../flink/client/trait/FlinkClientTrait.scala | 13 +++++++++---- 1 file changed, 9 insertions(+), 4 deletions(-) diff --git a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala index 5025a92b28..435d882c73 100644 --- a/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala +++ b/streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala @@ -22,7 +22,7 @@ import org.apache.streampark.common.conf.ConfigKeys._ import org.apache.streampark.common.conf.Workspace import org.apache.streampark.common.enums.{ApplicationType, FlinkDevelopmentMode, FlinkExecutionMode, FlinkRestoreMode} import org.apache.streampark.common.fs.FsOperator -import org.apache.streampark.common.util.{DeflaterUtils, ExceptionUtils, FileUtils, Logger, PropertiesUtils, SystemPropertyUtils, Utils} +import org.apache.streampark.common.util._ import org.apache.streampark.flink.client.bean._ import org.apache.streampark.flink.core.FlinkClusterClient import org.apache.streampark.flink.core.conf.FlinkRunOption @@ -42,11 +42,10 @@ import org.apache.flink.runtime.jobgraph.{JobGraph, SavepointConfigOptions} import org.apache.flink.util.FlinkException import org.apache.flink.util.Preconditions.checkNotNull +import java.io.File import java.util import java.util.{Collections, List => JavaList, Map => JavaMap} -import scala.annotation.tailrec -import scala.collection.JavaConverters._ import scala.collection.convert.ImplicitConversions._ import scala.collection.mutable import scala.collection.mutable.ArrayBuffer @@ -277,7 +276,13 @@ trait FlinkClientTrait extends Logger { throw new RuntimeException(s"$pythonVenv File does not exist") } // including $app/lib - includingPipelineJars(submitRequest, flinkConfig) + val localLib: String = s"${Workspace.local.APP_WORKSPACE}/${submitRequest.id}/lib" + if (FileUtils.exists(localLib) && FileUtils.directoryNotBlank(localLib)) { + val localLibUrl = new File(localLib).listFiles().map(_.toURI.toURL).toList + pkgBuilder.setUserClassPaths( + Lists.newArrayList(localLibUrl: _*) + ) + } flinkConfig // python.archives .safeSet(PythonOptions.PYTHON_ARCHIVES, pythonVenv)