From 98f499e66216765a3b3fb47cffe3a305b242f1da Mon Sep 17 00:00:00 2001 From: Jixiang Jiang Date: Sun, 20 Mar 2022 14:12:56 -0400 Subject: [PATCH] tweaks --- crates/connector_proxy/src/libs/command.rs | 2 +- crates/connector_proxy/src/libs/stream.rs | 17 ++++++----------- crates/connector_proxy/src/main.rs | 10 +++++++--- 3 files changed, 14 insertions(+), 15 deletions(-) diff --git a/crates/connector_proxy/src/libs/command.rs b/crates/connector_proxy/src/libs/command.rs index 1981577dbe..2ad0960595 100644 --- a/crates/connector_proxy/src/libs/command.rs +++ b/crates/connector_proxy/src/libs/command.rs @@ -1,4 +1,4 @@ -use crate::errors::{raise_custom_error, Error}; +use crate::errors::Error; use serde::{Deserialize, Serialize}; use std::io::Write; diff --git a/crates/connector_proxy/src/libs/stream.rs b/crates/connector_proxy/src/libs/stream.rs index 5b9f533048..33d1e441e4 100644 --- a/crates/connector_proxy/src/libs/stream.rs +++ b/crates/connector_proxy/src/libs/stream.rs @@ -46,12 +46,11 @@ pub fn stream_all_airbyte_messages( raise_custom_error(&format!("error in reading next in_stream: {:?}", e))?; } } - let buf_split = buf.split(); - let chunk = buf_split.chunk(); + let chunk = buf.chunk(); let deserializer = Deserializer::from_slice(&chunk); - // Deserialize to Value first, to avoid missing 'is_eof' signals in error. + // Deserialize to Value first, instead of Message, to avoid missing 'is_eof' signals in error. let mut value_stream = deserializer.into_iter::(); while let Some(value) = value_stream.next() { match value { @@ -77,18 +76,14 @@ pub fn stream_all_airbyte_messages( } } - // TODO(Jixiang): Improve efficiency here. - // There are unnecessary copying activities in and out from the buf, especially for large messages that spans multiple - // bytes messages in the stream. Ideally, we could both write and read from the same buf. However, both reading and writing - // from the same buf is not recommended, which yields warning of https://github.com/rust-lang/rust/issues/59159. - let remaining = &chunk[value_stream.byte_offset()..]; - buf.extend_from_slice(remaining); + let byte_offset = value_stream.byte_offset(); + drop(buf.split_to(byte_offset)); } if buf.len() > 0 { - raise_custom_error("unconsumed content in stream found!")?; + raise_custom_error("unconsumed content in stream found.")?; } - tracing::info!("done reading all in_stream"); + tracing::info!("done reading all in_stream."); } } diff --git a/crates/connector_proxy/src/main.rs b/crates/connector_proxy/src/main.rs index f835fe8108..b38c9550b7 100644 --- a/crates/connector_proxy/src/main.rs +++ b/crates/connector_proxy/src/main.rs @@ -10,7 +10,6 @@ use clap::{ArgEnum, Parser, Subcommand}; use tokio::{ io::AsyncReadExt, signal::unix::{signal, SignalKind}, - time::timeout, }; use apis::{FlowCaptureOperation, FlowMaterializeOperation, FlowRuntimeProtocol}; @@ -201,12 +200,17 @@ async fn delayed_execute(command_config_path: String) -> Result<(), Error> { match check_exit_status("delayed process", child.wait().await) { Err(e) => { let mut buf = Vec::new(); - child.stderr.take().unwrap().read_to_end(&mut buf).await?; + child + .stderr + .take() + .ok_or(Error::MissingIOPipe)? + .read_to_end(&mut buf) + .await?; tracing::error!( "connector failed. command_config: {:?}. stderr from connector: {}", &command_config, - std::str::from_utf8(&buf).unwrap() + std::str::from_utf8(&buf).expect("error when decoding stderr") ); Err(e) }