Skip to content

Commit

Permalink
Market decentralized - database models (#269)
Browse files Browse the repository at this point in the history
* Add database most important tables

* Insert offer to database

* [Test] Check market state after subscribing offer

* Unsubscribe offer

* [Test] Add check on not existing subsctiption id

* Subscribe demand and unsubscribe [+Test]

* Add http endpoints

* Decentralized market running instruction

* SubscriptionId consists of random part and offer/demand hash

* rebased on service-ctx-sep-dbs

* Add timestamps to Offer/Demand

* Offer/Demand - validating incoming subscription id hash [+Test]

* Apply review requested changes

* Review changes: Add hardcoded expiration time; Chnage terminate_agreement method to post; Use {0} error syntax

* Added Offer/Demand timestamp of adding record to database

* insertion_ts instead of modification_ts

* [mkt-matcher] error simplify

Co-authored-by: Piotr Chromiec <tworec@golem.network>
  • Loading branch information
nieznanysprawiciel and tworec authored Jun 4, 2020
1 parent e0cd0d9 commit c5fc1d3
Show file tree
Hide file tree
Showing 29 changed files with 1,431 additions and 26 deletions.
5 changes: 5 additions & 0 deletions .cargo/config
Original file line number Diff line number Diff line change
@@ -1,2 +1,7 @@
[build]
rustflags = ["-C", "target-feature=+crt-static"]


[cargo-new]
name = "Golem Factory"
email = "contact@golem.network"
13 changes: 11 additions & 2 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

10 changes: 10 additions & 0 deletions core/market/decentralized/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -14,22 +14,32 @@ ya-persistence = "0.2"
ya-service-api = "0.1"
ya-service-bus = "0.2"
ya-service-api-interfaces = "0.1"
ya-service-api-web = "0.1"

actix-web = "2.0"
anyhow = "1.0"
async-trait = "0.1"
chrono = { version = "0.4", features = ["serde"] }
derive_more = "0.99.5"
diesel = { version = "1.4", features = ["chrono", "sqlite", "r2d2"] }
diesel_migrations = "1.4"
digest = "0.8.1"
futures = "0.3"
lazy_static = "1.4"
libsqlite3-sys = { version = "0.9.1", features = ["bundled"] }
log = "0.4"
r2d2 = "0.8"
serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0"
sha3 = "0.8.2"
thiserror = "1.0"
tokio = { version = "0.2", features = ["time"] }
uuid = { version = "0.8", features = ["v4"] }

[dev-dependencies]
actix-rt = "1.0.0"
env_logger = "0.7"
rand = "0.7.2"
serde_json = "1.0"
tokio = { version = "0.2", features = ["macros"] }

Expand Down
Empty file.
Original file line number Diff line number Diff line change
@@ -0,0 +1,4 @@
-- This file should undo anything in `up.sql`

DROP TABLE "market_offer";
DROP TABLE "market_demand";
Original file line number Diff line number Diff line change
@@ -0,0 +1,23 @@
-- Your SQL goes here

CREATE TABLE market_offer (
id VARCHAR(97) NOT NULL PRIMARY KEY,
properties TEXT NOT NULL,
constraints TEXT NOT NULL,
node_id VARCHAR(20) NOT NULL,

creation_ts DATETIME NOT NULL,
insertion_ts DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
expiration_ts DATETIME NOT NULL
);

CREATE TABLE market_demand (
id VARCHAR(97) NOT NULL PRIMARY KEY,
properties TEXT NOT NULL,
constraints TEXT NOT NULL,
node_id VARCHAR(20) NOT NULL,

creation_ts DATETIME NOT NULL,
insertion_ts DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
expiration_ts DATETIME NOT NULL
);
21 changes: 21 additions & 0 deletions core/market/decentralized/readme.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,21 @@
# Decentralized market Mk1

## Running yagna with decentralized market

You can enable decentralized market using cargo features.
Run yagna daemon with flags:
```
cargo run --no-default-features --features market-decentralized service run
```

## Running decentralized market test suite

To test market-test-suite run:
```
cargo test --workspace --features ya-market-decentralized/market-test-suite
```
or for market crate only
```
cargo test -p ya-market-decentralized --features ya-market-decentralized/market-test-suite
```

58 changes: 58 additions & 0 deletions core/market/decentralized/src/api.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,58 @@
pub mod provider;
pub mod requestor;
pub mod response;

use serde::{Deserialize, Serialize};

pub const DEFAULT_EVENT_TIMEOUT: f32 = 0.0; // seconds
pub const DEFAULT_QUERY_TIMEOUT: f32 = 12.0;

#[derive(Deserialize)]
pub struct PathAgreement {
pub agreement_id: String,
}

#[derive(Deserialize)]
pub struct PathSubscription {
pub subscription_id: String,
}

#[derive(Deserialize)]
pub struct PathSubscriptionProposal {
pub subscription_id: String,
pub proposal_id: String,
}

#[derive(Deserialize)]
pub struct QueryTimeout {
#[serde(rename = "timeout", default = "default_query_timeout")]
pub timeout: Option<f32>,
}

#[derive(Deserialize)]
pub struct QueryTimeoutCommandIndex {
#[serde(rename = "timeout")]
pub timeout: Option<f32>,
#[serde(rename = "commandIndex")]
pub command_index: Option<usize>,
}

#[derive(Deserialize, Debug)]
pub struct QueryTimeoutMaxEvents {
/// number of milliseconds to wait
#[serde(rename = "timeout", default = "default_event_timeout")]
pub timeout: Option<f32>,
/// maximum count of events to return
#[serde(rename = "maxEvents", default)]
pub max_events: Option<i32>,
}

#[inline(always)]
pub(crate) fn default_query_timeout() -> Option<f32> {
Some(DEFAULT_QUERY_TIMEOUT)
}

#[inline(always)]
pub(crate) fn default_event_timeout() -> Option<f32> {
Some(DEFAULT_EVENT_TIMEOUT)
}
138 changes: 138 additions & 0 deletions core/market/decentralized/src/api/provider.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,138 @@
use actix_web::web::{Data, Json, Path, Query};
use actix_web::{HttpResponse, Scope};
use std::sync::Arc;

use super::response;
use super::{
PathAgreement, PathSubscription, PathSubscriptionProposal, QueryTimeout, QueryTimeoutMaxEvents,
};
use crate::market::MarketService;

use ya_client::model::market::{Agreement, AgreementProposal, Offer, Proposal};
use ya_service_api_web::middleware::Identity;

// This file contains market REST endpoints. Responsibility of these functions
// is calling respective functions in market modules and mapping return values
// to http responses. No market logic is allowed here.

pub fn register_endpoints(scope: Scope) -> Scope {
scope
.service(subscribe)
.service(get_offers)
.service(unsubscribe)
.service(collect)
.service(counter_proposal)
.service(get_proposal)
.service(reject_proposal)
.service(approve_agreement)
.service(reject_agreement)
.service(terminate_agreement)
.service(get_agreement)
}

#[actix_web::post("/offers")]
async fn subscribe(
market: Data<Arc<MarketService>>,
body: Json<Offer>,
id: Identity,
) -> HttpResponse {
match market.subscribe_offer(body.into_inner(), id).await {
Ok(subscription_id) => response::created(subscription_id),
// TODO: Translate MarketError to better HTTP response.
Err(error) => response::server_error(&format!("{}", error)),
}
}

#[actix_web::get("/offers")]
async fn get_offers(market: Data<Arc<MarketService>>, id: Identity) -> HttpResponse {
response::not_implemented()
}

#[actix_web::delete("/offers/{subscription_id}")]
async fn unsubscribe(
market: Data<Arc<MarketService>>,
path: Path<PathSubscription>,
id: Identity,
) -> HttpResponse {
let subscription_id = path.into_inner().subscription_id;
match market.unsubscribe_offer(subscription_id.clone(), id).await {
Ok(()) => response::ok(subscription_id),
// TODO: Translate MatcherError to better HTTP response.
Err(error) => response::server_error(&format!("{}", error)),
}
}

#[actix_web::get("/offers/{subscription_id}/events")]
async fn collect(
market: Data<Arc<MarketService>>,
path: Path<PathSubscription>,
query: Query<QueryTimeoutMaxEvents>,
id: Identity,
) -> HttpResponse {
response::not_implemented()
}

#[actix_web::post("/offers/{subscription_id}/proposals/{proposal_id}")]
async fn counter_proposal(
market: Data<Arc<MarketService>>,
path: Path<PathSubscriptionProposal>,
body: Json<Proposal>,
id: Identity,
) -> HttpResponse {
response::not_implemented()
}

#[actix_web::get("/offers/{subscription_id}/proposals/{proposal_id}")]
async fn get_proposal(
market: Data<Arc<MarketService>>,
path: Path<PathSubscriptionProposal>,
id: Identity,
) -> HttpResponse {
response::not_implemented()
}

#[actix_web::delete("/offers/{subscription_id}/proposals/{proposal_id}")]
async fn reject_proposal(
market: Data<Arc<MarketService>>,
path: Path<PathSubscriptionProposal>,
id: Identity,
) -> HttpResponse {
response::not_implemented()
}

#[actix_web::post("/agreements/{agreement_id}/approve")]
async fn approve_agreement(
market: Data<Arc<MarketService>>,
path: Path<PathAgreement>,
query: Query<QueryTimeout>,
id: Identity,
) -> HttpResponse {
response::not_implemented()
}

#[actix_web::post("/agreements/{agreement_id}/reject")]
async fn reject_agreement(
market: Data<Arc<MarketService>>,
path: Path<PathAgreement>,
id: Identity,
) -> HttpResponse {
response::not_implemented()
}

#[actix_web::post("/agreements/{agreement_id}/terminate")]
async fn terminate_agreement(
market: Data<Arc<MarketService>>,
path: Path<PathAgreement>,
id: Identity,
) -> HttpResponse {
response::not_implemented()
}

#[actix_web::get("/agreements/{agreement_id}")]
async fn get_agreement(
market: Data<Arc<MarketService>>,
path: Path<PathAgreement>,
id: Identity,
) -> HttpResponse {
response::not_implemented()
}
Loading

0 comments on commit c5fc1d3

Please sign in to comment.