Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

TerminateAgreement endpoint #832

Merged
merged 18 commits into from
Dec 8, 2020
Merged
Show file tree
Hide file tree
Changes from 14 commits
Commits
Show all changes
18 commits
Select commit Hold shift + click to select a range
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
373 changes: 223 additions & 150 deletions Cargo.lock

Large diffs are not rendered by default.

77 changes: 77 additions & 0 deletions core/market/src/db/dao/agreement.rs
Original file line number Diff line number Diff line change
Expand Up @@ -92,6 +92,55 @@ impl<'c> AgreementDao<'c> {
.await
}

pub async fn select_by_node(
&self,
id: AgreementId,
node_id: NodeId,
validation_ts: NaiveDateTime,
) -> DbResult<Option<Agreement>> {
// Because we explicitly disallow agreements between the same identities
// (i.e. provider_id != requestor_id), we'll always get the right db row
// with this query.
let id_swapped = id.clone().swap_owner();
let id_orig = id.clone();
do_with_transaction(self.pool, move |conn| {
let query = market_agreement
.filter(agreement::id.eq_any(vec![id_orig, id_swapped]))
.filter(
agreement::provider_id
.eq(node_id)
.or(agreement::requestor_id.eq(node_id)),
);
Ok(match query.first::<Agreement>(conn).optional()? {
Some(mut agreement) => {
if agreement.valid_to < validation_ts {
agreement.state = AgreementState::Expired;
update_state(conn, &id, &agreement.state)?;
}
Some(agreement)
}
None => {
log::debug!("Not in DB"); //XXX
None
}
})
})
.await
}

pub async fn terminate(
&self,
id: &AgreementId,
reason: Option<String>,
owner_type: OwnerType,
) -> DbResult<bool> {
let id = id.clone();
do_with_transaction(self.pool, move |conn| {
terminate(conn, &id, reason, owner_type)
})
.await
}

pub async fn save(&self, agreement: Agreement) -> Result<Agreement, SaveAgreementError> {
// Agreement is always created for last Provider Proposal.
let proposal_id = agreement.offer_proposal_id.clone();
Expand Down Expand Up @@ -258,3 +307,31 @@ fn update_session(conn: &ConnType, id: &AgreementId, session: &str) -> DbResult<
.execute(conn)?;
Ok(num_updated > 0)
}

fn terminate(
conn: &ConnType,
id: &AgreementId,
reason: Option<String>,
owner_type: OwnerType,
) -> DbResult<bool> {
log::debug!("Termination reason: {:?}", reason);
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This will be ugly log:
Some("Reason")
Maybe you should implement something like .display()

let num_updated = diesel::update(agreement::market_agreement.find(id))
.set(agreement::state.eq(AgreementState::Terminated))
.execute(conn)?;

if num_updated == 0 {
return Ok(false);
}

let event = NewAgreementEvent {
jiivan marked this conversation as resolved.
Show resolved Hide resolved
agreement_id: id.clone(),
reason,
event_type: AgreementEventType::Terminated,
issuer: owner_type,
};

diesel::insert_into(market_agreement_event)
.values(&event)
.execute(conn)?;
Ok(true)
}
2 changes: 1 addition & 1 deletion core/market/src/db/model/agreement_events.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ impl AgreementEvent {
let event_date = DateTime::<Utc>::from_utc(self.timestamp, Utc);
let reason = self
.reason
.map(|reason| serde_json::from_str::<JsonReason>(&reason))
.map(|reason| serde_json::from_str(&reason).map(|value| JsonReason {json: value}))
.map(|result| result.map_err(|e| {
log::warn!(
"Agreement Event with not parsable Reason in database. Error: {}. Shouldn't happen \
Expand Down
13 changes: 13 additions & 0 deletions core/market/src/market.rs
Original file line number Diff line number Diff line change
Expand Up @@ -271,6 +271,19 @@ impl MarketService {
.map(|event| event.into_client())
.collect())
}

pub async fn terminate_agreement(
&self,
id: Identity,
agreement_id: AgreementId,
reason: Option<String>,
) -> Result<(), AgreementError> {
// We won't attach ourselves too much to owner type here. It will be replaced in CommonBroker
self.requestor_engine
.common
.terminate_agreement(id, agreement_id, reason)
.await
}
}

impl Service for MarketService {
Expand Down
7 changes: 7 additions & 0 deletions core/market/src/matcher/resolver.rs
Original file line number Diff line number Diff line change
Expand Up @@ -110,6 +110,13 @@ impl Resolver {
}

fn matches(offer: &Offer, demand: &Demand) -> bool {
if offer.node_id == demand.node_id {
log::info!(
"Rejecting Demand Offer pair from single identity. node_id: {}",
offer.node_id
);
return false;
}
match match_demand_offer(
&demand.properties,
&demand.constraints,
Expand Down
154 changes: 148 additions & 6 deletions core/market/src/negotiation/common.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,14 +5,17 @@ use std::sync::Arc;
use std::time::{Duration, Instant};

use ya_client::model::market::proposal::Proposal as ClientProposal;
use ya_client::model::market::reason::Reason;
use ya_client::model::market::NewProposal;
use ya_client::model::NodeId;
use ya_market_resolver::{match_demand_offer, Match};
use ya_persistence::executor::DbExecutor;
use ya_service_api_web::middleware::Identity;

use crate::config::Config;
use crate::db::dao::{AgreementEventsDao, NegotiationEventsDao, ProposalDao, SaveProposalError};
use crate::db::dao::{
AgreementDao, AgreementEventsDao, NegotiationEventsDao, ProposalDao, SaveProposalError,
};
use crate::db::model::{
Agreement, AgreementEvent, AgreementId, AgreementState, AppSessionId, IssuerType, MarketEvent,
OwnerType, Proposal,
Expand All @@ -22,14 +25,19 @@ use crate::matcher::{
error::{DemandError, QueryOfferError},
store::SubscriptionStore,
};
use crate::negotiation::error::{AgreementEventsError, AgreementStateError, GetProposalError};
use crate::negotiation::notifier::NotifierError;
use crate::negotiation::{
error::{MatchValidationError, ProposalError, QueryEventsError},
error::{
AgreementError, AgreementEventsError, AgreementStateError, GetProposalError,
MatchValidationError, ProposalError, QueryEventsError, ReasonError,
},
notifier::NotifierError,
EventNotifier,
};
use crate::protocol::negotiation::error::{CounterProposalError, RemoteProposalError};
use crate::protocol::negotiation::messages::ProposalReceived;
use crate::protocol::negotiation::common as protocol_common;
use crate::protocol::negotiation::error::{
CounterProposalError, RemoteAgreementError, RemoteProposalError, TerminateAgreementError,
};
use crate::protocol::negotiation::messages::{AgreementTerminated, ProposalReceived};

type IsFirst = bool;

Expand Down Expand Up @@ -279,6 +287,134 @@ impl CommonBroker {
})
}

// Called locally via REST
pub async fn terminate_agreement(
&self,
id: Identity,
agreement_id: AgreementId,
reason: Option<String>,
jiivan marked this conversation as resolved.
Show resolved Hide resolved
) -> Result<(), AgreementError> {
verify_reason(reason.as_ref())?;
let dao = self.db.as_dao::<AgreementDao>();
log::debug!(
"Getting agreement. id: {:?}, agrid: {}, reason: {:?}",
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please don't display using debug if it is possible

id,
agreement_id,
reason
); // XXX
let mut agreement = match dao
.select_by_node(
agreement_id.clone(),
id.identity.clone(),
Utc::now().naive_utc(),
)
.await
.map_err(|e| AgreementError::Get(agreement_id.clone(), e))?
{
None => return Err(AgreementError::NotFound(agreement_id)),
Some(agreement) => agreement,
};
// from now on agreement_id is invalid. Use only agreement.id
// (which has valid owner)
expect_state(&agreement, AgreementState::Approved)?;
agreement.state = AgreementState::Terminated;
let owner_type = agreement.id.owner();
protocol_common::propagate_terminate_agreement(
&agreement,
id.identity.clone(),
match owner_type {
OwnerType::Requestor => agreement.provider_id,
OwnerType::Provider => agreement.requestor_id,
},
reason.clone(),
)
.await?;
dao.terminate(&agreement.id, reason, owner_type)
.await
.map_err(|e| AgreementError::Get(agreement.id.clone(), e))?;

match owner_type {
OwnerType::Provider => counter!("market.agreements.provider.terminated", 1),
OwnerType::Requestor => counter!("market.agreements.requestor.terminated", 1),
};
log::info!(
"Requestor {} terminated Agreement [{}] and sent to Provider.",
&id.identity,
&agreement.id,
);
Ok(())
}

// Called remotely via GSB
pub async fn on_agreement_terminated(
self,
caller: String,
msg: AgreementTerminated,
owner_type: OwnerType,
) -> Result<(), TerminateAgreementError> {
let caller: NodeId =
caller
.parse()
jiivan marked this conversation as resolved.
Show resolved Hide resolved
.map_err(|e: ya_client::model::node_id::ParseError| {
TerminateAgreementError::CallerParseError {
e: e.to_string(),
caller,
id: msg.agreement_id.clone(),
}
})?;
Ok(self
.on_agreement_terminated_inner(caller, msg, owner_type)
.await?)
}

async fn on_agreement_terminated_inner(
self,
caller: NodeId,
msg: AgreementTerminated,
owner_type: OwnerType,
) -> Result<(), RemoteAgreementError> {
let dao = self.db.as_dao::<AgreementDao>();
let agreement_id = msg.agreement_id.translate(owner_type);
let agreement = dao
.select(&agreement_id, None, Utc::now().naive_utc())
.await
.map_err(|_e| RemoteAgreementError::NotFound(agreement_id.clone()))?
.ok_or(RemoteAgreementError::NotFound(agreement_id.clone()))?;

match owner_type {
OwnerType::Requestor => {
if agreement.provider_id != caller {
// Don't reveal, that we know this Agreement id.
Err(RemoteAgreementError::NotFound(agreement_id.clone()))?
}
}
OwnerType::Provider => {
if agreement.requestor_id != caller {
// Don't reveal, that we know this Agreement id.
Err(RemoteAgreementError::NotFound(agreement_id.clone()))?
}
}
}

// Opposite side terminated.
let terminator = match owner_type {
OwnerType::Provider => OwnerType::Requestor,
OwnerType::Requestor => OwnerType::Provider,
};

dao.terminate(&agreement_id, msg.reason, terminator)
.await
.map_err(|e| {
log::info!(
Copy link
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I would say it is even warn, because it would leave Provider and Requestor in inconsistent state

"Couldn't terminate agreement. id: {}, e: {}",
agreement_id,
e
);
RemoteAgreementError::InternalError(agreement_id.clone())
})?;
Ok(())
}

// TODO: We need more elegant solution than this. This function still returns
// CounterProposalError, which should be hidden in negotiation API and implementations
// of handlers should return RemoteProposalError.
Expand Down Expand Up @@ -445,3 +581,9 @@ pub fn expect_state(
AgreementState::Terminated => AgreementStateError::Terminated(agreement.id.clone()),
})?
}

fn verify_reason(reason: Option<&String>) -> Result<(), ReasonError> {
Ok(if let Some(s) = reason {
serde_json::from_str::<Reason>(s)?;
})
}
10 changes: 9 additions & 1 deletion core/market/src/negotiation/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,7 @@ use crate::db::model::{
use crate::db::{dao::SaveProposalError, dao::TakeEventsError, DbError};
use crate::protocol::negotiation::error::{
ApproveAgreementError, CounterProposalError as ProtocolProposalError, GsbAgreementError,
NegotiationApiInitError, ProposeAgreementError,
NegotiationApiInitError, ProposeAgreementError, TerminateAgreementError,
};

#[derive(Error, Debug)]
Expand Down Expand Up @@ -82,10 +82,18 @@ pub enum AgreementError {
ProtocolCreate(#[from] ProposeAgreementError),
#[error("Protocol error while approving: {0}")]
ProtocolApprove(#[from] ApproveAgreementError),
#[error("Protocol error while terminating: {0}")]
ProtocolTerminate(#[from] TerminateAgreementError),
#[error(transparent)]
ReasonError(#[from] ReasonError),
#[error("Internal error: {0}")]
Internal(String),
}

#[derive(Error, Debug)]
#[error("Reason parse error: {0}")]
pub struct ReasonError(#[from] pub serde_json::Error);

#[derive(Error, Debug)]
pub enum WaitForApprovalError {
#[error("Agreement [{0}] not found.")]
Expand Down
6 changes: 6 additions & 0 deletions core/market/src/negotiation/provider.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,6 +43,7 @@ impl ProviderBroker {
let broker1 = broker.clone();
let broker2 = broker.clone();
let broker3 = broker.clone();
let broker_terminated = broker.clone();

let api = NegotiationApi::new(
move |caller: String, msg: InitialProposalReceived| {
Expand All @@ -58,6 +59,11 @@ impl ProviderBroker {
on_agreement_received(broker3.clone(), caller, msg)
},
move |_caller: String, _msg: AgreementCancelled| async move { unimplemented!() },
move |caller: String, msg: AgreementTerminated| {
broker_terminated
.clone()
.on_agreement_terminated(caller, msg, OwnerType::Provider)
},
);

// Initialize counters to 0 value. Otherwise they won't appear on metrics endpoint
Expand Down
7 changes: 7 additions & 0 deletions core/market/src/negotiation/requestor.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,7 @@ impl RequestorBroker {

let broker1 = broker.clone();
let broker2 = broker.clone();
let broker_terminated = broker.clone();
let notifier = broker.negotiation_notifier.clone();

let api = NegotiationApi::new(
Expand All @@ -64,6 +65,11 @@ impl RequestorBroker {
on_agreement_approved(broker2.clone(), caller, msg)
},
move |_caller: String, _msg: AgreementRejected| async move { unimplemented!() },
move |caller: String, msg: AgreementTerminated| {
broker_terminated
.clone()
.on_agreement_terminated(caller, msg, OwnerType::Requestor)
},
);

let engine = RequestorBroker {
Expand All @@ -78,6 +84,7 @@ impl RequestorBroker {
counter!("market.agreements.requestor.approved", 0);
counter!("market.agreements.requestor.rejected", 0);
counter!("market.agreements.requestor.cancelled", 0);
counter!("market.agreements.requestor.terminated", 0);
counter!("market.proposals.requestor.generated", 0);
counter!("market.proposals.requestor.received", 0);
counter!("market.proposals.requestor.countered", 0);
Expand Down
Loading