forked from zeromq/libzmq
-
Notifications
You must be signed in to change notification settings - Fork 0
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
- Loading branch information
Showing
7 changed files
with
545 additions
and
39 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,313 @@ | ||
/* | ||
Copyright (c) 2007-2010 iMatix Corporation | ||
This file is part of 0MQ. | ||
0MQ is free software; you can redistribute it and/or modify it under | ||
the terms of the Lesser GNU General Public License as published by | ||
the Free Software Foundation; either version 3 of the License, or | ||
(at your option) any later version. | ||
0MQ is distributed in the hope that it will be useful, | ||
but WITHOUT ANY WARRANTY; without even the implied warranty of | ||
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the | ||
Lesser GNU General Public License for more details. | ||
You should have received a copy of the Lesser GNU General Public License | ||
along with this program. If not, see <http://www.gnu.org/licenses/>. | ||
*/ | ||
|
||
#include "../include/zmq.h" | ||
|
||
#include <sys/types.h> | ||
#include <sys/stat.h> | ||
#include <fcntl.h> | ||
#include <string.h> | ||
#include <sstream> | ||
#include <algorithm> | ||
|
||
#ifdef ZMQ_HAVE_WINDOWS | ||
#include <io.h> | ||
#else | ||
#include <unistd.h> | ||
#endif | ||
|
||
#include "atomic_counter.hpp" | ||
#include "msg_store.hpp" | ||
#include "err.hpp" | ||
|
||
zmq::msg_store_t::msg_store_t (int64_t filesize_, size_t block_size_) : | ||
fd (-1), | ||
filesize (filesize_), | ||
file_pos (0), | ||
write_pos (0), | ||
read_pos (0), | ||
block_size (block_size_), | ||
write_buf_start_addr (0) | ||
{ | ||
zmq_assert (filesize > 0); | ||
zmq_assert (block_size > 0); | ||
|
||
buf1 = new (std::nothrow) char [block_size]; | ||
zmq_assert (buf1); | ||
|
||
buf2 = new (std::nothrow) char [block_size]; | ||
zmq_assert (buf2); | ||
|
||
read_buf = write_buf = buf1; | ||
} | ||
|
||
zmq::msg_store_t::~msg_store_t () | ||
{ | ||
delete [] buf1; | ||
delete [] buf2; | ||
|
||
if (fd == -1) | ||
return; | ||
|
||
#ifdef ZMQ_HAVE_WINDOWS | ||
int rc = _close (fd); | ||
#else | ||
int rc = close (fd); | ||
#endif | ||
errno_assert (rc == 0); | ||
|
||
#ifdef ZMQ_HAVE_WINDOWS | ||
rc = _unlink (filename.c_str ()); | ||
#else | ||
rc = unlink (filename.c_str ()); | ||
#endif | ||
errno_assert (rc == 0); | ||
} | ||
|
||
int zmq::msg_store_t::init () | ||
{ | ||
static zmq::atomic_counter_t seqnum (0); | ||
|
||
// Get process ID. | ||
#ifdef ZMQ_HAVE_WINDOWS | ||
int pid = GetCurrentThreadId (); | ||
#else | ||
pid_t pid = getpid (); | ||
#endif | ||
|
||
std::ostringstream outs; | ||
outs << "zmq_" << pid << '_' << seqnum.get () << ".swap"; | ||
filename = outs.str (); | ||
|
||
seqnum.add (1); | ||
|
||
// Open the backing file. | ||
#ifdef ZMQ_HAVE_WINDOWS | ||
fd = _open (filename.c_str (), _O_RDWR | _O_CREAT, 0600); | ||
#else | ||
fd = open (filename.c_str (), O_RDWR | O_CREAT, 0600); | ||
#endif | ||
if (fd == -1) | ||
return -1; | ||
|
||
#ifdef ZMQ_HAVE_LINUX | ||
// Enable more aggresive read-ahead optimization. | ||
posix_fadvise (fd, 0, filesize, POSIX_FADV_SEQUENTIAL); | ||
#endif | ||
return 0; | ||
} | ||
|
||
bool zmq::msg_store_t::store (zmq_msg_t *msg_) | ||
{ | ||
size_t msg_size = zmq_msg_size (msg_); | ||
|
||
// Check buffer space availability. | ||
// NOTE: We always keep one byte open. | ||
if (buffer_space () <= (int64_t) (sizeof msg_size + 1 + msg_size)) | ||
return false; | ||
|
||
// Don't store the ZMQ_MSG_SHARED flag. | ||
uint8_t msg_flags = msg_->flags & ~ZMQ_MSG_SHARED; | ||
|
||
// Write message length, flags, and message body. | ||
copy_to_file (&msg_size, sizeof msg_size); | ||
copy_to_file (&msg_flags, sizeof msg_flags); | ||
copy_to_file (zmq_msg_data (msg_), msg_size); | ||
|
||
zmq_msg_close (msg_); | ||
|
||
return true; | ||
} | ||
|
||
void zmq::msg_store_t::fetch (zmq_msg_t *msg_) | ||
{ | ||
// There must be at least one message available. | ||
zmq_assert (read_pos != write_pos); | ||
|
||
// Retrieve the message size. | ||
size_t msg_size; | ||
copy_from_file (&msg_size, sizeof msg_size); | ||
|
||
// Initialize the message. | ||
zmq_msg_init_size (msg_, msg_size); | ||
|
||
// Retrieve the message flags. | ||
copy_from_file (&msg_->flags, sizeof msg_->flags); | ||
|
||
// Retrieve the message payload. | ||
copy_from_file (zmq_msg_data (msg_), msg_size); | ||
} | ||
|
||
void zmq::msg_store_t::commit () | ||
{ | ||
commit_pos = write_pos; | ||
} | ||
|
||
void zmq::msg_store_t::rollback () | ||
{ | ||
if (commit_pos == write_pos || read_pos == write_pos) | ||
return; | ||
|
||
|
||
if (write_pos > read_pos) | ||
zmq_assert (read_pos <= commit_pos && commit_pos <= write_pos); | ||
else | ||
zmq_assert (read_pos <= commit_pos || commit_pos <= write_pos); | ||
|
||
if (commit_pos / block_size == read_pos / block_size) { | ||
write_buf_start_addr = commit_pos % block_size; | ||
write_buf = read_buf; | ||
} | ||
else if (commit_pos / block_size != write_pos / block_size) { | ||
write_buf_start_addr = commit_pos % block_size; | ||
fill_buf (write_buf, write_buf_start_addr); | ||
} | ||
|
||
write_pos = commit_pos; | ||
} | ||
|
||
bool zmq::msg_store_t::empty () | ||
{ | ||
return read_pos == write_pos; | ||
} | ||
|
||
bool zmq::msg_store_t::full () | ||
{ | ||
return buffer_space () == 1; | ||
} | ||
|
||
void zmq::msg_store_t::copy_from_file (void *buffer_, size_t count_) | ||
{ | ||
char *ptr = (char*) buffer_; | ||
size_t n, n_left = count_; | ||
|
||
while (n_left > 0) { | ||
|
||
n = std::min (n_left, std::min ((size_t) (filesize - read_pos), | ||
(size_t) (block_size - read_pos % block_size))); | ||
|
||
memcpy (ptr, &read_buf [read_pos % block_size], n); | ||
ptr += n; | ||
|
||
read_pos = (read_pos + n) % filesize; | ||
if (read_pos % block_size == 0) { | ||
if (read_pos / block_size == write_pos / block_size) | ||
read_buf = write_buf; | ||
else | ||
fill_buf (read_buf, read_pos); | ||
} | ||
|
||
n_left -= n; | ||
} | ||
} | ||
|
||
void zmq::msg_store_t::copy_to_file (const void *buffer_, size_t count_) | ||
{ | ||
char *ptr = (char*) buffer_; | ||
size_t n, n_left = count_; | ||
|
||
while (n_left > 0) { | ||
|
||
n = std::min (n_left, std::min ((size_t) (filesize - write_pos), | ||
(size_t) (block_size - write_pos % block_size))); | ||
|
||
memcpy (&write_buf [write_pos % block_size], ptr, n); | ||
ptr += n; | ||
|
||
write_pos = (write_pos + n) % filesize; | ||
if (write_pos % block_size == 0) { | ||
|
||
save_write_buf (); | ||
write_buf_start_addr = write_pos; | ||
|
||
if (write_buf == read_buf) { | ||
if (read_buf == buf2) | ||
write_buf = buf1; | ||
else | ||
write_buf = buf2; | ||
} | ||
} | ||
|
||
n_left -= n; | ||
} | ||
} | ||
|
||
void zmq::msg_store_t::fill_buf (char *buf, int64_t pos) | ||
{ | ||
if (file_pos != pos) { | ||
#ifdef ZMQ_HAVE_WINDOWS | ||
__int64 offset = _lseeki64 (fd, pos, SEEK_SET); | ||
#else | ||
off_t offset = lseek (fd, (off_t) pos, SEEK_SET); | ||
#endif | ||
errno_assert (offset == pos); | ||
file_pos = pos; | ||
} | ||
|
||
size_t i = 0; | ||
size_t n = std::min (block_size, (size_t) (filesize - file_pos)); | ||
|
||
while (i < n) { | ||
#ifdef ZMQ_HAVE_WINDOWS | ||
int rc = _read (fd, &buf [i], n - i); | ||
#else | ||
ssize_t rc = read (fd, &buf [i], n - i); | ||
#endif | ||
errno_assert (rc > 0); | ||
i += rc; | ||
} | ||
|
||
file_pos += n; | ||
} | ||
|
||
void zmq::msg_store_t::save_write_buf () | ||
{ | ||
if (file_pos != write_buf_start_addr) { | ||
#ifdef ZMQ_HAVE_WINDOWS | ||
__int64 offset = _lseeki64 (fd, write_buf_start_addr, SEEK_SET); | ||
#else | ||
off_t offset = lseek (fd, (off_t) write_buf_start_addr, SEEK_SET); | ||
#endif | ||
errno_assert (offset == write_buf_start_addr); | ||
file_pos = write_buf_start_addr; | ||
} | ||
|
||
size_t i = 0; | ||
size_t n = std::min (block_size, (size_t) (filesize - file_pos)); | ||
|
||
while (i < n) { | ||
#ifdef ZMQ_HAVE_WINDOWS | ||
int rc = _write (fd, &write_buf [i], n - i); | ||
#else | ||
ssize_t rc = write (fd, &write_buf [i], n - i); | ||
#endif | ||
errno_assert (rc > 0); | ||
i += rc; | ||
} | ||
|
||
file_pos += n; | ||
} | ||
|
||
int64_t zmq::msg_store_t::buffer_space () | ||
{ | ||
if (write_pos < read_pos) | ||
return read_pos - write_pos; | ||
|
||
return filesize - (write_pos - read_pos); | ||
} |
Oops, something went wrong.