From b97f2629cc444a1d95ed1b55b3a2cf7f80423587 Mon Sep 17 00:00:00 2001 From: lfbrehm <97600985+lfbrehm@users.noreply.github.com> Date: Thu, 16 May 2024 13:10:25 +0200 Subject: [PATCH] fix: Additional tracing and error messages --- .../data_proxy/src/grpc_api/proxy_service.rs | 264 ++++++++++-------- 1 file changed, 153 insertions(+), 111 deletions(-) diff --git a/components/data_proxy/src/grpc_api/proxy_service.rs b/components/data_proxy/src/grpc_api/proxy_service.rs index 35f9cc97..6385ac3f 100644 --- a/components/data_proxy/src/grpc_api/proxy_service.rs +++ b/components/data_proxy/src/grpc_api/proxy_service.rs @@ -64,7 +64,7 @@ impl DataproxyReplicationServiceImpl { } } -#[derive(PartialEq, Eq, Hash, Clone)] +#[derive(PartialEq, Eq, Hash, Clone, Debug)] enum AckSync { ObjectInit(DieselUlid), ObjectChunk(DieselUlid, i64), // Object id, completed_chunks, chunks_idx @@ -86,133 +86,161 @@ impl DataproxyReplicationService for DataproxyReplicationServiceImpl { ) -> Result, tonic::Status> { trace!("Received request: {request:?}"); let (metadata, _, mut request) = request.into_parts(); + trace!("1"); let token = get_token_from_md(&metadata).map_err(|_| { error!(error = "Token not found"); tonic::Status::unauthenticated("Token not found") })?; + trace!("2"); // Sends initial Vec<(object, location)> to sync/ack/stream handlers let (object_input_send, object_input_rcv) = async_channel::bounded(5); + + trace!("3"); // Sends ack messages to the ack handler let (object_ack_send, object_ack_rcv) = async_channel::bounded(100); + + trace!("4"); // Sends the finished ack hash map to the output handler to compare send/ack let (object_sync_send, object_sync_rcv) = async_channel::bounded(1); + + trace!("5"); // Handles output stream let (object_output_send, object_output_rcv) = tokio::sync::mpsc::channel(255); + + trace!("6"); // Error and retry handling for ArunaStreamReadWriter let (retry_send, retry_rcv) = async_channel::bounded(5); + trace!("7"); + let finished_state_handler = Arc::new(Mutex::new(false)); + + trace!("8"); let finished_state_clone = finished_state_handler.clone(); + trace!("9"); + let (id, pk) = self.get_endpoint_from_token(&token).await?; + + trace!("10"); let pk = crate::auth::crypto::ed25519_to_x25519_pubkey(&pk.key) .map_err(|_| tonic::Status::internal("Unable to convert pubkey"))?; - // Recieving loop + trace!("11"); + // Receiving loop let proxy_replication_service = self.clone(); + + trace!("12"); let output_sender = object_output_send.clone(); + + trace!("13"); tokio::spawn(async move { - while let Ok(message) = request.message().await { - trace!(?message); - match message { - Some(message) => { - let PullReplicationRequest { message } = message; + loop { + match request.message().await { + Ok(message) => { + trace!(?message); match message { - Some(message) => match message { - Message::InitMessage(init) => { - trace!(?init); - let msg = - proxy_replication_service.check_permissions(init, id).await; - object_input_send.send(msg).await.map_err(|e| { - error!(error = ?e, msg = e.to_string()); - e - })?; - } - Message::InfoAckMessage(InfoAckMessage { object_id }) => { - let object_id = DieselUlid::from_str(&object_id)?; - // Send object init into acknowledgement sync handler - - object_ack_send - .send(AckSync::ObjectInit(object_id)) - .await - .map_err(|e| { - error!(error = ?e, msg = e.to_string()); - e - })?; - } - Message::ChunkAckMessage(ChunkAckMessage { - object_id, - chunk_idx, - }) => { - let object_id = DieselUlid::from_str(&object_id)?; - // Send object init into acknowledgement sync handler - object_ack_send - .send(AckSync::ObjectChunk(object_id, chunk_idx)) - .await - .map_err(|e| { - error!(error = ?e, msg = e.to_string()); - e - })?; - retry_send.send(None).await.map_err(|e| { - error!(error = ?e, msg = e.to_string()); - e - })?; - } - Message::ErrorMessage(ErrorMessage { error }) => { - if let Some(err) = error { - match err { - Error::RetryChunk(RetryChunkMessage { - object_id, - chunk_idx, - }) => { - let msg = Some((chunk_idx, object_id)); - retry_send.send(msg).await.map_err(|e| { + Some(message) => { + let PullReplicationRequest { message } = message; + match message { + Some(message) => match message { + Message::InitMessage(init) => { + trace!(?init); + let msg = proxy_replication_service + .check_permissions(init, id) + .await; + object_input_send.send(msg).await.map_err(|e| { + error!(error = ?e, msg = e.to_string()); + e + })?; + } + Message::InfoAckMessage(InfoAckMessage { object_id }) => { + let object_id = DieselUlid::from_str(&object_id)?; + // Send object init into acknowledgement sync handler + + object_ack_send + .send(AckSync::ObjectInit(object_id)) + .await + .map_err(|e| { + error!(error = ?e, msg = e.to_string()); + e + })?; + } + Message::ChunkAckMessage(ChunkAckMessage { + object_id, + chunk_idx, + }) => { + let object_id = DieselUlid::from_str(&object_id)?; + // Send object init into acknowledgement sync handler + object_ack_send + .send(AckSync::ObjectChunk(object_id, chunk_idx)) + .await + .map_err(|e| { + error!(error = ?e, msg = e.to_string()); + e + })?; + retry_send.send(None).await.map_err(|e| { + error!(error = ?e, msg = e.to_string()); + e + })?; + } + Message::ErrorMessage(ErrorMessage { error }) => { + if let Some(err) = error { + match err { + Error::RetryChunk(RetryChunkMessage { + object_id, + chunk_idx, + }) => { + let msg = Some((chunk_idx, object_id)); + retry_send.send(msg).await.map_err(|e| { tracing::error!(error = ?e, msg = e.to_string()); e })?; - } - Error::Abort(_) => { - error!(error = "Aborted sync"); - return Err(anyhow!("Aborted sync")); - } - Error::RetryObjectId(object_id) => { - let ulid = + } + Error::Abort(_) => { + error!(error = "Aborted sync"); + return Err(anyhow!("Aborted sync")); + } + Error::RetryObjectId(object_id) => { + let ulid = DieselUlid::from_str(&object_id).map_err(|e| { tracing::error!(error = ?e, msg = e.to_string()); e })?; - let (object, location) = proxy_replication_service - .cache - .get_resource_cloned(&ulid, true) - .await?; - if let Some(auth) = proxy_replication_service - .cache - .auth - .read() - .await - .as_ref() - { - // Returns claims.sub as id -> Can return UserIds or DataproxyIds - // -> UserIds cannot be found in object.endpoints, so this should be safe - let (dataproxy_id, _, _) = auth - .check_permissions(&token) - .map_err(|_| { - error!( + let (object, location) = + proxy_replication_service + .cache + .get_resource_cloned(&ulid, true) + .await?; + if let Some(auth) = + proxy_replication_service + .cache + .auth + .read() + .await + .as_ref() + { + // Returns claims.sub as id -> Can return UserIds or DataproxyIds + // -> UserIds cannot be found in object.endpoints, so this should be safe + let (dataproxy_id, _, _) = auth + .check_permissions(&token) + .map_err(|_| { + error!( error = "DataProxy not authenticated" ); - tonic::Status::unauthenticated( + tonic::Status::unauthenticated( "DataProxy not authenticated", ) - })?; - if !object - .endpoints - .iter() - .any(|ep| ep.id == dataproxy_id) - { - trace!("Endpoint has no permission to replicate object"); - output_sender + })?; + if !object + .endpoints + .iter() + .any(|ep| ep.id == dataproxy_id) + { + trace!("Endpoint has no permission to replicate object"); + output_sender .send(Err( tonic::Status::unauthenticated( "Access denied", @@ -223,8 +251,8 @@ impl DataproxyReplicationService for DataproxyReplicationServiceImpl { tracing::error!(error = ?e, msg = e.to_string()); e })?; - } else { - object_input_send + } else { + object_input_send .send(Ok(vec![( object, location @@ -239,45 +267,58 @@ impl DataproxyReplicationService for DataproxyReplicationServiceImpl { tracing::error!(error = ?e, msg = e.to_string()); e })?; - }; + }; + } + } } } } + Message::FinishMessage(_) => { + { + let mut lock = finished_state_clone.lock().await; + *lock = true; + } + object_ack_send.send(AckSync::Finish).await.map_err( + |e| { + error!(error = ?e, msg = e.to_string()); + e + }, + )?; + return Ok(()); + } + }, + _ => { + error!( + error = "No message provided in PullReplicationRequest" + ); + return Err(anyhow!( + "No message provided in PullReplicationRequest" + )); } } - Message::FinishMessage(_) => { - { - let mut lock = finished_state_clone.lock().await; - *lock = true; - } - object_ack_send.send(AckSync::Finish).await.map_err(|e| { - error!(error = ?e, msg = e.to_string()); - e - })?; - return Ok(()); - } - }, - _ => { + } + None => { error!(error = "No message provided in PullReplicationRequest"); return Err(anyhow!( "No message provided in PullReplicationRequest" )); } - } + }; } - None => { - error!(error = "No message provided in PullReplicationRequest"); - return Err(anyhow!("No message provided in PullReplicationRequest")); + Err(err) => { + trace!(?err); + return Err(anyhow!(err)); } - }; + } } - Ok::<(), anyhow::Error>(()) + // Ok::<(), anyhow::Error>(()) }); // Ack sync loop tokio::spawn(async move { let mut sync_map: HashSet = HashSet::new(); while let Ok(ref ack_msg) = object_ack_rcv.recv().await { + trace!(?ack_msg); match ack_msg { init @ AckSync::ObjectInit(_) => { sync_map.insert(init.clone()); @@ -334,6 +375,7 @@ impl DataproxyReplicationService for DataproxyReplicationServiceImpl { // Objects Ok(Ok(objects)) => { + trace!(?objects); // Store access-checked objects let mut stored_objects: HashMap = HashMap::default(); for (object, location) in objects {