Skip to content

Commit

Permalink
Stream: Add get/fill functions
Browse files Browse the repository at this point in the history
  • Loading branch information
dpt committed Jan 21, 2024
1 parent 8529936 commit 034944c
Show file tree
Hide file tree
Showing 8 changed files with 333 additions and 84 deletions.
42 changes: 42 additions & 0 deletions docs/stream.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,42 @@
DPTLib > io > stream
====================
"stream" is a sub-library of DPTLib for creating data sources and transforms.

The core type `stream_t` is an interface which can be used to wrap, or create, sources of bytes. The interface is primarily byte oriented but block operations are supported too. Byte access is efficiently implemented as a macro.

If a stream implementation also accepts a stream as its input, you can chain streams together to build pipelines, similar to Unix pipes.

Implementations of TIFF-style PackBits RLE and "Move to Front" compression and decompression are provided but they're really only intended as examples.

This is a reimplementation of a technique that I was introduced to by Robin Watts and Paul Gardiner.

Creating a stream
-----------------
See `stream_mem_create()` and its associated functions for a concrete example of how to construct a stream.

Using a stream
--------------
Fetch a **byte** by using `stream_getc(stream)`. If the returned value is EOF then the stream has ended.

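Fetch a **block** by first calling `stream_remaining_and_fill(stream)`. This returns the number of bytes available in the current buffer and, if the buffer is found to be empty, attempts to fill it first. The available bytes can then be read starting at `stream->buf`. Note that we deliberately don't specify how many bytes a fill will provide, to allow implementations flexibility.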

Chaining streams
----------------
You can link streams together to create data pipelines that transform data in sequence. The second stream needs to be written to accept a stream as input; for example, `stream_mtfcomp_create()` works like this.

Provided example streams
------------------------
* `stream-stdio`
  - Creates a stream from a stdio FILE (read only).
* `stream-mem`
  - Creates a stream from a single block of memory (read only).
* `stream-packbits`
  - Performs PackBits RLE (de)compression.
* `stream-mtfcomp`
  - Provides "move to front" adaptive (de)compression.

Taking it further
-----------------
Streams can be written to "fork" data into two separate pipes, to "cat" two pipes together, to "zip" pipes, etc.

Remember to never cross the streams.
23 changes: 11 additions & 12 deletions include/io/stream.h
Original file line number Diff line number Diff line change
Expand Up @@ -58,16 +58,17 @@ typedef result_t stream_op_t(stream_t *s,
void *opaque);
typedef result_t stream_seek_t(stream_t *s, stream_size_t pos);
typedef int stream_get_t(stream_t *s);
typedef stream_size_t stream_fill_t(stream_t *s, stream_size_t need);
typedef stream_size_t stream_fill_t(stream_t *s);
typedef stream_size_t stream_length_t(stream_t *s);
typedef void stream_destroy_t(stream_t *doomed);

/* This is exposed for efficiency - don't use these directly! */
struct stream
{
const unsigned char *buf; /**< Current buffer pointer. */
const unsigned char *end; /**< End of buffer pointer (exclusive - points to the char after buffer end). */

result_t last; /**< Last error. Set when we return EOF? */
result_t last; /**< Last error. Set whenever we return `stream_EOF`? */

stream_op_t *op;
stream_seek_t *seek;
Expand All @@ -78,31 +79,29 @@ struct stream
};

/*
* Main entry points
* User entry points
*/

stream_op_t stream_op;
stream_seek_t stream_seek;
stream_get_t stream_get; /**< `stream_getc()` is the inline alternative */
stream_fill_t stream_fill;
stream_length_t stream_length;
stream_destroy_t stream_destroy;

/* Get a byte from a stream. Returns EOF (not stream_EOF) at EOF. */
/** Get a byte from a stream. Returns `EOF` (not `stream_EOF`) at EOF. */
#define stream_getc(s) (((s)->buf != (s)->end) ? *(s)->buf++ : (s)->get(s))

/* Put back the last byte gotten. */
/** Put back the last byte gotten. */
#define stream_ungetc(s) (--(s)->buf) /* whole expansion parenthesised so it binds as one unit inside larger expressions */

/* Returns the number of bytes remaining in the current buffer. */
/** Returns the number of bytes remaining in the current buffer. */
#define stream_remaining(s) ((stream_size_t) ((s)->end - (s)->buf))

/* Returns the number of bytes remaining in the current buffer.
/** Returns the number of bytes remaining in the current buffer.
* Will attempt to fill the buffer if it's found to be empty. */
#define stream_remaining_and_fill(s) \
(stream_remaining(s) != 0 ? stream_remaining(s) : (s)->fill(s, 1))

/* As above but attempts to make 'need' bytes available. */
#define stream_remaining_need_and_fill(s, need) \
(stream_remaining(s) >= (need) ? stream_remaining(s) : (s)->fill(s, need))
(stream_remaining(s) != 0 ? stream_remaining(s) : (s)->fill(s))

#ifdef __cplusplus
}
Expand Down
6 changes: 6 additions & 0 deletions libraries/io/stream/stream-mem.c
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,11 @@ static int stream_mem_get(stream_t *s)
return EOF;
}

/* Fill handler for memory streams. There is nothing to refill from, so this
 * only reports how many bytes are left unread in the current buffer. */
static stream_size_t stream_mem_fill(stream_t *s)
{
  const stream_size_t unread = stream_remaining(s);

  return unread; /* no refill */
}

static stream_size_t stream_mem_length(stream_t *s)
{
stream_mem_t *sm = (stream_mem_t *) s;
Expand Down Expand Up @@ -72,6 +77,7 @@ result_t stream_mem_create(const unsigned char *block,
sm->base.op = NULL;
sm->base.seek = stream_mem_seek;
sm->base.get = stream_mem_get;
sm->base.fill = stream_mem_fill;
sm->base.length = stream_mem_length;
sm->base.destroy = NULL;

Expand Down
52 changes: 29 additions & 23 deletions libraries/io/stream/stream-packbitscomp.c
Original file line number Diff line number Diff line change
Expand Up @@ -60,19 +60,22 @@ static void stream_packbitscomp_reset(stream_packbitscomp_t *c)
c->resume = 0;
}

static int stream_packbitscomp_get(stream_t *s)
static stream_size_t stream_packbitscomp_fill(stream_t *s)
{
stream_packbitscomp_t *sm = (stream_packbitscomp_t *) s;
unsigned char *orig_p;
unsigned char *p;
unsigned char *end;
int n;
int first;

/* are we only called when buffer empty? */
assert(sm->base.buf == sm->base.end);
orig_p = p = (unsigned char *) sm->base.buf; // cur buf ptr
end = sm->buffer + sm->bufsz; // abs buf end

if (p == end)
return stream_remaining(s); // already full

p = sm->buffer;
end = sm->buffer + sm->bufsz;
// assert(stream_remaining(s) < need);

if (sm->resume)
{
Expand Down Expand Up @@ -107,15 +110,16 @@ static int stream_packbitscomp_get(stream_t *s)
if (second != EOF)
stream_ungetc(sm->input);

DBUG(("stream_packbitscomp_get: n=%d", n));
DBUG(("stream_packbitscomp_fill: n=%d", n));

again: /* more of the current run left to pack */

/* we assume here that we need two spare bytes to continue (which is not
* always true) */
if (p + 2 > end)
if (end - p < 2)
{
/* save state */
DBUG(("stream_packbitscomp_fill: %d bytes spare, saving state", end - p));
sm->resume = 1;
sm->n = n;
sm->first = (unsigned char) first;
Expand All @@ -127,7 +131,7 @@ static int stream_packbitscomp_get(stream_t *s)
case Initial: /* Initial state: Set state to 'Run' or 'Literal'. */
if (n > 1)
{
DBUG(("stream_packbitscomp_get: Initial -> Run of %d", MIN(n, 128)));
DBUG(("stream_packbitscomp_fill: Initial -> Run of %d", MIN(n, 128)));
sm->state = Run;

/* Clamp run lengths to a maximum of 128. Technically they could go
Expand All @@ -143,7 +147,7 @@ static int stream_packbitscomp_get(stream_t *s)
}
else
{
DBUG(("stream_packbitscomp_get: Initial -> Literal"));
DBUG(("stream_packbitscomp_fill: Initial -> Literal"));
sm->state = Literal;

sm->lastliteral = p;
Expand All @@ -155,7 +159,7 @@ static int stream_packbitscomp_get(stream_t *s)
case Literal: /* Last object was a literal. */
if (n > 1)
{
DBUG(("stream_packbitscomp_get: Literal -> Run of %d", MIN(n, 128)));
DBUG(("stream_packbitscomp_fill: Literal -> Run of %d", MIN(n, 128)));
sm->state = LiteralRun;

*p++ = (unsigned char)(-MIN(n, 128) + 1);
Expand All @@ -167,7 +171,7 @@ static int stream_packbitscomp_get(stream_t *s)
}
else
{
DBUG(("stream_packbitscomp_get: Literal -> Literal"));
DBUG(("stream_packbitscomp_fill: Literal -> Literal"));

assert(sm->lastliteral);

Expand All @@ -186,7 +190,7 @@ static int stream_packbitscomp_get(stream_t *s)
case Run: /* Last object was a run. */
if (n > 1)
{
DBUG(("stream_packbitscomp_get: Run -> Run"));
DBUG(("stream_packbitscomp_fill: Run -> Run"));

*p++ = (unsigned char)(-MIN(n, 128) + 1);
*p++ = (unsigned char) first;
Expand All @@ -197,7 +201,7 @@ static int stream_packbitscomp_get(stream_t *s)
}
else
{
DBUG(("stream_packbitscomp_get: Run -> Literal"));
DBUG(("stream_packbitscomp_fill: Run -> Literal"));
sm->state = Literal;

sm->lastliteral = p;
Expand All @@ -218,32 +222,33 @@ static int stream_packbitscomp_get(stream_t *s)
* which case we convert literal-run-literal to a single literal. */
if (n == 1 && p[-2] == (unsigned char) -1 && ll <= 125)
{
DBUG(("stream_packbitscomp_get: LiteralRun merge literal-run-literal"));
DBUG(("stream_packbitscomp_fill: LiteralRun merge literal-run-literal"));
ll += 2; /* ..127 */
sm->state = (ll == 127) ? Initial : Literal;
*sm->lastliteral = ll;
p[-2] = p[-1];
}
else
{
DBUG(("stream_packbitscomp_get: LiteralRun -> Run"));
DBUG(("stream_packbitscomp_fill: LiteralRun -> Run"));
sm->state = Run;
}
goto again;
}
}
}

if (p == sm->buffer)
if (p == orig_p)
{
DBUG(("stream_packbitscomp_get: no bytes generated in decomp"));
return EOF; /* EOF at start */
DBUG(("stream_packbitscomp_fill: no bytes generated in decomp"));
// EOF was returned here in older code...
}
else
{
sm->base.end = p;
}

sm->base.buf = sm->buffer;
sm->base.end = p;

return *sm->base.buf++;
return stream_remaining(s);
}

result_t stream_packbitscomp_create(stream_t *input, int bufsz, stream_t **s)
Expand All @@ -270,7 +275,8 @@ result_t stream_packbitscomp_create(stream_t *input, int bufsz, stream_t **s)

sp->base.op = stream_packbitscomp_op;
sp->base.seek = NULL; /* can't seek */
sp->base.get = stream_packbitscomp_get;
sp->base.get = stream_get;
sp->base.fill = stream_packbitscomp_fill;
sp->base.length = NULL; /* unknown length */
sp->base.destroy = NULL;

Expand Down
Loading

0 comments on commit 034944c

Please sign in to comment.