Permalink
Browse files

Perform all checksumming in a separate thread to speed up the hash se…

…arch in the rzip phase.
  • Loading branch information...
1 parent f4165ec commit 5edf8471d1eedf29ad2e477653062b3b7ca33c9e @ckolivas committed Mar 11, 2012
Showing with 76 additions and 26 deletions.
  1. +10 −0 lrzip_private.h
  2. +47 −22 rzip.c
  3. +14 −3 stream.c
  4. +5 −1 stream.h
View
10 lrzip_private.h
@@ -297,6 +297,12 @@ struct sliding_buffer {
int fd; /* The fd of the mmap */
};
+struct checksum {
+ uint32_t *cksum;
+ uchar *buf;
+ i64 len;
+};
+
struct rzip_control {
char *infile;
FILE *inFILE; // if a FILE is being read from
@@ -347,9 +353,13 @@ struct rzip_control {
uchar *hash;
unsigned char eof;
unsigned char magic_written;
+
+ pthread_mutex_t cksumlock;
md5_ctx ctx;
uchar md5_resblock[MD5_DIGEST_SIZE];
i64 md5_read; // How far into the file the md5 has done so far
+ struct checksum checksum;
+
const char *util_infile;
char delete_infile;
const char *util_outfile;
View
69 rzip.c
@@ -556,6 +556,28 @@ static void show_distrib(rzip_control *control, struct rzip_state *st)
primary * 100.0 / (total ? : 1));
}
+/* Perform all checksumming in a separate thread to speed up the hash search. */
+static void *cksumthread(void *data)
+{
+ rzip_control *control = (rzip_control *)data;
+
+ pthread_detach(pthread_self());
+
+ *control->checksum.cksum = CrcUpdate(*control->checksum.cksum, control->checksum.buf, control->checksum.len);
+ if (!NO_MD5)
+ md5_process_bytes(control->checksum.buf, control->checksum.len, &control->ctx);
+ free(control->checksum.buf);
+ unlock_mutex(control, &control->cksumlock);
+ return NULL;
+}
+
+static void cksum_update(rzip_control *control)
+{
+ pthread_t thread;
+
+ create_pthread(control, &thread, NULL, cksumthread, control);
+}
+
static bool hash_search(rzip_control *control, struct rzip_state *st, double pct_base, double pct_multiple)
{
struct sliding_buffer *sb = &control->sb;
@@ -670,18 +692,20 @@ static bool hash_search(rzip_control *control, struct rzip_state *st, double pct
}
}
- if (p > (i64)cksum_limit) {
- i64 n = MIN(st->chunk_size - p, control->page_size);
- uchar *ckbuf = malloc(n);
-
- if (unlikely(!ckbuf))
+ if (p > cksum_limit) {
+ /* We lock the mutex here and unlock it in the
+ * cksumthread. This lock protects all the data in
+ * control->checksum.
+ */
+ lock_mutex(control, &control->cksumlock);
+ control->checksum.len = MIN(st->chunk_size - p, control->page_size);
+ control->checksum.buf = malloc(control->checksum.len);
+ if (unlikely(!control->checksum.buf))
fatal_return(("Failed to malloc ckbuf in hash_search\n"), false);
- control->do_mcpy(control, ckbuf, cksum_limit, n);
- st->cksum = CrcUpdate(st->cksum, ckbuf, n);
- if (!NO_MD5)
- md5_process_bytes(ckbuf, n, &control->ctx);
- cksum_limit += n;
- free(ckbuf);
+ control->do_mcpy(control, control->checksum.buf, cksum_limit, control->checksum.len);
+ control->checksum.cksum = &st->cksum;
+ cksum_update(control);
+ cksum_limit += control->checksum.len;
}
}
@@ -692,19 +716,19 @@ static bool hash_search(rzip_control *control, struct rzip_state *st, double pct
put_literal(control, st, st->last_match, st->chunk_size);
if (st->chunk_size > cksum_limit) {
- i64 n = st->chunk_size - cksum_limit;
- uchar *ckbuf = malloc(n);
-
- if (unlikely(!ckbuf))
+ lock_mutex(control, &control->cksumlock);
+ control->checksum.len = st->chunk_size - cksum_limit;
+ control->checksum.buf = malloc(control->checksum.len);
+ if (unlikely(!control->checksum.buf))
fatal_return(("Failed to malloc ckbuf in hash_search\n"), false);
- control->do_mcpy(control, ckbuf, cksum_limit, n);
- st->cksum = CrcUpdate(st->cksum, ckbuf, n);
- if (!NO_MD5)
- md5_process_bytes(ckbuf, n, &control->ctx);
- cksum_limit += n;
- free(ckbuf);
+ control->do_mcpy(control, control->checksum.buf, cksum_limit, control->checksum.len);
+ control->checksum.cksum = &st->cksum;
+ cksum_update(control);
+ cksum_limit += control->checksum.len;
}
+ wait_mutex(control, &control->cksumlock);
+
if (unlikely(!put_literal(control, st, 0, 0)))
return false;
if (unlikely(!put_u32(control, st->ss, st->cksum)))
@@ -851,7 +875,8 @@ bool rzip_fd(rzip_control *control, int fd_in, int fd_out)
i64 free_space;
if (!NO_MD5)
- md5_init_ctx (&control->ctx);
+ md5_init_ctx(&control->ctx);
+ init_mutex(control, &control->cksumlock);
st = calloc(sizeof(*st), 1);
if (unlikely(!st))
View
17 stream.c
@@ -104,27 +104,38 @@ static pthread_mutex_t output_lock = PTHREAD_MUTEX_INITIALIZER;
static pthread_cond_t output_cond = PTHREAD_COND_INITIALIZER;
static pthread_t *threads;
-static bool init_mutex(rzip_control *control, pthread_mutex_t *mutex)
+bool init_mutex(rzip_control *control, pthread_mutex_t *mutex)
{
if (unlikely(pthread_mutex_init(mutex, NULL)))
fatal_return(("pthread_mutex_init failed"), false);
return true;
}
-static bool unlock_mutex(rzip_control *control, pthread_mutex_t *mutex)
+bool unlock_mutex(rzip_control *control, pthread_mutex_t *mutex)
{
if (unlikely(pthread_mutex_unlock(mutex)))
fatal_return(("pthread_mutex_unlock failed"), false);
return true;
}
-static bool lock_mutex(rzip_control *control, pthread_mutex_t *mutex)
+bool lock_mutex(rzip_control *control, pthread_mutex_t *mutex)
{
if (unlikely(pthread_mutex_lock(mutex)))
fatal_return(("pthread_mutex_lock failed"), false);
return true;
}
+/* Lock and unlock a mutex */
+bool wait_mutex(rzip_control *control, pthread_mutex_t *mutex)
+{
+ bool ret;
+
+ ret = lock_mutex(control, mutex);
+ if (likely(ret))
+ ret = unlock_mutex(control, mutex);
+ return ret;
+}
+
static bool cond_wait(rzip_control *control, pthread_cond_t *cond, pthread_mutex_t *mutex)
{
if (unlikely(pthread_cond_wait(cond, mutex)))
View
6 stream.h
@@ -23,9 +23,13 @@
#include "lrzip_private.h"
#include <pthread.h>
-bool create_pthread(pthread_t *thread, pthread_attr_t *attr,
+bool create_pthread(rzip_control *control, pthread_t *thread, pthread_attr_t * attr,
void * (*start_routine)(void *), void *arg);
bool join_pthread(pthread_t th, void **thread_return);
+bool init_mutex(rzip_control *control, pthread_mutex_t *mutex);
+bool unlock_mutex(rzip_control *control, pthread_mutex_t *mutex);
+bool lock_mutex(rzip_control *control, pthread_mutex_t *mutex);
+bool wait_mutex(rzip_control *control, pthread_mutex_t *mutex);
ssize_t write_1g(rzip_control *control, void *buf, i64 len);
ssize_t read_1g(rzip_control *control, int fd, void *buf, i64 len);
i64 get_readseek(rzip_control *control, int fd);

0 comments on commit 5edf847

Please sign in to comment.