Skip to content

Commit

Permalink
[FLINK-21164][rest] Delete temporary jars
Browse files Browse the repository at this point in the history
  • Loading branch information
zentol authored Jan 29, 2021
1 parent dd0ee24 commit 30f6964
Show file tree
Hide file tree
Showing 6 changed files with 39 additions and 25 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -242,12 +242,8 @@ protected void run(String[] args) throws Exception {

LOG.debug("Effective executor configuration: {}", effectiveConfiguration);

final PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration);

try {
try (PackagedProgram program = getPackagedProgram(programOptions, effectiveConfiguration)) {
executeProgram(effectiveConfiguration, program);
} finally {
program.deleteExtractedLibraries();
}
}

Expand Down Expand Up @@ -387,7 +383,7 @@ protected void info(String[] args) throws Exception {
}
} finally {
if (program != null) {
program.deleteExtractedLibraries();
program.close();
}
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,9 @@
import org.apache.flink.util.InstantiationUtil;
import org.apache.flink.util.JarUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import javax.annotation.Nullable;

import java.io.BufferedInputStream;
Expand Down Expand Up @@ -58,7 +61,9 @@
* This class encapsulates represents a program, packaged in a jar file. It supplies functionality
* to extract nested libraries, search for the program entry point, and extract a program plan.
*/
public class PackagedProgram {
public class PackagedProgram implements AutoCloseable {

private static final Logger LOG = LoggerFactory.getLogger(PackagedProgram.class);

/**
* Property name of the entry in JAR manifest file that describes the Flink specific entry
Expand Down Expand Up @@ -290,7 +295,7 @@ public static List<URL> getJobJarAndDependencies(
}

/** Deletes all temporary files created for contained packaged libraries. */
public void deleteExtractedLibraries() {
private void deleteExtractedLibraries() {
deleteExtractedLibraries(this.extractedTempLibraries);
this.extractedTempLibraries.clear();
}
Expand Down Expand Up @@ -617,6 +622,15 @@ private static void checkJarFile(URL jarfile) throws ProgramInvocationException
}
}

@Override
public void close() {
try {
deleteExtractedLibraries();
} catch (Exception e) {
LOG.debug("Error while deleting jars extracted from user-jar.", e);
}
}

/** A Builder For {@link PackagedProgram}. */
public static class Builder {

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -145,23 +145,19 @@ protected CompletableFuture<JarListInfo> handleRequest(
for (String clazz : classes) {
clazz = clazz.trim();

PackagedProgram program = null;
try {
program =
PackagedProgram.newBuilder()
.setJarFile(f)
.setEntryPointClassName(clazz)
.setConfiguration(configuration)
.build();
} catch (Exception ignored) {
// ignore jar files which throw an error upon creating a
// PackagedProgram
}
if (program != null) {
try (PackagedProgram program =
PackagedProgram.newBuilder()
.setJarFile(f)
.setEntryPointClassName(clazz)
.setConfiguration(configuration)
.build()) {
JarListInfo.JarEntryInfo jarEntryInfo =
new JarListInfo.JarEntryInfo(
clazz, program.getDescription());
jarEntryList.add(jarEntryInfo);
} catch (Exception ignored) {
// ignore jar files which throw an error upon creating a
// PackagedProgram
}
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -19,6 +19,7 @@
package org.apache.flink.runtime.webmonitor.handlers;

import org.apache.flink.api.common.time.Time;
import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;
import org.apache.flink.runtime.jobgraph.jsonplan.JsonPlanGenerator;
Expand Down Expand Up @@ -100,8 +101,12 @@ protected CompletableFuture<JobPlanInfo> handleRequest(

return CompletableFuture.supplyAsync(
() -> {
final JobGraph jobGraph = context.toJobGraph(configuration, true);
return planGenerator.apply(jobGraph);
try (PackagedProgram packagedProgram =
context.toPackagedProgram(configuration)) {
final JobGraph jobGraph =
context.toJobGraph(packagedProgram, configuration, true);
return planGenerator.apply(jobGraph);
}
},
executor);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -103,6 +103,7 @@ protected CompletableFuture<JarRunResponseBody> handleRequest(
executor)
.handle(
(jobIds, throwable) -> {
program.close();
if (throwable != null) {
throw new CompletionException(
new RestHandlerException(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -151,9 +151,11 @@ public void applyToConfiguration(final Configuration configuration) {
URL::toString);
}

public JobGraph toJobGraph(Configuration configuration, boolean suppressOutput) {
public JobGraph toJobGraph(
PackagedProgram packagedProgram,
Configuration configuration,
boolean suppressOutput) {
try {
final PackagedProgram packagedProgram = toPackagedProgram(configuration);
return PackagedProgramUtils.createJobGraph(
packagedProgram, configuration, parallelism, jobId, suppressOutput);
} catch (final ProgramInvocationException e) {
Expand Down

0 comments on commit 30f6964

Please sign in to comment.