Skip to content

Commit

Permalink
Add directory monitoring for model loading
Browse files Browse the repository at this point in the history
  • Loading branch information
varunsh-xilinx committed Jun 13, 2022
1 parent abab0ad commit 6459797
Show file tree
Hide file tree
Showing 6 changed files with 111 additions and 4 deletions.
9 changes: 9 additions & 0 deletions Dockerfile
Expand Up @@ -458,6 +458,15 @@ RUN VERSION=2.9.1 \
&& cd /tmp \
&& rm -fr /tmp/*

# install efsw for directory monitoring
RUN git clone https://github.com/SpartanJ/efsw.git \
&& cd efsw && mkdir build && cd build \
&& cmake .. \
&& make -j \
&& make install \
&& cat install_manifest.txt | xargs -i bash -c "if [ -f {} ]; then cp --parents -P {} ${COPY_DIR}; fi" \
&& cd /tmp && rm -fr /tmp/*

# Delete /usr/local/man which is a symlink and cannot be copied later by BuildKit.
# Note: this works without BuildKit: https://github.com/docker/buildx/issues/150
# RUN cp -rf ${COPY_DIR}/usr/local/man/ ${COPY_DIR}/usr/local/share/man/ \
Expand Down
1 change: 1 addition & 0 deletions docs/dependencies.rst
Expand Up @@ -129,6 +129,7 @@ The following packages are installed in the Xilinx Inference Server dev containe
:github:`jarro2783/cxxopts`,2.2.1,MIT,Statically linked by proteus-server for command-line argument parsing
:github:`gdraheim/docker-systemctl-replacement`,1.5.4505,EUPL,Executable created by pyinstaller for starting XRM
:github:`drogonframework/drogon`,1.7.5,MIT,Dynamically linked by proteus-server for an HTTP and websocket server
:github:`SpartanJ/efsw`,latest,MIT,Dynamically linked by proteus-server for directory monitoring
:github:`tschaub/gh-pages`,latest,MIT,Executable used to publish documentation to gh-pages branch
:github:`git-lfs/git-lfs`,2.13.3,MIT + others,Executable used to manage large files in git
:github:`tianon/gosu`,1.12,Apache 2.0,Executable used to drop down to user when starting container
Expand Down
4 changes: 2 additions & 2 deletions src/proteus/CMakeLists.txt
Expand Up @@ -20,7 +20,7 @@ endif()
target_link_options(proteus-server PRIVATE "LINKER:-E")
enable_ipo_on_target(proteus-server)

target_link_libraries(proteus-server PRIVATE batching buffers clients core servers)
target_link_libraries(proteus-server PRIVATE batching buffers clients core servers efsw)

add_subdirectory(batching)
add_subdirectory(bindings)
Expand Down Expand Up @@ -52,7 +52,7 @@ add_library(proteus ${TYPE}
${SERVER_OBJS}
)
enable_ipo_on_target(proteus)
target_link_libraries(proteus PRIVATE helpers Threads::Threads jsoncpp ${CMAKE_DL_LIBS} lib_config)
target_link_libraries(proteus PRIVATE helpers Threads::Threads jsoncpp ${CMAKE_DL_LIBS} lib_config efsw)
if(${PROTEUS_ENABLE_VITIS})
target_link_libraries(proteus PRIVATE xir)
endif()
Expand Down
54 changes: 54 additions & 0 deletions src/proteus/core/model_repository.cpp
Expand Up @@ -23,7 +23,10 @@
#include <fstream>

#include "model_config.pb.h"
#include "proteus/core/manager.hpp"
#include "proteus/core/predict_api.hpp"
#include "proteus/core/worker_info.hpp"
#include "proteus/observation/logging.hpp"

namespace fs = std::filesystem;

Expand Down Expand Up @@ -111,4 +114,55 @@ void ModelRepository::ModelRepositoryImpl::modelLoad(
mapProtoToParameters2(config.parameters(), parameters);
}

void UpdateListener::handleFileAction([[maybe_unused]] efsw::WatchID watchid,
const std::string& dir,
const std::string& filename,
efsw::Action action,
std::string oldFilename) {
Logger logger{Loggers::kServer};
if (filename == "config.pbtxt") {
if (action == efsw::Actions::Add) {
// arbitrary delay to make sure filesystem has settled
std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto model = fs::path(dir).parent_path().filename();
// TODO(varunsh): replace with native client
RequestParameters params;
try {
ModelRepository::modelLoad(model, &params);
Manager::getInstance().loadWorker(model, params);
} catch (const std::runtime_error& e) {
PROTEUS_LOG_INFO(logger, "Error loading " + model.string());
}
} else if (action == efsw::Actions::Delete) {
// arbitrary delay to make sure filesystem has settled
std::this_thread::sleep_for(std::chrono::milliseconds(100));
auto model = fs::path(dir).parent_path().filename();
// TODO(varunsh): replace with native client
Manager::getInstance().unloadWorker(model);
}
}

switch (action) {
case efsw::Actions::Add:
PROTEUS_LOG_DEBUG(
logger, "DIR (" + dir + ") FILE (" + filename + ") has event Added");
break;
case efsw::Actions::Delete:
PROTEUS_LOG_DEBUG(
logger, "DIR (" + dir + ") FILE (" + filename + ") has event Delete");
break;
case efsw::Actions::Modified:
PROTEUS_LOG_DEBUG(
logger, "DIR (" + dir + ") FILE (" + filename + ") has event Modified");
break;
case efsw::Actions::Moved:
PROTEUS_LOG_DEBUG(logger, "DIR (" + dir + ") FILE (" + filename +
") has event Moved from (" + oldFilename +
")");
break;
default:
PROTEUS_LOG_ERROR(logger, "Should never happen");
}
}

} // namespace proteus
8 changes: 8 additions & 0 deletions src/proteus/core/model_repository.hpp
Expand Up @@ -15,13 +15,21 @@
#ifndef GUARD_PROTEUS_CORE_MODEL_REPOSITORY
#define GUARD_PROTEUS_CORE_MODEL_REPOSITORY

#include <efsw/efsw.hpp>
#include <filesystem>
#include <string>

namespace proteus {

class RequestParameters;

class UpdateListener : public efsw::FileWatchListener {
public:
void handleFileAction(efsw::WatchID watchid, const std::string& dir,
const std::string& filename, efsw::Action action,
std::string oldFilename) override;
};

class ModelRepository {
public:
static void modelLoad(const std::string& model,
Expand Down
39 changes: 37 additions & 2 deletions src/proteus/main.cpp
Expand Up @@ -20,15 +20,20 @@
#include <csignal> // for signal, SIGINT
#include <cstdlib> // for exit
#include <cxxopts/cxxopts.hpp> // for Options, value, OptionAdder
#include <iostream> // for operator<<, endl, basic_o...
#include <string> // for string, allocator, operat...
#include <efsw/efsw.hpp> // for FileWatcher
#include <filesystem>
#include <iostream> // for operator<<, endl, basic_o...
#include <string> // for string, allocator, operat...

#include "proteus/build_options.hpp" // for PROTEUS_ENABLE_HTTP, PROT...
#include "proteus/clients/native.hpp" // for initialize, terminate
#include "proteus/core/model_repository.hpp" // for ModelRepository
#include "proteus/observation/logging.hpp" // for logger
#include "proteus/servers/grpc_server.hpp" // for start, stop
#include "proteus/servers/http_server.hpp" // for start, stop

namespace fs = std::filesystem;

/**
* @brief Handler for incoming interrupt signals
*
Expand Down Expand Up @@ -59,13 +64,20 @@ int main(int argc, char* argv[]) {
int grpc_port = kDefaultGrpcPort;
#endif
std::string model_repository = "/mnt/models";
bool enable_repository_watcher = false;
bool use_polling_watcher = false;

try {
cxxopts::Options options("proteus-server", "Inference in the cloud");
// clang-format off
options.add_options()
("model-repository", "Path to the model repository",
cxxopts::value<std::string>(model_repository))
("enable-repository-watcher",
"Actively monitor the model-repository directory for new models",
cxxopts::value<bool>(enable_repository_watcher))
("use-polling-watcher", "Use polling to monitor model-repository directory",
cxxopts::value<bool>(use_polling_watcher))
#ifdef PROTEUS_ENABLE_HTTP
("http-port", "Port to use for HTTP server", cxxopts::value<int>(http_port))
#endif
Expand All @@ -87,9 +99,32 @@ int main(int argc, char* argv[]) {
}

proteus::initialize();
proteus::Logger logger{proteus::Loggers::kServer};

proteus::ModelRepository::setRepository(model_repository);

std::unique_ptr<efsw::FileWatcher> file_watcher;
std::unique_ptr<proteus::UpdateListener> listener;
if (enable_repository_watcher) {
file_watcher = std::make_unique<efsw::FileWatcher>(use_polling_watcher);
listener = std::make_unique<proteus::UpdateListener>();

file_watcher->addWatch(model_repository, listener.get(), true);
file_watcher->watch();

proteus::NativeClient client;
for (const auto& path : fs::directory_iterator(model_repository)) {
if (path.is_directory()) {
auto model = path.path().filename();
try {
client.modelLoad(model, nullptr);
} catch (const std::runtime_error& e) {
PROTEUS_LOG_INFO(logger, "Error loading " + model.string());
}
}
}
}

#ifdef PROTEUS_ENABLE_GRPC
std::cout << "gRPC server starting at port " << grpc_port << "\n";
proteus::grpc::start(grpc_port);
Expand Down

0 comments on commit 6459797

Please sign in to comment.