Skip to content

Commit

Permalink
simplify delay waiting logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Jixiang Jiang committed Mar 25, 2022
1 parent e5bddc9 commit 85915af
Show file tree
Hide file tree
Showing 5 changed files with 45 additions and 108 deletions.
11 changes: 6 additions & 5 deletions crates/connector_proxy/src/connector_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,9 @@ use crate::interceptors::{
network_proxy_capture_interceptor::NetworkProxyCaptureInterceptor,
network_proxy_materialize_interceptor::NetworkProxyMaterializeInterceptor,
};
use crate::libs::command::{check_exit_status, invoke_connector_delayed, invoke_connector_direct};
use crate::libs::command::{
check_exit_status, invoke_connector_delayed, invoke_connector_direct, parse_child,
};
use tokio::io::copy;
use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
use tokio_util::io::{ReaderStream, StreamReader};
Expand All @@ -18,7 +20,7 @@ pub async fn run_flow_capture_connector(
args.push(op.to_string());

let (mut child, child_stdin, child_stdout, child_stderr) =
invoke_connector_direct(entrypoint, args)?;
parse_child(invoke_connector_direct(entrypoint, args)?)?;

let adapted_request_stream =
NetworkProxyCaptureInterceptor::adapt_request_stream(op, request_stream())?;
Expand All @@ -45,7 +47,7 @@ pub async fn run_flow_materialize_connector(
args.push(op.to_string());

let (mut child, child_stdin, child_stdout, child_stderr) =
invoke_connector_direct(entrypoint, args)?;
parse_child(invoke_connector_direct(entrypoint, args)?)?;

let adapted_request_stream =
NetworkProxyMaterializeInterceptor::adapt_request_stream(op, request_stream())?;
Expand Down Expand Up @@ -76,10 +78,9 @@ pub async fn run_airbyte_source_connector(
let args = airbyte_interceptor.adapt_command_args(op, args)?;

let (mut child, child_stdin, child_stdout, child_stderr) =
invoke_connector_delayed(entrypoint, args).await?;
parse_child(invoke_connector_delayed(entrypoint, args).await?)?;

let adapted_request_stream = airbyte_interceptor.adapt_request_stream(
child.id().ok_or(Error::MissingPid)?,
op,
NetworkProxyCaptureInterceptor::adapt_request_stream(op, request_stream())?,
)?;
Expand Down
6 changes: 3 additions & 3 deletions crates/connector_proxy/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ pub enum Error {
#[error("missing process io pipes.")]
MissingIOPipe,

#[error("missing process id where expected.")]
MissingPid,

#[error("mismatching runtime protocol")]
MismatchingRuntimeProtocol,

#[error("No ready signal is received. {0}")]
NotReady(&'static str),

#[error("invalid endpoint json config.")]
InvalidEndpointConfig,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::libs::airbyte_catalog::{
self, ConfiguredCatalog, ConfiguredStream, DestinationSyncMode, Range, ResourceSpec, Status,
SyncMode,
};
use crate::libs::command::send_sigcont;
use crate::libs::command::READY;
use crate::libs::json::{create_root_schema, tokenize_jsonpointer};
use crate::libs::protobuf::{decode_message, encode_message};
use crate::libs::stream::stream_all_airbyte_messages;
Expand All @@ -18,7 +18,6 @@ use protocol::capture::{
};
use protocol::flow::{DriverCheckpoint, Slice};
use std::collections::HashMap;
use std::str::FromStr;
use std::sync::Arc;
use tokio::sync::Mutex;
use validator::Validate;
Expand Down Expand Up @@ -53,17 +52,12 @@ impl AirbyteSourceInterceptor {
}
}

fn adapt_spec_request_stream(
&mut self,
pid: u32,
in_stream: InterceptorStream,
) -> InterceptorStream {
fn adapt_spec_request_stream(&mut self, in_stream: InterceptorStream) -> InterceptorStream {
Box::pin(try_stream! {
let mut reader = StreamReader::new(in_stream);
decode_message::<SpecRequest, _>(&mut reader).await?.ok_or(create_custom_error("missing spec request."))?;

//send_sigcont(pid)?;
yield Bytes::from("READY");
yield Bytes::from(READY);
})
}

Expand Down Expand Up @@ -94,7 +88,6 @@ impl AirbyteSourceInterceptor {

fn adapt_discover_request(
&mut self,
pid: u32,
config_file_path: String,
in_stream: InterceptorStream,
) -> InterceptorStream {
Expand All @@ -104,8 +97,7 @@ impl AirbyteSourceInterceptor {

File::create(config_file_path)?.write_all(request.endpoint_spec_json.as_bytes())?;

//send_sigcont(pid)?;
yield Bytes::from("READY");
yield Bytes::from(READY);
})
}

Expand Down Expand Up @@ -155,7 +147,6 @@ impl AirbyteSourceInterceptor {

fn adapt_validate_request_stream(
&mut self,
pid: u32,
config_file_path: String,
validate_request: Arc<Mutex<Option<ValidateRequest>>>,
in_stream: InterceptorStream,
Expand All @@ -167,8 +158,7 @@ impl AirbyteSourceInterceptor {

File::create(config_file_path)?.write_all(request.endpoint_spec_json.as_bytes())?;

//send_sigcont(pid)?;
yield Bytes::from("READY");
yield Bytes::from(READY);
})
}

Expand Down Expand Up @@ -210,7 +200,6 @@ impl AirbyteSourceInterceptor {

fn adapt_pull_request_stream(
&mut self,
pid: u32,
config_file_path: String,
catalog_file_path: String,
state_file_path: String,
Expand Down Expand Up @@ -274,9 +263,7 @@ impl AirbyteSourceInterceptor {
// release the lock.
drop(stream_to_binding);

// Resume the connector process.
//send_sigcont(pid)?;
yield Bytes::from("READY");
yield Bytes::from(READY);
}
})
}
Expand Down Expand Up @@ -337,8 +324,8 @@ impl AirbyteSourceInterceptor {
}

if transaction_pending {
// We generate a synthetic commit now,
// and the empty checkpoint means the assumed behavior of the next invocation will be "full refresh".
// We generate a synthetic commit now, and the empty checkpoint means the assumed behavior
// of the next invocation will be "full refresh".
let mut resp = PullResponse::default();
resp.checkpoint = Some(DriverCheckpoint{
driver_checkpoint_json: Vec::new(),
Expand Down Expand Up @@ -394,7 +381,6 @@ impl AirbyteSourceInterceptor {

pub fn adapt_request_stream(
&mut self,
pid: u32,
op: &FlowCaptureOperation,
in_stream: InterceptorStream,
) -> Result<InterceptorStream, Error> {
Expand All @@ -403,18 +389,16 @@ impl AirbyteSourceInterceptor {
let state_file_path = self.input_file_path(STATE_FILE_NAME);

match op {
FlowCaptureOperation::Spec => Ok(self.adapt_spec_request_stream(pid, in_stream)),
FlowCaptureOperation::Spec => Ok(self.adapt_spec_request_stream(in_stream)),
FlowCaptureOperation::Discover => {
Ok(self.adapt_discover_request(pid, config_file_path, in_stream))
Ok(self.adapt_discover_request(config_file_path, in_stream))
}
FlowCaptureOperation::Validate => Ok(self.adapt_validate_request_stream(
pid,
config_file_path,
Arc::clone(&self.validate_request),
in_stream,
)),
FlowCaptureOperation::Pull => Ok(self.adapt_pull_request_stream(
pid,
config_file_path,
catalog_file_path,
state_file_path,
Expand Down
71 changes: 23 additions & 48 deletions crates/connector_proxy/src/libs/command.rs
Original file line number Diff line number Diff line change
@@ -1,29 +1,23 @@
use crate::errors::Error;

use serde::{Deserialize, Serialize};
use std::io::Write;
use std::process::{ExitStatus, Stdio};
use tempfile::NamedTempFile;
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncRead, AsyncReadExt};
use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::time::timeout;

const READY: &[u8] = "READY".as_bytes();
pub const READY: &[u8] = "READY".as_bytes();

// Start the connector directly.
pub fn invoke_connector_direct(
entrypoint: String,
args: Vec<String>,
) -> Result<(Child, ChildStdin, ChildStdout, ChildStderr), Error> {
let child = invoke_connector(
pub fn invoke_connector_direct(entrypoint: String, args: Vec<String>) -> Result<Child, Error> {
invoke_connector(
Stdio::piped(),
Stdio::piped(),
Stdio::piped(),
&entrypoint,
&args,
)?;

parse_child(child)
)
}

// Check the connector execution exit status.
Expand Down Expand Up @@ -58,14 +52,13 @@ pub struct CommandConfig {
pub args: Vec<String>,
}
// Instead of starting the connector directly, `invoke_connector_delayed` starts a bouncer process first, which will
// start the real connector after receiving a sigcont signal. Two actions are involved,
// 1. the bouncer process calls the `write_ready` function to inform the parent process that it is ready to receive sigcont signal.
// 2. upon informed, the parent process starts preparing for the connector execution (e.g. creating the input data file),
// and triggers the bouncer process to start the connector (via the `send_sigcont` function) once preparation is done.
// start the real connector after reading a "READY" string from Stdin.
// The caller of `invoke_connector_delayed` is responsible for sending "READY" to the Stdin of the returned Child process,
// before sending anything else.
pub async fn invoke_connector_delayed(
entrypoint: String,
args: Vec<String>,
) -> Result<(Child, ChildStdin, ChildStdout, ChildStderr), Error> {
) -> Result<Child, Error> {
tracing::info!("invoke delayed connector {}, {:?}", entrypoint, args);

// Saves the configs to start the connector.
Expand All @@ -86,54 +79,34 @@ pub async fn invoke_connector_delayed(
.to_str()
.expect("unexpected binary path");

let child = invoke_connector(
invoke_connector(
Stdio::piped(),
Stdio::piped(),
Stdio::piped(),
bouncer_process_entrypoint,
&vec!["delayed-execute".to_string(), config_file_path.to_string()],
)?;

let (child, stdin, stdout, mut stderr) = parse_child(child)?;
)
}

// Waits up to one second for the "READY" signal to arrive on the given reader.
pub async fn read_ready<R: AsyncRead + std::marker::Unpin>(reader: &mut R) -> Result<(), Error> {
let mut ready_buf: Vec<u8> = vec![0; READY.len()];
match timeout(
std::time::Duration::from_secs(1),
stderr.read_exact(&mut ready_buf),
reader.read_exact(&mut ready_buf),
)
.await
{
Ok(_) => {
if &ready_buf == READY {
return Ok((child, stdin, stdout, stderr));
Ok(())
} else {
tracing::error!("received unexpected bytes.");
Err(Error::NotReady("received unexpected bytes."))
}
}
Err(_) => {
tracing::error!("timeout: reading from delayed-connector process wrapper.");
}
};

return Err(Error::BouncerProcessStartError);
}

pub fn write_ready() {
std::io::stderr()
.write_all(READY)
.expect("failed writing to stderr");
std::io::stderr()
.flush()
.expect("failed flushing to stderr");
}

pub fn send_sigcont(pid: u32) -> Result<(), std::io::Error> {
tracing::info!("resuming bouncer process.");
//unsafe {
// libc::kill(pid as i32, libc::SIGCONT);
//}
Ok(())
Err(_) => Err(Error::NotReady(
"timeout: reading from delayed-connector process wrapper.",
)),
}
}

// A more flexible API for starting the connector.
Expand All @@ -155,7 +128,9 @@ pub fn invoke_connector(
.map_err(|e| e.into())
}

fn parse_child(mut child: Child) -> Result<(Child, ChildStdin, ChildStdout, ChildStderr), Error> {
pub fn parse_child(
mut child: Child,
) -> Result<(Child, ChildStdin, ChildStdout, ChildStderr), Error> {
let stdout = child.stdout.take().ok_or(Error::MissingIOPipe)?;
let stdin = child.stdin.take().ok_or(Error::MissingIOPipe)?;
let stderr = child.stderr.take().ok_or(Error::MissingIOPipe)?;
Expand Down
29 changes: 3 additions & 26 deletions crates/connector_proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ use connector_runner::{
};
use errors::Error;
use libs::{
command::{check_exit_status, invoke_connector, write_ready, CommandConfig},
command::{check_exit_status, invoke_connector, read_ready, CommandConfig},
image_inspect::ImageInspect,
};
use std::process::Stdio;
use tokio::time::timeout;

#[derive(Debug, ArgEnum, Clone)]
pub enum CaptureConnectorProtocol {
Expand Down Expand Up @@ -178,30 +177,8 @@ async fn proxy_flow_materialize(
}

async fn delayed_execute(command_config_path: String) -> Result<(), Error> {
// Send "READY" to indicate the process is ready to receive sigcont signals, and keep waiting.
write_ready();
// Stop itself
//unsafe {
// libc::kill(std::process::id() as i32, libc::SIGSTOP);
//}

// Waiting for "READY" from the parent process.
let mut ready_buf: Vec<u8> = vec![0; 5];
match timeout(
std::time::Duration::from_secs(1),
tokio::io::stdin().read_exact(&mut ready_buf),
)
.await
{
Ok(_) => {
if &ready_buf != "READY".as_bytes() {
panic!("received unexpected bytes.");
}
}
Err(_) => {
panic!("timeout: reading from parent process.");
}
};
// Wait for the "READY" signal from the parent process before starting the connector.
read_ready(&mut tokio::io::stdin()).await?;

tracing::info!("delayed process execution continue...");

Expand Down

0 comments on commit 85915af

Please sign in to comment.