Permalink
628 lines (510 sloc) 19.7 KB
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#include <string>
#include <vector>
#include <process/owned.hpp>
#include <process/subprocess.hpp>
#include <stout/archiver.hpp>
#include <stout/json.hpp>
#include <stout/net.hpp>
#include <stout/option.hpp>
#include <stout/os.hpp>
#include <stout/path.hpp>
#include <stout/protobuf.hpp>
#include <stout/strings.hpp>
#include <stout/os/constants.hpp>
#include <stout/os/copyfile.hpp>
#include <mesos/mesos.hpp>
#include <mesos/fetcher/fetcher.hpp>
#include "common/status_utils.hpp"
#include "hdfs/hdfs.hpp"
#include "logging/flags.hpp"
#include "logging/logging.hpp"
#include "slave/slave.hpp"
#include "slave/containerizer/fetcher.hpp"
using namespace process;
using namespace mesos;
using namespace mesos::internal;
using std::string;
using std::vector;
using mesos::fetcher::FetcherInfo;
using mesos::internal::slave::Fetcher;
// Try to extract sourcePath into directory. If sourcePath is
// recognized as an archive it will be extracted and true returned;
// if not recognized then false will be returned. An Error is
// returned if the extraction command fails.
static Try<bool> extract(
const string& sourcePath,
const string& destinationDirectory)
{
Try<Nothing> result = Nothing();
Option<Subprocess::IO> in = None();
Option<Subprocess::IO> out = None();
vector<string> command;
// Extract any .tar, .tgz, tar.gz, tar.bz2 or zip files.
if (strings::endsWith(sourcePath, ".tar") ||
strings::endsWith(sourcePath, ".tgz") ||
strings::endsWith(sourcePath, ".tar.gz") ||
strings::endsWith(sourcePath, ".tbz2") ||
strings::endsWith(sourcePath, ".tar.bz2") ||
strings::endsWith(sourcePath, ".txz") ||
strings::endsWith(sourcePath, ".tar.xz") ||
strings::endsWith(sourcePath, ".zip")) {
Try<Nothing> result = archiver::extract(sourcePath, destinationDirectory);
if (result.isError()) {
return Error(
"Failed to extract archive '" + sourcePath +
"' to '" + destinationDirectory + "': " + result.error());
}
return true;
} else if (strings::endsWith(sourcePath, ".gz")) {
// Unfortunately, libarchive can't extract bare files, so leave this to
// the 'gunzip' program, if it exists.
string pathWithoutExtension = sourcePath.substr(0, sourcePath.length() - 3);
string filename = Path(pathWithoutExtension).basename();
string destinationPath = path::join(destinationDirectory, filename);
command = {"gunzip", "-d", "-c"};
in = Subprocess::PATH(sourcePath);
out = Subprocess::PATH(destinationPath);
} else {
return false;
}
CHECK_GT(command.size(), 0u);
Try<Subprocess> extractProcess = subprocess(
command[0],
command,
in.getOrElse(Subprocess::PATH(os::DEV_NULL)),
out.getOrElse(Subprocess::FD(STDOUT_FILENO)),
Subprocess::FD(STDERR_FILENO));
if (extractProcess.isError()) {
return Error(
"Failed to extract '" + sourcePath + "': '" +
strings::join(" ", command) + "' failed: " +
extractProcess.error());
}
// `status()` never fails or gets discarded.
int status = extractProcess->status()->get();
if (!WSUCCEEDED(status)) {
return Error(
"Failed to extract '" + sourcePath + "': '" +
strings::join(" ", command) + "' failed: " +
WSTRINGIFY(status));
}
LOG(INFO) << "Extracted '" << sourcePath << "' into '"
<< destinationDirectory << "'";
return true;
}
// Attempt to get the uri using the hadoop client.
static Try<string> downloadWithHadoopClient(
const string& sourceUri,
const string& destinationPath)
{
Try<Owned<HDFS>> hdfs = HDFS::create();
if (hdfs.isError()) {
return Error("Failed to create HDFS client: " + hdfs.error());
}
LOG(INFO) << "Downloading resource with Hadoop client from '" << sourceUri
<< "' to '" << destinationPath << "'";
Future<Nothing> result = hdfs.get()->copyToLocal(sourceUri, destinationPath);
result.await();
if (!result.isReady()) {
return Error("HDFS copyToLocal failed: " +
(result.isFailed() ? result.failure() : "discarded"));
}
return destinationPath;
}
static Try<string> downloadWithNet(
const string& sourceUri,
const string& destinationPath,
const Option<Duration>& stallTimeout)
{
// The net::download function only supports these protocols.
CHECK(strings::startsWith(sourceUri, "http://") ||
strings::startsWith(sourceUri, "https://") ||
strings::startsWith(sourceUri, "ftp://") ||
strings::startsWith(sourceUri, "ftps://"));
LOG(INFO) << "Downloading resource from '" << sourceUri
<< "' to '" << destinationPath << "'";
Try<int> code = net::download(sourceUri, destinationPath, stallTimeout);
if (code.isError()) {
return Error("Error downloading resource: " + code.error());
} else {
// The status code for successful HTTP requests is 200, the status code
// for successful FTP file transfers is 226.
if (strings::startsWith(sourceUri, "ftp://") ||
strings::startsWith(sourceUri, "ftps://")) {
if (code.get() != 226) {
return Error("Error downloading resource, received FTP return code " +
stringify(code.get()));
}
} else {
if (code.get() != 200) {
return Error("Error downloading resource, received HTTP return code " +
stringify(code.get()));
}
}
}
return destinationPath;
}
// TODO(coffler): Refactor code to eliminate redundant function.
static Try<string> copyFile(
const string& sourcePath, const string& destinationPath)
{
const Try<Nothing> result = os::copyfile(sourcePath, destinationPath);
if (result.isError()) {
return Error(result.error());
}
return destinationPath;
}
static Try<string> download(
const string& _sourceUri,
const string& destinationPath,
const Option<string>& frameworksHome,
const Option<Duration>& stallTimeout)
{
// Trim leading whitespace for 'sourceUri'.
const string sourceUri = strings::trim(_sourceUri, strings::PREFIX);
Try<Nothing> validation = Fetcher::validateUri(sourceUri);
if (validation.isError()) {
return Error(validation.error());
}
// 1. Try to fetch using a local copy.
// We regard as local: "file://" or the absence of any URI scheme.
Result<string> sourcePath =
Fetcher::uriToLocalPath(sourceUri, frameworksHome);
if (sourcePath.isError()) {
return Error(sourcePath.error());
} else if (sourcePath.isSome()) {
return copyFile(sourcePath.get(), destinationPath);
}
// 2. Try to fetch URI using os::net / libcurl implementation.
// We consider http, https, ftp, ftps compatible with libcurl.
if (Fetcher::isNetUri(sourceUri)) {
return downloadWithNet(sourceUri, destinationPath, stallTimeout);
}
// 3. Try to fetch the URI using hadoop client.
// We use the hadoop client to fetch any URIs that are not
// handled by other fetchers(local / os::net). These URIs may be
// `hdfs://` URIs or any other URI that has been configured (and
// hence handled) in the hadoop client. This allows mesos to
// externalize the handling of previously unknown resource
// endpoints without the need to support them natively.
// Note: Hadoop Client is not a hard dependency for running mesos.
// This allows users to get mesos up and running without a
// hadoop_home or the hadoop client setup but in case we reach
// this part and don't have one configured, the fetch would fail
// and log an appropriate error.
return downloadWithHadoopClient(sourceUri, destinationPath);
}
// TODO(bernd-mesos): Refactor this into stout so that we can more easily
// chmod an executable. For example, we could define some static flags
// so that someone can do: os::chmod(path, EXECUTABLE_CHMOD_FLAGS).
static Try<string> chmodExecutable(const string& filePath)
{
// TODO(coffler): Fix Windows chmod handling, see MESOS-3176.
#ifndef __WINDOWS__
Try<Nothing> chmod = os::chmod(
filePath, S_IRWXU | S_IRGRP | S_IXGRP | S_IROTH | S_IXOTH);
if (chmod.isError()) {
return Error("Failed to chmod executable '" +
filePath + "': " + chmod.error());
}
#endif // __WINDOWS__
return filePath;
}
// Returns the resulting file or in case of extraction the destination
// directory (for logging).
static Try<string> fetchBypassingCache(
const CommandInfo::URI& uri,
const string& sandboxDirectory,
const Option<string>& frameworksHome,
const Option<Duration>& stallTimeout)
{
LOG(INFO) << "Fetching '" << uri.value()
<< "' directly into the sandbox directory";
// TODO(mrbrowning): Factor out duplicated processing of "output_file" field
// here and in fetchFromCache into a separate helper function.
if (uri.has_output_file()) {
string dirname = Path(uri.output_file()).dirname();
if (dirname != ".") {
Try<Nothing> result =
os::mkdir(path::join(sandboxDirectory, dirname), true);
if (result.isError()) {
return Error(
"Unable to create subdirectory " + dirname +
" in sandbox: " + result.error());
}
}
}
Try<string> outputFile = uri.has_output_file()
? uri.output_file()
: Fetcher::basename(uri.value());
if (outputFile.isError()) {
return Error(outputFile.error());
}
string path = path::join(sandboxDirectory, outputFile.get());
Try<string> downloaded =
download(uri.value(), path, frameworksHome, stallTimeout);
if (downloaded.isError()) {
return Error(downloaded.error());
}
if (uri.executable()) {
return chmodExecutable(downloaded.get());
} else if (uri.extract()) {
Try<bool> extracted = extract(path, sandboxDirectory);
if (extracted.isError()) {
return Error(extracted.error());
} else if (!extracted.get()) {
LOG(WARNING) << "Copying instead of extracting resource from URI with "
<< "'extract' flag, because it does not seem to be an "
<< "archive: " << uri.value();
}
}
return downloaded;
}
// Returns the resulting file or in case of extraction the destination
// directory (for logging).
static Try<string> fetchFromCache(
const FetcherInfo::Item& item,
const string& cacheDirectory,
const string& sandboxDirectory)
{
LOG(INFO) << "Fetching URI '" << item.uri().value() << "' from cache";
if (item.uri().has_output_file()) {
string dirname = Path(item.uri().output_file()).dirname();
if (dirname != ".") {
Try<Nothing> result =
os::mkdir(path::join(sandboxDirectory, dirname), true);
if (result.isError()) {
return Error(
"Unable to create subdirectory " + dirname +
" in sandbox: " + result.error());
}
}
}
Try<string> outputFile = item.uri().has_output_file()
? item.uri().output_file()
: Fetcher::basename(item.uri().value());
if (outputFile.isError()) {
return Error(outputFile.error());
}
string destinationPath = path::join(sandboxDirectory, outputFile.get());
// Non-empty cache filename is guaranteed by the callers of this function.
CHECK(!item.cache_filename().empty());
string sourcePath = path::join(cacheDirectory, item.cache_filename());
if (item.uri().executable()) {
Try<string> copied = copyFile(sourcePath, destinationPath);
if (copied.isError()) {
return Error(copied.error());
}
return chmodExecutable(copied.get());
} else if (item.uri().extract()) {
Try<bool> extracted = extract(sourcePath, sandboxDirectory);
if (extracted.isError()) {
return Error(extracted.error());
} else if (extracted.get()) {
return sandboxDirectory;
} else {
LOG(WARNING) << "Copying instead of extracting resource from URI with "
<< "'extract' flag, because it does not seem to be an "
<< "archive: " << item.uri().value();
}
}
return copyFile(sourcePath, destinationPath);
}
// Returns the resulting file or in case of extraction the destination
// directory (for logging).
static Try<string> fetchThroughCache(
const FetcherInfo::Item& item,
const Option<string>& cacheDirectory,
const string& sandboxDirectory,
const Option<string>& frameworksHome,
const Option<Duration>& stallTimeout)
{
if (cacheDirectory.isNone() || cacheDirectory->empty()) {
return Error("Cache directory not specified");
}
if (!item.has_cache_filename() || item.cache_filename().empty()) {
// This should never happen if this program is used by the Mesos
// slave and could then be a CHECK. But other uses are possible.
return Error("No cache file name for: " + item.uri().value());
}
CHECK_NE(FetcherInfo::Item::BYPASS_CACHE, item.action())
<< "Unexpected fetcher action selector";
CHECK(os::exists(cacheDirectory.get()))
<< "Fetcher cache directory was expected to exist but was not found";
if (item.action() == FetcherInfo::Item::DOWNLOAD_AND_CACHE) {
const string cachePath =
path::join(cacheDirectory.get(), item.cache_filename());
if (!os::exists(cachePath)) {
Try<string> downloaded = download(
item.uri().value(),
cachePath,
frameworksHome,
stallTimeout);
if (downloaded.isError()) {
return Error(downloaded.error());
}
}
}
return fetchFromCache(item, cacheDirectory.get(), sandboxDirectory);
}
// Returns the resulting file or in case of extraction the destination
// directory (for logging).
static Try<string> fetch(
const FetcherInfo::Item& item,
const Option<string>& cacheDirectory,
const string& sandboxDirectory,
const Option<string>& frameworksHome,
const Option<Duration>& stallTimeout)
{
LOG(INFO) << "Fetching URI '" << item.uri().value() << "'";
if (item.action() == FetcherInfo::Item::BYPASS_CACHE) {
return fetchBypassingCache(
item.uri(),
sandboxDirectory,
frameworksHome,
stallTimeout);
}
return fetchThroughCache(
item,
cacheDirectory,
sandboxDirectory,
frameworksHome,
stallTimeout);
}
// Checks to see if it's necessary to create a fetcher cache directory for this
// user, and creates it if so.
static Try<Nothing> createCacheDirectory(const FetcherInfo& fetcherInfo)
{
if (!fetcherInfo.has_cache_directory()) {
return Nothing();
}
foreach (const FetcherInfo::Item& item, fetcherInfo.items()) {
if (item.action() != FetcherInfo::Item::BYPASS_CACHE) {
// If this user has fetched anything into the cache before, their cache
// directory will already exist. Set `recursive = true` when calling
// `os::mkdir` to ensure no error is returned in this case.
Try<Nothing> mkdir = os::mkdir(fetcherInfo.cache_directory(), true);
if (mkdir.isError()) {
return mkdir;
}
if (fetcherInfo.has_user()) {
// Fetching is performed as the task's user,
// so chown the cache directory.
// TODO(coffler): Fix Windows chown handling, see MESOS-8063.
#ifndef __WINDOWS__
Try<Nothing> chown = os::chown(
fetcherInfo.user(),
fetcherInfo.cache_directory(),
false);
if (chown.isError()) {
return chown;
}
#endif // __WINDOWS__
}
break;
}
}
return Nothing();
}
// This "fetcher program" is invoked by the slave's fetcher actor
// (Fetcher, FetcherProcess) to "fetch" URIs into the sandbox directory
// of a given task. Its parameters are provided in the form of the env
// var MESOS_FETCHER_INFO which contains a FetcherInfo (see
// fetcher.proto) object formatted in JSON. These are set by the actor
// to indicate what set of URIs to process and how to proceed with
// each one. A URI can be downloaded directly to the task's sandbox
// directory or it can be copied to a cache first or it can be reused
// from the cache, avoiding downloading. All cache management and
// bookkeeping is centralized in the slave's fetcher actor, which can
// have multiple instances of this fetcher program running at any
// given time. Exit code: 0 if entirely successful, otherwise 1.
int main(int argc, char* argv[])
{
GOOGLE_PROTOBUF_VERIFY_VERSION;
mesos::internal::logging::Flags flags;
Try<flags::Warnings> load = flags.load("MESOS_", argc, argv);
if (flags.help) {
std::cout << flags.usage() << std::endl;
return EXIT_SUCCESS;
}
if (load.isError()) {
std::cerr << flags.usage(load.error()) << std::endl;
return EXIT_FAILURE;
}
logging::initialize(argv[0], true, flags); // Catch signals.
// Log any flag warnings (after logging is initialized).
foreach (const flags::Warning& warning, load->warnings) {
LOG(WARNING) << warning.message;
}
const Option<string> jsonFetcherInfo = os::getenv("MESOS_FETCHER_INFO");
CHECK_SOME(jsonFetcherInfo)
<< "Missing MESOS_FETCHER_INFO environment variable";
LOG(INFO) << "Fetcher Info: " << jsonFetcherInfo.get();
Try<JSON::Object> parse = JSON::parse<JSON::Object>(jsonFetcherInfo.get());
CHECK_SOME(parse) << "Failed to parse MESOS_FETCHER_INFO: " << parse.error();
Try<FetcherInfo> fetcherInfo = ::protobuf::parse<FetcherInfo>(parse.get());
CHECK_SOME(fetcherInfo)
<< "Failed to parse FetcherInfo: " << fetcherInfo.error();
CHECK(!fetcherInfo->sandbox_directory().empty())
<< "Missing sandbox directory";
const string sandboxDirectory = fetcherInfo->sandbox_directory();
Try<Nothing> result = createCacheDirectory(fetcherInfo.get());
if (result.isError()) {
EXIT(EXIT_FAILURE)
<< "Could not create the fetcher cache directory: " << result.error();
}
// If the `FetcherInfo` specifies a user, use `os::su()` to fetch files as the
// task's user to ensure that filesystem permissions are enforced.
if (fetcherInfo->has_user()) {
// TODO(coffler): No support for os::su on Windows, see MESOS-8063
#ifndef __WINDOWS__
result = os::su(fetcherInfo->user());
if (result.isError()) {
EXIT(EXIT_FAILURE) << "Fetcher could not execute `os::su()` for user '"
<< fetcherInfo->user() << "'";
}
#endif // __WINDOWS__
}
const Option<string> cacheDirectory =
fetcherInfo->has_cache_directory()
? Option<string>::some(fetcherInfo->cache_directory())
: Option<string>::none();
const Option<string> frameworksHome =
fetcherInfo->has_frameworks_home()
? Option<string>::some(fetcherInfo->frameworks_home())
: Option<string>::none();
const Option<Duration> stallTimeout =
fetcherInfo->has_stall_timeout()
? Nanoseconds(fetcherInfo->stall_timeout().nanoseconds())
: Option<Duration>::none();
// Fetch each URI to a local file and chmod if necessary.
foreach (const FetcherInfo::Item& item, fetcherInfo->items()) {
Try<string> fetched = fetch(
item, cacheDirectory, sandboxDirectory, frameworksHome, stallTimeout);
if (fetched.isError()) {
EXIT(EXIT_FAILURE)
<< "Failed to fetch '" << item.uri().value() << "': " + fetched.error();
} else {
LOG(INFO) << "Fetched '" << item.uri().value()
<< "' to '" << fetched.get() << "'";
}
}
LOG(INFO) << "Successfully fetched all URIs into "
<< "'" << sandboxDirectory << "'";
return 0;
}