Skip to content

Commit

Permalink
SubscriptionId consists of random part and offer/demand hash
Browse files Browse the repository at this point in the history
  • Loading branch information
nieznanysprawiciel authored and tworec committed May 26, 2020
1 parent 2701327 commit fd6c148
Show file tree
Hide file tree
Showing 8 changed files with 206 additions and 45 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions core/market/decentralized/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -23,13 +23,15 @@ chrono = { version = "0.4", features = ["serde"] }
derive_more = "0.99.5"
diesel = { version = "1.4", features = ["chrono", "sqlite", "r2d2"] }
diesel_migrations = "1.4"
digest = "0.8.1"
futures = "0.3"
lazy_static = "1.4"
libsqlite3-sys = { version = "0.9.1", features = ["bundled"] }
log = "0.4"
r2d2 = "0.8"
serde = { version = "^1.0", features = ["derive"] }
serde_json = "^1.0"
sha3 = "0.8.2"
thiserror = "1.0"
tokio = { version = "0.2", features = ["time"] }
uuid = { version = "0.8", features = ["v4"] }
Expand Down
3 changes: 3 additions & 0 deletions core/market/decentralized/src/db/models.rs
Original file line number Diff line number Diff line change
@@ -1,5 +1,8 @@
mod demand;
mod offer;
mod subscription;

pub use demand::Demand;
pub use offer::Offer;

pub use subscription::{SubscriptionId, SubscriptionParseError};
44 changes: 23 additions & 21 deletions core/market/decentralized/src/db/models/demand.rs
Original file line number Diff line number Diff line change
@@ -1,54 +1,56 @@
use diesel::prelude::*;
use serde_json;
use std::str::FromStr;
use uuid::Uuid;

use ya_client::model::market::Demand as ClientDemand;
use ya_client::model::ErrorMessage;
use ya_service_api_web::middleware::Identity;

use crate::db::models::offer::generate_subscription_id;
use super::SubscriptionId;
use crate::db::schema::market_demand;

#[derive(Clone, Debug, Identifiable, Insertable, Queryable)]
#[table_name = "market_demand"]
pub struct Demand {
pub id: String,
pub id: SubscriptionId,
pub properties: String,
pub constraints: String,
pub node_id: String,
}

impl Demand {
pub fn from(demand: &ClientDemand) -> Result<Demand, ErrorMessage> {
let properties = demand.properties.to_string();
let constraints = demand.constraints.clone();
let node_id = demand.requestor_id()?.to_string();
let id = SubscriptionId::from_str(demand.demand_id()?)?;

Ok(Demand {
id: demand
.demand_id
.clone()
.unwrap_or(generate_subscription_id()),
properties: demand.properties.to_string(),
constraints: demand.constraints.clone(),
node_id: demand
.requestor_id()
.map_err(|error| format!("Anonymous demand - {}", error))?
.to_string(),
id,
properties,
constraints,
node_id,
})
}

pub fn from_with_identity(demand: &ClientDemand, id: &Identity) -> Demand {
pub fn from_new(demand: &ClientDemand, id: &Identity) -> Demand {
let properties = demand.properties.to_string();
let constraints = demand.constraints.clone();
let node_id = id.identity.to_string();
let id = SubscriptionId::generate_id(&properties, &constraints, &node_id);

Demand {
id: demand
.demand_id
.clone()
.unwrap_or(generate_subscription_id()),
properties: demand.properties.to_string(),
constraints: demand.constraints.clone(),
node_id: id.identity.to_string(),
id,
properties,
constraints,
node_id,
}
}

pub fn into_client_offer(&self) -> Result<ClientDemand, ErrorMessage> {
Ok(ClientDemand {
demand_id: Some(self.id.clone()),
demand_id: Some(self.id.to_string()),
requestor_id: Some(self.node_id.clone()),
constraints: self.constraints.clone(),
properties: serde_json::to_value(&self.properties).map_err(|error| {
Expand Down
45 changes: 26 additions & 19 deletions core/market/decentralized/src/db/models/offer.rs
Original file line number Diff line number Diff line change
@@ -1,52 +1,59 @@
use diesel::prelude::*;
use serde_json;
use std::str::FromStr;
use uuid::Uuid;

use ya_client::model::market::Offer as ClientOffer;
use ya_client::model::ErrorMessage;
use ya_service_api_web::middleware::Identity;

use super::SubscriptionId;
use crate::db::schema::market_offer;

#[derive(Clone, Debug, Identifiable, Insertable, Queryable)]
#[table_name = "market_offer"]
pub struct Offer {
pub id: String,
pub id: SubscriptionId,
pub properties: String,
pub constraints: String,
pub node_id: String,
}

/// TODO: Should be cryptographically strong.
pub fn generate_subscription_id() -> String {
Uuid::new_v4().to_simple().to_string()
}

impl Offer {
pub fn from(offer: &ClientOffer) -> Result<Offer, ErrorMessage> {
let properties = offer.properties.to_string();
let constraints = offer.constraints.clone();
let node_id = offer
.provider_id()
.map_err(|error| format!("Anonymous offer - {}", error))?
.to_string();
let id = SubscriptionId::from_str(offer.offer_id()?)?;

Ok(Offer {
id: offer.offer_id.clone().unwrap_or(generate_subscription_id()),
properties: offer.properties.to_string(),
constraints: offer.constraints.clone(),
node_id: offer
.provider_id()
.map_err(|error| format!("Anonymous offer - {}", error))?
.to_string(),
id,
properties,
constraints,
node_id,
})
}

pub fn from_with_identity(offer: &ClientOffer, id: &Identity) -> Offer {
pub fn from_new(offer: &ClientOffer, id: &Identity) -> Offer {
let properties = offer.properties.to_string();
let constraints = offer.constraints.clone();
let node_id = id.identity.to_string();
let id = SubscriptionId::generate_id(&properties, &constraints, &node_id);

Offer {
id: offer.offer_id.clone().unwrap_or(generate_subscription_id()),
properties: offer.properties.to_string(),
constraints: offer.constraints.clone(),
node_id: id.identity.to_string(),
id,
properties,
constraints,
node_id,
}
}

pub fn into_client_offer(&self) -> Result<ClientOffer, ErrorMessage> {
Ok(ClientOffer {
offer_id: Some(self.id.clone()),
offer_id: Some(self.id.to_string()),
provider_id: Some(self.node_id.clone()),
constraints: self.constraints.clone(),
properties: serde_json::to_value(&self.properties).map_err(|error| {
Expand Down
145 changes: 145 additions & 0 deletions core/market/decentralized/src/db/models/subscription.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,145 @@
use derive_more::Display;
use diesel::backend::Backend;
use diesel::deserialize::{FromSql, Result as DeserializeResult};
use diesel::serialize::{Output, Result as SerializeResult, ToSql};
use diesel::sql_types::Text;
use digest::Digest;
use sha3::Sha3_256;
use std::str::FromStr;
use std::io::Write;
use thiserror::Error;
use uuid::Uuid;

use ya_client::model::ErrorMessage;
use digest::generic_array::GenericArray;


#[derive(Error, Debug)]
pub enum SubscriptionParseError {
#[error("Subscription id [{}] has invalid format.", .0)]
InvalidFormat(String),
#[error("Subscription id [{}] contains non hexadecimal characters.", .0)]
NotHexadecimal(String),
#[error("Subscription id [{}] has invalid length.", .0)]
InvalidLength(String),
}


#[derive(Display, Debug, Clone, AsExpression, FromSqlRow, Hash, PartialEq, Eq)]
#[display(fmt = "{}-{}", random_id, hash)]
#[sql_type = "Text"]
pub struct SubscriptionId {
random_id: String,
hash: String,
}

/// TODO: Should be cryptographically strong.
pub fn generate_random_id() -> String {
Uuid::new_v4().to_simple().to_string()
}

impl SubscriptionId {
pub fn generate_id(properties: &str, constraints: &str, node_id: &str) -> SubscriptionId {
SubscriptionId {
random_id: generate_random_id(),
hash: hash(properties, constraints, node_id),
}
}
}

fn hash(properties: &str, constraints: &str, node_id: &str) -> String {
let mut hasher = Sha3_256::new();

hasher.input(properties);
hasher.input(constraints);
hasher.input(node_id);

format!("{:x}", hasher.result())
}


impl FromStr for SubscriptionId {
type Err = SubscriptionParseError;

fn from_str(s: &str) -> Result<Self, Self::Err> {
let elements: Vec<&str> = s
.split('-')
.collect();

if elements.len() != 2 {
Err(SubscriptionParseError::InvalidFormat(s.to_string()))?;
}

if !elements
.iter()
.map(|slice| slice
.chars()
.all(|character| character.is_ascii_hexdigit()))
.all(|result| result == true) {
Err(SubscriptionParseError::NotHexadecimal(s.to_string()))?;
}

if elements[0].len() != 32 {
Err(SubscriptionParseError::InvalidLength(s.to_string()))?;
}

if elements[1].len() != 64 {
Err(SubscriptionParseError::InvalidLength(s.to_string()))?;
}

Ok(SubscriptionId {
random_id: elements[0].to_string(),
hash: elements[1].to_string(),
})
}
}

impl<DB> ToSql<Text, DB> for SubscriptionId
where
DB: Backend,
String: ToSql<Text, DB>,
{
fn to_sql<W: Write>(&self, out: &mut Output<W, DB>) -> SerializeResult {
self.to_string().to_sql(out)
}
}

impl<DB> FromSql<Text, DB> for SubscriptionId
where
DB: Backend,
String: FromSql<Text, DB>,
{
fn from_sql(bytes: Option<&DB::RawValue>) -> DeserializeResult<Self> {
let string = String::from_sql(bytes)?;
match SubscriptionId::from_str(&string) {
Ok(subscription) => Ok(subscription),
Err(error) => Err(error.into()),
}
}
}

impl From<SubscriptionParseError> for ErrorMessage {
fn from(err: SubscriptionParseError) -> Self {
ErrorMessage::new(err.to_string())
}
}


#[cfg(test)]
mod tests {
use super::*;

#[test]
fn test_subscription_from_str() {
let subscription_id = "c76161077d0343ab85ac986eb5f6ea38-edb0016d9f8bafb54540da34f05a8d510de8114488f23916276bdead05509a53";

let sub_id = SubscriptionId::from_str(subscription_id).unwrap();
assert_eq!(sub_id.hash.as_str(), "edb0016d9f8bafb54540da34f05a8d510de8114488f23916276bdead05509a53");
assert_eq!(sub_id.random_id.as_str(), "c76161077d0343ab85ac986eb5f6ea38");

assert_eq!(SubscriptionId::from_str("34324-241").is_ok(), false);
assert_eq!(SubscriptionId::from_str("c76161077d0343ab85ac986eb5f6ea38edb0016d9f8bafb54540da34f05a8d510de8114488f23916276bdead05509a53").is_ok(), false);
assert_eq!(SubscriptionId::from_str("gfht-ertry").is_ok(), false);
}

}
8 changes: 4 additions & 4 deletions core/market/decentralized/src/market.rs
Original file line number Diff line number Diff line change
Expand Up @@ -98,8 +98,8 @@ impl MarketService {
}

pub async fn subscribe_offer(&self, offer: Offer, id: Identity) -> Result<String, MarketError> {
let offer = ModelOffer::from_with_identity(&offer, &id);
let subscription_id = offer.id.clone();
let offer = ModelOffer::from_new(&offer, &id);
let subscription_id = offer.id.to_string();

self.provider_negotiation_engine
.subscribe_offer(&offer)
Expand All @@ -126,8 +126,8 @@ impl MarketService {
demand: Demand,
id: Identity,
) -> Result<String, MarketError> {
let demand = ModelDemand::from_with_identity(&demand, &id);
let subscription_id = demand.id.clone();
let demand = ModelDemand::from_new(&demand, &id);
let subscription_id = demand.id.to_string();

self.requestor_negotiation_engine
.subscribe_demand(&demand)
Expand Down
2 changes: 1 addition & 1 deletion core/market/decentralized/src/matcher/matcher.rs
Original file line number Diff line number Diff line change
Expand Up @@ -128,7 +128,7 @@ impl Matcher {
self.discovery
.broadcast_offer(model_offer.into_client_offer()?)
.await
.map_err(|error| OfferError::BroadcastOfferFailure(error, model_offer.id.clone()))?;
.map_err(|error| OfferError::BroadcastOfferFailure(error, model_offer.id.to_string()))?;
Ok(())
}

Expand Down

0 comments on commit fd6c148

Please sign in to comment.