Skip to content

Commit

Permalink
feat!: make it possible to trace incoming and outgoing packetlines.
Browse files Browse the repository at this point in the history
Due to the way this is (and has to be) setup, unfortunately one
has to integrate that with two crates, instead of just one.

This changes touches multiple crates, most of which receive a single
boolean as last argument to indicate whether the tracing should
happen in the first place.
  • Loading branch information
Byron committed Oct 18, 2023
1 parent f9ae1bc commit c3edef1
Show file tree
Hide file tree
Showing 50 changed files with 358 additions and 80 deletions.
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

14 changes: 14 additions & 0 deletions gitoxide-core/src/pack/receive.rs
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,7 @@ impl<W> protocol::fetch::DelegateBlocking for CloneDelegate<W> {
mod blocking_io {
use std::{io, io::BufRead, path::PathBuf};

use gix::config::tree::Key;
use gix::{
bstr::BString,
protocol,
Expand Down Expand Up @@ -180,6 +181,12 @@ mod blocking_io {
progress,
protocol::FetchConnection::TerminateOnSuccessfulCompletion,
gix::env::agent(),
std::env::var_os(
gix::config::tree::Gitoxide::TRACE_PACKET
.environment_override()
.expect("set"),
)
.is_some(),
)?;
Ok(())
}
Expand All @@ -196,6 +203,7 @@ mod async_io {

use async_trait::async_trait;
use futures_io::AsyncBufRead;
use gix::config::tree::Key;
use gix::{
bstr::{BString, ByteSlice},
odb::pack,
Expand Down Expand Up @@ -264,6 +272,12 @@ mod async_io {
progress,
protocol::FetchConnection::TerminateOnSuccessfulCompletion,
gix::env::agent(),
std::env::var_os(
gix::config::tree::Gitoxide::TRACE_PACKET
.environment_override()
.expect("set"),
)
.is_some(),
))
})
.await?;
Expand Down
1 change: 1 addition & 0 deletions gix-filter/src/driver/process/client.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,6 +76,7 @@ impl Client {
let mut input = gix_packetline::StreamingPeekableIter::new(
process.stdout.take().expect("configured stdout when spawning"),
&[gix_packetline::PacketLineRef::Flush],
false, /* packet tracing */
);
let mut read = input.as_read();
let mut buf = String::new();
Expand Down
7 changes: 5 additions & 2 deletions gix-filter/src/driver/process/server.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,8 +63,11 @@ impl Server {
pick_version: &mut dyn FnMut(&[usize]) -> Option<usize>,
available_capabilities: &[&str],
) -> Result<Self, handshake::Error> {
let mut input =
gix_packetline::StreamingPeekableIter::new(stdin.lock(), &[gix_packetline::PacketLineRef::Flush]);
let mut input = gix_packetline::StreamingPeekableIter::new(
stdin.lock(),
&[gix_packetline::PacketLineRef::Flush],
false, /* packet tracing */
);
let mut read = input.as_read();
let mut buf = String::new();
read.read_line_to_string(&mut buf)?;
Expand Down
2 changes: 2 additions & 0 deletions gix-packetline-blocking/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,8 @@ blocking-io = []
serde = ["dep:serde", "bstr/serde"]

[dependencies]
gix-trace = { version = "^0.1.3", path = "../gix-trace" }

serde = { version = "1.0.114", optional = true, default-features = false, features = ["std", "derive"]}
thiserror = "1.0.34"
faster-hex = "0.8.0"
Expand Down
2 changes: 2 additions & 0 deletions gix-packetline/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,8 @@ path = "tests/blocking-packetline.rs"
required-features = ["blocking-io", "maybe-async/is_sync"]

[dependencies]
gix-trace = { version = "^0.1.3", path = "../gix-trace" }

serde = { version = "1.0.114", optional = true, default-features = false, features = ["std", "derive"]}
thiserror = "1.0.34"
faster-hex = "0.8.0"
Expand Down
2 changes: 2 additions & 0 deletions gix-packetline/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -91,6 +91,8 @@ pub struct StreamingPeekableIter<T> {
delimiters: &'static [PacketLineRef<'static>],
is_done: bool,
stopped_at: Option<PacketLineRef<'static>>,
#[cfg_attr(all(not(feature = "async-io"), not(feature = "blocking-io")), allow(dead_code))]
trace: bool,
}

/// Utilities to help decoding packet lines
Expand Down
20 changes: 20 additions & 0 deletions gix-packetline/src/read/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,12 +44,30 @@ where
delimiters: &[PacketLineRef<'static>],
fail_on_err_lines: bool,
buf_resize: bool,
trace: bool,
) -> ExhaustiveOutcome<'a> {
(
false,
None,
Some(match Self::read_line_inner(reader, buf).await {
Ok(Ok(line)) => {
if trace {
match line {
#[allow(unused_variables)]
PacketLineRef::Data(d) => {
gix_trace::trace!("<< {}", d.as_bstr().trim().as_bstr());
}
PacketLineRef::Flush => {
gix_trace::trace!("<< FLUSH");
}
PacketLineRef::Delimiter => {
gix_trace::trace!("<< DELIM");
}
PacketLineRef::ResponseEnd => {
gix_trace::trace!("<< RESPONSE_END");
}
}
}
if delimiters.contains(&line) {
let stopped_at = delimiters.iter().find(|l| **l == line).copied();
buf.clear();
Expand Down Expand Up @@ -111,6 +129,7 @@ where
self.delimiters,
self.fail_on_err_lines,
false,
self.trace,
)
.await;
self.is_done = is_done;
Expand All @@ -134,6 +153,7 @@ where
self.delimiters,
self.fail_on_err_lines,
true,
self.trace,
)
.await;
self.is_done = is_done;
Expand Down
21 changes: 21 additions & 0 deletions gix-packetline/src/read/blocking_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,12 +38,30 @@ where
delimiters: &[PacketLineRef<'static>],
fail_on_err_lines: bool,
buf_resize: bool,
trace: bool,
) -> ExhaustiveOutcome<'a> {
(
false,
None,
Some(match Self::read_line_inner(reader, buf) {
Ok(Ok(line)) => {
if trace {
match line {
#[allow(unused_variables)]
PacketLineRef::Data(d) => {
gix_trace::trace!("<< {}", d.as_bstr().trim().as_bstr());
}
PacketLineRef::Flush => {
gix_trace::trace!("<< FLUSH");
}
PacketLineRef::Delimiter => {
gix_trace::trace!("<< DELIM");
}
PacketLineRef::ResponseEnd => {
gix_trace::trace!("<< RESPONSE_END");
}
}
}
if delimiters.contains(&line) {
let stopped_at = delimiters.iter().find(|l| **l == line).copied();
buf.clear();
Expand All @@ -66,6 +84,7 @@ where
if buf_resize {
buf.resize(len, 0);
}
// TODO(borrowchk): remove additional decoding of internal buffer which is needed only to make it past borrowchk
Ok(Ok(crate::decode(buf).expect("only valid data here")))
}
Ok(Err(err)) => {
Expand Down Expand Up @@ -105,6 +124,7 @@ where
self.delimiters,
self.fail_on_err_lines,
false,
self.trace,
);
self.is_done = is_done;
self.stopped_at = stopped_at;
Expand All @@ -127,6 +147,7 @@ where
self.delimiters,
self.fail_on_err_lines,
true,
self.trace,
);
self.is_done = is_done;
self.stopped_at = stopped_at;
Expand Down
4 changes: 3 additions & 1 deletion gix-packetline/src/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,7 +43,8 @@ pub use error::Error;

impl<T> StreamingPeekableIter<T> {
/// Return a new instance from `read` which will stop decoding packet lines when receiving one of the given `delimiters`.
pub fn new(read: T, delimiters: &'static [PacketLineRef<'static>]) -> Self {
/// If `trace` is `true`, all packetlines received or sent will be passed to the facilities of the `gix-trace` crate.
pub fn new(read: T, delimiters: &'static [PacketLineRef<'static>], trace: bool) -> Self {
StreamingPeekableIter {
read,
#[cfg(any(feature = "blocking-io", feature = "async-io"))]
Expand All @@ -53,6 +54,7 @@ impl<T> StreamingPeekableIter<T> {
fail_on_err_lines: false,
is_done: false,
stopped_at: None,
trace,
}
}

Expand Down
4 changes: 2 additions & 2 deletions gix-packetline/src/read/sidebands/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -80,12 +80,12 @@ mod tests {
/// We want to declare items containing pointers of `StreamingPeekableIter` `Send` as well, so it must be `Send` itself.
#[test]
fn streaming_peekable_iter_is_send() {
receiver(StreamingPeekableIter::new(Vec::<u8>::new(), &[]));
receiver(StreamingPeekableIter::new(Vec::<u8>::new(), &[], false));
}

#[test]
fn state_is_send() {
let mut s = StreamingPeekableIter::new(Vec::<u8>::new(), &[]);
let mut s = StreamingPeekableIter::new(Vec::<u8>::new(), &[], false);
receiver(State::Idle { parent: Some(&mut s) });
}
}
Expand Down
16 changes: 9 additions & 7 deletions gix-packetline/tests/read/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,7 +20,7 @@ pub mod streaming_peek_iter {

#[maybe_async::test(feature = "blocking-io", async(feature = "async-io", async_std::test))]
async fn peek_follows_read_line_delimiter_logic() -> crate::Result {
let mut rd = gix_packetline::StreamingPeekableIter::new(&b"0005a00000005b"[..], &[PacketLineRef::Flush]);
let mut rd = gix_packetline::StreamingPeekableIter::new(&b"0005a00000005b"[..], &[PacketLineRef::Flush], false);
let res = rd.peek_line().await;
assert_eq!(res.expect("line")??, PacketLineRef::Data(b"a"));
rd.read_line().await;
Expand All @@ -46,7 +46,8 @@ pub mod streaming_peek_iter {

#[maybe_async::test(feature = "blocking-io", async(feature = "async-io", async_std::test))]
async fn peek_follows_read_line_err_logic() -> crate::Result {
let mut rd = gix_packetline::StreamingPeekableIter::new(&b"0005a0009ERR e0000"[..], &[PacketLineRef::Flush]);
let mut rd =
gix_packetline::StreamingPeekableIter::new(&b"0005a0009ERR e0000"[..], &[PacketLineRef::Flush], false);
rd.fail_on_err_lines(true);
let res = rd.peek_line().await;
assert_eq!(res.expect("line")??, PacketLineRef::Data(b"a"));
Expand All @@ -73,7 +74,8 @@ pub mod streaming_peek_iter {

#[maybe_async::test(feature = "blocking-io", async(feature = "async-io", async_std::test))]
async fn peek_non_data() -> crate::Result {
let mut rd = gix_packetline::StreamingPeekableIter::new(&b"000000010002"[..], &[PacketLineRef::ResponseEnd]);
let mut rd =
gix_packetline::StreamingPeekableIter::new(&b"000000010002"[..], &[PacketLineRef::ResponseEnd], false);
let res = rd.read_line().await;
assert_eq!(res.expect("line")??, PacketLineRef::Flush);
let res = rd.read_line().await;
Expand All @@ -100,7 +102,7 @@ pub mod streaming_peek_iter {
#[maybe_async::test(feature = "blocking-io", async(feature = "async-io", async_std::test))]
async fn fail_on_err_lines() -> crate::Result {
let input = b"00010009ERR e0002";
let mut rd = gix_packetline::StreamingPeekableIter::new(&input[..], &[]);
let mut rd = gix_packetline::StreamingPeekableIter::new(&input[..], &[], false);
let res = rd.read_line().await;
assert_eq!(res.expect("line")??, PacketLineRef::Delimiter);
let res = rd.read_line().await;
Expand All @@ -110,7 +112,7 @@ pub mod streaming_peek_iter {
"by default no special handling"
);

let mut rd = gix_packetline::StreamingPeekableIter::new(&input[..], &[]);
let mut rd = gix_packetline::StreamingPeekableIter::new(&input[..], &[], false);
rd.fail_on_err_lines(true);
let res = rd.read_line().await;
assert_eq!(res.expect("line")??, PacketLineRef::Delimiter);
Expand Down Expand Up @@ -138,7 +140,7 @@ pub mod streaming_peek_iter {
#[maybe_async::test(feature = "blocking-io", async(feature = "async-io", async_std::test))]
async fn peek() -> crate::Result {
let bytes = fixture_bytes("v1/fetch/01-many-refs.response");
let mut rd = gix_packetline::StreamingPeekableIter::new(&bytes[..], &[PacketLineRef::Flush]);
let mut rd = gix_packetline::StreamingPeekableIter::new(&bytes[..], &[PacketLineRef::Flush], false);
let res = rd.peek_line().await;
assert_eq!(res.expect("line")??, first_line(), "peek returns first line");
let res = rd.peek_line().await;
Expand Down Expand Up @@ -175,7 +177,7 @@ pub mod streaming_peek_iter {
async fn read_from_file_and_reader_advancement() -> crate::Result {
let mut bytes = fixture_bytes("v1/fetch/01-many-refs.response");
bytes.extend(fixture_bytes("v1/fetch/01-many-refs.response"));
let mut rd = gix_packetline::StreamingPeekableIter::new(&bytes[..], &[PacketLineRef::Flush]);
let mut rd = gix_packetline::StreamingPeekableIter::new(&bytes[..], &[PacketLineRef::Flush], false);
let res = rd.read_line().await;
assert_eq!(res.expect("line")??, first_line());
let res = exhaust(&mut rd).await;
Expand Down
12 changes: 6 additions & 6 deletions gix-packetline/tests/read/sideband.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ mod util {
#[maybe_async::test(feature = "blocking-io", async(feature = "async-io", async_std::test))]
async fn read_pack_with_progress_extraction() -> crate::Result {
let buf = fixture_bytes("v1/01-clone.combined-output");
let mut rd = gix_packetline::StreamingPeekableIter::new(&buf[..], &[PacketLineRef::Flush]);
let mut rd = gix_packetline::StreamingPeekableIter::new(&buf[..], &[PacketLineRef::Flush], false);

// Read without sideband decoding
let mut out = Vec::new();
Expand Down Expand Up @@ -103,7 +103,7 @@ async fn read_pack_with_progress_extraction() -> crate::Result {
async fn read_line_trait_method_reads_one_packet_line_at_a_time() -> crate::Result {
let buf = fixture_bytes("v1/01-clone.combined-output-no-binary");

let mut rd = gix_packetline::StreamingPeekableIter::new(&buf[..], &[PacketLineRef::Flush]);
let mut rd = gix_packetline::StreamingPeekableIter::new(&buf[..], &[PacketLineRef::Flush], false);

let mut out = String::new();
let mut r = rd.as_read();
Expand Down Expand Up @@ -149,7 +149,7 @@ async fn read_line_trait_method_reads_one_packet_line_at_a_time() -> crate::Resu
async fn readline_reads_one_packet_line_at_a_time() -> crate::Result {
let buf = fixture_bytes("v1/01-clone.combined-output-no-binary");

let mut rd = gix_packetline::StreamingPeekableIter::new(&buf[..], &[PacketLineRef::Flush]);
let mut rd = gix_packetline::StreamingPeekableIter::new(&buf[..], &[PacketLineRef::Flush], false);

let mut r = rd.as_read();
let line = r.read_data_line().await.unwrap()??.as_bstr().unwrap();
Expand Down Expand Up @@ -194,7 +194,7 @@ async fn readline_reads_one_packet_line_at_a_time() -> crate::Result {
#[maybe_async::test(feature = "blocking-io", async(feature = "async-io", async_std::test))]
async fn peek_past_an_actual_eof_is_an_error() -> crate::Result {
let input = b"0009ERR e";
let mut rd = gix_packetline::StreamingPeekableIter::new(&input[..], &[]);
let mut rd = gix_packetline::StreamingPeekableIter::new(&input[..], &[], false);
let mut reader = rd.as_read();
let res = reader.peek_data_line().await;
assert_eq!(res.expect("one line")??, b"ERR e");
Expand All @@ -218,7 +218,7 @@ async fn peek_past_an_actual_eof_is_an_error() -> crate::Result {
#[maybe_async::test(feature = "blocking-io", async(feature = "async-io", async_std::test))]
async fn peek_past_a_delimiter_is_no_error() -> crate::Result {
let input = b"0009hello0000";
let mut rd = gix_packetline::StreamingPeekableIter::new(&input[..], &[PacketLineRef::Flush]);
let mut rd = gix_packetline::StreamingPeekableIter::new(&input[..], &[PacketLineRef::Flush], false);
let mut reader = rd.as_read();
let res = reader.peek_data_line().await;
assert_eq!(res.expect("one line")??, b"hello");
Expand All @@ -238,7 +238,7 @@ async fn peek_past_a_delimiter_is_no_error() -> crate::Result {
#[maybe_async::test(feature = "blocking-io", async(feature = "async-io", async_std::test))]
async fn handling_of_err_lines() {
let input = b"0009ERR e0009ERR x0000";
let mut rd = gix_packetline::StreamingPeekableIter::new(&input[..], &[]);
let mut rd = gix_packetline::StreamingPeekableIter::new(&input[..], &[], false);
rd.fail_on_err_lines(true);
let mut buf = [0u8; 2];
let mut reader = rd.as_read();
Expand Down
8 changes: 6 additions & 2 deletions gix-protocol/src/fetch/arguments/async_io.rs
Original file line number Diff line number Diff line change
Expand Up @@ -19,8 +19,11 @@ impl Arguments {
transport.connection_persists_across_multiple_requests(),
add_done_argument,
)?;
let mut line_writer =
transport.request(client::WriteMode::OneLfTerminatedLinePerWriteCall, on_into_read)?;
let mut line_writer = transport.request(
client::WriteMode::OneLfTerminatedLinePerWriteCall,
on_into_read,
self.trace,
)?;
let had_args = !self.args.is_empty();
for arg in self.args.drain(..) {
line_writer.write_all(&arg).await?;
Expand All @@ -47,6 +50,7 @@ impl Arguments {
Command::Fetch.as_str(),
self.features.iter().filter(|(_, v)| v.is_some()).cloned(),
Some(std::mem::replace(&mut self.args, retained_state).into_iter()),
self.trace,
)
.await
}
Expand Down

0 comments on commit c3edef1

Please sign in to comment.