Skip to content

Commit

Permalink
[FLINK-34616][python] Fix python dist dir doesn't clean when open met…
Browse files Browse the repository at this point in the history
…hod construct resource has exception.

This closes #24462.
  • Loading branch information
liuyongvs committed Mar 8, 2024
1 parent 3959289 commit 21306f4
Showing 1 changed file with 37 additions and 26 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -123,9 +123,16 @@ public void open() throws Exception {
"Could not create the base directory: " + baseDirectory);
}

Map<String, String> env = constructEnvironmentVariables(baseDirectory);
installRequirements(baseDirectory, env);
return Tuple2.of(baseDirectory, env);
try {
Map<String, String> env =
constructEnvironmentVariables(baseDirectory);
installRequirements(baseDirectory, env);
return Tuple2.of(baseDirectory, env);
} catch (Throwable e) {
deleteBaseDirectory(baseDirectory);
LOG.warn("Failed to create resource.", e);
throw e;
}
});
shutdownHook =
ShutdownHookUtil.addShutdownHook(
Expand Down Expand Up @@ -213,6 +220,32 @@ private static String createBaseDirectory(String[] tmpDirectories) throws IOExce
+ "' for storing the generated files of python dependency.");
}

private static void deleteBaseDirectory(String baseDirectory) {
int retries = 0;
while (true) {
try {
FileUtils.deleteDirectory(new File(baseDirectory));
break;
} catch (Throwable t) {
retries++;
if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) {
LOG.warn(
String.format(
"Failed to delete the working directory %s of the Python UDF worker. Retrying...",
baseDirectory),
t);
} else {
LOG.warn(
String.format(
"Failed to delete the working directory %s of the Python UDF worker.",
baseDirectory),
t);
break;
}
}
}
}

private void installRequirements(String baseDirectory, Map<String, String> env)
throws IOException {
// Directory for storing the installation result of the requirements file.
Expand Down Expand Up @@ -475,29 +508,7 @@ void decRef() {

@Override
public void close() throws Exception {
int retries = 0;
while (true) {
try {
FileUtils.deleteDirectory(new File(baseDirectory));
break;
} catch (Throwable t) {
retries++;
if (retries <= CHECK_TIMEOUT / CHECK_INTERVAL) {
LOG.warn(
String.format(
"Failed to delete the working directory %s of the Python UDF worker. Retrying...",
baseDirectory),
t);
} else {
LOG.warn(
String.format(
"Failed to delete the working directory %s of the Python UDF worker.",
baseDirectory),
t);
break;
}
}
}
deleteBaseDirectory(baseDirectory);
}
}
}

0 comments on commit 21306f4

Please sign in to comment.