Skip to content

Commit

Permalink
Adjust wait_for_approval; Possible returning cancel reason
Browse files Browse the repository at this point in the history
  • Loading branch information
nieznanysprawiciel committed Feb 15, 2021
1 parent 6635dcf commit 32b8f63
Show file tree
Hide file tree
Showing 2 changed files with 25 additions and 28 deletions.
2 changes: 1 addition & 1 deletion core/market/src/negotiation/notifier.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,7 +13,7 @@ where
{
#[error("Timeout while waiting for events for id [{}]", .0.display())]
Timeout(Type),
#[error("Unsubscribed [{}]", .0.display())]
#[error("Unsubscribed notifications for [{}]", .0.display())]
Unsubscribed(Type),
#[error("Channel closed while waiting for events for id [{}]", .0.display())]
ChannelClosed(Type),
Expand Down
51 changes: 24 additions & 27 deletions core/market/src/negotiation/requestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,15 +22,14 @@ use crate::protocol::negotiation::{error::*, messages::*, requestor::Negotiation
use super::{common::*, error::*, notifier::NotifierError, EventNotifier};
use crate::config::Config;
use crate::db::dao::AgreementEventsDao;
use crate::db::model::AgreementEventType;
use crate::utils::display::EnableDisplay;

#[derive(Clone, derive_more::Display, Debug, PartialEq)]
pub enum ApprovalStatus {
#[display(fmt = "Approved")]
Approved,
#[display(fmt = "Cancelled")]
Cancelled,
Cancelled { reason: Option<Reason> },
#[display(fmt = "Rejected")]
Rejected { reason: Option<Reason> },
}
Expand Down Expand Up @@ -359,30 +358,14 @@ impl RequestorBroker {
AgreementState::Approved => {
return Ok(ApprovalStatus::Approved);
}
// `AgreementRejectedEvent` should be last and the only event for this Agreement.
AgreementState::Rejected => {
// `AgreementRejectedEvent` should be last and the only event for this
// Agreement. If it' not
return Ok(ApprovalStatus::Rejected {
reason: self
.common
.db
.as_dao::<AgreementEventsDao>()
.select_for_agreement(&agreement.id)
.await
.map(|events| {
events.last().cloned().map(|event| {
if event.event_type != AgreementEventType::Rejected { log::error!("Expected AgreementRejected event in DB for Agreement [{}].", &agreement.id);
};
event.reason.map(|reason| reason.0)
})
})
.ok()
.flatten()
.flatten(),
});
let reason = self.query_reason_for(&agreement.id).await;
return Ok(ApprovalStatus::Rejected { reason });
}
AgreementState::Cancelled => {
return Ok(ApprovalStatus::Cancelled);
let reason = self.query_reason_for(&agreement.id).await;
return Ok(ApprovalStatus::Cancelled { reason });
}
AgreementState::Expired => return Err(WaitForApprovalError::Expired(id.clone())),
AgreementState::Proposal => {
Expand All @@ -397,10 +380,7 @@ impl RequestorBroker {
if let Err(error) = notifier.wait_for_event_with_timeout(timeout).await {
return match error {
NotifierError::Timeout(_) => Err(WaitForApprovalError::Timeout(id.clone())),
NotifierError::ChannelClosed(_) => {
Err(WaitForApprovalError::Internal(error.to_string()))
}
NotifierError::Unsubscribed(_) => Ok(ApprovalStatus::Cancelled),
e => Err(WaitForApprovalError::Internal(e.to_string())),
};
}
}
Expand Down Expand Up @@ -461,6 +441,23 @@ impl RequestorBroker {
}
return Ok(());
}

async fn query_reason_for(&self, agreement_id: &AgreementId) -> Option<Reason> {
self.common
.db
.as_dao::<AgreementEventsDao>()
.select_for_agreement(agreement_id)
.await
.map(|events| {
events
.last()
.cloned()
.map(|event| event.reason.map(|reason| reason.0))
})
.ok()
.flatten()
.flatten()
}
}

async fn on_agreement_approved(
Expand Down

0 comments on commit 32b8f63

Please sign in to comment.