Skip to content

Commit df71b80

Browse files
author
zhengshuxin
committed
Adding zero-copy sending.
1 parent ac058f4 commit df71b80

File tree

2 files changed

+75
-0
lines changed

2 files changed

+75
-0
lines changed

lib_acl_cpp/include/acl_cpp/stream/ostream.hpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -55,6 +55,8 @@ class ACL_CPP_API ostream : virtual public stream , public pipe_stream {
5555
int sendto(const void* data, size_t size,
5656
const struct sockaddr* dest_addr, int addrlen, int flags = 0);
5757

58+
ssize_t send(const void* buf, size_t len, int flags);
59+
5860
/**
5961
* 如果采用写缓冲方式,则最后需要调用本函数刷写缓冲区
6062
* @return {bool} 返回 false 表示写失败,有可能是连接关闭

lib_acl_cpp/src/stream/ostream.cpp

Lines changed: 73 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -50,6 +50,79 @@ int ostream::sendto(const void* data, size_t len,
5050
#endif
5151
}
5252

53+
ssize_t ostream::send(const void* buf, size_t len, int flags)
54+
{
55+
acl_assert(stream_);
56+
ACL_SOCKET fd = ACL_VSTREAM_SOCK(stream_);
57+
58+
#if defined(_WIN32) || defined(_WIN64)
59+
return ::send(fd, (char*) buf, (int) len, flags);
60+
#elif defined(__linux__)
61+
ssize_t ret = ::send(fd, buf, len, flags);
62+
# ifndef MSG_ZEROCOPY
63+
return ret;
64+
# else
65+
if (ret < 0) {
66+
return -1;
67+
}
68+
69+
if (!(flags & MSG_ZEROCOPY)) {
70+
return ret;
71+
}
72+
73+
struct pollfd pfd;
74+
pfd.fd = fd;
75+
pfd.events = POLLERR;
76+
int ret = poll(&pfd, 1, 5000);
77+
if (ret < 0) {
78+
return -1;
79+
}
80+
if (ret == 0) {
81+
acl_set_error(ACL_ETIMEDOUT);
82+
return -1;
83+
}
84+
85+
86+
char cmsgbuf[1024];
87+
char payload[1];
88+
struct msghdr msg;
89+
struct iovec iov;
90+
memset(&msg, 0, sizeof(msg));
91+
iov.iov_base = payload;
92+
iov.iov_len = sizeof(payload);
93+
msg.msg_iov = &iov;
94+
msg.msg_iovlen = 1;
95+
msg.msg_cnotrol = cmsgbuf;
96+
msg.msg_controllen = sizeof(cmsgbuf);
97+
98+
ssize_t n = ::recvmsg(fd, &msg, MSG_ERRQUEUE);
99+
if (n < 0) {
100+
return -1;
101+
}
102+
103+
struct cmsghdr *cmsg = CMSG_FIRSTHDR(&msg);
104+
if (cmsg == NULL) {
105+
return -1;
106+
}
107+
108+
if (cmsg->cmsg_level != SOL_IP || cmsg->cmsg_type != IP_RECVERR) {
109+
return -1;
110+
}
111+
112+
struct sock_extended_err *serr = (struct sock_extended_err *)CMSG_DATA(cmsg);
113+
if (serr && serr->ee_origin == SO_EE_ORIGIN_ZEROCOPY) {
114+
return len;
115+
} else {
116+
-1;
117+
}
118+
119+
return ret;
120+
# endif
121+
#else
122+
return ::send(fd, buf, len, flags);
123+
#endif
124+
}
125+
53126
bool ostream::fflush()
54127
{
55128
if (acl_vstream_fflush(stream_) == ACL_VSTREAM_EOF) {

0 commit comments

Comments
 (0)