Add a streaming writer
Fixes #75
birktj committed Apr 19, 2019
1 parent d6653b9 commit ca2f2ad
Showing 2 changed files with 187 additions and 3 deletions.
188 changes: 186 additions & 2 deletions src/encoder.rs
@@ -3,7 +3,7 @@ extern crate deflate;
use std::borrow::Cow;
use std::error;
use std::fmt;
use std::io::{self, Write};
use std::io::{self, Read, Write};
use std::mem;
use std::result;

@@ -153,6 +153,23 @@ impl<W: Write> Writer<W> {
}
self.write_chunk(chunk::IDAT, &zlib.finish()?)
}

/// Create a stream writer.
///
/// This allows you to create images that do not fit
/// in memory. The default chunk size is 4K; use
/// `stream_writer_with_size` to set another chunk
/// size.
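///
/// A minimal usage sketch (the file name, image dimensions and the
/// 8-bit grayscale default shown here are illustrative assumptions, not
/// requirements of this API):
///
/// ```ignore
/// use std::io::Write;
///
/// let file = std::fs::File::create("streamed.png").unwrap();
/// let mut encoder = png::Encoder::new(file, 640, 480);
/// let mut writer = encoder.write_header().unwrap();
/// // Default 4K chunk size; `stream_writer_with_size` picks another one.
/// let mut stream = writer.stream_writer();
/// let row = vec![0u8; 640]; // one row of raw image data
/// for _ in 0..480 {
///     stream.write_all(&row).unwrap();
/// }
/// // Flush (or `finish`) so errors are not swallowed by the destructor.
/// stream.flush().unwrap();
/// ```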
pub fn stream_writer(&mut self) -> StreamWriter<W> {
self.stream_writer_with_size(4 * 1024)
}

/// Create a stream writer with custom buffer size.
///
/// See `stream_writer`.
pub fn stream_writer_with_size(&mut self, size: usize) -> StreamWriter<W> {
StreamWriter::new(self, size)
}
}

impl<W: Write> Drop for Writer<W> {
@@ -161,6 +178,126 @@ impl<W: Write> Drop for Writer<W> {
}
}

struct ChunkWriter<'a, W: Write> {
writer: &'a mut Writer<W>,
buffer: Vec<u8>,
index: usize,
}

impl<'a, W: Write> ChunkWriter<'a, W> {
fn new(writer: &'a mut Writer<W>, buf_len: usize) -> ChunkWriter<'a, W> {
ChunkWriter {
writer,
buffer: vec![0; buf_len],
index: 0,
}
}
}

impl<'a, W: Write> Write for ChunkWriter<'a, W> {
fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
let written = buf.read(&mut self.buffer[self.index..])?;
self.index += written;

if self.index >= self.buffer.len() {
self.writer.write_chunk(chunk::IDAT, &self.buffer)?;
self.index = 0;
}

Ok(written)
}

fn flush(&mut self) -> io::Result<()> {
if self.index > 0 {
self.writer.write_chunk(chunk::IDAT, &self.buffer[..self.index])?;
}
self.index = 0;
Ok(())
}
}

impl<'a, W: Write> Drop for ChunkWriter<'a, W> {
fn drop(&mut self) {
let _ = self.flush();
}
}


/// Streaming PNG writer
///
/// This may silently fail in the destructor, so it is a good idea to call
/// `finish` or `flush` before dropping.
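///
/// A short sketch of the intended pattern (assumes `stream` is a
/// `StreamWriter` returned by `Writer::stream_writer` and `row` holds one
/// row of raw image data):
///
/// ```ignore
/// stream.write_all(&row).unwrap();
/// // Flush explicitly so an error surfaces here rather than being
/// // discarded by the destructor.
/// stream.flush().unwrap();
/// ```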
pub struct StreamWriter<'a, W: Write> {
writer: deflate::write::ZlibEncoder<ChunkWriter<'a, W>>,
prev_buf: Vec<u8>,
curr_buf: Vec<u8>,
index: usize,
bpp: usize,
filter: FilterType,
}

impl<'a, W: Write> StreamWriter<'a, W> {
fn new(writer: &'a mut Writer<W>, buf_len: usize) -> StreamWriter<'a, W> {
let bpp = writer.info.bytes_per_pixel();
let in_len = writer.info.raw_row_length() - 1;
let filter = writer.info.filter;
let prev_buf = vec![0; in_len];
let curr_buf = vec![0; in_len];

let compression = writer.info.compression.clone();
let chunk_writer = ChunkWriter::new(writer, buf_len);
let zlib = deflate::write::ZlibEncoder::new(chunk_writer, compression);

StreamWriter {
writer: zlib,
index: 0,
prev_buf,
curr_buf,
bpp,
filter,
}
}

pub fn finish(mut self) -> Result<()> {
// TODO: call `writer.finish` somehow?
self.flush()?;
Ok(())
}
}

impl<'a, W: Write> Write for StreamWriter<'a, W> {
fn write(&mut self, mut buf: &[u8]) -> io::Result<usize> {
let written = buf.read(&mut self.curr_buf[self.index..])?;
self.index += written;

if self.index >= self.curr_buf.len() {
self.writer.write_all(&[self.filter as u8])?;
filter(self.filter, self.bpp, &self.prev_buf, &mut self.curr_buf);
self.writer.write_all(&self.curr_buf)?;
mem::swap(&mut self.prev_buf, &mut self.curr_buf);
self.index = 0;
}

Ok(written)
}

fn flush(&mut self) -> io::Result<()> {
self.writer.flush()?;
if self.index > 0 {
let message = format!("wrong data size, got {} bytes too many", self.index);
return Err(EncodingError::Format(message.into()).into());
}
Ok(())
}
}

impl<'a, W: Write> Drop for StreamWriter<'a, W> {
fn drop(&mut self) {
let _ = self.flush();
}
}

#[cfg(test)]
mod tests {
use super::*;
@@ -213,6 +350,53 @@ mod tests {
}
}

#[test]
fn roundtrip_stream() {
// More loops = more random testing, but also more test wait time
for _ in 0..10 {
for path in glob::glob("tests/pngsuite/*.png").unwrap().map(|r| r.unwrap()) {
if path.file_name().unwrap().to_str().unwrap().starts_with("x") {
// x* files are expected to fail to decode
continue;
}
// Decode image
let decoder = ::Decoder::new(File::open(path).unwrap());
let (info, mut reader) = decoder.read_info().unwrap();
if info.line_size != 32 {
// TODO encoding only works with line size 32?
continue;
}
let mut buf = vec![0; info.buffer_size()];
reader.next_frame(&mut buf).unwrap();
// Encode decoded image
let mut out = Vec::new();
{
let mut wrapper = RandomChunkWriter {
rng: self::rand::thread_rng(),
w: &mut out
};

let mut encoder = Encoder::new(&mut wrapper, info.width, info.height).write_header().unwrap();
let mut stream_writer = encoder.stream_writer();

let mut outer_wrapper = RandomChunkWriter {
rng: self::rand::thread_rng(),
w: &mut stream_writer
};

outer_wrapper.write_all(&buf).unwrap();
}
// Decode encoded decoded image
let decoder = ::Decoder::new(&*out);
let (info, mut reader) = decoder.read_info().unwrap();
let mut buf2 = vec![0; info.buffer_size()];
reader.next_frame(&mut buf2).unwrap();
// check if the encoded image is ok:
assert_eq!(buf, buf2);
}
}
}

#[test]
fn expect_error_on_wrong_image_len() -> Result<()> {
use std::io::Cursor;
@@ -254,4 +438,4 @@
}
}

}
}
2 changes: 1 addition & 1 deletion src/lib.rs
@@ -59,7 +59,7 @@ mod utils;
pub use crate::common::*;
pub use crate::decoder::{Decoder, Reader, OutputInfo, StreamingDecoder, Decoded, DecodingError, Limits};
#[cfg(feature = "png-encoding")]
pub use crate::encoder::{Encoder, Writer, EncodingError};
pub use crate::encoder::{Encoder, Writer, StreamWriter, EncodingError};
pub use crate::filter::FilterType;

pub use crate::traits::{Parameter, HasParameters};
