Skip to content

Commit

Permalink
[clone] This actually works: first MVP of retrieving packs via clone
Browse files Browse the repository at this point in the history
  • Loading branch information
Byron committed Sep 10, 2020
1 parent 264ec82 commit c06d819
Show file tree
Hide file tree
Showing 11 changed files with 154 additions and 36 deletions.
13 changes: 13 additions & 0 deletions git-features/src/interrupt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -52,6 +52,19 @@ where
}
}

impl<R> io::BufRead for Read<R>
where
R: io::BufRead,
{
fn fill_buf(&mut self) -> io::Result<&[u8]> {
self.inner.fill_buf()
}

fn consume(&mut self, amt: usize) {
self.inner.consume(amt)
}
}

#[cfg(not(feature = "disable-interrupts"))]
static IS_INTERRUPTED: AtomicBool = AtomicBool::new(false);

Expand Down
14 changes: 14 additions & 0 deletions git-features/src/progress/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,3 +45,17 @@ where
Ok(bytes_read)
}
}

impl<R, P> io::BufRead for Read<R, P>
where
R: io::BufRead,
P: Progress,
{
fn fill_buf(&mut self) -> io::Result<&[u8]> {
self.reader.fill_buf()
}

fn consume(&mut self, amt: usize) {
self.reader.consume(amt)
}
}
96 changes: 80 additions & 16 deletions git-odb/src/pack/bundle/write/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,17 +25,59 @@ pub struct Options {
}

impl pack::Bundle {
pub fn write_stream_to_directory<P>(
pack: impl io::BufRead,
pack_size: Option<u64>,
directory: Option<impl AsRef<Path>>,
mut progress: P,
options: Options,
) -> Result<Outcome, Error>
where
P: Progress,
<P as Progress>::SubProgress: Send + 'static,
<<P as Progress>::SubProgress as Progress>::SubProgress: Send,
{
let mut read_progress = progress.add_child("read pack");
read_progress.init(pack_size.map(|s| s as usize), progress::bytes());
let pack = progress::Read {
reader: pack,
progress: progress::ThroughputOnDrop::new(read_progress),
};

let data_file = Arc::new(parking_lot::Mutex::new(match directory.as_ref() {
Some(directory) => NamedTempFile::new_in(directory.as_ref())?,
None => NamedTempFile::new()?,
}));
let data_path: PathBuf = data_file.lock().path().into();
let pack = PassThrough {
reader: interrupt::Read { inner: pack },
writer: Some(data_file.clone()),
};
let pack_entries_iter = pack::data::Iter::new_from_header(
pack,
options.iteration_mode,
pack::data::iter::CompressedBytesMode::CRC32,
)?;
let pack_kind = pack_entries_iter.kind();
let (outcome, data_path, index_path) =
pack::Bundle::inner_write(directory, progress, options, data_file, data_path, pack_entries_iter)?;

Ok(Outcome {
index: outcome,
pack_kind,
data_path,
index_path,
})
}
/// If `directory` is `None`, the output will be written to a sink
pub fn write_to_directory<P>(
/// In this case, `pack` will be read in its own thread to offset these costs.
/// If that's not possible, use `write_stream_to_directory` instead.
pub fn write_to_directory_eagerly<P>(
pack: impl io::Read + Send + 'static,
pack_size: Option<u64>,
directory: Option<impl AsRef<Path>>,
mut progress: P,
Options {
thread_limit,
iteration_mode,
index_kind,
}: Options,
options: Options,
) -> Result<Outcome, Error>
where
P: Progress,
Expand All @@ -48,7 +90,6 @@ impl pack::Bundle {
reader: pack,
progress: progress::ThroughputOnDrop::new(read_progress),
};
let indexing_progress = progress.add_child("create index file");

let data_file = Arc::new(parking_lot::Mutex::new(match directory.as_ref() {
Some(directory) => NamedTempFile::new_in(directory.as_ref())?,
Expand All @@ -63,15 +104,45 @@ impl pack::Bundle {
let buffered_pack = io::BufReader::with_capacity(eight_pages, pack);
let pack_entries_iter = pack::data::Iter::new_from_header(
buffered_pack,
iteration_mode,
options.iteration_mode,
pack::data::iter::CompressedBytesMode::CRC32,
)?;
let pack_kind = pack_entries_iter.kind();
let num_objects = pack_entries_iter.size_hint().0;
let pack_entries_iter =
git_features::parallel::EagerIterIf::new(|| num_objects > 25_000, pack_entries_iter, 5_000, 5);

let (outcome, data_path, index_path) = match directory {
let (outcome, data_path, index_path) =
pack::Bundle::inner_write(directory, progress, options, data_file, data_path, pack_entries_iter)?;

Ok(Outcome {
index: outcome,
pack_kind,
data_path,
index_path,
})
}

fn inner_write<P, I>(
directory: Option<impl AsRef<Path>>,
mut progress: P,
Options {
thread_limit,
iteration_mode: _,
index_kind,
}: Options,
data_file: Arc<parking_lot::Mutex<NamedTempFile>>,
data_path: PathBuf,
pack_entries_iter: I,
) -> Result<(pack::index::write::Outcome, Option<PathBuf>, Option<PathBuf>), Error>
where
I: Iterator<Item = Result<pack::data::iter::Entry, pack::data::iter::Error>>,
P: Progress,
<P as Progress>::SubProgress: Send + 'static,
<<P as Progress>::SubProgress as Progress>::SubProgress: Send,
{
let indexing_progress = progress.add_child("create index file");
Ok(match directory {
Some(directory) => {
let directory = directory.as_ref();
let mut index_file = NamedTempFile::new_in(directory)?;
Expand Down Expand Up @@ -115,13 +186,6 @@ impl pack::Bundle {
None,
None,
),
};

Ok(Outcome {
index: outcome,
pack_kind,
data_path,
index_path,
})
}
}
Expand Down
12 changes: 12 additions & 0 deletions git-odb/src/pack/bundle/write/types.rs
Original file line number Diff line number Diff line change
Expand Up @@ -36,3 +36,15 @@ where
Ok(bytes_read)
}
}
impl<R> io::BufRead for PassThrough<R>
where
R: io::BufRead,
{
fn fill_buf(&mut self) -> io::Result<&[u8]> {
self.reader.fill_buf()
}

fn consume(&mut self, amt: usize) {
self.reader.consume(amt)
}
}
2 changes: 1 addition & 1 deletion git-odb/tests/pack/bundle.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ mod write_to_directory {
pack_file: &str,
) -> Result<bundle::write::Outcome, Box<dyn std::error::Error>> {
let pack_file = fs::File::open(fixture_path(pack_file))?;
pack::Bundle::write_to_directory(
pack::Bundle::write_to_directory_eagerly(
pack_file,
None,
directory,
Expand Down
2 changes: 1 addition & 1 deletion git-protocol/src/fetch/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -180,7 +180,7 @@ where
return Ok(());
}

Response::check_required_features(&fetch_features)?;
Response::check_required_features(protocol_version, &fetch_features)?;
let sideband_all = fetch_features.iter().any(|(n, _)| *n == "sideband-all");
let mut arguments = Arguments::new(protocol_version, fetch_features)?;
let mut previous_response = None::<Response>;
Expand Down
23 changes: 14 additions & 9 deletions git-protocol/src/fetch/response.rs
Original file line number Diff line number Diff line change
Expand Up @@ -82,15 +82,20 @@ impl Response {
pub fn has_pack(&self) -> bool {
self.has_pack
}
pub fn check_required_features(features: &[Feature]) -> Result<(), Error> {
let has = |name: &str| features.iter().any(|f| f.0 == name);
// Let's focus on V2 standards, and simply not support old servers to keep our code simpler
if !has("multi_ack_detailed") {
return Err(Error::MissingServerCapability("multi_ack_detailed"));
}
// It's easy to NOT do sideband for us, but then again, everyone supports it.
if !has("side-band") && !has("side-band-64k") {
return Err(Error::MissingServerCapability("side-band OR side-band-64k"));
pub fn check_required_features(version: Protocol, features: &[Feature]) -> Result<(), Error> {
match version {
Protocol::V1 => {
let has = |name: &str| features.iter().any(|f| f.0 == name);
// Let's focus on V2 standards, and simply not support old servers to keep our code simpler
if !has("multi_ack_detailed") {
return Err(Error::MissingServerCapability("multi_ack_detailed"));
}
// It's easy to NOT do sideband for us, but then again, everyone supports it.
if !has("side-band") && !has("side-band-64k") {
return Err(Error::MissingServerCapability("side-band OR side-band-64k"));
}
}
Protocol::V2 => {}
}
Ok(())
}
Expand Down
4 changes: 2 additions & 2 deletions gitoxide-core/src/pack/index.rs
Original file line number Diff line number Diff line change
Expand Up @@ -90,11 +90,11 @@ where
Some(pack) => {
let pack_len = pack.metadata()?.len();
let pack_file = fs::File::open(pack)?;
pack::Bundle::write_to_directory(pack_file, Some(pack_len), directory, progress, options)
pack::Bundle::write_to_directory_eagerly(pack_file, Some(pack_len), directory, progress, options)
}
None => {
let stdin = io::stdin();
pack::Bundle::write_to_directory(stdin, None, directory, progress, options)
pack::Bundle::write_to_directory_eagerly(stdin, None, directory, progress, options)
}
}
.with_context(|| "Failed to write pack and index")?;
Expand Down
21 changes: 15 additions & 6 deletions gitoxide-core/src/pack/receive.rs
Original file line number Diff line number Diff line change
@@ -1,8 +1,11 @@
use crate::{OutputFormat, Protocol};
use git_features::progress::Progress;
use git_odb::pack;
use git_protocol::fetch::{Action, Arguments, Ref, Response};
use std::{io, io::BufRead, path::PathBuf};

pub const PROGRESS_RANGE: std::ops::RangeInclusive<u8> = 1..=2;

pub struct Context<W: io::Write> {
pub thread_limit: Option<usize>,
pub format: OutputFormat,
Expand All @@ -19,22 +22,28 @@ impl<W: io::Write> git_protocol::fetch::Delegate for CloneDelegate<W> {
for r in refs {
arguments.want(r.unpack_common().1.to_borrowed());
}
Action::Continue
Action::Close
}

fn receive_pack<P>(&mut self, input: impl BufRead, progress: P, refs: &[Ref], previous: &Response) -> io::Result<()>
fn receive_pack<P>(
&mut self,
input: impl BufRead,
progress: P,
_refs: &[Ref],
_previous: &Response,
) -> io::Result<()>
where
P: Progress,
<P as Progress>::SubProgress: Send + 'static,
<<P as Progress>::SubProgress as Progress>::SubProgress: Send + 'static,
{
let options = git_odb::pack::bundle::write::Options {
let options = pack::bundle::write::Options {
thread_limit: self.ctx.thread_limit,
index_kind: git_odb::pack::index::Kind::V2,
iteration_mode: git_odb::pack::data::iter::Mode::Verify,
index_kind: pack::index::Kind::V2,
iteration_mode: pack::data::iter::Mode::Verify,
};
let outcome =
git_odb::pack::bundle::Bundle::write_to_directory(input, None, self.directory.take(), progress, options)
pack::bundle::Bundle::write_stream_to_directory(input, None, self.directory.take(), progress, options)
.map_err(|err| io::Error::new(io::ErrorKind::Other, err))?;
writeln!(self.ctx.out, "{:?}", outcome)?;
Ok(())
Expand Down
2 changes: 1 addition & 1 deletion src/plumbing/lean/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,7 @@ pub fn main() -> Result<()> {
url,
directory,
}) => {
let (_handle, progress) = prepare(verbose, "pack-receive", None);
let (_handle, progress) = prepare(verbose, "pack-receive", core::pack::receive::PROGRESS_RANGE);
core::pack::receive(
protocol,
&url,
Expand Down
1 change: 1 addition & 0 deletions tasks.md
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@
* **gixp-pack-receive**
* [ ] hookup `git-protocol` with delegate to allow for receiving full packs
* [ ] **gixp-pack-receive** may optionally write received refs to the specified directory
* [ ] json support
* [ ] journey tests for each connection method
* [ ] file
* [ ] git
Expand Down

0 comments on commit c06d819

Please sign in to comment.