From b27b4f66fdff42063461676e9d3c9f0ff2bb7c72 Mon Sep 17 00:00:00 2001 From: Mo Chen Date: Tue, 13 Jun 2023 15:02:34 -0500 Subject: [PATCH] Untangle some classes in iocore - Isolate the following classes into their own files - NetEvent - NetHandler - NetVCOptions - PollCont - AcceptOptions - Fix some includes other files to compile cleanly --- iocore/dns/P_DNSConnection.h | 1 + iocore/eventsystem/I_ProxyAllocator.h | 1 + iocore/eventsystem/I_Thread.h | 7 +- iocore/net/AcceptOptions.cc | 49 ++ iocore/net/AcceptOptions.h | 98 ++++ iocore/net/CMakeLists.txt | 5 + iocore/net/EventIO.cc | 228 +++++++++ iocore/net/EventIO.h | 126 +++++ iocore/net/I_NetProcessor.h | 71 +-- iocore/net/I_NetVConnection.h | 352 +------------- iocore/net/I_UDPNet.h | 1 + iocore/net/Makefile.am | 34 +- iocore/net/NetEvent.h | 2 + iocore/net/NetHandler.cc | 635 ++++++++++++++++++++++++++ iocore/net/NetHandler.h | 235 ++++++++++ iocore/net/NetVCOptions.cc | 71 +++ iocore/net/NetVCOptions.h | 322 +++++++++++++ iocore/net/P_NetAccept.h | 7 +- iocore/net/P_UDPNet.h | 2 +- iocore/net/P_UnixNet.h | 614 +------------------------ iocore/net/P_UnixPollDescriptor.h | 42 +- iocore/net/PollCont.cc | 95 ++++ iocore/net/PollCont.h | 43 ++ iocore/net/UnixNet.cc | 627 +------------------------ iocore/net/UnixNetProcessor.cc | 23 - iocore/net/UnixUDPNet.cc | 1 + iocore/net/test_I_UDPNet.cc | 1 + proxy/http/HttpDebugNames.cc | 1 + src/traffic_server/InkIOCoreAPI.cc | 1 + 29 files changed, 2010 insertions(+), 1685 deletions(-) create mode 100644 iocore/net/AcceptOptions.cc create mode 100644 iocore/net/AcceptOptions.h create mode 100644 iocore/net/EventIO.cc create mode 100644 iocore/net/EventIO.h create mode 100644 iocore/net/NetHandler.cc create mode 100644 iocore/net/NetHandler.h create mode 100644 iocore/net/NetVCOptions.cc create mode 100644 iocore/net/NetVCOptions.h create mode 100644 iocore/net/PollCont.cc create mode 100644 iocore/net/PollCont.h diff --git a/iocore/dns/P_DNSConnection.h 
b/iocore/dns/P_DNSConnection.h index 2a1e599f1a8..ed372eb9c2b 100644 --- a/iocore/dns/P_DNSConnection.h +++ b/iocore/dns/P_DNSConnection.h @@ -32,6 +32,7 @@ #include "I_EventSystem.h" #include "I_DNSProcessor.h" +#include "EventIO.h" // // Connection diff --git a/iocore/eventsystem/I_ProxyAllocator.h b/iocore/eventsystem/I_ProxyAllocator.h index a5320de7447..851de14624e 100644 --- a/iocore/eventsystem/I_ProxyAllocator.h +++ b/iocore/eventsystem/I_ProxyAllocator.h @@ -34,6 +34,7 @@ #include #include "tscore/ink_platform.h" +#include "tscore/Allocator.h" class EThread; diff --git a/iocore/eventsystem/I_Thread.h b/iocore/eventsystem/I_Thread.h index 43acbde76b5..30ea6e77d45 100644 --- a/iocore/eventsystem/I_Thread.h +++ b/iocore/eventsystem/I_Thread.h @@ -58,15 +58,12 @@ #pragma once -#if !defined(_I_EventSystem_h) && !defined(_P_EventSystem_h) -#error "include I_EventSystem.h or P_EventSystem.h" -#endif - #include +#include "I_ProxyAllocator.h" +#include "tscore/Ptr.h" #include "tscore/ink_platform.h" #include "tscore/ink_thread.h" -#include "I_ProxyAllocator.h" class ProxyMutex; diff --git a/iocore/net/AcceptOptions.cc b/iocore/net/AcceptOptions.cc new file mode 100644 index 00000000000..e1fc4bebaad --- /dev/null +++ b/iocore/net/AcceptOptions.cc @@ -0,0 +1,49 @@ +/** @file + + Asynchronous networking API + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+ */
+
+#include "AcceptOptions.h"
+#include "I_Net.h"
+
+AcceptOptions &
+AcceptOptions::reset()
+{
+  local_port = 0;
+  local_ip.invalidate();
+  accept_threads = -1;
+  ip_family = AF_INET;
+  etype = ET_NET;
+  localhost_only = false;
+  frequent_accept = true;
+  recv_bufsize = send_bufsize = 0; // 0 => OS default for both socket buffers
+  defer_accept = 0;                // 0 => OS default; must be set here because the constructor relies on reset() alone
+  sockopt_flags = 0;
+  packet_mark = 0;
+  packet_tos = 0;
+  packet_notsent_lowat = 0;
+  tfo_queue_length = 0;
+  f_inbound_transparent = false;
+  f_mptcp = false;
+  f_proxy_protocol = false;
+  return *this; // allows chaining on the freshly reset instance
+}
diff --git a/iocore/net/AcceptOptions.h b/iocore/net/AcceptOptions.h
new file mode 100644
index 00000000000..22f9bc960db
--- /dev/null
+++ b/iocore/net/AcceptOptions.h
@@ -0,0 +1,98 @@
+/** @file
+
+  This file implements an I/O Processor for network I/O
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+
+ */
+
+#pragma once
+
+#include "tscore/ink_inet.h"
+#include "I_Event.h"
+
+struct AcceptOptions {
+  using self = AcceptOptions; ///< Self reference type.
+
+  /// Port on which to listen.
+  /// 0 => don't care, which is useful if the socket is already bound.
+  int local_port;
+  /// Local address to bind for accept.
+  /// If not set -> any address.
+  IpAddr local_ip;
+  /// IP address family.
+  /// @note Ignored if an explicit incoming address is set in the
+  /// configuration (@c local_ip). If neither is set IPv4 is used.
+  int ip_family;
+  /// Should we use accept threads? If so, how many?
+  int accept_threads;
+  /// Event type to generate on accept.
+  EventType etype;
+  /** If @c true, the continuation is called back with
+      @c NET_EVENT_ACCEPT_SUCCEED
+      or @c NET_EVENT_ACCEPT_FAILED on success and failure resp.
+  */
+  // NOTE(review): the block above does not describe localhost_only; it may be a leftover -- confirm.
+  bool localhost_only;
+  /// Are frequent accepts expected?
+  /// Default: @c true (the value assigned by @c reset()).
+  bool frequent_accept;
+
+  /// Socket receive buffer size.
+  /// 0 => OS default.
+  int recv_bufsize;
+  /// Socket transmit buffer size.
+  /// 0 => OS default.
+  int send_bufsize;
+  /// defer accept for @c sockopt.
+  /// 0 => OS default.
+  int defer_accept;
+  /// Socket options for @c sockopt.
+  /// 0 => do not set options.
+  uint32_t sockopt_flags;
+  uint32_t packet_mark;
+  uint32_t packet_tos;
+  uint32_t packet_notsent_lowat;
+
+  int tfo_queue_length;
+
+  /** Transparency on client (user agent) connection.
+      @internal This is irrelevant at a socket level (since inbound
+      transparency must be set up when the listen socket is created)
+      but it's critical that the connection handling logic knows
+      whether the inbound (client / user agent) connection is
+      transparent.
+  */
+  bool f_inbound_transparent;
+
+  /** MPTCP enabled on listener.
+      @internal For logging and metrics purposes to know whether the
+      listener enabled MPTCP or not.
+  */
+  bool f_mptcp;
+
+  /// Proxy Protocol enabled
+  bool f_proxy_protocol;
+
+  /// Default constructor.
+  /// Instance is constructed with default values.
+  AcceptOptions() { this->reset(); }
+  /// Reset all values to defaults.
+  self &reset();
+};
diff --git a/iocore/net/CMakeLists.txt b/iocore/net/CMakeLists.txt
index 7aed806c984..618f2e49d40 100644
--- a/iocore/net/CMakeLists.txt
+++ b/iocore/net/CMakeLists.txt
@@ -17,14 +17,19 @@
 
 
 add_library(inknet STATIC
+    AcceptOptions.cc
     ALPNSupport.cc
     BIO_fastopen.cc
     BoringSSLUtils.cc
     Connection.cc
+    EventIO.cc
     Inline.cc
     YamlSNIConfig.cc
     Net.cc
+    NetHandler.cc
+    NetVCOptions.cc
     NetVConnection.cc
+    PollCont.cc
     ProxyProtocol.cc
     Socks.cc
     SSLCertLookup.cc
diff --git a/iocore/net/EventIO.cc b/iocore/net/EventIO.cc
new file mode 100644
index 00000000000..7c4b7dafe39
--- /dev/null
+++ b/iocore/net/EventIO.cc
@@ -0,0 +1,228 @@
+/** @file
+
+  A brief file description
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#include "EventIO.h"
+#include "tscore/ink_assert.h"
+#include "P_Net.h"
+#include "P_UnixNetProcessor.h"
+#include "P_UnixNetVConnection.h"
+#include "P_NetAccept.h"
+#include "P_DNSConnection.h"
+#include "P_UnixUDPConnection.h"
+#include "P_UnixPollDescriptor.h"
+
+int
+EventIO::start(EventLoop l, DNSConnection *vc, int events)
+{
+  type = EVENTIO_DNS_CONNECTION; // records which member of the data union is valid
+  data.dnscon = vc;
+  return start_common(l, vc->fd, events);
+}
+int
+EventIO::start(EventLoop l, NetAccept *vc, int events)
+{
+  type = EVENTIO_NETACCEPT;
+  data.na = vc;
+  return start_common(l, vc->server.fd, events); // the listening fd lives in the NetAccept's server member
+}
+int
+EventIO::start(EventLoop l, NetEvent *ne, int events)
+{
+  type = EVENTIO_READWRITE_VC;
+  data.ne = ne;
+  return start_common(l, ne->get_fd(), events);
+}
+
+int
+EventIO::start(EventLoop l, UnixUDPConnection *vc, int events)
+{
+  type = EVENTIO_UDP_CONNECTION;
+  data.uc = vc;
+  return start_common(l, vc->fd, events);
+}
+
+int
+EventIO::start(EventLoop l, int afd, NetEvent *ne, int e)
+{
+  data.ne = ne; // this overload leaves type untouched; only the fd and the data pointer are supplied
+  return start_common(l, afd, e);
+}
+
+int
+EventIO::start_common(EventLoop l, int afd, int e)
+{
+  if (!this->syscall) { // syscall == false: skip all kernel registration and report success
+    return 0;
+  }
+
+  fd = afd;
+  event_loop = l;
+#if TS_USE_EPOLL
+  struct epoll_event ev;
+  memset(&ev, 0, sizeof(ev));
+  ev.events = e | EPOLLEXCLUSIVE; // EventIO.h defines EPOLLEXCLUSIVE as 0 when the platform headers lack it
+  ev.data.ptr = this;
+#ifndef USE_EDGE_TRIGGER
+  events = e; // remembered for modify(), which is only active in level-triggered builds
+#endif
+  return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+#endif
+#if TS_USE_KQUEUE
+  events = e;
+  struct kevent ev[2];
+  int n = 0;
+  if (e & EVENTIO_READ) {
+    EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
+  }
+  if (e & EVENTIO_WRITE) {
+    EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
+  }
+  return kevent(l->kqueue_fd, &ev[0], n, nullptr, 0, nullptr); // submits the change list only; no event list is passed
+#endif // NOTE(review): no return statement is reached if neither TS_USE_EPOLL nor TS_USE_KQUEUE is set
+}
+
+int
+EventIO::modify(int e)
+{
+  if (!this->syscall) {
+    return 0;
+  }
+
+  ink_assert(event_loop);
+#if TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER)
+  struct epoll_event ev;
+  memset(&ev, 0, sizeof(ev));
+  int new_events = events, old_events = events;
+  if (e < 0) // negative mask: clear the bits in -e
+    new_events &= ~(-e);
+  else // positive mask: set the bits in e
+    new_events |= e;
+  events = new_events;
+  ev.events = new_events;
+  ev.data.ptr = this;
+  if (!new_events) // nothing left enabled: deregister the fd
+    return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_DEL, fd, &ev);
+  else if (!old_events) // nothing was enabled before: register the fd
+    return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev);
+  else // otherwise only the mask changes
+    return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_MOD, fd, &ev);
+#endif
+#if TS_USE_KQUEUE && !defined(USE_EDGE_TRIGGER)
+  int n = 0;
+  struct kevent ev[2];
+  int ee = events;
+  if (e < 0) {
+    ee &= ~(-e);
+    if ((-e) & EVENTIO_READ)
+      EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, this);
+    if ((-e) & EVENTIO_WRITE)
+      EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, this);
+  } else {
+    ee |= e;
+    if (e & EVENTIO_READ)
+      EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
+    if (e & EVENTIO_WRITE)
+      EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
+  }
+  events = ee;
+  if (n)
+    return kevent(event_loop->kqueue_fd, &ev[0], n, nullptr, 0, nullptr);
+  else
+    return 0;
+#endif
+  (void)e; // ATS_UNUSED
+  return 0; // reached only when neither level-triggered branch above is compiled in
+}
+
+int
+EventIO::refresh(int e)
+{
+  if (!this->syscall) {
+    return 0;
+  }
+
+  ink_assert(event_loop);
+#if TS_USE_KQUEUE && defined(USE_EDGE_TRIGGER)
+  e = e & events; // re-arm only the events that are currently enabled
+  struct kevent ev[2];
+  int n = 0;
+  if (e & EVENTIO_READ) {
+    EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
+  }
+  if (e & EVENTIO_WRITE) {
+    EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this);
+  }
+  if (n) {
+    return kevent(event_loop->kqueue_fd, &ev[0], n, nullptr, 0, nullptr);
+  } else {
+    return 0;
+  }
+#endif
+  (void)e; // ATS_UNUSED
+  return 0; // on every other configuration refresh() does nothing
+}
+
+int
+EventIO::stop()
+{
+  if (!this->syscall) {
+    return 0;
+  }
+  if (event_loop) {
+    int retval = 0; // stays 0 unless the epoll removal below fails; no kqueue removal is issued here
+#if TS_USE_EPOLL
+    struct epoll_event ev;
+    memset(&ev, 0, sizeof(struct epoll_event));
+    ev.events = EPOLLIN | EPOLLOUT | EPOLLET;
+    retval = epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_DEL, fd, &ev);
+#endif
+    event_loop = nullptr; // a later stop() sees no event loop and becomes a no-op
+    return retval;
+  }
+  return 0;
+}
+
+int
+EventIO::close()
+{
+  if (!this->syscall) {
+    return 0;
+  }
+
+  stop();
+  switch (type) {
+  default: // NOTE(review): EVENTIO_UDP_CONNECTION, set by start(UnixUDPConnection *), has no case and lands here -- confirm
+    ink_assert(!"case");
+  // fallthrough
+  case EVENTIO_DNS_CONNECTION:
+    return data.dnscon->close();
+    break;
+  case EVENTIO_NETACCEPT:
+    return data.na->server.close();
+    break;
+  case EVENTIO_READWRITE_VC:
+    return data.ne->close();
+    break;
+  }
+  return -1; // not reachable: every path through the switch returns
+}
diff --git a/iocore/net/EventIO.h b/iocore/net/EventIO.h
new file mode 100644
index 00000000000..c5136919707
--- /dev/null
+++ b/iocore/net/EventIO.h
@@ -0,0 +1,126 @@
+/** @file
+
+  A brief file description
+
+  @section license License
+
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+ */
+
+#pragma once
+#include "P_UnixPollDescriptor.h"
+
+#define USE_EDGE_TRIGGER_EPOLL 1
+#define USE_EDGE_TRIGGER_KQUEUE 1
+#define USE_EDGE_TRIGGER_PORT 1
+
+#define EVENTIO_NETACCEPT 1
+#define EVENTIO_READWRITE_VC 2
+#define EVENTIO_DNS_CONNECTION 3
+#define EVENTIO_UDP_CONNECTION 4
+#define EVENTIO_ASYNC_SIGNAL 5
+#define EVENTIO_IO_URING 6
+
+#if TS_USE_EPOLL
+#ifndef EPOLLEXCLUSIVE
+#define EPOLLEXCLUSIVE 0
+#endif
+#ifdef USE_EDGE_TRIGGER_EPOLL
+#define USE_EDGE_TRIGGER 1
+#define EVENTIO_READ (EPOLLIN | EPOLLET)
+#define EVENTIO_WRITE (EPOLLOUT | EPOLLET)
+#else
+#define EVENTIO_READ EPOLLIN
+#define EVENTIO_WRITE EPOLLOUT
+#endif
+#define EVENTIO_ERROR (EPOLLERR | EPOLLPRI | EPOLLHUP)
+#endif
+#if TS_USE_KQUEUE
+#ifdef USE_EDGE_TRIGGER_KQUEUE
+#define USE_EDGE_TRIGGER 1
+#define INK_EV_EDGE_TRIGGER EV_CLEAR
+#else
+#define INK_EV_EDGE_TRIGGER 0
+#endif
+#define EVENTIO_READ INK_EVP_IN
+#define EVENTIO_WRITE INK_EVP_OUT
+#define EVENTIO_ERROR (0x010 | 0x002 | 0x020) // ERR PRI HUP
+#endif
+
+struct PollDescriptor;
+using EventLoop = PollDescriptor *;
+
+class NetEvent;
+class UnixUDPConnection;
+class DiskHandler;
+struct DNSConnection;
+struct NetAccept;
+
+/// Unified API for setting and clearing kernel and epoll events.
+struct EventIO {
+  int fd = -1; ///< file descriptor, often a system port
+#if TS_USE_KQUEUE || TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER)
+  int events = 0; ///< a bit mask of enabled events
+#endif
+  EventLoop event_loop = nullptr; ///< the assigned event loop
+  bool syscall = true; ///< if false, disable all functionality (for QUIC)
+  int type = 0; ///< class identifier of union data.
+  union {
+    void *untyped;
+    NetEvent *ne;
+    DNSConnection *dnscon;
+    NetAccept *na;
+    UnixUDPConnection *uc;
+    DiskHandler *dh;
+  } data; ///< a kind of continuation
+
+  /** The start methods all logically Setup a class to be called
+     when a file descriptor is available for read or write.
+     The type of the classes vary.  Generally the file descriptor
+     is pulled from the class, but there is one option that lets
+     the file descriptor be expressed directly.
+     @param l the event loop
+     @param events a mask of flags (for details `man epoll_ctl`)
+     @return int 0 on success (also when @c syscall is false), -1 on error
+   */
+  int start(EventLoop l, DNSConnection *vc, int events);
+  int start(EventLoop l, NetAccept *vc, int events);
+  int start(EventLoop l, NetEvent *ne, int events);
+  int start(EventLoop l, UnixUDPConnection *vc, int events);
+  int start(EventLoop l, int fd, NetEvent *ne, int events);
+  int start_common(EventLoop l, int fd, int events);
+
+  /** Alter the events that will trigger the continuation, for level triggered I/O.
+     @param events add with positive mask(+EVENTIO_READ), or remove with negative mask (-EVENTIO_READ)
+     @return int 0 on success (also when nothing had to be changed), -1 on error
+   */
+  int modify(int events);
+
+  /** Refresh the existing events (i.e. KQUEUE EV_CLEAR), for edge triggered I/O
+     @param events mask of events
+     @return int 0 on success (also when nothing had to be re-armed), -1 on error
+   */
+  int refresh(int events);
+
+  /// Remove the kernel or epoll event. Returns 0 on success.
+  int stop();
+
+  /// Remove the epoll event and close the connection. Returns 0 on success.
+  int close();
+
+  EventIO() { data.untyped = nullptr; }
+};
diff --git a/iocore/net/I_NetProcessor.h b/iocore/net/I_NetProcessor.h
index b58038358cf..0ac2c0e790b 100644
--- a/iocore/net/I_NetProcessor.h
+++ b/iocore/net/I_NetProcessor.h
@@ -27,6 +27,7 @@
 #include "I_EventSystem.h"
 #include "I_Socks.h"
 #include "I_NetVConnection.h"
+#include "AcceptOptions.h"
 
 struct socks_conf_struct;
 #define NET_CONNECT_TIMEOUT 30
@@ -43,75 +44,7 @@ class NetProcessor : public Processor
 public:
   /** Options for @c accept.
    */
-  struct AcceptOptions {
-    using self = AcceptOptions; ///< Self reference type.
-
-    /// Port on which to listen.
-    /// 0 => don't care, which is useful if the socket is already bound.
- int local_port; - /// Local address to bind for accept. - /// If not set -> any address. - IpAddr local_ip; - /// IP address family. - /// @note Ignored if an explicit incoming address is set in the - /// the configuration (@c local_ip). If neither is set IPv4 is used. - int ip_family; - /// Should we use accept threads? If so, how many? - int accept_threads; - /// Event type to generate on accept. - EventType etype; - /** If @c true, the continuation is called back with - @c NET_EVENT_ACCEPT_SUCCEED - or @c NET_EVENT_ACCEPT_FAILED on success and failure resp. - */ - - bool localhost_only; - /// Are frequent accepts expected? - /// Default: @c false. - bool frequent_accept; - - /// Socket receive buffer size. - /// 0 => OS default. - int recv_bufsize; - /// Socket transmit buffer size. - /// 0 => OS default. - int send_bufsize; - /// defer accept for @c sockopt. - /// 0 => OS default. - int defer_accept; - /// Socket options for @c sockopt. - /// 0 => do not set options. - uint32_t sockopt_flags; - uint32_t packet_mark; - uint32_t packet_tos; - uint32_t packet_notsent_lowat; - - int tfo_queue_length; - - /** Transparency on client (user agent) connection. - @internal This is irrelevant at a socket level (since inbound - transparency must be set up when the listen socket is created) - but it's critical that the connection handling logic knows - whether the inbound (client / user agent) connection is - transparent. - */ - bool f_inbound_transparent; - - /** MPTCP enabled on listener. - @internal For logging and metrics purposes to know whether the - listener enabled MPTCP or not. - */ - bool f_mptcp; - - /// Proxy Protocol enabled - bool f_proxy_protocol; - - /// Default constructor. - /// Instance is constructed with default values. - AcceptOptions() { this->reset(); } - /// Reset all values to defaults. - self &reset(); - }; + using AcceptOptions = ::AcceptOptions; /** Accept connections on a port. 
diff --git a/iocore/net/I_NetVConnection.h b/iocore/net/I_NetVConnection.h index 9a5d8117467..f16b5e84d12 100644 --- a/iocore/net/I_NetVConnection.h +++ b/iocore/net/I_NetVConnection.h @@ -23,6 +23,7 @@ */ #pragma once +#include "NetVCOptions.h" #include "ProxyProtocol.h" #include "I_Net.h" @@ -53,357 +54,6 @@ typedef enum { NET_VCONNECTION_OUT, // ATS <--> Server, Server-Side } NetVConnectionContext_t; -/** Holds client options for NetVConnection. - - This class holds various options a user can specify for - NetVConnection. Various clients need many slightly different - features. This is an attempt to prevent out of control growth of - the connection method signatures. Only options of interest need to - be explicitly set -- the rest get sensible default values. - - @note Binding addresses is a bit complex. It is not currently - possible to bind indiscriminately across protocols, which means - any connection must commit to IPv4 or IPv6. For this reason the - connection logic will look at the address family of @a local_addr - even if @a addr_binding is @c ANY_ADDR and bind to any address in - that protocol. If it's not an IP protocol, IPv4 will be used. -*/ -struct NetVCOptions { - using self = NetVCOptions; ///< Self reference type. - - /// Values for valid IP protocols. - enum ip_protocol_t { - USE_TCP, ///< TCP protocol. - USE_UDP ///< UDP protocol. - }; - - /// IP (TCP or UDP) protocol to use on socket. - ip_protocol_t ip_proto; - - /** IP address family. - - This is used for inbound connections only if @c local_ip is not - set, which is sometimes more convenient for the client. This - defaults to @c AF_INET so if the client sets neither this nor @c - local_ip then IPv4 is used. - - For outbound connections this is ignored and the family of the - remote address used. - - @note This is (inconsistently) called "domain" and "protocol" in - other places. "family" is used here because that's what the - standard IP data structures use. 
- - */ - uint16_t ip_family; - - /** The set of ways in which the local address should be bound. - - The protocol is set by the contents of @a local_addr regardless - of this value. @c ANY_ADDR will override only the address. - - @note The difference between @c INTF_ADDR and @c FOREIGN_ADDR is - whether transparency is enabled on the socket. It is the - client's responsibility to set this correctly based on whether - the address in @a local_addr is associated with an interface on - the local system ( @c INTF_ADDR ) or is owned by a foreign - system ( @c FOREIGN_ADDR ). A binding style of @c ANY_ADDR - causes the value in @a local_addr to be ignored. - - The IP address and port are separate because most clients treat - these independently. For the same reason @c IpAddr is used - to be clear that it contains no port data. - - @see local_addr - @see addr_binding - */ - enum addr_bind_style { - ANY_ADDR, ///< Bind to any available local address (don't care, default). - INTF_ADDR, ///< Bind to interface address in @a local_addr. - FOREIGN_ADDR ///< Bind to foreign address in @a local_addr. - }; - - /** Local address for the connection. - - For outbound connections this must have the same family as the - remote address (which is not stored in this structure). For - inbound connections the family of this value overrides @a - ip_family if set. - - @note Ignored if @a addr_binding is @c ANY_ADDR. - @see addr_binding - @see ip_family - */ - IpAddr local_ip; - - /** Local port for connection. - Set to 0 for "don't care" (default). - */ - uint16_t local_port; - - /// How to bind the local address. - /// @note Default is @c ANY_ADDR. - addr_bind_style addr_binding; - - /// Make the socket blocking on I/O (default: @c false) - bool f_blocking; - /// Make socket block on connect (default: @c false) - bool f_blocking_connect; - - // Use TCP Fast Open on this socket. The connect(2) call will be omitted. 
- bool f_tcp_fastopen = false; - - bool tls_upstream = false; - - /// Control use of SOCKS. - /// Set to @c NO_SOCKS to disable use of SOCKS. Otherwise SOCKS is - /// used if available. - unsigned char socks_support; - /// Version of SOCKS to use. - unsigned char socks_version; - - int socket_recv_bufsize; - int socket_send_bufsize; - - /// Configuration options for sockets. - /// @note These are not identical to internal socket options but - /// specifically defined for configuration. These are mask values - /// and so must be powers of 2. - uint32_t sockopt_flags; - /// Value for TCP no delay for @c sockopt_flags. - static uint32_t const SOCK_OPT_NO_DELAY = 1; - /// Value for keep alive for @c sockopt_flags. - static uint32_t const SOCK_OPT_KEEP_ALIVE = 2; - /// Value for linger on for @c sockopt_flags - static uint32_t const SOCK_OPT_LINGER_ON = 4; - /// Value for TCP Fast open @c sockopt_flags - static uint32_t const SOCK_OPT_TCP_FAST_OPEN = 8; - /// Value for SO_MARK @c sockopt_flags - static uint32_t const SOCK_OPT_PACKET_MARK = 16; - /// Value for IP_TOS @c sockopt_flags - static uint32_t const SOCK_OPT_PACKET_TOS = 32; - /// Value for TCP_NOTSENT_LOWAT @c sockopt_flags - static uint32_t const SOCK_OPT_TCP_NOTSENT_LOWAT = 64; - - uint32_t packet_mark; - uint32_t packet_tos; - uint32_t packet_notsent_lowat; - - EventType etype; - - /** ALPN protocol-lists. The format is OpenSSL protocol-lists format (vector of 8-bit length-prefixed, byte strings) - https://www.openssl.org/docs/manmaster/man3/SSL_CTX_set_alpn_protos.html - */ - std::string_view alpn_protos; - /** Server name to use for SNI data on an outbound connection. - */ - ats_scoped_str sni_servername; - /** FQDN used to connect to the origin. May be different - * than sni_servername if pristine host headers are used - */ - ats_scoped_str ssl_servername; - - /** Server host name from client's request to use for SNI data on an outbound connection. 
- */ - ats_scoped_str sni_hostname; - - /** Outbound sni policy which overrides proxy.ssl.client.sni_policy - */ - ats_scoped_str outbound_sni_policy; - - /** - * Client certificate to use in response to OS's certificate request - */ - ats_scoped_str ssl_client_cert_name; - /* - * File containing private key matching certificate - */ - const char *ssl_client_private_key_name = nullptr; - /* - * File containing CA certs for verifying origin's cert - */ - const char *ssl_client_ca_cert_name = nullptr; - /* - * Directory containing CA certs for verifying origin's cert - */ - const char *ssl_client_ca_cert_path = nullptr; - - unsigned char alpn_protocols_array[MAX_ALPN_STRING]; - int alpn_protocols_array_size = 0; - - /** - * Set to DISABLED, PERFMISSIVE, or ENFORCED - * Controls how the server certificate verification is handled - */ - YamlSNIConfig::Policy verifyServerPolicy = YamlSNIConfig::Policy::DISABLED; - - /** - * Bit mask of which features of the server certificate should be checked - * Currently SIGNATURE and NAME - */ - YamlSNIConfig::Property verifyServerProperties = YamlSNIConfig::Property::NONE; - - /// Reset all values to defaults. - void reset(); - - void set_sock_param(int _recv_bufsize, int _send_bufsize, unsigned long _opt_flags, unsigned long _packet_mark = 0, - unsigned long _packet_tos = 0, unsigned long _packet_notsent_lowat = 0); - - NetVCOptions() { reset(); } - ~NetVCOptions() {} - - /** Set the SNI server name. - A local copy is made of @a name. 
- */ - self & - set_sni_servername(const char *name, size_t len) - { - IpEndpoint ip; - - // Literal IPv4 and IPv6 addresses are not permitted in "HostName".(rfc6066#section-3) - if (name && len && ats_ip_pton(std::string_view(name, len), &ip) != 0) { - sni_servername = ats_strndup(name, len); - } else { - sni_servername = nullptr; - } - return *this; - } - - self & - set_ssl_client_cert_name(const char *name) - { - if (name) { - ssl_client_cert_name = ats_strdup(name); - } else { - ssl_client_cert_name = nullptr; - } - return *this; - } - - self & - set_ssl_servername(const char *name) - { - if (name) { - ssl_servername = ats_strdup(name); - } else { - ssl_servername = nullptr; - } - return *this; - } - - self & - set_sni_hostname(const char *name, size_t len) - { - IpEndpoint ip; - - // Literal IPv4 and IPv6 addresses are not permitted in "HostName".(rfc6066#section-3) - if (name && len && ats_ip_pton(std::string_view(name, len), &ip) != 0) { - sni_hostname = ats_strndup(name, len); - } else { - sni_hostname = nullptr; - } - return *this; - } - - self & - operator=(self const &that) - { - if (&that != this) { - /* - * It is odd but necessary to null the scoped string pointer here - * and then explicitly call release on them in the string assignments - * below. - * We a memcpy from that to this. This will put that's string pointers into - * this's memory. Therefore we must first explicitly null out - * this's original version of the string. The release after the - * memcpy removes the extra reference to that's copy of the string - * Removing the release will eventually cause a double free crash - */ - sni_servername = nullptr; // release any current name. - ssl_servername = nullptr; - sni_hostname = nullptr; - ssl_client_cert_name = nullptr; - memcpy(static_cast(this), &that, sizeof(self)); - if (that.sni_servername) { - sni_servername.release(); // otherwise we'll free the source string. 
- this->sni_servername = ats_strdup(that.sni_servername); - } - if (that.ssl_servername) { - ssl_servername.release(); // otherwise we'll free the source string. - this->ssl_servername = ats_strdup(that.ssl_servername); - } - if (that.sni_hostname) { - sni_hostname.release(); // otherwise we'll free the source string. - this->sni_hostname = ats_strdup(that.sni_hostname); - } - if (that.ssl_client_cert_name) { - this->ssl_client_cert_name.release(); // otherwise we'll free the source string. - this->ssl_client_cert_name = ats_strdup(that.ssl_client_cert_name); - } - } - return *this; - } - - std::string_view get_family_string() const; - - std::string_view get_proto_string() const; - - /// @name Debugging - //@{ - /// Convert @a s to its string equivalent. - static const char *toString(addr_bind_style s); - //@} - - // noncopyable - NetVCOptions(const NetVCOptions &) = delete; -}; - -inline void -NetVCOptions::reset() -{ - ip_proto = USE_TCP; - ip_family = AF_INET; - local_ip.invalidate(); - local_port = 0; - addr_binding = ANY_ADDR; - f_blocking = false; - f_blocking_connect = false; - socks_support = NORMAL_SOCKS; - socks_version = SOCKS_DEFAULT_VERSION; - socket_recv_bufsize = -#if defined(RECV_BUF_SIZE) - RECV_BUF_SIZE; -#else - 0; -#endif - socket_send_bufsize = 0; - sockopt_flags = 0; - packet_mark = 0; - packet_tos = 0; - packet_notsent_lowat = 0; - - etype = ET_NET; - - sni_servername = nullptr; - ssl_servername = nullptr; - sni_hostname = nullptr; - ssl_client_cert_name = nullptr; - ssl_client_private_key_name = nullptr; - outbound_sni_policy = nullptr; -} - -inline void -NetVCOptions::set_sock_param(int _recv_bufsize, int _send_bufsize, unsigned long _opt_flags, unsigned long _packet_mark, - unsigned long _packet_tos, unsigned long _packet_notsent_lowat) -{ - socket_recv_bufsize = _recv_bufsize; - socket_send_bufsize = _send_bufsize; - sockopt_flags = _opt_flags; - packet_mark = _packet_mark; - packet_tos = _packet_tos; - packet_notsent_lowat = 
_packet_notsent_lowat; -} - /** A VConnection for a network socket. Abstraction for a net connection. Similar to a socket descriptor VConnections are IO handles to diff --git a/iocore/net/I_UDPNet.h b/iocore/net/I_UDPNet.h index a08d01a9436..fd536323a60 100644 --- a/iocore/net/I_UDPNet.h +++ b/iocore/net/I_UDPNet.h @@ -34,6 +34,7 @@ #include "tscore/I_Version.h" #include "I_EventSystem.h" #include "tscore/ink_inet.h" +#include "NetVCOptions.h" /** UDP service diff --git a/iocore/net/Makefile.am b/iocore/net/Makefile.am index b567a359c62..e27c8e2f0da 100644 --- a/iocore/net/Makefile.am +++ b/iocore/net/Makefile.am @@ -132,12 +132,16 @@ test_libinknet_LDADD += \ endif libinknet_a_SOURCES = \ + AcceptOptions.cc \ + AcceptOptions.h \ ALPNSupport.cc \ BIO_fastopen.cc \ BIO_fastopen.h \ BoringSSLUtils.cc \ BoringSSLUtils.h \ Connection.cc \ + EventIO.h \ + EventIO.cc \ I_Net.h \ I_NetProcessor.h \ I_NetVConnection.h \ @@ -150,6 +154,10 @@ libinknet_a_SOURCES = \ YamlSNIConfig.h \ YamlSNIConfig.cc \ Net.cc \ + NetHandler.h \ + NetHandler.cc \ + NetVCOptions.h \ + NetVCOptions.cc \ NetVConnection.cc \ P_ALPNSupport.h \ P_SNIActionPerformer.h \ @@ -182,6 +190,8 @@ libinknet_a_SOURCES = \ P_UnixNetVConnection.h \ P_UnixPollDescriptor.h \ P_UnixUDPConnection.h \ + PollCont.h \ + PollCont.cc \ ProxyProtocol.h \ ProxyProtocol.cc \ Socks.cc \ @@ -225,18 +235,18 @@ libinknet_a_SOURCES = \ if ENABLE_QUIC libinknet_a_SOURCES += \ - P_QUICClosedConCollector.h \ - P_QUICPacketHandler.h \ - P_QUICNetProcessor.h \ - P_QUICNetVConnection.h \ - P_QUICNextProtocolAccept.h \ - QUICClosedConCollector.cc \ - QUICMultiCertConfigLoader.cc \ - QUICNet.cc \ - QUICNetProcessor_quiche.cc \ - QUICNetVConnection_quiche.cc \ - QUICNextProtocolAccept_quiche.cc \ - QUICPacketHandler_quiche.cc + P_QUICClosedConCollector.h \ + P_QUICPacketHandler.h \ + P_QUICNetProcessor.h \ + P_QUICNetVConnection.h \ + P_QUICNextProtocolAccept.h \ + QUICClosedConCollector.cc \ + QUICMultiCertConfigLoader.cc \ + 
QUICNet.cc \ + QUICNetProcessor_quiche.cc \ + QUICNetVConnection_quiche.cc \ + QUICNextProtocolAccept_quiche.cc \ + QUICPacketHandler_quiche.cc endif if BUILD_TESTS diff --git a/iocore/net/NetEvent.h b/iocore/net/NetEvent.h index c535374a42a..552d9878e02 100644 --- a/iocore/net/NetEvent.h +++ b/iocore/net/NetEvent.h @@ -25,7 +25,9 @@ #include +#include "EventIO.h" #include "I_EventSystem.h" +#include "P_UnixNetState.h" class NetHandler; diff --git a/iocore/net/NetHandler.cc b/iocore/net/NetHandler.cc new file mode 100644 index 00000000000..330463374dc --- /dev/null +++ b/iocore/net/NetHandler.cc @@ -0,0 +1,635 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ + +#include "NetHandler.h" + +#if TS_USE_LINUX_IO_URING +#include "I_IO_URING.h" +#endif + +#include "P_DNSConnection.h" +#include "P_Net.h" +#include "P_UnixNet.h" +#include "P_UnixNetProcessor.h" +#include "PollCont.h" + +using namespace std::literals; + +// NetHandler method definitions + +NetHandler::NetHandler() : Continuation(nullptr) +{ + SET_HANDLER(&NetHandler::mainNetEvent); +} + +int +NetHandler::startIO(NetEvent *ne) +{ + ink_assert(this->mutex->thread_holding == this_ethread()); + ink_assert(ne->get_thread() == this_ethread()); + int res = 0; + + PollDescriptor *pd = get_PollDescriptor(this->thread); + if (ne->ep.start(pd, ne, EVENTIO_READ | EVENTIO_WRITE) < 0) { + res = errno; + // EEXIST should be ok, though it should have been cleared before we got back here + if (errno != EEXIST) { + Debug("iocore_net", "NetHandler::startIO : failed on EventIO::start, errno = [%d](%s)", errno, strerror(errno)); + return -res; + } + } + + if (ne->read.triggered == 1) { + read_ready_list.enqueue(ne); + } + ne->nh = this; + return res; +} + +void +NetHandler::stopIO(NetEvent *ne) +{ + ink_release_assert(ne->nh == this); + + ne->ep.stop(); + + read_ready_list.remove(ne); + write_ready_list.remove(ne); + if (ne->read.in_enabled_list) { + read_enable_list.remove(ne); + ne->read.in_enabled_list = 0; + } + if (ne->write.in_enabled_list) { + write_enable_list.remove(ne); + ne->write.in_enabled_list = 0; + } + + ne->nh = nullptr; +} + +void +NetHandler::startCop(NetEvent *ne) +{ + ink_assert(this->mutex->thread_holding == this_ethread()); + ink_release_assert(ne->nh == this); + ink_assert(!open_list.in(ne)); + + open_list.enqueue(ne); +} + +void +NetHandler::stopCop(NetEvent *ne) +{ + ink_release_assert(ne->nh == this); + + open_list.remove(ne); + cop_list.remove(ne); + remove_from_keep_alive_queue(ne); + remove_from_active_queue(ne); +} + +int +NetHandler::update_nethandler_config(const char *str, RecDataT, RecData data, void *) +{ + uint32_t *updated_member = nullptr; 
// direct pointer to config member for update. + std::string_view name{str}; + + if (name == "proxy.config.net.max_connections_in"sv) { + updated_member = &NetHandler::global_config.max_connections_in; + Debug("net_queue", "proxy.config.net.max_connections_in updated to %" PRId64, data.rec_int); + } else if (name == "proxy.config.net.max_requests_in"sv) { + updated_member = &NetHandler::global_config.max_requests_in; + Debug("net_queue", "proxy.config.net.max_requests_in updated to %" PRId64, data.rec_int); + } else if (name == "proxy.config.net.inactive_threshold_in"sv) { + updated_member = &NetHandler::global_config.inactive_threshold_in; + Debug("net_queue", "proxy.config.net.inactive_threshold_in updated to %" PRId64, data.rec_int); + } else if (name == "proxy.config.net.transaction_no_activity_timeout_in"sv) { + updated_member = &NetHandler::global_config.transaction_no_activity_timeout_in; + Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %" PRId64, data.rec_int); + } else if (name == "proxy.config.net.keep_alive_no_activity_timeout_in"sv) { + updated_member = &NetHandler::global_config.keep_alive_no_activity_timeout_in; + Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %" PRId64, data.rec_int); + } else if (name == "proxy.config.net.default_inactivity_timeout"sv) { + updated_member = &NetHandler::global_config.default_inactivity_timeout; + Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %" PRId64, data.rec_int); + } + + if (updated_member) { + *updated_member = data.rec_int; // do the actual update. + // portable form of the update, an index converted to so it can be passed as an event cookie. + void *idx = reinterpret_cast(static_cast(updated_member - &global_config[0])); + // Signal the NetHandler instances, passing the index of the updated config value. 
+ for (int i = 0; i < eventProcessor.n_thread_groups; ++i) { + if (!active_thread_types[i]) { + continue; + } + for (EThread **tp = eventProcessor.thread_group[i]._thread, + **limit = eventProcessor.thread_group[i]._thread + eventProcessor.thread_group[i]._count; + tp < limit; ++tp) { + NetHandler *nh = get_NetHandler(*tp); + if (nh) { + nh->thread->schedule_imm(nh, TS_EVENT_MGMT_UPDATE, idx); + } + } + } + } + + return REC_ERR_OKAY; +} + +void +NetHandler::init_for_process() +{ + // read configuration values and setup callbacks for when they change + REC_ReadConfigInt32(global_config.max_connections_in, "proxy.config.net.max_connections_in"); + REC_ReadConfigInt32(global_config.max_requests_in, "proxy.config.net.max_requests_in"); + REC_ReadConfigInt32(global_config.inactive_threshold_in, "proxy.config.net.inactive_threshold_in"); + REC_ReadConfigInt32(global_config.transaction_no_activity_timeout_in, "proxy.config.net.transaction_no_activity_timeout_in"); + REC_ReadConfigInt32(global_config.keep_alive_no_activity_timeout_in, "proxy.config.net.keep_alive_no_activity_timeout_in"); + REC_ReadConfigInt32(global_config.default_inactivity_timeout, "proxy.config.net.default_inactivity_timeout"); + + RecRegisterConfigUpdateCb("proxy.config.net.max_connections_in", update_nethandler_config, nullptr); + RecRegisterConfigUpdateCb("proxy.config.net.max_requests_in", update_nethandler_config, nullptr); + RecRegisterConfigUpdateCb("proxy.config.net.inactive_threshold_in", update_nethandler_config, nullptr); + RecRegisterConfigUpdateCb("proxy.config.net.transaction_no_activity_timeout_in", update_nethandler_config, nullptr); + RecRegisterConfigUpdateCb("proxy.config.net.keep_alive_no_activity_timeout_in", update_nethandler_config, nullptr); + RecRegisterConfigUpdateCb("proxy.config.net.default_inactivity_timeout", update_nethandler_config, nullptr); + + Debug("net_queue", "proxy.config.net.max_connections_in updated to %d", global_config.max_connections_in); + 
Debug("net_queue", "proxy.config.net.max_requests_in updated to %d", global_config.max_requests_in); + Debug("net_queue", "proxy.config.net.inactive_threshold_in updated to %d", global_config.inactive_threshold_in); + Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %d", + global_config.transaction_no_activity_timeout_in); + Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %d", + global_config.keep_alive_no_activity_timeout_in); + Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %d", global_config.default_inactivity_timeout); +} + +// +// Function used to release a NetEvent and free it. +// +void +NetHandler::free_netevent(NetEvent *ne) +{ + EThread *t = this->thread; + + ink_assert(t == this_ethread()); + ink_release_assert(ne->get_thread() == t); + ink_release_assert(ne->nh == this); + + // Release ne from InactivityCop + stopCop(ne); + // Release ne from NetHandler + stopIO(ne); + // Clear and deallocate ne + ne->free(t); +} + +// +// Move VC's enabled on a different thread to the ready list +// +void +NetHandler::process_enabled_list() +{ + NetEvent *ne = nullptr; + + SListM(NetEvent, NetState, read, enable_link) rq(read_enable_list.popall()); + while ((ne = rq.pop())) { + ne->ep.modify(EVENTIO_READ); + ne->ep.refresh(EVENTIO_READ); + ne->read.in_enabled_list = 0; + if ((ne->read.enabled && ne->read.triggered) || ne->closed) { + read_ready_list.in_or_enqueue(ne); + } + } + + SListM(NetEvent, NetState, write, enable_link) wq(write_enable_list.popall()); + while ((ne = wq.pop())) { + ne->ep.modify(EVENTIO_WRITE); + ne->ep.refresh(EVENTIO_WRITE); + ne->write.in_enabled_list = 0; + if ((ne->write.enabled && ne->write.triggered) || ne->closed) { + write_ready_list.in_or_enqueue(ne); + } + } +} + +// +// Walk through the ready list +// +void +NetHandler::process_ready_list() +{ + NetEvent *ne = nullptr; + +#if defined(USE_EDGE_TRIGGER) + // NetEvent * + while ((ne = 
read_ready_list.dequeue())) { + // Initialize the thread-local continuation flags + set_cont_flags(ne->get_control_flags()); + if (ne->closed) { + free_netevent(ne); + } else if (ne->read.enabled && ne->read.triggered) { + ne->net_read_io(this, this->thread); + } else if (!ne->read.enabled) { + read_ready_list.remove(ne); + } + } + while ((ne = write_ready_list.dequeue())) { + set_cont_flags(ne->get_control_flags()); + if (ne->closed) { + free_netevent(ne); + } else if (ne->write.enabled && ne->write.triggered) { + ne->net_write_io(this, this->thread); + } else if (!ne->write.enabled) { + write_ready_list.remove(ne); + } + } +#else /* !USE_EDGE_TRIGGER */ + while ((ne = read_ready_list.dequeue())) { + set_cont_flags(ne->get_control_flags()); + if (ne->closed) + free_netevent(ne); + else if (ne->read.enabled && ne->read.triggered) + ne->net_read_io(this, this->thread); + else if (!ne->read.enabled) + ne->ep.modify(-EVENTIO_READ); + } + while ((ne = write_ready_list.dequeue())) { + set_cont_flags(ne->get_control_flags()); + if (ne->closed) + free_netevent(ne); + else if (ne->write.enabled && ne->write.triggered) + write_to_net(this, ne, this->thread); + else if (!ne->write.enabled) + ne->ep.modify(-EVENTIO_WRITE); + } +#endif /* !USE_EDGE_TRIGGER */ +} + +// +// The main event for NetHandler +int +NetHandler::mainNetEvent(int event, Event *e) +{ + if (TS_EVENT_MGMT_UPDATE == event) { + intptr_t idx = reinterpret_cast(e->cookie); + // Copy to the same offset in the instance struct. 
+ config[idx] = global_config[idx]; + if (config_value_affects_per_thread_value[idx]) { + this->configure_per_thread_values(); + } + return EVENT_CONT; + } else { + ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL)); + return this->waitForActivity(-1); + } +} + +static void +net_signal_hook_callback(EThread *thread) +{ +#if HAVE_EVENTFD + uint64_t counter; + ATS_UNUSED_RETURN(read(thread->evfd, &counter, sizeof(uint64_t))); +#else + char dummy[1024]; + ATS_UNUSED_RETURN(read(thread->evpipe[0], &dummy[0], 1024)); +#endif +} + +int +NetHandler::waitForActivity(ink_hrtime timeout) +{ + EventIO *epd = nullptr; +#if TS_USE_LINUX_IO_URING + IOUringContext *ur = IOUringContext::local_context(); + bool servicedh = false; +#endif + + NET_INCREMENT_DYN_STAT(net_handler_run_stat); + SCOPED_MUTEX_LOCK(lock, mutex, this->thread); + + process_enabled_list(); + +#if TS_USE_LINUX_IO_URING + ur->submit(); +#endif + + // Polling event by PollCont + PollCont *p = get_PollCont(this->thread); + p->do_poll(timeout); + + // Get & Process polling result + PollDescriptor *pd = get_PollDescriptor(this->thread); + NetEvent *ne = nullptr; + for (int x = 0; x < pd->result; x++) { + epd = static_cast get_ev_data(pd, x); + if (epd->type == EVENTIO_READWRITE_VC) { + ne = epd->data.ne; + // Remove triggered NetEvent from cop_list because it won't be timeout before next InactivityCop runs. 
+ if (cop_list.in(ne)) { + cop_list.remove(ne); + } + int flags = get_ev_events(pd, x); + if (flags & (EVENTIO_ERROR)) { + ne->set_error_from_socket(); + } + if (flags & (EVENTIO_READ)) { + ne->read.triggered = 1; + if (!read_ready_list.in(ne)) { + read_ready_list.enqueue(ne); + } + } + if (flags & (EVENTIO_WRITE)) { + ne->write.triggered = 1; + if (!write_ready_list.in(ne)) { + write_ready_list.enqueue(ne); + } + } else if (!(flags & (EVENTIO_READ))) { + Debug("iocore_net_main", "Unhandled epoll event: 0x%04x", flags); + // In practice we sometimes see EPOLLERR and EPOLLHUP through there + // Anything else would be surprising + ink_assert((flags & ~(EVENTIO_ERROR)) == 0); + ne->write.triggered = 1; + if (!write_ready_list.in(ne)) { + write_ready_list.enqueue(ne); + } + } + } else if (epd->type == EVENTIO_DNS_CONNECTION) { + if (epd->data.dnscon != nullptr) { + epd->data.dnscon->trigger(); // Make sure the DNSHandler for this con knows we triggered +#if defined(USE_EDGE_TRIGGER) + epd->refresh(EVENTIO_READ); +#endif + } + } else if (epd->type == EVENTIO_ASYNC_SIGNAL) { + net_signal_hook_callback(this->thread); + } else if (epd->type == EVENTIO_NETACCEPT) { + this->thread->schedule_imm(epd->data.na); +#if TS_USE_LINUX_IO_URING + } else if (epd->type == EVENTIO_IO_URING) { + servicedh = true; +#endif + } + ev_next_event(pd, x); + } + + pd->result = 0; + + process_ready_list(); + +#if TS_USE_LINUX_IO_URING + if (servicedh) { + ur->service(); + } +#endif + + return EVENT_CONT; +} + +void +NetHandler::signalActivity() +{ +#if HAVE_EVENTFD + uint64_t counter = 1; + ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t))); +#else + char dummy = 1; + ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1)); +#endif +} + +bool +NetHandler::manage_active_queue(NetEvent *enabling_ne, bool ignore_queue_size = false) +{ + const int total_connections_in = active_queue_size + keep_alive_queue_size; + Debug("v_net_queue", + "max_connections_per_thread_in: %d 
max_requests_per_thread_in: %d total_connections_in: %d " + "active_queue_size: %d keep_alive_queue_size: %d", + max_connections_per_thread_in, max_requests_per_thread_in, total_connections_in, active_queue_size, keep_alive_queue_size); + + if (!max_requests_per_thread_in) { + // active queue has no max + return true; + } + + if (ignore_queue_size == false && max_requests_per_thread_in > active_queue_size) { + return true; + } + + ink_hrtime now = Thread::get_hrtime(); + + // loop over the non-active connections and try to close them + NetEvent *ne = active_queue.head; + NetEvent *ne_next = nullptr; + int closed = 0; + int handle_event = 0; + int total_idle_time = 0; + int total_idle_count = 0; + for (; ne != nullptr; ne = ne_next) { + ne_next = ne->active_queue_link.next; + // It seems dangerous closing the current ne at this point + // Let the activity_cop deal with it + if (ne == enabling_ne) { + continue; + } + if ((ne->next_inactivity_timeout_at && ne->next_inactivity_timeout_at <= now) || + (ne->next_activity_timeout_at && ne->next_activity_timeout_at <= now)) { + _close_ne(ne, now, handle_event, closed, total_idle_time, total_idle_count); + } + if (ignore_queue_size == false && max_requests_per_thread_in > active_queue_size) { + return true; + } + } + + if (max_requests_per_thread_in > active_queue_size) { + return true; + } + + return false; // failed to make room in the queue, all connections are active +} + +void +NetHandler::configure_per_thread_values() +{ + // figure out the number of threads and calculate the number of connections per thread + int threads = eventProcessor.thread_group[ET_NET]._count; + max_connections_per_thread_in = config.max_connections_in / threads; + max_requests_per_thread_in = config.max_requests_in / threads; + Debug("net_queue", "max_connections_per_thread_in updated to %d threads: %d", max_connections_per_thread_in, threads); + Debug("net_queue", "max_requests_per_thread_in updated to %d threads: %d", 
max_requests_per_thread_in, threads); +} + +void +NetHandler::manage_keep_alive_queue() +{ + uint32_t total_connections_in = active_queue_size + keep_alive_queue_size; + ink_hrtime now = Thread::get_hrtime(); + + Debug("v_net_queue", "max_connections_per_thread_in: %d total_connections_in: %d active_queue_size: %d keep_alive_queue_size: %d", + max_connections_per_thread_in, total_connections_in, active_queue_size, keep_alive_queue_size); + + if (!max_connections_per_thread_in || total_connections_in <= max_connections_per_thread_in) { + return; + } + + // loop over the non-active connections and try to close them + NetEvent *ne_next = nullptr; + int closed = 0; + int handle_event = 0; + int total_idle_time = 0; + int total_idle_count = 0; + for (NetEvent *ne = keep_alive_queue.head; ne != nullptr; ne = ne_next) { + ne_next = ne->keep_alive_queue_link.next; + _close_ne(ne, now, handle_event, closed, total_idle_time, total_idle_count); + + total_connections_in = active_queue_size + keep_alive_queue_size; + if (total_connections_in <= max_connections_per_thread_in) { + break; + } + } + + if (total_idle_count > 0) { + Debug("net_queue", "max cons: %d active: %d idle: %d already closed: %d, close event: %d mean idle: %d", + max_connections_per_thread_in, total_connections_in, keep_alive_queue_size, closed, handle_event, + total_idle_time / total_idle_count); + } +} + +void +NetHandler::_close_ne(NetEvent *ne, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count) +{ + if (ne->get_thread() != this_ethread()) { + return; + } + MUTEX_TRY_LOCK(lock, ne->get_mutex(), this_ethread()); + if (!lock.is_locked()) { + return; + } + ink_hrtime diff = (now - (ne->next_inactivity_timeout_at - ne->inactivity_timeout_in)) / HRTIME_SECOND; + if (diff > 0) { + total_idle_time += diff; + ++total_idle_count; + NET_SUM_DYN_STAT(keep_alive_queue_timeout_total_stat, diff); + NET_INCREMENT_DYN_STAT(keep_alive_queue_timeout_count_stat); + } + 
Debug("net_queue", "closing connection NetEvent=%p idle: %u now: %" PRId64 " at: %" PRId64 " in: %" PRId64 " diff: %" PRId64, ne, + keep_alive_queue_size, ink_hrtime_to_sec(now), ink_hrtime_to_sec(ne->next_inactivity_timeout_at), + ink_hrtime_to_sec(ne->inactivity_timeout_in), diff); + if (ne->closed) { + free_netevent(ne); + ++closed; + } else { + ne->next_inactivity_timeout_at = now; + // create a dummy event + Event event; + event.ethread = this_ethread(); + if (ne->inactivity_timeout_in && ne->next_inactivity_timeout_at <= now) { + if (ne->callback(VC_EVENT_INACTIVITY_TIMEOUT, &event) == EVENT_DONE) { + ++handle_event; + } + } else if (ne->active_timeout_in && ne->next_activity_timeout_at <= now) { + if (ne->callback(VC_EVENT_ACTIVE_TIMEOUT, &event) == EVENT_DONE) { + ++handle_event; + } + } + } +} + +void +NetHandler::add_to_keep_alive_queue(NetEvent *ne) +{ + Debug("net_queue", "NetEvent: %p", ne); + ink_assert(mutex->thread_holding == this_ethread()); + + if (keep_alive_queue.in(ne)) { + // already in the keep-alive queue, move the head + keep_alive_queue.remove(ne); + } else { + // in the active queue or no queue, new to this queue + remove_from_active_queue(ne); + ++keep_alive_queue_size; + } + keep_alive_queue.enqueue(ne); + + // if keep-alive queue is over size then close connections + manage_keep_alive_queue(); +} + +void +NetHandler::remove_from_keep_alive_queue(NetEvent *ne) +{ + Debug("net_queue", "NetEvent: %p", ne); + ink_assert(mutex->thread_holding == this_ethread()); + + if (keep_alive_queue.in(ne)) { + keep_alive_queue.remove(ne); + --keep_alive_queue_size; + } +} + +bool +NetHandler::add_to_active_queue(NetEvent *ne) +{ + Debug("net_queue", "NetEvent: %p", ne); + Debug("net_queue", "max_connections_per_thread_in: %d active_queue_size: %d keep_alive_queue_size: %d", + max_connections_per_thread_in, active_queue_size, keep_alive_queue_size); + ink_assert(mutex->thread_holding == this_ethread()); + + bool active_queue_full = false; + + // if 
active queue is over size then close inactive connections + if (manage_active_queue(ne) == false) { + active_queue_full = true; + } + + if (active_queue.in(ne)) { + // already in the active queue, move the head + active_queue.remove(ne); + } else { + if (active_queue_full) { + // there is no room left in the queue + NET_SUM_DYN_STAT(net_requests_max_throttled_in_stat, 1); + return false; + } + // in the keep-alive queue or no queue, new to this queue + remove_from_keep_alive_queue(ne); + ++active_queue_size; + } + active_queue.enqueue(ne); + + return true; +} + +void +NetHandler::remove_from_active_queue(NetEvent *ne) +{ + Debug("net_queue", "NetEvent: %p", ne); + ink_assert(mutex->thread_holding == this_ethread()); + + if (active_queue.in(ne)) { + active_queue.remove(ne); + --active_queue_size; + } +} diff --git a/iocore/net/NetHandler.h b/iocore/net/NetHandler.h new file mode 100644 index 00000000000..b733fced2b3 --- /dev/null +++ b/iocore/net/NetHandler.h @@ -0,0 +1,235 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ + +#pragma once + +#include "I_Continuation.h" +#include "I_EThread.h" +#include "NetEvent.h" + +// +// NetHandler +// +// A NetHandler handles the Network IO operations. It maintains +// lists of operations at multiples of it's periodicity. +// + +/** + NetHandler is the processor of NetEvent for the Net sub-system. The NetHandler + is the core component of the Net sub-system. Once started, it is responsible + for polling socket fds and perform the I/O tasks in NetEvent. + + The NetHandler is executed periodically to perform read/write tasks for + NetVConnection. The NetHandler::mainNetEvent() should be viewed as a part of + EThread::execute() loop. This is the reason that Net System is a sub-system. + + By get_NetHandler(this_ethread()), you can get the NetHandler object that + runs inside the current EThread and then @c startIO / @c stopIO which + assign/release a NetEvent to/from NetHandler. Before you call these functions, + holding the mutex of this NetHandler is required. + + The NetVConnection provides a set of do_io functions through which you can + specify continuations to be called back by its NetHandler. These function + calls do not block. Instead they return an VIO object and schedule the + callback to the continuation passed in when there are I/O events occurred. + + Multi-thread scheduler: + + The NetHandler should be viewed as multi-threaded schedulers which process + NetEvents from their queues. If vc wants to be managed by NetHandler, the vc + should be derived from NetEvent. The vc can be made of NetProcessor + (allocate_vc) either by directly adding a NetEvent to the queue + (NetHandler::startIO), or more conveniently, calling a method service call + (NetProcessor::connect_re) which synthesizes the NetEvent and places it in the + queue. 
+ + Callback event codes: + + These event codes for do_io_read and reenable(read VIO) task: + VC_EVENT_READ_READY, VC_EVENT_READ_COMPLETE, + VC_EVENT_EOS, VC_EVENT_ERROR + + These event codes for do_io_write and reenable(write VIO) task: + VC_EVENT_WRITE_READY, VC_EVENT_WRITE_COMPLETE + VC_EVENT_ERROR + + There is no event and callback for do_io_shutdown / do_io_close task. + + NetVConnection allocation policy: + + VCs are allocated by the NetProcessor and deallocated by NetHandler. + A state machine may access the returned, non-recurring NetEvent / VIO until + it is closed by do_io_close. For recurring NetEvent, the NetEvent may be + accessed until it is closed. Once the NetEvent is closed, it's the + NetHandler's responsibility to deallocate it. + + Before assign to NetHandler or after release from NetHandler, it's the + NetEvent's responsibility to deallocate itself. + + */ +class NetHandler : public Continuation, public EThread::LoopTailHandler +{ + using self_type = NetHandler; ///< Self reference type. +public: + // @a thread and @a trigger_event are redundant - you can get the former from + // the latter. If we don't get rid of @a trigger_event we should remove @a + // thread. 
+ EThread *thread = nullptr; + Event *trigger_event = nullptr; + QueM(NetEvent, NetState, read, ready_link) read_ready_list; + QueM(NetEvent, NetState, write, ready_link) write_ready_list; + Que(NetEvent, open_link) open_list; + DList(NetEvent, cop_link) cop_list; + ASLLM(NetEvent, NetState, read, enable_link) read_enable_list; + ASLLM(NetEvent, NetState, write, enable_link) write_enable_list; + Que(NetEvent, keep_alive_queue_link) keep_alive_queue; + uint32_t keep_alive_queue_size = 0; + Que(NetEvent, active_queue_link) active_queue; + uint32_t active_queue_size = 0; + +#ifdef TS_USE_LINUX_IO_URING + EventIO uring_evio; +#endif + + /// configuration settings for managing the active and keep-alive queues + struct Config { + uint32_t max_connections_in = 0; + uint32_t max_requests_in = 0; + uint32_t inactive_threshold_in = 0; + uint32_t transaction_no_activity_timeout_in = 0; + uint32_t keep_alive_no_activity_timeout_in = 0; + uint32_t default_inactivity_timeout = 0; + + /** Return the address of the first value in this struct. + + Doing updates is much easier if we treat this config struct as an array. + Making it a method means the knowledge of which member is the first one + is localized to this struct, not scattered about. + */ + uint32_t & + operator[](int n) + { + return *(&max_connections_in + n); + } + }; + /** Static global config, set and updated per process. + + This is updated asynchronously and then events are sent to the NetHandler + instances per thread to copy to the per thread config at a convenient time. + Because these are updated independently from the command line, the update + events just copy a single value from the global to the local. This + mechanism relies on members being identical types. + */ + static Config global_config; + Config config; ///< Per thread copy of the @c global_config + // Active and keep alive queue values that depend on other configuration + // values. 
These are never updated directly, they are computed from other + // config values. + uint32_t max_connections_per_thread_in = 0; + uint32_t max_requests_per_thread_in = 0; + /// Number of configuration items in @c Config. + static constexpr int CONFIG_ITEM_COUNT = sizeof(Config) / sizeof(uint32_t); + /// Which members of @c Config the per thread values depend on. + /// If one of these is updated, the per thread values must also be updated. + static const std::bitset config_value_affects_per_thread_value; + /// Set of thread types in which nethandlers are active. + /// This enables signaling the correct instances when the configuration is + /// updated. Event type threads that use @c NetHandler must set the + /// corresponding bit. + static std::bitset::digits> active_thread_types; + + int mainNetEvent(int event, Event *data); + int waitForActivity(ink_hrtime timeout) override; + void process_enabled_list(); + void process_ready_list(); + void manage_keep_alive_queue(); + bool manage_active_queue(NetEvent *ne, bool ignore_queue_size); + void add_to_keep_alive_queue(NetEvent *ne); + void remove_from_keep_alive_queue(NetEvent *ne); + bool add_to_active_queue(NetEvent *ne); + void remove_from_active_queue(NetEvent *ne); + + /// Per process initialization logic. + static void init_for_process(); + /// Update configuration values that are per thread and depend on other + /// configuration values. + void configure_per_thread_values(); + + /** + Start to handle read & write event on a NetEvent. + Initial the socket fd of ne for polling system. + Only be called when holding the mutex of this NetHandler. + + @param ne NetEvent to be managed by this NetHandler. + @return 0 on success, ne->nh set to this NetHandler. + -ERRNO on failure. + */ + int startIO(NetEvent *ne); + /** + Stop to handle read & write event on a NetEvent. + Remove the socket fd of ne from polling system. + Only be called when holding the mutex of this NetHandler and must call + stopCop(ne) first. 
+ + @param ne NetEvent to be released. + @return ne->nh set to nullptr. + */ + void stopIO(NetEvent *ne); + + /** + Start to handle active timeout and inactivity timeout on a NetEvent. + Put the ne into open_list. All NetEvents in the open_list is checked for + timeout by InactivityCop. Only be called when holding the mutex of this + NetHandler and must call startIO(ne) first. + + @param ne NetEvent to be managed by InactivityCop + */ + void startCop(NetEvent *ne); + /** + Stop to handle active timeout and inactivity on a NetEvent. + Remove the ne from open_list and cop_list. + Also remove the ne from keep_alive_queue and active_queue if its context is + IN. Only be called when holding the mutex of this NetHandler. + + @param ne NetEvent to be released. + */ + void stopCop(NetEvent *ne); + + // Signal the epoll_wait to terminate. + void signalActivity() override; + + /** + Release a ne and free it. + + @param ne NetEvent to be detached. + */ + void free_netevent(NetEvent *ne); + + NetHandler(); + +private: + void _close_ne(NetEvent *ne, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count); + + /// Static method used as the callback for runtime configuration updates. + static int update_nethandler_config(const char *name, RecDataT, RecData data, void *); +}; diff --git a/iocore/net/NetVCOptions.cc b/iocore/net/NetVCOptions.cc new file mode 100644 index 00000000000..9e14b870bbb --- /dev/null +++ b/iocore/net/NetVCOptions.cc @@ -0,0 +1,71 @@ +/**@file + + A brief file description + + @section license License + + Licensed to the Apache Software + Foundation(ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. 
You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + */ + +#include "NetVCOptions.h" +#include "I_Net.h" +#include "I_Socks.h" + +void +NetVCOptions::reset() +{ + ip_proto = USE_TCP; + ip_family = AF_INET; + local_ip.invalidate(); + local_port = 0; + addr_binding = ANY_ADDR; + socks_support = NORMAL_SOCKS; + socks_version = SOCKS_DEFAULT_VERSION; + socket_recv_bufsize = +#if defined(RECV_BUF_SIZE) + RECV_BUF_SIZE; +#else + 0; +#endif + socket_send_bufsize = 0; + sockopt_flags = 0; + packet_mark = 0; + packet_tos = 0; + packet_notsent_lowat = 0; + + etype = ET_NET; + + sni_servername = nullptr; + ssl_servername = nullptr; + sni_hostname = nullptr; + ssl_client_cert_name = nullptr; + ssl_client_private_key_name = nullptr; + outbound_sni_policy = nullptr; +} + +void +NetVCOptions::set_sock_param(int _recv_bufsize, int _send_bufsize, unsigned long _opt_flags, unsigned long _packet_mark, + unsigned long _packet_tos, unsigned long _packet_notsent_lowat) +{ + socket_recv_bufsize = _recv_bufsize; + socket_send_bufsize = _send_bufsize; + sockopt_flags = _opt_flags; + packet_mark = _packet_mark; + packet_tos = _packet_tos; + packet_notsent_lowat = _packet_notsent_lowat; +} diff --git a/iocore/net/NetVCOptions.h b/iocore/net/NetVCOptions.h new file mode 100644 index 00000000000..9b8c9af5b76 --- /dev/null +++ b/iocore/net/NetVCOptions.h @@ -0,0 +1,322 @@ +/** @file + + NetVConnection options class + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. 
See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. + + */ +#pragma once +#include "tscore/ink_inet.h" +#include "I_EventSystem.h" +#include "I_Event.h" +#include "YamlSNIConfig.h" + +#include + +struct NetVCOptions { + using self = NetVCOptions; ///< Self reference type. + + /// Values for valid IP protocols. + enum ip_protocol_t { + USE_TCP, ///< TCP protocol. + USE_UDP ///< UDP protocol. + }; + + /// IP (TCP or UDP) protocol to use on socket. + ip_protocol_t ip_proto; + + /** IP address family. + + This is used for inbound connections only if @c local_ip is not + set, which is sometimes more convenient for the client. This + defaults to @c AF_INET so if the client sets neither this nor @c + local_ip then IPv4 is used. + + For outbound connections this is ignored and the family of the + remote address is used. + + @note This is (inconsistently) called "domain" and "protocol" in + other places. "family" is used here because that's what the + standard IP data structures use. + + */ + uint16_t ip_family; + + /** The set of ways in which the local address should be bound. + + The protocol is set by the contents of @a local_ip regardless + of this value. @c ANY_ADDR will override only the address. + + @note The difference between @c INTF_ADDR and @c FOREIGN_ADDR is + whether transparency is enabled on the socket.
It is the + client's responsibility to set this correctly based on whether + the address in @a local_ip is associated with an interface on + the local system ( @c INTF_ADDR ) or is owned by a foreign + system ( @c FOREIGN_ADDR ). A binding style of @c ANY_ADDR + causes the value in @a local_ip to be ignored. + + The IP address and port are separate because most clients treat + these independently. For the same reason @c IpAddr is used + to be clear that it contains no port data. + + @see local_ip + @see addr_binding + */ + enum addr_bind_style { + ANY_ADDR, ///< Bind to any available local address (don't care, default). + INTF_ADDR, ///< Bind to interface address in @a local_ip. + FOREIGN_ADDR ///< Bind to foreign address in @a local_ip. + }; + + /** Local address for the connection. + + For outbound connections this must have the same family as the + remote address (which is not stored in this structure). For + inbound connections the family of this value overrides @a + ip_family if set. + + @note Ignored if @a addr_binding is @c ANY_ADDR. + @see addr_binding + @see ip_family + */ + IpAddr local_ip; + + /** Local port for connection. + Set to 0 for "don't care" (default). + */ + uint16_t local_port; + + /// How to bind the local address. + /// @note Default is @c ANY_ADDR. + addr_bind_style addr_binding; + + /// Make the socket blocking on I/O (default: @c false) + // TODO: make this const. We don't use blocking + bool f_blocking = false; + /// Make socket block on connect (default: @c false) + // TODO: make this const. We don't use blocking + bool f_blocking_connect = false; + + // Use TCP Fast Open on this socket. The connect(2) call will be omitted. + bool f_tcp_fastopen = false; + + /// Control use of SOCKS. + /// Set to @c NO_SOCKS to disable use of SOCKS. Otherwise SOCKS is + /// used if available. + unsigned char socks_support; + /// Version of SOCKS to use.
+ unsigned char socks_version; + + int socket_recv_bufsize; + int socket_send_bufsize; + + /// Configuration options for sockets. + /// @note These are not identical to internal socket options but + /// specifically defined for configuration. These are mask values + /// and so must be powers of 2. + uint32_t sockopt_flags; + /// Value for TCP no delay for @c sockopt_flags. + static uint32_t const SOCK_OPT_NO_DELAY = 1; + /// Value for keep alive for @c sockopt_flags. + static uint32_t const SOCK_OPT_KEEP_ALIVE = 2; + /// Value for linger on for @c sockopt_flags + static uint32_t const SOCK_OPT_LINGER_ON = 4; + /// Value for TCP Fast open @c sockopt_flags + static uint32_t const SOCK_OPT_TCP_FAST_OPEN = 8; + /// Value for SO_MARK @c sockopt_flags + static uint32_t const SOCK_OPT_PACKET_MARK = 16; + /// Value for IP_TOS @c sockopt_flags + static uint32_t const SOCK_OPT_PACKET_TOS = 32; + /// Value for TCP_NOTSENT_LOWAT @c sockopt_flags + static uint32_t const SOCK_OPT_TCP_NOTSENT_LOWAT = 64; + + uint32_t packet_mark; + uint32_t packet_tos; + uint32_t packet_notsent_lowat; + + EventType etype; + + /** ALPN protocol-lists. The format is OpenSSL protocol-lists format (vector of 8-bit length-prefixed, byte strings) + https://www.openssl.org/docs/manmaster/man3/SSL_CTX_set_alpn_protos.html + */ + std::string_view alpn_protos; + /** Server name to use for SNI data on an outbound connection. + */ + ats_scoped_str sni_servername; + /** FQDN used to connect to the origin. May be different + * than sni_servername if pristine host headers are used + */ + ats_scoped_str ssl_servername; + + /** Server host name from client's request to use for SNI data on an outbound connection. 
+ */ + ats_scoped_str sni_hostname; + + /** Outbound sni policy which overrides proxy.ssl.client.sni_policy + */ + ats_scoped_str outbound_sni_policy; + + /** + * Client certificate to use in response to OS's certificate request + */ + ats_scoped_str ssl_client_cert_name; + /* + * File containing private key matching certificate + */ + const char *ssl_client_private_key_name = nullptr; + /* + * File containing CA certs for verifying origin's cert + */ + const char *ssl_client_ca_cert_name = nullptr; + /* + * Directory containing CA certs for verifying origin's cert + */ + const char *ssl_client_ca_cert_path = nullptr; + + bool tls_upstream = false; + + unsigned char alpn_protocols_array[MAX_ALPN_STRING]; + int alpn_protocols_array_size = 0; + + /** + * Set to DISABLED, PERMISSIVE, or ENFORCED + * Controls how the server certificate verification is handled + */ + YamlSNIConfig::Policy verifyServerPolicy = YamlSNIConfig::Policy::DISABLED; + + /** + * Bit mask of which features of the server certificate should be checked + * Currently SIGNATURE and NAME + */ + YamlSNIConfig::Property verifyServerProperties = YamlSNIConfig::Property::NONE; + + /// Reset all values to defaults. + void reset(); + + void set_sock_param(int _recv_bufsize, int _send_bufsize, unsigned long _opt_flags, unsigned long _packet_mark = 0, + unsigned long _packet_tos = 0, unsigned long _packet_notsent_lowat = 0); + + NetVCOptions() { reset(); } + ~NetVCOptions() {} + + /** Set the SNI server name. + A local copy is made of @a name.
+ */ + self & + set_sni_servername(const char *name, size_t len) + { + IpEndpoint ip; + + // Literal IPv4 and IPv6 addresses are not permitted in "HostName".(rfc6066#section-3) + if (name && len && ats_ip_pton(std::string_view(name, len), &ip) != 0) { + sni_servername = ats_strndup(name, len); + } else { + sni_servername = nullptr; + } + return *this; + } + + self & + set_ssl_client_cert_name(const char *name) + { + if (name) { + ssl_client_cert_name = ats_strdup(name); + } else { + ssl_client_cert_name = nullptr; + } + return *this; + } + + self & + set_ssl_servername(const char *name) + { + if (name) { + ssl_servername = ats_strdup(name); + } else { + ssl_servername = nullptr; + } + return *this; + } + + self & + set_sni_hostname(const char *name, size_t len) + { + IpEndpoint ip; + + // Literal IPv4 and IPv6 addresses are not permitted in "HostName".(rfc6066#section-3) + if (name && len && ats_ip_pton(std::string_view(name, len), &ip) != 0) { + sni_hostname = ats_strndup(name, len); + } else { + sni_hostname = nullptr; + } + return *this; + } + + self & + operator=(self const &that) + { + if (&that != this) { + /* + * It is odd but necessary to null the scoped string pointer here + * and then explicitly call release on them in the string assignments + * below. + * We do a memcpy from that to this. This will put that's string pointers into + * this's memory. Therefore we must first explicitly null out + * this's original version of the string. The release after the + * memcpy removes the extra reference to that's copy of the string. + * Removing the release will eventually cause a double free crash. NOTE(review): outbound_sni_policy is also an ats_scoped_str but is not nulled or re-duplicated here - confirm it cannot be double freed. + */ + sni_servername = nullptr; // release any current name. + ssl_servername = nullptr; + sni_hostname = nullptr; + ssl_client_cert_name = nullptr; + memcpy(static_cast(this), &that, sizeof(self)); + if (that.sni_servername) { + sni_servername.release(); // otherwise we'll free the source string.
+ this->sni_servername = ats_strdup(that.sni_servername); + } + if (that.ssl_servername) { + ssl_servername.release(); // otherwise we'll free the source string. + this->ssl_servername = ats_strdup(that.ssl_servername); + } + if (that.sni_hostname) { + sni_hostname.release(); // otherwise we'll free the source string. + this->sni_hostname = ats_strdup(that.sni_hostname); + } + if (that.ssl_client_cert_name) { + this->ssl_client_cert_name.release(); // otherwise we'll free the source string. + this->ssl_client_cert_name = ats_strdup(that.ssl_client_cert_name); + } + } + return *this; + } + + std::string_view get_family_string() const; + + std::string_view get_proto_string() const; + + /// @name Debugging + //@{ + /// Convert @a s to its string equivalent. + static const char *toString(addr_bind_style s); + //@} + + // noncopyable + NetVCOptions(const NetVCOptions &) = delete; +}; diff --git a/iocore/net/P_NetAccept.h b/iocore/net/P_NetAccept.h index e30394d4b24..e116affe51c 100644 --- a/iocore/net/P_NetAccept.h +++ b/iocore/net/P_NetAccept.h @@ -38,11 +38,14 @@ ****************************************************************************/ #pragma once +#include "EventIO.h" +#include "I_NetProcessor.h" #include #include "tscore/ink_platform.h" #include "P_Connection.h" struct NetAccept; +struct HttpProxyPort; class Event; class SSLNextProtocolAccept; // @@ -50,7 +53,7 @@ class SSLNextProtocolAccept; // Accepts as many connections as possible, returning the number accepted // or -1 to stop accepting. 
// -typedef int(AcceptFunction)(NetAccept *na, void *e, bool blockable); +using AcceptFunction = int(NetAccept *, void *, bool); using AcceptFunctionPtr = AcceptFunction *; AcceptFunction net_accept; @@ -91,7 +94,7 @@ struct NetAccept : public Continuation { EventIO ep; HttpProxyPort *proxyPort = nullptr; - NetProcessor::AcceptOptions opt; + AcceptOptions opt; virtual NetProcessor *getNetProcessor() const; diff --git a/iocore/net/P_UDPNet.h b/iocore/net/P_UDPNet.h index d1d862b840b..796e07a6423 100644 --- a/iocore/net/P_UDPNet.h +++ b/iocore/net/P_UDPNet.h @@ -32,6 +32,7 @@ #include "tscore/ink_platform.h" #include "I_UDPNet.h" +#include "PollCont.h" // added by YTS Team, yamsat static inline PollCont *get_UDPPollCont(EThread *); @@ -335,7 +336,6 @@ class UDPNetHandler : public Continuation, public EThread::LoopTailHandler UDPNetHandler(bool enable_gso); }; -struct PollCont; static inline PollCont * get_UDPPollCont(EThread *t) { diff --git a/iocore/net/P_UnixNet.h b/iocore/net/P_UnixNet.h index 06abfed6ed6..dc1a8d30860 100644 --- a/iocore/net/P_UnixNet.h +++ b/iocore/net/P_UnixNet.h @@ -27,119 +27,20 @@ #include "tscore/ink_platform.h" -#define USE_EDGE_TRIGGER_EPOLL 1 -#define USE_EDGE_TRIGGER_KQUEUE 1 -#define USE_EDGE_TRIGGER_PORT 1 - -#define EVENTIO_NETACCEPT 1 -#define EVENTIO_READWRITE_VC 2 -#define EVENTIO_DNS_CONNECTION 3 -#define EVENTIO_UDP_CONNECTION 4 -#define EVENTIO_ASYNC_SIGNAL 5 -#define EVENTIO_IO_URING 6 - -#if TS_USE_EPOLL -#ifndef EPOLLEXCLUSIVE -#define EPOLLEXCLUSIVE 0 -#endif -#ifdef USE_EDGE_TRIGGER_EPOLL -#define USE_EDGE_TRIGGER 1 -#define EVENTIO_READ (EPOLLIN | EPOLLET) -#define EVENTIO_WRITE (EPOLLOUT | EPOLLET) -#else -#define EVENTIO_READ EPOLLIN -#define EVENTIO_WRITE EPOLLOUT -#endif -#define EVENTIO_ERROR (EPOLLERR | EPOLLPRI | EPOLLHUP) -#endif - -#if TS_USE_KQUEUE -#ifdef USE_EDGE_TRIGGER_KQUEUE -#define USE_EDGE_TRIGGER 1 -#define INK_EV_EDGE_TRIGGER EV_CLEAR -#else -#define INK_EV_EDGE_TRIGGER 0 -#endif -#define EVENTIO_READ 
INK_EVP_IN -#define EVENTIO_WRITE INK_EVP_OUT -#define EVENTIO_ERROR (0x010 | 0x002 | 0x020) // ERR PRI HUP -#endif - -struct PollDescriptor; -using EventLoop = PollDescriptor *; - -class NetEvent; -class UnixUDPConnection; -class DiskHandler; -struct DNSConnection; -struct NetAccept; - -/// Unified API for setting and clearing kernel and epoll events. -struct EventIO { - int fd = -1; ///< file descriptor, often a system port -#if TS_USE_KQUEUE || TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER) - int events = 0; ///< a bit mask of enabled events -#endif - EventLoop event_loop = nullptr; ///< the assigned event loop - bool syscall = true; ///< if false, disable all functionality (for QUIC) - int type = 0; ///< class identifier of union data. - union { - void *untyped; - NetEvent *ne; - DNSConnection *dnscon; - NetAccept *na; - UnixUDPConnection *uc; - DiskHandler *dh; - } data; ///< a kind of continuation - - /** The start methods all logically Setup a class to be called - when a file descriptor is available for read or write. - The type of the classes vary. Generally the file descriptor - is pulled from the class, but there is one option that lets - the file descriptor be expressed directly. - @param l the event loop - @param events a mask of flags (for details `man epoll_ctl`) - @return int the number of events created, -1 is error - */ - int start(EventLoop l, DNSConnection *vc, int events); - int start(EventLoop l, NetAccept *vc, int events); - int start(EventLoop l, NetEvent *ne, int events); - int start(EventLoop l, UnixUDPConnection *vc, int events); - int start(EventLoop l, int fd, NetEvent *ne, int events); - int start_common(EventLoop l, int fd, int events); - - /** Alter the events that will trigger the continuation, for level triggered I/O. 
- @param events add with positive mask(+EVENTIO_READ), or remove with negative mask (-EVENTIO_READ) - @return int the number of events created, -1 is error - */ - int modify(int events); - - /** Refresh the existing events (i.e. KQUEUE EV_CLEAR), for edge triggered I/O - @param events mask of events - @return int the number of events created, -1 is error - */ - int refresh(int events); - - /// Remove the kernel or epoll event. Returns 0 on success. - int stop(); - - /// Remove the epoll event and close the connection. Returns 0 on success. - int close(); - - EventIO() { data.untyped = nullptr; } -}; - +#include "PollCont.h" +#include "EventIO.h" +#include "NetHandler.h" #include "P_Net.h" +#include "P_NetAccept.h" #include "P_UnixNetProcessor.h" #include "P_UnixNetVConnection.h" -#include "P_NetAccept.h" -#include "P_DNSConnection.h" -#include "P_UnixUDPConnection.h" #include "P_UnixPollDescriptor.h" #include -class NetEvent; -class NetHandler; +NetHandler *get_NetHandler(EThread *t); +PollCont *get_PollCont(EThread *t); +PollDescriptor *get_PollDescriptor(EThread *t); + using NetContHandler = int (NetHandler::*)(int, void *); using uint32 = unsigned int; @@ -176,235 +77,6 @@ extern ink_hrtime last_transient_accept_error; // function prototype needed for SSLUnixNetVConnection unsigned int net_next_connection_number(); -struct PollCont : public Continuation { - NetHandler *net_handler; - PollDescriptor *pollDescriptor; - PollDescriptor *nextPollDescriptor; - int poll_timeout; - - PollCont(Ptr &m, int pt = net_config_poll_timeout); - PollCont(Ptr &m, NetHandler *nh, int pt = net_config_poll_timeout); - ~PollCont() override; - int pollEvent(int, Event *); - void do_poll(ink_hrtime timeout); -}; - -/** - NetHandler is the processor of NetEvent for the Net sub-system. The NetHandler - is the core component of the Net sub-system. Once started, it is responsible - for polling socket fds and perform the I/O tasks in NetEvent. 
- - The NetHandler is executed periodically to perform read/write tasks for - NetVConnection. The NetHandler::mainNetEvent() should be viewed as a part of - EThread::execute() loop. This is the reason that Net System is a sub-system. - - By get_NetHandler(this_ethread()), you can get the NetHandler object that - runs inside the current EThread and then @c startIO / @c stopIO which - assign/release a NetEvent to/from NetHandler. Before you call these functions, - holding the mutex of this NetHandler is required. - - The NetVConnection provides a set of do_io functions through which you can - specify continuations to be called back by its NetHandler. These function - calls do not block. Instead they return an VIO object and schedule the - callback to the continuation passed in when there are I/O events occurred. - - Multi-thread scheduler: - - The NetHandler should be viewed as multi-threaded schedulers which process - NetEvents from their queues. If vc wants to be managed by NetHandler, the vc - should be derived from NetEvent. The vc can be made of NetProcessor (allocate_vc) - either by directly adding a NetEvent to the queue (NetHandler::startIO), or more - conveniently, calling a method service call (NetProcessor::connect_re) which - synthesizes the NetEvent and places it in the queue. - - Callback event codes: - - These event codes for do_io_read and reenable(read VIO) task: - VC_EVENT_READ_READY, VC_EVENT_READ_COMPLETE, - VC_EVENT_EOS, VC_EVENT_ERROR - - These event codes for do_io_write and reenable(write VIO) task: - VC_EVENT_WRITE_READY, VC_EVENT_WRITE_COMPLETE - VC_EVENT_ERROR - - There is no event and callback for do_io_shutdown / do_io_close task. - - NetVConnection allocation policy: - - VCs are allocated by the NetProcessor and deallocated by NetHandler. - A state machine may access the returned, non-recurring NetEvent / VIO until - it is closed by do_io_close. For recurring NetEvent, the NetEvent may be - accessed until it is closed. 
Once the NetEvent is closed, it's the - NetHandler's responsibility to deallocate it. - - Before assign to NetHandler or after release from NetHandler, it's the - NetEvent's responsibility to deallocate itself. - - */ - -// -// NetHandler -// -// A NetHandler handles the Network IO operations. It maintains -// lists of operations at multiples of it's periodicity. -// -class NetHandler : public Continuation, public EThread::LoopTailHandler -{ - using self_type = NetHandler; ///< Self reference type. -public: - // @a thread and @a trigger_event are redundant - you can get the former from the latter. - // If we don't get rid of @a trigger_event we should remove @a thread. - EThread *thread = nullptr; - Event *trigger_event = nullptr; - QueM(NetEvent, NetState, read, ready_link) read_ready_list; - QueM(NetEvent, NetState, write, ready_link) write_ready_list; - Que(NetEvent, open_link) open_list; - DList(NetEvent, cop_link) cop_list; - ASLLM(NetEvent, NetState, read, enable_link) read_enable_list; - ASLLM(NetEvent, NetState, write, enable_link) write_enable_list; - Que(NetEvent, keep_alive_queue_link) keep_alive_queue; - uint32_t keep_alive_queue_size = 0; - Que(NetEvent, active_queue_link) active_queue; - uint32_t active_queue_size = 0; - -#ifdef TS_USE_LINUX_IO_URING - EventIO uring_evio; -#endif - - /// configuration settings for managing the active and keep-alive queues - struct Config { - uint32_t max_connections_in = 0; - uint32_t max_requests_in = 0; - uint32_t inactive_threshold_in = 0; - uint32_t transaction_no_activity_timeout_in = 0; - uint32_t keep_alive_no_activity_timeout_in = 0; - uint32_t default_inactivity_timeout = 0; - - /** Return the address of the first value in this struct. - - Doing updates is much easier if we treat this config struct as an array. - Making it a method means the knowledge of which member is the first one - is localized to this struct, not scattered about. 
- */ - uint32_t & - operator[](int n) - { - return *(&max_connections_in + n); - } - }; - /** Static global config, set and updated per process. - - This is updated asynchronously and then events are sent to the NetHandler instances per thread - to copy to the per thread config at a convenient time. Because these are updated independently - from the command line, the update events just copy a single value from the global to the - local. This mechanism relies on members being identical types. - */ - static Config global_config; - Config config; ///< Per thread copy of the @c global_config - // Active and keep alive queue values that depend on other configuration values. - // These are never updated directly, they are computed from other config values. - uint32_t max_connections_per_thread_in = 0; - uint32_t max_requests_per_thread_in = 0; - /// Number of configuration items in @c Config. - static constexpr int CONFIG_ITEM_COUNT = sizeof(Config) / sizeof(uint32_t); - /// Which members of @c Config the per thread values depend on. - /// If one of these is updated, the per thread values must also be updated. - static const std::bitset config_value_affects_per_thread_value; - /// Set of thread types in which nethandlers are active. - /// This enables signaling the correct instances when the configuration is updated. - /// Event type threads that use @c NetHandler must set the corresponding bit. - static std::bitset::digits> active_thread_types; - - int mainNetEvent(int event, Event *data); - int waitForActivity(ink_hrtime timeout) override; - void process_enabled_list(); - void process_ready_list(); - void manage_keep_alive_queue(); - bool manage_active_queue(NetEvent *ne, bool ignore_queue_size); - void add_to_keep_alive_queue(NetEvent *ne); - void remove_from_keep_alive_queue(NetEvent *ne); - bool add_to_active_queue(NetEvent *ne); - void remove_from_active_queue(NetEvent *ne); - - /// Per process initialization logic. 
- static void init_for_process(); - /// Update configuration values that are per thread and depend on other configuration values. - void configure_per_thread_values(); - - /** - Start to handle read & write event on a NetEvent. - Initial the socket fd of ne for polling system. - Only be called when holding the mutex of this NetHandler. - - @param ne NetEvent to be managed by this NetHandler. - @return 0 on success, ne->nh set to this NetHandler. - -ERRNO on failure. - */ - int startIO(NetEvent *ne); - /** - Stop to handle read & write event on a NetEvent. - Remove the socket fd of ne from polling system. - Only be called when holding the mutex of this NetHandler and must call stopCop(ne) first. - - @param ne NetEvent to be released. - @return ne->nh set to nullptr. - */ - void stopIO(NetEvent *ne); - - /** - Start to handle active timeout and inactivity timeout on a NetEvent. - Put the ne into open_list. All NetEvents in the open_list is checked for timeout by InactivityCop. - Only be called when holding the mutex of this NetHandler and must call startIO(ne) first. - - @param ne NetEvent to be managed by InactivityCop - */ - void startCop(NetEvent *ne); - /** - Stop to handle active timeout and inactivity on a NetEvent. - Remove the ne from open_list and cop_list. - Also remove the ne from keep_alive_queue and active_queue if its context is IN. - Only be called when holding the mutex of this NetHandler. - - @param ne NetEvent to be released. - */ - void stopCop(NetEvent *ne); - - // Signal the epoll_wait to terminate. - void signalActivity() override; - - /** - Release a ne and free it. - - @param ne NetEvent to be detached. - */ - void free_netevent(NetEvent *ne); - - NetHandler(); - -private: - void _close_ne(NetEvent *ne, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count); - - /// Static method used as the callback for runtime configuration updates. 
- static int update_nethandler_config(const char *name, RecDataT, RecData data, void *); -}; - -static inline NetHandler * -get_NetHandler(EThread *t) -{ - return (NetHandler *)ETHREAD_GET_PTR(t, unix_netProcessor.netHandler_offset); -} -static inline PollCont * -get_PollCont(EThread *t) -{ - return (PollCont *)ETHREAD_GET_PTR(t, unix_netProcessor.pollCont_offset); -} -static inline PollDescriptor * -get_PollDescriptor(EThread *t) -{ - PollCont *p = get_PollCont(t); - return p->pollDescriptor; -} - enum ThrottleType { ACCEPT, CONNECT, @@ -417,11 +89,12 @@ net_connections_to_throttle(ThrottleType t) int64_t sval = 0; NET_READ_GLOBAL_DYN_SUM(net_connections_currently_open_stat, sval); - int currently_open = (int)sval; + int currently_open = static_cast(sval); // deal with race if we got to multiple net threads - if (currently_open < 0) + if (currently_open < 0) { currently_open = 0; - return (int)(currently_open * headroom); + } + return static_cast(currently_open * headroom); } TS_INLINE void @@ -439,8 +112,9 @@ check_net_throttle(ThrottleType t) { int connections = net_connections_to_throttle(t); - if (net_connections_throttle != 0 && connections >= net_connections_throttle) + if (net_connections_throttle != 0 && connections >= net_connections_throttle) { return true; + } return false; } @@ -471,8 +145,9 @@ change_net_connections_throttle(const char *token, RecDataT data_type, RecData v net_connections_throttle = throttle; } else { net_connections_throttle = fds_throttle; - if (net_connections_throttle > throttle) + if (net_connections_throttle > throttle) { net_connections_throttle = throttle; + } } return 0; } @@ -579,260 +254,3 @@ write_disable(NetHandler *nh, NetEvent *ne) nh->write_ready_list.remove(ne); ne->ep.modify(-EVENTIO_WRITE); } - -TS_INLINE int -EventIO::start(EventLoop l, DNSConnection *vc, int events) -{ - type = EVENTIO_DNS_CONNECTION; - data.dnscon = vc; - return start_common(l, vc->fd, events); -} -TS_INLINE int -EventIO::start(EventLoop l, 
NetAccept *vc, int events) -{ - type = EVENTIO_NETACCEPT; - data.na = vc; - return start_common(l, vc->server.fd, events); -} -TS_INLINE int -EventIO::start(EventLoop l, NetEvent *ne, int events) -{ - type = EVENTIO_READWRITE_VC; - data.ne = ne; - return start_common(l, ne->get_fd(), events); -} - -TS_INLINE int -EventIO::start(EventLoop l, UnixUDPConnection *vc, int events) -{ - type = EVENTIO_UDP_CONNECTION; - data.uc = vc; - return start_common(l, vc->fd, events); -} - -TS_INLINE int -EventIO::close() -{ - if (!this->syscall) { - return 0; - } - - stop(); - switch (type) { - default: - ink_assert(!"case"); - // fallthrough - case EVENTIO_DNS_CONNECTION: - return data.dnscon->close(); - break; - case EVENTIO_NETACCEPT: - return data.na->server.close(); - break; - case EVENTIO_READWRITE_VC: - return data.ne->close(); - break; - } - return -1; -} - -TS_INLINE int -EventIO::start(EventLoop l, int afd, NetEvent *ne, int e) -{ - data.ne = ne; - return start_common(l, afd, e); -} - -TS_INLINE int -EventIO::start_common(EventLoop l, int afd, int e) -{ - if (!this->syscall) { - return 0; - } - - fd = afd; - event_loop = l; -#if TS_USE_EPOLL - struct epoll_event ev; - memset(&ev, 0, sizeof(ev)); - ev.events = e | EPOLLEXCLUSIVE; - ev.data.ptr = this; -#ifndef USE_EDGE_TRIGGER - events = e; -#endif - return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev); -#endif -#if TS_USE_KQUEUE - events = e; - struct kevent ev[2]; - int n = 0; - if (e & EVENTIO_READ) - EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this); - if (e & EVENTIO_WRITE) - EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this); - return kevent(l->kqueue_fd, &ev[0], n, nullptr, 0, nullptr); -#endif -} - -TS_INLINE int -EventIO::modify(int e) -{ - if (!this->syscall) { - return 0; - } - - ink_assert(event_loop); -#if TS_USE_EPOLL && !defined(USE_EDGE_TRIGGER) - struct epoll_event ev; - memset(&ev, 0, sizeof(ev)); - int new_events = events, old_events = events; 
- if (e < 0) - new_events &= ~(-e); - else - new_events |= e; - events = new_events; - ev.events = new_events; - ev.data.ptr = this; - if (!new_events) - return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_DEL, fd, &ev); - else if (!old_events) - return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_ADD, fd, &ev); - else - return epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_MOD, fd, &ev); -#endif -#if TS_USE_KQUEUE && !defined(USE_EDGE_TRIGGER) - int n = 0; - struct kevent ev[2]; - int ee = events; - if (e < 0) { - ee &= ~(-e); - if ((-e) & EVENTIO_READ) - EV_SET(&ev[n++], fd, EVFILT_READ, EV_DELETE, 0, 0, this); - if ((-e) & EVENTIO_WRITE) - EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_DELETE, 0, 0, this); - } else { - ee |= e; - if (e & EVENTIO_READ) - EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this); - if (e & EVENTIO_WRITE) - EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this); - } - events = ee; - if (n) - return kevent(event_loop->kqueue_fd, &ev[0], n, nullptr, 0, nullptr); - else - return 0; -#endif - (void)e; // ATS_UNUSED - return 0; -} - -TS_INLINE int -EventIO::refresh(int e) -{ - if (!this->syscall) { - return 0; - } - - ink_assert(event_loop); -#if TS_USE_KQUEUE && defined(USE_EDGE_TRIGGER) - e = e & events; - struct kevent ev[2]; - int n = 0; - if (e & EVENTIO_READ) - EV_SET(&ev[n++], fd, EVFILT_READ, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this); - if (e & EVENTIO_WRITE) - EV_SET(&ev[n++], fd, EVFILT_WRITE, EV_ADD | INK_EV_EDGE_TRIGGER, 0, 0, this); - if (n) - return kevent(event_loop->kqueue_fd, &ev[0], n, nullptr, 0, nullptr); - else - return 0; -#endif - (void)e; // ATS_UNUSED - return 0; -} - -TS_INLINE int -EventIO::stop() -{ - if (!this->syscall) { - return 0; - } - if (event_loop) { - int retval = 0; -#if TS_USE_EPOLL - struct epoll_event ev; - memset(&ev, 0, sizeof(struct epoll_event)); - ev.events = EPOLLIN | EPOLLOUT | EPOLLET; - retval = epoll_ctl(event_loop->epoll_fd, EPOLL_CTL_DEL, fd, &ev); -#endif - 
event_loop = nullptr; - return retval; - } - return 0; -} - -TS_INLINE int -NetHandler::startIO(NetEvent *ne) -{ - ink_assert(this->mutex->thread_holding == this_ethread()); - ink_assert(ne->get_thread() == this_ethread()); - int res = 0; - - PollDescriptor *pd = get_PollDescriptor(this->thread); - if (ne->ep.start(pd, ne, EVENTIO_READ | EVENTIO_WRITE) < 0) { - res = errno; - // EEXIST should be ok, though it should have been cleared before we got back here - if (errno != EEXIST) { - Debug("iocore_net", "NetHandler::startIO : failed on EventIO::start, errno = [%d](%s)", errno, strerror(errno)); - return -res; - } - } - - if (ne->read.triggered == 1) { - read_ready_list.enqueue(ne); - } - ne->nh = this; - return res; -} - -TS_INLINE void -NetHandler::stopIO(NetEvent *ne) -{ - ink_release_assert(ne->nh == this); - - ne->ep.stop(); - - read_ready_list.remove(ne); - write_ready_list.remove(ne); - if (ne->read.in_enabled_list) { - read_enable_list.remove(ne); - ne->read.in_enabled_list = 0; - } - if (ne->write.in_enabled_list) { - write_enable_list.remove(ne); - ne->write.in_enabled_list = 0; - } - - ne->nh = nullptr; -} - -TS_INLINE void -NetHandler::startCop(NetEvent *ne) -{ - ink_assert(this->mutex->thread_holding == this_ethread()); - ink_release_assert(ne->nh == this); - ink_assert(!open_list.in(ne)); - - open_list.enqueue(ne); -} - -TS_INLINE void -NetHandler::stopCop(NetEvent *ne) -{ - ink_release_assert(ne->nh == this); - - open_list.remove(ne); - cop_list.remove(ne); - remove_from_keep_alive_queue(ne); - remove_from_active_queue(ne); -} diff --git a/iocore/net/P_UnixPollDescriptor.h b/iocore/net/P_UnixPollDescriptor.h index 1ae606644bb..d4c981f4b4c 100644 --- a/iocore/net/P_UnixPollDescriptor.h +++ b/iocore/net/P_UnixPollDescriptor.h @@ -56,7 +56,30 @@ struct PollDescriptor { int kqueue_fd; #endif - PollDescriptor() { init(); } + PollDescriptor() + { + result = 0; +#if TS_USE_EPOLL + nfds = 0; + epoll_fd = epoll_create(POLL_DESCRIPTOR_SIZE); + 
memset(ePoll_Triggered_Events, 0, sizeof(ePoll_Triggered_Events)); + memset(pfd, 0, sizeof(pfd)); +#endif +#if TS_USE_KQUEUE + kqueue_fd = kqueue(); + memset(kq_Triggered_Events, 0, sizeof(kq_Triggered_Events)); +#endif + } + + virtual ~PollDescriptor() + { +#if TS_USE_EPOLL + close(epoll_fd); +#endif +#if TS_USE_KQUEUE + close(kqueue_fd); +#endif + } #if TS_USE_EPOLL #define get_ev_port(a) ((a)->epoll_fd) #define get_ev_events(a, x) ((a)->ePoll_Triggered_Events[(x)].events) @@ -103,21 +126,4 @@ struct PollDescriptor { return nullptr; #endif } - -private: - void - init() - { - result = 0; -#if TS_USE_EPOLL - nfds = 0; - epoll_fd = epoll_create(POLL_DESCRIPTOR_SIZE); - memset(ePoll_Triggered_Events, 0, sizeof(ePoll_Triggered_Events)); - memset(pfd, 0, sizeof(pfd)); -#endif -#if TS_USE_KQUEUE - kqueue_fd = kqueue(); - memset(kq_Triggered_Events, 0, sizeof(kq_Triggered_Events)); -#endif - } }; diff --git a/iocore/net/PollCont.cc b/iocore/net/PollCont.cc new file mode 100644 index 00000000000..3996121c6e0 --- /dev/null +++ b/iocore/net/PollCont.cc @@ -0,0 +1,95 @@ +/**@file + + A brief file description + + @section license License + + Licensed to the Apache Software + Foundation(ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ + +#include "PollCont.h" +#include "P_Net.h" + +PollCont::PollCont(Ptr &m, int pt) + : Continuation(m.get()), net_handler(nullptr), nextPollDescriptor(nullptr), poll_timeout(pt) +{ + pollDescriptor = new PollDescriptor(); + SET_HANDLER(&PollCont::pollEvent); +} + +PollCont::PollCont(Ptr &m, NetHandler *nh, int pt) + : Continuation(m.get()), net_handler(nh), nextPollDescriptor(nullptr), poll_timeout(pt) +{ + pollDescriptor = new PollDescriptor(); + SET_HANDLER(&PollCont::pollEvent); +} + +PollCont::~PollCont() +{ + delete pollDescriptor; + if (nextPollDescriptor != nullptr) { + delete nextPollDescriptor; + } +} + +// +// PollCont continuation which does the epoll_wait +// and stores the resultant events in ePoll_Triggered_Events +// +int +PollCont::pollEvent(int, Event *) +{ + this->do_poll(-1); + return EVENT_CONT; +} + +void +PollCont::do_poll(ink_hrtime timeout) +{ + if (likely(net_handler)) { + /* checking to see whether there are connections on the ready_queue (either + * read or write) that need processing [ebalsa] */ + if (likely(!net_handler->read_ready_list.empty() || !net_handler->write_ready_list.empty() || + !net_handler->read_enable_list.empty() || !net_handler->write_enable_list.empty())) { + NetDebug("iocore_net_poll", "rrq: %d, wrq: %d, rel: %d, wel: %d", net_handler->read_ready_list.empty(), + net_handler->write_ready_list.empty(), net_handler->read_enable_list.empty(), + net_handler->write_enable_list.empty()); + poll_timeout = 0; // poll immediately returns -- we have triggered stuff + // to process right now + } else if (timeout >= 0) { + poll_timeout = ink_hrtime_to_msec(timeout); + } else { + poll_timeout = net_config_poll_timeout; + } + } +// wait for fd's to trigger, or don't wait if timeout is 0 +#if TS_USE_EPOLL + pollDescriptor->result = + epoll_wait(pollDescriptor->epoll_fd, pollDescriptor->ePoll_Triggered_Events, POLL_DESCRIPTOR_SIZE, poll_timeout); + NetDebug("v_iocore_net_poll", "[PollCont::pollEvent] epoll_fd: %d, timeout: %d, 
results: %d", pollDescriptor->epoll_fd, + poll_timeout, pollDescriptor->result); +#elif TS_USE_KQUEUE + struct timespec tv; + tv.tv_sec = poll_timeout / 1000; + tv.tv_nsec = 1000000 * (poll_timeout % 1000); + pollDescriptor->result = + kevent(pollDescriptor->kqueue_fd, nullptr, 0, pollDescriptor->kq_Triggered_Events, POLL_DESCRIPTOR_SIZE, &tv); + NetDebug("v_iocore_net_poll", "[PollCont::pollEvent] kqueue_fd: %d, timeout: %d, results: %d", pollDescriptor->kqueue_fd, + poll_timeout, pollDescriptor->result); +#endif +} diff --git a/iocore/net/PollCont.h b/iocore/net/PollCont.h new file mode 100644 index 00000000000..bea7da49216 --- /dev/null +++ b/iocore/net/PollCont.h @@ -0,0 +1,43 @@ +/** @file + + A brief file description + + @section license License + + Licensed to the Apache Software Foundation (ASF) under one + or more contributor license agreements. See the NOTICE file + distributed with this work for additional information + regarding copyright ownership. The ASF licenses this file + to you under the Apache License, Version 2.0 (the + "License"); you may not use this file except in compliance + with the License. You may obtain a copy of the License at + + http://www.apache.org/licenses/LICENSE-2.0 + + Unless required by applicable law or agreed to in writing, software + distributed under the License is distributed on an "AS IS" BASIS, + WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + See the License for the specific language governing permissions and + limitations under the License. 
+ */ + +#pragma once + +#include "I_Continuation.h" +#include "I_Net.h" + +class NetHandler; +struct PollDescriptor; + +struct PollCont : public Continuation { + NetHandler *net_handler; + PollDescriptor *pollDescriptor; + PollDescriptor *nextPollDescriptor; + int poll_timeout; + + PollCont(Ptr &m, int pt = net_config_poll_timeout); + PollCont(Ptr &m, NetHandler *nh, int pt = net_config_poll_timeout); + ~PollCont() override; + int pollEvent(int, Event *); + void do_poll(ink_hrtime timeout); +}; diff --git a/iocore/net/UnixNet.cc b/iocore/net/UnixNet.cc index 8ce096b9c63..0b33378cc67 100644 --- a/iocore/net/UnixNet.cc +++ b/iocore/net/UnixNet.cc @@ -1,8 +1,8 @@ /** @file - A brief file description + A brief file description - @section license License + @section license License Licensed to the Apache Software Foundation (ASF) under one or more contributor license agreements. See the NOTICE file @@ -22,15 +22,13 @@ */ #include "P_Net.h" -#include "I_AIO.h" +#include "P_UnixNet.h" #include "tscore/ink_hrtime.h" #if TS_USE_LINUX_IO_URING #include "I_IO_URING.h" #endif -using namespace std::literals; - ink_hrtime last_throttle_warning; ink_hrtime last_shedding_warning; int net_connections_throttle; @@ -43,7 +41,24 @@ NetHandler::Config NetHandler::global_config; std::bitset::digits> NetHandler::active_thread_types; const std::bitset NetHandler::config_value_affects_per_thread_value{0x3}; -extern "C" void fd_reify(struct ev_loop *); +NetHandler * +get_NetHandler(EThread *t) +{ + return (NetHandler *)ETHREAD_GET_PTR(t, unix_netProcessor.netHandler_offset); +} + +PollCont * +get_PollCont(EThread *t) +{ + return (PollCont *)ETHREAD_GET_PTR(t, unix_netProcessor.pollCont_offset); +} + +PollDescriptor * +get_PollDescriptor(EThread *t) +{ + PollCont *p = get_PollCont(t); + return p->pollDescriptor; +} // INKqa10496 // One Inactivity cop runs on each thread once every second and @@ -137,85 +152,6 @@ class InactivityCop : public Continuation } }; -PollCont::PollCont(Ptr &m, int 
pt) - : Continuation(m.get()), net_handler(nullptr), nextPollDescriptor(nullptr), poll_timeout(pt) -{ - pollDescriptor = new PollDescriptor(); - SET_HANDLER(&PollCont::pollEvent); -} - -PollCont::PollCont(Ptr &m, NetHandler *nh, int pt) - : Continuation(m.get()), net_handler(nh), nextPollDescriptor(nullptr), poll_timeout(pt) -{ - pollDescriptor = new PollDescriptor(); - SET_HANDLER(&PollCont::pollEvent); -} - -PollCont::~PollCont() -{ - delete pollDescriptor; - if (nextPollDescriptor != nullptr) { - delete nextPollDescriptor; - } -} - -// -// PollCont continuation which does the epoll_wait -// and stores the resultant events in ePoll_Triggered_Events -// -int -PollCont::pollEvent(int, Event *) -{ - this->do_poll(-1); - return EVENT_CONT; -} - -void -PollCont::do_poll(ink_hrtime timeout) -{ - if (likely(net_handler)) { - /* checking to see whether there are connections on the ready_queue (either read or write) that need processing [ebalsa] */ - if (likely(!net_handler->read_ready_list.empty() || !net_handler->write_ready_list.empty() || - !net_handler->read_enable_list.empty() || !net_handler->write_enable_list.empty())) { - NetDebug("iocore_net_poll", "rrq: %d, wrq: %d, rel: %d, wel: %d", net_handler->read_ready_list.empty(), - net_handler->write_ready_list.empty(), net_handler->read_enable_list.empty(), - net_handler->write_enable_list.empty()); - poll_timeout = 0; // poll immediately returns -- we have triggered stuff to process right now - } else if (timeout >= 0) { - poll_timeout = ink_hrtime_to_msec(timeout); - } else { - poll_timeout = net_config_poll_timeout; - } - } -// wait for fd's to trigger, or don't wait if timeout is 0 -#if TS_USE_EPOLL - pollDescriptor->result = - epoll_wait(pollDescriptor->epoll_fd, pollDescriptor->ePoll_Triggered_Events, POLL_DESCRIPTOR_SIZE, poll_timeout); - NetDebug("v_iocore_net_poll", "[PollCont::pollEvent] epoll_fd: %d, timeout: %d, results: %d", pollDescriptor->epoll_fd, - poll_timeout, pollDescriptor->result); -#elif 
TS_USE_KQUEUE - struct timespec tv; - tv.tv_sec = poll_timeout / 1000; - tv.tv_nsec = 1000000 * (poll_timeout % 1000); - pollDescriptor->result = - kevent(pollDescriptor->kqueue_fd, nullptr, 0, pollDescriptor->kq_Triggered_Events, POLL_DESCRIPTOR_SIZE, &tv); - NetDebug("v_iocore_net_poll", "[PollCont::pollEvent] kqueue_fd: %d, timeout: %d, results: %d", pollDescriptor->kqueue_fd, - poll_timeout, pollDescriptor->result); -#endif -} - -static void -net_signal_hook_callback(EThread *thread) -{ -#if HAVE_EVENTFD - uint64_t counter; - ATS_UNUSED_RETURN(read(thread->evfd, &counter, sizeof(uint64_t))); -#else - char dummy[1024]; - ATS_UNUSED_RETURN(read(thread->evpipe[0], &dummy[0], 1024)); -#endif -} - void initialize_thread_for_net(EThread *thread) { @@ -252,524 +188,3 @@ initialize_thread_for_net(EThread *thread) nh->uring_evio.start(pd, IOUringContext::local_context()->register_eventfd(), nullptr, EVENTIO_READ); #endif } - -// NetHandler method definitions - -NetHandler::NetHandler() : Continuation(nullptr) -{ - SET_HANDLER(&NetHandler::mainNetEvent); -} - -int -NetHandler::update_nethandler_config(const char *str, RecDataT, RecData data, void *) -{ - uint32_t *updated_member = nullptr; // direct pointer to config member for update. 
- std::string_view name{str}; - - if (name == "proxy.config.net.max_connections_in"sv) { - updated_member = &NetHandler::global_config.max_connections_in; - Debug("net_queue", "proxy.config.net.max_connections_in updated to %" PRId64, data.rec_int); - } else if (name == "proxy.config.net.max_requests_in"sv) { - updated_member = &NetHandler::global_config.max_requests_in; - Debug("net_queue", "proxy.config.net.max_requests_in updated to %" PRId64, data.rec_int); - } else if (name == "proxy.config.net.inactive_threshold_in"sv) { - updated_member = &NetHandler::global_config.inactive_threshold_in; - Debug("net_queue", "proxy.config.net.inactive_threshold_in updated to %" PRId64, data.rec_int); - } else if (name == "proxy.config.net.transaction_no_activity_timeout_in"sv) { - updated_member = &NetHandler::global_config.transaction_no_activity_timeout_in; - Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %" PRId64, data.rec_int); - } else if (name == "proxy.config.net.keep_alive_no_activity_timeout_in"sv) { - updated_member = &NetHandler::global_config.keep_alive_no_activity_timeout_in; - Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %" PRId64, data.rec_int); - } else if (name == "proxy.config.net.default_inactivity_timeout"sv) { - updated_member = &NetHandler::global_config.default_inactivity_timeout; - Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %" PRId64, data.rec_int); - } - - if (updated_member) { - *updated_member = data.rec_int; // do the actual update. - // portable form of the update, an index converted to so it can be passed as an event cookie. - void *idx = reinterpret_cast(static_cast(updated_member - &global_config[0])); - // Signal the NetHandler instances, passing the index of the updated config value. 
- for (int i = 0; i < eventProcessor.n_thread_groups; ++i) { - if (!active_thread_types[i]) { - continue; - } - for (EThread **tp = eventProcessor.thread_group[i]._thread, - **limit = eventProcessor.thread_group[i]._thread + eventProcessor.thread_group[i]._count; - tp < limit; ++tp) { - NetHandler *nh = get_NetHandler(*tp); - if (nh) { - nh->thread->schedule_imm(nh, TS_EVENT_MGMT_UPDATE, idx); - } - } - } - } - - return REC_ERR_OKAY; -} - -void -NetHandler::init_for_process() -{ - // read configuration values and setup callbacks for when they change - REC_ReadConfigInt32(global_config.max_connections_in, "proxy.config.net.max_connections_in"); - REC_ReadConfigInt32(global_config.max_requests_in, "proxy.config.net.max_requests_in"); - REC_ReadConfigInt32(global_config.inactive_threshold_in, "proxy.config.net.inactive_threshold_in"); - REC_ReadConfigInt32(global_config.transaction_no_activity_timeout_in, "proxy.config.net.transaction_no_activity_timeout_in"); - REC_ReadConfigInt32(global_config.keep_alive_no_activity_timeout_in, "proxy.config.net.keep_alive_no_activity_timeout_in"); - REC_ReadConfigInt32(global_config.default_inactivity_timeout, "proxy.config.net.default_inactivity_timeout"); - - RecRegisterConfigUpdateCb("proxy.config.net.max_connections_in", update_nethandler_config, nullptr); - RecRegisterConfigUpdateCb("proxy.config.net.max_requests_in", update_nethandler_config, nullptr); - RecRegisterConfigUpdateCb("proxy.config.net.inactive_threshold_in", update_nethandler_config, nullptr); - RecRegisterConfigUpdateCb("proxy.config.net.transaction_no_activity_timeout_in", update_nethandler_config, nullptr); - RecRegisterConfigUpdateCb("proxy.config.net.keep_alive_no_activity_timeout_in", update_nethandler_config, nullptr); - RecRegisterConfigUpdateCb("proxy.config.net.default_inactivity_timeout", update_nethandler_config, nullptr); - - Debug("net_queue", "proxy.config.net.max_connections_in updated to %d", global_config.max_connections_in); - 
Debug("net_queue", "proxy.config.net.max_requests_in updated to %d", global_config.max_requests_in); - Debug("net_queue", "proxy.config.net.inactive_threshold_in updated to %d", global_config.inactive_threshold_in); - Debug("net_queue", "proxy.config.net.transaction_no_activity_timeout_in updated to %d", - global_config.transaction_no_activity_timeout_in); - Debug("net_queue", "proxy.config.net.keep_alive_no_activity_timeout_in updated to %d", - global_config.keep_alive_no_activity_timeout_in); - Debug("net_queue", "proxy.config.net.default_inactivity_timeout updated to %d", global_config.default_inactivity_timeout); -} - -// -// Function used to release a NetEvent and free it. -// -void -NetHandler::free_netevent(NetEvent *ne) -{ - EThread *t = this->thread; - - ink_assert(t == this_ethread()); - ink_release_assert(ne->get_thread() == t); - ink_release_assert(ne->nh == this); - - // Release ne from InactivityCop - stopCop(ne); - // Release ne from NetHandler - stopIO(ne); - // Clear and deallocate ne - ne->free(t); -} - -// -// Move VC's enabled on a different thread to the ready list -// -void -NetHandler::process_enabled_list() -{ - NetEvent *ne = nullptr; - - SListM(NetEvent, NetState, read, enable_link) rq(read_enable_list.popall()); - while ((ne = rq.pop())) { - ne->ep.modify(EVENTIO_READ); - ne->ep.refresh(EVENTIO_READ); - ne->read.in_enabled_list = 0; - if ((ne->read.enabled && ne->read.triggered) || ne->closed) { - read_ready_list.in_or_enqueue(ne); - } - } - - SListM(NetEvent, NetState, write, enable_link) wq(write_enable_list.popall()); - while ((ne = wq.pop())) { - ne->ep.modify(EVENTIO_WRITE); - ne->ep.refresh(EVENTIO_WRITE); - ne->write.in_enabled_list = 0; - if ((ne->write.enabled && ne->write.triggered) || ne->closed) { - write_ready_list.in_or_enqueue(ne); - } - } -} - -// -// Walk through the ready list -// -void -NetHandler::process_ready_list() -{ - NetEvent *ne = nullptr; - -#if defined(USE_EDGE_TRIGGER) - // NetEvent * - while ((ne = 
read_ready_list.dequeue())) { - // Initialize the thread-local continuation flags - set_cont_flags(ne->get_control_flags()); - if (ne->closed) { - free_netevent(ne); - } else if (ne->read.enabled && ne->read.triggered) { - ne->net_read_io(this, this->thread); - } else if (!ne->read.enabled) { - read_ready_list.remove(ne); - } - } - while ((ne = write_ready_list.dequeue())) { - set_cont_flags(ne->get_control_flags()); - if (ne->closed) { - free_netevent(ne); - } else if (ne->write.enabled && ne->write.triggered) { - ne->net_write_io(this, this->thread); - } else if (!ne->write.enabled) { - write_ready_list.remove(ne); - } - } -#else /* !USE_EDGE_TRIGGER */ - while ((ne = read_ready_list.dequeue())) { - set_cont_flags(ne->get_control_flags()); - if (ne->closed) - free_netevent(ne); - else if (ne->read.enabled && ne->read.triggered) - ne->net_read_io(this, this->thread); - else if (!ne->read.enabled) - ne->ep.modify(-EVENTIO_READ); - } - while ((ne = write_ready_list.dequeue())) { - set_cont_flags(ne->get_control_flags()); - if (ne->closed) - free_netevent(ne); - else if (ne->write.enabled && ne->write.triggered) - write_to_net(this, ne, this->thread); - else if (!ne->write.enabled) - ne->ep.modify(-EVENTIO_WRITE); - } -#endif /* !USE_EDGE_TRIGGER */ -} - -// -// The main event for NetHandler -int -NetHandler::mainNetEvent(int event, Event *e) -{ - if (TS_EVENT_MGMT_UPDATE == event) { - intptr_t idx = reinterpret_cast(e->cookie); - // Copy to the same offset in the instance struct. 
- config[idx] = global_config[idx]; - if (config_value_affects_per_thread_value[idx]) { - this->configure_per_thread_values(); - } - return EVENT_CONT; - } else { - ink_assert(trigger_event == e && (event == EVENT_INTERVAL || event == EVENT_POLL)); - return this->waitForActivity(-1); - } -} - -int -NetHandler::waitForActivity(ink_hrtime timeout) -{ - EventIO *epd = nullptr; -#if TS_USE_LINUX_IO_URING - IOUringContext *ur = IOUringContext::local_context(); - bool servicedh = false; -#endif - - NET_INCREMENT_DYN_STAT(net_handler_run_stat); - SCOPED_MUTEX_LOCK(lock, mutex, this->thread); - - process_enabled_list(); - -#if TS_USE_LINUX_IO_URING - ur->submit(); -#endif - - // Polling event by PollCont - PollCont *p = get_PollCont(this->thread); - p->do_poll(timeout); - - // Get & Process polling result - PollDescriptor *pd = get_PollDescriptor(this->thread); - NetEvent *ne = nullptr; - for (int x = 0; x < pd->result; x++) { - epd = static_cast get_ev_data(pd, x); - if (epd->type == EVENTIO_READWRITE_VC) { - ne = epd->data.ne; - // Remove triggered NetEvent from cop_list because it won't be timeout before next InactivityCop runs. 
- if (cop_list.in(ne)) { - cop_list.remove(ne); - } - int flags = get_ev_events(pd, x); - if (flags & (EVENTIO_ERROR)) { - ne->set_error_from_socket(); - } - if (flags & (EVENTIO_READ)) { - ne->read.triggered = 1; - if (!read_ready_list.in(ne)) { - read_ready_list.enqueue(ne); - } - } - if (flags & (EVENTIO_WRITE)) { - ne->write.triggered = 1; - if (!write_ready_list.in(ne)) { - write_ready_list.enqueue(ne); - } - } else if (!(flags & (EVENTIO_READ))) { - Debug("iocore_net_main", "Unhandled epoll event: 0x%04x", flags); - // In practice we sometimes see EPOLLERR and EPOLLHUP through there - // Anything else would be surprising - ink_assert((flags & ~(EVENTIO_ERROR)) == 0); - ne->write.triggered = 1; - if (!write_ready_list.in(ne)) { - write_ready_list.enqueue(ne); - } - } - } else if (epd->type == EVENTIO_DNS_CONNECTION) { - if (epd->data.dnscon != nullptr) { - epd->data.dnscon->trigger(); // Make sure the DNSHandler for this con knows we triggered -#if defined(USE_EDGE_TRIGGER) - epd->refresh(EVENTIO_READ); -#endif - } - } else if (epd->type == EVENTIO_ASYNC_SIGNAL) { - net_signal_hook_callback(this->thread); - } else if (epd->type == EVENTIO_NETACCEPT) { - this->thread->schedule_imm(epd->data.na); -#if TS_USE_LINUX_IO_URING - } else if (epd->type == EVENTIO_IO_URING) { - servicedh = true; -#endif - } - ev_next_event(pd, x); - } - - pd->result = 0; - - process_ready_list(); - -#if TS_USE_LINUX_IO_URING - if (servicedh) { - ur->service(); - } -#endif - - return EVENT_CONT; -} - -void -NetHandler::signalActivity() -{ -#if HAVE_EVENTFD - uint64_t counter = 1; - ATS_UNUSED_RETURN(write(thread->evfd, &counter, sizeof(uint64_t))); -#else - char dummy = 1; - ATS_UNUSED_RETURN(write(thread->evpipe[1], &dummy, 1)); -#endif -} - -bool -NetHandler::manage_active_queue(NetEvent *enabling_ne, bool ignore_queue_size = false) -{ - const int total_connections_in = active_queue_size + keep_alive_queue_size; - Debug("v_net_queue", - "max_connections_per_thread_in: %d 
max_requests_per_thread_in: %d total_connections_in: %d " - "active_queue_size: %d keep_alive_queue_size: %d", - max_connections_per_thread_in, max_requests_per_thread_in, total_connections_in, active_queue_size, keep_alive_queue_size); - - if (!max_requests_per_thread_in) { - // active queue has no max - return true; - } - - if (ignore_queue_size == false && max_requests_per_thread_in > active_queue_size) { - return true; - } - - ink_hrtime now = Thread::get_hrtime(); - - // loop over the non-active connections and try to close them - NetEvent *ne = active_queue.head; - NetEvent *ne_next = nullptr; - int closed = 0; - int handle_event = 0; - int total_idle_time = 0; - int total_idle_count = 0; - for (; ne != nullptr; ne = ne_next) { - ne_next = ne->active_queue_link.next; - // It seems dangerous closing the current ne at this point - // Let the activity_cop deal with it - if (ne == enabling_ne) { - continue; - } - if ((ne->next_inactivity_timeout_at && ne->next_inactivity_timeout_at <= now) || - (ne->next_activity_timeout_at && ne->next_activity_timeout_at <= now)) { - _close_ne(ne, now, handle_event, closed, total_idle_time, total_idle_count); - } - if (ignore_queue_size == false && max_requests_per_thread_in > active_queue_size) { - return true; - } - } - - if (max_requests_per_thread_in > active_queue_size) { - return true; - } - - return false; // failed to make room in the queue, all connections are active -} - -void -NetHandler::configure_per_thread_values() -{ - // figure out the number of threads and calculate the number of connections per thread - int threads = eventProcessor.thread_group[ET_NET]._count; - max_connections_per_thread_in = config.max_connections_in / threads; - max_requests_per_thread_in = config.max_requests_in / threads; - Debug("net_queue", "max_connections_per_thread_in updated to %d threads: %d", max_connections_per_thread_in, threads); - Debug("net_queue", "max_requests_per_thread_in updated to %d threads: %d", 
max_requests_per_thread_in, threads); -} - -void -NetHandler::manage_keep_alive_queue() -{ - uint32_t total_connections_in = active_queue_size + keep_alive_queue_size; - ink_hrtime now = Thread::get_hrtime(); - - Debug("v_net_queue", "max_connections_per_thread_in: %d total_connections_in: %d active_queue_size: %d keep_alive_queue_size: %d", - max_connections_per_thread_in, total_connections_in, active_queue_size, keep_alive_queue_size); - - if (!max_connections_per_thread_in || total_connections_in <= max_connections_per_thread_in) { - return; - } - - // loop over the non-active connections and try to close them - NetEvent *ne_next = nullptr; - int closed = 0; - int handle_event = 0; - int total_idle_time = 0; - int total_idle_count = 0; - for (NetEvent *ne = keep_alive_queue.head; ne != nullptr; ne = ne_next) { - ne_next = ne->keep_alive_queue_link.next; - _close_ne(ne, now, handle_event, closed, total_idle_time, total_idle_count); - - total_connections_in = active_queue_size + keep_alive_queue_size; - if (total_connections_in <= max_connections_per_thread_in) { - break; - } - } - - if (total_idle_count > 0) { - Debug("net_queue", "max cons: %d active: %d idle: %d already closed: %d, close event: %d mean idle: %d", - max_connections_per_thread_in, total_connections_in, keep_alive_queue_size, closed, handle_event, - total_idle_time / total_idle_count); - } -} - -void -NetHandler::_close_ne(NetEvent *ne, ink_hrtime now, int &handle_event, int &closed, int &total_idle_time, int &total_idle_count) -{ - if (ne->get_thread() != this_ethread()) { - return; - } - MUTEX_TRY_LOCK(lock, ne->get_mutex(), this_ethread()); - if (!lock.is_locked()) { - return; - } - ink_hrtime diff = (now - (ne->next_inactivity_timeout_at - ne->inactivity_timeout_in)) / HRTIME_SECOND; - if (diff > 0) { - total_idle_time += diff; - ++total_idle_count; - NET_SUM_DYN_STAT(keep_alive_queue_timeout_total_stat, diff); - NET_INCREMENT_DYN_STAT(keep_alive_queue_timeout_count_stat); - } - 
Debug("net_queue", "closing connection NetEvent=%p idle: %u now: %" PRId64 " at: %" PRId64 " in: %" PRId64 " diff: %" PRId64, ne, - keep_alive_queue_size, ink_hrtime_to_sec(now), ink_hrtime_to_sec(ne->next_inactivity_timeout_at), - ink_hrtime_to_sec(ne->inactivity_timeout_in), diff); - if (ne->closed) { - free_netevent(ne); - ++closed; - } else { - ne->next_inactivity_timeout_at = now; - // create a dummy event - Event event; - event.ethread = this_ethread(); - if (ne->inactivity_timeout_in && ne->next_inactivity_timeout_at <= now) { - if (ne->callback(VC_EVENT_INACTIVITY_TIMEOUT, &event) == EVENT_DONE) { - ++handle_event; - } - } else if (ne->active_timeout_in && ne->next_activity_timeout_at <= now) { - if (ne->callback(VC_EVENT_ACTIVE_TIMEOUT, &event) == EVENT_DONE) { - ++handle_event; - } - } - } -} - -void -NetHandler::add_to_keep_alive_queue(NetEvent *ne) -{ - Debug("net_queue", "NetEvent: %p", ne); - ink_assert(mutex->thread_holding == this_ethread()); - - if (keep_alive_queue.in(ne)) { - // already in the keep-alive queue, move the head - keep_alive_queue.remove(ne); - } else { - // in the active queue or no queue, new to this queue - remove_from_active_queue(ne); - ++keep_alive_queue_size; - } - keep_alive_queue.enqueue(ne); - - // if keep-alive queue is over size then close connections - manage_keep_alive_queue(); -} - -void -NetHandler::remove_from_keep_alive_queue(NetEvent *ne) -{ - Debug("net_queue", "NetEvent: %p", ne); - ink_assert(mutex->thread_holding == this_ethread()); - - if (keep_alive_queue.in(ne)) { - keep_alive_queue.remove(ne); - --keep_alive_queue_size; - } -} - -bool -NetHandler::add_to_active_queue(NetEvent *ne) -{ - Debug("net_queue", "NetEvent: %p", ne); - Debug("net_queue", "max_connections_per_thread_in: %d active_queue_size: %d keep_alive_queue_size: %d", - max_connections_per_thread_in, active_queue_size, keep_alive_queue_size); - ink_assert(mutex->thread_holding == this_ethread()); - - bool active_queue_full = false; - - // if 
active queue is over size then close inactive connections - if (manage_active_queue(ne) == false) { - active_queue_full = true; - } - - if (active_queue.in(ne)) { - // already in the active queue, move the head - active_queue.remove(ne); - } else { - if (active_queue_full) { - // there is no room left in the queue - NET_SUM_DYN_STAT(net_requests_max_throttled_in_stat, 1); - return false; - } - // in the keep-alive queue or no queue, new to this queue - remove_from_keep_alive_queue(ne); - ++active_queue_size; - } - active_queue.enqueue(ne); - - return true; -} - -void -NetHandler::remove_from_active_queue(NetEvent *ne) -{ - Debug("net_queue", "NetEvent: %p", ne); - ink_assert(mutex->thread_holding == this_ethread()); - - if (active_queue.in(ne)) { - active_queue.remove(ne); - --active_queue_size; - } -} diff --git a/iocore/net/UnixNetProcessor.cc b/iocore/net/UnixNetProcessor.cc index 1f96c993ed8..511274d8cef 100644 --- a/iocore/net/UnixNetProcessor.cc +++ b/iocore/net/UnixNetProcessor.cc @@ -49,29 +49,6 @@ net_next_connection_number() NetProcessor::AcceptOptions const NetProcessor::DEFAULT_ACCEPT_OPTIONS; -NetProcessor::AcceptOptions & -NetProcessor::AcceptOptions::reset() -{ - local_port = 0; - local_ip.invalidate(); - accept_threads = -1; - ip_family = AF_INET; - etype = ET_NET; - localhost_only = false; - frequent_accept = true; - recv_bufsize = 0; - send_bufsize = 0; - sockopt_flags = 0; - packet_mark = 0; - packet_tos = 0; - packet_notsent_lowat = 0; - tfo_queue_length = 0; - f_inbound_transparent = false; - f_mptcp = false; - f_proxy_protocol = false; - return *this; -} - Action * UnixNetProcessor::accept(Continuation *cont, AcceptOptions const &opt) { diff --git a/iocore/net/UnixUDPNet.cc b/iocore/net/UnixUDPNet.cc index 9f4e579945e..290586d54cf 100644 --- a/iocore/net/UnixUDPNet.cc +++ b/iocore/net/UnixUDPNet.cc @@ -34,6 +34,7 @@ #define __APPLE_USE_RFC_3542 #endif +#include "P_DNSConnection.h" #include "P_Net.h" #include "P_UDPNet.h" diff --git 
a/iocore/net/test_I_UDPNet.cc b/iocore/net/test_I_UDPNet.cc index f752f0e34f1..b05eb7b65d9 100644 --- a/iocore/net/test_I_UDPNet.cc +++ b/iocore/net/test_I_UDPNet.cc @@ -34,6 +34,7 @@ #include "I_UDPNet.h" #include "I_UDPPacket.h" #include "I_UDPConnection.h" +#include "P_UDPConnection.h" #include "diags.i" diff --git a/proxy/http/HttpDebugNames.cc b/proxy/http/HttpDebugNames.cc index c1c5db04d7b..c8bf7f45969 100644 --- a/proxy/http/HttpDebugNames.cc +++ b/proxy/http/HttpDebugNames.cc @@ -21,6 +21,7 @@ limitations under the License. */ +#include "I_DNSProcessor.h" #include "HttpDebugNames.h" #include "P_EventSystem.h" #include "StatPages.h" diff --git a/src/traffic_server/InkIOCoreAPI.cc b/src/traffic_server/InkIOCoreAPI.cc index 88e0012b392..f73c9749c9e 100644 --- a/src/traffic_server/InkIOCoreAPI.cc +++ b/src/traffic_server/InkIOCoreAPI.cc @@ -34,6 +34,7 @@ #include "I_Net.h" #include "I_Cache.h" #include "I_HostDB.h" +#include "P_UnixUDPConnection.h" // This assert is for internal API use only. #if TS_USE_FAST_SDK