Skip to content

Commit

Permalink
operate on line bytes to allow handling of plaintext lines
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Apr 14, 2022
1 parent d9b36e9 commit a5741a2
Show file tree
Hide file tree
Showing 5 changed files with 86 additions and 97 deletions.
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 1 addition & 8 deletions Makefile
Original file line number Diff line number Diff line change
Expand Up @@ -281,11 +281,4 @@ docker-push:
# It is invoked only for builds on the master branch.
.PHONY: docker-push-dev
docker-push-dev:
docker push ghcr.io/estuary/flow:dev

.PHONY: docker-push-dev
docker-push-mahdi: rust-binaries musl-binaries docker-image
docker tag ghcr.io/estuary/flow:dev mdibaiee/flow:dev
docker push mdibaiee/flow:dev
docker tag mdibaiee/flow:dev mdibaiee/flow:deepsync-stream
docker push mdibaiee/flow:deepsync-stream
docker push ghcr.io/estuary/flow:dev
1 change: 1 addition & 0 deletions crates/connector_proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,6 @@ tempfile="*"
thiserror = "*"
tokio = { version = "1.15.0", features = ["full"] }
tokio-util = { version = "*", features = ["io"] }
tokio-stream = { version = "*", features = ["io-util"] }
tracing="*"
validator = { version = "*", features = ["derive"] }
15 changes: 11 additions & 4 deletions crates/connector_proxy/src/connector_runner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,9 +8,10 @@ use crate::interceptors::{
use crate::libs::command::{
check_exit_status, invoke_connector_delayed, invoke_connector_direct, parse_child,
};
use tokio::io::copy;
use futures::TryStreamExt;
use tokio::io::{copy, AsyncBufReadExt, BufReader};
use tokio::process::{ChildStderr, ChildStdin, ChildStdout};
use tokio_util::io::{ReaderStream, StreamReader};
use tokio_util::io::StreamReader;

pub async fn run_flow_capture_connector(
op: &FlowCaptureOperation,
Expand Down Expand Up @@ -110,11 +111,17 @@ fn parse_entrypoint(entrypoint: &Vec<String>) -> Result<(String, Vec<String>), E
}

fn request_stream() -> InterceptorStream {
Box::pin(ReaderStream::new(tokio::io::stdin()))
Box::pin(
tokio_stream::wrappers::LinesStream::new(BufReader::new(tokio::io::stdin()).lines())
.map_ok(Into::into),
)
}

fn response_stream(child_stdout: ChildStdout) -> InterceptorStream {
Box::pin(ReaderStream::new(child_stdout))
Box::pin(
tokio_stream::wrappers::LinesStream::new(BufReader::new(child_stdout).lines())
.map_ok(Into::into),
)
}

async fn streaming_all(
Expand Down
157 changes: 72 additions & 85 deletions crates/connector_proxy/src/libs/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -2,111 +2,63 @@ use crate::libs::airbyte_catalog::Message;
use crate::{apis::InterceptorStream, errors::create_custom_error};

use crate::errors::raise_err;
use bytes::{Buf, Bytes, BytesMut};
use futures::{stream, StreamExt, TryStream, TryStreamExt};
use serde_json::{Deserializer, Value};
use tokio::io::{AsyncRead, AsyncReadExt};
use bytes::Bytes;
use futures::{StreamExt, TryStream, TryStreamExt};
use tokio::io::{AsyncBufReadExt, AsyncRead, BufReader};
use tokio_util::io::StreamReader;
use validator::Validate;

use super::airbyte_catalog::{Log, LogLevel, MessageType};
use super::protobuf::decode_message;

// Creates a stream of bytes of lines from the given reader
// This allows our other methods such as stream_airbyte_messages to operate
// on lines, simplifying their logic
pub fn stream_all_bytes<R: 'static + AsyncRead + std::marker::Unpin>(
reader: R,
) -> impl TryStream<Item = std::io::Result<Bytes>, Error = std::io::Error, Ok = bytes::Bytes> {
stream::try_unfold(reader, |mut r| async {
// consistent with the default capacity of ReaderStream.
// https://github.com/tokio-rs/tokio/blob/master/tokio-util/src/io/reader_stream.rs#L8
let mut buf = BytesMut::with_capacity(4096);
match r.read_buf(&mut buf).await {
Ok(0) => Ok(None),
Ok(_) => Ok(Some((Bytes::from(buf), r))),
Err(e) => raise_err(&format!("error during streaming {:?}.", e)),
}
})
tokio_stream::wrappers::LinesStream::new(BufReader::new(reader).lines()).map_ok(Into::into)
}

/// Given a stream of bytes, try to deserialize them into Airbyte Messages.
/// Given a stream of lines, try to deserialize them into Airbyte Messages.
/// This can be used when reading responses from the Airbyte connector, and will
/// handle validation of messages as well as handling of AirbyteLogMessages.
/// Will ignore* messages that cannot be parsed to an AirbyteMessage.
/// Will ignore* lines that cannot be parsed to an AirbyteMessage.
/// * See https://docs.airbyte.com/understanding-airbyte/airbyte-specification#the-airbyte-protocol
pub fn stream_airbyte_responses(
in_stream: InterceptorStream,
) -> impl TryStream<Item = std::io::Result<Message>, Ok = Message, Error = std::io::Error> {
stream::once(async {
let mut buf = BytesMut::new();
let items = in_stream
.map(move |bytes| {
let b = bytes?;
buf.extend_from_slice(b.chunk());
let chunk = buf.chunk();

// Deserialize to Value first, instead of Message, to avoid missing 'is_eof' signals in error.
let deserializer = Deserializer::from_slice(chunk);
let mut value_stream = deserializer.into_iter::<Value>();

// Turn Values into Messages and validate them
let values: Vec<Result<Message, std::io::Error>> = value_stream
.by_ref()
.map_while(|value| match value {
Ok(v) => Some(Ok(v)),
Err(e) => {
// we must stop as soon as we hit EOF to avoid
// progressing value_stream.byte_offset() so that we can
// safely drop the buffer up to byte_offset() and pick up the leftovers
// when working with the next bytes
if e.is_eof() {
return None;
}

Some(raise_err(&format!(
"error in decoding JSON: {:?}, {:?}",
e,
std::str::from_utf8(chunk)
)))
}
})
.map(|value| match value {
Ok(v) => {
let message: Message = match serde_json::from_value(v) {
Ok(m) => m,
// We ignore JSONs that are not Airbyte Messages according
// to the specification:
// https://docs.airbyte.com/understanding-airbyte/airbyte-specification#the-airbyte-protocol
Err(_) => return Ok(None),
};

message.validate().map_err(|e| {
create_custom_error(&format!("error in validating message {:?}", e))
})?;

tracing::debug!("read message:: {:?}", &message);
Ok(Some(message))
}
Err(e) => Err(e),
in_stream.try_filter_map(|line| async move {
let message: Message = match serde_json::from_slice(&line) {
Ok(m) => m,
Err(e) => {
// It is currently ambiguous for us whether Airbyte protocol specification
// mandates that there must be no plaintext or not, as such we handle all
// errors in parsing of stdout lines by logging the issue, but not failing
Message {
message_type: MessageType::Log,
connection_status: None,
state: None,
record: None,
spec: None,
catalog: None,
log: Some(Log {
level: LogLevel::Debug,
message: format!("Encountered error while trying to parse Airbyte Message: {:?} in line {:?}", e, line)
})
// Flipping the Option and Result to filter out the None values
.filter_map(|value| match value {
Ok(Some(v)) => Some(Ok(v)),
Ok(None) => None,
Err(e) => Some(Err(e)),
})
.collect();

let byte_offset = value_stream.byte_offset();
drop(buf.split_to(byte_offset));
}
}
};

Ok::<_, std::io::Error>(stream::iter(values))
})
.try_flatten();
message
.validate()
.map_err(|e| create_custom_error(&format!("error in validating message {:?}", e)))?;

// We need to set explicit error type, see https://github.com/rust-lang/rust/issues/63502
Ok::<_, std::io::Error>(items)
Ok(Some(message))
})
.try_flatten()
// Handle logs here so we don't have to worry about them everywhere else
.try_filter_map(|message| async {
// For AirbyteLogMessages, log them and then filter them out
// so that we don't have to handle them elsewhere
if let Some(log) = message.log {
log.log();
Ok(None)
Expand Down Expand Up @@ -159,7 +111,8 @@ where

#[cfg(test)]
mod test {
use futures::future;
use bytes::Buf;
use futures::{future, stream};

use crate::libs::airbyte_catalog::{ConnectionStatus, MessageType, Status};

Expand Down Expand Up @@ -241,4 +194,38 @@ mod test {
input_message.connection_status.unwrap()
);
}

#[tokio::test]
async fn test_stream_airbyte_responses_plaintext_mixed() {
let input_message = Message {
message_type: MessageType::ConnectionStatus,
log: None,
state: None,
record: None,
spec: None,
catalog: None,
connection_status: Some(ConnectionStatus {
status: Status::Succeeded,
message: Some("test".to_string()),
}),
};
let input = vec![
Ok::<_, std::io::Error>(
"I am plaintext!\n{\"type\": \"CONNECTION_STATUS\", \"connectionStatus\": {"
.as_bytes(),
),
Ok::<_, std::io::Error>("\"status\": \"SUCCEEDED\",\"message\":\"test\"}}".as_bytes()),
];
let stream = stream::iter(input);
let reader = StreamReader::new(stream);

let byte_stream = Box::pin(stream_all_bytes(reader));
let mut messages = Box::pin(stream_airbyte_responses(byte_stream));

let result = messages.next().await.unwrap().unwrap();
assert_eq!(
result.connection_status.unwrap(),
input_message.connection_status.unwrap()
);
}
}

0 comments on commit a5741a2

Please sign in to comment.