@@ -5,13 +5,11 @@ use crate::{
55use bytes:: Bytes ;
66use futures:: FutureExt ;
77use http:: header:: { CONTENT_TYPE , SET_COOKIE } ;
8- use http:: { Method , Request , Response , Uri } ;
8+ use http:: { HeaderMap , Method , Request , Response , StatusCode , Uri } ;
99use hyper:: body:: HttpBody ;
1010use hyper:: { client:: connect:: Connection , Body } ;
1111use lambda_runtime_api_client:: { build_request, Client } ;
12- use serde:: Deserialize ;
13- use serde_json:: json;
14- use std:: collections:: HashMap ;
12+ use serde:: { Deserialize , Serialize } ;
1513use std:: str:: FromStr ;
1614use std:: {
1715 env,
@@ -203,6 +201,16 @@ pub(crate) struct EventCompletionStreamingRequest<'a, B> {
203201 pub ( crate ) body : Response < B > ,
204202}
205203
204+ #[ derive( Debug , Serialize ) ]
205+ #[ serde( rename_all = "camelCase" ) ]
206+ struct MetadataPrelude {
207+ #[ serde( serialize_with = "http_serde::status_code::serialize" ) ]
208+ status_code : StatusCode ,
209+ #[ serde( serialize_with = "http_serde::header_map::serialize" ) ]
210+ headers : HeaderMap ,
211+ cookies : Vec < String > ,
212+ }
213+
206214impl < ' a , B > IntoRequest for EventCompletionStreamingRequest < ' a , B >
207215where
208216 B : HttpBody + Unpin + Send + ' static ,
@@ -216,45 +224,39 @@ where
216224 let ( parts, mut body) = self . body . into_parts ( ) ;
217225
218226 let mut builder = build_request ( ) . method ( Method :: POST ) . uri ( uri) ;
219- let headers = builder. headers_mut ( ) . unwrap ( ) ;
227+ let req_headers = builder. headers_mut ( ) . unwrap ( ) ;
220228
221- headers . insert ( "Transfer-Encoding" , "chunked" . parse ( ) ?) ;
222- headers . insert ( "Lambda-Runtime-Function-Response-Mode" , "streaming" . parse ( ) ?) ;
223- headers . insert (
229+ req_headers . insert ( "Transfer-Encoding" , "chunked" . parse ( ) ?) ;
230+ req_headers . insert ( "Lambda-Runtime-Function-Response-Mode" , "streaming" . parse ( ) ?) ;
231+ req_headers . insert (
224232 "Content-Type" ,
225233 "application/vnd.awslambda.http-integration-response" . parse ( ) ?,
226234 ) ;
227235
228- let ( mut tx, rx) = Body :: channel ( ) ;
236+ let mut prelude_headers = parts. headers ;
237+ // default Content-Type
238+ prelude_headers
239+ . entry ( CONTENT_TYPE )
240+ . or_insert ( "application/octet-stream" . parse ( ) ?) ;
229241
230- tokio :: spawn ( async move {
231- let mut header_map = parts . headers ;
232- // default Content-Type
233- header_map
234- . entry ( CONTENT_TYPE )
235- . or_insert ( "application/octet-stream" . parse ( ) . unwrap ( ) ) ;
242+ let cookies = prelude_headers . get_all ( SET_COOKIE ) ;
243+ let cookies = cookies
244+ . iter ( )
245+ . map ( |c| String :: from_utf8_lossy ( c . as_bytes ( ) ) . to_string ( ) )
246+ . collect :: < Vec < String > > ( ) ;
247+ prelude_headers . remove ( SET_COOKIE ) ;
236248
237- let cookies = header_map . get_all ( SET_COOKIE ) ;
238- let cookies = cookies
239- . iter ( )
240- . map ( |c| String :: from_utf8_lossy ( c . as_bytes ( ) ) . to_string ( ) )
241- . collect :: < Vec < String > > ( ) ;
249+ let metadata_prelude = serde_json :: to_string ( & MetadataPrelude {
250+ status_code : parts . status ,
251+ headers : prelude_headers ,
252+ cookies ,
253+ } ) ? ;
242254
243- let headers = header_map
244- . iter ( )
245- . filter ( |( k, _) | * k != SET_COOKIE )
246- . map ( |( k, v) | ( k. as_str ( ) , String :: from_utf8_lossy ( v. as_bytes ( ) ) . to_string ( ) ) )
247- . collect :: < HashMap < & str , String > > ( ) ;
255+ trace ! ( ?metadata_prelude) ;
248256
249- let metadata_prelude = json ! ( {
250- "statusCode" : parts. status. as_u16( ) ,
251- "headers" : headers,
252- "cookies" : cookies,
253- } )
254- . to_string ( ) ;
255-
256- trace ! ( "metadata_prelude: {}" , metadata_prelude) ;
257+ let ( mut tx, rx) = Body :: channel ( ) ;
257258
259+ tokio:: spawn ( async move {
258260 tx. send_data ( metadata_prelude. into ( ) ) . await . unwrap ( ) ;
259261 tx. send_data ( "\u{0} " . repeat ( 8 ) . into ( ) ) . await . unwrap ( ) ;
260262
0 commit comments