Skip to content

Commit

Permalink
AsyncConnection: Fix potential return code overflow
Browse files Browse the repository at this point in the history
Signed-off-by: Haomai Wang <haomai@xsky.com>
  • Loading branch information
yuyuyu101 committed Dec 5, 2015
1 parent 8e95d4e commit a87aaf3
Show file tree
Hide file tree
Showing 2 changed files with 38 additions and 38 deletions.
52 changes: 26 additions & 26 deletions src/msg/async/AsyncConnection.cc
Expand Up @@ -210,9 +210,9 @@ AsyncConnection::~AsyncConnection()

/* return -1 means `fd` occurs error or closed, it should be closed
* return 0 means EAGAIN or EINTR */
int AsyncConnection::read_bulk(int fd, char *buf, int len)
ssize_t AsyncConnection::read_bulk(int fd, char *buf, unsigned len)
{
int nread = ::read(fd, buf, len);
ssize_t nread = ::read(fd, buf, len);
if (nread == -1) {
if (errno == EAGAIN || errno == EINTR) {
nread = 0;
Expand Down Expand Up @@ -294,12 +294,12 @@ void AsyncConnection::restore_sigpipe()

// return the length of msg needed to be sent,
// < 0 means error occured
int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more)
ssize_t AsyncConnection::do_sendmsg(struct msghdr &msg, unsigned len, bool more)
{
suppress_sigpipe();

while (len > 0) {
int r;
ssize_t r;
#if defined(MSG_NOSIGNAL)
r = ::sendmsg(sd, &msg, MSG_NOSIGNAL);
#else
Expand Down Expand Up @@ -338,12 +338,12 @@ int AsyncConnection::do_sendmsg(struct msghdr &msg, int len, bool more)
}
restore_sigpipe();
}
return len;
return (ssize_t)len;
}

// return the remaining bytes, it may larger than the length of ptr
// else return < 0 means error
int AsyncConnection::_try_send(bufferlist &send_bl, bool send)
ssize_t AsyncConnection::_try_send(bufferlist &send_bl, bool send)
{
ldout(async_msgr->cct, 20) << __func__ << " send bl length is " << send_bl.length() << dendl;
if (send_bl.length()) {
Expand Down Expand Up @@ -373,7 +373,7 @@ int AsyncConnection::_try_send(bufferlist &send_bl, bool send)
memset(&msg, 0, sizeof(msg));
msg.msg_iovlen = 0;
msg.msg_iov = msgvec;
int msglen = 0;
unsigned msglen = 0;
while (size > 0) {
msgvec[msg.msg_iovlen].iov_base = (void*)(pb->c_str());
msgvec[msg.msg_iovlen].iov_len = pb->length();
Expand All @@ -383,7 +383,7 @@ int AsyncConnection::_try_send(bufferlist &send_bl, bool send)
size--;
}

int r = do_sendmsg(msg, msglen, false);
ssize_t r = do_sendmsg(msg, msglen, false);
if (r < 0)
return r;

Expand Down Expand Up @@ -431,7 +431,7 @@ int AsyncConnection::_try_send(bufferlist &send_bl, bool send)
//
// return the remaining bytes, 0 means this buffer is finished
// else return < 0 means error
int AsyncConnection::read_until(uint64_t len, char *p)
ssize_t AsyncConnection::read_until(unsigned len, char *p)
{
ldout(async_msgr->cct, 25) << __func__ << " len is " << len << " state_offset is "
<< state_offset << dendl;
Expand All @@ -443,7 +443,7 @@ int AsyncConnection::read_until(uint64_t len, char *p)
}
}

int r = 0;
ssize_t r = 0;
uint64_t left = len - state_offset;
if (recv_end > recv_start) {
uint64_t to_read = MIN(recv_end - recv_start, left);
Expand Down Expand Up @@ -506,7 +506,7 @@ int AsyncConnection::read_until(uint64_t len, char *p)

void AsyncConnection::process()
{
int r = 0;
ssize_t r = 0;
int prev_state = state;
bool already_dispatch_writer = false;
Mutex::Locker l(lock);
Expand Down Expand Up @@ -613,7 +613,7 @@ void AsyncConnection::process()
ceph_msg_header header;
ceph_msg_header_old oldheader;
__u32 header_crc = 0;
int len;
unsigned len;
if (has_feature(CEPH_FEATURE_NOSRCADDR))
len = sizeof(header);
else
Expand Down Expand Up @@ -721,7 +721,7 @@ void AsyncConnection::process()
case STATE_OPEN_MESSAGE_READ_FRONT:
{
// read front
int front_len = current_header.front_len;
unsigned front_len = current_header.front_len;
if (front_len) {
if (!front.length()) {
bufferptr ptr = buffer::create(front_len);
Expand All @@ -744,7 +744,7 @@ void AsyncConnection::process()
case STATE_OPEN_MESSAGE_READ_MIDDLE:
{
// read middle
int middle_len = current_header.middle_len;
unsigned middle_len = current_header.middle_len;
if (middle_len) {
if (!middle.length()) {
bufferptr ptr = buffer::create(middle_len);
Expand All @@ -767,8 +767,8 @@ void AsyncConnection::process()
case STATE_OPEN_MESSAGE_READ_DATA_PREPARE:
{
// read data
uint64_t data_len = le32_to_cpu(current_header.data_len);
int data_off = le32_to_cpu(current_header.data_off);
unsigned data_len = le32_to_cpu(current_header.data_len);
unsigned data_off = le32_to_cpu(current_header.data_off);
if (data_len) {
// get a buffer
map<ceph_tid_t,pair<bufferlist,int> >::iterator p = rx_buffers.find(current_header.tid);
Expand Down Expand Up @@ -797,7 +797,7 @@ void AsyncConnection::process()
{
while (msg_left > 0) {
bufferptr bp = data_blp.get_current_ptr();
uint64_t read = MIN(bp.length(), msg_left);
unsigned read = MIN(bp.length(), msg_left);
r = read_until(read, bp.c_str());
if (r < 0) {
ldout(async_msgr->cct, 1) << __func__ << " read data error " << dendl;
Expand All @@ -821,7 +821,7 @@ void AsyncConnection::process()
{
ceph_msg_footer footer;
ceph_msg_footer_old old_footer;
int len;
unsigned len;
// footer
if (has_feature(CEPH_FEATURE_MSG_AUTH))
len = sizeof(footer);
Expand Down Expand Up @@ -1011,9 +1011,9 @@ void AsyncConnection::process()
fault();
}

int AsyncConnection::_process_connection()
ssize_t AsyncConnection::_process_connection()
{
int r = 0;
ssize_t r = 0;

switch(state) {
case STATE_WAIT_SEND:
Expand Down Expand Up @@ -1620,10 +1620,10 @@ int AsyncConnection::handle_connect_reply(ceph_msg_connect &connect, ceph_msg_co
return -1;
}

int AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl,
bufferlist &authorizer_reply)
ssize_t AsyncConnection::handle_connect_msg(ceph_msg_connect &connect, bufferlist &authorizer_bl,
bufferlist &authorizer_reply)
{
int r = 0;
ssize_t r = 0;
ceph_msg_connect_reply reply;
bufferlist reply_bl;

Expand Down Expand Up @@ -2300,7 +2300,7 @@ void AsyncConnection::prepare_send_message(uint64_t features, Message *m, buffer
bl.append(m->get_data());
}

int AsyncConnection::write_message(Message *m, bufferlist& bl)
ssize_t AsyncConnection::write_message(Message *m, bufferlist& bl)
{
assert(can_write == CANWRITE);
m->set_seq(out_seq.inc());
Expand Down Expand Up @@ -2381,7 +2381,7 @@ int AsyncConnection::write_message(Message *m, bufferlist& bl)
logger->inc(l_msgr_send_bytes, complete_bl.length());
ldout(async_msgr->cct, 20) << __func__ << " sending " << m->get_seq()
<< " " << m << dendl;
int rc = _try_send(complete_bl);
ssize_t rc = _try_send(complete_bl);
if (rc < 0) {
ldout(async_msgr->cct, 1) << __func__ << " error sending " << m << ", "
<< cpp_strerror(errno) << dendl;
Expand Down Expand Up @@ -2457,7 +2457,7 @@ void AsyncConnection::handle_write()
{
ldout(async_msgr->cct, 10) << __func__ << " started." << dendl;
bufferlist bl;
int r = 0;
ssize_t r = 0;

write_lock.Lock();
if (can_write == CANWRITE) {
Expand Down
24 changes: 12 additions & 12 deletions src/msg/async/AsyncConnection.h
Expand Up @@ -45,25 +45,25 @@ class AsyncMessenger;
*/
class AsyncConnection : public Connection {

int read_bulk(int fd, char *buf, int len);
ssize_t read_bulk(int fd, char *buf, unsigned len);
void suppress_sigpipe();
void restore_sigpipe();
int do_sendmsg(struct msghdr &msg, int len, bool more);
int try_send(bufferlist &bl, bool send=true) {
ssize_t do_sendmsg(struct msghdr &msg, unsigned len, bool more);
ssize_t try_send(bufferlist &bl, bool send=true) {
Mutex::Locker l(write_lock);
return _try_send(bl, send);
}
// if "send" is false, it will only append bl to send buffer
// the main usage is avoid error happen outside messenger threads
int _try_send(bufferlist &bl, bool send=true);
int _send(Message *m);
ssize_t _try_send(bufferlist &bl, bool send=true);
ssize_t _send(Message *m);
void prepare_send_message(uint64_t features, Message *m, bufferlist &bl);
int read_until(uint64_t needed, char *p);
int _process_connection();
ssize_t read_until(unsigned needed, char *p);
ssize_t _process_connection();
void _connect();
void _stop();
int handle_connect_reply(ceph_msg_connect &connect, ceph_msg_connect_reply &r);
int handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
ssize_t handle_connect_msg(ceph_msg_connect &m, bufferlist &aubl, bufferlist &bl);
void was_session_reset();
void fault();
void discard_out_queue();
Expand All @@ -72,8 +72,8 @@ class AsyncConnection : public Connection {
int randomize_out_seq();
void handle_ack(uint64_t seq);
void _send_keepalive_or_ack(bool ack=false, utime_t *t=NULL);
int write_message(Message *m, bufferlist& bl);
int _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
ssize_t write_message(Message *m, bufferlist& bl);
ssize_t _reply_accept(char tag, ceph_msg_connect &connect, ceph_msg_connect_reply &reply,
bufferlist authorizer_reply) {
bufferlist reply_bl;
reply.tag = tag;
Expand All @@ -83,7 +83,7 @@ class AsyncConnection : public Connection {
if (reply.authorizer_len) {
reply_bl.append(authorizer_reply.c_str(), authorizer_reply.length());
}
int r = try_send(reply_bl);
ssize_t r = try_send(reply_bl);
if (r < 0)
return -1;

Expand Down Expand Up @@ -266,7 +266,7 @@ class AsyncConnection : public Connection {
// Open state
utime_t recv_stamp;
utime_t throttle_stamp;
uint64_t msg_left;
unsigned msg_left;
ceph_msg_header current_header;
bufferlist data_buf;
bufferlist::iterator data_blp;
Expand Down

0 comments on commit a87aaf3

Please sign in to comment.