diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index a489219eeb91e..c5ff839b414c8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -242,12 +242,8 @@ protected void run(String[] args) throws Exception { LOG.debug("Effective executor configuration: {}", effectiveConfiguration); - final PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration); - - try { + try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) { executeProgram(effectiveConfiguration, program); - } finally { - program.deleteExtractedLibraries(); } } @@ -387,7 +383,7 @@ protected void info(String[] args) throws Exception { } } finally { if (program != null) { - program.deleteExtractedLibraries(); + program.close(); } } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index 006d136a4c68a..051a56d2465e8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -26,6 +26,9 @@ import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.JarUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.BufferedInputStream; @@ -58,7 +61,9 @@ * This class encapsulates represents a program, packaged in a jar file. It supplies functionality * to extract nested libraries, search for the program entry point, and extract a program plan. */ -public class PackagedProgram { +public class PackagedProgram implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(PackagedProgram.class); /** * Property name of the entry in JAR manifest file that describes the Flink specific entry @@ -290,7 +295,7 @@ public static List getJobJarAndDependencies( } /** Deletes all temporary files created for contained packaged libraries. */ - public void deleteExtractedLibraries() { + private void deleteExtractedLibraries() { deleteExtractedLibraries(this.extractedTempLibraries); this.extractedTempLibraries.clear(); } @@ -617,6 +622,15 @@ private static void checkJarFile(URL jarfile) throws ProgramInvocationException } } + @Override + public void close() { + try { + deleteExtractedLibraries(); + } catch (Exception e) { + LOG.debug("Error while deleting jars extracted from user-jar.", e); + } + } + /** A Builder For {@link PackagedProgram}. */ public static class Builder { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java index fafe6168f6dea..27ef7f40736c7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java @@ -145,23 +145,19 @@ protected CompletableFuture handleRequest( for (String clazz : classes) { clazz = clazz.trim(); - PackagedProgram program = null; - try { - program = - PackagedProgram.newBuilder() - .setJarFile(f) - .setEntryPointClassName(clazz) - .setConfiguration(configuration) - .build(); - } catch (Exception ignored) { - // ignore jar files which throw an error upon creating a - // PackagedProgram - } - if (program != null) { + try (PackagedProgram program = + PackagedProgram.newBuilder() + .setJarFile(f) + .setEntryPointClassName(clazz) + .setConfiguration(configuration) + .build()) { JarListInfo.JarEntryInfo jarEntryInfo = new JarListInfo.JarEntryInfo( clazz, program.getDescription()); jarEntryList.add(jarEntryInfo); + } catch (Exception ignored) { + // ignore jar files which throw an error upon creating a + // PackagedProgram } } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java index d7cd6fb717f44..4690792ba808c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; @@ -100,8 +101,12 @@ protected CompletableFuture handleRequest( return CompletableFuture.supplyAsync( () -> { - final JobGraph jobGraph = context.toJobGraph(configuration, true); - return planGenerator.apply(jobGraph); + try (PackagedProgram packagedProgram = + context.toPackagedProgram(configuration)) { + final JobGraph jobGraph = + context.toJobGraph(packagedProgram, configuration, true); + return planGenerator.apply(jobGraph); + } }, executor); } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index 06351c2e64aac..457330004e1e4 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -103,6 +103,7 @@ protected CompletableFuture handleRequest( executor) .handle( (jobIds, throwable) -> { + program.close(); if (throwable != null) { throw new CompletionException( new RestHandlerException( diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java index ffcac3157e936..84e69a3b02684 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java @@ -151,9 +151,11 @@ public void applyToConfiguration(final Configuration configuration) { URL::toString); } - public JobGraph toJobGraph(Configuration configuration, boolean suppressOutput) { + public JobGraph toJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + boolean suppressOutput) { try { - final PackagedProgram packagedProgram = toPackagedProgram(configuration); return PackagedProgramUtils.createJobGraph( packagedProgram, configuration, parallelism, jobId, suppressOutput); } catch (final ProgramInvocationException e) {