@@ -139,6 +139,12 @@ protected SparkAppDriverConf buildDriverConf(
if (StringUtils.isNotEmpty(applicationSpec.getJars())) {
primaryResource = new JavaMainAppResource(Option.apply(applicationSpec.getJars()));
effectiveSparkConf.setIfMissing("spark.jars", applicationSpec.getJars());
} else if ("org.apache.spark.deploy.PythonRunner".equals(applicationSpec.getMainClass())) {
Member:
I wonder why, in the branch below, it doesn't check whether the main class is PythonRunner before creating PythonMainAppResource?

Member Author:
Yes, previously it didn't care about mainClass because it only checked the following condition (see the standalone split sketch after this diff hunk):

} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {

String[] files = applicationSpec.getPyFiles().split(",", 2);
primaryResource = new PythonMainAppResource(files[0]);
if (files.length > 1 && !files[1].isBlank()) {
effectiveSparkConf.setIfMissing("spark.submit.pyFiles", files[1]);
}
} else if (StringUtils.isNotEmpty(applicationSpec.getPyFiles())) {
primaryResource = new PythonMainAppResource(applicationSpec.getPyFiles());
effectiveSparkConf.setIfMissing("spark.submit.pyFiles", applicationSpec.getPyFiles());
@@ -129,6 +129,34 @@ void buildDriverConfForPythonApp() {
}
}

@Test
void handlePyFiles() {
Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>();
try (MockedConstruction<SparkAppDriverConf> mocked =
mockConstruction(
SparkAppDriverConf.class,
(mock, context) -> constructorArgs.put(mock, new ArrayList<>(context.arguments())))) {
SparkApplication mockApp = mock(SparkApplication.class);
ApplicationSpec mockSpec = mock(ApplicationSpec.class);
ObjectMeta appMeta = new ObjectMetaBuilder().withName("app1").withNamespace("ns1").build();
when(mockApp.getSpec()).thenReturn(mockSpec);
when(mockApp.getMetadata()).thenReturn(appMeta);
when(mockSpec.getMainClass()).thenReturn("org.apache.spark.deploy.PythonRunner");
when(mockSpec.getPyFiles()).thenReturn("main.py,lib.py");

SparkAppSubmissionWorker submissionWorker = new SparkAppSubmissionWorker();
SparkAppDriverConf conf = submissionWorker.buildDriverConf(mockApp, Collections.emptyMap());
assertEquals(6, constructorArgs.get(conf).size());
assertEquals(
"lib.py", ((SparkConf) constructorArgs.get(conf).get(0)).get("spark.submit.pyFiles"));

// validate main resources
assertInstanceOf(PythonMainAppResource.class, constructorArgs.get(conf).get(2));
PythonMainAppResource mainResource = (PythonMainAppResource) constructorArgs.get(conf).get(2);
assertEquals("main.py", mainResource.primaryResource());
}
}

@Test
void buildDriverConfForRApp() {
Map<SparkAppDriverConf, List<Object>> constructorArgs = new HashMap<>();