From cb97fd67a1da8e89f58d6f9a09145d39e280b269 Mon Sep 17 00:00:00 2001 From: Komal Thareja Date: Tue, 19 Jul 2022 10:21:46 -0400 Subject: [PATCH] changes for improving create response time --- fabric_cf/__init__.py | 2 +- fabric_cf/actor/core/common/constants.py | 1 + .../actor/core/kernel/reservation_client.py | 13 +- .../orchestrator/core/orchestrator_handler.py | 31 ++- .../orchestrator/core/orchestrator_kernel.py | 38 +--- .../orchestrator/core/slice_demand_thread.py | 182 ++++++++++++++++++ 6 files changed, 216 insertions(+), 51 deletions(-) create mode 100644 fabric_cf/orchestrator/core/slice_demand_thread.py diff --git a/fabric_cf/__init__.py b/fabric_cf/__init__.py index 02357ebb..6ac9e20d 100644 --- a/fabric_cf/__init__.py +++ b/fabric_cf/__init__.py @@ -1 +1 @@ -__VERSION__ = "1.2.3" +__VERSION__ = "1.2.4" diff --git a/fabric_cf/actor/core/common/constants.py b/fabric_cf/actor/core/common/constants.py index 7ce6eae1..ff16b895 100644 --- a/fabric_cf/actor/core/common/constants.py +++ b/fabric_cf/actor/core/common/constants.py @@ -249,6 +249,7 @@ class Constants: CLAIMS_SUB = "sub" CLAIMS_EMAIL = "email" CLAIMS_PROJECTS = "projects" + PROJECT_ID = "project_id" PROPERTY_EXCEPTION_MESSAGE = "exception.message" PROPERTY_TARGET_NAME = "target.name" diff --git a/fabric_cf/actor/core/kernel/reservation_client.py b/fabric_cf/actor/core/kernel/reservation_client.py index 52ff8418..4bdd3c79 100644 --- a/fabric_cf/actor/core/kernel/reservation_client.py +++ b/fabric_cf/actor/core/kernel/reservation_client.py @@ -316,8 +316,10 @@ def accept_lease_update(self, *, incoming: ABCReservationMixin, update_data: Upd # Alternative: could transition to (state, None) to allow retry of the # redeem/extend by a higher level. 
if update_data.failed:
-            self.transition(prefix="failed lease update", state=ReservationStates.Failed,
-                            pending=ReservationPendingStates.None_)
+            self.fail(message=f"failed lease update- {update_data.get_message()}",
+                      sliver=incoming.get_resources().get_sliver())
+            #self.transition(prefix="failed lease update", state=ReservationStates.Failed,
+            #                pending=ReservationPendingStates.None_)
         else:
             try:
                 self.lease_update_satisfies(incoming=incoming, update_data=update_data)
@@ -1648,9 +1650,18 @@ def recover(self):
         elif self.state == ReservationStates.Failed:
             self.logger.warning("Reservation #{} has failed".format(self.get_reservation_id()))
 
-    def fail(self, *, message: str, exception: Exception = None):
+    def fail(self, *, message: str, exception: Exception = None, sliver: BaseSliver = None):
+        """
+        Fail the reservation and update the slice graph with the affected sliver
+        :param message: failure message
+        :param exception: exception that caused the failure, if any
+        :param sliver: sliver to pass to the slice graph update; defaults to the requested resources' sliver
+        """
         super().fail(message=message, exception=exception)
-        if self.requested_resources is not None and self.requested_resources.sliver is not None:
+        if sliver is None and self.requested_resources is not None and self.requested_resources.sliver is not None:
+            sliver = self.requested_resources.sliver
+
+        if sliver is not None:
-            self.update_slice_graph(sliver=self.requested_resources.sliver)
+            self.update_slice_graph(sliver=sliver)
 
     def update_slice_graph(self, *, sliver: BaseSliver):
diff --git a/fabric_cf/orchestrator/core/orchestrator_handler.py b/fabric_cf/orchestrator/core/orchestrator_handler.py
index 26fcc621..804954fa 100644
--- a/fabric_cf/orchestrator/core/orchestrator_handler.py
+++ b/fabric_cf/orchestrator/core/orchestrator_handler.py
@@ -235,7 +235,7 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key
 
         slice_id = None
         controller = None
-        orchestrator_slice = None
+        new_slice_object = None
         asm_graph = None
         try:
             end_time = self.__validate_lease_end_time(lease_end_time=lease_end_time)
@@ -274,7 +274,8 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key
             slice_obj.set_client_slice(True)
             slice_obj.set_description("Description")
             slice_obj.graph_id = 
asm_graph.get_graph_id() - slice_obj.set_config_properties(value={Constants.USER_SSH_KEY: ssh_key}) + slice_obj.set_config_properties(value={Constants.USER_SSH_KEY: ssh_key, + Constants.PROJECT_ID: project}) slice_obj.set_lease_end(lease_end=end_time) auth = AuthAvro() auth.oidc_sub_claim = fabric_token.get_subject() @@ -292,25 +293,19 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key self.logger.debug(f"Slice {slice_name}/{slice_id} added successfully") slice_obj.set_slice_id(slice_id=str(slice_id)) - orchestrator_slice = OrchestratorSliceWrapper(controller=controller, broker=broker, - slice_obj=slice_obj, logger=self.logger) + new_slice_object = OrchestratorSliceWrapper(controller=controller, broker=broker, + slice_obj=slice_obj, logger=self.logger) - orchestrator_slice.lock() + new_slice_object.lock() # Create Slivers from Slice Graph; Compute Reservations from Slivers; # Add Reservations to relational database; - computed_reservations = orchestrator_slice.create(slice_graph=asm_graph) + computed_reservations = new_slice_object.create(slice_graph=asm_graph) - # Process the Slice i.e. Demand the computed reservations i.e. Add them to the policy - # Once added to the policy; Actor Tick Handler will do following asynchronously: - # 1. Ticket message exchange with broker and - # 2. 
Redeem message exchange with AM once ticket is granted by Broker - self.controller_state.demand_slice(controller_slice=orchestrator_slice) - - for r in orchestrator_slice.computed_l3_reservations: - res_status_update = ReservationStatusUpdate(logger=self.logger) - self.controller_state.get_sut().add_active_status_watch(watch=ID(uid=r.get_reservation_id()), - callback=res_status_update) + # Enqueue the slice on the demand thread + # Demand thread is responsible for demanding the reservations + # Helps improve the create response time + self.controller_state.get_demand_thread().queue_slice(controller_slice=new_slice_object) return ResponseBuilder.get_reservation_summary(res_list=computed_reservations) except Exception as e: @@ -321,8 +316,8 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key self.logger.error(f"Exception occurred processing create_slice e: {e}") raise e finally: - if orchestrator_slice is not None: - orchestrator_slice.unlock() + if new_slice_object is not None: + new_slice_object.unlock() def get_slivers(self, *, token: str, slice_id: str, sliver_id: str = None, include_notices: bool = True) -> dict: """ diff --git a/fabric_cf/orchestrator/core/orchestrator_kernel.py b/fabric_cf/orchestrator/core/orchestrator_kernel.py index 8e65f01b..f85e1dd1 100644 --- a/fabric_cf/orchestrator/core/orchestrator_kernel.py +++ b/fabric_cf/orchestrator/core/orchestrator_kernel.py @@ -24,19 +24,17 @@ # # Author: Komal Thareja (kthare10@renci.org) import threading -import traceback from fim.user import GraphFormat from fabric_cf.actor.core.apis.abc_mgmt_controller_mixin import ABCMgmtControllerMixin from fabric_cf.actor.core.common.constants import Constants -from fabric_cf.actor.core.kernel.reservation_states import ReservationStates from fabric_cf.actor.core.manage.management_utils import ManagementUtils from fabric_cf.actor.core.util.id import ID from fabric_cf.orchestrator.core.bqm_wrapper import BqmWrapper from 
fabric_cf.orchestrator.core.exceptions import OrchestratorException -from fabric_cf.orchestrator.core.orchestrator_slice_wrapper import OrchestratorSliceWrapper from fabric_cf.orchestrator.core.reservation_status_update_thread import ReservationStatusUpdateThread +from fabric_cf.orchestrator.core.slice_demand_thread import SliceDemandThread class OrchestratorKernel: @@ -46,6 +44,7 @@ class OrchestratorKernel: def __init__(self): self.lock = threading.Lock() + self.demand_thread = None self.sut = None self.broker = None self.logger = None @@ -79,31 +78,6 @@ def save_bqm(self, *, bqm: str, graph_format: GraphFormat): finally: self.lock.release() - def demand_slice(self, *, controller_slice: OrchestratorSliceWrapper): - """ - Demand slice reservations. - :param controller_slice: - """ - computed_reservations = controller_slice.get_computed_reservations() - - try: - self.lock.acquire() - for reservation in computed_reservations: - self.get_logger().debug(f"Issuing demand for reservation: {reservation.get_reservation_id()}") - - if reservation.get_state() != ReservationStates.Unknown.value: - self.get_logger().debug(f"Reservation not in {reservation.get_state()} state, ignoring it") - continue - - if not self.controller.demand_reservation(reservation=reservation): - raise OrchestratorException(f"Could not demand resources: {self.controller.get_last_error()}") - self.get_logger().debug(f"Reservation #{reservation.get_reservation_id()} demanded successfully") - except Exception as e: - self.get_logger().error(traceback.format_exc()) - self.get_logger().error("Unable to get orchestrator or demand reservation: {}".format(e)) - finally: - self.lock.release() - def set_broker(self, *, broker: ID): """ Set Broker @@ -119,6 +93,9 @@ def get_broker(self) -> ID: """ return self.broker + def get_demand_thread(self) -> SliceDemandThread: + return self.demand_thread + def get_sut(self) -> ReservationStatusUpdateThread: """ Get SUT thread @@ -150,6 +127,8 @@ def stop_threads(self): 
Stop threads :return: """ + if self.demand_thread is not None: + self.demand_thread.stop() if self.sut is not None: self.sut.stop() @@ -158,6 +137,9 @@ def start_threads(self): Start threads :return: """ + self.get_logger().debug("Starting SliceDemandThread") + self.demand_thread = SliceDemandThread(kernel=self) + self.demand_thread.start() self.get_logger().debug("Starting ReservationStatusUpdateThread") self.sut = ReservationStatusUpdateThread() self.sut.start() diff --git a/fabric_cf/orchestrator/core/slice_demand_thread.py b/fabric_cf/orchestrator/core/slice_demand_thread.py new file mode 100644 index 00000000..f35846dc --- /dev/null +++ b/fabric_cf/orchestrator/core/slice_demand_thread.py @@ -0,0 +1,182 @@ +#!/usr/bin/env python3 +# MIT License +# +# Copyright (c) 2020 FABRIC Testbed +# +# Permission is hereby granted, free of charge, to any person obtaining a copy +# of this software and associated documentation files (the "Software"), to deal +# in the Software without restriction, including without limitation the rights +# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell +# copies of the Software, and to permit persons to whom the Software is +# furnished to do so, subject to the following conditions: +# +# The above copyright notice and this permission notice shall be included in all +# copies or substantial portions of the Software. +# +# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR +# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY, +# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE +# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER +# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM, +# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE +# SOFTWARE. 
+#
+#
+# Author: Komal Thareja (kthare10@renci.org)
+import queue
+import threading
+import traceback
+
+from fabric_cf.actor.core.kernel.reservation_states import ReservationStates
+from fabric_cf.actor.core.util.id import ID
+from fabric_cf.actor.core.util.iterable_queue import IterableQueue
+from fabric_cf.orchestrator.core.exceptions import OrchestratorException
+from fabric_cf.orchestrator.core.orchestrator_slice_wrapper import OrchestratorSliceWrapper
+from fabric_cf.orchestrator.core.reservation_status_update import ReservationStatusUpdate
+
+
+class SliceDemandThread:
+    """
+    Standalone worker thread started by the Orchestrator that issues demand for the slivers of newly
+    created slices, so that the create call can respond without waiting for the slivers to be demanded
+    """
+
+    def __init__(self, *, kernel):
+        """
+        :param kernel: orchestrator kernel supplying the management actor and the status update thread
+        """
+        self.slice_queue = queue.Queue()
+        self.slice_avail_condition = threading.Condition()
+        self.thread_lock = threading.Lock()
+        self.thread = None
+        self.stopped = False
+        from fabric_cf.actor.core.container.globals import GlobalsSingleton
+        self.logger = GlobalsSingleton.get().get_logger()
+        self.kernel = kernel
+        self.mgmt_actor = kernel.get_management_actor()
+        # May be None: start_threads() creates this thread before the SUT; demand_slice() re-resolves it
+        self.sut = kernel.get_sut()
+
+    def queue_slice(self, *, controller_slice: OrchestratorSliceWrapper):
+        """
+        Queue a slice and wake up the worker thread
+        :param controller_slice: slice whose reservations are to be demanded
+        :return:
+        """
+        with self.slice_avail_condition:
+            self.slice_queue.put_nowait(controller_slice)
+            self.logger.debug(f"Added slice to slices queue {controller_slice.get_slice_id()}")
+            self.slice_avail_condition.notify_all()
+
+    def start(self):
+        """
+        Start thread
+        :raises OrchestratorException: if the thread has already been started
+        """
+        with self.thread_lock:
+            if self.thread is not None:
+                raise OrchestratorException("This SliceDemandThread has already been started")
+
+            # Thread.setName()/setDaemon() are deprecated since Python 3.10; use constructor arguments
+            self.thread = threading.Thread(target=self.run, name=self.__class__.__name__, daemon=True)
+            self.thread.start()
+
+    def stop(self):
+        """
+        Signal the worker thread to exit and wait for it to finish
+        :return:
+        """
+        self.stopped = True
+        # Detach under the lock; wake and join outside it so the lock is never held while blocking in join()
+        with self.thread_lock:
+            temp = self.thread
+            self.thread = None
+
+        if temp is None:
+            return
+
+        self.logger.warning("It seems that the SliceDemandThread is running. Interrupting it")
+        try:
+            # TODO find equivalent of interrupt
+            with self.slice_avail_condition:
+                self.slice_avail_condition.notify_all()
+            temp.join()
+        except Exception as e:
+            self.logger.error(f"Could not join SliceDemandThread thread {e}")
+
+    def run(self):
+        """
+        Thread main loop: wait for queued slices, drain the queue and demand each slice
+        :return:
+        """
+        self.logger.debug("SliceDemandThread started")
+        while True:
+            slices = []
+            with self.slice_avail_condition:
+
+                while self.slice_queue.empty() and not self.stopped:
+                    try:
+                        self.slice_avail_condition.wait()
+                    except InterruptedError:
+                        self.logger.info("Slice Demand thread interrupted. Exiting")
+                        return
+
+                if self.stopped:
+                    self.logger.info("Slice Demand Thread exiting")
+                    return
+
+                if not self.slice_queue.empty():
+                    try:
+                        for s in IterableQueue(source_queue=self.slice_queue):
+                            slices.append(s)
+                    except Exception as e:
+                        self.logger.error(f"Error while adding slice to slice queue! e: {e}")
+                        self.logger.error(traceback.format_exc())
+
+                self.slice_avail_condition.notify_all()
+
+            if slices:
+                self.logger.debug(f"Processing {len(slices)} slices")
+                for s in slices:
+                    try:
+                        # Process the Slice i.e. Demand the computed reservations i.e. Add them to the policy
+                        # Once added to the policy; Actor Tick Handler will do following asynchronously:
+                        # 1. Ticket message exchange with broker and
+                        # 2. Redeem message exchange with AM once ticket is granted by Broker
+                        self.demand_slice(controller_slice=s)
+                    except Exception as e:
+                        self.logger.error(f"Error while processing slice {type(s)}, {e}")
+                        self.logger.error(traceback.format_exc())
+
+    def demand_slice(self, *, controller_slice: OrchestratorSliceWrapper):
+        """
+        Demand slice reservations and register status watches for the computed L3 reservations.
+        :param controller_slice: slice whose computed reservations are to be demanded
+        """
+        computed_reservations = controller_slice.get_computed_reservations()
+
+        # Acquire before the try block so that unlock() only runs when the lock is actually held
+        controller_slice.lock()
+        try:
+            for reservation in computed_reservations:
+                self.logger.debug(f"Issuing demand for reservation: {reservation.get_reservation_id()}")
+
+                if reservation.get_state() != ReservationStates.Unknown.value:
+                    self.logger.debug(f"Reservation not in {reservation.get_state()} state, ignoring it")
+                    continue
+
+                if not self.mgmt_actor.demand_reservation(reservation=reservation):
+                    raise OrchestratorException(f"Could not demand resources: {self.mgmt_actor.get_last_error()}")
+                self.logger.debug(f"Reservation #{reservation.get_reservation_id()} demanded successfully")
+
+            # The reference captured in __init__ is None when this thread was created before the SUT
+            sut = self.sut if self.sut is not None else self.kernel.get_sut()
+            for r in controller_slice.computed_l3_reservations:
+                res_status_update = ReservationStatusUpdate(logger=self.logger)
+                sut.add_active_status_watch(watch=ID(uid=r.get_reservation_id()),
+                                            callback=res_status_update)
+        except Exception as e:
+            self.logger.error(traceback.format_exc())
+            self.logger.error("Unable to get orchestrator or demand reservation: {}".format(e))
+        finally:
+            controller_slice.unlock()