Skip to content

Commit

Permalink
operate on line bytes to allow handling of plaintext lines
Browse files Browse the repository at this point in the history
  • Loading branch information
mdibaiee committed Apr 14, 2022
1 parent beeb2d3 commit 69bccea
Show file tree
Hide file tree
Showing 5 changed files with 160 additions and 117 deletions.
12 changes: 12 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

2 changes: 2 additions & 0 deletions crates/connector_proxy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -33,5 +33,7 @@ tempfile="*"
thiserror = "*"
tokio = { version = "1.15.0", features = ["full"] }
tokio-util = { version = "*", features = ["io"] }
tokio-stream = { version = "*", features = ["io-util"] }
bytelines = "*"
tracing="*"
validator = { version = "*", features = ["derive"] }
Original file line number Diff line number Diff line change
Expand Up @@ -2,14 +2,14 @@ use crate::apis::{FlowCaptureOperation, InterceptorStream};
use crate::errors::{Error, Must};
use crate::libs::network_tunnel::NetworkTunnel;
use crate::libs::protobuf::{decode_message, encode_message};
use crate::libs::stream::{get_decoded_message, stream_all_bytes};
use crate::libs::stream::get_decoded_message;
use futures::{future, stream, StreamExt, TryStreamExt};
use protocol::capture::{
ApplyRequest, DiscoverRequest, PullRequest, SpecResponse, ValidateRequest,
};

use serde_json::value::RawValue;
use tokio_util::io::StreamReader;
use tokio_util::io::{ReaderStream, StreamReader};

pub struct NetworkTunnelCaptureInterceptor {}

Expand Down Expand Up @@ -81,7 +81,7 @@ impl NetworkTunnelCaptureInterceptor {
}

let first = stream::once(future::ready(encode_message(&request)));
let rest = stream_all_bytes(reader);
let rest = ReaderStream::new(reader);

// We need to set explicit error type, see https://github.com/rust-lang/rust/issues/63502
Ok::<_, std::io::Error>(first.chain(rest))
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2,13 +2,13 @@ use crate::apis::{FlowMaterializeOperation, InterceptorStream};
use crate::errors::{Error, Must};
use crate::libs::network_tunnel::NetworkTunnel;
use crate::libs::protobuf::{decode_message, encode_message};
use crate::libs::stream::{get_decoded_message, stream_all_bytes};
use crate::libs::stream::get_decoded_message;

use futures::{future, stream, StreamExt, TryStreamExt};
use protocol::materialize::{ApplyRequest, SpecResponse, TransactionRequest, ValidateRequest};

use serde_json::value::RawValue;
use tokio_util::io::StreamReader;
use tokio_util::io::{ReaderStream, StreamReader};

pub struct NetworkTunnelMaterializeInterceptor {}

Expand Down Expand Up @@ -63,7 +63,7 @@ impl NetworkTunnelMaterializeInterceptor {
}
}
let first = stream::once(future::ready(encode_message(&request)));
let rest = stream_all_bytes(reader);
let rest = ReaderStream::new(reader);

// We need to set explicit error type, see https://github.com/rust-lang/rust/issues/63502
Ok::<_, std::io::Error>(first.chain(rest))
Expand Down
Loading

0 comments on commit 69bccea

Please sign in to comment.