1
1
//! Module that deals with requests to /api/v2/canister/.../call
2
2
3
3
use crate :: {
4
- common:: { make_plaintext_response, make_response, remove_effective_principal_id} ,
5
- receive_request_body,
6
- validator_executor:: ValidatorExecutor ,
4
+ common:: CborUserError , validator_executor:: ValidatorExecutor , verify_cbor_content_header,
7
5
HttpError , IngressFilterService ,
8
6
} ;
9
7
10
- use axum:: body:: Body ;
8
+ use axum:: {
9
+ body:: Body ,
10
+ extract:: State ,
11
+ response:: { IntoResponse , Response } ,
12
+ Router ,
13
+ } ;
14
+ use bytes:: Bytes ;
11
15
use http:: Request ;
12
- use http_body_util:: BodyExt ;
13
- use hyper:: { Response , StatusCode } ;
16
+ use hyper:: StatusCode ;
14
17
use ic_crypto_interfaces_sig_verification:: IngressSigVerifier ;
15
18
use ic_interfaces:: ingress_pool:: IngressPoolThrottler ;
16
19
use ic_interfaces_registry:: RegistryClient ;
@@ -28,12 +31,10 @@ use ic_types::{
28
31
CanisterId , CountBytes , NodeId , RegistryVersion , SubnetId ,
29
32
} ;
30
33
use std:: convert:: { Infallible , TryInto } ;
31
- use std:: future:: Future ;
32
- use std:: pin:: Pin ;
34
+ use std:: sync:: Mutex ;
33
35
use std:: sync:: { Arc , RwLock } ;
34
- use std:: task:: { Context , Poll } ;
35
36
use tokio:: sync:: mpsc:: UnboundedSender ;
36
- use tower:: { BoxError , Service , ServiceExt } ;
37
+ use tower:: { util :: BoxCloneService , ServiceBuilder , ServiceExt } ;
37
38
38
39
#[ derive( Clone ) ]
39
40
pub struct CallService {
@@ -42,19 +43,25 @@ pub struct CallService {
42
43
subnet_id : SubnetId ,
43
44
registry_client : Arc < dyn RegistryClient > ,
44
45
validator_executor : ValidatorExecutor < SignedIngressContent > ,
45
- ingress_filter : IngressFilterService ,
46
+ ingress_filter : Arc < Mutex < IngressFilterService > > ,
46
47
ingress_throttler : Arc < RwLock < dyn IngressPoolThrottler + Send + Sync > > ,
47
48
ingress_tx : UnboundedSender < UnvalidatedArtifactMutation < IngressArtifact > > ,
48
49
}
49
50
51
+ impl CallService {
52
+ pub ( crate ) fn route ( ) -> & ' static str {
53
+ "/api/v2/canister/:effective_canister_id/call"
54
+ }
55
+ }
56
+
50
57
pub struct CallServiceBuilder {
51
58
log : Option < ReplicaLogger > ,
52
59
node_id : NodeId ,
53
60
subnet_id : SubnetId ,
54
61
malicious_flags : Option < MaliciousFlags > ,
55
62
ingress_verifier : Arc < dyn IngressSigVerifier + Send + Sync > ,
56
63
registry_client : Arc < dyn RegistryClient > ,
57
- ingress_filter : IngressFilterService ,
64
+ ingress_filter : Arc < Mutex < IngressFilterService > > ,
58
65
ingress_throttler : Arc < RwLock < dyn IngressPoolThrottler + Send + Sync > > ,
59
66
ingress_tx : UnboundedSender < UnvalidatedArtifactMutation < IngressArtifact > > ,
60
67
}
@@ -76,7 +83,7 @@ impl CallServiceBuilder {
76
83
malicious_flags : None ,
77
84
ingress_verifier,
78
85
registry_client,
79
- ingress_filter,
86
+ ingress_filter : Arc :: new ( Mutex :: new ( ingress_filter ) ) ,
80
87
ingress_throttler,
81
88
ingress_tx,
82
89
}
@@ -92,9 +99,9 @@ impl CallServiceBuilder {
92
99
self
93
100
}
94
101
95
- pub fn build ( self ) -> CallService {
102
+ pub ( crate ) fn build_router ( self ) -> Router {
96
103
let log = self . log . unwrap_or ( no_op_logger ( ) ) ;
97
- CallService {
104
+ let state = CallService {
98
105
log : log. clone ( ) ,
99
106
node_id : self . node_id ,
100
107
subnet_id : self . subnet_id ,
@@ -108,7 +115,18 @@ impl CallServiceBuilder {
108
115
ingress_filter : self . ingress_filter ,
109
116
ingress_throttler : self . ingress_throttler ,
110
117
ingress_tx : self . ingress_tx ,
111
- }
118
+ } ;
119
+ Router :: new ( ) . route_service (
120
+ CallService :: route ( ) ,
121
+ axum:: routing:: post ( call) . with_state ( state) . layer (
122
+ ServiceBuilder :: new ( ) . layer ( axum:: middleware:: from_fn ( verify_cbor_content_header) ) ,
123
+ ) ,
124
+ )
125
+ }
126
+
127
+ pub fn build_service ( self ) -> BoxCloneService < Request < Body > , Response , Infallible > {
128
+ let router = self . build_router ( ) ;
129
+ BoxCloneService :: new ( router. into_service ( ) )
112
130
}
113
131
}
114
132
@@ -161,154 +179,109 @@ fn get_registry_data(
161
179
}
162
180
163
181
/// Handles a call to /api/v2/canister/../call
164
- impl Service < Request < Body > > for CallService {
165
- type Response = Response < Body > ;
166
- type Error = Infallible ;
167
- #[ allow( clippy:: type_complexity) ]
168
- type Future = Pin < Box < dyn Future < Output = Result < Self :: Response , Self :: Error > > + Send > > ;
182
+ pub ( crate ) async fn call (
183
+ axum:: extract:: Path ( effective_canister_id) : axum:: extract:: Path < CanisterId > ,
184
+ State ( CallService {
185
+ log,
186
+ node_id,
187
+ subnet_id,
188
+ registry_client,
189
+ validator_executor,
190
+ ingress_filter,
191
+ ingress_throttler,
192
+ ingress_tx,
193
+ } ) : State < CallService > ,
194
+ body : Bytes ,
195
+ ) -> impl IntoResponse {
196
+ let msg: SignedIngress = match SignedRequestBytes :: from ( body. to_vec ( ) ) . try_into ( ) {
197
+ Ok ( msg) => msg,
198
+ Err ( e) => {
199
+ let status = StatusCode :: BAD_REQUEST ;
200
+ let text = format ! ( "Could not parse body as call message: {}" , e) ;
201
+ return ( status, text) . into_response ( ) ;
202
+ }
203
+ } ;
169
204
170
- fn poll_ready ( & mut self , _cx : & mut Context < ' _ > ) -> Poll < Result < ( ) , Self :: Error > > {
171
- Poll :: Ready ( Ok ( ( ) ) )
205
+ // Reject requests where `canister_id` != `effective_canister_id` for non mgmt canister calls.
206
+ // This needs to be enforced because boundary nodes block access based on the `effective_canister_id`
207
+ // in the url and the replica processes the request based on the `canister_id`.
208
+ // If this is not enforced, a blocked canisters can still be accessed by specifying
209
+ // a non-blocked `effective_canister_id` and a blocked `canister_id`.
210
+ if msg. canister_id ( ) != CanisterId :: ic_00 ( ) && msg. canister_id ( ) != effective_canister_id {
211
+ let status = StatusCode :: BAD_REQUEST ;
212
+ let text = format ! (
213
+ "Specified CanisterId {} does not match effective canister id in URL {}" ,
214
+ msg. canister_id( ) ,
215
+ effective_canister_id
216
+ ) ;
217
+ return ( status, text) . into_response ( ) ;
172
218
}
173
219
174
- fn call ( & mut self , request : Request < Body > ) -> Self :: Future {
175
- let ingress_tx = self . ingress_tx . clone ( ) ;
176
- let ingress_filter = self . ingress_filter . clone ( ) ;
177
- let log = self . log . clone ( ) ;
178
- let validator_executor = self . validator_executor . clone ( ) ;
179
- let node_id = self . node_id ;
180
- let ingress_throttler = self . ingress_throttler . clone ( ) ;
181
- let registry_client = self . registry_client . clone ( ) ;
182
- let subnet_id = self . subnet_id ;
183
- Box :: pin ( async move {
184
- let ( mut parts, body) = request. into_parts ( ) ;
185
- let body = match receive_request_body ( body) . await {
186
- Ok ( bytes) => bytes,
187
- Err ( e) => return Ok ( e) ,
188
- } ;
189
- let msg: SignedIngress = match SignedRequestBytes :: from ( body. to_vec ( ) ) . try_into ( ) {
190
- Ok ( msg) => msg,
191
- Err ( e) => {
192
- let res = make_plaintext_response (
193
- StatusCode :: BAD_REQUEST ,
194
- format ! ( "Could not parse body as call message: {}" , e) ,
195
- ) ;
196
- return Ok ( res) ;
197
- }
198
- } ;
199
-
200
- let effective_principal_id = match remove_effective_principal_id ( & mut parts) {
201
- Ok ( principal_id) => principal_id,
202
- Err ( res) => {
203
- error ! (
204
- log,
205
- "Effective canister ID is not attached to call request. This is a bug."
206
- ) ;
207
- return Ok ( res) ;
208
- }
209
- } ;
210
-
211
- let effective_canister_id =
212
- CanisterId :: unchecked_from_principal ( effective_principal_id) ;
213
-
214
- // Reject requests where `canister_id` != `effective_canister_id` for non mgmt canister calls.
215
- // This needs to be enforced because boundary nodes block access based on the `effective_canister_id`
216
- // in the url and the replica processes the request based on the `canister_id`.
217
- // If this is not enforced, a blocked canisters can still be accessed by specifying
218
- // a non-blocked `effective_canister_id` and a blocked `canister_id`.
219
- if msg. canister_id ( ) != CanisterId :: ic_00 ( )
220
- && msg. canister_id ( ) != effective_canister_id
221
- {
222
- let res = make_plaintext_response (
223
- StatusCode :: BAD_REQUEST ,
224
- format ! (
225
- "Specified CanisterId {} does not match effective canister id in URL {}" ,
226
- msg. canister_id( ) ,
227
- effective_canister_id
228
- ) ,
229
- ) ;
230
- return Ok ( res) ;
220
+ let message_id = msg. id ( ) ;
221
+ let registry_version = registry_client. get_latest_version ( ) ;
222
+ let ( ingress_registry_settings, provisional_whitelist) =
223
+ match get_registry_data ( & log, subnet_id, registry_version, registry_client. as_ref ( ) ) {
224
+ Ok ( ( s, p) ) => ( s, p) ,
225
+ Err ( HttpError { status, message } ) => {
226
+ return ( status, message) . into_response ( ) ;
231
227
}
228
+ } ;
229
+ if msg. count_bytes ( ) > ingress_registry_settings. max_ingress_bytes_per_message {
230
+ let status = StatusCode :: PAYLOAD_TOO_LARGE ;
231
+ let text = format ! (
232
+ "Request {} is too large. Message byte size {} is larger than the max allowed {}." ,
233
+ message_id,
234
+ msg. count_bytes( ) ,
235
+ ingress_registry_settings. max_ingress_bytes_per_message
236
+ ) ;
237
+ return ( status, text) . into_response ( ) ;
238
+ }
232
239
233
- let message_id = msg. id ( ) ;
234
- let registry_version = registry_client. get_latest_version ( ) ;
235
- let ( ingress_registry_settings, provisional_whitelist) = match get_registry_data (
236
- & log,
237
- subnet_id,
238
- registry_version,
239
- registry_client. as_ref ( ) ,
240
- ) {
241
- Ok ( ( s, p) ) => ( s, p) ,
242
- Err ( HttpError { status, message } ) => {
243
- return Ok ( make_plaintext_response ( status, message) ) ;
244
- }
245
- } ;
246
- if msg. count_bytes ( ) > ingress_registry_settings. max_ingress_bytes_per_message {
247
- let res = make_plaintext_response (
248
- StatusCode :: PAYLOAD_TOO_LARGE ,
249
- format ! (
250
- "Request {} is too large. Message byte size {} is larger than the max allowed {}." ,
251
- message_id,
252
- msg. count_bytes( ) ,
253
- ingress_registry_settings. max_ingress_bytes_per_message
254
- ) ,
255
- ) ;
256
- return Ok ( res) ;
257
- }
240
+ if let Err ( http_err) = validator_executor
241
+ . validate_request ( msg. as_ref ( ) . clone ( ) , registry_version)
242
+ . await
243
+ {
244
+ return ( http_err. status , http_err. message ) . into_response ( ) ;
245
+ }
258
246
259
- if let Err ( http_err) = validator_executor
260
- . validate_request ( msg. as_ref ( ) . clone ( ) , registry_version)
261
- . await
262
- {
263
- let res = make_plaintext_response ( http_err. status , http_err. message ) ;
264
- return Ok ( res) ;
265
- }
247
+ let ingress_filter = ingress_filter. lock ( ) . unwrap ( ) . clone ( ) ;
266
248
267
- match ingress_filter
268
- . oneshot ( ( provisional_whitelist, msg. content ( ) . clone ( ) ) )
269
- . await
270
- {
271
- Err ( _) => panic ! ( "Can't panic on Infallible" ) ,
272
- Ok ( Err ( err) ) => {
273
- return Ok ( make_response ( err) ) ;
274
- }
275
- Ok ( Ok ( ( ) ) ) => ( ) ,
276
- }
249
+ match ingress_filter
250
+ . oneshot ( ( provisional_whitelist, msg. content ( ) . clone ( ) ) )
251
+ . await
252
+ {
253
+ Err ( _) => panic ! ( "Can't panic on Infallible" ) ,
254
+ Ok ( Err ( err) ) => {
255
+ return CborUserError ( err) . into_response ( ) ;
256
+ }
257
+ Ok ( Ok ( ( ) ) ) => ( ) ,
258
+ }
277
259
278
- let ingress_log_entry = msg. log_entry ( ) ;
260
+ let ingress_log_entry = msg. log_entry ( ) ;
279
261
280
- let is_overloaded = ingress_throttler. read ( ) . unwrap ( ) . exceeds_threshold ( )
281
- || ingress_tx
282
- . send ( UnvalidatedArtifactMutation :: Insert ( ( msg, node_id) ) )
283
- . is_err ( ) ;
262
+ let is_overloaded = ingress_throttler. read ( ) . unwrap ( ) . exceeds_threshold ( )
263
+ || ingress_tx
264
+ . send ( UnvalidatedArtifactMutation :: Insert ( ( msg, node_id) ) )
265
+ . is_err ( ) ;
284
266
285
- let response = if is_overloaded {
286
- make_plaintext_response (
287
- StatusCode :: TOO_MANY_REQUESTS ,
288
- "Service is overloaded, try again later." . to_string ( ) ,
289
- )
290
- } else {
291
- // We're pretty much done, just need to send the message to ingress and
292
- // make_response to the client
293
- info_sample ! (
294
- "message_id" => & message_id,
295
- log,
296
- "ingress_message_submit" ;
297
- ingress_message => ingress_log_entry
298
- ) ;
299
- make_accepted_response ( )
300
- } ;
301
- Ok ( response)
302
- } )
267
+ if is_overloaded {
268
+ let status = StatusCode :: TOO_MANY_REQUESTS ;
269
+ let text = "Service is overloaded, try again later." . to_string ( ) ;
270
+ ( status, text) . into_response ( )
271
+ } else {
272
+ // We're pretty much done, just need to send the message to ingress and
273
+ // make_response to the client
274
+ info_sample ! (
275
+ "message_id" => & message_id,
276
+ log,
277
+ "ingress_message_submit" ;
278
+ ingress_message => ingress_log_entry
279
+ ) ;
280
+ let status = StatusCode :: ACCEPTED ;
281
+ ( status, "" ) . into_response ( )
303
282
}
304
283
}
305
284
306
- fn make_accepted_response ( ) -> Response < Body > {
307
- let mut response = Response :: new ( Body :: new ( String :: new ( ) . map_err ( BoxError :: from) ) ) ;
308
- * response. status_mut ( ) = StatusCode :: ACCEPTED ;
309
- response
310
- }
311
-
312
285
#[ cfg( test) ]
313
286
mod test {
314
287
use super :: * ;
0 commit comments