Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion fabric_cf/actor/core/apis/abc_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,7 +172,7 @@ def get_reservations(self, *, slice_id: ID = None, graph_node_id: str = None, pr

@abstractmethod
def get_components(self, *, node_id: str, states: list[int], rsv_type: list[str], component: str = None,
bdf: str = None) -> Dict[str, List[str]]:
bdf: str = None, start: datetime = None, end: datetime = None) -> Dict[str, List[str]]:
"""
Retrieves the components.

Expand Down
14 changes: 8 additions & 6 deletions fabric_cf/actor/core/core/actor.py
Original file line number Diff line number Diff line change
Expand Up @@ -376,12 +376,14 @@ def recover(self):
client_slices = self.plugin.get_database().get_slices(slc_type=[SliceTypes.ClientSlice,
SliceTypes.BrokerClientSlice],
states=[SliceState.Configuring.value,
SliceState.Nascent.value,
SliceState.StableOK.value,
SliceState.StableError.value,
SliceState.Modifying.value,
SliceState.ModifyOK.value,
SliceState.ModifyError.value])
SliceState.Nascent.value,
SliceState.StableOK.value,
SliceState.StableError.value,
SliceState.Modifying.value,
SliceState.ModifyOK.value,
SliceState.ModifyError.value,
SliceState.AllocatedOK.value,
SliceState.AllocatedError.value])
self.logger.debug("Found {} client slices".format(len(client_slices)))
self.recover_slices(slices=client_slices)
self.logger.debug("Recovery of client slices complete")
Expand Down
5 changes: 5 additions & 0 deletions fabric_cf/actor/core/kernel/reservation_client.py
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@
# Author: Komal Thareja (kthare10@renci.org)
from __future__ import annotations

import datetime
import json
import re
import threading
Expand Down Expand Up @@ -422,6 +423,10 @@ def approve_redeem(self):
@return true if approved; false otherwise
"""
approved = True
now = datetime.datetime.now(datetime.timezone.utc)
if self.requested_term and self.requested_term.get_start_time() > now:
self.logger.debug(f"Future Reservation : {self}!")
return False

for pred_state in self.redeem_predecessors.values():
if pred_state.get_reservation() is None or \
Expand Down
22 changes: 19 additions & 3 deletions fabric_cf/actor/core/kernel/slice.py
Original file line number Diff line number Diff line change
Expand Up @@ -289,7 +289,7 @@ def is_stable_error(self) -> bool:
def is_stable(self) -> bool:
state_changed, slice_state = self.transition_slice(operation=SliceStateMachine.REEVALUATE)

if slice_state == SliceState.StableError or slice_state == SliceState.StableOk:
if slice_state in [SliceState.StableError, SliceState.StableOK]:
return True

return False
Expand All @@ -313,15 +313,31 @@ def is_modify_error(self) -> bool:
def is_modified(self) -> bool:
state_changed, slice_state = self.transition_slice(operation=SliceStateMachine.REEVALUATE)

if slice_state == SliceState.ModifyError or slice_state == SliceState.ModifyOK:
if slice_state in [SliceState.ModifyError, SliceState.ModifyOK]:
return True

return False

def is_allocated_error(self) -> bool:
state_changed, slice_state = self.transition_slice(operation=SliceStateMachine.REEVALUATE)

if slice_state == SliceState.AllocatedError:
return True

return False

def is_allocated(self) -> bool:
state_changed, slice_state = self.transition_slice(operation=SliceStateMachine.REEVALUATE)

if slice_state in [SliceState.AllocatedOK, SliceState.AllocatedError]:
return True

return False

def is_dead_or_closing(self) -> bool:
state_changed, slice_state = self.transition_slice(operation=SliceStateMachine.REEVALUATE)

if slice_state == SliceState.Dead or slice_state == SliceState.Closing:
if slice_state in [SliceState.Dead, SliceState.Closing]:
return True

return False
Expand Down
82 changes: 72 additions & 10 deletions fabric_cf/actor/core/kernel/slice_state_machine.py
Original file line number Diff line number Diff line change
Expand Up @@ -44,6 +44,8 @@ class SliceState(Enum):
Modifying = enum.auto()
ModifyError = enum.auto()
ModifyOK = enum.auto()
AllocatedError = enum.auto()
AllocatedOK = enum.auto()
All = enum.auto() # used only for querying

def __str__(self):
Expand Down Expand Up @@ -102,6 +104,10 @@ def translate(state_name: str):
return SliceState.Closing
elif state_name.lower() == SliceState.Dead.name.lower():
return SliceState.Dead
elif state_name.lower() == SliceState.AllocatedOK.name.lower():
return SliceState.Closing
elif state_name.lower() == SliceState.AllocatedError.name.lower():
return SliceState.Dead
else:
return SliceState.All

Expand All @@ -117,6 +123,12 @@ def is_stable(*, state) -> bool:
return True
return False

@staticmethod
def is_allocated(*, state) -> bool:
if state == SliceState.AllocatedOK or state == SliceState.AllocatedError:
return True
return False

@staticmethod
def is_modified(*, state) -> bool:
if state == SliceState.ModifyOK or state == SliceState.ModifyError:
Expand Down Expand Up @@ -183,18 +195,20 @@ def has_state_other_than(self, *states) -> bool:
class SliceStateMachine:
CREATE = SliceOperation(SliceCommand.Create, SliceState.Nascent)

MODIFY = SliceOperation(SliceCommand.Modify, SliceState.StableOK, SliceState.StableError, SliceState.Configuring)
MODIFY = SliceOperation(SliceCommand.Modify, SliceState.StableOK, SliceState.StableError, SliceState.Configuring,
SliceState.AllocatedOK, SliceState.AllocatedError)

MODIFY_ACCEPT = SliceOperation(SliceCommand.ModifyAccept, SliceState.ModifyOK, SliceState.ModifyError,
SliceState.Modifying)
SliceState.Modifying, SliceState.AllocatedOK, SliceState.AllocatedError)

DELETE = SliceOperation(SliceCommand.Delete, SliceState.Nascent, SliceState.StableOK, SliceState.StableError,
SliceState.Configuring, SliceState.Modifying, SliceState.ModifyOK, SliceState.ModifyError,
SliceState.Dead)
SliceState.Dead, SliceState.AllocatedOK, SliceState.AllocatedError)

REEVALUATE = SliceOperation(SliceCommand.Reevaluate, SliceState.Nascent, SliceState.StableOK,
SliceState.StableError, SliceState.Configuring, SliceState.Dead, SliceState.Closing,
SliceState.Modifying, SliceState.ModifyError, SliceState.ModifyOK)
SliceState.Modifying, SliceState.ModifyError, SliceState.ModifyOK,
SliceState.AllocatedError, SliceState.AllocatedOK)

def __init__(self, *, slice_id: ID):
self.slice_guid = slice_id
Expand Down Expand Up @@ -238,9 +252,16 @@ def transition_slice(self, *, operation: SliceOperation, reservations: Reservati

elif operation.command == SliceCommand.ModifyAccept:
if self.state == SliceState.ModifyError:
self.state = SliceState.StableError
if self.last_state in [SliceState.AllocatedOK, SliceState.AllocatedError]:
self.state = SliceState.AllocatedError
else:
self.state = SliceState.StableError

elif self.state == SliceState.ModifyOK:
self.state = SliceState.StableOK
if self.last_state in [SliceState.AllocatedOK, SliceState.AllocatedError]:
self.state = SliceState.AllocatedOK
else:
self.state = SliceState.StableOK

elif operation.command == SliceCommand.Delete:
if self.state != SliceState.Dead:
Expand All @@ -266,7 +287,36 @@ def transition_slice(self, *, operation: SliceOperation, reservations: Reservati
if not has_error and r.get_error_message() is not None and len(r.get_error_message()) > 0:
has_error = True

if self.state == SliceState.Nascent or self.state == SliceState.Configuring:
if self.state in [SliceState.Nascent, SliceState.Configuring]:
if not bins.has_state_other_than(ReservationStates.Active, ReservationStates.Closed,
ReservationStates.CloseFail):
if not has_error:
self.state = SliceState.StableOK
else:
self.state = SliceState.StableError

if (not bins.has_state_other_than(ReservationStates.Active, ReservationStates.Failed,
ReservationStates.Closed, ReservationStates.CloseFail)) and \
bins.has_state(s=ReservationStates.Failed):
self.state = SliceState.StableError

if not bins.has_state_other_than(ReservationStates.Ticketed, ReservationStates.Closed,
ReservationStates.CloseFail):
if not has_error:
self.state = SliceState.AllocatedOK
else:
self.state = SliceState.AllocatedError

if (not bins.has_state_other_than(ReservationStates.Ticketed, ReservationStates.Failed,
ReservationStates.Closed, ReservationStates.CloseFail)) and \
bins.has_state(s=ReservationStates.Failed):
self.state = SliceState.AllocatedError

if not bins.has_state_other_than(ReservationStates.Closed, ReservationStates.CloseWait,
ReservationStates.Failed, ReservationStates.CloseFail):
self.state = SliceState.Closing

if self.state in [SliceState.AllocatedOK, SliceState.AllocatedError]:
if not bins.has_state_other_than(ReservationStates.Active, ReservationStates.Closed,
ReservationStates.CloseFail):
if not has_error:
Expand Down Expand Up @@ -296,12 +346,24 @@ def transition_slice(self, *, operation: SliceOperation, reservations: Reservati
bins.has_state(s=ReservationStates.Failed):
self.state = SliceState.ModifyError

if not bins.has_state_other_than(ReservationStates.Ticketed, ReservationStates.Closed,
ReservationStates.CloseFail):
if has_error:
self.state = SliceState.ModifyError
else:
self.state = SliceState.ModifyOK

if (not bins.has_state_other_than(ReservationStates.Ticketed, ReservationStates.Failed,
ReservationStates.Closed, ReservationStates.CloseFail)) and \
bins.has_state(s=ReservationStates.Failed):
self.state = SliceState.ModifyError

if not bins.has_state_other_than(ReservationStates.Closed, ReservationStates.CloseWait,
ReservationStates.Failed, ReservationStates.CloseFail):
self.state = SliceState.Closing

elif self.state == SliceState.StableError or self.state == SliceState.StableOK or \
self.state == SliceState.ModifyError or self.state == SliceState.ModifyOK:
elif self.state in [SliceState.StableError, SliceState.StableOK, SliceState.ModifyError,
SliceState.ModifyOK, SliceState.AllocatedError, SliceState.AllocatedOK]:
if not bins.has_state_other_than(ReservationStates.Closed, ReservationStates.CloseWait,
ReservationStates.Failed, ReservationStates.CloseFail):
self.state = SliceState.Dead
Expand All @@ -326,4 +388,4 @@ def get_state(self) -> SliceState:
return self.state

def clear(self):
self.state = SliceState.Nascent
self.state = SliceState.Nascent
10 changes: 2 additions & 8 deletions fabric_cf/actor/core/manage/actor_management_object.py
Original file line number Diff line number Diff line change
Expand Up @@ -30,8 +30,6 @@
from typing import TYPE_CHECKING, List, Dict, Tuple

from fabric_cf.actor.fim.fim_helper import FimHelper
from fabric_mb.message_bus.messages.poa_avro import PoaAvro
from fabric_mb.message_bus.messages.poa_info_avro import PoaInfoAvro
from fabric_mb.message_bus.messages.reservation_mng import ReservationMng
from fabric_mb.message_bus.messages.result_delegation_avro import ResultDelegationAvro
from fabric_mb.message_bus.messages.result_poa_avro import ResultPoaAvro
Expand All @@ -43,7 +41,6 @@
from fabric_mb.message_bus.messages.result_slice_avro import ResultSliceAvro
from fabric_mb.message_bus.messages.slice_avro import SliceAvro
from fim.user import GraphFormat
from fim.user.topology import AdvertizedTopology

from fabric_cf.actor.core.apis.abc_actor_runnable import ABCActorRunnable
from fabric_cf.actor.core.common.constants import Constants, ErrorCodes
Expand Down Expand Up @@ -96,7 +93,6 @@ def save(self) -> dict:
return properties

def recover(self):
actor_name = None
if Constants.PROPERTY_ACTOR_NAME in self.serial:
actor_name = self.serial[Constants.PROPERTY_ACTOR_NAME]
else:
Expand Down Expand Up @@ -135,7 +131,6 @@ def make_local_db_object(self, *, actor: ABCActorMixin):
def set_actor(self, *, actor: ABCActorMixin):
if self.actor is None:
self.actor = actor
#self.db = actor.get_plugin().get_database()
self.logger = actor.get_logger()
self.id = actor.get_guid()
self.make_local_db_object(actor=actor)
Expand Down Expand Up @@ -195,7 +190,7 @@ def add_slice(self, *, slice_obj: SliceAvro, caller: AuthToken) -> ResultStringA
slice_obj_new.set_graph_id(graph_id=slice_obj.graph_id)
slice_obj_new.set_config_properties(value=slice_obj.get_config_properties())
slice_obj_new.set_lease_end(lease_end=slice_obj.get_lease_end())
slice_obj_new.set_lease_start(lease_start=datetime.now(timezone.utc))
slice_obj_new.set_lease_start(lease_start=slice_obj.get_lease_start())

if slice_obj.get_inventory():
slice_obj_new.set_inventory(value=True)
Expand Down Expand Up @@ -870,8 +865,7 @@ def build_broker_query_model(self, level_0_broker_query_model: str, level: int,
start: datetime = None, end: datetime = None, includes: str = None,
excludes: str = None) -> str:
try:
db = self.actor.get_plugin().get_database()
return FimHelper.build_broker_query_model(db=db, level_0_broker_query_model=level_0_broker_query_model,
return FimHelper.build_broker_query_model(level_0_broker_query_model=level_0_broker_query_model,
level=level, graph_format=graph_format, start=start,
end=end, includes=includes, excludes=excludes)
except Exception as e:
Expand Down
2 changes: 1 addition & 1 deletion fabric_cf/actor/core/manage/local/local_controller.py
Original file line number Diff line number Diff line change
Expand Up @@ -243,4 +243,4 @@ def poa(self, *, poa: PoaAvro) -> bool:
except Exception as e:
self.on_exception(e=e, traceback_str=traceback.format_exc())

return False
return False
15 changes: 11 additions & 4 deletions fabric_cf/actor/core/plugins/db/actor_database.py
Original file line number Diff line number Diff line change
Expand Up @@ -262,6 +262,8 @@ def add_reservation(self, *, reservation: ABCReservationMixin):
if node_id and comp_id and bdf:
components.append((node_id, comp_id, bdf))

term = reservation.get_term()

self.db.add_reservation(slc_guid=str(reservation.get_slice_id()),
rsv_resid=str(reservation.get_reservation_id()),
rsv_category=reservation.get_category().value,
Expand All @@ -271,7 +273,9 @@ def add_reservation(self, *, reservation: ABCReservationMixin):
properties=properties,
rsv_graph_node_id=reservation.get_graph_node_id(),
oidc_claim_sub=oidc_claim_sub, email=email, site=site, rsv_type=rsv_type,
components=components)
components=components,
lease_start=term.get_start_time() if term else None,
lease_end=term.get_end_time() if term else None)
self.logger.debug(
"Reservation {} added to slice {}".format(reservation.get_reservation_id(), reservation.get_slice()))
finally:
Expand Down Expand Up @@ -308,6 +312,7 @@ def update_reservation(self, *, reservation: ABCReservationMixin):
if node_id and comp_id and bdf:
components.append((node_id, comp_id, bdf))

term = reservation.get_term()
begin = time.time()
properties = pickle.dumps(reservation)
diff = int(time.time() - begin)
Expand All @@ -322,7 +327,9 @@ def update_reservation(self, *, reservation: ABCReservationMixin):
rsv_joining=reservation.get_join_state().value,
properties=properties,
rsv_graph_node_id=reservation.get_graph_node_id(),
site=site, rsv_type=rsv_type, components=components)
site=site, rsv_type=rsv_type, components=components,
lease_start=term.get_start_time() if term else None,
lease_end=term.get_end_time() if term else None)
diff = int(time.time() - begin)
if diff > 0:
self.logger.info(f"DB TIME: {diff}")
Expand Down Expand Up @@ -459,10 +466,10 @@ def get_authority_reservations(self) -> List[ABCReservationMixin]:
return result

def get_components(self, *, node_id: str, states: list[int], rsv_type: list[str], component: str = None,
bdf: str = None) -> Dict[str, List[str]]:
bdf: str = None, start: datetime = None, end: datetime = None) -> Dict[str, List[str]]:
try:
return self.db.get_components(node_id=node_id, states=states, component=component, bdf=bdf,
rsv_type=rsv_type)
rsv_type=rsv_type, start=start, end=end)
except Exception as e:
self.logger.error(e)
finally:
Expand Down
Loading