From 8eded2f333e80f7d9ef5adaada7a3323774b8869 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Wed, 27 Jan 2021 13:15:08 +0100 Subject: [PATCH 1/5] [FLINK-21164][rest] Delete temporary jars --- .../webmonitor/handlers/JarPlanHandler.java | 16 ++++++++++++++-- .../webmonitor/handlers/JarRunHandler.java | 5 +++++ .../handlers/utils/JarHandlerUtils.java | 6 ++++-- 3 files changed, 23 insertions(+), 4 deletions(-) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java index d7cd6fb717f44..a80ec5f8edfb5 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java @@ -19,6 +19,7 @@ package org.apache.flink.runtime.webmonitor.handlers; import org.apache.flink.api.common.time.Time; +import org.apache.flink.client.program.PackagedProgram; import org.apache.flink.configuration.Configuration; import org.apache.flink.runtime.jobgraph.JobGraph; import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator; @@ -100,8 +101,19 @@ protected CompletableFuture handleRequest( return CompletableFuture.supplyAsync( () -> { - final JobGraph jobGraph = context.toJobGraph(configuration, true); - return planGenerator.apply(jobGraph); + final PackagedProgram packagedProgram = + context.toPackagedProgram(configuration); + try { + final JobGraph jobGraph = + context.toJobGraph(packagedProgram, configuration, true); + return planGenerator.apply(jobGraph); + } finally { + try { + packagedProgram.deleteExtractedLibraries(); + } catch (Exception e) { + log.debug("Error while deleting jars extracted from user-jar."); + } + } }, executor); } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index 06351c2e64aac..e1e41ce7b2002 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -103,6 +103,11 @@ protected CompletableFuture handleRequest( executor) .handle( (jobIds, throwable) -> { + try { + program.deleteExtractedLibraries(); + } catch (Exception e) { + log.debug("Error while deleting jars extracted from user-jar."); + } if (throwable != null) { throw new CompletionException( new RestHandlerException( diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java index ffcac3157e936..84e69a3b02684 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/utils/JarHandlerUtils.java @@ -151,9 +151,11 @@ public void applyToConfiguration(final Configuration configuration) { URL::toString); } - public JobGraph toJobGraph(Configuration configuration, boolean suppressOutput) { + public JobGraph toJobGraph( + PackagedProgram packagedProgram, + Configuration configuration, + boolean suppressOutput) { try { - final PackagedProgram packagedProgram = toPackagedProgram(configuration); return PackagedProgramUtils.createJobGraph( packagedProgram, configuration, parallelism, jobId, suppressOutput); } catch (final ProgramInvocationException e) { From b1c458c3b35787114ab2393bacfde13b8c779cd4 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Fri, 29 Jan 2021 14:29:01 +0100 Subject: [PATCH 2/5] Update flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java Co-authored-by: Till Rohrmann --- .../apache/flink/runtime/webmonitor/handlers/JarRunHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index e1e41ce7b2002..6f56878728558 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -106,7 +106,7 @@ protected CompletableFuture handleRequest( try { program.deleteExtractedLibraries(); } catch (Exception e) { - log.debug("Error while deleting jars extracted from user-jar."); + log.debug("Error while deleting jars extracted from user-jar.", e); } if (throwable != null) { throw new CompletionException( From e5bef5265a9416b01554377cac0a912179c9ca60 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Fri, 29 Jan 2021 14:29:06 +0100 Subject: [PATCH 3/5] Update flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java Co-authored-by: Till Rohrmann --- .../flink/runtime/webmonitor/handlers/JarPlanHandler.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java index a80ec5f8edfb5..24bcdf5e55416 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java @@ -111,7 +111,7 @@ protected CompletableFuture handleRequest( try { packagedProgram.deleteExtractedLibraries(); } catch (Exception e) { - log.debug("Error while deleting jars extracted from user-jar."); + log.debug("Error while deleting jars extracted from user-jar.", e); } } }, From ce3c983720499212e587319cd6fdef3ccc6b97b5 Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Fri, 29 Jan 2021 14:43:52 +0100 Subject: [PATCH 4/5] Add PackagedProgram#close() --- .../flink/client/program/PackagedProgram.java | 16 +++++++++++++- .../webmonitor/handlers/JarListHandler.java | 22 ++++++++----------- .../webmonitor/handlers/JarPlanHandler.java | 11 ++-------- .../webmonitor/handlers/JarRunHandler.java | 6 +---- 4 files changed, 27 insertions(+), 28 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index 006d136a4c68a..d3486e72a1f72 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -26,6 +26,9 @@ import org.apache.flink.util.InstantiationUtil; import org.apache.flink.util.JarUtils; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + import javax.annotation.Nullable; import java.io.BufferedInputStream; @@ -58,7 +61,9 @@ * This class encapsulates represents a program, packaged in a jar file. It supplies functionality * to extract nested libraries, search for the program entry point, and extract a program plan. */ -public class PackagedProgram { +public class PackagedProgram implements AutoCloseable { + + private static final Logger LOG = LoggerFactory.getLogger(PackagedProgram.class); /** * Property name of the entry in JAR manifest file that describes the Flink specific entry @@ -617,6 +622,15 @@ private static void checkJarFile(URL jarfile) throws ProgramInvocationException } } + @Override + public void close() { + try { + deleteExtractedLibraries(); + } catch (Exception e) { + LOG.debug("Error while deleting jars extracted from user-jar.", e); + } + } + /** A Builder For {@link PackagedProgram}. */ public static class Builder { diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java index fafe6168f6dea..27ef7f40736c7 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarListHandler.java @@ -145,23 +145,19 @@ protected CompletableFuture handleRequest( for (String clazz : classes) { clazz = clazz.trim(); - PackagedProgram program = null; - try { - program = - PackagedProgram.newBuilder() - .setJarFile(f) - .setEntryPointClassName(clazz) - .setConfiguration(configuration) - .build(); - } catch (Exception ignored) { - // ignore jar files which throw an error upon creating a - // PackagedProgram - } - if (program != null) { + try (PackagedProgram program = + PackagedProgram.newBuilder() + .setJarFile(f) + .setEntryPointClassName(clazz) + .setConfiguration(configuration) + .build()) { JarListInfo.JarEntryInfo jarEntryInfo = new JarListInfo.JarEntryInfo( clazz, program.getDescription()); jarEntryList.add(jarEntryInfo); + } catch (Exception ignored) { + // ignore jar files which throw an error upon creating a + // PackagedProgram } } diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java index 24bcdf5e55416..4690792ba808c 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarPlanHandler.java @@ -101,18 +101,11 @@ protected CompletableFuture handleRequest( return CompletableFuture.supplyAsync( () -> { - final PackagedProgram packagedProgram = - context.toPackagedProgram(configuration); - try { + try (PackagedProgram packagedProgram = + context.toPackagedProgram(configuration)) { final JobGraph jobGraph = context.toJobGraph(packagedProgram, configuration, true); return planGenerator.apply(jobGraph); - } finally { - try { - packagedProgram.deleteExtractedLibraries(); - } catch (Exception e) { - log.debug("Error while deleting jars extracted from user-jar.", e); - } } }, executor); diff --git a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java index 6f56878728558..457330004e1e4 100644 --- a/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java +++ b/flink-runtime-web/src/main/java/org/apache/flink/runtime/webmonitor/handlers/JarRunHandler.java @@ -103,11 +103,7 @@ protected CompletableFuture handleRequest( executor) .handle( (jobIds, throwable) -> { - try { - program.deleteExtractedLibraries(); - } catch (Exception e) { - log.debug("Error while deleting jars extracted from user-jar.", e); - } + program.close(); if (throwable != null) { throw new CompletionException( new RestHandlerException( From c833d995b48a47ca6a8af7ae7c5eae5fd9d2c7ba Mon Sep 17 00:00:00 2001 From: Chesnay Schepler Date: Fri, 29 Jan 2021 14:54:02 +0100 Subject: [PATCH 5/5] migrate CliFrontend to PackagedProgram#close --- .../java/org/apache/flink/client/cli/CliFrontend.java | 8 ++------ .../org/apache/flink/client/program/PackagedProgram.java | 2 +- 2 files changed, 3 insertions(+), 7 deletions(-) diff --git a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java index a489219eeb91e..c5ff839b414c8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java +++ b/flink-clients/src/main/java/org/apache/flink/client/cli/CliFrontend.java @@ -242,12 +242,8 @@ protected void run(String[] args) throws Exception { LOG.debug("Effective executor configuration: {}", effectiveConfiguration); - final PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration); - - try { + try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) { executeProgram(effectiveConfiguration, program); - } finally { - program.deleteExtractedLibraries(); } } @@ -387,7 +383,7 @@ protected void info(String[] args) throws Exception { } } finally { if (program != null) { - program.deleteExtractedLibraries(); + program.close(); } } } diff --git a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java index d3486e72a1f72..051a56d2465e8 100644 --- a/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java +++ b/flink-clients/src/main/java/org/apache/flink/client/program/PackagedProgram.java @@ -295,7 +295,7 @@ public static List getJobJarAndDependencies( } /** Deletes all temporary files created for contained packaged libraries. */ - public void deleteExtractedLibraries() { + private void deleteExtractedLibraries() { deleteExtractedLibraries(this.extractedTempLibraries); this.extractedTempLibraries.clear(); }