Skip to content

Commit

Permalink
Add PackagedProgram#close()
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol committed Jan 29, 2021
1 parent e5bef52 commit ce3c983
Show file tree
Hide file tree
Showing 4 changed files with 27 additions and 28 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.JarUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.BufferedInputStream;
Expand Down Expand Up @@ -58,7 +61,9 @@
* This class encapsulates represents a program, packaged in a jar file. It supplies functionality
* to extract nested libraries, search for the program entry point, and extract a program plan.
*/
public class PackagedProgram {
public class PackagedProgram implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(PackagedProgram.class);

/**
* Property name of the entry in JAR manifest file that describes the Flink specific entry
Expand Down Expand Up @@ -617,6 +622,15 @@ private static void checkJarFile(URL jarfile) throws ProgramInvocationException
}
}

@Override
public void close() {
try {
deleteExtractedLibraries();
} catch (Exception e) {
LOG.debug("Error while deleting jars extracted from user-jar.", e);
}
}

/** A Builder For {@link PackagedProgram}. */
public static class Builder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,23 +145,19 @@ protected CompletableFuture<JarListInfo> handleRequest(
for (String clazz : classes) {
clazz = clazz.trim();

PackagedProgram program = null;
try {
program =
PackagedProgram.newBuilder()
.setJarFile(f)
.setEntryPointClassName(clazz)
.setConfiguration(configuration)
.build();
} catch (Exception ignored) {
// ignore jar files which throw an error upon creating a
// PackagedProgram
}
if (program != null) {
try (PackagedProgram program =
PackagedProgram.newBuilder()
.setJarFile(f)
.setEntryPointClassName(clazz)
.setConfiguration(configuration)
.build()) {
JarListInfo.JarEntryInfo jarEntryInfo =
new JarListInfo.JarEntryInfo(
clazz, program.getDescription());
jarEntryList.add(jarEntryInfo);
} catch (Exception ignored) {
// ignore jar files which throw an error upon creating a
// PackagedProgram
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -101,18 +101,11 @@ protected CompletableFuture<JobPlanInfo> handleRequest(

return CompletableFuture.supplyAsync(
() -> {
final PackagedProgram packagedProgram =
context.toPackagedProgram(configuration);
try {
try (PackagedProgram packagedProgram =
context.toPackagedProgram(configuration)) {
final JobGraph jobGraph =
context.toJobGraph(packagedProgram, configuration, true);
return planGenerator.apply(jobGraph);
} finally {
try {
packagedProgram.deleteExtractedLibraries();
} catch (Exception e) {
log.debug("Error while deleting jars extracted from user-jar.", e);
}
}
},
executor);
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,11 +103,7 @@ protected CompletableFuture<JarRunResponseBody> handleRequest(
executor)
.handle(
(jobIds, throwable) -> {
try {
program.deleteExtractedLibraries();
} catch (Exception e) {
log.debug("Error while deleting jars extracted from user-jar.", e);
}
program.close();
if (throwable != null) {
throw new CompletionException(
new RestHandlerException(
Expand Down

0 comments on commit ce3c983

Please sign in to comment.