Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fabric_cf/__init__.py
Original file line number Diff line number Diff line change
@@ -1 +1 @@
__VERSION__ = "1.2.3"
__VERSION__ = "1.2.4"
1 change: 1 addition & 0 deletions fabric_cf/actor/core/common/constants.py
Original file line number Diff line number Diff line change
Expand Up @@ -249,6 +249,7 @@ class Constants:
CLAIMS_SUB = "sub"
CLAIMS_EMAIL = "email"
CLAIMS_PROJECTS = "projects"
PROJECT_ID = "project_id"

PROPERTY_EXCEPTION_MESSAGE = "exception.message"
PROPERTY_TARGET_NAME = "target.name"
Expand Down
13 changes: 9 additions & 4 deletions fabric_cf/actor/core/kernel/reservation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -316,8 +316,10 @@ def accept_lease_update(self, *, incoming: ABCReservationMixin, update_data: Upd
# Alternative: could transition to (state, None) to allow retry of the
# redeem/extend by a higher level.
if update_data.failed:
self.transition(prefix="failed lease update", state=ReservationStates.Failed,
pending=ReservationPendingStates.None_)
self.fail(message=f"failed lease update- {update_data.get_message()}",
sliver=incoming.get_resources().get_sliver())
#self.transition(prefix="failed lease update", state=ReservationStates.Failed,
# pending=ReservationPendingStates.None_)
else:
try:
self.lease_update_satisfies(incoming=incoming, update_data=update_data)
Expand Down Expand Up @@ -1648,9 +1650,12 @@ def recover(self):
elif self.state == ReservationStates.Failed:
self.logger.warning("Reservation #{} has failed".format(self.get_reservation_id()))

def fail(self, *, message: str, exception: Exception = None):
def fail(self, *, message: str, exception: Exception = None, sliver: BaseSliver = None):
super().fail(message=message, exception=exception)
if self.requested_resources is not None and self.requested_resources.sliver is not None:
if sliver is None and self.requested_resources is not None and self.requested_resources.sliver is not None:
sliver = self.requested_resources.sliver

if sliver is not None:
self.update_slice_graph(sliver=self.requested_resources.sliver)

def update_slice_graph(self, *, sliver: BaseSliver):
Expand Down
31 changes: 13 additions & 18 deletions fabric_cf/orchestrator/core/orchestrator_handler.py
Original file line number Diff line number Diff line change
Expand Up @@ -235,7 +235,7 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key

slice_id = None
controller = None
orchestrator_slice = None
new_slice_object = None
asm_graph = None
try:
end_time = self.__validate_lease_end_time(lease_end_time=lease_end_time)
Expand Down Expand Up @@ -274,7 +274,8 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key
slice_obj.set_client_slice(True)
slice_obj.set_description("Description")
slice_obj.graph_id = asm_graph.get_graph_id()
slice_obj.set_config_properties(value={Constants.USER_SSH_KEY: ssh_key})
slice_obj.set_config_properties(value={Constants.USER_SSH_KEY: ssh_key,
Constants.PROJECT_ID: project})
slice_obj.set_lease_end(lease_end=end_time)
auth = AuthAvro()
auth.oidc_sub_claim = fabric_token.get_subject()
Expand All @@ -292,25 +293,19 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key
self.logger.debug(f"Slice {slice_name}/{slice_id} added successfully")

slice_obj.set_slice_id(slice_id=str(slice_id))
orchestrator_slice = OrchestratorSliceWrapper(controller=controller, broker=broker,
slice_obj=slice_obj, logger=self.logger)
new_slice_object = OrchestratorSliceWrapper(controller=controller, broker=broker,
slice_obj=slice_obj, logger=self.logger)

orchestrator_slice.lock()
new_slice_object.lock()

# Create Slivers from Slice Graph; Compute Reservations from Slivers;
# Add Reservations to relational database;
computed_reservations = orchestrator_slice.create(slice_graph=asm_graph)
computed_reservations = new_slice_object.create(slice_graph=asm_graph)

# Process the Slice i.e. Demand the computed reservations i.e. Add them to the policy
# Once added to the policy; Actor Tick Handler will do following asynchronously:
# 1. Ticket message exchange with broker and
# 2. Redeem message exchange with AM once ticket is granted by Broker
self.controller_state.demand_slice(controller_slice=orchestrator_slice)

for r in orchestrator_slice.computed_l3_reservations:
res_status_update = ReservationStatusUpdate(logger=self.logger)
self.controller_state.get_sut().add_active_status_watch(watch=ID(uid=r.get_reservation_id()),
callback=res_status_update)
# Enqueue the slice on the demand thread
# Demand thread is responsible for demanding the reservations
# Helps improve the create response time
self.controller_state.get_demand_thread().queue_slice(controller_slice=new_slice_object)

return ResponseBuilder.get_reservation_summary(res_list=computed_reservations)
except Exception as e:
Expand All @@ -321,8 +316,8 @@ def create_slice(self, *, token: str, slice_name: str, slice_graph: str, ssh_key
self.logger.error(f"Exception occurred processing create_slice e: {e}")
raise e
finally:
if orchestrator_slice is not None:
orchestrator_slice.unlock()
if new_slice_object is not None:
new_slice_object.unlock()

def get_slivers(self, *, token: str, slice_id: str, sliver_id: str = None, include_notices: bool = True) -> dict:
"""
Expand Down
38 changes: 10 additions & 28 deletions fabric_cf/orchestrator/core/orchestrator_kernel.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,17 @@
#
# Author: Komal Thareja (kthare10@renci.org)
import threading
import traceback

from fim.user import GraphFormat

from fabric_cf.actor.core.apis.abc_mgmt_controller_mixin import ABCMgmtControllerMixin
from fabric_cf.actor.core.common.constants import Constants
from fabric_cf.actor.core.kernel.reservation_states import ReservationStates
from fabric_cf.actor.core.manage.management_utils import ManagementUtils
from fabric_cf.actor.core.util.id import ID
from fabric_cf.orchestrator.core.bqm_wrapper import BqmWrapper
from fabric_cf.orchestrator.core.exceptions import OrchestratorException
from fabric_cf.orchestrator.core.orchestrator_slice_wrapper import OrchestratorSliceWrapper
from fabric_cf.orchestrator.core.reservation_status_update_thread import ReservationStatusUpdateThread
from fabric_cf.orchestrator.core.slice_demand_thread import SliceDemandThread


class OrchestratorKernel:
Expand All @@ -46,6 +44,7 @@ class OrchestratorKernel:

def __init__(self):
self.lock = threading.Lock()
self.demand_thread = None
self.sut = None
self.broker = None
self.logger = None
Expand Down Expand Up @@ -79,31 +78,6 @@ def save_bqm(self, *, bqm: str, graph_format: GraphFormat):
finally:
self.lock.release()

def demand_slice(self, *, controller_slice: OrchestratorSliceWrapper):
"""
Demand slice reservations.
:param controller_slice:
"""
computed_reservations = controller_slice.get_computed_reservations()

try:
self.lock.acquire()
for reservation in computed_reservations:
self.get_logger().debug(f"Issuing demand for reservation: {reservation.get_reservation_id()}")

if reservation.get_state() != ReservationStates.Unknown.value:
self.get_logger().debug(f"Reservation not in {reservation.get_state()} state, ignoring it")
continue

if not self.controller.demand_reservation(reservation=reservation):
raise OrchestratorException(f"Could not demand resources: {self.controller.get_last_error()}")
self.get_logger().debug(f"Reservation #{reservation.get_reservation_id()} demanded successfully")
except Exception as e:
self.get_logger().error(traceback.format_exc())
self.get_logger().error("Unable to get orchestrator or demand reservation: {}".format(e))
finally:
self.lock.release()

def set_broker(self, *, broker: ID):
"""
Set Broker
Expand All @@ -119,6 +93,9 @@ def get_broker(self) -> ID:
"""
return self.broker

def get_demand_thread(self) -> SliceDemandThread:
return self.demand_thread

def get_sut(self) -> ReservationStatusUpdateThread:
"""
Get SUT thread
Expand Down Expand Up @@ -150,6 +127,8 @@ def stop_threads(self):
Stop threads
:return:
"""
if self.demand_thread is not None:
self.demand_thread.stop()
if self.sut is not None:
self.sut.stop()

Expand All @@ -158,6 +137,9 @@ def start_threads(self):
Start threads
:return:
"""
self.get_logger().debug("Starting SliceDemandThread")
self.demand_thread = SliceDemandThread(kernel=self)
self.demand_thread.start()
self.get_logger().debug("Starting ReservationStatusUpdateThread")
self.sut = ReservationStatusUpdateThread()
self.sut.start()
Expand Down
182 changes: 182 additions & 0 deletions fabric_cf/orchestrator/core/slice_demand_thread.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,182 @@
#!/usr/bin/env python3
# MIT License
#
# Copyright (c) 2020 FABRIC Testbed
#
# Permission is hereby granted, free of charge, to any person obtaining a copy
# of this software and associated documentation files (the "Software"), to deal
# in the Software without restriction, including without limitation the rights
# to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
# copies of the Software, and to permit persons to whom the Software is
# furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in all
# copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
# AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
# OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
# SOFTWARE.
#
#
# Author: Komal Thareja (kthare10@renci.org)
import queue
import threading
import traceback

from fabric_cf.actor.core.kernel.reservation_states import ReservationStates
from fabric_cf.actor.core.util.id import ID
from fabric_cf.actor.core.util.iterable_queue import IterableQueue
from fabric_cf.orchestrator.core.exceptions import OrchestratorException
from fabric_cf.orchestrator.core.orchestrator_slice_wrapper import OrchestratorSliceWrapper
from fabric_cf.orchestrator.core.reservation_status_update import ReservationStatusUpdate


class SliceDemandThread:
"""
This runs as a standalone thread started by Orchestrator and deals with issuing demand for the slivers for
the newly created slices. The purpose of this thread is to help orchestrator respond back to the create
without waiting for the slivers to be demanded
"""

def __init__(self, *, kernel):
self.slice_queue = queue.Queue()
self.slice_avail_condition = threading.Condition()
self.thread_lock = threading.Lock()
self.thread = None
self.stopped = False
from fabric_cf.actor.core.container.globals import GlobalsSingleton
self.logger = GlobalsSingleton.get().get_logger()
self.mgmt_actor = kernel.get_management_actor()
self.sut = kernel.get_sut()

def queue_slice(self, *, controller_slice: OrchestratorSliceWrapper):
"""
Queue a slice
:param controller_slice:
:return:
"""
with self.slice_avail_condition:
self.slice_queue.put_nowait(controller_slice)
self.logger.debug(f"Added slice to slices queue {controller_slice.get_slice_id()}")
self.slice_avail_condition.notify_all()

def start(self):
"""
Start thread
:return:
"""
try:
self.thread_lock.acquire()
if self.thread is not None:
raise OrchestratorException("This SliceDemandThread has already been started")

self.thread = threading.Thread(target=self.run)
self.thread.setName(self.__class__.__name__)
self.thread.setDaemon(True)
self.thread.start()

finally:
self.thread_lock.release()

def stop(self):
"""
Stop thread
:return:
"""
self.stopped = True
try:
self.thread_lock.acquire()
temp = self.thread
self.thread = None
if temp is not None:
self.logger.warning("It seems that the SliceDemandThread is running. Interrupting it")
try:
# TODO find equivalent of interrupt
with self.slice_avail_condition:
self.slice_avail_condition.notify_all()
temp.join()
except Exception as e:
self.logger.error(f"Could not join SliceDemandThread thread {e}")
finally:
self.thread_lock.release()
finally:
if self.thread_lock is not None and self.thread_lock.locked():
self.thread_lock.release()

def run(self):
"""
Thread main loop
:return:
"""
self.logger.debug("SliceDemandThread started")
while True:
slices = []
with self.slice_avail_condition:

while self.slice_queue.empty() and not self.stopped:
try:
self.slice_avail_condition.wait()
except InterruptedError as e:
self.logger.info("Slice Demand thread interrupted. Exiting")
return

if self.stopped:
self.logger.info("Slice Demand Thread exiting")
return

if not self.slice_queue.empty():
try:
for s in IterableQueue(source_queue=self.slice_queue):
slices.append(s)
except Exception as e:
self.logger.error(f"Error while adding slice to slice queue! e: {e}")
self.logger.error(traceback.format_exc())

self.slice_avail_condition.notify_all()

if len(slices) > 0:
self.logger.debug(f"Processing {len(slices)} slices")
for s in slices:
try:
# Process the Slice i.e. Demand the computed reservations i.e. Add them to the policy
# Once added to the policy; Actor Tick Handler will do following asynchronously:
# 1. Ticket message exchange with broker and
# 2. Redeem message exchange with AM once ticket is granted by Broker
self.demand_slice(controller_slice=s)
except Exception as e:
self.logger.error(f"Error while processing slice {type(s)}, {e}")
self.logger.error(traceback.format_exc())

def demand_slice(self, *, controller_slice: OrchestratorSliceWrapper):
"""
Demand slice reservations.
:param controller_slice:
"""
computed_reservations = controller_slice.get_computed_reservations()

try:
controller_slice.lock()
for reservation in computed_reservations:
self.logger.debug(f"Issuing demand for reservation: {reservation.get_reservation_id()}")

if reservation.get_state() != ReservationStates.Unknown.value:
self.logger.debug(f"Reservation not in {reservation.get_state()} state, ignoring it")
continue

if not self.mgmt_actor.demand_reservation(reservation=reservation):
raise OrchestratorException(f"Could not demand resources: {self.mgmt_actor.get_last_error()}")
self.logger.debug(f"Reservation #{reservation.get_reservation_id()} demanded successfully")

for r in controller_slice.computed_l3_reservations:
res_status_update = ReservationStatusUpdate(logger=self.logger)
self.sut.add_active_status_watch(watch=ID(uid=r.get_reservation_id()),
callback=res_status_update)
except Exception as e:
self.logger.error(traceback.format_exc())
self.logger.error("Unable to get orchestrator or demand reservation: {}".format(e))
finally:
controller_slice.unlock()