Skip to content

Commit

Permalink
Merge branch 'sp/#204-EmAgent' into sp/#000-em-combined
Browse files Browse the repository at this point in the history
# Conflicts:
#	src/main/scala/edu/ie3/simona/ontology/messages/SchedulerMessage.scala
#	src/main/scala/edu/ie3/simona/scheduler/SchedulerHelper.scala
  • Loading branch information
sebastian-peter committed Oct 23, 2022
2 parents 4b8e073 + fe9514e commit 507748f
Show file tree
Hide file tree
Showing 3 changed files with 69 additions and 19 deletions.
49 changes: 32 additions & 17 deletions src/main/scala/edu/ie3/simona/agent/participant/em/EmAgent.scala
Original file line number Diff line number Diff line change
Expand Up @@ -278,13 +278,13 @@ class EmAgent(
),
baseStateData: EmModelBaseStateData
) =>
createNextTriggerIfApplicable(
val maybeNextTrigger = createNextTriggerIfApplicable(
baseStateData.schedulerStateData,
scheduleTriggerMessage.trigger.tick
) foreach { stm =>
// since we've been sent a trigger, we need to complete it as well
scheduler ! CompletionMessage(triggerId, Some(Seq(stm)))
}
)

// since we've been sent a trigger, we need to complete it as well
scheduler ! CompletionMessage(triggerId, maybeNextTrigger.map(Seq(_)))

stay() using
baseStateData.copy(schedulerStateData =
Expand Down Expand Up @@ -332,14 +332,17 @@ class EmAgent(
// here, participants that are changing their flex options at the current
// tick are activated and are sent flex options requests

// FIXME this is only necessary until we revoke triggers with main scheduler
if (baseStateData.schedulerStateData.mainTriggerId.nonEmpty) {
log.error(
s"EmAgent $self is already active at $newTick with ${baseStateData.schedulerStateData.mainTriggerId} (new triggerId $triggerId)"
)

scheduler ! CompletionMessage(triggerId, None)
stay() using baseStateData

} else {

// schedule flex options request for those agents that need to be activated at the very next tick
// schedule flex options request for those agents that need to be activated at the next activated tick
val schedulerDataWithNext =
scheduleFlexRequestAtNextTick(
baseStateData.schedulerStateData,
Expand Down Expand Up @@ -400,10 +403,9 @@ class EmAgent(
schedulerDataWithNext.copy(flexTrigger = updatedFlexTrigger)
)

// if we don't have anything to do, complete right away
// FIXME this is only necessary until we revoke triggers with main scheduler
// we should not get triggered without any scheduled triggers for the new tick
if (expectedRequests.isEmpty)
maybeTicksCompleted(baseStateData.schedulerStateData)
log.error(s"No requests for $self at $newTick")

// send out all ActivityStartTriggers and RequestFlexOptions
goto(Idle) using setActiveTickAndSendTriggers(
Expand Down Expand Up @@ -526,23 +528,36 @@ class EmAgent(
schedulerStateData: EmSchedulerStateData,
newTick: Long
): Option[ScheduleTriggerMessage] = {
// FIXME it'd be better if we also revoked the former next tick, because that one could also be revoked before it is reached

val isCurrentlyInactive = schedulerStateData.mainTriggerId.isEmpty

// this defaults to true if no next tick is scheduled
val scheduleNextTrigger = getNextScheduledTick(
val maybeNextScheduledTick = getNextScheduledTick(
schedulerStateData
).forall { nextScheduledTick =>
)

// only revoke next scheduled tick if it exists and is later than new tick
val maybeTickToRevoke = maybeNextScheduledTick.filter { nextScheduledTick =>
newTick < nextScheduledTick
}

Option.when(isCurrentlyInactive && scheduleNextTrigger) {
// schedule new tick if we're inactive and
// - there is no scheduled next tick or
// - the new tick is earlier than the scheduled next tick
val scheduleNewTick =
isCurrentlyInactive && (maybeNextScheduledTick.isEmpty || maybeTickToRevoke.nonEmpty)

Option.when(scheduleNewTick) {
val maybeRevokeTrigger =
maybeTickToRevoke.map(revokeTick =>
(ActivityStartTrigger(revokeTick), self)
)

ScheduleTriggerMessage(
ActivityStartTrigger(newTick),
self
self,
maybeRevokeTrigger
)
}

}

private def maybeIssueFlexControl(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -40,10 +40,13 @@ object SchedulerMessage {
* to schedule
* @param actorToBeScheduled
* the agent that should receive the trigger
* @param revokeTrigger
* Trigger to be revoked for given actor, if applicable
*/
final case class ScheduleTriggerMessage(
trigger: Trigger,
actorToBeScheduled: ActorRef,
revokeTrigger: Option[(Trigger, ActorRef)] = None,
priority: Boolean = false
) extends SchedulerMessage
with Trigger {
Expand Down
36 changes: 34 additions & 2 deletions src/main/scala/edu/ie3/simona/scheduler/SchedulerHelper.scala
Original file line number Diff line number Diff line change
Expand Up @@ -804,13 +804,20 @@ trait SchedulerHelper extends SimonaActorLogging {
protected final def scheduleTrigger(
triggerMessage: ScheduleTriggerMessage,
stateData: SchedulerStateData
): SchedulerStateData =
): SchedulerStateData = {
val updatedStateData = triggerMessage.revokeTrigger
.map { case (trigger, actor) =>
revokeTrigger(trigger, actor, stateData)
}
.getOrElse(stateData)

scheduleTrigger(
triggerMessage.trigger,
triggerMessage.actorToBeScheduled,
triggerMessage.priority,
stateData
updatedStateData
)
}

/** Adds the provided trigger to the trigger queue to schedule it at the
* requested tick
Expand Down Expand Up @@ -879,4 +886,29 @@ trait SchedulerHelper extends SimonaActorLogging {

}

protected def revokeTrigger(
trigger: Trigger,
actor: ActorRef,
stateData: SchedulerStateData
): SchedulerStateData = {

// sanity check

if (trigger.tick <= stateData.time.nowInTicks)
log.warning(
s"Trying to revoke a trigger for a tick (${trigger.tick}) earlier or equal to now (${stateData.time.nowInTicks})"
)

// just remove trigger from both priority and regular queue

stateData.trigger.triggerQueue.remove(
trigger.tick,
scheduledTrigger =>
scheduledTrigger.triggerWithIdMessage.trigger == trigger &&
scheduledTrigger.agent == actor
)

stateData
}

}

0 comments on commit 507748f

Please sign in to comment.