Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

[llvm] Move the per-benchmark scratch directory into the service working directory #672

Merged
merged 7 commits into from
May 12, 2022
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion compiler_gym/envs/llvm/datasets/cbench.py
Original file line number Diff line number Diff line change
Expand Up @@ -288,7 +288,7 @@ def validator_cb(env: "LlvmEnv") -> Optional[ValidationError]: # noqa: F821
raise FileNotFoundError(f"Required benchmark input not found: {path}")

# Create a temporary working directory to execute the benchmark in.
with tempfile.TemporaryDirectory(dir=env.service.connection.working_dir) as d:
with tempfile.TemporaryDirectory(dir=env.service.connection.cache.path) as d:
cwd = Path(d)

# Expand shell variable substitutions in the benchmark command.
Expand Down
1 change: 0 additions & 1 deletion compiler_gym/envs/llvm/service/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -96,7 +96,6 @@ cc_library(
":Cost",
"//compiler_gym/service/proto:compiler_gym_service_cc",
"//compiler_gym/util:GrpcStatusMacros",
"//compiler_gym/util:RunfilesPath",
"//compiler_gym/util:Subprocess",
"@boost//:filesystem",
"@com_github_grpc_grpc//:grpc++",
Expand Down
19 changes: 13 additions & 6 deletions compiler_gym/envs/llvm/service/Benchmark.cc
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,6 @@
#include <thread>

#include "compiler_gym/util/GrpcStatusMacros.h"
#include "compiler_gym/util/RunfilesPath.h"
#include "compiler_gym/util/Subprocess.h"
#include "llvm/ADT/SmallVector.h"
#include "llvm/Bitcode/BitcodeReader.h"
Expand Down Expand Up @@ -56,9 +55,17 @@ std::unique_ptr<llvm::Module> makeModuleOrDie(llvm::LLVMContext& context, const

} // anonymous namespace

fs::path createBenchmarkScratchDirectoryOrDie() {
const fs::path cacheRoot = util::getCacheRootPath();
const fs::path dir = fs::unique_path(cacheRoot / "benchmark-scratch-%%%%-%%%%");
fs::path createBenchmarkScratchDirectoryOrDie(const fs::path& workingDirectory) {
// This takes advantage of the fact that
// compiler_gym.service.service_cache.ServiceCache in the Python API creates a
// subdirectory called "disk" that is meant to be on a "traditional"
// filesystem, as opposed to the in-memory filesystem used for the parent
// cache, if supported. We want to use a traditional filesystem so that
// executables can be run, as some Linux distros prevent execution of
// files in in-memory filesystems.
//
// See: github.com/facebookresearch/CompilerGym/issues/465
const fs::path dir = fs::unique_path(workingDirectory / "disk" / "b" / "%%%%-%%%%");

sys::error_code ec;
fs::create_directories(dir, ec);
Expand Down Expand Up @@ -138,7 +145,7 @@ Benchmark::Benchmark(const std::string& name, const Bitcode& bitcode,
const fs::path& workingDirectory, const BaselineCosts& baselineCosts)
: context_(std::make_unique<llvm::LLVMContext>()),
module_(makeModuleOrDie(*context_, bitcode, name)),
scratchDirectory_(createBenchmarkScratchDirectoryOrDie()),
scratchDirectory_(createBenchmarkScratchDirectoryOrDie(workingDirectory)),
dynamicConfigProto_(dynamicConfig),
dynamicConfig_(realizeDynamicConfig(dynamicConfig, scratchDirectory_)),
baselineCosts_(baselineCosts),
Expand All @@ -154,7 +161,7 @@ Benchmark::Benchmark(const std::string& name, std::unique_ptr<llvm::LLVMContext>
const fs::path& workingDirectory, const BaselineCosts& baselineCosts)
: context_(std::move(context)),
module_(std::move(module)),
scratchDirectory_(createBenchmarkScratchDirectoryOrDie()),
scratchDirectory_(createBenchmarkScratchDirectoryOrDie(workingDirectory)),
dynamicConfigProto_(dynamicConfig),
dynamicConfig_(realizeDynamicConfig(dynamicConfig, scratchDirectory_)),
baselineCosts_(baselineCosts),
Expand Down
3 changes: 2 additions & 1 deletion compiler_gym/envs/llvm/service/Benchmark.h
Original file line number Diff line number Diff line change
Expand Up @@ -284,6 +284,7 @@ class Benchmark {
*
* @return fs::path A path.
*/
boost::filesystem::path createBenchmarkScratchDirectoryOrDie();
boost::filesystem::path createBenchmarkScratchDirectoryOrDie(
const boost::filesystem::path& workingDirectory);

} // namespace compiler_gym::llvm_service
2 changes: 1 addition & 1 deletion compiler_gym/envs/llvm/service/BenchmarkFactory.cc
Original file line number Diff line number Diff line change
Expand Up @@ -123,7 +123,7 @@ Status BenchmarkFactory::addBitcode(const std::string& uri, const Bitcode& bitco
// the Benchmark class. Suggest a refactor.
BenchmarkDynamicConfigProto realDynamicConfigProto =
(dynamicConfig.has_value() ? *dynamicConfig : BenchmarkDynamicConfigProto());
const fs::path scratchDirectory = createBenchmarkScratchDirectoryOrDie();
const fs::path scratchDirectory = createBenchmarkScratchDirectoryOrDie(workingDirectory_);
BenchmarkDynamicConfig realDynamicConfig =
realizeDynamicConfig(realDynamicConfigProto, scratchDirectory);

Expand Down
1 change: 0 additions & 1 deletion compiler_gym/envs/llvm/service/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,6 @@ cg_cc_library(
::Cost
compiler_gym::service::proto::compiler_gym_service_cc
compiler_gym::util::GrpcStatusMacros
compiler_gym::util::RunfilesPath
compiler_gym::util::Subprocess
ABS_DEPS
Boost::filesystem
Expand Down
16 changes: 15 additions & 1 deletion compiler_gym/service/BUILD
Original file line number Diff line number Diff line change
Expand Up @@ -7,16 +7,20 @@ load("@rules_cc//cc:defs.bzl", "cc_library")

py_library(
name = "service",
srcs = ["__init__.py"],
srcs = [
"__init__.py",
],
visibility = ["//visibility:public"],
deps = [
":compilation_session",
":connection",
# TODO(github.com/facebookresearch/CompilerGym/pull/633):
# add this after circular dependencies are resolved
# ":client_service_compiler_env",
":service_cache",
"//compiler_gym/errors",
"//compiler_gym/service/proto",
"//compiler_gym/util",
],
)

Expand Down Expand Up @@ -46,6 +50,7 @@ py_library(
srcs = ["connection.py"],
visibility = ["//visibility:public"],
deps = [
":service_cache",
"//compiler_gym/errors",
"//compiler_gym/service/proto",
"//compiler_gym/util",
Expand All @@ -70,3 +75,12 @@ py_library(
"//compiler_gym/views",
],
)

# Python library target for the service filesystem cache module
# (service_cache.py). Publicly visible; depends only on //compiler_gym/util.
py_library(
name = "service_cache",
srcs = ["service_cache.py"],
visibility = ["//visibility:public"],
deps = [
"//compiler_gym/util",
],
)
12 changes: 12 additions & 0 deletions compiler_gym/service/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ cg_py_library(
# TODO(github.com/facebookresearch/CompilerGym/pull/633):
# add this after circular dependencies are resolved
#::client_service_compiler_env
::service_cache
compiler_gym::errors::errors
compiler_gym::service::proto::proto
PUBLIC
Expand Down Expand Up @@ -52,6 +53,7 @@ cg_py_library(
SRCS
"connection.py"
DEPS
::service_cache
compiler_gym::errors::errors
compiler_gym::service::proto::proto
compiler_gym::util::util
Expand All @@ -76,3 +78,13 @@ cg_py_library(
compiler_gym::views::views
PUBLIC
)

# Python library target for the service filesystem cache module
# (service_cache.py). Public; depends only on compiler_gym::util::util.
cg_py_library(
NAME
service_cache
SRCS
"service_cache.py"
DEPS
compiler_gym::util::util
PUBLIC
)
57 changes: 19 additions & 38 deletions compiler_gym/service/connection.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,17 +5,15 @@
"""This module contains the logic for connecting to services."""
import logging
import os
import random
import shutil
import subprocess
import sys
from datetime import datetime
from pathlib import Path
from signal import Signals
from time import sleep, time
from typing import Dict, Iterable, List, Optional, TypeVar, Union

import grpc
from deprecated.sphinx import deprecated
from pydantic import BaseModel

import compiler_gym.errors
Expand All @@ -26,12 +24,9 @@
GetSpacesRequest,
ObservationSpace,
)
from compiler_gym.service.service_cache import ServiceCache
from compiler_gym.util.debug_util import get_debug_level, logging_level_to_debug_level
from compiler_gym.util.runfiles_path import (
runfiles_path,
site_data_path,
transient_cache_path,
)
from compiler_gym.util.runfiles_path import runfiles_path, site_data_path
from compiler_gym.util.shell_format import join_cmd, plural
from compiler_gym.util.truncate import truncate_lines

Expand Down Expand Up @@ -297,25 +292,6 @@ def service_is_down(self) -> bool:
return False


def make_working_dir() -> Path:
"""Make a working directory for a service. The calling code is responsible
for removing this directory when done.
"""
while True:
random_hash = random.getrandbits(16)
service_name = datetime.now().strftime(f"s/%m%dT%H%M%S-%f-{random_hash:04x}")
working_dir = transient_cache_path(service_name)
# Guard against the unlike scenario that there is a collision between
# the randomly generated working directories of multiple
# make_working_dir() calls.
try:
(working_dir / "logs").mkdir(parents=True, exist_ok=False)
break
except FileExistsError:
pass
return working_dir


class ManagedConnection(Connection):
"""A connection to a service using a managed subprocess."""

Expand All @@ -337,14 +313,14 @@ def __init__(

if not Path(local_service_binary).is_file():
raise FileNotFoundError(f"File not found: {local_service_binary}")
self.working_dir = make_working_dir()
self.cache = ServiceCache()

# The command that will be executed. The working directory of this
# command will be set to the local_service_binary's parent, so we can
# use the relpath for a neater `ps aux` view.
cmd = [
f"./{local_service_binary.name}",
f"--working_dir={self.working_dir}",
f"--working_dir={self.cache.path}",
]
# Add any custom arguments
cmd += script_args
Expand Down Expand Up @@ -407,7 +383,7 @@ def __init__(

# Read the port from a file generated by the service.
wait_secs = 0.1
port_path = self.working_dir / "port.txt"
port_path = self.cache / "port.txt"
end_time = time() + port_init_max_seconds
while time() < end_time:
returncode = self.process.poll()
Expand All @@ -425,7 +401,7 @@ def __init__(
)
if logs:
msg = f"{msg}\nService logs:\n{logs}"
shutil.rmtree(self.working_dir, ignore_errors=True)
self.cache.close()
raise ServiceError(msg)
if port_path.is_file():
try:
Expand All @@ -445,7 +421,7 @@ def __init__(
else:
self.process.terminate()
self.process.communicate(timeout=rpc_init_max_seconds)
shutil.rmtree(self.working_dir)
self.cache.close()
raise TimeoutError(
"Service failed to produce port file after "
f"{port_init_max_seconds:.1f} seconds"
Expand Down Expand Up @@ -486,14 +462,19 @@ def __init__(
)
logs_message = f" Service logs:\n{logs}" if logs else ""

shutil.rmtree(self.working_dir)
self.cache.close()
raise TimeoutError(
"Failed to connect to RPC service after "
f"{rpc_init_max_seconds:.1f} seconds.{logs_message}"
)

super().__init__(channel, url)

@property
@deprecated(version="0.2.4", reason="Replace `working_dir` with `cache.path`")
def working_dir(self) -> Path:
    """Deprecated alias for the root directory of the service cache.

    Kept so that existing callers of ``connection.working_dir`` continue to
    work. New code should read ``connection.cache.path`` instead.

    :return: The value of ``self.cache.path``.
    """
    return self.cache.path

def service_is_down(self) -> bool:
    """Report whether the managed service subprocess has exited.

    :return: :code:`True` if polling the subprocess yields an exit status,
        :code:`False` if polling yields :code:`None`.
    """
    exit_status = self.process.poll()
    return exit_status is not None
Expand All @@ -505,9 +486,9 @@ def loglines(self) -> Iterable[str]:
"""
# Compiler services write log files in the logs directory. Iterate over
# them and return their contents.
if not (self.working_dir / "logs").is_dir():
if not (self.cache / "logs").is_dir():
return ()
for path in sorted((self.working_dir / "logs").iterdir()):
for path in sorted((self.cache / "logs").iterdir()):
if not path.is_file():
continue
with open(path) as f:
Expand All @@ -532,7 +513,7 @@ def close(self):
# The service has already been closed, nothing to do.
pass
except ProcessLookupError:
logger.warning("Service process not found at %s", self.working_dir)
logger.warning("Service process not found at %s", self.cache)
except subprocess.TimeoutExpired:
# Try and kill it and then walk away.
try:
Expand All @@ -544,9 +525,9 @@ def close(self):
self.process.communicate(timeout=60)
except: # noqa
pass
logger.warning("Abandoning orphan service at %s", self.working_dir)
logger.warning("Abandoning orphan service at %s", self.cache)
finally:
shutil.rmtree(self.working_dir, ignore_errors=True)
self.cache.close()
super().close()

def __repr__(self):
Expand Down
76 changes: 76 additions & 0 deletions compiler_gym/service/service_cache.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,76 @@
# Copyright (c) Facebook, Inc. and its affiliates.
#
# This source code is licensed under the MIT license found in the
# LICENSE file in the root directory of this source tree.
"""This module defines a filesystem cache for services."""
import os
import random
import shutil
from datetime import datetime
from pathlib import Path

from compiler_gym.util.filesystem import is_in_memory
from compiler_gym.util.runfiles_path import cache_path, transient_cache_path

MAX_CACHE_CONFLICT_RETIRES: int = 1000


def _create_timestamped_unique_service_dir(root: Path) -> Path:
    """Create a fresh, uniquely named service directory beneath ``root``.

    The directory is created at ``root / "s" / <name>``, where ``<name>`` is
    formatted from the current local time (month, day, hour, minute, second
    and microsecond) followed by a random 16-bit value in hexadecimal.

    :param root: The directory under which the service directory is created.
        Missing parent directories are created as needed.

    :return: The path of the newly created directory.

    :raises OSError: If every one of the :code:`MAX_CACHE_CONFLICT_RETIRES`
        attempts produced a name that already exists.
    """
    attempts_remaining = MAX_CACHE_CONFLICT_RETIRES
    while attempts_remaining > 0:
        attempts_remaining -= 1
        suffix = random.getrandbits(16)
        relative_name = datetime.now().strftime(f"s/%m%dT%H%M%S-%f-{suffix:04x}")
        candidate: Path = root / relative_name
        # exist_ok=False turns a name collision with another concurrently
        # created service directory into FileExistsError, so that we can
        # simply try again with a new name.
        try:
            candidate.mkdir(parents=True, exist_ok=False)
        except FileExistsError:
            continue
        return candidate

    raise OSError(
        "Could not create a unique cache directory "
        f"after {MAX_CACHE_CONFLICT_RETIRES} retries."
    )


class ServiceCache:
    """A filesystem cache for use by managed services.

    This provides a directory in which a service can store temporary files and
    artifacts. A service can assume exclusive use of this cache. When supported,
    the cache will be in an in-memory filesystem.

    The cache contains two subdirectories: "logs", which may be used for storing
    log files, and "disk", which may be used for storing files that require
    being stored on a traditional filesystem. On some Linux distributions,
    in-memory filesystems do not permit executing files. See:
    <github.com/facebookresearch/CompilerGym/issues/465>

    The cache must be released with :meth:`close`. It may also be used as a
    context manager, in which case :meth:`close` is called on exit:

        >>> with ServiceCache() as cache:
        ...     port_path = cache / "port.txt"
    """

    def __init__(self):
        """Create the cache directory and its "logs" and "disk" entries.

        If any step after the creation of the root directory fails, the
        directories created so far are removed before the error is re-raised,
        since the caller never receives an object on which to call
        :meth:`close`.
        """
        self.path = _create_timestamped_unique_service_dir(transient_cache_path("."))
        # Register the root before doing anything else so that close() can
        # clean up after a partially completed construction.
        self._directories_to_remove = [self.path]

        try:
            (self.path / "logs").mkdir()

            if is_in_memory(self.path):
                # The cache root is on an in-memory filesystem. Back "disk"
                # with a separate directory under cache_path() and expose it
                # through a symlink.
                disk = _create_timestamped_unique_service_dir(cache_path("."))
                self._directories_to_remove.append(disk)
                os.symlink(disk, self.path / "disk")
            else:
                (self.path / "disk").mkdir()
        except Exception:
            self.close()
            raise

    def __truediv__(self, rhs) -> Path:
        """Supports 'cache / "path"' syntax."""
        return self.path / rhs

    def __enter__(self) -> "ServiceCache":
        """Support use as a context manager. Returns the cache itself."""
        return self

    def __exit__(self, exc_type, exc_value, traceback) -> None:
        """Remove the cache directories when leaving a ``with`` block."""
        self.close()

    def close(self):
        """Remove the cache directory. This must be called.

        Errors encountered while removing directories are ignored, so it is
        safe to call this more than once.
        """
        for directory in self._directories_to_remove:
            shutil.rmtree(directory, ignore_errors=True)

    def __repr__(self) -> str:
        return str(self.path)
Loading