Skip to content

Commit

Permalink
grn_ja_reader: support Zlib compressed values
Browse files Browse the repository at this point in the history
GitHub: #441
  • Loading branch information
s-yata committed Dec 7, 2015
1 parent 64a499d commit 7c338c6
Show file tree
Hide file tree
Showing 2 changed files with 62 additions and 17 deletions.
2 changes: 1 addition & 1 deletion lib/grn_store.h
Expand Up @@ -104,7 +104,6 @@ void grn_ja_check(grn_ctx *ctx, grn_ja *ja);

/*
* grn_ja_reader is designed to improve the performance of sequential access.
* FIXME: Compressed values are not supported yet.
*/
typedef struct {
grn_ja *ja;
Expand Down Expand Up @@ -150,6 +149,7 @@ grn_rc grn_ja_reader_read(grn_ctx *ctx, grn_ja_reader *reader, void *buf);
/*
* grn_ja_reader_pread() reads a part of the current value to `buf`.
* If `offset` and `size` are invalid, the behavior is undefined.
* FIXME: Compressed values are not supported yet.
*/
grn_rc grn_ja_reader_pread(grn_ctx *ctx, grn_ja_reader *reader,
size_t offset, size_t size, void *buf);
Expand Down
77 changes: 61 additions & 16 deletions lib/store.c
Expand Up @@ -1621,18 +1621,9 @@ grn_ja_reader_close(grn_ctx *ctx, grn_ja_reader *reader)
return GRN_SUCCESS;
}

#ifdef GRN_WITH_ZLIB
static grn_rc
grn_ja_reader_seek_zlib(grn_ctx *ctx, grn_ja_reader *reader, grn_id id)
{
// TODO
return GRN_FUNCTION_NOT_IMPLEMENTED;
}
#endif /* GRN_WITH_ZLIB */

#ifdef GRN_WITH_LZ4
#if defined(GRN_WITH_ZLIB) || defined(GRN_WITH_LZ4)
static grn_rc
grn_ja_reader_seek_lz4(grn_ctx *ctx, grn_ja_reader *reader, grn_id id)
grn_ja_reader_seek_compressed(grn_ctx *ctx, grn_ja_reader *reader, grn_id id)
{
grn_ja_einfo *einfo;
void *seg_addr;
Expand Down Expand Up @@ -1676,7 +1667,7 @@ grn_ja_reader_seek_lz4(grn_ctx *ctx, grn_ja_reader *reader, grn_id id)
reader->value_size = (uint32_t)*(uint64_t *)seg_addr;
return GRN_SUCCESS;
}
#endif /* GRN_WITH_LZ4 */
#endif /* defined(GRN_WITH_ZLIB) || defined(GRN_WITH_LZ4) */

static grn_rc
grn_ja_reader_seek_raw(grn_ctx *ctx, grn_ja_reader *reader, grn_id id)
Expand Down Expand Up @@ -1725,12 +1716,12 @@ grn_ja_reader_seek(grn_ctx *ctx, grn_ja_reader *reader, grn_id id)
{
#ifdef GRN_WITH_ZLIB
if (reader->ja->header->flags & GRN_OBJ_COMPRESS_ZLIB) {
return grn_ja_reader_seek_zlib(ctx, reader, id);
return grn_ja_reader_seek_compressed(ctx, reader, id);
}
#endif /* GRN_WITH_ZLIB */
#ifdef GRN_WITH_LZ4
if (reader->ja->header->flags & GRN_OBJ_COMPRESS_LZ4) {
return grn_ja_reader_seek_lz4(ctx, reader, id);
return grn_ja_reader_seek_compressed(ctx, reader, id);
}
#endif /* GRN_WITH_LZ4 */
return grn_ja_reader_seek_raw(ctx, reader, id);
Expand All @@ -1740,8 +1731,62 @@ grn_ja_reader_seek(grn_ctx *ctx, grn_ja_reader *reader, grn_id id)
static grn_rc
grn_ja_reader_read_zlib(grn_ctx *ctx, grn_ja_reader *reader, void *buf)
{
// TODO
return GRN_FUNCTION_NOT_IMPLEMENTED;
uLong dest_size = reader->value_size;
grn_ja_einfo *einfo = (grn_ja_einfo *)reader->einfo;
if (EHUGE_P(einfo)) {
/* TODO: Use z_stream to avoid copy. */
grn_io *io = reader->ja->io;
void *seg_addr;
char *packed_ptr;
uint32_t size, seg_id;
if (reader->packed_size > reader->packed_buf_size) {
void *new_buf = GRN_REALLOC(reader->packed_buf, reader->packed_size);
if (!new_buf) {
return GRN_NO_MEMORY_AVAILABLE;
}
reader->packed_buf = new_buf;
reader->packed_buf_size = reader->packed_size;
}
packed_ptr = (char *)reader->packed_buf;
grn_memcpy(packed_ptr, (char *)reader->body_seg_addr + sizeof(uint64_t),
io->header->segment_size - sizeof(uint64_t));
packed_ptr += io->header->segment_size - sizeof(uint64_t);
size = reader->packed_size - (io->header->segment_size - sizeof(uint64_t));
seg_id = reader->body_seg_id + 1;
while (size > io->header->segment_size) {
GRN_IO_SEG_REF(io, seg_id, seg_addr);
if (!seg_addr) {
return GRN_UNKNOWN_ERROR;
}
grn_memcpy(packed_ptr, seg_addr, io->header->segment_size);
GRN_IO_SEG_UNREF(io, seg_id);
seg_id++;
size -= io->header->segment_size;
packed_ptr += io->header->segment_size;
}
GRN_IO_SEG_REF(io, seg_id, seg_addr);
if (!seg_addr) {
return GRN_UNKNOWN_ERROR;
}
grn_memcpy(packed_ptr, seg_addr, size);
GRN_IO_SEG_UNREF(io, seg_id);
seg_id++;
if (uncompress((Bytef *)buf, &dest_size, (const Bytef *)reader->packed_buf,
reader->packed_size - sizeof(uint64_t)) != Z_OK) {
return GRN_UNKNOWN_ERROR;
}
} else {
char *packed_addr = (char *)reader->body_seg_addr;
packed_addr += reader->body_seg_offset + sizeof(uint64_t);
if (uncompress((Bytef *)buf, &dest_size, (const Bytef *)packed_addr,
reader->packed_size - sizeof(uint64_t)) != Z_OK) {
return GRN_UNKNOWN_ERROR;
}
}
if (dest_size != reader->value_size) {
return GRN_UNKNOWN_ERROR;
}
return GRN_SUCCESS;
}
#endif /* GRN_WITH_ZLIB */

Expand Down

0 comments on commit 7c338c6

Please sign in to comment.