From 4cb034edb115870ea69fbd5c2c5891e0b038f01d Mon Sep 17 00:00:00 2001 From: Jojy G Varghese Date: Mon, 14 Sep 2015 18:00:23 -0700 Subject: [PATCH 1/3] Adding new c++11 based dispatch. --- .../libprocess/include/process/dispatch.hpp | 27 +++++++++++++++++++ .../libprocess/include/process/future.hpp | 2 ++ 2 files changed, 29 insertions(+) diff --git a/3rdparty/libprocess/include/process/dispatch.hpp b/3rdparty/libprocess/include/process/dispatch.hpp index c5b81375861..bbfef9df697 100644 --- a/3rdparty/libprocess/include/process/dispatch.hpp +++ b/3rdparty/libprocess/include/process/dispatch.hpp @@ -347,6 +347,33 @@ Future dispatch( #undef TEMPLATE +template +auto dispatch( + const ProcessBase* process, + Callable fn, + Args&&... args) -> typename std::result_of::type +{ + typedef typename std::result_of::type::value_type result_type; //NOLINT + std::shared_ptr> promise(new Promise()); + + auto call(std::bind(fn, std::placeholders::_1, std::forward(args)...)); + + std::shared_ptr> f( + new std::function( + [=](ProcessBase* process) { + assert(process != NULL); + I* t = dynamic_cast(process); + assert(t != NULL); + + promise->associate(call(t)); + })); + + internal::dispatch(process->self(), f, &typeid(fn)); + + return promise->future(); +} + + inline void dispatch( const UPID& pid, const std::function& f) diff --git a/3rdparty/libprocess/include/process/future.hpp b/3rdparty/libprocess/include/process/future.hpp index 9006b8a83d0..3c30505a8cc 100644 --- a/3rdparty/libprocess/include/process/future.hpp +++ b/3rdparty/libprocess/include/process/future.hpp @@ -90,6 +90,8 @@ template class Future { public: + typedef T value_type; + // Constructs a failed future. static Future failed(const std::string& message); From f611143e40064c056039015431e62a3698395131 Mon Sep 17 00:00:00 2001 From: Jojy G Varghese Date: Mon, 14 Sep 2015 18:02:45 -0700 Subject: [PATCH 2/3] Introducing generic process dispatcher --- src/Makefile.am | 1 + src/common/process_dispatcher.hpp | 117 ++++++++++++++++++++++++++++++ 2 files changed, 118 insertions(+) create mode 100644 src/common/process_dispatcher.hpp diff --git a/src/Makefile.am b/src/Makefile.am index 8963cea9fd7..0707ab5f9be 100644 --- a/src/Makefile.am +++ b/src/Makefile.am @@ -706,6 +706,7 @@ libmesos_no_3rdparty_la_SOURCES += \ common/date_utils.hpp \ common/http.hpp \ common/parse.hpp \ + common/process_dispatcher.hpp \ common/protobuf_utils.hpp \ common/recordio.hpp \ common/resources_utils.hpp \ diff --git a/src/common/process_dispatcher.hpp b/src/common/process_dispatcher.hpp new file mode 100644 index 00000000000..192a809849f --- /dev/null +++ b/src/common/process_dispatcher.hpp @@ -0,0 +1,117 @@ +/** + * Licensed to the Apache Software Foundation (ASF) under one + * or more contributor license agreements. See the NOTICE file + * distributed with this work for additional information + * regarding copyright ownership. The ASF licenses this file + * to you under the Apache License, Version 2.0 (the + * "License"); you may not use this file except in compliance + * with the License. You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#ifndef __PROCESS_DISPATCHER_HPP__ +#define __PROCESS_DISPATCHER_HPP__ + +#include +#include + +#include + +#include +#include +#include + +namespace mesos { + +template +class Dispatchable +{ +public: + virtual ~Dispatchable(){} + + template + auto dispatch(Callable fn, Args&&... args) + -> typename std::result_of::type + { + ProcessBase* process = dynamic_cast(getInterface()); + return process::dispatch( + process, + fn, + std::forward(args)...); + } + +protected: + void setInterface(const process::Shared& i) + { + interface = i; + } + + I* getInterface() + { + return const_cast(interface.get()); + } + + process::Shared interface; +}; + +template +class ProcessDispatcher : public Dispatchable +{ +public: + template + static Try>> create(Args&&... args) + { + Try> newProcess(P::create(std::forward(args)...)); + + if (newProcess.isError()) { + return Error(newProcess.error()); + } + + process::Shared sharedInterface(newProcess.get().release()); + + return process::Owned>( + new ProcessDispatcher(sharedInterface)); + } + + static Try> create( + process::Shared process) + { + return(process::Owned(new ProcessDispatcher(process))); + } + + ~ProcessDispatcher() + { + ProcessBase* process = dynamic_cast(this->getInterface()); + + terminate(process); + process::wait(process); + } + +private: + ProcessDispatcher(const process::Shared& i) + :Dispatchable() + { + this->setInterface(i); + + ProcessBase* process = dynamic_cast(const_cast(i.get())); + if (!process) + { + assert(false); + } + + spawn(process); + } + + ProcessDispatcher(const ProcessDispatcher&) = delete; +}; + +} // namespace mesos { + +#endif // __PROCESS_DISPATCHER_HPP__ From 1695c98f20b580bf0654f06b80befae1d8caaf88 Mon Sep 17 00:00:00 2001 From: Jojy G Varghese Date: Mon, 14 Sep 2015 18:35:41 -0700 Subject: [PATCH 3/3] Changed TokenManager to use process dispatcher. --- .../provisioners/docker/token_manager.cpp | 103 +---------- .../provisioners/docker/token_manager.hpp | 98 ++++++++--- .../provisioners/docker_provisioner_tests.cpp | 161 +++++++++++++++++- 3 files changed, 225 insertions(+), 137 deletions(-) diff --git a/src/slave/containerizer/provisioners/docker/token_manager.cpp b/src/slave/containerizer/provisioners/docker/token_manager.cpp index aec915f25f6..1acda505ffa 100644 --- a/src/slave/containerizer/provisioners/docker/token_manager.cpp +++ b/src/slave/containerizer/provisioners/docker/token_manager.cpp @@ -42,68 +42,6 @@ namespace slave { namespace docker { namespace registry { -class TokenManagerProcess : public Process -{ -public: - static Try> create(const URL& realm); - - Future getToken( - const string& service, - const string& scope, - const Option& account); - -private: - static const string TOKEN_PATH_PREFIX; - static const Duration RESPONSE_TIMEOUT; - - TokenManagerProcess(const URL& realm) - : realm_(realm) {} - - Try getTokenFromResponse(const Response& response) const; - - /** - * Key for the token cache. - */ - struct TokenCacheKey - { - string service; - string scope; - }; - - struct TokenCacheKeyHash - { - size_t operator()(const TokenCacheKey& key) const - { - hash hashFn; - - return (hashFn(key.service) ^ - (hashFn(key.scope) << 1)); - } - }; - - struct TokenCacheKeyEqual - { - bool operator()( - const TokenCacheKey& left, - const TokenCacheKey& right) const - { - return ((left.service == right.service) && - (left.scope == right.scope)); - } - }; - - typedef hashmap< - const TokenCacheKey, - Token, - TokenCacheKeyHash, - TokenCacheKeyEqual> TokenCacheType; - - const URL realm_; - TokenCacheType tokenCache_; - - TokenManagerProcess(const TokenManagerProcess&) = delete; - TokenManagerProcess& operator=(const TokenManagerProcess&) = delete; -}; const Duration TokenManagerProcess::RESPONSE_TIMEOUT = Seconds(10); const string TokenManagerProcess::TOKEN_PATH_PREFIX = "/v2/token/"; @@ -233,46 +171,6 @@ bool Token::isValid() const } -Try> TokenManager::create( - const URL& realm) -{ - Try> process = TokenManagerProcess::create(realm); - if (process.isError()) { - return Error(process.error()); - } - - return Owned(new TokenManager(process.get())); -} - - -TokenManager::TokenManager(Owned& process) - : process_(process) -{ - spawn(CHECK_NOTNULL(process_.get())); -} - - -TokenManager::~TokenManager() -{ - terminate(process_.get()); - process::wait(process_.get()); -} - - -Future TokenManager::getToken( - const string& service, - const string& scope, - const Option& account) -{ - return dispatch( - process_.get(), - &TokenManagerProcess::getToken, - service, - scope, - account); -} - - Try> TokenManagerProcess::create(const URL& realm) { return Owned(new TokenManagerProcess(realm)); @@ -308,6 +206,7 @@ Future TokenManagerProcess::getToken( const string& scope, const Option& account) { + std::cout << __FILE__ << ":" << __LINE__ << std::endl; const TokenCacheKey tokenKey = {service, scope}; if (tokenCache_.contains(tokenKey)) { diff --git a/src/slave/containerizer/provisioners/docker/token_manager.hpp b/src/slave/containerizer/provisioners/docker/token_manager.hpp index 879269dab9a..18708d3523a 100644 --- a/src/slave/containerizer/provisioners/docker/token_manager.hpp +++ b/src/slave/containerizer/provisioners/docker/token_manager.hpp @@ -96,10 +96,6 @@ struct Token }; -// Forward declaration. -class TokenManagerProcess; - - /** * Acquires and manages docker registry tokens. It keeps the tokens in its * cache to server any future request for the same token. @@ -110,19 +106,6 @@ class TokenManagerProcess; class TokenManager { public: - /** - * Factory method for creating TokenManager object. - * - * TokenManager and registry authorization realm has a 1:1 relationship. - * - * @param realm URL of the authorization server from where token will be - * requested by this TokenManager. - * @returns Owned if success. - * Error on failure. - */ - static Try> create( - const process::http::URL& realm); - /** * Returns JSON Web Token from cache or from remote server using "Basic * authorization". @@ -136,13 +119,14 @@ class TokenManager * @param password base64 encoded password for basic authorization. * @returns Token struct that encapsulates JSON Web Token. */ - process::Future getToken( + /* + virtual process::Future getToken( const std::string& service, const std::string& scope, const Option& account, const std::string& user, - const Option& password); - + const Option& password) = 0; +*/ /** * Returns JSON Web Token from cache or from remote server using "TLS/Cert" * based authorization. @@ -154,22 +138,82 @@ class TokenManager * @param account Name of the account which the client is acting as. * @returns Token struct that encapsulates JSON Web Token. */ + + virtual process::Future getToken( + const std::string& service, + const std::string& scope, + const Option& account) = 0; + + virtual ~TokenManager() {} +}; + + +class TokenManagerProcess : + public TokenManager, + public process::Process +{ +public: + static Try> create( + const process::http::URL& realm); + process::Future getToken( const std::string& service, const std::string& scope, const Option& account); - ~TokenManager(); - private: - TokenManager(process::Owned& process); + static const std::string TOKEN_PATH_PREFIX; + static const Duration RESPONSE_TIMEOUT; - TokenManager(const TokenManager&) = delete; - TokenManager& operator=(const TokenManager&) = delete; + TokenManagerProcess(const process::http::URL& realm) + : realm_(realm) {} - process::Owned process_; -}; + Try getTokenFromResponse( + const process::http::Response& response) const; + /** + * Key for the token cache. + */ + struct TokenCacheKey + { + std::string service; + std::string scope; + }; + + struct TokenCacheKeyHash + { + size_t operator()(const TokenCacheKey& key) const + { + std::hash hashFn; + + return (hashFn(key.service) ^ + (hashFn(key.scope) << 1)); + } + }; + + struct TokenCacheKeyEqual + { + bool operator()( + const TokenCacheKey& left, + const TokenCacheKey& right) const + { + return ((left.service == right.service) && + (left.scope == right.scope)); + } + }; + + typedef hashmap< + const TokenCacheKey, + Token, + TokenCacheKeyHash, + TokenCacheKeyEqual> TokenCacheType; + + const process::http::URL realm_; + TokenCacheType tokenCache_; + + TokenManagerProcess(const TokenManagerProcess&) = delete; + TokenManagerProcess& operator=(const TokenManagerProcess&) = delete; +}; } // namespace registry { } // namespace docker { } // namespace slave { diff --git a/src/tests/provisioners/docker_provisioner_tests.cpp b/src/tests/provisioners/docker_provisioner_tests.cpp index ff29d562c7f..cec909d4b03 100644 --- a/src/tests/provisioners/docker_provisioner_tests.cpp +++ b/src/tests/provisioners/docker_provisioner_tests.cpp @@ -31,6 +31,7 @@ #include +#include "common/process_dispatcher.hpp" #include "slave/containerizer/provisioners/docker/token_manager.hpp" #include "tests/mesos.hpp" @@ -234,11 +235,152 @@ TEST_F(DockerRegistryClientTest, SimpleGetToken) server.get().address().get().hostname().get(), server.get().address().get().port); - Try> tokenMgr = TokenManager::create(url); - ASSERT_SOME(tokenMgr); + auto tokenManagerProcess = TokenManagerProcess::create(url); + ASSERT_SOME(tokenManagerProcess); + + Shared tmProcess(tokenManagerProcess.get().release()); + auto tokenManager = + ProcessDispatcher::create(tmProcess); + + Future token = + tokenManager.get()->dispatch( + &TokenManager::getToken, + "registry.docker.io", + "repository:library/busybox:pull", + None()); + + AWAIT_ASSERT_READY(socket); + + // Construct response and send(server side). + const double expirySecs = Clock::now().secs() + Days(365).secs(); + + claimsJsonString = + "{\"access\" \ + :[ \ + { \ + \"type\":\"repository\", \ + \"name\":\"library/busybox\", \ + \"actions\":[\"pull\"]}], \ + \"aud\":\"registry.docker.io\", \ + \"exp\":" + stringify(expirySecs) + ", \ + \"iat\":1438887168, \ + \"iss\":\"auth.docker.io\", \ + \"jti\":\"l2PJDFkzwvoL7-TajJF7\", \ + \"nbf\":1438887166, \ + \"sub\":\"\" \ + }"; + + const string tokenString(getTokenString()); + const string tokenResponse = "{\"token\":\"" + tokenString + "\"}"; + + const string buffer = + string("HTTP/1.1 200 OK\r\n") + + "Content-Length : " + + stringify(tokenResponse.length()) + "\r\n" + + "\r\n" + + tokenResponse; + + AWAIT_ASSERT_READY(Socket(socket.get()).send(buffer)); + + AWAIT_ASSERT_READY(token); + ASSERT_EQ(token.get().raw, tokenString); +} + + +TEST_F(DockerRegistryClientTest, TokenManagerInterface) +{ + class AnotherTokenManager : + public TokenManager, + public Process + { + public: + static Owned create(const string& str) + { + return Owned(new AnotherTokenManager(str)); + } + + AnotherTokenManager(const string str) + :str_(str) {} + + process::Future getToken( + const std::string& service, + const std::string& scope, + const Option& account) + { + return Failure("AnotherTokenManager"); + } + + private: + const string str_; + }; + + Try server = setup_server({ + {"SSL_ENABLED", "true"}, + {"SSL_KEY_FILE", key_path().value}, + {"SSL_CERT_FILE", certificate_path().value}}); + + ASSERT_SOME(server); + ASSERT_SOME(server.get().address()); + ASSERT_SOME(server.get().address().get().hostname()); + + Future socket = server.get().accept(); + + // Create URL from server hostname and port. + const http::URL url( + "https", + server.get().address().get().hostname().get(), + server.get().address().get().port); + + vector>> tokenManagerList; + + Try>> tokenManagerProcess1 = + ProcessDispatcher::create(url); + ASSERT_SOME(tokenManagerProcess1); + + + Try>> tokenManagerProcess2 = + ProcessDispatcher::create("test"); + + ASSERT_SOME(tokenManagerProcess2); + + tokenManagerList.push_back(tokenManagerProcess1.get()); + tokenManagerList.push_back(tokenManagerProcess2.get()); + + foreach (Owned>& i, tokenManagerList) { + i->dispatch( + &TokenManager::getToken, + "registry.docker.io", + "repository:library/busybox:pull", + None()); + } +} + + +TEST_F(DockerRegistryClientTest, DispatchOwnsTokenManager) +{ + Try server = setup_server({ + {"SSL_ENABLED", "true"}, + {"SSL_KEY_FILE", key_path().value}, + {"SSL_CERT_FILE", certificate_path().value}}); + + ASSERT_SOME(server); + ASSERT_SOME(server.get().address()); + ASSERT_SOME(server.get().address().get().hostname()); + + Future socket = server.get().accept(); + + // Create URL from server hostname and port. + const http::URL url( + "https", + server.get().address().get().hostname().get(), + server.get().address().get().port); + + auto tokenManager = + ProcessDispatcher::create(url); Future token = - tokenMgr.get()->getToken( + tokenManager.get()->dispatch( + &TokenManager::getToken, "registry.docker.io", "repository:library/busybox:pull", None()); @@ -301,11 +443,13 @@ TEST_F(DockerRegistryClientTest, BadTokenResponse) server.get().address().get().hostname().get(), server.get().address().get().port); - Try> tokenMgr = TokenManager::create(url); + auto tokenMgr = + ProcessDispatcher::create(url); ASSERT_SOME(tokenMgr); Future token = - tokenMgr.get()->getToken( + tokenMgr.get()->dispatch( + &TokenManager::getToken, "registry.docker.io", "repository:library/busybox:pull", None()); @@ -334,11 +478,13 @@ TEST_F(DockerRegistryClientTest, BadTokenServerAddress) // Create an invalid URL with current time. const http::URL url("https", stringify(Clock::now().secs()), 0); - Try> tokenMgr = TokenManager::create(url); + auto tokenMgr = + ProcessDispatcher::create(url); ASSERT_SOME(tokenMgr); Future token = - tokenMgr.get()->getToken( + tokenMgr.get()->dispatch( + &TokenManager::getToken, "registry.docker.io", "repository:library/busybox:pull", None()); @@ -348,7 +494,6 @@ TEST_F(DockerRegistryClientTest, BadTokenServerAddress) #endif // USE_SSL_SOCKET - } // namespace tests { } // namespace internal { } // namespace mesos {