remove memory mode entirely (and some complexity with it)
Byron committed Aug 4, 2020
1 parent 657aa2c commit 8812e91
Showing 8 changed files with 47 additions and 222 deletions.
66 changes: 17 additions & 49 deletions git-odb/src/pack/bundle/write/mod.rs
@@ -11,15 +11,14 @@ use error::Error;
 
 mod types;
 use filebuffer::FileBuffer;
+pub use types::Outcome;
 use types::PassThrough;
-pub use types::{MemoryMode, Outcome};
 
 #[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone)]
 #[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
 pub struct Options {
     pub thread_limit: Option<usize>,
     pub iteration_mode: pack::data::iter::Mode,
-    pub memory_mode: MemoryMode,
     pub index_kind: pack::index::Kind,
 }
 
@@ -33,7 +32,6 @@ impl pack::Bundle {
         Options {
             thread_limit,
             iteration_mode,
-            memory_mode,
             index_kind,
         }: Options,
     ) -> Result<Outcome, Error>
@@ -47,12 +45,10 @@ impl pack::Bundle {
             reader: pack,
             progress: read_progress,
         };
-        let is_in_memory_completely = memory_mode.is_in_memory();
         let indexing_progress = progress.add_child("create index file");
 
         let data_file = match directory.as_ref() {
             Some(directory) => Some(NamedTempFile::new_in(directory.as_ref())?),
-            None if is_in_memory_completely => None,
             None => Some(NamedTempFile::new()?),
         };
         let data_path: Option<PathBuf> = data_file.as_ref().map(|f| f.as_ref().into());
@@ -68,27 +64,14 @@
                 let directory = directory.as_ref();
                 let mut index_file = io::BufWriter::with_capacity(4096 * 8, NamedTempFile::new_in(directory)?);
 
-                let outcome = if is_in_memory_completely {
-                    pack::index::File::write_data_iter_to_stream(
-                        index_kind,
-                        pack::index::write::Mode::noop_resolver,
-                        memory_mode,
-                        pack_entries_iter,
-                        thread_limit,
-                        indexing_progress,
-                        &mut index_file,
-                    )
-                } else {
-                    pack::index::File::write_data_iter_to_stream(
-                        index_kind,
-                        move || new_pack_file_resolver(data_path),
-                        memory_mode,
-                        pack_entries_iter,
-                        thread_limit,
-                        indexing_progress,
-                        &mut index_file,
-                    )
-                }?;
+                let outcome = pack::index::File::write_data_iter_to_stream(
+                    index_kind,
+                    move || new_pack_file_resolver(data_path),
+                    pack_entries_iter,
+                    thread_limit,
+                    indexing_progress,
+                    &mut index_file,
+                )?;
 
                 let data_file = pack.writer.expect("data file to always be set in write mode");
                 let index_path = directory.join(format!("{}.idx", outcome.index_hash.to_sha1_hex_string()));
@@ -108,29 +91,14 @@
                 })?;
                 outcome
             }
-            None => {
-                if is_in_memory_completely {
-                    pack::index::File::write_data_iter_to_stream(
-                        index_kind,
-                        pack::index::write::Mode::noop_resolver,
-                        memory_mode,
-                        pack_entries_iter,
-                        thread_limit,
-                        indexing_progress,
-                        io::sink(),
-                    )
-                } else {
-                    pack::index::File::write_data_iter_to_stream(
-                        index_kind,
-                        move || new_pack_file_resolver(data_path),
-                        memory_mode,
-                        pack_entries_iter,
-                        thread_limit,
-                        indexing_progress,
-                        io::sink(),
-                    )
-                }?
-            }
+            None => pack::index::File::write_data_iter_to_stream(
+                index_kind,
+                move || new_pack_file_resolver(data_path),
+                pack_entries_iter,
+                thread_limit,
+                indexing_progress,
+                io::sink(),
+            )?,
         };
 
         Ok(Outcome {
2 changes: 0 additions & 2 deletions git-odb/src/pack/bundle/write/types.rs
@@ -9,8 +9,6 @@ pub struct Outcome {
     pub pack_kind: pack::data::Kind,
 }
 
-pub type MemoryMode = pack::index::write::Mode;
-
 pub(crate) struct PassThrough<R> {
     pub reader: R,
     pub writer: Option<NamedTempFile>,
64 changes: 27 additions & 37 deletions git-odb/src/pack/index/write/consume.rs
@@ -1,70 +1,60 @@
 use crate::{
     hash, loose, pack,
-    pack::index::write::types::{Cache, EntrySlice, Mode, ObjectKind},
+    pack::index::write::types::{EntrySlice, ObjectKind},
     pack::index::write::Error,
     zlib,
 };
 use git_features::progress::Progress;
 use git_object::{owned, HashKind};
-use std::{cell::RefCell, io};
+use std::{cell::RefCell, collections::BTreeMap, io};
 
 pub(crate) fn apply_deltas<F, P>(
-    mut nodes: Vec<pack::tree::Node<pack::index::write::types::TreeEntry>>,
+    nodes: Vec<pack::tree::Node<pack::index::write::types::TreeEntry>>,
     (bytes_buf, progress): &mut (Vec<u8>, P),
-    mode: Mode,
     resolve: F,
     hash_kind: HashKind,
 ) -> Result<usize, Error>
 where
     F: for<'r> Fn(EntrySlice, &'r mut Vec<u8>) -> Option<()> + Send + Sync,
     P: Progress,
 {
+    let mut decompressed_bytes_by_pack_offset = BTreeMap::new();
     let bytes_buf = RefCell::new(bytes_buf);
     let mut num_objects = 0;
-    let decompress_from_cache = |cache: Cache, pack_offset: u64, entry_size: usize| -> Result<Vec<u8>, Error> {
-        Ok(match cache {
-            Cache::Unset => {
-                let mut bytes_buf = bytes_buf.borrow_mut();
-                bytes_buf.resize(entry_size, 0);
-                match mode {
-                    Mode::ResolveDeltas | Mode::ResolveBasesAndDeltas => {
-                        resolve(pack_offset..pack_offset + entry_size as u64, &mut bytes_buf)
-                            .ok_or_else(|| Error::ConsumeResolveFailed(pack_offset))?;
-                        let entry = pack::data::Entry::from_bytes(&bytes_buf, pack_offset);
-                        decompress_all_at_once(
-                            &bytes_buf[entry.header_size() as usize..],
-                            entry.decompressed_size as usize,
-                        )?
-                    }
-                    Mode::InMemory => unreachable!("BUG: If there is no cache, we always need a resolver"),
-                }
-            }
-            Cache::Decompressed(bytes) => bytes,
-        })
+    let decompress_from_resolver = |pack_offset: u64, entry_size: usize| -> Result<Vec<u8>, Error> {
+        let mut bytes_buf = bytes_buf.borrow_mut();
+        bytes_buf.resize(entry_size, 0);
+        resolve(pack_offset..pack_offset + entry_size as u64, &mut bytes_buf)
+            .ok_or_else(|| Error::ConsumeResolveFailed(pack_offset))?;
+        let entry = pack::data::Entry::from_bytes(&bytes_buf, pack_offset);
+        decompress_all_at_once(
+            &bytes_buf[entry.header_size() as usize..],
+            entry.decompressed_size as usize,
+        )
     };
 
     // Traverse the tree breadth first and loose the data produced for the base as it won't be needed anymore.
     progress.init(None, Some("objects"));
 
     // each node is a base, and its children always start out as deltas which become a base after applying them.
     // These will be pushed onto our stack until all are processed
-    while let Some(mut base) = nodes.pop() {
-        let base_bytes = decompress_from_cache(
-            std::mem::take(&mut base.data.cache),
-            base.data.pack_offset,
-            base.data.entry_len,
-        )?;
+    let root_level = 0;
+    let mut nodes: Vec<_> = nodes.into_iter().map(|n| (root_level, n)).collect();
+    while let Some((level, mut base)) = nodes.pop() {
+        let base_bytes = if level == root_level {
+            decompress_from_resolver(base.data.pack_offset, base.data.entry_len)?
+        } else {
+            decompressed_bytes_by_pack_offset
+                .remove(&base.data.pack_offset)
+                .expect("we store the resolved delta buffer when done")
+        };
         let base_kind = base.data.kind.to_kind().expect("base object as source of iteration");
         let id = compute_hash(base_kind, &base_bytes, hash_kind);
         num_objects += 1;
 
         base.data.id = id;
         for mut child in base.store_changes_then_into_child_iter() {
-            let delta_bytes = decompress_from_cache(
-                std::mem::take(&mut child.data.cache),
-                child.data.pack_offset,
-                child.data.entry_len,
-            )?;
+            let delta_bytes = decompress_from_resolver(child.data.pack_offset, child.data.entry_len)?;
             let (base_size, consumed) = pack::data::decode::delta_header_size_ofs(&delta_bytes);
             let mut header_ofs = consumed;
             assert_eq!(
@@ -79,9 +69,9 @@ where
             fully_resolved_delta_bytes.resize(result_size as usize, 0);
             pack::data::decode::apply_delta(&base_bytes, &mut fully_resolved_delta_bytes, &delta_bytes[header_ofs..]);
 
-            child.data.cache = Cache::Decompressed(fully_resolved_delta_bytes.to_owned());
+            decompressed_bytes_by_pack_offset.insert(child.data.pack_offset, fully_resolved_delta_bytes.to_owned());
             child.data.kind = ObjectKind::Base(base_kind);
-            nodes.push(child);
+            nodes.push((level + 1, child));
         }
     }
 
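Illustration of the new resolution strategy in consume.rs: root bases are decompressed on demand through the resolver, a resolved delta buffer is parked in a BTreeMap keyed by its pack offset until the node is popped again as a base for its own children, and a level counter tells roots apart from resolved deltas. The sketch below is a minimal, self-contained model of that traversal only; the Node type, the resolver, and the delta application are invented stand-ins for illustration, not gitoxide's actual types or APIs.

    use std::collections::BTreeMap;

    // Stand-in for a delta-tree node: `children` are deltas whose base is this node.
    struct Node {
        pack_offset: u64,
        children: Vec<Node>,
    }

    // Stand-in resolver: "decompress" a root base straight from the (fake) pack.
    fn decompress_from_resolver(pack_offset: u64) -> Vec<u8> {
        vec![pack_offset as u8] // pretend payload
    }

    // Stand-in delta application: derive the child's bytes from its base.
    fn apply_delta(base: &[u8], child_offset: u64) -> Vec<u8> {
        let mut out = base.to_vec();
        out.push(child_offset as u8);
        out
    }

    fn resolve_tree(roots: Vec<Node>) -> usize {
        // Resolved-but-not-yet-consumed delta buffers, keyed by pack offset.
        let mut decompressed_bytes_by_pack_offset = BTreeMap::new();
        let root_level = 0;
        // Stack of (level, node); level 0 means "root base, use the resolver".
        let mut nodes: Vec<_> = roots.into_iter().map(|n| (root_level, n)).collect();
        let mut num_objects = 0;

        while let Some((level, base)) = nodes.pop() {
            let base_bytes = if level == root_level {
                decompress_from_resolver(base.pack_offset)
            } else {
                decompressed_bytes_by_pack_offset
                    .remove(&base.pack_offset)
                    .expect("we store the resolved delta buffer when done")
            };
            num_objects += 1; // the real code hashes `base_bytes` here to obtain the object id

            for child in base.children {
                // Resolve the child against its base now, and park the result
                // for when the child is popped as a base itself.
                let resolved = apply_delta(&base_bytes, child.pack_offset);
                decompressed_bytes_by_pack_offset.insert(child.pack_offset, resolved);
                nodes.push((level + 1, child));
            }
        }
        num_objects
    }

    fn main() {
        let roots = vec![Node {
            pack_offset: 1,
            children: vec![
                Node { pack_offset: 2, children: vec![] },
                Node { pack_offset: 3, children: vec![Node { pack_offset: 4, children: vec![] }] },
            ],
        }];
        assert_eq!(resolve_tree(roots), 4);
        println!("resolved 4 objects");
    }

Because each parked buffer is removed from the map the moment its node is processed, memory usage follows the shape of the delta tree, and no separate memory mode is needed to decide what to keep around.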
7 changes: 2 additions & 5 deletions git-odb/src/pack/index/write/mod.rs
@@ -8,7 +8,7 @@ mod error;
 pub use error::Error;
 
 mod types;
-pub use types::{EntrySlice, Mode, Outcome};
+pub use types::{EntrySlice, Outcome};
 use types::{ObjectKind, Reducer, TreeEntry};
 
 mod consume;
@@ -22,7 +22,6 @@ impl pack::index::File {
     pub fn write_data_iter_to_stream<F, F2, P>(
         kind: pack::index::Kind,
         make_resolver: F,
-        mode: Mode,
         entries: impl Iterator<Item = Result<pack::data::iter::Entry, pack::data::iter::Error>>,
         thread_limit: Option<usize>,
         mut root_progress: P,
@@ -81,7 +80,6 @@ impl pack::index::File {
                         entry_len,
                         kind: ObjectKind::Base(header.to_kind().expect("a base object")),
                         crc32,
-                        cache: mode.base_cache(decompressed),
                     },
                 )?;
             }
@@ -98,7 +96,6 @@ impl pack::index::File {
                         entry_len,
                         kind: ObjectKind::OfsDelta,
                         crc32,
-                        cache: mode.delta_cache(decompressed),
                     },
                 )?;
             }
@@ -131,7 +128,7 @@ impl pack::index::File {
                     reduce_progress.lock().add_child(format!("thread {}", thread_index)),
                 )
             },
-            |root_nodes, state| apply_deltas(root_nodes, state, mode, &resolver, kind.hash()),
+            |root_nodes, state| apply_deltas(root_nodes, state, &resolver, kind.hash()),
             Reducer::new(num_objects, &reduce_progress),
         )?;
         let mut items = tree.into_items();
56 changes: 0 additions & 56 deletions git-odb/src/pack/index/write/types.rs
@@ -1,7 +1,6 @@
 use crate::pack;
 use git_features::{parallel, progress::Progress};
 use git_object::owned;
-use std::io;
 
 #[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone)]
 #[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
@@ -12,17 +11,6 @@ pub struct Outcome {
     pub num_objects: u32,
 }
 
-pub(crate) enum Cache {
-    Unset,
-    Decompressed(Vec<u8>),
-}
-
-impl Default for Cache {
-    fn default() -> Self {
-        Cache::Unset
-    }
-}
-
 #[derive(Clone)]
 pub(crate) enum ObjectKind {
     Base(git_object::Kind),
@@ -44,7 +32,6 @@ pub(crate) struct TreeEntry {
     pub entry_len: usize,
     pub kind: ObjectKind,
     pub crc32: u32,
-    pub cache: Cache,
 }
 
 impl Default for TreeEntry {
@@ -55,55 +42,12 @@ impl Default for TreeEntry {
             entry_len: 0,
             kind: ObjectKind::OfsDelta,
             crc32: 0,
-            cache: Cache::Unset,
         }
     }
 }
 
 pub type EntrySlice = std::ops::Range<u64>;
 
-#[derive(PartialEq, Eq, Debug, Hash, Ord, PartialOrd, Clone, Copy)]
-#[cfg_attr(feature = "serde1", derive(serde::Serialize, serde::Deserialize))]
-pub enum Mode {
-    /// Base + deltas in memory, decompressed
-    InMemory,
-    /// Bases in memory, decompressed
-    ResolveDeltas,
-    ResolveBasesAndDeltas,
-}
-
-impl Mode {
-    pub(crate) fn base_cache(&self, decompressed: Vec<u8>) -> Cache {
-        match self {
-            Mode::InMemory | Mode::ResolveDeltas => Cache::Decompressed(decompressed),
-            Mode::ResolveBasesAndDeltas => Cache::Unset,
-        }
-    }
-    pub(crate) fn delta_cache(&self, decompressed: Vec<u8>) -> Cache {
-        match self {
-            Mode::InMemory => Cache::Decompressed(decompressed),
-            Mode::ResolveDeltas | Mode::ResolveBasesAndDeltas => Cache::Unset,
-        }
-    }
-    pub(crate) fn is_in_memory(&self) -> bool {
-        match self {
-            Mode::InMemory => true,
-            Mode::ResolveDeltas | Mode::ResolveBasesAndDeltas => false,
-        }
-    }
-}
-
-pub type ResolverFn = fn(EntrySlice, &mut Vec<u8>) -> Option<()>;
-
-impl Mode {
-    pub fn noop_resolver() -> io::Result<ResolverFn> {
-        fn noop(_: EntrySlice, _: &mut Vec<u8>) -> Option<()> {
-            None
-        };
-        Ok(noop)
-    }
-}
-
 pub(crate) struct Reducer<'a, P> {
     item_count: usize,
     progress: &'a parking_lot::Mutex<P>,
17 changes: 1 addition & 16 deletions git-odb/tests/pack/index.rs
@@ -85,20 +85,7 @@ mod method {
                     .map(|slice| out.copy_from_slice(slice))
                 }
             };
-            assert_index_write(
-                mode,
-                index_path,
-                data_path,
-                resolve,
-                pack::index::write::Mode::ResolveBasesAndDeltas,
-            )?;
-            assert_index_write(
-                mode,
-                index_path,
-                data_path,
-                pack::index::write::Mode::noop_resolver()?,
-                pack::index::write::Mode::InMemory,
-            )?;
+            assert_index_write(mode, index_path, data_path, resolve)?;
         }
     }
     Ok(())
@@ -109,7 +96,6 @@ mod method {
         index_path: &&str,
         data_path: &&str,
         resolve: F,
-        memory_mode: pack::index::write::Mode,
     ) -> Result<(), Box<dyn std::error::Error>>
     where
         F: Fn(pack::index::write::EntrySlice, &mut Vec<u8>) -> Option<()> + Send + Sync,
@@ -123,7 +109,6 @@ mod method {
         let outcome = pack::index::File::write_data_iter_to_stream(
             desired_kind,
             || Ok(resolve),
-            memory_mode,
             pack_iter,
             None,
             progress::Discard,
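With Mode::noop_resolver gone, every caller of write_data_iter_to_stream passes a real resolver satisfying the F: Fn(pack::index::write::EntrySlice, &mut Vec<u8>) -> Option<()> + Send + Sync bound visible in this test, where EntrySlice is std::ops::Range<u64> (see the types.rs hunk above). The following is a small, self-contained sketch of such a resolver over an in-memory copy of the pack bytes; the local EntrySlice alias, the make_resolver helper, and the in-closure buffer resizing are assumptions made for the sake of a runnable example, not gitoxide's API.

    // A resolver copies the requested byte range of the pack data into `out`,
    // returning None when the range is out of bounds.
    type EntrySlice = std::ops::Range<u64>;

    fn make_resolver(pack_data: Vec<u8>) -> impl Fn(EntrySlice, &mut Vec<u8>) -> Option<()> {
        move |range: EntrySlice, out: &mut Vec<u8>| {
            let slice = pack_data.get(range.start as usize..range.end as usize)?;
            // For simplicity this sketch resizes the output itself; the real callers
            // size the buffer to the entry length before invoking the resolver.
            out.resize(slice.len(), 0);
            out.copy_from_slice(slice);
            Some(())
        }
    }

    fn main() {
        let resolve = make_resolver(vec![0xDE, 0xAD, 0xBE, 0xEF]);
        let mut buf = Vec::new();
        assert!(resolve(1..3, &mut buf).is_some());
        assert_eq!(buf, [0xAD, 0xBE]);
        assert!(resolve(2..9, &mut buf).is_none()); // out of bounds
        println!("resolver ok");
    }

In the test itself the equivalent closure copies the requested slice of the pack file into the output buffer and is now passed exactly once, where previously a second run with the no-op resolver covered the in-memory mode.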
