Skip to content

Commit

Permalink
simplify delay waiting logic
Browse files Browse the repository at this point in the history
  • Loading branch information
Jixiang Jiang committed Mar 25, 2022
1 parent e5bddc9 commit 9ccd79b
Show file tree
Hide file tree
Showing 5 changed files with 34 additions and 84 deletions.
1 change: 0 additions & 1 deletion crates/connector_proxy/src/connector_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -79,7 +79,6 @@ pub async fn run_airbyte_source_connector(
invoke_connector_delayed(entrypoint, args).await?;

let adapted_request_stream = airbyte_interceptor.adapt_request_stream(
child.id().ok_or(Error::MissingPid)?,
op,
NetworkProxyCaptureInterceptor::adapt_request_stream(op, request_stream())?,
)?;
Expand Down
6 changes: 3 additions & 3 deletions crates/connector_proxy/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,12 @@ pub enum Error {
#[error("missing process io pipes.")]
MissingIOPipe,

#[error("missing process id where expected.")]
MissingPid,

#[error("mismatching runtime protocol")]
MismatchingRuntimeProtocol,

#[error("No ready signal is received. {0}")]
NotReady(&'static str),

#[error("invalid endpoint json config.")]
InvalidEndpointConfig,

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use crate::libs::airbyte_catalog::{
self, ConfiguredCatalog, ConfiguredStream, DestinationSyncMode, Range, ResourceSpec, Status,
SyncMode,
};
use crate::libs::command::send_sigcont;
use crate::libs::command::READY;
use crate::libs::json::{create_root_schema, tokenize_jsonpointer};
use crate::libs::protobuf::{decode_message, encode_message};
use crate::libs::stream::stream_all_airbyte_messages;
Expand Down Expand Up @@ -53,17 +53,12 @@ impl AirbyteSourceInterceptor {
}
}

fn adapt_spec_request_stream(
&mut self,
pid: u32,
in_stream: InterceptorStream,
) -> InterceptorStream {
fn adapt_spec_request_stream(&mut self, in_stream: InterceptorStream) -> InterceptorStream {
Box::pin(try_stream! {
let mut reader = StreamReader::new(in_stream);
decode_message::<SpecRequest, _>(&mut reader).await?.ok_or(create_custom_error("missing spec request."))?;

//send_sigcont(pid)?;
yield Bytes::from("READY");
yield Bytes::from(READY);
})
}

Expand Down Expand Up @@ -94,7 +89,6 @@ impl AirbyteSourceInterceptor {

fn adapt_discover_request(
&mut self,
pid: u32,
config_file_path: String,
in_stream: InterceptorStream,
) -> InterceptorStream {
Expand All @@ -104,8 +98,7 @@ impl AirbyteSourceInterceptor {

File::create(config_file_path)?.write_all(request.endpoint_spec_json.as_bytes())?;

//send_sigcont(pid)?;
yield Bytes::from("READY");
yield Bytes::from(READY);
})
}

Expand Down Expand Up @@ -155,7 +148,6 @@ impl AirbyteSourceInterceptor {

fn adapt_validate_request_stream(
&mut self,
pid: u32,
config_file_path: String,
validate_request: Arc<Mutex<Option<ValidateRequest>>>,
in_stream: InterceptorStream,
Expand All @@ -167,8 +159,7 @@ impl AirbyteSourceInterceptor {

File::create(config_file_path)?.write_all(request.endpoint_spec_json.as_bytes())?;

//send_sigcont(pid)?;
yield Bytes::from("READY");
yield Bytes::from(READY);
})
}

Expand Down Expand Up @@ -210,7 +201,6 @@ impl AirbyteSourceInterceptor {

fn adapt_pull_request_stream(
&mut self,
pid: u32,
config_file_path: String,
catalog_file_path: String,
state_file_path: String,
Expand Down Expand Up @@ -274,9 +264,7 @@ impl AirbyteSourceInterceptor {
// release the lock.
drop(stream_to_binding);

// Resume the connector process.
//send_sigcont(pid)?;
yield Bytes::from("READY");
yield Bytes::from(READY);
}
})
}
Expand Down Expand Up @@ -394,7 +382,6 @@ impl AirbyteSourceInterceptor {

pub fn adapt_request_stream(
&mut self,
pid: u32,
op: &FlowCaptureOperation,
in_stream: InterceptorStream,
) -> Result<InterceptorStream, Error> {
Expand All @@ -403,18 +390,16 @@ impl AirbyteSourceInterceptor {
let state_file_path = self.input_file_path(STATE_FILE_NAME);

match op {
FlowCaptureOperation::Spec => Ok(self.adapt_spec_request_stream(pid, in_stream)),
FlowCaptureOperation::Spec => Ok(self.adapt_spec_request_stream(in_stream)),
FlowCaptureOperation::Discover => {
Ok(self.adapt_discover_request(pid, config_file_path, in_stream))
Ok(self.adapt_discover_request(config_file_path, in_stream))
}
FlowCaptureOperation::Validate => Ok(self.adapt_validate_request_stream(
pid,
config_file_path,
Arc::clone(&self.validate_request),
in_stream,
)),
FlowCaptureOperation::Pull => Ok(self.adapt_pull_request_stream(
pid,
config_file_path,
catalog_file_path,
state_file_path,
Expand Down
51 changes: 20 additions & 31 deletions crates/connector_proxy/src/libs/command.rs
Original file line number Diff line number Diff line change
@@ -1,14 +1,13 @@
use crate::errors::Error;

use serde::{Deserialize, Serialize};
use std::io::Write;
use std::process::{ExitStatus, Stdio};
use tempfile::NamedTempFile;
use tokio::io::AsyncReadExt;
use tokio::io::{AsyncRead, AsyncReadExt, AsyncWrite, AsyncWriteExt};
use tokio::process::{Child, ChildStderr, ChildStdin, ChildStdout, Command};
use tokio::time::timeout;

const READY: &[u8] = "READY".as_bytes();
pub const READY: &[u8] = "READY".as_bytes();

// Start the connector directly.
pub fn invoke_connector_direct(
Expand Down Expand Up @@ -94,46 +93,36 @@ pub async fn invoke_connector_delayed(
&vec!["delayed-execute".to_string(), config_file_path.to_string()],
)?;

let (child, stdin, stdout, mut stderr) = parse_child(child)?;
parse_child(child)
}

// Waiting for "READY" from the bouncer process.
pub async fn write_ready<W: AsyncWrite + std::marker::Unpin>(writer: &mut W) {
writer
.write_all(READY)
.await
.expect("failed writing to stderr");
writer.flush().await.expect("failed flushing to stderr");
}

pub async fn read_ready<R: AsyncRead + std::marker::Unpin>(reader: &mut R) -> Result<(), Error> {
let mut ready_buf: Vec<u8> = vec![0; READY.len()];
match timeout(
std::time::Duration::from_secs(1),
stderr.read_exact(&mut ready_buf),
reader.read_exact(&mut ready_buf),
)
.await
{
Ok(_) => {
if &ready_buf == READY {
return Ok((child, stdin, stdout, stderr));
Ok(())
} else {
tracing::error!("received unexpected bytes.");
Err(Error::NotReady("received unexpected bytes."))
}
}
Err(_) => {
tracing::error!("timeout: reading from delayed-connector process wrapper.");
}
};

return Err(Error::BouncerProcessStartError);
}

pub fn write_ready() {
std::io::stderr()
.write_all(READY)
.expect("failed writing to stderr");
std::io::stderr()
.flush()
.expect("failed flushing to stderr");
}

pub fn send_sigcont(pid: u32) -> Result<(), std::io::Error> {
tracing::info!("resuming bouncer process.");
//unsafe {
// libc::kill(pid as i32, libc::SIGCONT);
//}
Ok(())
Err(_) => Err(Error::NotReady(
"timeout: reading from delayed-connector process wrapper.",
)),
}
}

// A more flexible API for starting the connector.
Expand Down
29 changes: 3 additions & 26 deletions crates/connector_proxy/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -21,11 +21,10 @@ use connector_runner::{
};
use errors::Error;
use libs::{
command::{check_exit_status, invoke_connector, write_ready, CommandConfig},
command::{check_exit_status, invoke_connector, read_ready, CommandConfig},
image_inspect::ImageInspect,
};
use std::process::Stdio;
use tokio::time::timeout;

#[derive(Debug, ArgEnum, Clone)]
pub enum CaptureConnectorProtocol {
Expand Down Expand Up @@ -178,30 +177,8 @@ async fn proxy_flow_materialize(
}

async fn delayed_execute(command_config_path: String) -> Result<(), Error> {
// Send "READY" to indicate the process is ready to receive sigcont signals, and keep waiting.
write_ready();
// Stop itself
//unsafe {
// libc::kill(std::process::id() as i32, libc::SIGSTOP);
//}

// Waiting for "READY" from the parent process.
let mut ready_buf: Vec<u8> = vec![0; 5];
match timeout(
std::time::Duration::from_secs(1),
tokio::io::stdin().read_exact(&mut ready_buf),
)
.await
{
Ok(_) => {
if &ready_buf != "READY".as_bytes() {
panic!("received unexpected bytes.");
}
}
Err(_) => {
panic!("timeout: reading from parent process.");
}
};
// Wait for the "READY" signal from the parent process before starting the connector.
read_ready(&mut tokio::io::stdin()).await?;

tracing::info!("delayed process execution continue...");

Expand Down

0 comments on commit 9ccd79b

Please sign in to comment.