Skip to content

Commit

Permalink
Merge pull request #779 from ie3-institute/ms/#736-refactoring-GridAg…
Browse files Browse the repository at this point in the history
…ent-messages

Refactoring of `GridAgent` messages.
  • Loading branch information
sebastian-peter committed Apr 12, 2024
2 parents 49cc2c3 + b7a9a5c commit 501a910
Show file tree
Hide file tree
Showing 37 changed files with 605 additions and 739 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0
- Converting `SimonaSim` to pekko typed/terminating SimonSim when initialization fails [#210](https://github.com/ie3-institute/simona/issues/210)
- Converting the `GridAgent` and the `DBFSAlgorithm` to `pekko typed` [#666](https://github.com/ie3-institute/simona/issues/666)
- Validation of grid will throw exception instead of just logging errors [#463](https://github.com/ie3-institute/simona/issues/463)
- Refactoring of `GridAgent` messages [#736](https://github.com/ie3-institute/simona/issues/736)

### Fixed
- Removed a repeated line in the documentation of vn_simona config [#658](https://github.com/ie3-institute/simona/issues/658)
Expand Down
136 changes: 58 additions & 78 deletions src/main/scala/edu/ie3/simona/agent/grid/DBFSAlgorithm.scala
Original file line number Diff line number Diff line change
Expand Up @@ -21,22 +21,17 @@ import edu.ie3.simona.agent.grid.GridAgentData.{
GridAgentConstantData,
PowerFlowDoneData,
}
import edu.ie3.simona.agent.grid.GridAgentMessage._
import edu.ie3.simona.agent.grid.ReceivedValues._
import edu.ie3.simona.agent.grid.VoltageMessage.ProvideSlackVoltageMessage.ExchangeVoltage
import edu.ie3.simona.agent.grid.VoltageMessage.{
ProvideSlackVoltageMessage,
RequestSlackVoltageMessage,
}
import edu.ie3.simona.agent.grid.GridAgentMessages.Responses.ExchangeVoltage
import edu.ie3.simona.agent.grid.GridAgentMessages._
import edu.ie3.simona.agent.participant.ParticipantAgent.{
FinishParticipantSimulation,
ParticipantMessage,
RequestAssetPowerMessage,
}
import edu.ie3.simona.event.RuntimeEvent.PowerFlowFailed
import edu.ie3.simona.exceptions.agent.DBFSAlgorithmException
import edu.ie3.simona.model.grid.{NodeModel, RefSystem}
import edu.ie3.simona.ontology.messages.Activation
import edu.ie3.simona.ontology.messages.PowerMessage._
import edu.ie3.simona.ontology.messages.SchedulerMessage.Completion
import edu.ie3.simona.util.TickUtil.TickLong
import edu.ie3.util.scala.quantities.DefaultQuantities._
Expand Down Expand Up @@ -78,8 +73,8 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
currentTick: Long,
)(implicit
constantData: GridAgentConstantData,
buffer: StashBuffer[GridAgentMessage],
): Behavior[GridAgentMessage] = Behaviors.receivePartial {
buffer: StashBuffer[GridAgent.Request],
): Behavior[GridAgent.Request] = Behaviors.receivePartial {
case (ctx, message) =>
(message, gridAgentData) match {
// first part of the grid simulation, same for all gridAgents on all levels
Expand Down Expand Up @@ -179,7 +174,7 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {

// if we receive a request for slack voltages from our inferior grids we want to answer it
case (
RequestSlackVoltageMessage(
SlackVoltageRequest(
currentSweepNo,
nodeUuids,
sender,
Expand Down Expand Up @@ -269,7 +264,7 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
}
} match {
case exchangeVoltages =>
sender ! ProvideSlackVoltageMessage(
sender ! SlackVoltageResponse(
currentSweepNo,
exchangeVoltages,
)
Expand All @@ -280,12 +275,10 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
// before power flow calc for this sweep we either have to stash() the message to answer it later (in current sweep)
// or trigger a new run for the next sweepNo
case (
msg @ WrappedPowerMessage(
RequestGridPowerMessage(
requestSweepNo,
_,
_,
)
msg @ RequestGridPower(
requestSweepNo,
_,
_,
),
gridAgentBaseData: GridAgentBaseData,
) =>
Expand Down Expand Up @@ -316,9 +309,7 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {

// after power flow calc for this sweepNo
case (
WrappedPowerMessage(
RequestGridPowerMessage(_, requestedNodeUuids, sender)
),
RequestGridPower(_, requestedNodeUuids, sender),
powerFlowDoneData @ PowerFlowDoneData(
gridAgentBaseData,
powerFlowResult,
Expand Down Expand Up @@ -380,7 +371,7 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
}
}
.map { case (nodeUuid, (p, q)) =>
ProvideGridPowerMessage.ExchangePower(
Responses.ExchangePower(
nodeUuid,
p,
q,
Expand Down Expand Up @@ -410,13 +401,11 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
)
}

sender ! WrappedPowerMessage(
ProvideGridPowerMessage(exchangePowers)
)
sender ! GridPowerResponse(exchangePowers)
simulateGrid(updatedGridAgentBaseData, currentTick)

case _: FailedNewtonRaphsonPFResult =>
sender ! WrappedPowerMessage(FailedPowerFlow)
sender ! FailedPowerFlow
simulateGrid(gridAgentBaseData, currentTick)
}
case None =>
Expand All @@ -425,7 +414,7 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
"I got a grid power request from a subgrid I don't know. Can't answer it properly."
)

sender ! WrappedPowerMessage(FailedPowerFlow)
sender ! FailedPowerFlow
Behaviors.same
}

Expand Down Expand Up @@ -523,8 +512,8 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
currentTick: Long,
)(implicit
constantData: GridAgentConstantData,
buffer: StashBuffer[GridAgentMessage],
): Behavior[GridAgentMessage] = Behaviors.receivePartial {
buffer: StashBuffer[GridAgent.Request],
): Behavior[GridAgent.Request] = Behaviors.receivePartial {
case (ctx, message) =>
(message, gridAgentData) match {
// main method for power flow calculations
Expand Down Expand Up @@ -794,29 +783,23 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
// happens only when we received slack data and power values before we received a request to provide grid data
// (only possible when first simulation triggered and this agent is faster in this state as the request
// by a superior grid arrives)
case (
msg: WrappedPowerMessage,
_: GridAgentBaseData,
) =>
case (powerResponse: PowerResponse, _: GridAgentBaseData) =>
ctx.log.debug(
"Received Request for Grid Power too early. Stashing away"
)

buffer.stash(msg)
buffer.stash(powerResponse)
Behaviors.same

// happens only when we received slack data and power values before we received a request to provide grid
// (only possible when first simulation triggered and this agent is faster
// with its power flow calculation in this state as the request by a superior grid arrives)
case (
msg: WrappedPowerMessage,
_: PowerFlowDoneData,
) =>
case (powerResponse: PowerResponse, _: PowerFlowDoneData) =>
ctx.log.debug(
"Received Request for Grid Power too early. Stashing away"
)

buffer.stash(msg)
buffer.stash(powerResponse)
Behaviors.same

case _ =>
Expand All @@ -836,8 +819,8 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
gridAgentBaseData: GridAgentBaseData
)(implicit
constantData: GridAgentConstantData,
buffer: StashBuffer[GridAgentMessage],
): Behavior[GridAgentMessage] = Behaviors.receivePartial {
buffer: StashBuffer[GridAgent.Request],
): Behavior[GridAgent.Request] = Behaviors.receivePartial {

case (ctx, CheckPowerDifferencesTrigger(currentTick)) =>
ctx.log.debug("Starting the power differences check ...")
Expand Down Expand Up @@ -1003,12 +986,12 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
allReceived: Boolean,
gridAgentBaseData: GridAgentBaseData,
currentTick: Long,
behavior: (GridAgentData, Long) => Behavior[GridAgentMessage],
behavior: (GridAgentData, Long) => Behavior[GridAgent.Request],
)(implicit
ctx: ActorContext[GridAgentMessage],
ctx: ActorContext[GridAgent.Request],
constantData: GridAgentConstantData,
buffer: StashBuffer[GridAgentMessage],
): Behavior[GridAgentMessage] =
buffer: StashBuffer[GridAgent.Request],
): Behavior[GridAgent.Request] =
if (allReceived) {
ctx.log.debug(
"All power values of inferior grids, assets + voltage superior grid slack voltages received."
Expand Down Expand Up @@ -1074,12 +1057,12 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
allReceived: Boolean,
gridAgentBaseData: GridAgentBaseData,
currentTick: Long,
behavior: (GridAgentData, Long) => Behavior[GridAgentMessage],
behavior: (GridAgentData, Long) => Behavior[GridAgent.Request],
)(implicit
ctx: ActorContext[GridAgentMessage],
ctx: ActorContext[GridAgent.Request],
constantData: GridAgentConstantData,
buffer: StashBuffer[GridAgentMessage],
): Behavior[GridAgentMessage] = {
buffer: StashBuffer[GridAgent.Request],
): Behavior[GridAgent.Request] = {
if (allReceived) {
ctx.log.debug(
"All power values of child assets + inferior grids received."
Expand Down Expand Up @@ -1120,11 +1103,11 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
private def handlePowerFlowFailure(
gridAgentBaseData: GridAgentBaseData,
currentTick: Long,
ctx: ActorContext[GridAgentMessage],
ctx: ActorContext[GridAgent.Request],
)(implicit
constantData: GridAgentConstantData,
buffer: StashBuffer[GridAgentMessage],
): Behavior[GridAgentMessage] = {
buffer: StashBuffer[GridAgent.Request],
): Behavior[GridAgent.Request] = {
constantData.environmentRefs.runtimeEventListener ! PowerFlowFailed

if (gridAgentBaseData.powerFlowParams.stopOnFailure) {
Expand Down Expand Up @@ -1154,8 +1137,8 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
currentTick: Long,
)(implicit
constantData: GridAgentConstantData,
buffer: StashBuffer[GridAgentMessage],
): Behavior[GridAgentMessage] = {
buffer: StashBuffer[GridAgent.Request],
): Behavior[GridAgent.Request] = {
constantData.environmentRefs.scheduler ! Completion(
constantData.activationAdapter,
Some(currentTick),
Expand Down Expand Up @@ -1191,7 +1174,7 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
refSystem: RefSystem,
askTimeout: Duration,
)(implicit
ctx: ActorContext[GridAgentMessage]
ctx: ActorContext[GridAgent.Request]
): Boolean = {
implicit val timeout: PekkoTimeout = PekkoTimeout.create(askTimeout)
implicit val ec: ExecutionContext = ctx.executionContext
Expand Down Expand Up @@ -1265,11 +1248,11 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
*/
private def askInferiorGridsForPowers(
currentSweepNo: Int,
subGridGateToActorRef: Map[SubGridGate, ActorRef[GridAgentMessage]],
subGridGateToActorRef: Map[SubGridGate, ActorRef[GridAgent.Request]],
inferiorGridGates: Seq[SubGridGate],
askTimeout: Duration,
)(implicit
ctx: ActorContext[GridAgentMessage]
ctx: ActorContext[GridAgent.Request]
): Boolean = {
implicit val timeout: PekkoTimeout = PekkoTimeout.create(askTimeout)
implicit val ec: ExecutionContext = ctx.executionContext
Expand Down Expand Up @@ -1298,21 +1281,17 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
}
.map { case (inferiorGridAgentRef, inferiorGridGateNodes) =>
inferiorGridAgentRef
.ask[GridAgentMessage](ref =>
WrappedPowerMessage(
RequestGridPowerMessage(
currentSweepNo,
inferiorGridGateNodes.distinct,
ref,
)
.ask[GridAgent.Request](ref =>
RequestGridPower(
currentSweepNo,
inferiorGridGateNodes.distinct,
ref,
)
)
.map {
case WrappedPowerMessage(
provideGridPowerMessage: ProvideGridPowerMessage
) =>
case provideGridPowerMessage: GridPowerResponse =>
(inferiorGridAgentRef, provideGridPowerMessage)
case WrappedPowerMessage(FailedPowerFlow) =>
case FailedPowerFlow =>
(inferiorGridAgentRef, FailedPowerFlow)
}
}
Expand Down Expand Up @@ -1340,11 +1319,11 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
*/
private def askSuperiorGridsForSlackVoltages(
currentSweepNo: Int,
subGridGateToActorRef: Map[SubGridGate, ActorRef[GridAgentMessage]],
subGridGateToActorRef: Map[SubGridGate, ActorRef[GridAgent.Request]],
superiorGridGates: Vector[SubGridGate],
askTimeout: Duration,
)(implicit
ctx: ActorContext[GridAgentMessage]
ctx: ActorContext[GridAgent.Request]
): Boolean = {
implicit val timeout: PekkoTimeout = PekkoTimeout.create(askTimeout)
implicit val ec: ExecutionContext = ctx.executionContext
Expand All @@ -1362,14 +1341,14 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {
.groupBy(subGridGateToActorRef(_))
.map { case (superiorGridAgent, gridGates) =>
superiorGridAgent
.ask[GridAgentMessage](ref =>
RequestSlackVoltageMessage(
.ask[GridAgent.Request](ref =>
SlackVoltageRequest(
currentSweepNo,
gridGates.map(_.superiorNode.getUuid),
ref,
)
)
.map { case providedSlackValues: ProvideSlackVoltageMessage =>
.map { case providedSlackValues: SlackVoltageResponse =>
(superiorGridAgent, providedSlackValues)
}
}
Expand Down Expand Up @@ -1416,19 +1395,20 @@ trait DBFSAlgorithm extends PowerFlowSupport with GridResultsSupport {

/** This method uses [[ActorContext.pipeToSelf()]] to send a future message to
* itself. If the future is a [[Success]] the message is send, else a
* [[ReceivedFailure]] with the thrown error is send.
* [[WrappedFailure]] with the thrown error is send.
*
* @param future
* future message that should be send to the agent after it was processed
* @param ctx
* [[ActorContext]] of the receiving actor
*/
private def pipeToSelf(
future: Future[GridAgentMessage],
ctx: ActorContext[GridAgentMessage],
future: Future[GridAgent.Request],
ctx: ActorContext[GridAgent.Request],
): Unit = {
ctx.pipeToSelf[GridAgentMessage](future) {
ctx.pipeToSelf[GridAgent.Request](future) {
case Success(value) => value
case Failure(exception) => ReceivedFailure(exception)
case Failure(exception) => WrappedFailure(exception)
}
}
}

0 comments on commit 501a910

Please sign in to comment.