Skip to content

Commit

Permalink
tweaks
Browse files Browse the repository at this point in the history
  • Loading branch information
Jixiang Jiang committed Mar 20, 2022
1 parent ae0b168 commit 98f499e
Show file tree
Hide file tree
Showing 3 changed files with 14 additions and 15 deletions.
2 changes: 1 addition & 1 deletion crates/connector_proxy/src/libs/command.rs
Original file line number Diff line number Diff line change
@@ -1,4 +1,4 @@
use crate::errors::{raise_custom_error, Error};
use crate::errors::Error;

use serde::{Deserialize, Serialize};
use std::io::Write;
Expand Down
17 changes: 6 additions & 11 deletions crates/connector_proxy/src/libs/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -46,12 +46,11 @@ pub fn stream_all_airbyte_messages(
raise_custom_error(&format!("error in reading next in_stream: {:?}", e))?;
}
}
let buf_split = buf.split();
let chunk = buf_split.chunk();

let chunk = buf.chunk();
let deserializer = Deserializer::from_slice(&chunk);

// Deserialize to Value first, to avoid missing 'is_eof' signals in error.
// Deserialize to Value first, instead of Message, to avoid missing 'is_eof' signals in error.
let mut value_stream = deserializer.into_iter::<Value>();
while let Some(value) = value_stream.next() {
match value {
Expand All @@ -77,18 +76,14 @@ pub fn stream_all_airbyte_messages(
}
}

// TODO(Jixiang): Improve efficiency here.
// There are unnecessary copying activities in and out from the buf, especially for large messages that spans multiple
// bytes messages in the stream. Ideally, we could both write and read from the same buf. However, both reading and writing
// from the same buf is not recommended, which yields warning of https://github.com/rust-lang/rust/issues/59159.
let remaining = &chunk[value_stream.byte_offset()..];
buf.extend_from_slice(remaining);
let byte_offset = value_stream.byte_offset();
drop(buf.split_to(byte_offset));
}

if buf.len() > 0 {
raise_custom_error("unconsumed content in stream found!")?;
raise_custom_error("unconsumed content in stream found.")?;
}

tracing::info!("done reading all in_stream");
tracing::info!("done reading all in_stream.");
}
}
10 changes: 7 additions & 3 deletions crates/connector_proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -10,7 +10,6 @@ use clap::{ArgEnum, Parser, Subcommand};
use tokio::{
io::AsyncReadExt,
signal::unix::{signal, SignalKind},
time::timeout,
};

use apis::{FlowCaptureOperation, FlowMaterializeOperation, FlowRuntimeProtocol};
Expand Down Expand Up @@ -201,12 +200,17 @@ async fn delayed_execute(command_config_path: String) -> Result<(), Error> {
match check_exit_status("delayed process", child.wait().await) {
Err(e) => {
let mut buf = Vec::new();
child.stderr.take().unwrap().read_to_end(&mut buf).await?;
child
.stderr
.take()
.ok_or(Error::MissingIOPipe)?
.read_to_end(&mut buf)
.await?;

tracing::error!(
"connector failed. command_config: {:?}. stderr from connector: {}",
&command_config,
std::str::from_utf8(&buf).unwrap()
std::str::from_utf8(&buf).expect("error when decoding stderr")
);
Err(e)
}
Expand Down

0 comments on commit 98f499e

Please sign in to comment.