diff --git a/core/payment/examples/payment_api.rs b/core/payment/examples/payment_api.rs index 57b78ea5f6..60327f41b5 100644 --- a/core/payment/examples/payment_api.rs +++ b/core/payment/examples/payment_api.rs @@ -21,6 +21,7 @@ use ya_persistence::executor::DbExecutor; use ya_service_api_web::middleware::auth::dummy::DummyAuth; use ya_service_api_web::middleware::Identity; use ya_service_api_web::rest_api_addr; +use ya_service_api_web::scope::ExtendableScope; use ya_service_bus::typed as bus; use ya_zksync_driver as zksync; @@ -293,14 +294,16 @@ async fn main() -> anyhow::Result<()> { role: "".to_string(), }; - let provider_scope = - ya_payment::api::provider_scope().wrap(DummyAuth::new(provider_identity)); - let requestor_scope = - ya_payment::api::requestor_scope().wrap(DummyAuth::new(requestor_identity)); + let provider_api_scope = Scope::new("/provider") + .extend(ya_payment::api::api_scope) + .wrap(DummyAuth::new(provider_identity)); + let requestor_api_scope = Scope::new("/requestor") + .extend(ya_payment::api::api_scope) + .wrap(DummyAuth::new(requestor_identity)); let payment_service = Scope::new(PAYMENT_API_PATH) .data(db.clone()) - .service(provider_scope) - .service(requestor_scope); + .service(provider_api_scope) + .service(requestor_api_scope); App::new() .wrap(middleware::Logger::default()) .service(payment_service) diff --git a/core/payment/src/api.rs b/core/payment/src/api.rs index e653204f54..ef311ce119 100644 --- a/core/payment/src/api.rs +++ b/core/payment/src/api.rs @@ -5,22 +5,27 @@ use ya_client_model::payment::PAYMENT_API_PATH; use ya_persistence::executor::DbExecutor; use ya_service_api_web::scope::ExtendableScope; -mod provider; -mod requestor; +mod accounts; +mod allocations; +mod debit_notes; +mod invoices; +mod payments; -pub fn provider_scope() -> Scope { - Scope::new("/provider").extend(provider::register_endpoints) -} - -pub fn requestor_scope() -> Scope { - Scope::new("/requestor").extend(requestor::register_endpoints) +pub fn api_scope(scope: Scope) -> Scope { + scope + .extend(accounts::register_endpoints) + .extend(allocations::register_endpoints) + .extend(debit_notes::register_endpoints) + .extend(invoices::register_endpoints) + .extend(payments::register_endpoints) } pub fn web_scope(db: &DbExecutor) -> Scope { Scope::new(PAYMENT_API_PATH) .data(db.clone()) - .service(provider_scope()) - .service(requestor_scope()) + .service(api_scope(Scope::new(""))) + // TODO: TEST + // Scope::new(PAYMENT_API_PATH).extend(api_scope).data(db.clone()) } pub const DEFAULT_ACK_TIMEOUT: f64 = 60.0; // seconds diff --git a/core/payment/src/api/accounts.rs b/core/payment/src/api/accounts.rs new file mode 100644 index 0000000000..8ffbaae3a1 --- /dev/null +++ b/core/payment/src/api/accounts.rs @@ -0,0 +1,56 @@ +// Extrnal crates +use actix_web::web::get; +use actix_web::{HttpResponse, Scope}; + +// Workspace uses +use ya_client_model::payment::*; +use ya_core_model::payment::local::{GetAccounts, BUS_ID as LOCAL_SERVICE}; +use ya_service_api_web::middleware::Identity; +use ya_service_bus::{typed as bus, RpcEndpoint}; + +// Local uses +use crate::utils::*; + +pub fn register_endpoints(scope: Scope) -> Scope { + scope + .route("/providerAccounts", get().to(get_provider_accounts)) + .route("/requestorAccounts", get().to(get_requestor_accounts)) +} + +async fn get_provider_accounts(id: Identity) -> HttpResponse { + let node_id = id.identity.to_string(); + let all_accounts = match bus::service(LOCAL_SERVICE).send(GetAccounts {}).await { + Ok(Ok(accounts)) => accounts, + Ok(Err(e)) => return response::server_error(&e), + Err(e) => return response::server_error(&e), + }; + let recv_accounts: Vec = all_accounts + .into_iter() + .filter(|account| account.receive) + .filter(|account| account.address == node_id) // TODO: Implement proper account permission system + .map(|account| Account { + platform: account.platform, + address: account.address, + }) + .collect(); + response::ok(recv_accounts) +} + +async fn get_requestor_accounts(id: Identity) -> HttpResponse { + let node_id = id.identity.to_string(); + let all_accounts = match bus::service(LOCAL_SERVICE).send(GetAccounts {}).await { + Ok(Ok(accounts)) => accounts, + Ok(Err(e)) => return response::server_error(&e), + Err(e) => return response::server_error(&e), + }; + let recv_accounts: Vec = all_accounts + .into_iter() + .filter(|account| account.send) + .filter(|account| account.address == node_id) // TODO: Implement proper account permission system + .map(|account| Account { + platform: account.platform, + address: account.address, + }) + .collect(); + response::ok(recv_accounts) +} diff --git a/core/payment/src/api/allocations.rs b/core/payment/src/api/allocations.rs new file mode 100644 index 0000000000..91ffdab2df --- /dev/null +++ b/core/payment/src/api/allocations.rs @@ -0,0 +1,163 @@ +// Extrnal crates +use actix_web::web::{delete, get, post, put, Data, Json, Path, Query}; +use actix_web::{HttpResponse, Scope}; +use serde_json::value::Value::Null; + +// Workspace uses +use ya_agreement_utils::{ClauseOperator, ConstraintKey, Constraints}; +use ya_client_model::payment::*; +use ya_core_model::payment::local::{ + ValidateAllocation, ValidateAllocationError, BUS_ID as LOCAL_SERVICE, +}; +use ya_core_model::payment::RpcMessageError; +use ya_persistence::executor::DbExecutor; +use ya_service_api_web::middleware::Identity; +use ya_service_bus::{typed as bus, RpcEndpoint}; + +// Local uses +use crate::api::*; +use crate::dao::*; +use crate::error::{DbError, Error}; +use crate::utils::response; +use crate::DEFAULT_PAYMENT_PLATFORM; + +pub fn register_endpoints(scope: Scope) -> Scope { + scope + .route("/allocations", post().to(create_allocation)) + .route("/allocations", get().to(get_allocations)) + .route("/allocations/{allocation_id}", get().to(get_allocation)) + .route("/allocations/{allocation_id}", put().to(amend_allocation)) + .route( + "/allocations/{allocation_id}", + delete().to(release_allocation), + ) + .route("/decorateDemand", get().to(decorate_demand)) +} + +async fn create_allocation( + db: Data, + body: Json, + id: Identity, +) -> HttpResponse { + // TODO: Handle deposits & timeouts + let allocation = body.into_inner(); + let node_id = id.identity; + let payment_platform = allocation + .payment_platform + .clone() + .unwrap_or(DEFAULT_PAYMENT_PLATFORM.to_string()); + let address = allocation.address.clone().unwrap_or(node_id.to_string()); + + let validate_msg = ValidateAllocation { + platform: payment_platform.clone(), + address: address.clone(), + amount: allocation.total_amount.clone(), + }; + match async move { Ok(bus::service(LOCAL_SERVICE).send(validate_msg).await??) }.await { + Ok(true) => {} + Ok(false) => return response::bad_request(&"Insufficient funds to make allocation"), + Err(Error::Rpc(RpcMessageError::ValidateAllocation( + ValidateAllocationError::AccountNotRegistered, + ))) => return response::bad_request(&"Account not registered"), + Err(e) => return response::server_error(&e), + } + + let dao: AllocationDao = db.as_dao(); + match async move { + let allocation_id = dao + .create(allocation, node_id, payment_platform, address) + .await?; + Ok(dao.get(allocation_id, node_id).await?) + } + .await + { + Ok(Some(allocation)) => response::created(allocation), + Ok(None) => response::server_error(&"Database error"), + Err(DbError::Query(e)) => response::bad_request(&e), + Err(e) => response::server_error(&e), + } +} + +async fn get_allocations(db: Data, id: Identity) -> HttpResponse { + let node_id = id.identity; + let dao: AllocationDao = db.as_dao(); + match dao.get_for_owner(node_id).await { + Ok(allocations) => response::ok(allocations), + Err(e) => response::server_error(&e), + } +} + +async fn get_allocation( + db: Data, + path: Path, + id: Identity, +) -> HttpResponse { + let allocation_id = path.allocation_id.clone(); + let node_id = id.identity; + let dao: AllocationDao = db.as_dao(); + match dao.get(allocation_id, node_id).await { + Ok(Some(allocation)) => response::ok(allocation), + Ok(None) => response::not_found(), + Err(e) => response::server_error(&e), + } +} + +async fn amend_allocation( + db: Data, + path: Path, + body: Json, +) -> HttpResponse { + response::not_implemented() // TODO +} + +async fn release_allocation( + db: Data, + path: Path, + id: Identity, +) -> HttpResponse { + let allocation_id = path.allocation_id.clone(); + let node_id = id.identity; + let dao: AllocationDao = db.as_dao(); + match dao.release(allocation_id, node_id).await { + Ok(true) => response::ok(Null), + Ok(false) => response::not_found(), + Err(e) => response::server_error(&e), + } +} + +async fn decorate_demand( + db: Data, + path: Query, + id: Identity, +) -> HttpResponse { + let allocation_ids = path.allocation_ids.clone(); + let node_id = id.identity; + let dao: AllocationDao = db.as_dao(); + let allocations = match dao.get_many(allocation_ids, node_id).await { + Ok(allocations) => allocations, + Err(e) => return response::server_error(&e), + }; + if allocations.len() != path.allocation_ids.len() { + return response::not_found(); + } + + let properties: Vec = allocations + .into_iter() + .map(|allocation| MarketProperty { + key: format!( + "golem.com.payment.platform.{}.address", + allocation.payment_platform + ), + value: allocation.address, + }) + .collect(); + let constraints = properties + .iter() + .map(|property| ConstraintKey::new(property.key.as_str()).equal_to(ConstraintKey::new("*"))) + .collect(); + let constraints = vec![Constraints::new_clause(ClauseOperator::Or, constraints).to_string()]; + response::ok(MarketDecoration { + properties, + constraints, + }) +} diff --git a/core/payment/src/api/debit_notes.rs b/core/payment/src/api/debit_notes.rs new file mode 100644 index 0000000000..9b39b86125 --- /dev/null +++ b/core/payment/src/api/debit_notes.rs @@ -0,0 +1,309 @@ +// Extrnal crates +use actix_web::web::{get, post, Data, Json, Path, Query}; +use actix_web::{HttpResponse, Scope}; +use serde_json::value::Value::Null; + +// Workspace uses +use metrics::counter; +use ya_client_model::payment::*; +use ya_core_model::payment::local::{SchedulePayment, BUS_ID as LOCAL_SERVICE}; +use ya_core_model::payment::public::{ + AcceptDebitNote, AcceptRejectError, SendDebitNote, SendError, BUS_ID as PUBLIC_SERVICE, +}; +use ya_core_model::payment::RpcMessageError; +use ya_net::RemoteEndpoint; +use ya_persistence::executor::DbExecutor; +use ya_persistence::types::Role; +use ya_service_api_web::middleware::Identity; +use ya_service_bus::{typed as bus, RpcEndpoint}; + +// Local uses +use crate::api::*; +use crate::dao::*; +use crate::error::{DbError, Error}; +use crate::utils::provider::get_agreement_for_activity; +use crate::utils::*; + +pub fn register_endpoints(scope: Scope) -> Scope { + scope + // Shared + .route("/debitNotes", get().to(get_debit_notes)) + .route("/debitNotes/{debit_note_id}", get().to(get_debit_note)) + .route( + "/debitNotes/{debit_note_id}/payments", + get().to(get_debit_note_payments), + ) + .route("/debitNoteEvents", get().to(get_debit_note_events)) + // Provider + .route("/debitNotes", post().to(issue_debit_note)) + .route( + "/debitNotes/{debit_note_id}/send", + post().to(send_debit_note), + ) + .route( + "/debitNotes/{debit_note_id}/cancel", + post().to(cancel_debit_note), + ) + // Requestor + .route( + "/debitNotes/{debit_note_id}/accept", + post().to(accept_debit_note), + ) + .route( + "/debitNotes/{debit_note_id}/reject", + post().to(reject_debit_note), + ) +} + +async fn get_debit_notes(db: Data, id: Identity) -> HttpResponse { + let node_id = id.identity; + let dao: DebitNoteDao = db.as_dao(); + match dao.get_for_node_id(node_id).await { + Ok(debit_notes) => response::ok(debit_notes), + Err(e) => response::server_error(&e), + } +} + +async fn get_debit_note( + db: Data, + path: Path, + id: Identity, +) -> HttpResponse { + let debit_note_id = path.debit_note_id.clone(); + let node_id = id.identity; + let dao: DebitNoteDao = db.as_dao(); + match dao.get(debit_note_id, node_id).await { + Ok(Some(debit_note)) => response::ok(debit_note), + Ok(None) => response::not_found(), + Err(e) => response::server_error(&e), + } +} + +async fn get_debit_note_payments(db: Data, path: Path) -> HttpResponse { + response::not_implemented() // TODO +} + +async fn get_debit_note_events( + db: Data, + query: Query, + id: Identity, +) -> HttpResponse { + let node_id = id.identity; + let timeout_secs = query.timeout; + let later_than = query.later_than.map(|d| d.naive_utc()); + + let dao: DebitNoteEventDao = db.as_dao(); + let getter = || async { + dao.get_for_node_id(node_id.clone(), later_than.clone()) + .await + }; + + match listen_for_events(getter, timeout_secs).await { + Ok(events) => response::ok(events), + Err(e) => response::server_error(&e), + } +} + +// Provider + +async fn issue_debit_note( + db: Data, + body: Json, + id: Identity, +) -> HttpResponse { + let debit_note = body.into_inner(); + let activity_id = debit_note.activity_id.clone(); + + let agreement = match get_agreement_for_activity(activity_id.clone()).await { + Ok(Some(agreement_id)) => agreement_id, + Ok(None) => return response::bad_request(&format!("Activity not found: {}", &activity_id)), + Err(e) => return response::server_error(&e), + }; + let agreement_id = agreement.agreement_id.clone(); + + let node_id = id.identity; + if &node_id != agreement.provider_id() { + return response::unauthorized(); + } + + match async move { + db.as_dao::() + .create_if_not_exists(agreement, node_id, Role::Provider) + .await?; + db.as_dao::() + .create_if_not_exists(activity_id, node_id, Role::Provider, agreement_id) + .await?; + + let dao: DebitNoteDao = db.as_dao(); + let debit_note_id = dao.create_new(debit_note, node_id).await?; + let debit_note = dao.get(debit_note_id, node_id).await?; + + counter!("payment.debit_notes.provider.issued", 1); + Ok(debit_note) + } + .await + { + Ok(Some(debit_note)) => response::created(debit_note), + Ok(None) => response::server_error(&"Database error"), + Err(DbError::Query(e)) => response::bad_request(&e), + Err(e) => response::server_error(&e), + } +} + +async fn send_debit_note( + db: Data, + path: Path, + query: Query, + id: Identity, +) -> HttpResponse { + let debit_note_id = path.debit_note_id.clone(); + let node_id = id.identity; + let dao: DebitNoteDao = db.as_dao(); + let debit_note = match dao.get(debit_note_id.clone(), node_id).await { + Ok(Some(debit_note)) => debit_note, + Ok(None) => return response::not_found(), + Err(e) => return response::server_error(&e), + }; + + if debit_note.status != DocumentStatus::Issued { + return response::ok(Null); // Debit note has been already sent + } + + with_timeout(query.timeout, async move { + match async move { + ya_net::from(node_id) + .to(debit_note.recipient_id) + .service(PUBLIC_SERVICE) + .call(SendDebitNote(debit_note)) + .await??; + dao.mark_received(debit_note_id, node_id).await?; + counter!("payment.debit_notes.provider.sent", 1); + Ok(()) + } + .await + { + Ok(_) => response::ok(Null), + Err(Error::Rpc(RpcMessageError::Send(SendError::BadRequest(e)))) => { + response::bad_request(&e) + } + Err(e) => response::server_error(&e), + } + }) + .await +} + +async fn cancel_debit_note( + db: Data, + path: Path, + query: Query, +) -> HttpResponse { + response::not_implemented() // TODO +} + +// Requestor + +async fn accept_debit_note( + db: Data, + path: Path, + query: Query, + body: Json, + id: Identity, +) -> HttpResponse { + let debit_note_id = path.debit_note_id.clone(); + let node_id = id.identity; + let acceptance = body.into_inner(); + let allocation_id = acceptance.allocation_id.clone(); + + let dao: DebitNoteDao = db.as_dao(); + let debit_note: DebitNote = match dao.get(debit_note_id.clone(), node_id).await { + Ok(Some(debit_note)) => debit_note, + Ok(None) => return response::not_found(), + Err(e) => return response::server_error(&e), + }; + + if debit_note.total_amount_due != acceptance.total_amount_accepted { + return response::bad_request(&"Invalid amount accepted"); + } + + match debit_note.status { + DocumentStatus::Received => (), + DocumentStatus::Rejected => (), + DocumentStatus::Failed => (), + DocumentStatus::Accepted => return response::ok(Null), + DocumentStatus::Settled => return response::ok(Null), + DocumentStatus::Issued => return response::server_error(&"Illegal status: issued"), + DocumentStatus::Cancelled => return response::bad_request(&"Debit note cancelled"), + } + + let activity_id = debit_note.activity_id.clone(); + let activity = match db + .as_dao::() + .get(activity_id.clone(), node_id) + .await + { + Ok(Some(activity)) => activity, + Ok(None) => return response::server_error(&format!("Activity {} not found", activity_id)), + Err(e) => return response::server_error(&e), + }; + let amount_to_pay = &debit_note.total_amount_due - &activity.total_amount_accepted.0; + + let allocation = match db + .as_dao::() + .get(allocation_id.clone(), node_id) + .await + { + Ok(Some(allocation)) => allocation, + Ok(None) => { + return response::bad_request(&format!("Allocation {} not found", allocation_id)) + } + Err(e) => return response::server_error(&e), + }; + if amount_to_pay > allocation.remaining_amount { + let msg = format!( + "Not enough funds. Allocated: {} Needed: {}", + allocation.remaining_amount, amount_to_pay + ); + return response::bad_request(&msg); + } + + with_timeout(query.timeout, async move { + let issuer_id = debit_note.issuer_id; + let accept_msg = AcceptDebitNote::new(debit_note_id.clone(), acceptance, issuer_id); + let schedule_msg = + SchedulePayment::from_debit_note(debit_note, allocation_id, amount_to_pay); + match async move { + ya_net::from(node_id) + .to(issuer_id) + .service(PUBLIC_SERVICE) + .call(accept_msg) + .await??; + if let Some(msg) = schedule_msg { + bus::service(LOCAL_SERVICE).send(msg).await??; + } + dao.accept(debit_note_id, node_id).await?; + + counter!("payment.debit_notes.requestor.accepted", 1); + Ok(()) + } + .await + { + Ok(_) => response::ok(Null), + Err(Error::Rpc(RpcMessageError::AcceptReject(AcceptRejectError::BadRequest(e)))) => { + return response::bad_request(&e); + } + Err(e) => return response::server_error(&e), + } + + // TODO: Compute amount to pay and schedule payment + }) + .await +} + +async fn reject_debit_note( + db: Data, + path: Path, + query: Query, + body: Json, +) -> HttpResponse { + response::not_implemented() // TODO +} diff --git a/core/payment/src/api/provider.rs b/core/payment/src/api/invoices.rs similarity index 55% rename from core/payment/src/api/provider.rs rename to core/payment/src/api/invoices.rs index 39caf60977..bebff4d331 100644 --- a/core/payment/src/api/provider.rs +++ b/core/payment/src/api/invoices.rs @@ -1,16 +1,15 @@ -use crate::api::*; -use crate::dao::*; -use crate::error::{DbError, Error}; -use crate::utils::provider::*; -use crate::utils::*; +// Extrnal crates use actix_web::web::{get, post, Data, Json, Path, Query}; use actix_web::{HttpResponse, Scope}; -use metrics::counter; use serde_json::value::Value::Null; + +// Workspace uses +use metrics::counter; use ya_client_model::payment::*; -use ya_core_model::payment::local::{GetAccounts, BUS_ID as LOCAL_SERVICE}; +use ya_core_model::payment::local::{SchedulePayment, BUS_ID as LOCAL_SERVICE}; use ya_core_model::payment::public::{ - CancelError, CancelInvoice, SendDebitNote, SendError, SendInvoice, BUS_ID as PUBLIC_SERVICE, + AcceptInvoice, AcceptRejectError, CancelError, CancelInvoice, SendError, SendInvoice, + BUS_ID as PUBLIC_SERVICE, }; use ya_core_model::payment::RpcMessageError; use ya_net::RemoteEndpoint; @@ -19,160 +18,57 @@ use ya_persistence::types::Role; use ya_service_api_web::middleware::Identity; use ya_service_bus::{typed as bus, RpcEndpoint}; +// Local uses +use crate::api::*; +use crate::dao::*; +use crate::error::{DbError, Error}; +use crate::utils::provider::get_agreement_id; +use crate::utils::*; + pub fn register_endpoints(scope: Scope) -> Scope { scope - .route("/debitNotes", post().to(issue_debit_note)) - .route("/debitNotes", get().to(get_debit_notes)) - .route("/debitNotes/{debit_note_id}", get().to(get_debit_note)) - .route( - "/debitNotes/{debit_note_id}/payments", - get().to(get_debit_note_payments), - ) - .route( - "/debitNotes/{debit_note_id}/send", - post().to(send_debit_note), - ) - .route( - "/debitNotes/{debit_note_id}/cancel", - post().to(cancel_debit_note), - ) - .route("/debitNoteEvents", get().to(get_debit_note_events)) - .route("/invoices", post().to(issue_invoice)) + // Shared .route("/invoices", get().to(get_invoices)) .route("/invoices/{invoice_id}", get().to(get_invoice)) .route( "/invoices/{invoice_id}/payments", get().to(get_invoice_payments), ) + .route("/invoiceEvents", get().to(get_invoice_events)) + // Provider + .route("/invoices", post().to(issue_invoice)) .route("/invoices/{invoice_id}/send", post().to(send_invoice)) .route("/invoices/{invoice_id}/cancel", post().to(cancel_invoice)) - .route("/invoiceEvents", get().to(get_invoice_events)) - .route("/payments", get().to(get_payments)) - .route("/payments/{payment_id}", get().to(get_payment)) - .route("/accounts", get().to(get_accounts)) -} - -// ************************** DEBIT NOTE ************************** - -async fn issue_debit_note( - db: Data, - body: Json, - id: Identity, -) -> HttpResponse { - let debit_note = body.into_inner(); - let activity_id = debit_note.activity_id.clone(); - - let agreement = match get_agreement_for_activity(activity_id.clone()).await { - Ok(Some(agreement_id)) => agreement_id, - Ok(None) => return response::bad_request(&format!("Activity not found: {}", &activity_id)), - Err(e) => return response::server_error(&e), - }; - let agreement_id = agreement.agreement_id.clone(); - - let node_id = id.identity; - if &node_id != agreement.provider_id() { - return response::unauthorized(); - } - - match async move { - db.as_dao::() - .create_if_not_exists(agreement, node_id, Role::Provider) - .await?; - db.as_dao::() - .create_if_not_exists(activity_id, node_id, Role::Provider, agreement_id) - .await?; - - let dao: DebitNoteDao = db.as_dao(); - let debit_note_id = dao.create_new(debit_note, node_id).await?; - let debit_note = dao.get(debit_note_id, node_id).await?; - - counter!("payment.debit_notes.provider.issued", 1); - Ok(debit_note) - } - .await - { - Ok(Some(debit_note)) => response::created(debit_note), - Ok(None) => response::server_error(&"Database error"), - Err(DbError::Query(e)) => response::bad_request(&e), - Err(e) => response::server_error(&e), - } + // Requestor + .route("/invoices/{invoice_id}/accept", post().to(accept_invoice)) + .route("/invoices/{invoice_id}/reject", post().to(reject_invoice)) } -async fn get_debit_notes(db: Data, id: Identity) -> HttpResponse { +async fn get_invoices(db: Data, id: Identity) -> HttpResponse { let node_id = id.identity; - let dao: DebitNoteDao = db.as_dao(); - match dao.get_for_provider(node_id).await { - Ok(debit_notes) => response::ok(debit_notes), + let dao: InvoiceDao = db.as_dao(); + match dao.get_for_node_id(node_id).await { + Ok(invoices) => response::ok(invoices), Err(e) => response::server_error(&e), } } -async fn get_debit_note( - db: Data, - path: Path, - id: Identity, -) -> HttpResponse { - let debit_note_id = path.debit_note_id.clone(); +async fn get_invoice(db: Data, path: Path, id: Identity) -> HttpResponse { + let invoice_id = path.invoice_id.clone(); let node_id = id.identity; - let dao: DebitNoteDao = db.as_dao(); - match dao.get(debit_note_id, node_id).await { - Ok(Some(debit_note)) => response::ok(debit_note), + let dao: InvoiceDao = db.as_dao(); + match dao.get(invoice_id, node_id).await { + Ok(Some(invoice)) => response::ok(invoice), Ok(None) => response::not_found(), Err(e) => response::server_error(&e), } } -async fn send_debit_note( - db: Data, - path: Path, - query: Query, - id: Identity, -) -> HttpResponse { - let debit_note_id = path.debit_note_id.clone(); - let node_id = id.identity; - let dao: DebitNoteDao = db.as_dao(); - let debit_note = match dao.get(debit_note_id.clone(), node_id).await { - Ok(Some(debit_note)) => debit_note, - Ok(None) => return response::not_found(), - Err(e) => return response::server_error(&e), - }; - - if debit_note.status != DocumentStatus::Issued { - return response::ok(Null); // Debit note has been already sent - } - - with_timeout(query.timeout, async move { - match async move { - ya_net::from(node_id) - .to(debit_note.recipient_id) - .service(PUBLIC_SERVICE) - .call(SendDebitNote(debit_note)) - .await??; - dao.mark_received(debit_note_id, node_id).await?; - counter!("payment.debit_notes.provider.sent", 1); - Ok(()) - } - .await - { - Ok(_) => response::ok(Null), - Err(Error::Rpc(RpcMessageError::Send(SendError::BadRequest(e)))) => { - response::bad_request(&e) - } - Err(e) => response::server_error(&e), - } - }) - .await -} - -async fn cancel_debit_note( - db: Data, - path: Path, - query: Query, -) -> HttpResponse { +async fn get_invoice_payments(db: Data, path: Path) -> HttpResponse { response::not_implemented() // TODO } -async fn get_debit_note_events( +async fn get_invoice_events( db: Data, query: Query, id: Identity, @@ -181,19 +77,18 @@ async fn get_debit_note_events( let timeout_secs = query.timeout; let later_than = query.later_than.map(|d| d.naive_utc()); - let dao: DebitNoteEventDao = db.as_dao(); + let dao: InvoiceEventDao = db.as_dao(); let getter = || async { - dao.get_for_provider(node_id.clone(), later_than.clone()) + dao.get_for_node_id(node_id.clone(), later_than.clone()) .await }; - match listen_for_events(getter, timeout_secs).await { Ok(events) => response::ok(events), Err(e) => response::server_error(&e), } } -// *************************** INVOICE **************************** +// Provider async fn issue_invoice(db: Data, body: Json, id: Identity) -> HttpResponse { let invoice = body.into_inner(); @@ -256,26 +151,6 @@ async fn issue_invoice(db: Data, body: Json, id: Identit } } -async fn get_invoices(db: Data, id: Identity) -> HttpResponse { - let node_id = id.identity; - let dao: InvoiceDao = db.as_dao(); - match dao.get_for_provider(node_id).await { - Ok(invoices) => response::ok(invoices), - Err(e) => response::server_error(&e), - } -} - -async fn get_invoice(db: Data, path: Path, id: Identity) -> HttpResponse { - let invoice_id = path.invoice_id.clone(); - let node_id = id.identity; - let dao: InvoiceDao = db.as_dao(); - match dao.get(invoice_id, node_id).await { - Ok(Some(invoice)) => response::ok(invoice), - Ok(None) => response::not_found(), - Err(e) => response::server_error(&e), - } -} - async fn send_invoice( db: Data, path: Path, @@ -371,85 +246,108 @@ async fn cancel_invoice( .await } -async fn get_invoice_events( +// Requestor + +async fn accept_invoice( db: Data, - query: Query, + path: Path, + query: Query, + body: Json, id: Identity, ) -> HttpResponse { + let invoice_id = path.invoice_id.clone(); let node_id = id.identity; - let timeout_secs = query.timeout; - let later_than = query.later_than.map(|d| d.naive_utc()); + let acceptance = body.into_inner(); + let allocation_id = acceptance.allocation_id.clone(); - let dao: InvoiceEventDao = db.as_dao(); - let getter = || async { - dao.get_for_provider(node_id.clone(), later_than.clone()) - .await + let dao: InvoiceDao = db.as_dao(); + let invoice = match dao.get(invoice_id.clone(), node_id).await { + Ok(Some(invoice)) => invoice, + Ok(None) => return response::not_found(), + Err(e) => return response::server_error(&e), }; - match listen_for_events(getter, timeout_secs).await { - Ok(events) => response::ok(events), - Err(e) => response::server_error(&e), - } -} -// *************************** PAYMENT **************************** + if invoice.amount != acceptance.total_amount_accepted { + return response::bad_request(&"Invalid amount accepted"); + } -async fn get_payments( - db: Data, - query: Query, - id: Identity, -) -> HttpResponse { - let node_id = id.identity; - let timeout_secs = query.timeout; - let later_than = query.later_than.map(|d| d.naive_utc()); + match invoice.status { + DocumentStatus::Received => (), + DocumentStatus::Rejected => (), + DocumentStatus::Failed => (), + DocumentStatus::Accepted => return response::ok(Null), + DocumentStatus::Settled => return response::ok(Null), + DocumentStatus::Cancelled => return response::bad_request(&"Invoice cancelled"), + DocumentStatus::Issued => return response::server_error(&"Illegal status: issued"), + } - let dao: PaymentDao = db.as_dao(); - let getter = || async { - dao.get_for_provider(node_id.clone(), later_than.clone()) - .await + let agreement_id = invoice.agreement_id.clone(); + let agreement = match db + .as_dao::() + .get(agreement_id.clone(), node_id) + .await + { + Ok(Some(agreement)) => agreement, + Ok(None) => { + return response::server_error(&format!("Agreement {} not found", agreement_id)) + } + Err(e) => return response::server_error(&e), }; + let amount_to_pay = &invoice.amount - &agreement.total_amount_accepted.0; - match listen_for_events(getter, timeout_secs).await { - Ok(payments) => response::ok(payments), - Err(e) => response::server_error(&e), + let allocation = match db + .as_dao::() + .get(allocation_id.clone(), node_id) + .await + { + Ok(Some(allocation)) => allocation, + Ok(None) => { + return response::bad_request(&format!("Allocation {} not found", allocation_id)) + } + Err(e) => return response::server_error(&e), + }; + // FIXME: remaining amount should be 'locked' until payment is done to avoid double spending + if amount_to_pay > allocation.remaining_amount { + let msg = format!( + "Not enough funds. Allocated: {} Needed: {}", + allocation.remaining_amount, amount_to_pay + ); + return response::bad_request(&msg); } -} -async fn get_payment(db: Data, path: Path, id: Identity) -> HttpResponse { - let payment_id = path.payment_id.clone(); - let node_id = id.identity; - let dao: PaymentDao = db.as_dao(); - match dao.get(payment_id, node_id).await { - Ok(Some(payment)) => response::ok(payment), - Ok(None) => response::not_found(), - Err(e) => response::server_error(&e), - } -} + with_timeout(query.timeout, async move { + let issuer_id = invoice.issuer_id; + let accept_msg = AcceptInvoice::new(invoice_id.clone(), acceptance, issuer_id); + let schedule_msg = SchedulePayment::from_invoice(invoice, allocation_id, amount_to_pay); + match async move { + ya_net::from(node_id) + .to(issuer_id) + .service(PUBLIC_SERVICE) + .call(accept_msg) + .await??; + bus::service(LOCAL_SERVICE).send(schedule_msg).await??; + dao.accept(invoice_id, node_id).await?; -async fn get_debit_note_payments(db: Data, path: Path) -> HttpResponse { - response::not_implemented() // TODO + counter!("payment.invoices.requestor.accepted", 1); + Ok(()) + } + .await + { + Ok(_) => response::ok(Null), + Err(Error::Rpc(RpcMessageError::AcceptReject(AcceptRejectError::BadRequest(e)))) => { + return response::bad_request(&e) + } + Err(e) => return response::server_error(&e), + } + }) + .await } -async fn get_invoice_payments(db: Data, path: Path) -> HttpResponse { +async fn reject_invoice( + db: Data, + path: Path, + query: Query, + body: Json, +) -> HttpResponse { response::not_implemented() // TODO } - -// *************************** ACCOUNTS **************************** - -async fn get_accounts(id: Identity) -> HttpResponse { - let node_id = id.identity.to_string(); - let all_accounts = match bus::service(LOCAL_SERVICE).send(GetAccounts {}).await { - Ok(Ok(accounts)) => accounts, - Ok(Err(e)) => return response::server_error(&e), - Err(e) => return response::server_error(&e), - }; - let recv_accounts: Vec = all_accounts - .into_iter() - .filter(|account| account.receive) - .filter(|account| account.address == node_id) // TODO: Implement proper account permission system - .map(|account| Account { - platform: account.platform, - address: account.address, - }) - .collect(); - response::ok(recv_accounts) -} diff --git a/core/payment/src/api/payments.rs b/core/payment/src/api/payments.rs new file mode 100644 index 0000000000..234fbda0a6 --- /dev/null +++ b/core/payment/src/api/payments.rs @@ -0,0 +1,50 @@ +// Extrnal crates +use actix_web::web::{get, Data, Path, Query}; +use actix_web::{HttpResponse, Scope}; + +// Workspace uses +use ya_persistence::executor::DbExecutor; +use ya_service_api_web::middleware::Identity; + +// Local uses +use crate::api::*; +use crate::dao::*; +use crate::utils::*; + +pub fn register_endpoints(scope: Scope) -> Scope { + scope + .route("/payments", get().to(get_payments)) + .route("/payments/{payment_id}", get().to(get_payment)) +} + +async fn get_payments( + db: Data, + query: Query, + id: Identity, +) -> HttpResponse { + let node_id = id.identity; + let timeout_secs = query.timeout; + let later_than = query.later_than.map(|d| d.naive_utc()); + + let dao: PaymentDao = db.as_dao(); + let getter = || async { + dao.get_for_node_id(node_id.clone(), later_than.clone()) + .await + }; + + match listen_for_events(getter, timeout_secs).await { + Ok(payments) => response::ok(payments), + Err(e) => response::server_error(&e), + } +} + +async fn get_payment(db: Data, path: Path, id: Identity) -> HttpResponse { + let payment_id = path.payment_id.clone(); + let node_id = id.identity; + let dao: PaymentDao = db.as_dao(); + match dao.get(payment_id, node_id).await { + Ok(Some(payment)) => response::ok(payment), + Ok(None) => response::not_found(), + Err(e) => response::server_error(&e), + } +} diff --git a/core/payment/src/api/requestor.rs b/core/payment/src/api/requestor.rs deleted file mode 100644 index 634a61a6e0..0000000000 --- a/core/payment/src/api/requestor.rs +++ /dev/null @@ -1,557 +0,0 @@ -use crate::api::*; -use crate::dao::*; -use crate::error::{DbError, Error}; -use crate::utils::{listen_for_events, response, with_timeout}; -use crate::DEFAULT_PAYMENT_PLATFORM; -use actix_web::web::{delete, get, post, put, Data, Json, Path, Query}; -use actix_web::{HttpResponse, Scope}; -use metrics::counter; -use serde_json::value::Value::Null; -use ya_agreement_utils::{ClauseOperator, ConstraintKey, Constraints}; -use ya_client_model::payment::*; -use ya_core_model::payment::local::{ - GetAccounts, SchedulePayment, ValidateAllocation, ValidateAllocationError, - BUS_ID as LOCAL_SERVICE, -}; -use ya_core_model::payment::public::{ - AcceptDebitNote, AcceptInvoice, AcceptRejectError, BUS_ID as PUBLIC_SERVICE, -}; -use ya_core_model::payment::RpcMessageError; -use ya_net::RemoteEndpoint; -use ya_persistence::executor::DbExecutor; -use ya_service_api_web::middleware::Identity; -use ya_service_bus::{typed as bus, RpcEndpoint}; - -pub fn register_endpoints(scope: Scope) -> Scope { - scope - .route("/debitNotes", get().to(get_debit_notes)) - .route("/debitNotes/{debit_note_id}", get().to(get_debit_note)) - .route( - "/debitNotes/{debit_note_id}/payments", - get().to(get_debit_note_payments), - ) - .route( - "/debitNotes/{debit_note_id}/accept", - post().to(accept_debit_note), - ) - .route( - "/debitNotes/{debit_note_id}/reject", - post().to(reject_debit_note), - ) - .route("/debitNoteEvents", get().to(get_debit_note_events)) - .route("/invoices", get().to(get_invoices)) - .route("/invoices/{invoice_id}", get().to(get_invoice)) - .route( - "/invoices/{invoice_id}/payments", - get().to(get_invoice_payments), - ) - .route("/invoices/{invoice_id}/accept", post().to(accept_invoice)) - .route("/invoices/{invoice_id}/reject", post().to(reject_invoice)) - .route("/invoiceEvents", get().to(get_invoice_events)) - .route("/allocations", post().to(create_allocation)) - .route("/allocations", get().to(get_allocations)) - .route("/allocations/{allocation_id}", get().to(get_allocation)) - .route("/allocations/{allocation_id}", put().to(amend_allocation)) - .route( - "/allocations/{allocation_id}", - delete().to(release_allocation), - ) - .route("/payments", get().to(get_payments)) - .route("/payments/{payment_id}", get().to(get_payment)) - .route("/accounts", get().to(get_accounts)) - .route("/decorateDemand", get().to(decorate_demand)) -} - -// ************************** DEBIT NOTE ************************** - -async fn get_debit_notes(db: Data, id: Identity) -> HttpResponse { - let node_id = id.identity; - let dao: DebitNoteDao = db.as_dao(); - match dao.get_for_requestor(node_id).await { - Ok(debit_notes) => response::ok(debit_notes), - Err(e) => response::server_error(&e), - } -} - -async fn get_debit_note( - db: Data, - path: Path, - id: Identity, -) -> HttpResponse { - let debit_note_id = path.debit_note_id.clone(); - let node_id = id.identity; - let dao: DebitNoteDao = db.as_dao(); - match dao.get(debit_note_id, node_id).await { - Ok(Some(debit_note)) => response::ok(debit_note), - Ok(None) => response::not_found(), - Err(e) => response::server_error(&e), - } -} - -async fn accept_debit_note( - db: Data, - path: Path, - query: Query, - body: Json, - id: Identity, -) -> HttpResponse { - let debit_note_id = path.debit_note_id.clone(); - let node_id = id.identity; - let acceptance = body.into_inner(); - let allocation_id = acceptance.allocation_id.clone(); - - let dao: DebitNoteDao = db.as_dao(); - let debit_note: DebitNote = match dao.get(debit_note_id.clone(), node_id).await { - Ok(Some(debit_note)) => debit_note, - Ok(None) => return response::not_found(), - Err(e) => return response::server_error(&e), - }; - - if debit_note.total_amount_due != acceptance.total_amount_accepted { - return response::bad_request(&"Invalid amount accepted"); - } - - match debit_note.status { - DocumentStatus::Received => (), - DocumentStatus::Rejected => (), - DocumentStatus::Failed => (), - DocumentStatus::Accepted => return response::ok(Null), - DocumentStatus::Settled => return response::ok(Null), - DocumentStatus::Issued => return response::server_error(&"Illegal status: issued"), - DocumentStatus::Cancelled => return response::bad_request(&"Debit note cancelled"), - } - - let activity_id = debit_note.activity_id.clone(); - let activity = match db - .as_dao::() - .get(activity_id.clone(), node_id) - .await - { - Ok(Some(activity)) => activity, - Ok(None) => return response::server_error(&format!("Activity {} not found", activity_id)), - Err(e) => return response::server_error(&e), - }; - let amount_to_pay = &debit_note.total_amount_due - &activity.total_amount_accepted.0; - - let allocation = match db - .as_dao::() - .get(allocation_id.clone(), node_id) - .await - { - Ok(Some(allocation)) => allocation, - Ok(None) => { - return response::bad_request(&format!("Allocation {} not found", allocation_id)) - } - Err(e) => return response::server_error(&e), - }; - if amount_to_pay > allocation.remaining_amount { - let msg = format!( - "Not enough funds. Allocated: {} Needed: {}", - allocation.remaining_amount, amount_to_pay - ); - return response::bad_request(&msg); - } - - with_timeout(query.timeout, async move { - let issuer_id = debit_note.issuer_id; - let accept_msg = AcceptDebitNote::new(debit_note_id.clone(), acceptance, issuer_id); - let schedule_msg = - SchedulePayment::from_debit_note(debit_note, allocation_id, amount_to_pay); - match async move { - ya_net::from(node_id) - .to(issuer_id) - .service(PUBLIC_SERVICE) - .call(accept_msg) - .await??; - if let Some(msg) = schedule_msg { - bus::service(LOCAL_SERVICE).send(msg).await??; - } - dao.accept(debit_note_id, node_id).await?; - - counter!("payment.debit_notes.requestor.accepted", 1); - Ok(()) - } - .await - { - Ok(_) => response::ok(Null), - Err(Error::Rpc(RpcMessageError::AcceptReject(AcceptRejectError::BadRequest(e)))) => { - return response::bad_request(&e); - } - Err(e) => return response::server_error(&e), - } - - // TODO: Compute amount to pay and schedule payment - }) - .await -} - -async fn reject_debit_note( - db: Data, - path: Path, - query: Query, - body: Json, -) -> HttpResponse { - response::not_implemented() // TODO -} - -async fn get_debit_note_events( - db: Data, - query: Query, - id: Identity, -) -> HttpResponse { - let node_id = id.identity; - let timeout_secs = query.timeout; - let later_than = query.later_than.map(|d| d.naive_utc()); - - let dao: DebitNoteEventDao = db.as_dao(); - let getter = || async { - dao.get_for_requestor(node_id.clone(), later_than.clone()) - .await - }; - - match listen_for_events(getter, timeout_secs).await { - Ok(events) => response::ok(events), - Err(e) => response::server_error(&e), - } -} - -// *************************** INVOICE **************************** - -async fn get_invoices(db: Data, id: Identity) -> HttpResponse { - let node_id = id.identity; - let dao: InvoiceDao = db.as_dao(); - match dao.get_for_requestor(node_id).await { - Ok(invoices) => response::ok(invoices), - Err(e) => response::server_error(&e), - } -} - -async fn get_invoice(db: Data, path: Path, id: Identity) -> HttpResponse { - let invoice_id = path.invoice_id.clone(); - let node_id = id.identity; - let dao: InvoiceDao = db.as_dao(); - match dao.get(invoice_id, node_id).await { - Ok(Some(invoice)) => response::ok(invoice), - Ok(None) => response::not_found(), - Err(e) => response::server_error(&e), - } -} - -async fn accept_invoice( - db: Data, - path: Path, - query: Query, - body: Json, - id: Identity, -) -> HttpResponse { - let invoice_id = path.invoice_id.clone(); - let node_id = id.identity; - let acceptance = body.into_inner(); - let allocation_id = acceptance.allocation_id.clone(); - - let dao: InvoiceDao = db.as_dao(); - let invoice = match dao.get(invoice_id.clone(), node_id).await { - Ok(Some(invoice)) => invoice, - Ok(None) => return response::not_found(), - Err(e) => return response::server_error(&e), - }; - - if invoice.amount != acceptance.total_amount_accepted { - return response::bad_request(&"Invalid amount accepted"); - } - - match invoice.status { - DocumentStatus::Received => (), - DocumentStatus::Rejected => (), - DocumentStatus::Failed => (), - DocumentStatus::Accepted => return response::ok(Null), - DocumentStatus::Settled => return response::ok(Null), - DocumentStatus::Cancelled => return response::bad_request(&"Invoice cancelled"), - DocumentStatus::Issued => return response::server_error(&"Illegal status: issued"), - } - - let agreement_id = invoice.agreement_id.clone(); - let agreement = match db - .as_dao::() - .get(agreement_id.clone(), node_id) - .await - { - Ok(Some(agreement)) => agreement, - Ok(None) => { - return response::server_error(&format!("Agreement {} not found", agreement_id)) - } - Err(e) => return response::server_error(&e), - }; - let amount_to_pay = &invoice.amount - &agreement.total_amount_accepted.0; - - let allocation = match db - .as_dao::() - .get(allocation_id.clone(), node_id) - .await - { - Ok(Some(allocation)) => allocation, - Ok(None) => { - return response::bad_request(&format!("Allocation {} not found", allocation_id)) - } - Err(e) => return response::server_error(&e), - }; - // FIXME: remaining amount should be 'locked' until payment is done to avoid double spending - if amount_to_pay > allocation.remaining_amount { - let msg = format!( - "Not enough funds. Allocated: {} Needed: {}", - allocation.remaining_amount, amount_to_pay - ); - return response::bad_request(&msg); - } - - with_timeout(query.timeout, async move { - let issuer_id = invoice.issuer_id; - let accept_msg = AcceptInvoice::new(invoice_id.clone(), acceptance, issuer_id); - let schedule_msg = SchedulePayment::from_invoice(invoice, allocation_id, amount_to_pay); - match async move { - ya_net::from(node_id) - .to(issuer_id) - .service(PUBLIC_SERVICE) - .call(accept_msg) - .await??; - bus::service(LOCAL_SERVICE).send(schedule_msg).await??; - dao.accept(invoice_id, node_id).await?; - - counter!("payment.invoices.requestor.accepted", 1); - Ok(()) - } - .await - { - Ok(_) => response::ok(Null), - Err(Error::Rpc(RpcMessageError::AcceptReject(AcceptRejectError::BadRequest(e)))) => { - return response::bad_request(&e) - } - Err(e) => return response::server_error(&e), - } - }) - .await -} - -async fn reject_invoice( - db: Data, - path: Path, - query: Query, - body: Json, -) -> HttpResponse { - response::not_implemented() // TODO -} - -async fn get_invoice_events( - db: Data, - query: Query, - id: Identity, -) -> HttpResponse { - let node_id = id.identity; - let timeout_secs = query.timeout; - let later_than = query.later_than.map(|d| d.naive_utc()); - - let dao: InvoiceEventDao = db.as_dao(); - let getter = || async { - dao.get_for_requestor(node_id.clone(), later_than.clone()) - .await - }; - match listen_for_events(getter, timeout_secs).await { - Ok(events) => response::ok(events), - Err(e) => response::server_error(&e), - } -} - -// ************************** ALLOCATION ************************** - -async fn create_allocation( - db: Data, - body: Json, - id: Identity, -) -> HttpResponse { - // TODO: Handle deposits & timeouts - let allocation = body.into_inner(); - let node_id = id.identity; - let payment_platform = allocation - .payment_platform - .clone() - .unwrap_or(DEFAULT_PAYMENT_PLATFORM.to_string()); - let address = allocation.address.clone().unwrap_or(node_id.to_string()); - - let validate_msg = ValidateAllocation { - platform: payment_platform.clone(), - address: address.clone(), - amount: allocation.total_amount.clone(), - }; - match async move { Ok(bus::service(LOCAL_SERVICE).send(validate_msg).await??) }.await { - Ok(true) => {} - Ok(false) => return response::bad_request(&"Insufficient funds to make allocation"), - Err(Error::Rpc(RpcMessageError::ValidateAllocation( - ValidateAllocationError::AccountNotRegistered, - ))) => return response::bad_request(&"Account not registered"), - Err(e) => return response::server_error(&e), - } - - let dao: AllocationDao = db.as_dao(); - match async move { - let allocation_id = dao - .create(allocation, node_id, payment_platform, address) - .await?; - Ok(dao.get(allocation_id, node_id).await?) - } - .await - { - Ok(Some(allocation)) => response::created(allocation), - Ok(None) => response::server_error(&"Database error"), - Err(DbError::Query(e)) => response::bad_request(&e), - Err(e) => response::server_error(&e), - } -} - -async fn get_allocations(db: Data, id: Identity) -> HttpResponse { - let node_id = id.identity; - let dao: AllocationDao = db.as_dao(); - match dao.get_for_owner(node_id).await { - Ok(allocations) => response::ok(allocations), - Err(e) => response::server_error(&e), - } -} - -async fn get_allocation( - db: Data, - path: Path, - id: Identity, -) -> HttpResponse { - let allocation_id = path.allocation_id.clone(); - let node_id = id.identity; - let dao: AllocationDao = db.as_dao(); - match dao.get(allocation_id, node_id).await { - Ok(Some(allocation)) => response::ok(allocation), - Ok(None) => response::not_found(), - Err(e) => response::server_error(&e), - } -} - -async fn amend_allocation( - db: Data, - path: Path, - body: Json, -) -> HttpResponse { - response::not_implemented() // TODO -} - -async fn release_allocation( - db: Data, - path: Path, - id: Identity, -) -> HttpResponse { - let allocation_id = path.allocation_id.clone(); - let node_id = id.identity; - let dao: AllocationDao = db.as_dao(); - match dao.release(allocation_id, node_id).await { - Ok(true) => response::ok(Null), - Ok(false) => response::not_found(), - Err(e) => response::server_error(&e), - } -} - -// *************************** PAYMENT **************************** - -async fn get_payments( - db: Data, - query: Query, - id: Identity, -) -> HttpResponse { - let node_id = id.identity; - let timeout_secs = query.timeout; - let later_than = query.later_than.map(|d| d.naive_utc()); - - let dao: PaymentDao = db.as_dao(); - let getter = || async { - dao.get_for_requestor(node_id.clone(), later_than.clone()) - .await - }; - - match listen_for_events(getter, timeout_secs).await { - Ok(payments) => response::ok(payments), - Err(e) => response::server_error(&e), - } -} - -async fn get_payment(db: Data, path: Path, id: Identity) -> HttpResponse { - let payment_id = path.payment_id.clone(); - let node_id = id.identity; - let dao: PaymentDao = db.as_dao(); - match dao.get(payment_id, node_id).await { - Ok(Some(payment)) => response::ok(payment), - Ok(None) => response::not_found(), - Err(e) => response::server_error(&e), - } -} - -async fn get_debit_note_payments(db: Data, path: Path) -> HttpResponse { - response::not_implemented() // TODO -} - -async fn get_invoice_payments(db: Data, path: Path) -> HttpResponse { - response::not_implemented() // TODO -} - -// *************************** ACCOUNTS **************************** - -async fn get_accounts(id: Identity) -> HttpResponse { - let node_id = id.identity.to_string(); - let all_accounts = match bus::service(LOCAL_SERVICE).send(GetAccounts {}).await { - Ok(Ok(accounts)) => accounts, - Ok(Err(e)) => return response::server_error(&e), - Err(e) => return response::server_error(&e), - }; - let recv_accounts: Vec = all_accounts - .into_iter() - .filter(|account| account.send) - .filter(|account| account.address == node_id) // TODO: Implement proper account permission system - .map(|account| Account { - platform: account.platform, - address: account.address, - }) - .collect(); - response::ok(recv_accounts) -} - -// **************************** MARKET ***************************** - -async fn decorate_demand( - db: Data, - path: Query, - id: Identity, -) -> HttpResponse { - let allocation_ids = path.allocation_ids.clone(); - let node_id = id.identity; - let dao: AllocationDao = db.as_dao(); - let allocations = match dao.get_many(allocation_ids, node_id).await { - Ok(allocations) => allocations, - Err(e) => return response::server_error(&e), - }; - if allocations.len() != path.allocation_ids.len() { - return response::not_found(); - } - - let properties: Vec = allocations - .into_iter() - .map(|allocation| MarketProperty { - key: format!( - "golem.com.payment.platform.{}.address", - allocation.payment_platform - ), - value: allocation.address, - }) - .collect(); - let constraints = properties - .iter() - .map(|property| ConstraintKey::new(property.key.as_str()).equal_to(ConstraintKey::new("*"))) - .collect(); - let constraints = vec![Constraints::new_clause(ClauseOperator::Or, constraints).to_string()]; - response::ok(MarketDecoration { - properties, - constraints, - }) -} diff --git a/core/payment/src/dao/debit_note.rs b/core/payment/src/dao/debit_note.rs index 168bc08033..27194a31c7 100644 --- a/core/payment/src/dao/debit_note.rs +++ b/core/payment/src/dao/debit_note.rs @@ -199,25 +199,16 @@ impl<'c> DebitNoteDao<'c> { .await } - async fn get_for_role(&self, node_id: NodeId, role: Role) -> DbResult> { + pub async fn get_for_node_id(&self, node_id: NodeId) -> DbResult> { readonly_transaction(self.pool, move |conn| { let debit_notes: Vec = query!() .filter(dsl::owner_id.eq(node_id)) - .filter(dsl::role.eq(role)) .load(conn)?; debit_notes.into_iter().map(TryInto::try_into).collect() }) .await } - pub async fn get_for_provider(&self, node_id: NodeId) -> DbResult> { - self.get_for_role(node_id, Role::Provider).await - } - - pub async fn get_for_requestor(&self, node_id: NodeId) -> DbResult> { - self.get_for_role(node_id, Role::Requestor).await - } - pub async fn mark_received(&self, debit_note_id: String, owner_id: NodeId) -> DbResult<()> { do_with_transaction(self.pool, move |conn| { diesel::update(dsl::pay_debit_note.find((debit_note_id, owner_id))) diff --git a/core/payment/src/dao/debit_note_event.rs b/core/payment/src/dao/debit_note_event.rs index d2e50ba611..a75f2dbcdd 100644 --- a/core/payment/src/dao/debit_note_event.rs +++ b/core/payment/src/dao/debit_note_event.rs @@ -11,7 +11,6 @@ use ya_client_model::NodeId; use ya_persistence::executor::{ do_with_transaction, readonly_transaction, AsDao, ConnType, PoolType, }; -use ya_persistence::types::Role; pub fn create( debit_note_id: String, @@ -55,17 +54,15 @@ impl<'c> DebitNoteEventDao<'c> { .await } - async fn get_for_role( + pub async fn get_for_node_id( &self, node_id: NodeId, later_than: Option, - role: Role, ) -> DbResult> { readonly_transaction(self.pool, move |conn| { let query = dsl::pay_debit_note_event .inner_join(event_type_dsl::pay_event_type) .filter(dsl::owner_id.eq(node_id)) - .filter(event_type_dsl::role.eq(role)) .select(crate::schema::pay_debit_note_event::all_columns) .order_by(dsl::timestamp.asc()); let events: Vec = match later_than { @@ -76,21 +73,4 @@ impl<'c> DebitNoteEventDao<'c> { }) .await } - - pub async fn get_for_requestor( - &self, - node_id: NodeId, - later_than: Option, - ) -> DbResult> { - self.get_for_role(node_id, later_than, Role::Requestor) - .await - } - - pub async fn get_for_provider( - &self, - node_id: NodeId, - later_than: Option, - ) -> DbResult> { - self.get_for_role(node_id, later_than, Role::Provider).await - } } diff --git a/core/payment/src/dao/invoice.rs b/core/payment/src/dao/invoice.rs index 738c3aa13c..4a406a90c6 100644 --- a/core/payment/src/dao/invoice.rs +++ b/core/payment/src/dao/invoice.rs @@ -158,11 +158,10 @@ impl<'c> InvoiceDao<'c> { .await } - async fn get_for_role(&self, node_id: NodeId, role: Role) -> DbResult> { + pub async fn get_for_node_id(&self, node_id: NodeId) -> DbResult> { readonly_transaction(self.pool, move |conn| { let invoices = query!() .filter(dsl::owner_id.eq(node_id)) - .filter(dsl::role.eq(&role)) .load(conn)?; let activities = activity_dsl::pay_invoice_x_activity .inner_join( @@ -171,7 +170,6 @@ impl<'c> InvoiceDao<'c> { .and(activity_dsl::invoice_id.eq(dsl::id))), ) .filter(dsl::owner_id.eq(node_id)) - .filter(dsl::role.eq(role)) .select(crate::schema::pay_invoice_x_activity::all_columns) .load(conn)?; join_invoices_with_activities(invoices, activities) @@ -206,14 +204,6 @@ impl<'c> InvoiceDao<'c> { Ok(stats) } - pub async fn get_for_provider(&self, node_id: NodeId) -> DbResult> { - self.get_for_role(node_id, Role::Provider).await - } - - pub async fn get_for_requestor(&self, node_id: NodeId) -> DbResult> { - self.get_for_role(node_id, Role::Requestor).await - } - pub async fn mark_received(&self, invoice_id: String, owner_id: NodeId) -> DbResult<()> { do_with_transaction(self.pool, move |conn| { update_status(&invoice_id, &owner_id, &DocumentStatus::Received, conn) diff --git a/core/payment/src/dao/invoice_event.rs b/core/payment/src/dao/invoice_event.rs index 4d19937d22..09732036c4 100644 --- a/core/payment/src/dao/invoice_event.rs +++ b/core/payment/src/dao/invoice_event.rs @@ -11,7 +11,6 @@ use ya_client_model::NodeId; use ya_persistence::executor::{ do_with_transaction, readonly_transaction, AsDao, ConnType, PoolType, }; -use ya_persistence::types::Role; pub fn create( invoice_id: String, @@ -55,17 +54,15 @@ impl<'c> InvoiceEventDao<'c> { .await } - async fn get_for_role( + pub async fn get_for_node_id( &self, node_id: NodeId, later_than: Option, - role: Role, ) -> DbResult> { readonly_transaction(self.pool, move |conn| { let query = dsl::pay_invoice_event .inner_join(event_type_dsl::pay_event_type) .filter(dsl::owner_id.eq(node_id)) - .filter(event_type_dsl::role.eq(role)) .select(crate::schema::pay_invoice_event::all_columns) .order_by(dsl::timestamp.asc()); let events: Vec = match later_than { @@ -76,21 +73,4 @@ impl<'c> InvoiceEventDao<'c> { }) .await } - - pub async fn get_for_requestor( - &self, - node_id: NodeId, - later_than: Option, - ) -> DbResult> { - self.get_for_role(node_id, later_than, Role::Requestor) - .await - } - - pub async fn get_for_provider( - &self, - node_id: NodeId, - later_than: Option, - ) -> DbResult> { - self.get_for_role(node_id, later_than, Role::Provider).await - } } diff --git a/core/payment/src/dao/payment.rs b/core/payment/src/dao/payment.rs index 844f437d0d..1fb2d5178f 100644 --- a/core/payment/src/dao/payment.rs +++ b/core/payment/src/dao/payment.rs @@ -17,7 +17,6 @@ use ya_client_model::NodeId; use ya_persistence::executor::{ do_with_transaction, readonly_transaction, AsDao, ConnType, PoolType, }; -use ya_persistence::types::Role; pub struct PaymentDao<'c> { pool: &'c PoolType, @@ -188,16 +187,14 @@ impl<'c> PaymentDao<'c> { .await } - async fn get_for_role( + pub async fn get_for_node_id( &self, node_id: NodeId, later_than: Option, - role: Role, ) -> DbResult> { readonly_transaction(self.pool, move |conn| { let query = dsl::pay_payment .filter(dsl::owner_id.eq(&node_id)) - .filter(dsl::role.eq(&role)) .order_by(dsl::timestamp.asc()); let payments: Vec = match later_than { Some(timestamp) => query.filter(dsl::timestamp.gt(timestamp)).load(conn)?, @@ -210,7 +207,6 @@ impl<'c> PaymentDao<'c> { .and(activity_pay_dsl::payment_id.eq(dsl::id))), ) .filter(dsl::owner_id.eq(&node_id)) - .filter(dsl::role.eq(&role)) .select(crate::schema::pay_activity_payment::all_columns) .load(conn)?; let agreement_payments = agreement_pay_dsl::pay_agreement_payment @@ -220,7 +216,6 @@ impl<'c> PaymentDao<'c> { .and(agreement_pay_dsl::payment_id.eq(dsl::id))), ) .filter(dsl::owner_id.eq(&node_id)) - .filter(dsl::role.eq(&role)) .select(crate::schema::pay_agreement_payment::all_columns) .load(conn)?; Ok(join_activity_and_agreement_payments( @@ -231,23 +226,6 @@ impl<'c> PaymentDao<'c> { }) .await } - - pub async fn get_for_requestor( - &self, - node_id: NodeId, - later_than: Option, - ) -> DbResult> { - self.get_for_role(node_id, later_than, Role::Requestor) - .await - } - - pub async fn get_for_provider( - &self, - node_id: NodeId, - later_than: Option, - ) -> DbResult> { - self.get_for_role(node_id, later_than, Role::Provider).await - } } fn join_activity_and_agreement_payments(