Skip to content

Commit

Permalink
Bugfix: create the tmp dir if it doesn't exist (#102599) (#102693)
Browse files Browse the repository at this point in the history
* Make sure the tmp dir exists before creating a tmp file

* Correct file permissions

* changelog

* Recreate tmp dir before creating named pipes
  • Loading branch information
jan-elastic committed Nov 28, 2023
1 parent 2f6d264 commit ddfd048
Show file tree
Hide file tree
Showing 7 changed files with 120 additions and 0 deletions.
5 changes: 5 additions & 0 deletions docs/changelog/102599.yaml
@@ -0,0 +1,5 @@
pr: 102599
summary: "Recreate the Elasticsearch private temporary directory if it doesn't exist when an ML job is opened"
area: Machine Learning
type: bug
issues: []
1 change: 1 addition & 0 deletions x-pack/plugin/ml/build.gradle
Expand Up @@ -87,6 +87,7 @@ dependencies {
changing = true
}
testImplementation 'org.ini4j:ini4j:0.5.2'
testImplementation "com.google.jimfs:jimfs:${versions.jimfs}"
}

def mlCppVersion(){
Expand Down
Expand Up @@ -12,6 +12,7 @@
import org.elasticsearch.xcontent.json.JsonXContent;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.utils.FileUtils;

import java.io.IOException;
import java.io.OutputStreamWriter;
Expand Down Expand Up @@ -80,6 +81,7 @@ private List<String> buildAnalyticsCommand() throws IOException {

private void addConfigFile(List<String> command) throws IOException {
Path tempDir = tempDirPathSupplier.get();
FileUtils.recreateTempDirectoryIfNeeded(tempDir);
Path configFile = Files.createTempFile(tempDir, "analysis", ".conf");
filesToDelete.add(configFile);
try (
Expand Down
Expand Up @@ -23,6 +23,7 @@
import org.elasticsearch.xpack.ml.job.process.autodetect.writer.ScheduledEventToRuleWriter;
import org.elasticsearch.xpack.ml.process.NativeController;
import org.elasticsearch.xpack.ml.process.ProcessPipes;
import org.elasticsearch.xpack.ml.utils.FileUtils;

import java.io.BufferedWriter;
import java.io.IOException;
Expand Down Expand Up @@ -208,6 +209,7 @@ public static Path writeNormalizerInitState(String jobId, String state, Environm
// createTempFile has a race condition where it may return the same
// temporary file name to different threads if called simultaneously
// from multiple threads, hence add the thread ID to avoid this
FileUtils.recreateTempDirectoryIfNeeded(env.tmpFile());
Path stateFile = Files.createTempFile(
env.tmpFile(),
jobId + "_quantiles_" + Thread.currentThread().getId(),
Expand All @@ -225,6 +227,7 @@ private void buildScheduledEventsConfig(List<String> command) throws IOException
if (scheduledEvents.isEmpty()) {
return;
}
FileUtils.recreateTempDirectoryIfNeeded(env.tmpFile());
Path eventsConfigFile = Files.createTempFile(env.tmpFile(), "eventsConfig", JSON_EXTENSION);
filesToDelete.add(eventsConfigFile);

Expand All @@ -249,6 +252,7 @@ private void buildScheduledEventsConfig(List<String> command) throws IOException
}

private void buildJobConfig(List<String> command) throws IOException {
FileUtils.recreateTempDirectoryIfNeeded(env.tmpFile());
Path configFile = Files.createTempFile(env.tmpFile(), "config", JSON_EXTENSION);
filesToDelete.add(configFile);
try (
Expand All @@ -267,6 +271,7 @@ private void buildFiltersConfig(List<String> command) throws IOException {
if (referencedFilters.isEmpty()) {
return;
}
FileUtils.recreateTempDirectoryIfNeeded(env.tmpFile());
Path filtersConfigFile = Files.createTempFile(env.tmpFile(), "filtersConfig", JSON_EXTENSION);
filesToDelete.add(filtersConfigFile);

Expand Down
Expand Up @@ -10,11 +10,13 @@
import org.elasticsearch.env.Environment;
import org.elasticsearch.monitor.jvm.JvmInfo;
import org.elasticsearch.xpack.ml.process.logging.CppLogMessageHandler;
import org.elasticsearch.xpack.ml.utils.FileUtils;
import org.elasticsearch.xpack.ml.utils.NamedPipeHelper;

import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
import java.nio.file.Path;
import java.time.Duration;
import java.util.List;
import java.util.Locale;
Expand All @@ -41,6 +43,7 @@ public class ProcessPipes {

private final NamedPipeHelper namedPipeHelper;
private final String jobId;
private final Path tempDir;

/**
* <code>null</code> indicates a pipe won't be used
Expand Down Expand Up @@ -91,6 +94,7 @@ public ProcessPipes(
) {
this.namedPipeHelper = namedPipeHelper;
this.jobId = jobId;
this.tempDir = env.tmpFile();
this.timeout = timeout;

// The way the pipe names are formed MUST match what is done in the controller main()
Expand Down Expand Up @@ -150,6 +154,7 @@ public void addArgs(List<String> command) {
* and this JVM.
*/
public void connectLogStream() throws IOException {
FileUtils.recreateTempDirectoryIfNeeded(tempDir);
logStreamHandler = new CppLogMessageHandler(jobId, namedPipeHelper.openNamedPipeInputStream(logPipeName, timeout));
}

Expand All @@ -162,6 +167,7 @@ public void connectOtherStreams() throws IOException {
if (logStreamHandler == null) {
throw new NullPointerException("Must connect log stream before other streams");
}
FileUtils.recreateTempDirectoryIfNeeded(tempDir);
// The order here is important. It must match the order that the C++ process tries to connect to the pipes, otherwise
// a timeout is guaranteed. Also change api::CIoManager in the C++ code if changing the order here.
try {
Expand Down
@@ -0,0 +1,43 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ml.utils;

import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.FileAttribute;
import java.nio.file.attribute.PosixFilePermission;
import java.nio.file.attribute.PosixFilePermissions;
import java.util.EnumSet;

/**
* Some utility functions for managing files.
*/
public final class FileUtils {

private FileUtils() {}

private static final FileAttribute<?>[] POSIX_TMP_DIR_PERMISSIONS = new FileAttribute<?>[] {
PosixFilePermissions.asFileAttribute(
EnumSet.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE, PosixFilePermission.OWNER_EXECUTE)
) };

/**
* Recreates the Elasticsearch temporary directory if it doesn't exist.
* The operating system may have cleaned it up due to inactivity, which
* causes some (machine learning) processes to fail.
* @param tmpDir the path to the temporary directory
*/
public static void recreateTempDirectoryIfNeeded(Path tmpDir) throws IOException {
if (tmpDir.getFileSystem().supportedFileAttributeViews().contains("posix")) {
Files.createDirectories(tmpDir, POSIX_TMP_DIR_PERMISSIONS);
} else {
Files.createDirectories(tmpDir);
}
}
}
@@ -0,0 +1,58 @@
/*
* Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
* or more contributor license agreements. Licensed under the Elastic License
* 2.0; you may not use this file except in compliance with the Elastic License
* 2.0.
*/

package org.elasticsearch.xpack.ml.utils;

import com.google.common.jimfs.Configuration;
import com.google.common.jimfs.Jimfs;

import org.elasticsearch.core.PathUtilsForTesting;
import org.elasticsearch.test.ESTestCase;

import java.io.IOException;
import java.nio.file.FileSystem;
import java.nio.file.Files;
import java.nio.file.Path;
import java.nio.file.attribute.BasicFileAttributes;
import java.nio.file.attribute.PosixFileAttributes;
import java.nio.file.attribute.PosixFilePermission;
import java.util.Set;

public class FileUtilsTests extends ESTestCase {

public void test_recreateTempDirectoryIfNeeded_forWindows() throws IOException {
FileSystem fileSystem = Jimfs.newFileSystem(Configuration.windows());
PathUtilsForTesting.installMock(fileSystem);

Path tmpDir = fileSystem.getPath("c:\\tmp\\elasticsearch");

assertFalse(Files.exists(tmpDir));
FileUtils.recreateTempDirectoryIfNeeded(tmpDir);
assertTrue(Files.exists(tmpDir));

BasicFileAttributes attributes = Files.readAttributes(tmpDir, BasicFileAttributes.class);
assertTrue(attributes.isDirectory());
}

public void test_recreateTempDirectoryIfNeeded_forPosix() throws IOException {
FileSystem fileSystem = Jimfs.newFileSystem(Configuration.unix().toBuilder().setAttributeViews("posix").build());
PathUtilsForTesting.installMock(fileSystem);

Path tmpDir = fileSystem.getPath("/tmp/elasticsearch-1234567890");

assertFalse(Files.exists(tmpDir));
FileUtils.recreateTempDirectoryIfNeeded(tmpDir);
assertTrue(Files.exists(tmpDir));

PosixFileAttributes attributes = Files.readAttributes(tmpDir, PosixFileAttributes.class);
assertTrue(attributes.isDirectory());
assertEquals(
attributes.permissions(),
Set.of(PosixFilePermission.OWNER_READ, PosixFilePermission.OWNER_WRITE, PosixFilePermission.OWNER_EXECUTE)
);
}
}

0 comments on commit ddfd048

Please sign in to comment.