Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Events and API unification - Market API. #52

Merged
merged 41 commits into from
Nov 9, 2020
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
Show all changes
41 commits
Select commit Hold shift + click to select a range
1f20fee
Events and API unification - Market API.
Oct 24, 2020
2f02945
Trim trailing whitespaces.
Oct 24, 2020
5c8a833
No newline at end of file.
Oct 24, 2020
7f58ff6
Pre-commit ran.
Oct 24, 2020
7931a26
Review comments & updates
Oct 27, 2020
c73279e
Merge branch 'master' into stranger80/market-api-events
Oct 27, 2020
bcfc8a1
pre-commit
Oct 27, 2020
f255ee0
Using DemandOfferBase as input to Subscribe calls
stranger80 Oct 28, 2020
5ebc2ad
Using DemandOfferBase as input to CounterProposal
stranger80 Oct 28, 2020
764042d
back compatibility for reject proposal
tworec Nov 2, 2020
8fc1b6b
remove unused: 410/Gone from agreement events and AgreementProposalEvent
tworec Nov 2, 2020
e123906
[mkt spec] AgreementEvent back compatibility
tworec Nov 3, 2020
dfb24af
[mkt spec] named types fo Timestamp and ValidTo + required fields con…
tworec Nov 3, 2020
86d25ea
[mkt spec] bump version
tworec Nov 3, 2020
83cc60c
[mkt spec] restored date-time to enable description + required fields…
tworec Nov 3, 2020
7ce47ef
[mkt spec] dedent to enable proper code generation
tworec Nov 3, 2020
c5c72e9
[mkt] client code compliant with 1.6.1 spec
tworec Nov 3, 2020
6604805
[mkt spec] apply review comments
tworec Nov 3, 2020
932869d
[mkt] Event: align generated code with yaml spec
tworec Nov 3, 2020
261f6ce
[mkt spec] remove AgreementTimeoutEvent + minor tweaks
tworec Nov 3, 2020
2c7b6c4
[mkt spec] backward compatibility for query params
tworec Nov 3, 2020
d795f91
[mkt] Agreement events endpoint + appSessionId + auto mixed case for …
tworec Nov 3, 2020
e897456
[act,pay] remove non snake case
tworec Nov 3, 2020
aba1c0c
turn off backtrace
tworec Nov 3, 2020
c8a04c9
[mkt] facilitate countering proposal
tworec Nov 3, 2020
637c535
[mkt models] use NodeId
tworec Nov 3, 2020
b50641a
[mkt models] Proposal timestamp as DateTime
tworec Nov 3, 2020
a5d4e95
market_interaction example fix
tworec Nov 3, 2020
2e745db
[mkt] revert facilitate countering proposal
tworec Nov 3, 2020
d84bb37
apply review sugestions to parameter maximum and default value
tworec Nov 4, 2020
008066b
apply review comments:
tworec Nov 5, 2020
44df81c
[mkt] use aliases instead of `DemandOfferBase`
tworec Nov 5, 2020
e6d5a2d
[mkt] add appSessionId to the Agreement
tworec Nov 5, 2020
27bdc52
[mkt] terminatedSignature added to Agreement
tworec Nov 5, 2020
b6ce271
Merge branch 'master' into stranger80/market-api-events
tworec Nov 6, 2020
9a35d94
[mkt] model back compatiblity - pub use DemandOfferBase
tworec Nov 6, 2020
50f1897
[mkt] Reason to support extra properties
tworec Nov 6, 2020
25da200
[mkt] added cancelAgreementWithReason in spec + all endpoints with re…
tworec Nov 6, 2020
9cb09d0
[mkt] termination signature moved from Agreement to Event
tworec Nov 6, 2020
28cef62
[mkt] **All** Agreement events will be populated on **both** sides
tworec Nov 6, 2020
ba89a54
[mkt] improoved descriptions for some endpoints
tworec Nov 6, 2020
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -22,13 +22,13 @@ members = [
ya-client-model = { version="0.2", path = "model" }

awc = "1.0"
backtrace = "0.3"
bytes = "0.5"
chrono = "0.4"
envy = "0.4"
failure = "0.1.6"
futures = "0.3"
hex = "0.4"
heck = "0.3.1"
log = "0.4"
rand = "0.6"
mime = "0.3"
Expand Down
147 changes: 73 additions & 74 deletions examples/market_interaction.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,8 @@ use url::Url;
use ya_client::{
market::{MarketProviderApi, MarketRequestorApi},
model::market::{
proposal::State, AgreementProposal, Demand, Offer, Proposal, ProviderEvent, RequestorEvent,
proposal::State, AgreementProposal, NewDemand, NewOffer, Proposal, ProviderEvent,
RequestorEvent,
},
web::{WebClient, WebInterface},
Error, Result,
Expand All @@ -23,31 +24,23 @@ struct Options {
log_level: String,
}

async fn check_provider_subscriptions(
client: &MarketProviderApi,
expected_cnt: usize,
) -> Result<()> {
let provider_subscriptions = client.get_offers().await?;
async fn unsubscribe_old_offers(client: &MarketProviderApi) -> Result<()> {
let offers = client.get_offers().await?;

println!(
" <=PROVIDER | {} active subscriptions",
provider_subscriptions.len(),
);
assert_eq!(provider_subscriptions.len(), expected_cnt);
println!(" <=PROVIDER | {} active offers", offers.len(),);
for o in offers {
client.unsubscribe(&o.offer_id).await?;
}
Ok(())
}

async fn check_requestor_subscriptions(
client: &MarketRequestorApi,
expected_cnt: usize,
) -> Result<()> {
let requestor_subscriptions = client.get_demands().await?;
async fn unsubscribe_old_demands(client: &MarketRequestorApi) -> Result<()> {
let demands = client.get_demands().await?;

println!(
"REQUESTOR=> | {} active subscriptions",
requestor_subscriptions.len(),
);
assert_eq!(requestor_subscriptions.len(), expected_cnt);
println!("REQUESTOR=> | {} active demands", demands.len(),);
for d in demands {
client.unsubscribe(&d.demand_id).await?;
}
Ok(())
}

Expand All @@ -68,48 +61,40 @@ fn cmp_proposals(p1: &Proposal, p2: &Proposal) {
// PROVIDER //
//////////////
async fn provider_interact(client: MarketProviderApi) -> Result<()> {
unsubscribe_old_offers(&client).await?;
// provider - publish offer
let offer = Offer::new(serde_json::json!({"zima":"już"}), "(&(lato=nie))".into());
let provider_subscription_id = client.subscribe(&offer).await?;
println!(
" <=PROVIDER | subscription id: {}",
provider_subscription_id
);

check_provider_subscriptions(&client, 1).await?;
let offer = NewOffer::new(serde_json::json!({"zima":"już"}), "(&(lato=nie))".into());
let offer_id = client.subscribe(&offer).await?;
println!(" <=PROVIDER | offer id: {}", offer_id);

// provider - get events
'prov_events: loop {
let mut provider_events = vec![];
while provider_events.is_empty() {
provider_events = client
.collect(&provider_subscription_id, Some(1.0), Some(2))
.await?;
provider_events = client.collect(&offer_id, Some(1.0), Some(2)).await?;
println!(" <=PROVIDER | waiting for events");
thread::sleep(Duration::from_millis(3000))
}
println!(" <=PROVIDER | Yay! Got event(s): {:#?}", provider_events);

for event in &provider_events {
match &event {
for event in provider_events {
match event {
// demand proposal received --> respond with an counter offer
ProviderEvent::ProposalEvent { proposal, .. } => {
if proposal.prev_proposal_id.is_none() && proposal.state()? == &State::Draft {
if proposal.prev_proposal_id.is_none() && proposal.state == State::Draft {
log::error!("Draft Proposal but wo prev id: {:#?}", proposal)
}

let proposal_id = proposal.proposal_id()?;
let proposal_id = &proposal.proposal_id;

// this is not needed in regular flow; just to illustrate possibility
let demand_proposal = client
.get_proposal(&provider_subscription_id, &proposal_id)
.await?;
cmp_proposals(&demand_proposal, proposal);
let demand_proposal = client.get_proposal(&offer_id, proposal_id).await?;
cmp_proposals(&demand_proposal, &proposal);

println!(" <=PROVIDER | Huha! Got Demand Proposal. Accepting...");
let bespoke_proposal = proposal.counter_offer(offer.clone())?;
let bespoke_proposal = offer.clone();
let new_prop_id = client
.counter_proposal(&bespoke_proposal, &provider_subscription_id)
.counter_proposal(&bespoke_proposal, &offer_id, proposal_id)
.await?;
println!(
" <=PROVIDER | Responded with Counter proposal: {}",
Expand All @@ -124,14 +109,24 @@ async fn provider_interact(client: MarketProviderApi) -> Result<()> {
agreement_id
);

let status = client.approve_agreement(agreement_id, None).await?;
let status = client.approve_agreement(agreement_id, None, None).await?;
// one can also call:
// let res = client.reject_agreement(agreement_id).await?;
println!(" <=PROVIDER | Agreement {} by Requestor!", status);

println!(" <=PROVIDER | I'm done for now! Bye...");
break 'prov_events;
}
ProviderEvent::ProposalRejectedEvent {
proposal_id,
reason,
..
} => {
println!(
"Proposal rejected [{}], reason: '{:?}'",
proposal_id, reason
);
}
ProviderEvent::PropertyQueryEvent { .. } => {
println!("Unsupported PropertyQueryEvent.");
}
Expand All @@ -140,59 +135,51 @@ async fn provider_interact(client: MarketProviderApi) -> Result<()> {
}

println!(" <=PROVIDER | Unsubscribing...");
let res = client.unsubscribe(&provider_subscription_id).await?;
let res = client.unsubscribe(&offer_id).await?;
println!(" <=PROVIDER | Unsubscribed: {}", res);
check_provider_subscriptions(&client, 0).await
Ok(())
}

//\\\\\\\\\\\//
// REQUESTOR //
//\\\\\\\\\\\//
async fn requestor_interact(client: MarketRequestorApi) -> Result<()> {
thread::sleep(Duration::from_millis(300));
unsubscribe_old_demands(&client).await?;
// requestor - publish demand
let demand = Demand::new(serde_json::json!({"lato":"nie"}), "(&(zima=już))".into());
let requestor_subscription_id = client.subscribe(&demand).await?;
println!(
"REQUESTOR=> | subscription id: {}",
requestor_subscription_id
);

check_requestor_subscriptions(&client, 1).await?;
let demand = NewDemand::new(serde_json::json!({"lato":"nie"}), "(&(zima=już))".into());
let demand_id = client.subscribe(&demand).await?;
println!("REQUESTOR=> | demand id: {}", demand_id);

// requestor - get events
'req_events: loop {
let mut requestor_events = vec![];
while requestor_events.is_empty() {
requestor_events = client
.collect(&requestor_subscription_id, Some(1.0), Some(2))
.await?;
requestor_events = client.collect(&demand_id, Some(1.0), Some(2)).await?;
println!("REQUESTOR=> | waiting for events");
thread::sleep(Duration::from_millis(3000))
}
println!("REQUESTOR=> | Yay! Got event(s): {:#?}", requestor_events);

// requestor - support first event
for event in &requestor_events {
match &event {
for event in requestor_events {
match event {
RequestorEvent::ProposalEvent { proposal, .. } => {
let proposal_id = proposal.proposal_id()?;
let proposal_id = &proposal.proposal_id;

// this is not needed in regular flow; just to illustrate possibility
let offer_proposal = client
.get_proposal(&requestor_subscription_id, proposal.proposal_id()?)
.await?;
cmp_proposals(&offer_proposal, proposal);
let offer_proposal = client.get_proposal(&demand_id, proposal_id).await?;
cmp_proposals(&offer_proposal, &proposal);

match proposal.state()? {
match proposal.state {
State::Initial => {
if proposal.prev_proposal_id.is_some() {
log::error!("Initial Proposal but with prev id: {:#?}", proposal);
}
println!("REQUESTOR=> | Negotiating proposal...");
let bespoke_proposal = proposal.counter_demand(demand.clone())?;
let bespoke_proposal = demand.clone();
let new_proposal_id = client
.counter_proposal(&bespoke_proposal, &requestor_subscription_id)
.counter_proposal(&bespoke_proposal, &demand_id, proposal_id)
.await?;
println!(
"REQUESTOR=> | Responded with counter proposal (id: {})",
Expand All @@ -202,21 +189,23 @@ async fn requestor_interact(client: MarketRequestorApi) -> Result<()> {
State::Draft => {
println!("REQUESTOR=> | Got draft proposal. Creating agreement...");
let new_agreement_id = proposal_id.clone();
let agreement =
AgreementProposal::new(new_agreement_id, chrono::Utc::now());
let id = client.create_agreement(&agreement).await?;
let agreement = AgreementProposal::new(
new_agreement_id,
chrono::Utc::now() + chrono::Duration::minutes(5),
);
let agreement_id = client.create_agreement(&agreement).await?;
println!(
"REQUESTOR=> | agreement created {}: \n{:#?}\nConfirming...",
id, &agreement
agreement_id, &agreement
);
let res = client.confirm_agreement(&agreement.proposal_id).await?;
let res = client.confirm_agreement(&agreement_id, None).await?;
println!(
"REQUESTOR=> | agreement {} confirmed: {}",
&agreement.proposal_id, res
);

println!("REQUESTOR=> | Waiting for Agreement approval...");
match client.wait_for_approval(&agreement.proposal_id, None).await {
match client.wait_for_approval(&agreement_id, None).await {
Err(Error::TimeoutError { .. }) => {
println!(
"REQUESTOR=> | Timeout waiting for Agreement approval..."
Expand All @@ -236,6 +225,16 @@ async fn requestor_interact(client: MarketRequestorApi) -> Result<()> {
_ => log::error!("unsupported offer proposal state: {:#?}", proposal),
}
}
RequestorEvent::ProposalRejectedEvent {
proposal_id,
reason,
..
} => {
println!(
"Proposal rejected [{}], reason: '{:?}'",
proposal_id, reason
);
}
RequestorEvent::PropertyQueryEvent { .. } => {
log::error!("Unsupported PropertyQueryEvent.");
}
Expand All @@ -244,9 +243,9 @@ async fn requestor_interact(client: MarketRequestorApi) -> Result<()> {
}

println!("REQUESTOR=> | Unsunscribing...");
let res = client.unsubscribe(&requestor_subscription_id).await?;
let res = client.unsubscribe(&demand_id).await?;
println!("REQUESTOR=> | Unsubscribed: {}", res);
check_requestor_subscriptions(&client, 0).await
Ok(())
}

#[actix_rt::main]
Expand Down
20 changes: 13 additions & 7 deletions model/src/market.rs
Original file line number Diff line number Diff line change
@@ -1,17 +1,23 @@
pub mod agreement;
pub mod agreement_event;
pub mod agreement_proposal;
pub mod demand;
pub mod demand_offer_base;
pub mod event;
pub mod offer;
pub mod property_query;
pub mod proposal;
pub mod reason;

pub use self::agreement::Agreement;
pub use self::agreement_proposal::AgreementProposal;
pub use self::demand::Demand;
pub use self::event::{ProviderEvent, RequestorEvent};
pub use self::offer::Offer;
pub use self::property_query::PropertyQuery;
pub use self::proposal::Proposal;
pub use agreement::Agreement;
pub use agreement_event::AgreementOperationEvent;
pub use agreement_proposal::AgreementProposal;
pub use demand::Demand;
pub use demand_offer_base::{DemandOfferBase, NewDemand, NewOffer, NewProposal};
pub use event::{ProviderEvent, RequestorEvent};
pub use offer::Offer;
pub use property_query::PropertyQuery;
pub use proposal::Proposal;
pub use reason::{convert_reason, ConvertReason, JsonReason, Reason};

pub const MARKET_API_PATH: &str = "market-api/v1/";
Loading