-
Notifications
You must be signed in to change notification settings - Fork 58
/
common.rs
162 lines (138 loc) · 4.32 KB
/
common.rs
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
use serde::Deserialize;
use uuid::Uuid;
use ya_core_model::market;
use ya_persistence::executor::DbExecutor;
use ya_service_api::constants::NET_SERVICE_ID;
use ya_service_bus::{RpcEndpoint, RpcMessage};
use crate::dao::{ActivityDao, NotFoundAsOption};
use crate::error::Error;
use ya_model::market::Agreement;
use ya_service_bus::typed as bus;
/// Shorthand for the outcome of an RPC call on message type `T`: `Ok` carries
/// the message's associated `Item` type, `Err` its associated `Error` type.
pub type RpcMessageResult<T> = Result<<T as RpcMessage>::Item, <T as RpcMessage>::Error>;
/// Fallback value for the `timeout` query parameter; `default_query_timeout`
/// hands it to serde when a request does not supply one.
// NOTE(review): the unit is not established in this file — confirm it against the code that consumes the timeout.
pub const DEFAULT_REQUEST_TIMEOUT: f32 = 12.0;
/// Deserializable holder for a single activity identifier.
///
/// Judging by the name this is meant to be extracted from a request path;
/// the extraction site is outside this part of the file — TODO confirm.
#[derive(Deserialize, Debug)]
pub struct PathActivity {
    /// Identifier of the activity the request refers to.
    pub activity_id: String,
}
/// Query parameters carrying only an optional wait timeout.
#[derive(Deserialize, Debug)]
pub struct QueryTimeout {
    /// How long to wait. When the `timeout` key is absent, serde fills the
    /// field from `default_query_timeout()` instead of leaving it `None`.
    #[serde(rename = "timeout", default = "default_query_timeout")]
    pub timeout: Option<f32>,
}
/// Query parameters for requests that return events: an optional wait timeout
/// plus an optional cap on how many events come back.
#[derive(Deserialize, Debug)]
pub struct QueryTimeoutMaxCount {
/// number of milliseconds to wait
///
/// When the `timeout` key is absent, serde fills the field from
/// `default_query_timeout()`.
// NOTE(review): the default is DEFAULT_REQUEST_TIMEOUT (12.0); verify that the
// unit really is milliseconds, since the consumer of this value is not visible here.
#[serde(rename = "timeout", default = "default_query_timeout")]
pub timeout: Option<f32>,
/// maximum count of events to return
///
/// Read from the `maxCount` key; there is no serde default, so a missing key
/// leaves this as `None`.
#[serde(rename = "maxCount")]
pub max_count: Option<u32>,
}
/// Serde default provider for the `timeout` query field.
///
/// Always yields `Some(DEFAULT_REQUEST_TIMEOUT)`, so a request without an
/// explicit timeout still ends up with a concrete value.
#[inline(always)]
pub(crate) fn default_query_timeout() -> Option<f32> {
    let fallback = DEFAULT_REQUEST_TIMEOUT;
    Some(fallback)
}
/// Produces a fresh identifier: a version-4 UUID rendered in its "simple"
/// textual form.
#[inline(always)]
pub(crate) fn generate_id() -> String {
    // TODO: replace with a cryptographically secure generator
    let id = Uuid::new_v4();
    id.to_simple().to_string()
}
pub(crate) fn into_json_response<T>(
result: std::result::Result<T, Error>,
) -> actix_web::HttpResponse
where
T: serde::Serialize,
{
let result = match result {
Ok(value) => serde_json::to_string(&value).map_err(Error::from),
Err(e) => Err(e),
};
match result {
Ok(value) => actix_web::HttpResponse::Ok()
.content_type("application/json")
.body(value)
.into(),
Err(e) => e.into(),
}
}
/// Fetches an agreement by id from the market service over the service bus.
///
/// `agreement_id` is stringified and sent in a `market::GetAgreement` request
/// to `market::BUS_ID`. Both failure layers of the call (the outer result of
/// the send and the inner RPC result) are converted into [`Error`].
pub(crate) async fn get_agreement(agreement_id: impl ToString) -> Result<Agreement, Error> {
    let request = market::GetAgreement {
        agreement_id: agreement_id.to_string(),
    };
    let agreement = bus::service(market::BUS_ID).send(request).await??;
    Ok(agreement)
}
pub(crate) async fn get_activity_agreement(
db: &DbExecutor,
activity_id: &str,
_timeout: Option<f32>,
) -> Result<Agreement, Error> {
let agreement_id = db
.as_dao::<ActivityDao>()
.get_agreement_id(activity_id)
.await
.not_found_as_option()
.map_err(Error::from)?
.ok_or(Error::NotFound)?;
let agreement = bus::service(market::BUS_ID)
.send(market::GetAgreement { agreement_id })
.await??;
Ok(agreement)
}
/// Checks that `caller` is the initiator of the agreement behind `activity_id`.
///
/// The agreement id is looked up through [`ActivityDao`] and the actual check
/// is delegated to [`authorize_agreement_initiator`].
///
/// # Errors
/// Returns the DAO failure converted into [`Error`], or whatever
/// `authorize_agreement_initiator` reports.
pub(crate) async fn authorize_activity_initiator(
    db: &DbExecutor,
    caller: impl ToString,
    activity_id: &str,
) -> Result<(), Error> {
    // `activity_id` is already a `&str`; pass it as-is rather than as `&&str`.
    let agreement_id = db
        .as_dao::<ActivityDao>()
        .get_agreement_id(activity_id)
        .await
        .map_err(Error::from)?;
    authorize_agreement_initiator(caller, agreement_id).await
}
/// Checks that `caller` is the executor of the agreement behind `activity_id`.
///
/// The agreement id is looked up through [`ActivityDao`] and the actual check
/// is delegated to [`authorize_agreement_executor`].
///
/// # Errors
/// Returns the DAO failure converted into [`Error`], or whatever
/// `authorize_agreement_executor` reports.
pub(crate) async fn authorize_activity_executor(
    db: &DbExecutor,
    caller: impl ToString,
    activity_id: &str,
) -> Result<(), Error> {
    // `activity_id` is already a `&str`; pass it as-is rather than as `&&str`.
    let agreement_id = db
        .as_dao::<ActivityDao>()
        .get_agreement_id(activity_id)
        .await
        .map_err(Error::from)?;
    authorize_agreement_executor(caller, agreement_id).await
}
pub(crate) async fn authorize_agreement_initiator(
caller: impl ToString,
agreement_id: String,
) -> Result<(), Error> {
let agreement = get_agreement(agreement_id).await?;
let initiator_id = agreement
.demand
.requestor_id
.ok_or(Error::BadRequest("no requestor id".into()))?;
authorize_caller(caller, initiator_id)
}
pub(crate) async fn authorize_agreement_executor(
caller: impl ToString,
agreement_id: String,
) -> Result<(), Error> {
let agreement = get_agreement(agreement_id).await?;
let executor_id = agreement
.offer
.provider_id
.ok_or(Error::BadRequest("no provider id".into()))?;
authorize_caller(caller, executor_id)
}
#[inline(always)]
pub(crate) fn authorize_caller(caller: impl ToString, authorized: String) -> Result<(), Error> {
// FIXME: impl a proper caller struct / parser
let pat = format!("{}/", NET_SERVICE_ID);
let caller = caller.to_string().replacen(&pat, "", 1);
log::debug!("checking caller: {} vs expected: {}", caller, authorized);
match caller == authorized {
true => Ok(()),
false => Err(Error::Forbidden),
}
}