Skip to content

Commit

Permalink
Split DatFile into multiple structs with specific purposes.
Browse files Browse the repository at this point in the history
Now, instead of a general purpose DatFile that both reads and writes,
there is DatFileReader and DatFileWriter. DatFile itself is now private
and external consumers of this module only interact with the type
necessary for the work they're doing.

Butterfly now owns a DatFileWriter, since that is primarily what it does
with this file. It creates a DatFileReader at startup to ingest existing
rumors but doesn't hold onto this struct.

rst-reader uses a DatFileReader, since that matches its reason for
existing.

Using this technique, only DatFileReader needs to hold onto the
BufReader, which avoids the problem of atomically writing the file.

Signed-off-by: Josh Black <raskchanky@gmail.com>
  • Loading branch information
raskchanky committed Jul 2, 2019
1 parent bdcb816 commit 79e10ee
Show file tree
Hide file tree
Showing 3 changed files with 179 additions and 164 deletions.
282 changes: 149 additions & 133 deletions components/butterfly/src/rumor/dat_file.rs
Expand Up @@ -56,205 +56,163 @@ const HEADER_VERSION_2_SIZE: usize =
/// * Header Body - Variable bytes - see Header
/// * Rumors - Variable bytes
#[derive(Debug)]
pub struct DatFile {
struct DatFile {
header: Header,
path: PathBuf,
}

impl DatFile {
/// Read-side handle to a dat file.
///
/// Pairs the file's `DatFile` description with the buffered file handle that records are
/// read from.
#[derive(Debug)]
pub struct DatFileReader {
/// Description of the file being read.
dat_file: DatFile,
/// Buffered handle that record reads are issued against.
reader: BufReader<File>,
}

/// Write-side handle to a dat file.
///
/// A newtype over `DatFile`; it holds only that description and no reader.
#[derive(Debug)]
pub struct DatFileWriter(DatFile);

impl DatFileReader {
/// # Locking
/// * `MemberList::entries` (read) This method must not be called while any MemberList::entries
/// lock is held.
pub fn read_or_create_mlr(data_path: PathBuf, server: &Server) -> Result<Self> {
let size = {
let file = OpenOptions::new().create(true)
.read(true)
.write(true)
.open(&data_path)
.map_err(|err| Error::DatFileIO(data_path.clone(), err))?;
file.metadata()
.map_err(|err| Error::DatFileIO(data_path.clone(), err))?
.len()
};
let mut dat_file = DatFile { path: data_path,
header: Header::default(), };
let size = OpenOptions::new().create(true)
.read(true)
.write(true)
.open(&data_path)
.map_err(|err| Error::DatFileIO(data_path.clone(), err))?
.metadata()
.map_err(|err| Error::DatFileIO(data_path.clone(), err))?
.len();

let dat_file = DatFile { path: data_path.clone(),
header: Header::default(), };

if size == 0 {
dat_file.write_mlr(server)?;
} else {
dat_file.read_header()?;
DatFileWriter::new(data_path.clone()).write_mlr(server)?;
}

Ok(dat_file)
Self::reader_creation(dat_file)
}

pub fn read(data_path: &Path) -> Result<Self> {
let mut dat_file = DatFile { header: Default::default(),
path: data_path.to_path_buf(), };

dat_file.read_header()?;
Ok(dat_file)
}

pub fn path(&self) -> &Path { &self.path }

pub fn reader(&self) -> Result<BufReader<File>> {
let file = File::open(&self.path)?;
let mut reader = BufReader::new(file);
reader.seek(SeekFrom::Start(self.header.header_offset()))
.map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
Ok(reader)
}

fn read_header(&mut self) -> Result<()> {
let mut version = [0; 1];
let file = File::open(&self.path)?;
let mut reader = BufReader::new(file);

reader.read_exact(&mut version)
.map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
debug!("Header Version: {}", version[0]);

// If this has happened, it's likely that the file is corrupt
if version[0] > HEADER_VERSION {
let msg = format!("Unable to read Dat File {}: corrupt file header.",
self.path.display());
let err = io::Error::new(io::ErrorKind::InvalidData, msg);
return Err(Error::DatFileIO(self.path.clone(), err));
}

let real_header =
Header::from_file(&mut reader, version[0]).map_err(|err| {
Error::DatFileIO(self.path.clone(), err)
})?;
self.header = real_header;
debug!("Header: {:?}", self.header);

Ok(())
}

fn read_and_process<F>(&mut self,
reader: &mut BufReader<File>,
offset: u64,
mut op: F)
-> Result<()>
where F: FnMut(&mut Vec<u8>) -> Result<()>
{
let mut bytes_read = 0;
let mut size_buf = [0; 8];
let mut rumor_buf: Vec<u8> = vec![];

loop {
if bytes_read >= offset {
break;
}

reader.read_exact(&mut size_buf)
.map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
let rumor_size = LittleEndian::read_u64(&size_buf);
rumor_buf.resize(rumor_size as usize, 0);
reader.read_exact(&mut rumor_buf)
.map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
bytes_read += size_buf.len() as u64 + rumor_size;
op(&mut rumor_buf)?;
}

Ok(())
let dat_file = DatFile { header: Default::default(),
path: data_path.to_path_buf(), };
Self::reader_creation(dat_file)
}

pub fn read_rumors<T>(&mut self, reader: &mut BufReader<File>) -> Result<Vec<T>>
where T: Message<newscast::Rumor>
{
let mut rumors = Vec::new();

if let Some(offset) = self.header.offset_for_rumor(T::MESSAGE_ID) {
self.read_and_process(reader, offset, |r| {
rumors.push(T::from_bytes(&r)?);
Ok(())
})?;
}

Ok(rumors)
fn reader_creation(mut dat_file: DatFile) -> Result<Self> {
let mut reader = BufReader::new(File::open(&dat_file.path)?);
dat_file.read_header(&mut reader)?;
let dat_file_reader = DatFileReader { dat_file, reader };
Ok(dat_file_reader)
}

pub fn read_members(&mut self, reader: &mut BufReader<File>) -> Result<Vec<Membership>> {
let mut members = Vec::new();

if let Some(offset) = self.header.member_offset() {
self.read_and_process(reader, offset, |r| {
members.push(Membership::from_bytes(&r)?);
Ok(())
})?;
}

Ok(members)
}
/// Returns the filesystem path of the dat file behind this reader.
pub fn path(&self) -> &Path {
    let dat_file = &self.dat_file;
    dat_file.path.as_path()
}

/// # Locking
/// * `MemberList::entries` (write) This method must not be called while any MemberList::entries
/// lock is held.
pub fn read_into_mlw(&mut self, server: &Server) -> Result<()> {
let mut reader = self.reader()?;

// Remove this once https://github.com/rust-lang/rust-clippy/issues/4133 is resolved
#[allow(clippy::identity_conversion)]
for Membership { member, health } in self.read_members(&mut reader)? {
for Membership { member, health } in self.read_members()? {
server.insert_member_mlw(member, health);
}

// Remove this once https://github.com/rust-lang/rust-clippy/issues/4133 is resolved
#[allow(clippy::identity_conversion)]
for service in self.read_rumors::<Service>(&mut reader)? {
for service in self.read_rumors::<Service>()? {
server.insert_service_mlw(service);
}

// Remove this once https://github.com/rust-lang/rust-clippy/issues/4133 is resolved
#[allow(clippy::identity_conversion)]
for service_config in self.read_rumors::<ServiceConfig>(&mut reader)? {
for service_config in self.read_rumors::<ServiceConfig>()? {
server.insert_service_config(service_config);
}

// Remove this once https://github.com/rust-lang/rust-clippy/issues/4133 is resolved
#[allow(clippy::identity_conversion)]
for service_file in self.read_rumors::<ServiceFile>(&mut reader)? {
for service_file in self.read_rumors::<ServiceFile>()? {
server.insert_service_file(service_file);
}

// Remove this once https://github.com/rust-lang/rust-clippy/issues/4133 is resolved
#[allow(clippy::identity_conversion)]
for election in self.read_rumors::<Election>(&mut reader)? {
for election in self.read_rumors::<Election>()? {
server.insert_election_mlr(election);
}

// Remove this once https://github.com/rust-lang/rust-clippy/issues/4133 is resolved
#[allow(clippy::identity_conversion)]
for update_election in self.read_rumors::<ElectionUpdate>(&mut reader)? {
for update_election in self.read_rumors::<ElectionUpdate>()? {
server.insert_update_election_mlr(update_election);
}

// Remove this once https://github.com/rust-lang/rust-clippy/issues/4133 is resolved
#[allow(clippy::identity_conversion)]
for departure in self.read_rumors::<Departure>(&mut reader)? {
for departure in self.read_rumors::<Departure>()? {
server.insert_departure_mlw(departure);
}

Ok(())
}

/// Reads every stored rumor of type `T` and decodes each one with `T::from_bytes`.
///
/// Returns an empty vector when the header has no offset recorded for `T::MESSAGE_ID`.
/// Records are consumed from the reader's current position; this method does not seek.
pub fn read_rumors<T>(&mut self) -> Result<Vec<T>>
    where T: Message<newscast::Rumor>
{
    let section_len = match self.dat_file.header.offset_for_rumor(T::MESSAGE_ID) {
        Some(len) => len,
        None => return Ok(Vec::new()),
    };

    let mut decoded = Vec::new();
    self.dat_file
        .read_and_process(&mut self.reader, section_len, |raw| {
            decoded.push(T::from_bytes(&raw)?);
            Ok(())
        })?;
    Ok(decoded)
}

/// Reads every stored member record and decodes each one with `Membership::from_bytes`.
///
/// Returns an empty vector when the header reports no member offset. Records are consumed
/// from the reader's current position; this method does not seek.
pub fn read_members(&mut self) -> Result<Vec<Membership>> {
    let section_len = match self.dat_file.header.member_offset() {
        Some(len) => len,
        None => return Ok(Vec::new()),
    };

    let mut decoded = Vec::new();
    self.dat_file
        .read_and_process(&mut self.reader, section_len, |raw| {
            decoded.push(Membership::from_bytes(&raw)?);
            Ok(())
        })?;
    Ok(decoded)
}
}

impl DatFileWriter {
/// Builds a writer targeting `data_path`, starting from a default header.
///
/// This only constructs the value; nothing is opened or written here.
pub fn new(data_path: PathBuf) -> Self {
    DatFileWriter(DatFile { header: Header::default(),
                            path:   data_path, })
}

pub fn path(&self) -> &Path { &self.0.path }

/// # Locking
/// * `MemberList::entries` (read) This method must not be called while any MemberList::entries
/// lock is held.
pub fn write_mlr(&self, server: &Server) -> Result<usize> {
let mut header = Header::default();
let w =
AtomicWriter::new(&self.path).map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
let w = AtomicWriter::new(&self.0.path).map_err(|err| {
Error::DatFileIO(self.0.path.clone(), err)
})?;
w.with_writer(|mut f| {
let mut writer = BufWriter::new(&mut f);
let header_reserve = vec![0; HEADER_VERSION_2_SIZE];
writer.write(&[HEADER_VERSION])
.map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
.map_err(|err| Error::DatFileIO(self.0.path.clone(), err))?;
writer.write(&header_reserve)
.map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
.map_err(|err| Error::DatFileIO(self.0.path.clone(), err))?;
header.insert_member_offset(self.write_member_list_mlr(&mut writer,
&server.member_list)?);
header.insert_offset_for_rumor(Service::MESSAGE_ID,
Expand Down Expand Up @@ -282,7 +240,7 @@ impl DatFile {
})
.map_err(|err| {
match err {
Error::UnknownIOError(e) => Error::DatFileIO(self.path.clone(), e),
Error::UnknownIOError(e) => Error::DatFileIO(self.0.path.clone(), e),
e => e,
}
})
Expand All @@ -293,7 +251,7 @@ impl DatFile {
{
let bytes = header.write_to_bytes();
let total = writer.write(&bytes)
.map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
.map_err(|err| Error::DatFileIO(self.0.path.clone(), err))?;
Ok(total)
}

Expand All @@ -319,10 +277,10 @@ impl DatFile {
let bytes = membership.clone().write_to_bytes().unwrap();
LittleEndian::write_u64(&mut len_buf, bytes.len() as u64);
total += writer.write(&len_buf)
.map_err(|err| Error::DatFileIO(self.path.clone(), err))?
.map_err(|err| Error::DatFileIO(self.0.path.clone(), err))?
as u64;
total += writer.write(&bytes)
.map_err(|err| Error::DatFileIO(self.path.clone(), err))?
.map_err(|err| Error::DatFileIO(self.0.path.clone(), err))?
as u64;
Ok(total)
}
Expand Down Expand Up @@ -353,15 +311,73 @@ impl DatFile {
let bytes = rumor.write_to_bytes().unwrap();
LittleEndian::write_u64(&mut rumor_len, bytes.len() as u64);
total += writer.write(&rumor_len)
.map_err(|err| Error::DatFileIO(self.path.clone(), err))?
.map_err(|err| Error::DatFileIO(self.0.path.clone(), err))?
as u64;
total += writer.write(&bytes)
.map_err(|err| Error::DatFileIO(self.path.clone(), err))?
.map_err(|err| Error::DatFileIO(self.0.path.clone(), err))?
as u64;
Ok(total)
}
}

impl DatFile {
    /// Reads the one-byte version and the header from `reader` into `self.header`, then
    /// seeks `reader` to `self.header.header_offset()` from the start of the file.
    ///
    /// # Errors
    /// Every failure is returned as `Error::DatFileIO` carrying this file's path. A version
    /// byte greater than `HEADER_VERSION` is reported as `io::ErrorKind::InvalidData`.
    fn read_header(&mut self, reader: &mut BufReader<File>) -> Result<()> {
        let mut version = [0; 1];

        reader.read_exact(&mut version)
              .map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
        debug!("Header Version: {}", version[0]);

        // If this has happened, it's likely that the file is corrupt
        if version[0] > HEADER_VERSION {
            let msg = format!("Unable to read Dat File {}: corrupt file header.",
                              self.path.display());
            let err = io::Error::new(io::ErrorKind::InvalidData, msg);
            return Err(Error::DatFileIO(self.path.clone(), err));
        }

        self.header = Header::from_file(reader, version[0]).map_err(|err| {
                          Error::DatFileIO(self.path.clone(), err)
                      })?;
        debug!("Header: {:?}", self.header);

        // Leave the reader at the position the header reports so callers can start reading
        // records immediately.
        reader.seek(SeekFrom::Start(self.header.header_offset()))
              .map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
        Ok(())
    }

    /// Reads consecutive length-prefixed records from `reader`, passing each payload to
    /// `op`, until `offset` bytes have been consumed.
    ///
    /// Each record is an 8-byte little-endian length followed by that many payload bytes.
    /// Despite its name, `offset` is used as the total number of bytes to consume, counted
    /// from the reader's current position. The payload buffer handed to `op` is reused
    /// between records.
    ///
    /// # Errors
    /// Read failures are returned as `Error::DatFileIO` carrying this file's path. A record
    /// whose declared length exceeds the bytes left in the section is rejected as
    /// `io::ErrorKind::InvalidData`. Any error returned by `op` is propagated unchanged.
    fn read_and_process<F>(&mut self,
                           reader: &mut BufReader<File>,
                           offset: u64,
                           mut op: F)
                           -> Result<()>
        where F: FnMut(&mut Vec<u8>) -> Result<()>
    {
        let mut bytes_read: u64 = 0;
        let mut size_buf = [0; 8];
        let mut rumor_buf: Vec<u8> = vec![];

        while bytes_read < offset {
            reader.read_exact(&mut size_buf)
                  .map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
            let rumor_size = LittleEndian::read_u64(&size_buf);

            // The length comes straight from disk. Without this bound a corrupt prefix
            // would size the buffer to an arbitrary amount and read past this section.
            let remaining = (offset - bytes_read).saturating_sub(size_buf.len() as u64);
            if rumor_size > remaining {
                let msg = format!("Unable to read Dat File {}: record length {} exceeds the \
                                   {} bytes remaining in its section.",
                                  self.path.display(),
                                  rumor_size,
                                  remaining);
                let err = io::Error::new(io::ErrorKind::InvalidData, msg);
                return Err(Error::DatFileIO(self.path.clone(), err));
            }

            rumor_buf.resize(rumor_size as usize, 0);
            reader.read_exact(&mut rumor_buf)
                  .map_err(|err| Error::DatFileIO(self.path.clone(), err))?;
            bytes_read += size_buf.len() as u64 + rumor_size;
            op(&mut rumor_buf)?;
        }

        Ok(())
    }
}

/// Describes contents and structure of dat file.
///
/// The information in this header is used to enable IO seeking operations on a binary dat
Expand Down

0 comments on commit 79e10ee

Please sign in to comment.