From e55309ad8274164e23782a5cb53de93ffa0bc24e Mon Sep 17 00:00:00 2001 From: Jie Yu Date: Thu, 2 Nov 2017 14:19:19 +0100 Subject: [PATCH] Introduced ResourceConversion to represent conversion to Resources. Currently, we couple offer operations with resource conversions. For instance, we have interfaces like `Resources::apply` that takes an offer operation. This becomes less ideal because an offer operation might not represent a `conversion` anymore with new offer operation like `CREATE_VOLUME` being introduced. This patch decoupled resource conversions from offer operations. `Resources::apply` will now take a `ResourceConversion` object. We also provide some helpers to get a `ResourceConversion` from an offer operation (if supported). Review: https://reviews.apache.org/r/63511 --- include/mesos/resources.hpp | 60 +++++-- include/mesos/v1/resources.hpp | 60 +++++-- src/common/resources.cpp | 306 +++++---------------------------- src/common/resources_utils.cpp | 131 ++++++++++++++ src/common/resources_utils.hpp | 17 ++ src/v1/resources.cpp | 306 +++++---------------------------- 6 files changed, 332 insertions(+), 548 deletions(-) diff --git a/include/mesos/resources.hpp b/include/mesos/resources.hpp index 53152b393ec..08c544d05b6 100644 --- a/include/mesos/resources.hpp +++ b/include/mesos/resources.hpp @@ -36,6 +36,7 @@ #include #include #include +#include #include #include @@ -53,6 +54,10 @@ namespace mesos { +// Forward declaration. +class ResourceConversion; + + // NOTE: Resource objects stored in the class are always valid, are in // the "post-reservation-refinement" format, and kept combined if possible. // It is the caller's responsibility to validate any Resource object or @@ -464,26 +469,30 @@ class Resources // example frameworks to leverage. Option find(const Resources& targets) const; - // Certain offer operations alter the offered resources. The - // following methods provide a convenient way to get the transformed - // resources by applying the given offer operation(s). Returns an - // Error if the offer operation(s) cannot be applied. - Try apply( - const Offer::Operation& operation, - const Option& convertedResources = None()) const; + // Applies a resource conversion by taking out the `consumed` + // resources and adding back the `converted` resources. Returns an + // Error if the conversion cannot be applied. + Try apply(const ResourceConversion& conversion) const; + + // Obtains the conversion from the given operation and applies the + // conversion. This method serves a syntax sugar for applying a + // resource conversion. + // TODO(jieyu): Consider remove this method once we updated all the + // call sites. + Try apply(const Offer::Operation& operation) const; template - Try apply(const Iterable& operations) const + Try apply(const Iterable& iterable) const { Resources result = *this; - foreach (const Offer::Operation& operation, operations) { - Try transformed = result.apply(operation); - if (transformed.isError()) { - return Error(transformed.error()); + foreach (const auto& t, iterable) { + Try converted = result.apply(t); + if (converted.isError()) { + return Error(converted.error()); } - result = transformed.get(); + result = converted.get(); } return result; @@ -663,6 +672,31 @@ hashmap operator+( return result; } + +/** + * Represents a resource conversion, usually as a result of an offer + * operation. See more details in `Resources::apply` method. + */ +class ResourceConversion +{ +public: + typedef lambda::function(const Resources&)> PostValidation; + + ResourceConversion( + const Resources& _consumed, + const Resources& _converted, + const Option& _postValidation = None()) + : consumed(_consumed), + converted(_converted), + postValidation(_postValidation) {} + + Try apply(const Resources& resources) const; + + Resources consumed; + Resources converted; + Option postValidation; +}; + } // namespace mesos { #endif // __RESOURCES_HPP__ diff --git a/include/mesos/v1/resources.hpp b/include/mesos/v1/resources.hpp index 6c2191c044c..d59fa35d1bc 100644 --- a/include/mesos/v1/resources.hpp +++ b/include/mesos/v1/resources.hpp @@ -35,6 +35,7 @@ #include #include #include +#include #include #include @@ -53,6 +54,10 @@ namespace mesos { namespace v1 { +// Forward declaration. +class ResourceConversion; + + // NOTE: Resource objects stored in the class are always valid, are in // the "post-reservation-refinement" format, and kept combined if possible. // It is the caller's responsibility to validate any Resource object or @@ -464,26 +469,30 @@ class Resources // example frameworks to leverage. Option find(const Resources& targets) const; - // Certain offer operations alter the offered resources. The - // following methods provide a convenient way to get the transformed - // resources by applying the given offer operation(s). Returns an - // Error if the offer operation(s) cannot be applied. - Try apply( - const Offer::Operation& operation, - const Option& convertedResources = None()) const; + // Applies a resource conversion by taking out the `consumed` + // resources and adding back the `converted` resources. Returns an + // Error if the conversion cannot be applied. + Try apply(const ResourceConversion& conversion) const; + + // Obtains the conversion from the given operation and applies the + // conversion. This method serves a syntax sugar for applying a + // resource conversion. + // TODO(jieyu): Consider remove this method once we updated all the + // call sites. + Try apply(const Offer::Operation& operation) const; template - Try apply(const Iterable& operations) const + Try apply(const Iterable& iterable) const { Resources result = *this; - foreach (const Offer::Operation& operation, operations) { - Try transformed = result.apply(operation); - if (transformed.isError()) { - return Error(transformed.error()); + foreach (const auto& t, iterable) { + Try converted = result.apply(t); + if (converted.isError()) { + return Error(converted.error()); } - result = transformed.get(); + result = converted.get(); } return result; @@ -663,6 +672,31 @@ hashmap operator+( return result; } + +/** + * Represents a resource conversion, usually as a result of an offer + * operation. See more details in `Resources::apply` method. + */ +class ResourceConversion +{ +public: + typedef lambda::function(const Resources&)> PostValidation; + + ResourceConversion( + const Resources& _consumed, + const Resources& _converted, + const Option& _postValidation = None()) + : consumed(_consumed), + converted(_converted), + postValidation(_postValidation) {} + + Try apply(const Resources& resources) const; + + Resources consumed; + Resources converted; + Option postValidation; +}; + } // namespace v1 { } // namespace mesos { diff --git a/src/common/resources.cpp b/src/common/resources.cpp index 914d3e2b1e8..4ccfdc162c8 100644 --- a/src/common/resources.cpp +++ b/src/common/resources.cpp @@ -1623,277 +1623,35 @@ Option Resources::find(const Resources& targets) const } -Try Resources::apply( - const Offer::Operation& operation, - const Option& convertedResources) const +Try Resources::apply(const ResourceConversion& conversion) const { - Resources result = *this; - - switch (operation.type()) { - case Offer::Operation::LAUNCH: - return Error("Cannot apply LAUNCH Operation"); - - case Offer::Operation::LAUNCH_GROUP: - return Error("Cannot apply LAUNCH_GROUP Operation"); - - case Offer::Operation::RESERVE: { - if (convertedResources.isSome()) { - return Error( - "Converted resources not expected for RESERVE Operation"); - } - - Option error = validate(operation.reserve().resources()); - if (error.isSome()) { - return Error("Invalid RESERVE Operation: " + error->message); - } - - foreach (const Resource& reserved, operation.reserve().resources()) { - if (!Resources::isReserved(reserved)) { - return Error("Invalid RESERVE Operation: Resource must be reserved"); - } else if (!Resources::isDynamicallyReserved(reserved)) { - return Error( - "Invalid RESERVE Operation: Resource must be" - " dynamically reserved"); - } - - // Note that we only allow "pushing" a single reservation at time. - Resources resources = Resources(reserved).popReservation(); - - if (!result.contains(resources)) { - return Error("Invalid RESERVE Operation: " + stringify(result) + - " does not contain " + stringify(resources)); - } - - result -= resources; - result.add(reserved); - } - break; - } - - case Offer::Operation::UNRESERVE: { - if (convertedResources.isSome()) { - return Error( - "Converted resources not expected for UNRESERVE Operation"); - } - - Option error = validate(operation.unreserve().resources()); - if (error.isSome()) { - return Error("Invalid UNRESERVE Operation: " + error->message); - } - - foreach (const Resource& reserved, operation.unreserve().resources()) { - if (!Resources::isReserved(reserved)) { - return Error("Invalid UNRESERVE Operation: Resource is not reserved"); - } else if (!Resources::isDynamicallyReserved(reserved)) { - return Error( - "Invalid UNRESERVE Operation: Resource is not" - " dynamically reserved"); - } - - if (!result.contains(reserved)) { - return Error("Invalid UNRESERVE Operation: " + stringify(result) + - " does not contain " + stringify(reserved)); - } - - // Note that we only allow "popping" a single reservation at time. - Resources resources = Resources(reserved).popReservation(); - - result.subtract(reserved); - result += resources; - } - break; - } - - case Offer::Operation::CREATE: { - if (convertedResources.isSome()) { - return Error( - "Converted resources not expected for CREATE Operation"); - } - - Option error = validate(operation.create().volumes()); - if (error.isSome()) { - return Error("Invalid CREATE Operation: " + error->message); - } - - foreach (const Resource& volume, operation.create().volumes()) { - if (!volume.has_disk()) { - return Error("Invalid CREATE Operation: Missing 'disk'"); - } else if (!volume.disk().has_persistence()) { - return Error("Invalid CREATE Operation: Missing 'persistence'"); - } - - // Strip persistence and volume from the disk info so that we - // can subtract it from the original resources. - // TODO(jieyu): Non-persistent volumes are not supported for - // now. Persistent volumes can only be be created from regular - // disk resources. Revisit this once we start to support - // non-persistent volumes. - Resource stripped = volume; - - if (stripped.disk().has_source()) { - stripped.mutable_disk()->clear_persistence(); - stripped.mutable_disk()->clear_volume(); - } else { - stripped.clear_disk(); - } - - // Since we only allow persistent volumes to be shared, the - // original resource must be non-shared. - stripped.clear_shared(); - - if (!result.contains(stripped)) { - return Error("Invalid CREATE Operation: Insufficient disk resources" - " for persistent volume " + stringify(volume)); - } - - result.subtract(stripped); - result.add(volume); - } - break; - } - - case Offer::Operation::DESTROY: { - if (convertedResources.isSome()) { - return Error( - "Converted resources not expected for DESTROY Operation"); - } - - Option error = validate(operation.destroy().volumes()); - if (error.isSome()) { - return Error("Invalid DESTROY Operation: " + error->message); - } - - foreach (const Resource& volume, operation.destroy().volumes()) { - if (!volume.has_disk()) { - return Error("Invalid DESTROY Operation: Missing 'disk'"); - } else if (!volume.disk().has_persistence()) { - return Error("Invalid DESTROY Operation: Missing 'persistence'"); - } - - if (!result.contains(volume)) { - return Error( - "Invalid DESTROY Operation: Persistent volume does not exist"); - } - - result.subtract(volume); - - if (result.contains(volume)) { - return Error( - "Invalid DESTROY Operation: Persistent volume " + - stringify(volume) + " cannot be removed due to additional " + - "shared copies"); - } - - // Strip persistence and volume from the disk info so that we - // can subtract it from the original resources. - Resource stripped = volume; - - if (stripped.disk().has_source()) { - stripped.mutable_disk()->clear_persistence(); - stripped.mutable_disk()->clear_volume(); - } else { - stripped.clear_disk(); - } - - // Since we only allow persistent volumes to be shared, we - // return the resource to non-shared state after destroy. - stripped.clear_shared(); - - result.add(stripped); - } - break; - } - - case Offer::Operation::CREATE_VOLUME: { - if (convertedResources.isNone()) { - return Error( - "Converted resources not specified for CREATE_VOLUME Operation"); - } - - const Resource& consumed = operation.create_volume().source(); - - if (!result.contains(consumed)) { - return Error( - "Invalid CREATE_VOLUME Operation: " + stringify(result) + - " does not contain " + stringify(consumed)); - } - - result.subtract(consumed); - result += convertedResources.get(); - break; - } - - case Offer::Operation::DESTROY_VOLUME: { - if (convertedResources.isNone()) { - return Error( - "Converted resources not specified for DESTROY_VOLUME Operation"); - } - - const Resource& consumed = operation.destroy_volume().volume(); - - if (!result.contains(consumed)) { - return Error( - "Invalid DESTROY_VOLUME Operation: " + stringify(result) + - " does not contain " + stringify(consumed)); - } - - result.subtract(consumed); - result += convertedResources.get(); - break; - } - - case Offer::Operation::CREATE_BLOCK: { - if (convertedResources.isNone()) { - return Error( - "Converted resources not specified for CREATE_BLOCK Operation"); - } - - const Resource& consumed = operation.create_block().source(); - - if (!result.contains(consumed)) { - return Error( - "Invalid CREATE_BLOCK Operation: " + stringify(result) + - " does not contain " + stringify(consumed)); - } - - result.subtract(consumed); - result += convertedResources.get(); - break; - } - - case Offer::Operation::DESTROY_BLOCK: { - if (convertedResources.isNone()) { - return Error( - "Converted resources not specified for DESTROY_BLOCK Operation"); - } + return conversion.apply(*this); +} - const Resource& consumed = operation.destroy_block().block(); - if (!result.contains(consumed)) { - return Error( - "Invalid DESTROY_BLOCK Operation: " + stringify(result) + - " does not contain " + stringify(consumed)); - } +Try Resources::apply(const Offer::Operation& operation) const +{ + Try> conversions = + getResourceConversions(operation); - result.subtract(consumed); - result += convertedResources.get(); - break; - } + if (conversions.isError()) { + return Error("Cannot get conversions: " + conversions.error()); + } - case Offer::Operation::UNKNOWN: - return Error("Unknown offer operation"); + Try result = apply(conversions.get()); + if (result.isError()) { + return Error(result.error()); } - // The following are sanity checks to ensure the amount of each type of - // resource does not change. + // The following are sanity checks to ensure the amount of each type + // of resource does not change. // TODO(jieyu): Currently, we only check known resource types like // cpus, gpus, mem, disk, ports, etc. We should generalize this. - - CHECK(result.cpus() == cpus()); - CHECK(result.gpus() == gpus()); - CHECK(result.mem() == mem()); - CHECK(result.disk() == disk()); - CHECK(result.ports() == ports()); + CHECK(result->cpus() == cpus()); + CHECK(result->gpus() == gpus()); + CHECK(result->mem() == mem()); + CHECK(result->disk() == disk()); + CHECK(result->ports() == ports()); return result; } @@ -2525,4 +2283,28 @@ ostream& operator<<( return stream << JSON::protobuf(resources); } + +Try ResourceConversion::apply(const Resources& resources) const +{ + Resources result = resources; + + if (!result.contains(consumed)) { + return Error( + stringify(result) + " does not contain " + + stringify(consumed)); + } + + result -= consumed; + result += converted; + + if (postValidation.isSome()) { + Try validation = postValidation.get()(result); + if (validation.isError()) { + return Error(validation.error()); + } + } + + return result; +} + } // namespace mesos { diff --git a/src/common/resources_utils.cpp b/src/common/resources_utils.cpp index 8304da445eb..7c48704638d 100644 --- a/src/common/resources_utils.cpp +++ b/src/common/resources_utils.cpp @@ -19,6 +19,8 @@ #include "common/resources_utils.hpp" +using std::vector; + using google::protobuf::RepeatedPtrField; namespace mesos { @@ -91,6 +93,135 @@ Try applyCheckpointedResources( } +namespace internal { + +// NOTE: Use template here so that it works for both internal and v1. +template +Try> getResourceConversions( + const TOfferOperation& operation) +{ + vector conversions; + + switch (operation.type()) { + case TOfferOperation::UNKNOWN: + return Error("Unknown offer operation"); + + case TOfferOperation::LAUNCH: + case TOfferOperation::LAUNCH_GROUP: + case TOfferOperation::CREATE_VOLUME: + case TOfferOperation::DESTROY_VOLUME: + case TOfferOperation::CREATE_BLOCK: + case TOfferOperation::DESTROY_BLOCK: + return Error("Offer operation not supported"); + + case TOfferOperation::RESERVE: { + foreach (const TResource& reserved, operation.reserve().resources()) { + // Note that we only allow "pushing" a single reservation at time. + TResources consumed = TResources(reserved).popReservation(); + conversions.emplace_back(consumed, reserved); + } + break; + } + + case TOfferOperation::UNRESERVE: { + foreach (const TResource& reserved, operation.unreserve().resources()) { + // Note that we only allow "popping" a single reservation at time. + TResources converted = TResources(reserved).popReservation(); + conversions.emplace_back(reserved, converted); + } + break; + } + + case TOfferOperation::CREATE: { + foreach (const TResource& volume, operation.create().volumes()) { + // Strip persistence and volume from the disk info so that we + // can subtract it from the original resources. + // TODO(jieyu): Non-persistent volumes are not supported for + // now. Persistent volumes can only be be created from regular + // disk resources. Revisit this once we start to support + // non-persistent volumes. + TResource stripped = volume; + + if (stripped.disk().has_source()) { + stripped.mutable_disk()->clear_persistence(); + stripped.mutable_disk()->clear_volume(); + } else { + stripped.clear_disk(); + } + + // Since we only allow persistent volumes to be shared, the + // original resource must be non-shared. + stripped.clear_shared(); + + conversions.emplace_back(stripped, volume); + } + break; + } + + case TOfferOperation::DESTROY: { + foreach (const TResource& volume, operation.destroy().volumes()) { + // Strip persistence and volume from the disk info so that we + // can subtract it from the original resources. + TResource stripped = volume; + + if (stripped.disk().has_source()) { + stripped.mutable_disk()->clear_persistence(); + stripped.mutable_disk()->clear_volume(); + } else { + stripped.clear_disk(); + } + + // Since we only allow persistent volumes to be shared, we + // return the resource to non-shared state after destroy. + stripped.clear_shared(); + + conversions.emplace_back( + volume, + stripped, + [volume](const TResources& resources) -> Try { + if (resources.contains(volume)) { + return Error( + "Persistent volume " + stringify(volume) + " cannot be " + "removed due to additional shared copies"); + } + return Nothing(); + }); + } + break; + } + } + + return conversions; +} + +} // namespace internal { + + +Try> getResourceConversions( + const Offer::Operation& operation) +{ + return internal::getResourceConversions< + Resources, + Resource, + ResourceConversion, + Offer::Operation>(operation); +} + + +Try> getResourceConversions( + const v1::Offer::Operation& operation) +{ + return internal::getResourceConversions< + v1::Resources, + v1::Resource, + v1::ResourceConversion, + v1::Offer::Operation>(operation); +} + + Result getResourceProviderId( const Offer::Operation& operation) { diff --git a/src/common/resources_utils.hpp b/src/common/resources_utils.hpp index b8c29b33d69..5b74ff2dd3e 100644 --- a/src/common/resources_utils.hpp +++ b/src/common/resources_utils.hpp @@ -24,6 +24,9 @@ #include #include +#include +#include + #include #include #include @@ -55,6 +58,20 @@ Result getResourceProviderId( const Offer::Operation& operation); +// Returns the resource conversions from the given offer operation. +// This helper assumes that the given operation has already been +// validated. +Try> getResourceConversions( + const Offer::Operation& operation); + + +// Returns the resource conversions from the given offer operation. +// This helper assumes that the given operation has already been +// validated. +Try> getResourceConversions( + const v1::Offer::Operation& operation); + + // Resource format options to be used with the `convertResourceFormat` function. // // The preconditions of the options are asymmetric, centered around the diff --git a/src/v1/resources.cpp b/src/v1/resources.cpp index 58568b8d25c..43d9b0f48c3 100644 --- a/src/v1/resources.cpp +++ b/src/v1/resources.cpp @@ -39,6 +39,8 @@ #include #include +#include "common/resources_utils.hpp" + using std::map; using std::ostream; using std::set; @@ -1654,275 +1656,35 @@ Option Resources::find(const Resources& targets) const } -Try Resources::apply( - const Offer::Operation& operation, - const Option& convertedResources) const +Try Resources::apply(const ResourceConversion& conversion) const { - Resources result = *this; - - switch (operation.type()) { - case Offer::Operation::LAUNCH: - return Error("Cannot apply LAUNCH Operation"); - - case Offer::Operation::LAUNCH_GROUP: - return Error("Cannot apply LAUNCH_GROUP Operation"); - - case Offer::Operation::RESERVE: { - if (convertedResources.isSome()) { - return Error( - "Converted resources not expected for RESERVE Operation"); - } - - Option error = validate(operation.reserve().resources()); - if (error.isSome()) { - return Error("Invalid RESERVE Operation: " + error->message); - } - - foreach (const Resource& reserved, operation.reserve().resources()) { - if (!Resources::isReserved(reserved)) { - return Error("Invalid RESERVE Operation: Resource must be reserved"); - } else if (!Resources::isDynamicallyReserved(reserved)) { - return Error( - "Invalid RESERVE Operation: Resource must be" - " dynamically reserved"); - } - - Resources resources = Resources(reserved).popReservation(); - - if (!result.contains(resources)) { - return Error("Invalid RESERVE Operation: " + stringify(result) + - " does not contain " + stringify(resources)); - } - - result -= resources; - result.add(reserved); - } - break; - } - - case Offer::Operation::UNRESERVE: { - if (convertedResources.isSome()) { - return Error( - "Converted resources not expected for UNRESERVE Operation"); - } - - Option error = validate(operation.unreserve().resources()); - if (error.isSome()) { - return Error("Invalid UNRESERVE Operation: " + error->message); - } - - foreach (const Resource& reserved, operation.unreserve().resources()) { - if (!Resources::isReserved(reserved)) { - return Error("Invalid UNRESERVE Operation: Resource is not reserved"); - } else if (!Resources::isDynamicallyReserved(reserved)) { - return Error( - "Invalid UNRESERVE Operation: Resource is not" - " dynamically reserved"); - } - - if (!result.contains(reserved)) { - return Error("Invalid UNRESERVE Operation: " + stringify(result) + - " does not contain " + stringify(reserved)); - } - - Resources resources = Resources(reserved).popReservation(); - - result.subtract(reserved); - result += resources; - } - break; - } - - case Offer::Operation::CREATE: { - if (convertedResources.isSome()) { - return Error( - "Converted resources not expected for CREATE Operation"); - } - - Option error = validate(operation.create().volumes()); - if (error.isSome()) { - return Error("Invalid CREATE Operation: " + error->message); - } - - foreach (const Resource& volume, operation.create().volumes()) { - if (!volume.has_disk()) { - return Error("Invalid CREATE Operation: Missing 'disk'"); - } else if (!volume.disk().has_persistence()) { - return Error("Invalid CREATE Operation: Missing 'persistence'"); - } - - // Strip persistence and volume from the disk info so that we - // can subtract it from the original resources. - // TODO(jieyu): Non-persistent volumes are not supported for - // now. Persistent volumes can only be be created from regular - // disk resources. Revisit this once we start to support - // non-persistent volumes. - Resource stripped = volume; - - if (stripped.disk().has_source()) { - stripped.mutable_disk()->clear_persistence(); - stripped.mutable_disk()->clear_volume(); - } else { - stripped.clear_disk(); - } - - // Since we only allow persistent volumes to be shared, the - // original resource must be non-shared. - stripped.clear_shared(); - - if (!result.contains(stripped)) { - return Error("Invalid CREATE Operation: Insufficient disk resources" - " for persistent volume " + stringify(volume)); - } - - result.subtract(stripped); - result.add(volume); - } - break; - } - - case Offer::Operation::DESTROY: { - if (convertedResources.isSome()) { - return Error( - "Converted resources not expected for DESTROY Operation"); - } - - Option error = validate(operation.destroy().volumes()); - if (error.isSome()) { - return Error("Invalid DESTROY Operation: " + error->message); - } - - foreach (const Resource& volume, operation.destroy().volumes()) { - if (!volume.has_disk()) { - return Error("Invalid DESTROY Operation: Missing 'disk'"); - } else if (!volume.disk().has_persistence()) { - return Error("Invalid DESTROY Operation: Missing 'persistence'"); - } - - if (!result.contains(volume)) { - return Error( - "Invalid DESTROY Operation: Persistent volume does not exist"); - } - - result.subtract(volume); - - if (result.contains(volume)) { - return Error( - "Invalid DESTROY Operation: Persistent volume " + - stringify(volume) + " cannot be removed due to additional " + - "shared copies"); - } - - // Strip persistence and volume from the disk info so that we - // can subtract it from the original resources. - Resource stripped = volume; - - if (stripped.disk().has_source()) { - stripped.mutable_disk()->clear_persistence(); - stripped.mutable_disk()->clear_volume(); - } else { - stripped.clear_disk(); - } - - // Since we only allow persistent volumes to be shared, we - // return the resource to non-shared state after destroy. - stripped.clear_shared(); - - result.add(stripped); - } - break; - } - - case Offer::Operation::CREATE_VOLUME: { - if (convertedResources.isNone()) { - return Error( - "Converted resources not specified for CREATE_VOLUME Operation"); - } - - const Resource& consumed = operation.create_volume().source(); - - if (!result.contains(consumed)) { - return Error( - "Invalid CREATE_VOLUME Operation: " + stringify(result) + - " does not contain " + stringify(consumed)); - } - - result.subtract(consumed); - result += convertedResources.get(); - break; - } - - case Offer::Operation::DESTROY_VOLUME: { - if (convertedResources.isNone()) { - return Error( - "Converted resources not specified for DESTROY_VOLUME Operation"); - } - - const Resource& consumed = operation.destroy_volume().volume(); - - if (!result.contains(consumed)) { - return Error( - "Invalid DESTROY_VOLUME Operation: " + stringify(result) + - " does not contain " + stringify(consumed)); - } - - result.subtract(consumed); - result += convertedResources.get(); - break; - } - - case Offer::Operation::CREATE_BLOCK: { - if (convertedResources.isNone()) { - return Error( - "Converted resources not specified for CREATE_BLOCK Operation"); - } - - const Resource& consumed = operation.create_block().source(); - - if (!result.contains(consumed)) { - return Error( - "Invalid CREATE_BLOCK Operation: " + stringify(result) + - " does not contain " + stringify(consumed)); - } - - result.subtract(consumed); - result += convertedResources.get(); - break; - } - - case Offer::Operation::DESTROY_BLOCK: { - if (convertedResources.isNone()) { - return Error( - "Converted resources not specified for DESTROY_BLOCK Operation"); - } + return conversion.apply(*this); +} - const Resource& consumed = operation.destroy_block().block(); - if (!result.contains(consumed)) { - return Error( - "Invalid DESTROY_BLOCK Operation: " + stringify(result) + - " does not contain " + stringify(consumed)); - } +Try Resources::apply(const Offer::Operation& operation) const +{ + Try> conversions = + getResourceConversions(operation); - result.subtract(consumed); - result += convertedResources.get(); - break; - } + if (conversions.isError()) { + return Error("Cannot get conversions: " + conversions.error()); + } - case Offer::Operation::UNKNOWN: - return Error("Unknown offer operation"); + Try result = apply(conversions.get()); + if (result.isError()) { + return Error(result.error()); } - // The following are sanity checks to ensure the amount of each type of - // resource does not change. + // The following are sanity checks to ensure the amount of each type + // of resource does not change. // TODO(jieyu): Currently, we only check known resource types like // cpus, gpus, mem, disk, ports, etc. We should generalize this. - - CHECK(result.cpus() == cpus()); - CHECK(result.gpus() == gpus()); - CHECK(result.mem() == mem()); - CHECK(result.disk() == disk()); - CHECK(result.ports() == ports()); + CHECK(result->cpus() == cpus()); + CHECK(result->gpus() == gpus()); + CHECK(result->mem() == mem()); + CHECK(result->disk() == disk()); + CHECK(result->ports() == ports()); return result; } @@ -2554,5 +2316,29 @@ ostream& operator<<( return stream << JSON::protobuf(resources); } + +Try ResourceConversion::apply(const Resources& resources) const +{ + Resources result = resources; + + if (!result.contains(consumed)) { + return Error( + stringify(result) + " does not contain " + + stringify(consumed)); + } + + result -= consumed; + result += converted; + + if (postValidation.isSome()) { + Try validation = postValidation.get()(result); + if (validation.isError()) { + return Error(validation.error()); + } + } + + return result; +} + } // namespace v1 { } // namespace mesos {