Skip to content

Commit

Permalink
Merge pull request #2551 from golemfactory/cherry-pick-fixes-v0.12.1
Browse files Browse the repository at this point in the history
Cherry-pick changes for patch v0.12.1
  • Loading branch information
pwalski committed Apr 27, 2023
2 parents 36fd76e + 914e535 commit 6f40bdd
Show file tree
Hide file tree
Showing 16 changed files with 1,131 additions and 648 deletions.
1,119 changes: 578 additions & 541 deletions Cargo.lock

Large diffs are not rendered by default.

8 changes: 6 additions & 2 deletions Cargo.toml
Expand Up @@ -198,8 +198,12 @@ ya-service-api-web = { path = "core/serv-api/web" }
#ya-sb-util = { path = "../ya-service-bus/crates/util" }

## CLIENT
ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "2a6350f62cf8d926721225a3451822731456e3fe" }
ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "2a6350f62cf8d926721225a3451822731456e3fe" }
#ya-client = { git = "https://github.com/golemfactory/ya-client.git", rev = "2a6350f62cf8d926721225a3451822731456e3fe" }
#ya-client-model = { git = "https://github.com/golemfactory/ya-client.git", rev = "2a6350f62cf8d926721225a3451822731456e3fe" }

## RELAY and networking stack
ya-relay-stack = { git = "https://github.com/golemfactory/ya-relay.git", rev = "9d958f3c68b337a16dc7a13dd09772cb5c079667" }
ya-relay-client = { git = "https://github.com/golemfactory/ya-relay.git", rev = "9d958f3c68b337a16dc7a13dd09772cb5c079667" }

## OTHERS
gftp = { path = "core/gftp" }
Expand Down
1 change: 0 additions & 1 deletion agent/provider/src/execution/task_runner.rs
Expand Up @@ -265,7 +265,6 @@ impl TaskRunner {
// =========================================== //

#[logfn_inputs(Debug, fmt = "{}Processing {:?} {:?}")]
#[logfn(ok = "INFO", err = "ERROR", fmt = "Activity created: {:?}")]
fn on_create_activity(&mut self, msg: CreateActivity, ctx: &mut Context<Self>) -> Result<()> {
let agreement = match self.active_agreements.get(&msg.agreement_id) {
None => bail!("Can't create activity for not my agreement [{:?}].", msg),
Expand Down
14 changes: 11 additions & 3 deletions agent/provider/src/tasks/task_manager.rs
Expand Up @@ -489,6 +489,7 @@ impl Handler<BreakAgreement> for TaskManager {

fn handle(&mut self, msg: BreakAgreement, ctx: &mut Context<Self>) -> Self::Result {
let actx = self.async_context(ctx);
let agreement_id = msg.agreement_id.clone();

self.cancel_handles(ctx, &msg.agreement_id);

Expand Down Expand Up @@ -518,13 +519,20 @@ impl Handler<BreakAgreement> for TaskManager {
finish_transition(&actx.myself, &msg.agreement_id, new_state).await?;

log::info!("Agreement [{}] cleanup finished.", msg.agreement_id);
Ok(())
anyhow::Ok(())
}
.await
}
.map_err(move |error: Error| log::error!("Can't break agreement. Error: {}", error));
.into_actor(self)
.map(move |result, myself, _| match result {
Err(e) if myself.tasks.is_agreement_finalized(&agreement_id) => {
log::warn!("Can't break already finalized agreement. Error: {e}")
}
Err(e) => log::error!("Can't break agreement. Error: {e}"),
_ => (),
});

ActorResponse::r#async(future.into_actor(self).map(|_, _, _| Ok(())))
ActorResponse::r#async(future.map(|_, _, _| Ok(())))
}
}

Expand Down
15 changes: 13 additions & 2 deletions core/activity/src/provider/service.rs
Expand Up @@ -84,11 +84,13 @@ async fn create_activity_gsb(
caller: String,
msg: activity::Create,
) -> RpcMessageResult<activity::Create> {
authorize_agreement_initiator(caller, &msg.agreement_id, Role::Provider).await?;
authorize_agreement_initiator(caller.clone(), &msg.agreement_id, Role::Provider).await?;

let activity_id = generate_id();
let agreement = get_agreement(&msg.agreement_id, Role::Provider).await?;
let agreement_id = agreement.agreement_id.clone();
let app_session_id = agreement.app_session_id.clone();

if agreement.state != AgreementState::Approved {
// to track inconsistencies between this and remote market service
counter!("activity.provider.create.agreement.not-approved", 1);
Expand Down Expand Up @@ -145,6 +147,9 @@ async fn create_activity_gsb(
e
})?;

log::info!(
"Requestor [{caller}] created Activity [{activity_id}] for Agreement [{agreement_id}]"
);
counter!("activity.provider.created", 1);

Ok(if credentials.is_none() {
Expand Down Expand Up @@ -213,7 +218,7 @@ async fn destroy_activity_gsb(
caller: String,
msg: activity::Destroy,
) -> RpcMessageResult<activity::Destroy> {
authorize_activity_initiator(&db, caller, &msg.activity_id, Role::Provider).await?;
authorize_activity_initiator(&db, caller.clone(), &msg.activity_id, Role::Provider).await?;

if !get_persisted_state(&db, &msg.activity_id).await?.alive() {
return Ok(());
Expand Down Expand Up @@ -243,6 +248,12 @@ async fn destroy_activity_gsb(
.await
.map(|_| ())?;

log::info!(
"Requestor [{caller}] destroyed Activity [{}] for Agreement [{}]",
msg.activity_id,
agreement.agreement_id
);

counter!("activity.provider.destroyed.by_requestor", 1);
Ok(result)
}
Expand Down
1 change: 0 additions & 1 deletion core/net/Cargo.toml
Expand Up @@ -16,7 +16,6 @@ ya-core-model = { version = "^0.8", features=["net", "identity"] }

ya-relay-client = "0.5"
#ya-relay-client = { path = "../../../ya-relay/client" }
#ya-relay-client = { git = "https://github.com/golemfactory/ya-relay.git", rev = "f1009ec013d27795a56c6493447836a851352b72" }

ya-sb-proto = { version = "0.6.1" }
ya-sb-util = { version = "0.4.1" }
Expand Down
3 changes: 2 additions & 1 deletion core/payment-driver/erc20/src/driver/api.rs
Expand Up @@ -81,9 +81,10 @@ pub async fn schedule_payment(
dao: &Erc20Dao,
msg: SchedulePayment,
) -> Result<String, GenericError> {
log::debug!("schedule_payment {msg:?}");

let order_id = Uuid::new_v4().to_string();
dao.insert_payment(&order_id, &msg).await?;
log::info!("schedule_payment()");
Ok(order_id)
}

Expand Down
23 changes: 17 additions & 6 deletions core/payment/src/api/debit_notes.rs
Expand Up @@ -180,13 +180,14 @@ async fn issue_debit_note(
.create_if_not_exists(agreement, node_id, Role::Provider)
.await?;
db.as_dao::<ActivityDao>()
.create_if_not_exists(activity_id, node_id, Role::Provider, agreement_id)
.create_if_not_exists(activity_id.clone(), node_id, Role::Provider, agreement_id)
.await?;

let dao: DebitNoteDao = db.as_dao();
let debit_note_id = dao.create_new(debit_note, node_id).await?;
let debit_note = dao.get(debit_note_id, node_id).await?;
let debit_note = dao.get(debit_note_id.clone(), node_id).await?;

log::info!("DebitNote [{debit_note_id}] for Activity [{activity_id}] issued.");
counter!("payment.debit_notes.provider.issued", 1);
Ok(debit_note)
}
Expand Down Expand Up @@ -225,28 +226,34 @@ async fn send_debit_note(
}

let timeout = query.timeout.unwrap_or(params::DEFAULT_ACK_TIMEOUT);
let activity_id = debit_note.activity_id.clone();
let recipient_id = debit_note.recipient_id;

let result = with_timeout(timeout, async move {
match async move {
log::debug!(
"Sending DebitNote [{}] to [{}].",
debit_note_id,
debit_note.debit_note_id,
debit_note.recipient_id
);

let debit_note_id = debit_note.debit_note_id.clone();

ya_net::from(node_id)
.to(debit_note.recipient_id)
.service(PUBLIC_SERVICE)
.call(SendDebitNote(debit_note))
.await??;
dao.mark_received(debit_note_id, node_id).await?;
dao.mark_received(debit_note_id.clone(), node_id).await?;
Ok(())
}
.timeout(Some(timeout))
.await
{
Ok(Ok(_)) => {
log::info!("DebitNote [{}] sent.", path.debit_note_id);
log::info!(
"DebitNote [{debit_note_id}] for Activity [{activity_id}] sent to [{recipient_id}]."
);
counter!("payment.debit_notes.provider.sent", 1);
response::ok(Null)
}
Expand Down Expand Up @@ -393,7 +400,11 @@ async fn accept_debit_note(
.await
{
Ok(Ok(_)) => {
log::info!("DebitNote [{}] accepted.", path.debit_note_id);
log::info!(
"DebitNote [{}] for Activity [{}] accepted.",
path.debit_note_id,
activity_id
);
counter!("payment.debit_notes.requestor.accepted", 1);
response::ok(Null)
}
Expand Down
37 changes: 23 additions & 14 deletions core/payment/src/api/invoices.rs
Expand Up @@ -148,22 +148,19 @@ async fn issue_invoice(db: Data<DbExecutor>, body: Json<NewInvoice>, id: Identit
.await
{
Ok(Some(agreement)) => agreement,
Ok(None) => {
return response::bad_request(&format!("Agreement not found: {}", agreement_id))
}
Ok(None) => return response::bad_request(&format!("Agreement not found: {agreement_id}")),
Err(e) => return response::server_error(&e),
};

for activity_id in activity_ids.iter() {
match get_agreement_id(activity_id.clone(), ya_client_model::market::Role::Provider).await {
Ok(Some(id)) if id != agreement_id => {
return response::bad_request(&format!(
"Activity {} belongs to agreement {} not {}",
activity_id, id, agreement_id
"Activity {activity_id} belongs to agreement {id} not {agreement_id}"
));
}
Ok(None) => {
return response::bad_request(&format!("Activity not found: {}", activity_id))
return response::bad_request(&format!("Activity not found: {activity_id}"))
}
Err(e) => return response::server_error(&e),
_ => (),
Expand All @@ -188,8 +185,9 @@ async fn issue_invoice(db: Data<DbExecutor>, body: Json<NewInvoice>, id: Identit

let dao: InvoiceDao = db.as_dao();
let invoice_id = dao.create_new(invoice, node_id).await?;
let invoice = dao.get(invoice_id, node_id).await?;
let invoice = dao.get(invoice_id.clone(), node_id).await?;

log::info!("Invoice [{invoice_id}] for Agreement [{agreement_id}] issued.");
counter!("payment.invoices.provider.issued", 1);
Ok(invoice)
}
Expand All @@ -214,7 +212,7 @@ async fn send_invoice(
let node_id = id.identity;
let dao: InvoiceDao = db.as_dao();

log::debug!("Requested send invoice [{}]", invoice_id);
log::debug!("Requested send invoice [{invoice_id}]");
counter!("payment.invoices.provider.sent.call", 1);

let invoice = match dao.get(invoice_id.clone(), node_id).await {
Expand All @@ -227,9 +225,12 @@ async fn send_invoice(
return response::ok(Null); // Invoice has been already sent
}
let timeout = query.timeout.unwrap_or(params::DEFAULT_ACK_TIMEOUT);
let agreement_id = invoice.agreement_id.clone();
let recipient_id = invoice.recipient_id;

let result = async move {
match async move {
let invoice_id = invoice.invoice_id.clone();
log::debug!(
"Sending invoice [{}] to [{}].",
invoice_id,
Expand All @@ -248,7 +249,9 @@ async fn send_invoice(
.await
{
Ok(Ok(_)) => {
log::info!("Invoice [{}] sent.", path.invoice_id);
log::info!(
"Invoice [{invoice_id}] for Agreement [{agreement_id}] sent to [{recipient_id}]."
);
counter!("payment.invoices.provider.sent", 1);
response::ok(Null)
}
Expand Down Expand Up @@ -297,31 +300,33 @@ async fn cancel_invoice(
}

let timeout = query.timeout.unwrap_or(params::DEFAULT_ACK_TIMEOUT);
let agreement_id = invoice.agreement_id.clone();

let result = async move {
match async move {
log::debug!(
"Canceling invoice [{}] sent to [{}].",
invoice_id,
invoice.invoice_id,
invoice.recipient_id
);

ya_net::from(node_id)
.to(invoice.recipient_id)
.service(PUBLIC_SERVICE)
.call(CancelInvoice {
invoice_id: invoice_id.clone(),
invoice_id: invoice.invoice_id.clone(),
recipient_id: invoice.recipient_id,
})
.await??;
dao.cancel(invoice_id, node_id).await?;
dao.cancel(invoice.invoice_id, node_id).await?;
Ok(())
}
.timeout(Some(timeout))
.await
{
Ok(Ok(_)) => {
counter!("payment.invoices.provider.cancelled", 1);
log::info!("Invoice [{}] cancelled.", path.invoice_id);
log::info!("Invoice [{invoice_id}] for Agreement [{agreement_id}] cancelled.");
response::ok(Null)
}
Ok(Err(Error::Rpc(RpcMessageError::Cancel(CancelError::Conflict)))) => {
Expand Down Expand Up @@ -460,7 +465,11 @@ async fn accept_invoice(
{
Ok(Ok(_)) => {
counter!("payment.invoices.requestor.accepted", 1);
log::info!("Invoice [{}] accepted.", path.invoice_id);
log::info!(
"Invoice [{}] for Agreement [{}] accepted.",
path.invoice_id,
agreement_id
);
response::ok(Null)
}
Ok(Err(Error::Rpc(RpcMessageError::AcceptReject(AcceptRejectError::BadRequest(
Expand Down
31 changes: 19 additions & 12 deletions core/payment/src/service.rs
Expand Up @@ -422,16 +422,14 @@ mod public {
.create_if_not_exists(agreement, node_id, Role::Requestor)
.await?;
db.as_dao::<ActivityDao>()
.create_if_not_exists(activity_id, node_id, Role::Requestor, agreement_id)
.create_if_not_exists(activity_id.clone(), node_id, Role::Requestor, agreement_id)
.await?;
db.as_dao::<DebitNoteDao>()
.insert_received(debit_note)
.await?;

log::info!(
"DebitNote [{}] received from node [{}].",
debit_note_id,
issuer_id
"DebitNote [{debit_note_id}] for Activity [{activity_id}] received from node [{issuer_id}]."
);
counter!("payment.debit_notes.requestor.received", 1);
Ok(())
Expand Down Expand Up @@ -492,7 +490,7 @@ mod public {

match dao.accept(debit_note_id.clone(), node_id).await {
Ok(_) => {
log::info!("Node [{}] accepted DebitNote [{}].", node_id, debit_note_id);
log::info!("Node [{node_id}] accepted DebitNote [{debit_note_id}].");
counter!("payment.debit_notes.provider.accepted", 1);
Ok(Ack {})
}
Expand Down Expand Up @@ -584,17 +582,18 @@ mod public {
return Err(SendError::BadRequest("Invalid sender node ID".to_owned()));
}

let node_id = *agreement.requestor_id();
let owner_id = *agreement.requestor_id();
let sender_id = *agreement.provider_id();
match async move {
db.as_dao::<AgreementDao>()
.create_if_not_exists(agreement, node_id, Role::Requestor)
.create_if_not_exists(agreement, owner_id, Role::Requestor)
.await?;

let dao: ActivityDao = db.as_dao();
for activity_id in activity_ids {
dao.create_if_not_exists(
activity_id,
node_id,
owner_id,
Role::Requestor,
agreement_id.clone(),
)
Expand All @@ -603,7 +602,9 @@ mod public {

db.as_dao::<InvoiceDao>().insert_received(invoice).await?;

log::info!("Invoice [{}] received from node [{}].", node_id, invoice_id);
log::info!(
"Invoice [{invoice_id}] for Agreement [{agreement_id}] received from node [{sender_id}]."
);
counter!("payment.invoices.requestor.received", 1);
Ok(())
}
Expand Down Expand Up @@ -663,7 +664,12 @@ mod public {

match dao.accept(invoice_id.clone(), node_id).await {
Ok(_) => {
log::info!("Node [{}] accepted invoice [{}].", node_id, invoice_id);
log::info!(
"Node [{}] accepted invoice [{}] for Agreement [{}].",
node_id,
invoice_id,
invoice.agreement_id
);
counter!("payment.invoices.provider.accepted", 1);
Ok(Ack {})
}
Expand Down Expand Up @@ -718,9 +724,10 @@ mod public {
match dao.cancel(invoice_id.clone(), invoice.recipient_id).await {
Ok(_) => {
log::info!(
"Node [{}] cancelled invoice [{}].",
"Node [{}] cancelled invoice [{}] for Agreement [{}]..",
invoice.recipient_id,
invoice_id
invoice_id,
invoice.agreement_id
);
counter!("payment.invoices.requestor.cancelled", 1);
Ok(Ack {})
Expand Down

0 comments on commit 6f40bdd

Please sign in to comment.