Skip to content

Commit

Permalink
lib: istream-concat now seeks parent streams to correct offset.
Browse files Browse the repository at this point in the history
All of the streams' offsets were somewhat random.
  • Loading branch information
sirainen committed Jan 26, 2016
1 parent 0c3ec25 commit 5c92436
Show file tree
Hide file tree
Showing 2 changed files with 53 additions and 2 deletions.
23 changes: 22 additions & 1 deletion src/lib/istream-concat.c
Expand Up @@ -12,15 +12,19 @@ struct concat_istream {
uoff_t *input_size;

unsigned int cur_idx, unknown_size_idx;
size_t prev_stream_left, prev_skip;
size_t prev_stream_left, prev_stream_skip, prev_skip;
};

static void i_stream_concat_skip(struct concat_istream *cstream);

static void i_stream_concat_close(struct iostream_private *stream,
bool close_parent)
{
struct concat_istream *cstream = (struct concat_istream *)stream;
unsigned int i;

(void)i_stream_concat_skip(cstream);

if (close_parent) {
for (i = 0; cstream->input[i] != NULL; i++)
i_stream_close(cstream->input[i]);
Expand Down Expand Up @@ -53,22 +57,31 @@ i_stream_concat_set_max_buffer_size(struct iostream_private *stream,

static void i_stream_concat_read_next(struct concat_istream *cstream)
{
struct istream *prev_input = cstream->cur_input;
const unsigned char *data;
size_t data_size, size;

i_assert(cstream->cur_input->eof);

if (cstream->prev_stream_skip != 0) {
i_stream_skip(cstream->input[cstream->cur_idx-1], cstream->prev_stream_skip);
cstream->prev_stream_skip = 0;
}

data = i_stream_get_data(cstream->cur_input, &data_size);
cstream->cur_idx++;
cstream->cur_input = cstream->input[cstream->cur_idx];
i_stream_seek(cstream->cur_input, 0);

if (cstream->prev_stream_left > 0 || cstream->istream.pos == 0) {
/* all the pending data is already in w_buffer */
cstream->prev_stream_skip = data_size;
cstream->prev_stream_left += data_size;
i_assert(cstream->prev_stream_left ==
cstream->istream.pos - cstream->istream.skip);
return;
}
i_assert(cstream->prev_stream_skip == 0);

/* we already verified that the data size is less than the
maximum buffer size */
Expand All @@ -81,6 +94,7 @@ static void i_stream_concat_read_next(struct concat_istream *cstream)

cstream->prev_stream_left = data_size;
memcpy(cstream->istream.w_buffer, data, data_size);
i_stream_skip(prev_input, data_size);
cstream->istream.skip = 0;
cstream->istream.pos = data_size;
}
Expand All @@ -101,6 +115,9 @@ static void i_stream_concat_skip(struct concat_istream *cstream)
bytes_skipped = 0;
} else {
/* done with the buffer */
i_stream_skip(cstream->input[cstream->cur_idx-1], cstream->prev_stream_skip);
cstream->prev_stream_skip = 0;

bytes_skipped -= cstream->prev_stream_left;
cstream->prev_stream_left = 0;
}
Expand Down Expand Up @@ -187,6 +204,9 @@ static ssize_t i_stream_concat_read(struct istream_private *stream)
}
stream->buffer = stream->w_buffer;

/* we'll copy all the new input to w_buffer. if we skip over
prev_stream_left bytes, the next read will switch to
pointing to cur_input's data directly. */
if (new_bytes_count > size)
new_bytes_count = size;
memcpy(stream->w_buffer + stream->pos,
Expand Down Expand Up @@ -247,6 +267,7 @@ static void i_stream_concat_seek(struct istream_private *stream,
stream->istream.v_offset = v_offset;
stream->skip = stream->pos = 0;
cstream->prev_stream_left = 0;
cstream->prev_stream_skip = 0;
cstream->prev_skip = 0;

cstream->cur_idx = find_v_offset(cstream, &v_offset);
Expand Down
32 changes: 31 additions & 1 deletion src/lib/test-istream-concat.c
Expand Up @@ -45,10 +45,14 @@ static void test_istream_concat_one(unsigned int buffer_size)
test_assert((char)data[j] == input_string[(input->v_offset + j) % STREAM_BYTES]);
}
}
test_assert(i_stream_read(input) == -1);
i_stream_skip(input, i_stream_get_data_size(input));
i_stream_unref(&input);

for (i = 0; i < STREAM_COUNT; i++)
for (i = 0; i < STREAM_COUNT; i++) {
test_assert(i_stream_is_eof(streams[i]));
i_stream_unref(&streams[i]);
}
}

static bool test_istream_concat_random(void)
Expand Down Expand Up @@ -113,6 +117,30 @@ static bool test_istream_concat_random(void)
return !test_has_failed();
}

static void test_istream_concat_early_end(void)
{
struct istream *input, *streams[2];

test_begin("istream concat early end");

streams[0] = test_istream_create("stream");
test_istream_set_size(streams[0], 3);
test_istream_set_allow_eof(streams[0], FALSE);
streams[1] = NULL;

input = i_stream_create_concat(streams);
test_assert(i_stream_read(input) == 3);
test_istream_set_size(streams[0], 5);
test_assert(i_stream_read(input) == 2);
i_stream_skip(input, 5);
i_stream_unref(&input);

test_assert(streams[0]->v_offset == 5);
i_stream_unref(&streams[0]);

test_end();
}

void test_istream_concat(void)
{
unsigned int i;
Expand All @@ -129,4 +157,6 @@ void test_istream_concat(void)
i = 101; /* don't break a T_BEGIN */
} T_END;
test_end();

test_istream_concat_early_end();
}

0 comments on commit 5c92436

Please sign in to comment.