Skip to content
This repository has been archived by the owner on Jun 7, 2022. It is now read-only.

Commit

Permalink
Merge pull request #192 from darrenldl/dev
Browse files Browse the repository at this point in the history
Changed from using block predicates to using equivalent header predicates
  • Loading branch information
darrenldl committed May 7, 2019
2 parents 6b63f2e + 6f036ef commit a4ab24f
Show file tree
Hide file tree
Showing 12 changed files with 97 additions and 59 deletions.
20 changes: 11 additions & 9 deletions src/block_preds.rs
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
macro_rules! block_pred_same_ver_uid {
(
$block:expr
) => {{
let version = $block.get_version();
let uid = $block.get_uid();
move |block: &Block| -> bool { block.get_version() == version && block.get_uid() == uid }
}};
}
// macro_rules! block_pred_same_ver_uid {
// (
// $block:expr
// ) => {{
// use sbx_block::Block;

// let version = $block.get_version();
// let uid = $block.get_uid();
// move |block: &Block| -> bool { block.get_version() == version && block.get_uid() == uid }
// }};
// }
42 changes: 22 additions & 20 deletions src/block_utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ pub fn read_block_lazily(
});
}

match block.sync_from_buffer(&buffer[0..block_size], None) {
match block.sync_from_buffer(&buffer[0..block_size], None, None) {
Ok(()) => {}
Err(_) => {
return Ok(LazyReadResult {
Expand Down Expand Up @@ -366,7 +366,7 @@ pub fn guess_burst_err_resistance_level(

let mut blocks_processed = 0;

let pred = block_pred_same_ver_uid!(ref_block);
let header_pred = header_pred_same_ver_uid!(ref_block);

reader.seek(SeekFrom::Start(from_pos))?;

Expand All @@ -383,7 +383,8 @@ pub fn guess_burst_err_resistance_level(
break;
}

seq_nums[blocks_processed] = match block.sync_from_buffer(&buffer, Some(&pred)) {
seq_nums[blocks_processed] = match block.sync_from_buffer(&buffer, Some(&header_pred), None)
{
Ok(()) => Some(block.get_seq_num()),
Err(_) => None,
};
Expand Down Expand Up @@ -473,7 +474,7 @@ pub fn guess_starting_block_index(

let mut block_index_count: HashMap<u64, usize, _> = HashMap::with_capacity(block_indices.len());

let pred = block_pred_same_ver_uid!(ref_block);
let header_pred = header_pred_same_ver_uid!(ref_block);

reader.seek(SeekFrom::Start(from_pos))?;

Expand All @@ -491,26 +492,27 @@ pub fn guess_starting_block_index(
break;
}

block_indices[blocks_processed] = match block.sync_from_buffer(&buffer, Some(&pred)) {
Ok(()) => {
if block.is_meta() {
None
} else {
let block_index = sbx_block::calc_data_block_write_index(
block.get_seq_num(),
Some(true),
data_par_burst,
);

if block_index < blocks_processed as u64 {
block_indices[blocks_processed] =
match block.sync_from_buffer(&buffer, Some(&header_pred), None) {
Ok(()) => {
if block.is_meta() {
None
} else {
Some(block_index - blocks_processed as u64)
let block_index = sbx_block::calc_data_block_write_index(
block.get_seq_num(),
Some(true),
data_par_burst,
);

if block_index < blocks_processed as u64 {
None
} else {
Some(block_index - blocks_processed as u64)
}
}
}
}
Err(_) => None,
};
Err(_) => None,
};

blocks_processed += 1;
}
Expand Down
4 changes: 3 additions & 1 deletion src/check_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,8 @@ pub fn check_file(param: &Param) -> Result<Option<Stats>, Error> {
let mut block_pos: u64;
let mut bytes_processed: u64 = 0;

let header_pred = header_pred_same_ver_uid!(ref_block);

reporter.start();

// seek to calculated position
Expand All @@ -240,7 +242,7 @@ pub fn check_file(param: &Param) -> Result<Option<Stats>, Error> {

break_if_eof_seen!(read_res);

match block.sync_from_buffer(&buffer, None) {
match block.sync_from_buffer(&buffer, Some(&header_pred), None) {
Ok(_) => match block.block_type() {
BlockType::Meta => {
stats.meta_or_par_blocks_decoded += 1;
Expand Down
17 changes: 9 additions & 8 deletions src/decode_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -680,7 +680,7 @@ pub fn decode(

let version = ref_block.get_version();

let pred = block_pred_same_ver_uid!(ref_block);
let header_pred = header_pred_same_ver_uid!(ref_block);

// calulate length to read and position to seek to
let RequiredLenAndSeekTo {
Expand Down Expand Up @@ -738,7 +738,7 @@ pub fn decode(

break_if_eof_seen!(read_res);

if let Err(_) = block.sync_from_buffer(&buffer, Some(&pred)) {
if let Err(_) = block.sync_from_buffer(&buffer, Some(&header_pred), None) {
stats.incre_blocks_failed();
continue;
}
Expand Down Expand Up @@ -877,7 +877,7 @@ pub fn decode(
reader.read(sbx_block::slice_buf_mut(version, &mut buffer))?;

let decode_successful = !read_res.eof_seen
&& match block.sync_from_buffer(&buffer, Some(&pred)) {
&& match block.sync_from_buffer(&buffer, Some(&header_pred), None) {
Ok(()) => true,
Err(_) => false,
};
Expand Down Expand Up @@ -912,7 +912,7 @@ pub fn decode(
))?;

let decode_successful = !read_res.eof_seen
&& match block.sync_from_buffer(&buffer, Some(&pred)) {
&& match block.sync_from_buffer(&buffer, Some(&header_pred), None) {
Ok(_) => block.get_seq_num() == seq_num,
_ => false,
};
Expand Down Expand Up @@ -997,10 +997,11 @@ pub fn decode(
data_par_burst,
);

let block_okay = match block.sync_from_buffer(&buffer, Some(&pred)) {
Ok(_) => block.get_seq_num() == seq_num,
Err(_) => false,
};
let block_okay =
match block.sync_from_buffer(&buffer, Some(&header_pred), None) {
Ok(_) => block.get_seq_num() == seq_num,
Err(_) => false,
};

if block_okay {
let block_seq_num = block.get_seq_num();
Expand Down
11 changes: 11 additions & 0 deletions src/header_preds.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,11 @@
macro_rules! header_pred_same_ver_uid {
(
$block:expr
) => {{
use sbx_block::Header;

let version = $block.get_version();
let uid = $block.get_uid();
move |header: &Header| -> bool { header.version == version && header.uid == uid }
}};
}
3 changes: 3 additions & 0 deletions src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -51,6 +51,9 @@ mod misc_macros;
#[macro_use]
mod cli_macros;

#[macro_use]
mod header_preds;

#[macro_use]
mod block_preds;

Expand Down
13 changes: 8 additions & 5 deletions src/repair_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ use crate::sbx_specs::Version;

use crate::sbx_block;
use crate::sbx_block::Block;
use crate::sbx_block::Header;
use crate::sbx_specs::SBX_LARGEST_BLOCK_SIZE;
use crate::sbx_specs::{ver_to_block_size, ver_to_usize};

Expand Down Expand Up @@ -171,7 +172,7 @@ impl Param {

fn update_rs_codec_and_stats(
version: Version,
pred: &Fn(&Block) -> bool,
header_pred: &Fn(&Header) -> bool,
read_res: &ReadResult,
block: &mut Block,
cur_seq_num: u32,
Expand All @@ -184,7 +185,9 @@ fn update_rs_codec_and_stats(
// read an incomplete block
stats.blocks_decode_failed += 1;
rs_codec.mark_missing()
} else if let Err(_) = block.sync_from_buffer(rs_codec.get_block_buffer(), Some(pred)) {
} else if let Err(_) =
block.sync_from_buffer(rs_codec.get_block_buffer(), Some(header_pred), None)
{
stats.blocks_decode_failed += 1;
rs_codec.mark_missing()
} else {
Expand Down Expand Up @@ -307,7 +310,7 @@ pub fn repair_file(param: &Param) -> Result<Option<Stats>, Error> {
param.json_printer.json_enabled(),
));

let pred = block_pred_same_ver_uid!(ref_block);
let header_pred = header_pred_same_ver_uid!(ref_block);

let mut rs_codec = RSRepairer::new(
&param.json_printer,
Expand Down Expand Up @@ -335,7 +338,7 @@ pub fn repair_file(param: &Param) -> Result<Option<Stats>, Error> {
let read_res = reader.read(sbx_block::slice_buf_mut(version, &mut buffer))?;

let block_broken = read_res.eof_seen
|| match block.sync_from_buffer(&buffer, Some(&pred)) {
|| match block.sync_from_buffer(&buffer, Some(&header_pred), None) {
Ok(()) => false,
Err(_) => true,
};
Expand Down Expand Up @@ -396,7 +399,7 @@ pub fn repair_file(param: &Param) -> Result<Option<Stats>, Error> {

let codec_state = update_rs_codec_and_stats(
version,
&pred,
&header_pred,
&read_res,
&mut block,
seq_num,
Expand Down
8 changes: 4 additions & 4 deletions src/rs_codec/repairer_tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -122,7 +122,7 @@ fn test_repairer_repair_properly_simple_cases() {
assert_eq!(pos, blocks[0].0);
let mut block = Block::dummy();

block.sync_from_buffer(blocks[0].1, None).unwrap();
block.sync_from_buffer(blocks[0].1, None, None).unwrap();
assert_eq!(1 + 0, block.get_seq_num());
assert_eq!([0; 6], block.get_uid());
assert_eq!(Version::V17, block.get_version());
Expand All @@ -133,7 +133,7 @@ fn test_repairer_repair_properly_simple_cases() {
assert_eq!(pos, blocks[1].0);
let mut block = Block::dummy();

block.sync_from_buffer(blocks[1].1, None).unwrap();
block.sync_from_buffer(blocks[1].1, None, None).unwrap();
assert_eq!(1 + 5, block.get_seq_num());
assert_eq!([0; 6], block.get_uid());
assert_eq!(Version::V17, block.get_version());
Expand All @@ -144,7 +144,7 @@ fn test_repairer_repair_properly_simple_cases() {
assert_eq!(pos, blocks[2].0);
let mut block = Block::dummy();

block.sync_from_buffer(blocks[2].1, None).unwrap();
block.sync_from_buffer(blocks[2].1, None, None).unwrap();
assert_eq!(1 + 11, block.get_seq_num());
assert_eq!([0; 6], block.get_uid());
assert_eq!(Version::V17, block.get_version());
Expand Down Expand Up @@ -372,7 +372,7 @@ quickcheck! {

let mut block = Block::dummy();

block.sync_from_buffer(b, None).unwrap();
block.sync_from_buffer(b, None, None).unwrap();
if !(block.get_seq_num() == start_seq_num + corrupt_pos_s[i] as u32
&& block.get_uid() == uid
&& block.get_version() == version) {
Expand Down
12 changes: 9 additions & 3 deletions src/sbx_block/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -6,7 +6,7 @@ mod metadata;
mod metadata_tests;
mod tests;

use self::header::Header;
pub use self::header::Header;
pub use self::metadata::make_distribution_string;
pub use self::metadata::make_too_much_meta_err_string;
pub use self::metadata::Metadata;
Expand Down Expand Up @@ -146,7 +146,6 @@ macro_rules! check_buffer {
) => {
if $buf.len() < block_size!($self) {
panic!("Insufficient buffer size");
//return Err(Error::InsufficientBufferSize);
}
};
}
Expand Down Expand Up @@ -205,7 +204,7 @@ pub fn slice_data_buf_mut(version: Version, buffer: &mut [u8]) -> &mut [u8] {
pub fn check_if_buffer_valid(buffer: &[u8]) -> bool {
let mut block = Block::new(Version::V1, b"\x00\x00\x00\x00\x00\x00", BlockType::Data);

match block.sync_from_buffer(buffer, None) {
match block.sync_from_buffer(buffer, None, None) {
Ok(()) => {}
Err(_) => {
return false;
Expand Down Expand Up @@ -805,10 +804,17 @@ impl Block {
pub fn sync_from_buffer(
&mut self,
buffer: &[u8],
header_pred: Option<&Fn(&Header) -> bool>,
pred: Option<&Fn(&Block) -> bool>,
) -> Result<(), Error> {
self.sync_from_buffer_header_only(buffer)?;

if let Some(pred) = header_pred {
if !pred(&self.header) {
return Err(Error::FailedPred);
}
}

check_buffer!(self, buffer);

self.enforce_crc(buffer)?;
Expand Down
6 changes: 3 additions & 3 deletions src/sbx_block/tests.rs
Original file line number Diff line number Diff line change
Expand Up @@ -518,7 +518,7 @@ fn test_sync_from_buffer_simple_cases() {

let mut block = Block::new(sbx_specs::Version::V1, &[0; 6], BlockType::Meta);

block.sync_from_buffer(template, None).unwrap();
block.sync_from_buffer(template, None, None).unwrap();

assert_eq!(BlockType::Data, block.block_type());
assert!(block.is_data());
Expand All @@ -529,7 +529,7 @@ fn test_sync_from_buffer_simple_cases() {

let mut block = Block::new(sbx_specs::Version::V1, &[0; 6], BlockType::Data);

block.sync_from_buffer(template, None).unwrap();
block.sync_from_buffer(template, None, None).unwrap();

assert_eq!(BlockType::Meta, block.block_type());
assert!(!block.is_data());
Expand All @@ -539,7 +539,7 @@ fn test_sync_from_buffer_simple_cases() {
let template : &[u8; 512] = b"SBx\x01\x33\x3B\x03\x03\x03\x03\x03\x03\x00\x00\x00\x00\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A\x1A";
let mut block = Block::new(sbx_specs::Version::V1, &[0; 6], BlockType::Data);

block.sync_from_buffer(template, None).unwrap();
block.sync_from_buffer(template, None, None).unwrap();

assert_eq!(BlockType::Meta, block.block_type());
assert!(!block.is_data());
Expand Down
16 changes: 12 additions & 4 deletions src/sort_core.rs
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ pub fn sort_file(param: &Param) -> Result<Option<Stats>, Error> {

let mut meta_written = false;

let pred = block_pred_same_ver_uid!(ref_block);
let header_pred = header_pred_same_ver_uid!(ref_block);

// calulate length to read and position to seek to
let RequiredLenAndSeekTo {
Expand Down Expand Up @@ -315,7 +315,7 @@ pub fn sort_file(param: &Param) -> Result<Option<Stats>, Error> {

break_if_eof_seen!(read_res);

if let Err(_) = block.sync_from_buffer(&buffer, Some(&pred)) {
if let Err(_) = block.sync_from_buffer(&buffer, Some(&header_pred), None) {
// only consider it failed if the buffer is not completely blank
// unless report blank is true
if misc_utils::buffer_is_blank(sbx_block::slice_buf(ref_block.get_version(), &buffer)) {
Expand Down Expand Up @@ -351,7 +351,11 @@ pub fn sort_file(param: &Param) -> Result<Option<Stats>, Error> {
read_res.eof_seen || {
// if block at output position is a valid metadata block,
// then don't overwrite
match check_block.sync_from_buffer(&check_buffer, Some(&pred)) {
match check_block.sync_from_buffer(
&check_buffer,
Some(&header_pred),
None,
) {
Ok(()) => check_block.get_seq_num() != 0,
Err(_) => true,
}
Expand Down Expand Up @@ -414,7 +418,11 @@ pub fn sort_file(param: &Param) -> Result<Option<Stats>, Error> {
read_res.eof_seen || {
// if block at output position is a valid block and has same seq number,
// then don't overwrite
match check_block.sync_from_buffer(&check_buffer, Some(&pred)) {
match check_block.sync_from_buffer(
&check_buffer,
Some(&header_pred),
None,
) {
Ok(()) => check_block.get_seq_num() != block.get_seq_num(),
Err(_) => true,
}
Expand Down

0 comments on commit a4ab24f

Please sign in to comment.