diff --git a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java index 7c5fb98c..04beda0c 100644 --- a/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java +++ b/spark-submission-worker/src/main/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorker.java @@ -139,6 +139,12 @@ protected SparkAppDriverConf buildDriverConf( if (StringUtils.isNotEmpty(applicationSpec.getJars())) { primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars())); effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars()); + } else if ("org.apache.spark.deploy.PythonRunner".equals(applicationSpec.getMainClass())) { + String[] files = applicationSpec.getPyFiles().split(",", 2); + primaryResource = new PythonMainAppResource(files[0]); + if (files.length > 1 && !files[1].isBlank()) { + effectiveSparkConf.setIfMissing("spark.submit.pyFiles", files[1]); + } } else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) { primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles()); effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles()); diff --git a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java index 19c67870..4376b28e 100644 --- a/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java +++ b/spark-submission-worker/src/test/java/org/apache/spark/k8s/operator/SparkAppSubmissionWorkerTest.java @@ -129,6 +129,34 @@ void buildDriverConfForPythonApp() { } } + @Test + void handlePyFiles() { + Map> constructorArgs = new HashMap<>(); + try (MockedConstruction mocked = + mockConstruction( + SparkAppDriverConf.class, + (mock, context) -> constructorArgs.put(mock, new ArrayList<>(context.arguments())))) { + SparkApplication mockApp = mock(SparkApplication.class); + ApplicationSpec mockSpec = mock(ApplicationSpec.class); + ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build(); + when(mockApp.getSpec()).thenReturn(mockSpec); + when(mockApp.getMetadata()).thenReturn(appMeta); + when(mockSpec.getMainClass()).thenReturn("org.apache.spark.deploy.PythonRunner"); + when(mockSpec.getPyFiles()).thenReturn("main.py,lib.py"); + + SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker(); + SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap()); + assertEquals(6, constructorArgs.get(conf).size()); + assertEquals( + "lib.py", ((SparkConf) constructorArgs.get(conf).get(0)).get("spark.submit.pyFiles")); + + // validate main resources + assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(2)); + PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2); + assertEquals("main.py", mainResource.primaryResource()); + } + } + @Test void buildDriverConfForRApp() { Map> constructorArgs = new HashMap<>();